Skip to main content

Stream Workers (gdnsl stream-worker)

Get commands related to stream workers.

  gdnsl stream-worker <stream-worker-name> [flags]

Examples:

  
# Publish a stream worker.
gdnsl stream-worker TestStreamWorker --enable

# Unpublish a stream worker.
gdnsl stream-worker TestStreamWorker --disable

# Submit an ad hoc Stream query and get the result records from a store.
gdnsl stream-worker TestStream --query "SELECT * FROM TestStreamTable"

Options:

  -h, --help                  Help for stream worker commands.
--query string Query to return the result.
--enable Enable a stream worker.
--disable Disable a stream worker.
--fabric Name of the fabric to use.

Options inherited:

      --config string       gdnsl config file (default is ./gdnsl.yaml)

gdnsl stream-worker create

Create a stream worker.

  gdnsl stream-worker create [flags]

Examples:

  # Create a simple stream worker.
gdnsl stream-worker create
--name "cargo-stream-worker"
--description "my stream worker"
--source "SampleCargoAppInputTable WITH
(type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc" , replication.type="global", map.type='json') (weight int);"
--sink "STREAM SampleCargoAppDestStream (weight int);"
--query "INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;"

# Create a stream worker using JS functions.
gdnsl stream-worker create
--name "abc-stream-worker"
--description "my stream worker2"
--function "concatFn[javascript] return string {
var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var response = str1 + str2 + str3;
return response;
};"
--source "STREAM SampleScriptAppInputStream (deviceID string, roomNo int, temperature double);"
--table "SampleScriptAppOutputTable (id string, temperature double);"
--query "INSERT INTO SampleScriptAppOutputTable
SELECT concatFn(roomNo,'-',deviceID) as id, temperature
FROM SampleScriptAppInputStream;"

# Create a cron stream worker.
gdnsl stream-worker create
--name "cron-stream-worker"
--description "This app will produce an event after every 5 seconds"
--trigger "MyTrigger WITH ( interval = 5 sec );"
--sink "STREAM SampleStream (startTime long);"
--table "SampleScriptAppOutputTable (id string, temperature double);"
--query "INSERT INTO SampleStream
SELECT eventTimestamp() as startTime
FROM MyTrigger;"

# Create a stream worker with indexes.
gdnsl stream-worker create
--name "my-stream-worker2"
--description "This application creates different types of indexes on a given table."
--table "SampleGDNTable (sensorId string, temperature double);"
--index "SamplePersistentIndex ON TABLE SampleGDNTable WITH(type="persistent", sparse="true", deduplicate="true") (sensorId);"
--index "SampleHashIndex ON TABLE SampleGDNTable WITH(type="hash", sparse="true", deduplicate="true") (sensorId);"
--index "SampleFullTextIndex ON TABLE SampleGDNTable WITH(type="fulltext", minLength="3") (sensorId);"
--index "SampleGeoIndex ON TABLE SampleGDNTable WITH(type="geo", geoJson="false") (sensorId);"
--index "SampleTTLIndex ON TABLE SampleGDNTable WITH(type="ttl", expireAfter="3600") (sensorId);"

