Create Stream Application
Introduction
Stream applications are declarative specs that define the processing logic to process the events sent to the stream processor. A stream app definition contains the following configurations:
Configuration | Description |
---|---|
Stream | A logical series of events ordered in time with a uniquely identifiable name, and set of defined attributes with specific data types defining its schema. |
Source | This consumes data from external sources (such as `TCP` , ` Kafka ` , ` HTTP ` , etc) in the form of events, then converts each event (that can be in `XML` , ` JSON` , ` binary` , etc. format) to a stream event, and passes that to a stream for processing. |
Sink | This takes events arriving at a stream, maps them to a predefined data format (such as ` XML ` , `JSON,` `binary` , etc), and publishes them to external endpoints (such as ` E-mail ` , ` TCP ` , ` Kafka ` , `HTTP ` , etc). |
Table | A structured representation of data stored with a defined schema. Stored data can be backed by In-Memory, or external data stores such as RDBMS, MongoDB, etc. The tables can be accessed and manipulated at runtime. |
Executional Element | An executional element can be one of the following:
|
Macrometa provide in-build source, sink and store explained in the later section of this document.
Creating a Stream Application
To create a stream application follow the steps below:
Open the GUI. Click the Stream Apps tab.
Click New to define a new stream application.
Type a Name for the stream application. For example,
SweetProductionAnalysis
.Type a Description.
Add the following sample stream application.
CREATE SOURCE SweetProductionStream WITH (type = 'database', collection='SweetProductionData', map.type='json') (name string, amount double);
CREATE SINK ProductionAlertStream WITH (type= 'stream', stream='ProductionAlertStream', map.type='json') (name string, amount double);
INSERT INTO ProductionAlertStream
SELECT *
FROM SweetProductionStream;Click
Save
to save the stream app.Select all the regions to deploy your application in.
Click on
Save
.
Source
C8Streams
Syntax:
CREATE SOURCE SourceName WITH (type="stream", stream.list="STRING", replication.type="STRING", map.type='type') (strings);
Example:
CREATE SOURCE OrderStream WITH (type="stream", stream.list="OrderStream", replication.type="local", map.type='json') (product_id string, quantity integer);
Stream application will use the stream with the default query parameters explained in the chart below.
Query Parameters:
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
stream.list | This specifies the list of streams to which the source must listen. This list can be provided as a set of comma-separated values e.g. `stream_one,stream_two` | STRING | No | |
replication.type | Specifies if the replication type of the streams. Possible values can be `local` and `global` | local | STRING | Yes |
C8DB
Syntax:
CREATE SOURCE SourceName WITH (type="database", collection="STRING", replication.type="STRING", collection.type="STRING", map.type='type') (strings);
Example:
CREATE SOURCE SweetProductionStream WITH (type="database", map.type='json') (name string, amount double);
Query Parameters:
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
collection | This specifies the name of the c8db collection to which the source must listen. | STRING | No | |
replication.type | Specifies if the replication type of the c8db collection. Possible values can be `local` and `global` | local | STRING | Yes |
collection.type | This specifies the type of the data collection contains. Possible values can be `doc` and `edge`. | doc | STRING | Yes |
Sink
C8Streams
Syntax:
CREATE SINK SinkName WITH (type="stream", stream="STRING", replication.type="STRING", map.type='type') (strings);
@sink(type="c8streams", stream="<STRING>", replication.type="<STRING>", @map(...)))
Example:
CREATE SINK ProductionAlertStream WITH (type="stream", stream='ProductionAlertStream', map.type='json`) (name string, amount double);
Query Parameters:
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
stream | The streams to which the C8Stream sink needs to publish events. | STRING | No | |
replication.type | Specifies if the replication type of the stream. Possible values can be `local` and `global` | local | STRING | Yes |
Table
C8DB
Syntax:
CREATE STORE StoreName WITH (type="database", collection="STRING", replication.type="STRING", collection.type="STRING", map.type='type', from="STRING", to="STRING") (strings);
Example:
CREATE STORE SweetProductionCollection WITH (type="database", collection="SweetProductionCollection", replication.type="local", map.type='json') (strings);
Stream applications will use the c8db with the default query parameters explained in the chart below.
Name | Description | Default Value | Possible Data Types | Optional |
---|---|---|---|---|
collection | This specifies the name of the c8db collection to which events must written. | STRING | No | |
replication.type | Specifies if the replication type of the c8db collection. Possible values can be `local` and `global` | local | STRING | Yes |
collection.type | This specifies the type of the data collection contains. Possible values can be `doc` and `edge`. | doc | STRING | Yes |
from | If `collection.type` is specified as `edge`, this field indicates which field to be considered as a source node of the edge. | _from | STRING | Yes |
to | If `collection.type` is specified as `edge`, this field indicates which field to be considered as a destination node of the edge. | _to | STRING | Yes |
Tutorials
Following tutorials cover various user scenarios using Macrometa Stream Processing.
- Publishing Data
- Consuming Data
- Filtering Data
- Transforming Data
- Enriching Data
- Executing Scripts
- Correlating Data
- Summarizing Data
Refer to Reference for additional stream processing examples.