Subscribe to Streams
This page explains how to subscribe to a stream in Macrometa.
- Python SDK
- JavaScript SDK
- REST API - Python
- REST API - JavaScript
You must Install the Python SDK before you can run this code.
import base64
import json
from c8 import C8Client
# Connect to GDN.
URL = "play.paas.macrometa.io"
GEO_FABRIC = "_system"
API_KEY = "xxxxxx" # Change this to your API key
is_local = False # For a global stream pass False and True for local stream
demo_stream = "streamQuickstart"
client = C8Client(protocol='https', host=URL, port=443, apikey=API_KEY, geofabric=GEO_FABRIC)
# Create the subscriber and receive data
subscriber = client.subscribe(stream=demo_stream, local=is_local,
subscription_name="test-subscription-1")
for i in range(10):
print("In ",i)
m1 = json.loads(subscriber.recv()) # Listen on stream for any receiving messages
msg1 = base64.b64decode(m1["payload"]).decode('utf-8')
print(F"Received message '{msg1}' id='{m1['messageId']}'") # Print the received message
subscriber.send(json.dumps({'messageId': m1['messageId']})) # Acknowledge the received message
You must Install the JavaScript SDK before you can run this code.
const jsc8 = require("jsc8");
const client = new jsc8({ url: "https://play.paas.macrometa.io", apiKey: "xxxxx", fabricName: "_system" });
const stream = "streamQuickstart";
(async function () {
// Here the last boolean value tells if the stream is local or global. false means that it is global.
const consumer = await client.createStreamReader(stream, "my-subscription", false);
consumer.on("message", (msg) => {
const { payload, messageId } = JSON.parse(msg);
// Received message payload
console.log(Buffer.from(payload, "base64").toString("ascii"));
// Send message acknowledgement
consumer.send(JSON.stringify({ messageId }));
});
})();
import base64
import json
import requests
from websocket import create_connection
# Constants
URL = "api-play.paas.macrometa.io"
HTTP_URL = f"https://{URL}"
FABRIC = "_system"
STREAM_NAME = "streamQuickstart"
API_KEY = "XXXXX" # Use your API key here
AUTH_TOKEN = f"apikey {API_KEY}" # Append the key word for the API key
TENANT_NAME = "XXXXX" # Add your tenant name here
CONSUMER_NAME = "testconsumer"
IS_GLOBAL = True # For a global stream pass True and False for local stream
stream_type = ""
if IS_GLOBAL:
stream_type = "c8global"
else:
stream_type = "c8local"
# Create a HTTPS session
session = requests.session()
session.headers.update({"content-type": 'application/json'})
session.headers.update({"authorization": AUTH_TOKEN})
# Subscribe to stream
consumerurl = f"wss://{URL}/_ws/ws/v2/consumer/persistent/{TENANT_NAME}/{stream_type}.{FABRIC}/{stream_type}s.{STREAM_NAME}/{CONSUMER_NAME}"
def create_consumer():
ws = create_connection(consumerurl, header=[f"Authorization: {AUTH_TOKEN}"])
while True:
msg = json.loads(ws.recv())
if msg:
print(f"Message received: {base64.b64decode(msg['payload']).decode('utf-8')}")
# Acknowledge successful processing
ws.send(json.dumps({'messageId': msg['messageId']}))
break
ws.close()
create_consumer()
const WebSocket = require('ws');
class APIRequest {
_headers = {
Accept: "application/json",
"Content-Type": "application/json"
};
constructor (url, apiKey) {
this._url = url;
this._headers.authorization = `apikey ${apiKey}`; // Append the key word for the API key
}
_handleResponse (response, resolve, reject) {
if (response.ok) {
resolve(response.json());
} else {
reject(response);
}
}
req (endpoint, { body, ...options } = {}) {
const self = this;
return new Promise(function (resolve, reject) {
fetch(self._url + endpoint, {
headers: self._headers,
body: body ? JSON.stringify(body) : undefined,
...options
}).then((response) => self._handleResponse(response, resolve, reject));
});
}
}
const apiKey = "xxxxx"; // Use your apikey here
const federationName = "api-play.paas.macrometa.io";
const federationUrl = `https://${federationName}`;
const fabric = "_system"
const stream = "streamQuickstart";
const isGlobal = true;
const tenant = "xxxxx" // Use your tenant name here
const consumerName = "testConsumer";
const run = async function () {
const connection = new APIRequest(federationUrl, apiKey);
const region = isGlobal ? "c8global" : "c8local";
const streamName = `${region}s.${stream}`;
// Fetching local URL in case the stream is local
const localDcDetails = await connection.req(`/datacenter/local`, {
method: "GET"
});
const dcUrl = localDcDetails.tags.url;
const url = isGlobal
? federationName
: `api-${dcUrl}`;
const otpConsumer = await connection.req(`/apid/otp`, {
method: "POST"
});
const consumerUrl = `wss://${url}/_ws/ws/v2/consumer/persistent/${tenant}/${region}.${fabric}/${streamName}/${consumerName}?otp=${otpConsumer.otp}`;
let consumer;
// Subscribe to stream
const initConsumer = async function () {
consumer = new WebSocket(consumerUrl);
consumer.onopen = function () {
console.log("WebSocket:Consumer is open now for " + streamName);
};
consumer.onerror = function () {
console.log(
"Failed to establish WebSocket:Consumer connection for " +
streamName
);
};
consumer.onclose = function () {
console.log("Closed WebSocket:Consumer connection for " + streamName);
};
consumer.onmessage = function (message) {
const receivedMsg = message.data;
console.log(
`WebSocket:Consumer message received at ${new Date()}`,
receivedMsg
);
const { payload, messageId } = JSON.parse(receivedMsg);
console.log(Buffer.from(payload, "base64").toString("ascii"));
// Send message acknowledgement
consumer.send(JSON.stringify({ messageId }));
};
};
await initConsumer();
await new Promise((resolve) => setTimeout(resolve, 1 * 40 * 1000));
console.log("CONSUMER CLOSING...");
consumer.close();
}
run();