Skip to main content

MQTT

This an extension that receives and publishes events to and from MQTT.

Features

  • MQTT (Sink)

    This sends messages to a topic in MQTT server.

  • MQTT (Source)

    This receives messages from a topic in MQTT server.

MQTT Sink

The MQTT sink publishes messages to a topic in the MQTT server. If the required topic does not exist, then the MQTT sink creates the topic and publishes messages to it.

MQTT Sink Syntax:

CREATE SINK <name> WITH (type="mqtt", url="<STRING>", topic="<STRING>", map.type="<STRING>", username="<STRING>", password="<STRING>", client.id="<STRING>", quality.of.service="<STRING>", clean.session="<BOOL>", message.retain="<STRING>", keep.alive="<INT>", connection.timeout="<INT>");

MQTT Sink Query Parameters:

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
urlThe URL of the MQTT broker. It is used to connect to the MQTT broker.STRINGNoNo
topicThe ID of the topic to which the messages that are processed by Macrometa are published.STRINGNoNo
usernameThe username to be provided when the MQTT client is authenticated by the broker.STRINGYesNo
passwordThe password to be provided when the MQTT client is authenticated by the broker.STRINGYesNo
client.idA unique ID for the MQTT client. The server uses this to identify the client when it reconnects.System generatedSTRINGYesNo
clean.sessionThis is an optional paramater. If this parameter is set to true, the subscriptions made by the MQTT client during a session expire when the session ends and they need to be recreated for the next session.trueBOOLYesNo
message.retainIf this parameter is set to true, the last message sent from the topic is retained until the next message is sent.falseSTRINGYesNo
keep.aliveThe maximum number of seconds the connection between the MQTT client and the broker should be maintained without any events being transferred.60INTYesNo
connection.timeoutThe maximum number of seconds that the MQTT client should spend attempting to connect to the MQTT broker.30INTYesNo
quality.of.serviceThe quality of service provided by the MQTT client.1STRINGYesNo

MQTT Sink Example:

CREATE SINK SinkStream WITH (type="mqtt", url="tcp://test.mosquitto.org:1883", topic="topicA", map.type="json", clean.session="true", message.retain="false", quality.of.service= "1", keep.alive= "60",connection.timeout="30") (startTime long);

This query publishes messages to a topic in the MQTT server. Here, the messages are published to topicA topic.

MQTT Source

The MQTT source receives events to be processed by Macrometa from a topic in the MQTT server.

MQTT Source Syntax:

CREATE SOURCE <name> WITH (type='mqtt', url= '<STRING>',  topic='<STRING>', map.type='<STRING>', username="<STRING>", password="<STRING>", client.id="<STRING>", quality.of.service="<STRING>", clean.session="<BOOL>", message.retain="<STRING>", keep.alive="<INT>", connection.timeout="<INT>);

MQTT Source Query Parameters:

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
urlThe URL of the MQTT broker. It is used to connect to the MQTT broker.STRINGNoNo
topicThe ID of the topic to which the messages that are processed by Macrometa are published.STRINGNoNo
usernameThe username to be provided when the MQTT client is authenticated by the broker.STRINGYesNo
passwordThe password to be provided when the MQTT client is authenticated by the broker.STRINGYesNo
client.idA unique ID for the MQTT client. The server uses this to identify the client when it reconnects.System GeneratedSTRINGYesNo
clean.sessionThis is an optional paramater. If this parameter is set to true, the subscriptions made by the MQTT client during a session expire when the session ends and they need to be recreated for the next session.trueBOOLYesNo
keep.aliveThe maximum number of seconds the connection between the MQTT client and the broker should be maintained without any events being transferred.60INTYesNo
connection.timeoutThe maximum number of seconds that the MQTT client should spend attempting to connect to the MQTT broker.30INTYesNo
quality.of.serviceThe quality of service provided by the MQTT client.1STRINGYesNo

Parameters Example:

CREATE SOURCE SourceStream WITH (type='mqtt', url= 'tcp://test.mosquitto.org:1883',  topic='demo1', map.type='json', clean.session="true", quality.of.service= "1", keep.alive= "60",connection.timeout="30") (startTime long);

This query shows how to subscribe to a mqtt topic.