workerman/mqtt

MQTT is a client-server messaging transport protocol that uses a publish/subscribe model and has become an important part of the Internet of Things (IoT). It is designed to be lightweight, open, simple, standardized, and easy to implement. These characteristics make it a good choice for many scenarios, especially constrained environments such as Machine-to-Machine (M2M) communication and IoT settings.

workerman\mqtt is an asynchronous MQTT client library built on Workerman for sending and receiving messages over the MQTT protocol. It supports QoS 0, QoS 1, and QoS 2, and MQTT versions 3.1, 3.1.1, and 5.

Project URL

https://github.com/walkor/mqtt

Installation

composer require workerman/mqtt

Supported

  • MQTT
  • MQTT5
  • MQTT over websocket

Example

subscribe.php

<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function(){
    $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
    $mqtt->onConnect = function($mqtt) {
        $mqtt->subscribe('test');
    };
    $mqtt->onMessage = function($topic, $content){
        var_dump($topic, $content);
    };
    $mqtt->connect();
};
Worker::runAll();

Start it from the command line with php subscribe.php start.

publish.php

<?php
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function(){
    $mqtt = new Workerman\Mqtt\Client('mqtt://test.mosquitto.org:1883');
    $mqtt->onConnect = function($mqtt) {
       $mqtt->publish('test', 'hello workerman mqtt');
    };
    $mqtt->connect();
};
Worker::runAll();

Start it from the command line with php publish.php start.

workerman\mqtt\Client Interface


__construct (string $address, [array $options])

Creates an MQTT client instance.

  • $address is the address of the MQTT server, formatted like 'mqtt://test.mosquitto.org:1883'.

  • $options is an array of client options, which can include the following options:

    • keepalive: The time interval for the client to send a heartbeat to the server, default is 50 seconds; setting it to 0 disables the heartbeat.
    • client_id: Client ID; if not set, defaults to "workerman-mqtt-client-".mt_rand()
    • protocol_name: Protocol name, MQTT (version 3.1.1) or MQIsdp (version 3.1), default is MQTT.
    • protocol_level: Protocol level; the value is 4 when protocol_name is MQTT and 3 when protocol_name is MQIsdp.
    • clean_session: Clean session, default is true. Setting it to false allows receiving offline messages with QoS 1 and QoS 2.
    • reconnect_period: Reconnection interval, default is 1 second; 0 means no reconnection.
    • connect_timeout: Connection timeout for MQTT, default is 30 seconds.
    • username: Username, optional.
    • password: Password, optional.
    • will: Last will message; if the client disconnects unexpectedly, the Broker automatically publishes the last will message to the clients subscribed to its topic. The format is:
      • topic: The topic.
      • content: The content.
      • qos: QoS level.
      • retain: retain flag.
    • resubscribe: Whether to resubscribe to the previous topics after a disconnection and reconnection, default is true.
    • bindto: Specifies which IP and port to use locally to connect to the Broker, default is ''.
    • ssl: SSL options, default is false; if set to true, it connects using SSL. It also supports passing an SSL context array for configuring local certificates and others; for SSL context, refer to https://php.net/manual/en/context.ssl.php.
    • debug: Whether to enable debug mode, which outputs detailed information about communication with the Broker, default is false.
    • uri: The URI address for MQTT over websocket, typically /mqtt.
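
As a rough illustration of how these options fit together, the sketch below passes several of them to the constructor. The client ID demo-client-1, the will topic demo/status, and the chosen values are placeholders made up for this example, not recommendations from the library.

```php
<?php
use Workerman\Worker;
use Workerman\Mqtt\Client;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $options = [
        'keepalive'        => 30,              // heartbeat every 30 seconds
        'client_id'        => 'demo-client-1', // placeholder; keep it stable across restarts
        'clean_session'    => false,           // let the Broker keep QoS 1/2 messages while offline
        'reconnect_period' => 5,               // retry every 5 seconds after a disconnect
        'connect_timeout'  => 10,
        'will'             => [
            'topic'   => 'demo/status',        // placeholder topic
            'content' => 'offline',
            'qos'     => 1,
            'retain'  => true,
        ],
        'debug'            => true,            // print the traffic exchanged with the Broker
    ];

    $mqtt = new Client('mqtt://test.mosquitto.org:1883', $options);
    $mqtt->onConnect = function ($mqtt) {
        // QoS 1, so messages published while this client is offline can be delivered later.
        $mqtt->subscribe('test', ['qos' => 1]);
    };
    $mqtt->connect();
};
Worker::runAll();
```

Offline delivery with clean_session set to false depends on the client reconnecting under the same client_id, which is why the sketch sets one explicitly instead of relying on the random default.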

connect()

Connect to the Broker.


publish(string $topic, string $content, [array $options], [callable $callback])

Publish a message to a specified topic.

  • $topic: The topic.
  • $content: The message.
  • $options: An options array, which may include:
    • qos: The QoS level, default is 0.
    • retain: Retain flag, default is false.
    • dup: Duplicate flag, default is false.
  • $callback: function (\Exception $exception = null), not supported when QoS is 0. Triggered when publishing succeeds or an error occurs. $exception is the exception object, or null when no error occurred; the same applies to the callbacks below.
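
A minimal sketch of a QoS 1 publish with a completion callback, assuming the same public test Broker used in the examples above. The log messages are illustrative only.

```php
<?php
use Workerman\Worker;
use Workerman\Mqtt\Client;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $mqtt = new Client('mqtt://test.mosquitto.org:1883');
    $mqtt->onConnect = function ($mqtt) {
        $options = ['qos' => 1, 'retain' => false];
        // The callback is only used for QoS 1 and QoS 2; it is not supported for QoS 0.
        $mqtt->publish('test', 'hello workerman mqtt', $options, function ($exception = null) {
            if ($exception) {
                echo 'publish failed: ', $exception->getMessage(), "\n";
                return;
            }
            echo "publish completed\n";
        });
    };
    $mqtt->connect();
};
Worker::runAll();
```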

subscribe(mixed $topic, [array $options], [callable $callback])

Subscribe to a topic or multiple topics.

  • $topic: A string (to subscribe to one topic) or an array (to subscribe to multiple topics). When subscribing to multiple topics, $topic is an array with the topic as key and the QoS as value, for example array('topic1' => 0, 'topic2' => 1).
  • $options: Subscription options array, containing the following settings:
    • qos: The QoS level, default is 0.
  • $callback: function (\Exception $exception = null, array $granted = []), triggered when the subscription succeeds or an error occurs.
    • $exception: The exception object; null if no error occurred.
    • $granted: Subscription result array, similar to array('topic1' => qos, 'topic2' => qos), where:
      • topic is the subscribed topic.
      • qos is the QoS level accepted by the Broker.
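
The multi-topic form and the $granted result can be sketched as follows. The topic names demo/a and demo/b are placeholders invented for this example, and the empty options array is passed only to reach the callback argument.

```php
<?php
use Workerman\Worker;
use Workerman\Mqtt\Client;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $mqtt = new Client('mqtt://test.mosquitto.org:1883');
    $mqtt->onConnect = function ($mqtt) {
        // Topic => requested QoS; both topic names are placeholders.
        $topics = ['demo/a' => 0, 'demo/b' => 1];
        $mqtt->subscribe($topics, [], function ($exception = null, $granted = []) {
            if ($exception) {
                echo 'subscribe failed: ', $exception->getMessage(), "\n";
                return;
            }
            // $granted maps each topic to the QoS level the Broker accepted.
            foreach ($granted as $topic => $qos) {
                echo "subscribed to $topic with QoS $qos\n";
            }
        });
    };
    $mqtt->onMessage = function ($topic, $content) {
        echo "$topic: $content\n";
    };
    $mqtt->connect();
};
Worker::runAll();
```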

unsubscribe(mixed $topic, [callable $callback])

Unsubscribe from topics.

  • $topic: A string or an array of strings, for example array('topic1', 'topic2').
  • $callback: function (\Exception $e = null), triggered when the unsubscribe succeeds or fails.

disconnect()

Disconnect from the Broker normally; a DISCONNECT message will be sent to the Broker.


close()

Forcefully disconnect from the Broker; a DISCONNECT message will not be sent to the Broker.
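
One way to combine unsubscribe() and disconnect() is sketched below: the client waits for a single message on the test topic, unsubscribes, and then disconnects normally. The one-message flow and the $done guard are assumptions made for this illustration, not behavior required by the library.

```php
<?php
use Workerman\Worker;
use Workerman\Mqtt\Client;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $mqtt = new Client('mqtt://test.mosquitto.org:1883');
    $mqtt->onConnect = function ($mqtt) {
        $mqtt->subscribe('test');
    };
    $mqtt->onMessage = function ($topic, $content, $mqtt) {
        static $done = false;   // illustrative guard so the shutdown runs only once
        if ($done) {
            return;
        }
        $done = true;
        echo "received on $topic: $content\n";

        $mqtt->unsubscribe('test', function ($exception = null) use ($mqtt) {
            if ($exception) {
                echo 'unsubscribe failed: ', $exception->getMessage(), "\n";
            }
            // Normal shutdown: a DISCONNECT message is sent to the Broker.
            $mqtt->disconnect();
        });
    };
    $mqtt->connect();
};
Worker::runAll();
```

Replacing disconnect() with close() in this sketch would drop the connection without sending the DISCONNECT message.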


callback onConnect(Client $mqtt)

Triggered after establishing a connection with the Broker. The CONNACK message from the Broker has already been received at this point.


callback onMessage(string $topic, string $content, Client $mqtt)

function ($topic, $content, $mqtt) {}

Triggered when the client receives a Publish message.

  • $topic: The received topic.
  • $content: The content of the received message.
  • $mqtt: The MQTT client instance.

callback onError(\Exception $exception = null)

Triggered when a connection error occurs.


callback onClose()

Triggered when the connection is closed, regardless of whether it was closed by the client or the server.
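
A short sketch of wiring up all four callbacks, assuming the same public test Broker as the earlier examples. The log messages are illustrative only.

```php
<?php
use Workerman\Worker;
use Workerman\Mqtt\Client;

require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $mqtt = new Client('mqtt://test.mosquitto.org:1883');

    $mqtt->onConnect = function ($mqtt) {
        echo "connected\n";
        $mqtt->subscribe('test');
    };
    $mqtt->onMessage = function ($topic, $content, $mqtt) {
        echo "$topic: $content\n";
    };
    $mqtt->onError = function ($exception = null) {
        // $exception may be null, so guard before reading the message.
        echo 'mqtt error: ', $exception ? $exception->getMessage() : 'unknown', "\n";
    };
    $mqtt->onClose = function () {
        // Fires whether the client or the server closed the connection.
        echo "connection closed\n";
    };

    $mqtt->connect();
};
Worker::runAll();
```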

More examples are available in examples/workerman.