workerman/mqtt
MQTTはクライアントサーバーアーキテクチャのパブリッシュ/サブスクライブモデルのメッセージ輸送プロトコルであり、IoTの重要な構成要素となっています。その設計思想は軽量、オープン、シンプル、規格化されており、実装が容易です。これらの特徴により、多くのシーンにおいて非常に良い選択肢となります。特に、機械間通信(M2M)やIoT環境のような制限された環境においては特に有用です。
workerman\mqttは、workermanに基づいた非同期のMQTTクライアントライブラリで、MQTTプロトコルのメッセージを受信または送信するために使用されます。QoS 0、QoS 1、QoS 2をサポートしています。MQTTの3.1、3.1.1、5の各バージョンに対応しています。
プロジェクトのアドレス
https://github.com/walkor/mqtt
インストール
composer require workerman/mqtt
サポート
- MQTT
- MQTT5
- MQTT over websocket
サンプル
subscribe.php
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function(){
$mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
$mqtt->onConnect = function($mqtt) {
$mqtt->subscribe('test');
};
$mqtt->onMessage = function($topic, $content){
var_dump($topic, $content);
};
$mqtt->connect();
};
Worker::runAll();
コマンドラインで実行 php subscribe.php start で起動
publish.php
<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function(){
$mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
$mqtt->onConnect = function($mqtt) {
$mqtt->publish('test', 'hello workerman mqtt');
};
$mqtt->connect();
};
Worker::runAll();
コマンドラインで実行 php publish.php start で起動。
workerman\mqtt\Clientインターフェイス
Client::__construct()Client::connect()Client::publish()Client::subscribe()Client::unsubscribe()Client::disconnect()Client::close()callback onConnectcallback onMessagecallback onErrorcallback onClose
__construct (string $address, [array $options])
MQTTクライアントインスタンスを作成します。
-
$addressMQTTサーバーのアドレス。形式は 'mqtt://test.mosquitto.org:1883' のようにします。 -
$optionsクライアントオプションの配列で、以下のオプションを設定できます:keepalive: クライアントがサーバーに送信するハートビートの時間間隔。デフォルトは50秒。0に設定するとハートビートを無効にします。client_id: クライアントID。未設定の場合、デフォルトは"workerman-mqtt-client-".mt_rand()になります。protocol_name: プロトコル名。MQTT(3.1.1バージョン)またはMQIsdp(3.1バージョン)。デフォルトはMQTTです。protocol_level: プロトコルレベル。protocol_nameがMQTTの場合は4、protocol_nameがMQIsdpの場合は3です。clean_session: セッションをクリーンアップします。デフォルトはtrueです。falseに設定すると、QoS 1やQoS 2レベルのオフラインメッセージを受信することができます。reconnect_period: 再接続の時間間隔。デフォルトは1秒。0の場合は再接続しません。connect_timeout: MQTT接続のタイムアウト時間。デフォルトは30秒です。username: ユーザー名。オプションです。password: パスワード。オプションです。will: ウィルメッセージ。クライアントが切断されたとき、Brokerが自動的に他のクライアントにウィルメッセージを送信します。形式は以下の通りです:topic: トピックcontent: 内容qos:QoSレベルretain:retainフラグ
resubscribe: 異常に接続が切断されて再接続後、以前のトピックに再びサブスクライブするかどうか。デフォルトはtrueです。bindto: Brokerに接続するために、ローカルのどのIPとポートを指定するか。デフォルト値は''です。ssl: sslオプション。デフォルトはfalseです。trueに設定すると、ssl方式で接続します。同時に、ローカル証明書などを設定するためにsslコンテキストの配列を渡すことも支持します。sslコンテキストの詳細はhttps://php.net/manual/en/context.ssl.phpを参照してください。debug: デバッグモードを有効にするかどうか。デバッグモードでは、Brokerとの通信の詳細情報を出力できます。デフォルトはfalseです。uri: MQTT over websocketのuriアドレスで、通常は/mqttです。
connect()
Brokerに接続します。
publish(String $topic, String $content, [array $options], [callable $callback])
特定のトピックにメッセージを公開します。
$topicトピック$contentメッセージ$optionsオプション配列で、以下を含みます:qosQoSレベル。デフォルトは0です。retainretainフラグ。デフォルトはfalseです。dup再送信フラグ。デフォルトはfalseです。
$callback-function (\Exception $exception = null)(QoSが0の場合、この設定はサポートされていません)。エラーが発生した場合または公開が成功した場合にトリガーされます。$exceptionは例外オブジェクトで、エラーが発生しない場合は$exceptionはnullです。
subscribe(mixed $topic, [array $options], [callable $callback])
1つまたは複数のトピックをサブスクライブします。
$topicは文字列(1つのトピックをサブスクライブ)または配列(複数のトピックをサブスクライブ)です。
複数のトピックをサブスクライブする場合、$topicはトピックがキー、QoSが値の配列になります。例えばarray('topic1'=> 0, 'topic2'=> 1)のようになります。$optionsサブスクリプションオプションの配列で、以下の設定を含みます:qosQoSレベル。デフォルトは0です。
$callback-function (\Exception $exception = null, array $granted = [])
コールバック関数で、サブスクライブ成功またはエラーが発生した際にトリガーされます。exception例外オブジェクト。エラーが発生しない時にはnullです。grantedサブスクリプション結果の配列。array('topic' => 'qos', 'topic' => 'qos')のようになります。ここで:topicはサブスクライブしたトピックです。qosはBrokerが受け入れたQoSレベルです。
unsubscribe(mixed $topic, [callable $callback])
サブスクリプションを解除します。
$topicは文字列または文字列の配列のようなもの、例えばarray('topic1', 'topic2')です。$callback-function (\Exception $e = null)成功または失敗時にトリガーされるコールバックです。
disconnect()
正常にBrokerとの接続を切断します。DISCONNECTメッセージがBrokerに送信されます。
close()
強制的にBrokerとの接続を切断します。DISCONNECTメッセージはBrokerに送信されません。
callback onConnect(Client $mqtt)
Brokerとの接続が確立された後にトリガーされます。この時点で、BrokerからCONNACKメッセージが受信されています。
callback onMessage(String $topic, String $content, Client $mqtt)
function (topic, message, packet) {}
クライアントがPublishメッセージを受信した際にトリガーされます。
$topic受信したトピック$content受信したメッセージの内容$mqttmqttクライアントインスタンス。
callback onError(\Exception $exception = null)
接続中に何らかのエラーが発生した際にトリガーされます。
callback onClose()
接続が閉じられた時にトリガーされます。クライアントが自発的に閉じた場合でも、サーバー側で閉じられた場合でも、onCloseはトリガーされます。