workerman/mqtt

MQTTはクライアントサーバーアーキテクチャのパブリッシュ/サブスクライブモデルのメッセージ輸送プロトコルであり、IoTの重要な構成要素となっています。その設計思想は軽量、オープン、シンプル、規格化されており、実装が容易です。これらの特徴により、多くのシーンにおいて非常に良い選択肢となります。特に、機械間通信(M2M)やIoT環境のような制限された環境においては特に有用です。

workerman\mqttは、workermanに基づいた非同期のMQTTクライアントライブラリで、MQTTプロトコルのメッセージを受信または送信するために使用されます。QoS 0QoS 1QoS 2をサポートしています。MQTT3.13.1.15の各バージョンに対応しています。

プロジェクトのアドレス

https://github.com/walkor/mqtt

インストール

composer require workerman/mqtt

サポート

  • MQTT
  • MQTT5
  • MQTT over websocket

サンプル

subscribe.php

<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function(){
    $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
    $mqtt->onConnect = function($mqtt) {
        $mqtt->subscribe('test');
    };
    $mqtt->onMessage = function($topic, $content){
        var_dump($topic, $content);
    };
    $mqtt->connect();
};
Worker::runAll();

コマンドラインで実行 php subscribe.php start で起動

publish.php

<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function(){
    $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
    $mqtt->onConnect = function($mqtt) {
       $mqtt->publish('test', 'hello workerman mqtt');
    };
    $mqtt->connect();
};
Worker::runAll();

コマンドラインで実行 php publish.php start で起動。

workerman\mqtt\Clientインターフェイス


__construct (string $address, [array $options])

MQTTクライアントインスタンスを作成します。

  • $address MQTTサーバーのアドレス。形式は 'mqtt://test.mosquitto.org:1883' のようにします。

  • $options クライアントオプションの配列で、以下のオプションを設定できます:

    • keepalive: クライアントがサーバーに送信するハートビートの時間間隔。デフォルトは50秒。0に設定するとハートビートを無効にします。
    • client_id: クライアントID。未設定の場合、デフォルトは "workerman-mqtt-client-".mt_rand() になります。
    • protocol_name: プロトコル名。MQTT(3.1.1バージョン)または MQIsdp(3.1バージョン)。デフォルトはMQTTです。
    • protocol_level: プロトコルレベル。protocol_nameMQTTの場合は4protocol_nameMQIsdpの場合は3です。
    • clean_session: セッションをクリーンアップします。デフォルトはtrueです。falseに設定すると、QoS 1QoS 2レベルのオフラインメッセージを受信することができます。
    • reconnect_period: 再接続の時間間隔。デフォルトは1秒。0の場合は再接続しません。
    • connect_timeout: MQTT接続のタイムアウト時間。デフォルトは30秒です。
    • username: ユーザー名。オプションです。
    • password: パスワード。オプションです。
    • will: ウィルメッセージ。クライアントが切断されたとき、Brokerが自動的に他のクライアントにウィルメッセージを送信します。形式は以下の通りです:
      • topic: トピック
      • content: 内容
      • qos: QoSレベル
      • retain: retainフラグ
    • resubscribe : 異常に接続が切断されて再接続後、以前のトピックに再びサブスクライブするかどうか。デフォルトはtrueです。
    • bindto: Brokerに接続するために、ローカルのどのIPとポートを指定するか。デフォルト値は''です。
    • ssl: sslオプション。デフォルトはfalseです。trueに設定すると、ssl方式で接続します。同時に、ローカル証明書などを設定するためにsslコンテキストの配列を渡すことも支持します。sslコンテキストの詳細はhttps://php.net/manual/en/context.ssl.phpを参照してください。
    • debug: デバッグモードを有効にするかどうか。デバッグモードでは、Brokerとの通信の詳細情報を出力できます。デフォルトはfalseです。
    • uri: MQTT over websocketのuriアドレスで、通常は /mqtt です。

connect()

Brokerに接続します。


publish(String $topic, String $content, [array $options], [callable $callback])

特定のトピックにメッセージを公開します。

  • $topic トピック
  • $content メッセージ
  • $options オプション配列で、以下を含みます:
    • qos QoSレベル。デフォルトは0です。
    • retain retainフラグ。デフォルトはfalseです。
    • dup 再送信フラグ。デフォルトはfalseです。
  • $callback - function (\Exception $exception = null) (QoSが0の場合、この設定はサポートされていません)。エラーが発生した場合または公開が成功した場合にトリガーされます。$exceptionは例外オブジェクトで、エラーが発生しない場合は$exceptionnullです。

subscribe(mixed $topic, [array $options], [callable $callback])

1つまたは複数のトピックをサブスクライブします。

  • $topic は文字列(1つのトピックをサブスクライブ)または配列(複数のトピックをサブスクライブ)です。
    複数のトピックをサブスクライブする場合、$topicはトピックがキー、QoSが値の配列になります。例えば array('topic1'=> 0, 'topic2'=> 1) のようになります。
  • $options サブスクリプションオプションの配列で、以下の設定を含みます:
    • qos QoSレベル。デフォルトは 0です。
  • $callback - function (\Exception $exception = null, array $granted = [])
    コールバック関数で、サブスクライブ成功またはエラーが発生した際にトリガーされます。
    • exception 例外オブジェクト。エラーが発生しない時にはnullです。
    • granted サブスクリプション結果の配列。array('topic' => 'qos', 'topic' => 'qos') のようになります。ここで:
    • topic はサブスクライブしたトピックです。
    • qos はBrokerが受け入れたQoSレベルです。

unsubscribe(mixed $topic, [callable $callback])

サブスクリプションを解除します。

  • $topic は文字列または文字列の配列のようなもの、例えば array('topic1', 'topic2') です。
  • $callback - function (\Exception $e = null) 成功または失敗時にトリガーされるコールバックです。

disconnect()

正常にBrokerとの接続を切断します。DISCONNECTメッセージがBrokerに送信されます。


close()

強制的にBrokerとの接続を切断します。DISCONNECTメッセージはBrokerに送信されません。


callback onConnect(Client $mqtt)

Brokerとの接続が確立された後にトリガーされます。この時点で、BrokerからCONNACKメッセージが受信されています。


callback onMessage(String $topic, String $content, Client $mqtt)

function (topic, message, packet) {}

クライアントがPublishメッセージを受信した際にトリガーされます。

  • $topic 受信したトピック
  • $content 受信したメッセージの内容
  • $mqtt mqttクライアントインスタンス。

callback onError(\Exception $exception = null)

接続中に何らかのエラーが発生した際にトリガーされます。


callback onClose()

接続が閉じられた時にトリガーされます。クライアントが自発的に閉じた場合でも、サーバー側で閉じられた場合でも、onCloseはトリガーされます。

さらに多くのサンプルexamples/workerman