# Validate a stream worker.
gdnsl stream-worker create
--name "cargo-stream-worker"
--description "my stream worker"
--source "SampleCargoAppInputTable WITH
(type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc" , replication.type="global", map.type='json') (weight int);"
--sink "STREAM SampleCargoAppDestStream (weight int);"
--query "INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;"
--validate

# Validate a stream worker from a file.
gdnsl stream-worker create --file "cargo-stream-worker.json" --validate

# Create a stream worker with an RDBMS store.
gdnsl stream-worker create
--name "my-rdbmc-cdc"
--description "This stream app will explain the usage of rdbms store extension using MySQL database"
--trigger "ceprdbmsTrigger WITH (interval=5 sec);"
--store " StockTable WITH
( type="rdbms",
jdbc.url="jdbc:mysql://dummy-mysql-server.com:3306/MySQLDB?useSSL=false",
username="my-username",
password="my-password",
jdbc.driver.name="com.mysql.jdbc.Driver",
field.length="symbol:100",
table.check.query="SELECT 1 FROM StockTable LIMIT 1",
PrimaryKey='id',
PrimaryKey='symbol',
Index='volume')
(id string, symbol string, price float, volume long);"
--query " INSERT INTO StockTable
SELECT convert(count(), 'string') as id,
convert(count(), 'string') as symbol,
23.33f as price,
eventTimestamp() as volume
FROM ceprdbmsTrigger; "

# Create a stream worker from a file.
gdnsl stream-worker create --file "cargo-stream-worker.json" --regions "gdn-us-west,gdn-ap-west"

# Create a stream worker using advanced mode
gdnsl stream-worker create --advanced "@App:name('Sample-Adhoc-Query')\n@App:description(\"This application demonstrates how to send adhoc queries and fetch data from Stores and named windows.\")\n@App:qlVersion('2')\n\n/**\nTesting the Stream Application:\n 1. Upload following data into `SampleAdhocQueryInputTable` C8DB Collection\n {\"sensorId\":\"sensor A1234\",\"temperature\":18}\n {\"sensorId\":\"sensor A1234\",\"temperature\":-32.2}\n {\"sensorId\":\"sensor FR45\",\"temperature\":20.9}\n {\"sensorId\":\"sensor meter1\",\"temperature\":49.6}\n\n 2. This application accumulates all the data for one minute in the named window `SampleAdhocQueryInputTableOneMinTimeWindow`\n Named window allows other application to query data in realtime.\n\n 3. Run the adhoc query on the `SampleAdhocQueryInputTableOneMinTimeWindow` (Refer [1] for running adhoc queries.)\n Query:\n select * from SampleAdhocQueryInputTableOneMinTimeWindow\n\n Output:\n [\n [\"sensor A1234\",18],\n [\"sensor A1234\",-32.2],\n [\"sensor FR45\",20.9],\n [\"sensor meter1\",49.6]\n ]\n\n 4. Similar to Named Windows one can run adhoc queries on the stores as well. 
Running adhoc query on \n `SampleAdhocQuerySensorA1234DestTable` C8DB Collection should produce below result\n\n Query: Store the result if sensorId is equal to \"sensor A1234\"\n SELECT * FROM SampleAdhocQuerySensorA1234DestTable\n\n Output:\n [\n [\"sensor A1234\",18],\n [\"sensor A1234\",-32.2]\n ]\n\n [1] https://macrometa.dev/cep/quickstart/#run-an-adhoc-query\n*/\n\n-- Defines `SampleAdhocQueryInputTable` collection to process events having `sensorId` and `temperature`(F).\nCREATE SOURCE SampleAdhocQueryInputTable WITH(type = 'database', collection = \"SampleAdhocQueryInputTable\", collection.type=\"doc\" , replication.type=\"global\", map.type='json') (sensorId string, temperature double);\n\n-- Named Window\nCREATE WINDOW SampleAdhocQueryInputTableOneMinTimeWindow (sensorId string, temperature double) SLIDING_TIME(1 min);\n\n-- Table\nCREATE TABLE SampleAdhocQuerySensorA1234DestTable(sensorId string, temperature double);\n\n@info(name = 'Insert-to-window')\nINSERT INTO SampleAdhocQueryInputTableOneMinTimeWindow\nSELECT *\nFROM SampleAdhocQueryInputTable;\n\n@info(name = 'EqualsFilter')\n-- Note: Filter out events with `sensorId` equalling `sensor A1234`\nINSERT INTO SampleAdhocQuerySensorA1234DestTable\nSELECT *\nFROM SampleAdhocQueryInputTable\nWHERE sensorId == 'sensor A1234';\n" --regions "gdn-us-west,gdn-ap-west"

Options:

  -h, --help                  Help to create a stream worker.
--name string Stream worker name. Mandatory field.
--description Stream worker description. Mandatory field.
--source Source definition. Can be provided multiple times.
--sink Sink definition. Can be provided multiple times.
--trigger Trigger definition. Can be provided only once.
--store Store definition. Can be provided multiple times.
--query Stream query. Can be provided multiple times.
--table Table definition. Can be provided multiple times.
--index Index definition. Can be provided multiple times.
--function JS function definition. Can be provided multiple times.
--advanced string Complete stream worker definition as a string.
--file string JSON file from which the stream worker definition is read.
--regions string Comma-separated regions where stream workers should be deployed. Defaults to the local region.
--validate Validate stream worker definition. Stream worker will not be created.
--fabric Name of the fabric to use.

Options inherited:

      --config string  gdnsl config file (default is ./gdnsl.yaml)

gdnsl stream-worker delete

Delete a stream worker.

  gdnsl stream-worker delete <stream-worker-name>

Examples:

  # Delete a stream worker.
gdnsl stream-worker delete TestStreamWorker

Options:

  -h, --help                  Help to delete a stream worker.
--fabric Name of the fabric to use.

Options inherited:

      --config string         gdnsl config file (default is ./gdnsl.yaml)

gdnsl stream-worker describe

Describe a stream worker.

  gdnsl stream-worker describe <stream-worker-name>

Examples:


# Describe a stream worker.
gdnsl stream-worker describe TestStreamWorker

Options:

  -h, --help                  Help to describe stream workers.
--fabric Name of the fabric to use.

Options inherited:

--config string         gdnsl config file (default is ./gdnsl.yaml)

gdnsl stream-worker list

List stream workers.

  gdnsl stream-worker list [flags]

Examples:


# List stream workers.
gdnsl stream-worker list

# List sample stream workers.
gdnsl stream-worker list --sample

Options:

  -h, --help                  Help to list stream workers.
--sample List sample stream workers.
--fabric Name of the fabric to use.

Options inherited:

      --config string         gdnsl config file (default is ./gdnsl.yaml)

gdnsl stream-worker update

Update a stream worker.

  gdnsl stream-worker update <stream-worker-name> [flags]

Examples:


# Update a simple stream worker.
gdnsl stream-worker update
--name "cargo-stream-worker"
--description "my stream worker"
--source "SampleCargoAppInputTable WITH
(type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc" , replication.type="global", map.type='json') (weight int);"
--sink "STREAM SampleCargoAppDestStream (weight int);"
--query "INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;"

# Update a stream worker using JS functions.
gdnsl stream-worker update
--name "abc-stream-worker"
--description "my stream worker2"
--function "concatFn[javascript] return string {
var str1 = data[0];
var str2 = data[1];
var str3 = data[2];
var response = str1 + str2 + str3;
return response;
};"
--source "STREAM SampleScriptAppInputStream (deviceID string, roomNo int, temperature double);"
--table "SampleScriptAppOutputTable (id string, temperature double);"
--query "INSERT INTO SampleScriptAppOutputTable
SELECT concatFn(roomNo,'-',deviceID) as id, temperature
FROM SampleScriptAppInputStream;"

# Update a cron stream worker.
gdnsl stream-worker update
--name "cron-stream-worker"
--description "This app will produce an event after every 5 seconds"
--trigger "MyTrigger WITH ( interval = 5 sec );"
--sink "STREAM SampleStream (startTime long);"
--table "SampleScriptAppOutputTable (id string, temperature double);"
--query "INSERT INTO SampleStream
SELECT eventTimestamp() as startTime
FROM MyTrigger;"

# Update a stream worker with indexes.
gdnsl stream-worker update
--name "my-stream-worker2"
--description "This application creates different types of indexes on a given table."
--table "SampleGDNTable (sensorId string, temperature double);"
--index "SamplePersistentIndex ON TABLE SampleGDNTable WITH(type="persistent", sparse="true", deduplicate="true") (sensorId);"
--index "SampleHashIndex ON TABLE SampleGDNTable WITH(type="hash", sparse="true", deduplicate="true") (sensorId);"
--index "SampleFullTextIndex ON TABLE SampleGDNTable WITH(type="fulltext", minLength="3") (sensorId);"
--index "SampleGeoIndex ON TABLE SampleGDNTable WITH(type="geo", geoJson="false") (sensorId);"
--index "SampleTTLIndex ON TABLE SampleGDNTable WITH(type="ttl", expireAfter="3600") (sensorId);"

# Validate a stream worker.
gdnsl stream-worker update
--name "cargo-stream-worker"
--description "my stream worker"
--source "SampleCargoAppInputTable WITH
(type = 'database', collection = "SampleCargoAppInputTable", collection.type="doc" , replication.type="global", map.type='json') (weight int);"
--sink "STREAM SampleCargoAppDestStream (weight int);"
--query "INSERT INTO SampleCargoAppDestStream
SELECT weight
FROM SampleCargoAppInputTable;"
--validate

# Validate a stream worker from a file.
gdnsl stream-worker update --file "cargo-stream-worker.json" --validate

# Update a stream worker with an RDBMS store.
gdnsl stream-worker update
--name "my-rdbmc-cdc"
--description "This stream app will explain the usage of rdbms store extension using MySQL database"
--trigger "ceprdbmsTrigger WITH (interval=5 sec);"
--store " StockTable WITH
( type="rdbms",
jdbc.url="jdbc:mysql://dummy-mysql-server.com:3306/MySQLDB?useSSL=false",
username="my-username",
password="my-password",
jdbc.driver.name="com.mysql.jdbc.Driver",
field.length="symbol:100",
table.check.query="SELECT 1 FROM StockTable LIMIT 1",
PrimaryKey='id',
PrimaryKey='symbol',
Index='volume')
(id string, symbol string, price float, volume long);"
--query " INSERT INTO StockTable
SELECT convert(count(), 'string') as id,
convert(count(), 'string') as symbol,
23.33f as price,
eventTimestamp() as volume
FROM ceprdbmsTrigger; "

# Update a stream worker from a file.
gdnsl stream-worker update --file "cargo-stream-worker.json" --regions "gdn-us-west,gdn-ap-west"

Options:

  -h, --help                  Help to update a stream worker.
--name string Stream worker name. Mandatory field.
--description Stream worker description. Mandatory field.
--source Source definition. Can be provided multiple times.
--sink Sink definition. Can be provided multiple times.
--trigger Trigger definition. Can be provided only once.
--store Store definition. Can be provided multiple times.
--query Stream query. Can be provided multiple times.
--table Table definition. Can be provided multiple times.
--index Index definition. Can be provided multiple times.
--function JS function definition. Can be provided multiple times.
--advanced string Complete stream worker definition as a string.
--file string JSON file from which the stream worker definition is read.
--regions string Comma-separated regions where stream workers should be deployed. Defaults to the local region.
--validate Validate stream worker definition. Stream worker will not be updated.
--fabric Name of the fabric to use.

Options inherited:

      --config string         gdnsl config file (default is ./gdnsl.yaml)