Perfect-Mosquitto
该项目是基于 MQTT 客户端函数库 libmosquitto 上的 Swift 类封装。
该软件使用SPM进行编译和测试,本软件也是Perfect项目的一部分,可以作为服务器组件独立运行。
请确保您已经安装并激活了最新版本的 Swift 4.0 tool chain 工具链。
OS X 编译说明
Homebrew 组件安装
本项目依赖于 mosquitto C语言函数库。为了实现在苹果操作系统上编译,请使用以下brew
命令:
$ brew install mosquitto
PC 配置文件
本项目同时需要手工编辑配置文件/usr/local/lib/pkgconfig/mosquitto.pc
,内容如下:
Name: mosquitto
Description: Mosquitto Client Library
Version: 1.4.11
Requires:
Libs: -L/usr/local/lib -lmosquitto
Cflags: -I/usr/local/include
并且请确定当前终端环境中包括变量 $PKG_CONFIG_PATH
:
$ export PKG_CONFIG_PATH="/usr/local/lib/pkgconfig:/usr/lib/pkgconfig"
Linux 编译说明
本项目需要 Ubuntu 16.04 静态函数库 libmosquitto-dev
:
$ apt-get libmosquitto-dev
快速上手
函数库开放与关闭
在使用任何Perfect Mosquitto函数之前,请务必调用 Mosquitto.OpenLibrary()
打开函数库。同样,请在程序退出时关闭函数库,即调用Mosquitto.CloseLibrary()
静态方法。
注意 函数库开放与关闭操作不是线程安全的,因此请在执行多线程任务之前进行操作。
初始化一个 Mosquitto 类实例
初始化 Mosquitto 类时,可以带参数也可以不带任何参数。最简单的方法如下:
let m = Mosquitto()
意味着自动为该客户端对象分配一个随机序列号,而且在断开连接时所有消息和订阅都会被自动清除。
当然您也可以手工为客户端分配一个序列号,这种情况下在断点续传时会非常有用:
let mosquitto = Mosquitto(id: "myInstanceId", cleanSession: false)
连接到消息掮客
消息掮客就是用于中转维护消息的服务器,这里的消息服务器指的是符合MQTT协议的服务器——负责从消息源接收消息,并且分发到所有订阅者去。
虽然连接方法connect()
可以异步操作、维持长连接或绑定到特定网卡地址,一般来说,下面的方法是最简明扼要的:只需要提供服务器地址和端口(通常是1883)即可:
try moosquitto.connect(host: "mybroker.com", port: 1883)
尽管当例程结束时 Swift 会以清理对象的方式断开服务器连接,但我们仍然推荐显式调用 disconnect()
方法断开服务器连接。除此之外,同一个例程在断开连接后,还可以用 reconnect()
方法重新连回到服务器。
线程模型
服务线程的启动停止
Perfect Mosquitto 有两种线程调度方式。客户端可以调用start()
方法启动一个后台线程用于处理消息发布和接收,这种情况下主线程不会被阻塞,并且可以随时调用stop()
方法结束后台服务线程运行:
// 启动后台线程用于处理消息,不会阻塞主线程,调用后立刻返回。 try mosquitto.start() // 随后您可以在主线程内执行任意操作,比如消息发送;消息接收会以回调方式进行。 // 如果不需要继续处理消息,可以调用以下函数停止服务线程的运行 try mosquitto.stop()
等待消息事件
不同于上述方法,您还可以在主线程内处理所有事件,只不过需要频繁调用wait()
函数,实现消息轮询:
// 等待一个很短的时间(最小的时间片段) // 此时,mosquitto 会利用这个时间片段进行消息收发处理 try mosquitto.wait(0)
该方法唯一的参数就是等待的时间片毫秒数。0代表最小时间片(由系统决定),负数则被视为1秒钟(1000毫秒)。
⚠️ 注意 ⚠️ 两种线程模型后台 start()
/stop()
和轮询 wait()
不能混用!
发送消息
一旦连接到服务器,可以随时发送消息:
var msg = Mosquitto.Message() msg.id = 100 // 消息编码,请自行定义 msg.topic = "publish/test" // 消息主题,格式由MQTT决定 msg.string = "发送测试 🇨🇳🇨🇦" let mid = try mosquitto.publish(message: msg)
如上述例子,首先是创建一个空消息结构,然后可以指定消息编号、消息主题和消息内容;随后调用publish()
方法把准备好的消息发出去;该函数将返回实际的消息编号。
注意 消息内容也可以是一个二进制数组:
swift
// 发送一个二进制 [Int8] 字节数组
msg.payload = [40, 41, 42, 43, 44, 45]
一旦发布,请调用 start()
后台线程处理器或者 wait()
轮询进行实际消息发布,参考前文的消息模型。
消息的订阅和接收
在Perfect Mosquitto函数库中接收 MQTT 消息的唯一方法是自定义事件回调:
mosquitto.OnMessage = { msg in // 打印消息编号 print(msg.id) // 打印消息主题 print(msg.topic) // 打印消息内容 print(msg.string) // 以二进制字节流的形式输出消息内容 print(msg.payload) }//end on Message
一旦设置好事件回调函数,可以调用 subscribe()
完成特定主题消息的订阅:
try mosquitto.subscribe(topic: "publish/test")
一旦订阅,请调用 start()
后台线程处理器或者 wait()
轮询进行实际的消息接收,参考前文的消息模型。
其他函数参考
除上述简要说明之外,Perfect Mosquitto 还提供了完整的函数库,请参考项目的函数手册。
事件回调
如果有需要,请为您的mosquitto对象自行设置以下回调函数(闭包):
API | 参数 | 说明 |
---|---|---|
OnConnect { status in } |
ConnectionStatus |
连接到服务器时触发 |
OnDisconnected { status in } |
ConnectionStatus |
连接断开时出发 |
OnPublish { msg in } |
Message |
消息发送后触发 |
OnMessage { msg in } |
Message |
消息到达后触发 |
OnSubscribe { id, qos in } |
(Int32, [Int32]) |
消息订阅后触发 |
OnUnsubscribe { id in } |
Int32 (message id) |
消息取消订阅时出发 |
OnLog { level, content in } |
(LogLevel, String) |
如果有日志生成时出发 |
TLS 配置
设置 TLS 证书:
func setTLS(caFile: String, caPath: String, certFile: String? = nil, keyFile: String? = nil, keyPass: String? = nil) throws
设置 TLS 验证方法:
func setTLS(verify: SSLVerify = .PEER, version: String? = nil, ciphers: String? = nil) throws
设置 TLS 预分配钥匙:
func setTLS(psk: String, identity: String, ciphers: String? = nil) throws
其他函数
配置期望:
func setConfigWill(message: Message?) throws
配置用户名密码:
func login(username: String? = nil, password: String? = nil) throws
配置消息发送失败时重试等待时间
func setMessageRetry(max: UInt32 = 20)
设置插队消息QoS优先级:
func setInflightMessages(max: UInt32 = 20) throws
设置客户端断开时重连等待时间:
func reconnectSetDelay(delay: UInt32 = 2, delayMax: UInt32 = 10, backOff: Bool = false) throws
断线重连:
func reconnect(_ asynchronous: Bool = true) throws
当前客户端例程内容重启
func reset(id: String? = nil, cleanSession: Bool = true) throws
设置 MQTT 版本:
func setClientOption(_ version: MQTTVersion = .V31, value: UnsafeMutableRawPointer) throws
解释错误信息(英语)
static func Explain(_ fault: Exception) -> String