
Stream

A stream is a logical series of events ordered in time. Its schema is defined via the stream definition. A stream definition contains the stream name and a set of attributes, each with a specific type and a name that is unique within the stream. All events associated with the stream have the same schema (i.e., the same attributes in the same order).

The stream processor groups common types of events together under a schema. This helps in various ways, such as processing all events together in queries and applying data format transformations uniformly when events are consumed and published via sources and sinks.

Syntax

The syntax for defining a new stream is as follows.

CREATE STREAM <stream_name> (<attribute_name> <attribute_type>,
<attribute_name> <attribute_type>, ... );

The following parameters are used to configure a stream definition.

Parameter      | Description
-------------- | -----------
stream name    | The name of the stream being created. (It is recommended to define stream names in PascalCase.)
attribute name | The uniquely identifiable name of a stream attribute. (It is recommended to define attribute names in camelCase.)
attribute type | The type of each attribute defined in the schema. This can be STRING, INT, LONG, DOUBLE, FLOAT, BOOL, or OBJECT.

To use and refer to stream and attribute names that do not follow the [a-zA-Z_][a-zA-Z_0-9]* format, enclose them in backticks (`). E.g., `$test(0)`.
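For instance, a minimal sketch of a definition using backtick-quoted names (the names here are hypothetical):

CREATE STREAM `Temp Stream` (`room-no` int, temp double);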

To make the stream process events asynchronously with multiple threads, add the async property to the WITH() clause. For example: WITH(async='true')
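For example, the TempStream definition shown below can be made asynchronous as follows:

CREATE STREAM TempStream WITH (async='true') (deviceID long, roomNo int, temp double);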

This example creates a stream called TempStream with the following attributes:

  • deviceID of type long
  • roomNo of type int
  • temp of type double
CREATE STREAM TempStream (deviceID long, roomNo int, temp double);

Source

Sources receive events via multiple transports and in various data formats, and direct them into streams for processing.

A source configuration allows you to define a mapping that converts each incoming event from its native data format to a stream event. When no custom mapping is provided, the stream processor assumes that each arriving event adheres to the predefined format, based on the stream definition and the configured message mapping type.

Purpose

A source provides a way to consume events from internal and external services and convert them into a form that can be processed by the associated stream.

Syntax

To configure a stream that consumes events via a source, add the source configuration to a stream definition by adding the source.type annotation with the required parameter values.

The source syntax is as follows:

CREATE SOURCE <source_name> WITH (source.type='<source type>', <static.key>='<value>', map.type='<map type>') (<attribute1>='<attribute mapping>', <attributeN>='<attribute mapping>');

This syntax includes the following annotations.

Source

The type parameter of the source.type annotation defines the source type that receives events. The other parameters of the source.type annotation depend on the selected source type, and some of them can be optional.

The following is the list of source types supported by Stream:

Source type | Description
----------- | -----------
database    | Allows the stream app to consume events from collections (docs, graphs) running in the same or a different geo fabric.
stream      | Allows the stream app to consume events from streams (local, geo-replicated) running in the same or a different geo fabric.
Kafka       | Subscribes to a Kafka topic to consume events.

Source Mapper

Each source.type configuration can have a mapping denoted by the map.type annotation that defines how to convert the incoming event format to Stream events.

The type parameter of map.type defines the map type to be used in converting the incoming events. The other parameters of the map.type annotation depend on the mapper selected, and some of them can be optional.

For detailed information about the parameters see the documentation of the relevant mapper.

Map Attributes

attributes is an optional annotation used with map.type to define a custom mapping. When attributes is not provided, each mapper assumes that incoming events adhere to its own default message format and attempts to convert them from that format. By adding the attributes annotation, users can selectively extract data from the incoming message and assign it to the attributes.

There are two ways to configure attributes. In both cases, add the attributes in parentheses after the query:

  • Define attribute names as keys, and mapping configurations as values:

    ... ( <attribute1>='<mapping>', <attributeN>='<mapping>')
  • Define the mapping configurations in the same order as the attributes defined in stream definition:

    ... ( '<mapping for attribute1>', '<mapping for attributeN>')

Supported Source Mapping Types

The following is the list of source mapping types supported by Stream:

Source mapping type | Description
------------------- | -----------
PassThrough         | Omits data conversion on Stream events.
JSON                | Converts JSON messages to Stream events.
XML                 | Converts XML messages to Stream events.
TEXT                | Converts plain text messages to Stream events.
Key Value           | Converts key-value HashMaps to Stream events.
CSV                 | Converts CSV-like delimiter-separated events to Stream events.
tip

When the map.type annotation is not provided, map.type='passThrough' is used as the default, which passes the consumed events directly to the streams without any data conversion.
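As a minimal sketch of relying on this default, the source below omits map.type entirely (the stream source and its streams.list parameter mirror Example 1 below; the object-typed payload attribute is an assumption for illustration):

CREATE SOURCE RawStream WITH (source.type='stream', streams.list='foo') (payload object);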

Example 1

Receive JSON messages via a stream, and direct them to the InputStream stream for processing. The stream expects the JSON messages to be in the default data format supported by the JSON mapper, as follows.

{
  "name": "Paul",
  "age": 20,
  "country": "UK"
}

The configuration of the stream source and JSON source mapper to achieve the above is as follows.

CREATE SOURCE InputStream WITH (source.type='stream', streams.list='foo', map.type='json') (name string, age int, country string);

Example 2

Receive JSON messages via a database, and direct them to the StockStream stream for processing. Here the incoming JSON, as given below, does not adhere to the default data format supported by the JSON mapper.

{
  "portfolio": {
    "stock": {
      "volume": 100,
      "company": {
        "symbol": "FB"
      },
      "price": 55.6
    }
  }
}

The configuration of the database source and the custom JSON source mapping to achieve the above is as follows.

CREATE SOURCE StockStream WITH (source.type='database', collection='foo', map.type='json', enclosing.element="$.portfolio") (symbol = "stock.company.symbol", price = "stock.price", volume = "stock.volume");

The same can also be configured by omitting the attribute names as below.

CREATE SOURCE StockStream WITH (source.type='database', collection='foo', map.type='json', enclosing.element="$.portfolio") ("stock.company.symbol", "stock.price", "stock.volume");

Sink

Sinks consume events from streams and publish them via multiple transports to external endpoints in various data formats.

A sink configuration allows users to define a mapping that converts Stream events into the required output data format (such as JSON, TEXT, or XML) and publishes the events to the configured endpoints. When no such custom mapping is provided, Stream converts the events to a predefined format, based on the stream definition and the configured message mapper type, before publishing them.

Purpose

A sink provides a way to publish the events of a stream to external systems by converting them into a format those systems support.

Syntax

To configure a stream to publish events via a sink, add the sink configuration to a stream definition by adding the sink.type annotation with the required parameter values.

The sink syntax is as follows:

CREATE SINK <stream name> WITH (sink.type='<sink type>', <static.key>='<value>', <dynamic.key>='{{<value>}}', map.type='<map type>', <static.key>='<value>', <dynamic.key>='{{<value>}}', map.payload='<payload mapping>') (<attribute1> <type>, <attributeN> <type>);
note

"Dynamic Properties" The sink and sink mapper properties that are categorized as dynamic have the ability to absorb attribute values dynamically from the Stream events of their associated streams. This can be configured by enclosing the relevant attribute names in double curly braces as{{...}}, and using it within the property values.

Some valid dynamic properties values are:

  • '{{attribute1}}'
  • 'This is {{attribute1}}'
  • '{{attribute1}} > {{attributeN}}'

Here the attribute names in the double curly braces will be replaced with the values from the events before they are published.
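For example, a minimal sketch of a Kafka sink that uses a dynamic property, publishing each event to a topic named after that event's roomNo attribute (this mirrors the Kafka examples later in this section; the bootstrap server address is illustrative):

CREATE SINK TempStream WITH (sink.type='kafka', topic='{{roomNo}}', bootstrap.servers='localhost:9092', map.type='json') (deviceID long, roomNo int, temp double);

Here {{roomNo}} is replaced with each event's roomNo value before the message is published.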

This syntax includes the following annotations.

Sink

The type parameter of the sink.type annotation defines the sink type that publishes the events. The other parameters of the sink.type annotation depend on the selected sink type, and some of them can be optional and/or dynamic.

The following is a list of sink types supported by stream processor:

Sink type  | Description
---------- | -----------
database   | Allows the stream app to publish events to collections (docs, graphs) in the same or a different geofabric.
HTTP       | Publishes events to an HTTP endpoint.
Kafka      | Publishes events to a Kafka topic.
TCP        | Publishes events to a TCP service.
Email      | Sends emails via the SMTP protocol.
Web Socket | Publishes events to a WebSocket endpoint.

Distributed Sink

Distributed Sinks publish events from a defined stream to multiple endpoints using load balancing or partitioning strategies.

Any sink can be used as a distributed sink. A distributed sink configuration allows users to define a common mapping to convert and send the Stream events for all its destination endpoints.

Purpose

Distributed sink provides a way to publish Stream events to multiple endpoints in the configured event format.

Syntax

To configure a distributed sink, add the sink configuration to a stream definition via the sink.type property, and include the configuration parameters that are common to all the destination endpoints. Along with these common parameters, add the distribution.strategy property specifying the distribution strategy (i.e., roundRobin or partitioned), and destination properties providing the endpoint-specific configurations.

The distributed sink syntax is as follows:

RoundRobin Distributed Sink

Publishes events to defined destinations in a round robin manner.

CREATE SINK <stream name> WITH (sink.type='<sink type>', <common.static.key>='<value>', <common.dynamic.key>='{{<value>}}', map.type='<map type>', <static.key>='<value>', <dynamic.key>='{{<value>}}', map.payload='<payload mapping>', distribution.strategy='roundRobin', destination.<key>='<value>', destination.<key>='<value>') (<attribute1> <type>, <attributeN> <type>);
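For instance, a minimal sketch that load-balances events across two HTTP endpoints in round-robin order (the endpoints and schema here mirror the partitioned Example 3 later in this section):

CREATE SINK OutputStream WITH (sink.type='http', method='POST', map.type='json', distribution.strategy='roundRobin', destination.publisher.url='http://localhost:8005/endpoint1', destination.publisher.url='http://localhost:8006/endpoint2') (name string, age int, country string);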

Partitioned Distributed Sink

Publishes events to defined destinations by partitioning them based on the partitioning key.


CREATE SINK <stream name> WITH (sink.type='<sink type>', <common.static.key>='<value>', <common.dynamic.key>='{{<value>}}', map.type='<map type>', <static.key>='<value>', <dynamic.key>='{{<value>}}', map.payload='<payload mapping>', distribution.strategy='partitioned', partitionKey='<partition key>', destination.<key>='<value>', destination.<key>='<value>') (<attribute1> <type>, <attributeN> <type>);

Sink Mapper

Each sink.type configuration can have a mapping denoted by the map.type annotation that defines how to convert Stream events to outgoing messages with the defined format.

The type parameter of map.type defines the map type to be used in converting the outgoing events. The other parameters of the map.type annotation depend on the mapper selected, and some of them can be optional and/or dynamic.

For detailed information about the parameters see the documentation of the relevant mapper.

Map Payload

map.payload is an optional annotation used with map.type to define a custom mapping. When the map.payload annotation is not provided, each mapper maps outgoing events to its own default event format. The map.payload annotation allows users to configure mappers to produce the output payload of their choice, and by using dynamic properties within the payload they can selectively extract and add data from the published Stream events.

There are two ways to configure the map.payload annotation (a concrete sketch follows this list).

  1. Some mappers, such as XML, JSON, and TEXT, accept only one output payload:

    map.payload='This is a test message from {{user}}.'
  2. Some mappers, such as key-value, accept a series of mapping values:

    map.payload= key1='mapping_1', 'key2'='user : {{user}}'

    Here, the keys of the payload mapping can be defined using dot notation such as a.b.c, or using any constant string value such as '$abc'.
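As an illustration, a minimal sketch of a key-value payload on an HTTP sink. The endpoint URL, the map.type value 'keyvalue', and the payload keys here are assumptions for the sake of the example, not confirmed identifiers; check the key-value mapper documentation for the exact map.type name.

CREATE SINK UserStream WITH (sink.type='http', publisher.url='http://localhost:8005/users', map.type='keyvalue', map.payload= user.name='{{name}}', 'user.country'='{{country}}') (name string, country string);

Here each outgoing event produces a map with the keys user.name and user.country populated from the event's name and country attributes.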

Supported Sink Mapping Types

The following is a list of sink mapping types supported by Stream:

Sink mapping type | Description
----------------- | -----------
PassThrough       | Omits data conversion on Stream events.
JSON              | Converts Stream events to JSON messages.
XML               | Converts Stream events to XML messages.
TEXT              | Converts Stream events to plain text messages.
Key Value         | Converts Stream events to key-value HashMaps.
CSV               | Converts Stream events to CSV-like delimiter-separated messages.
tip

When the map.type annotation is not provided, map.type='passThrough' is used as the default, which passes the outgoing Stream events directly to the sinks without any data conversion.

Example 1

Publish OutputStream events by converting them to JSON messages in the default format and sending them to the HTTP endpoint http://localhost:8005/endpoint using the POST method, an Accept-Date header, and basic authentication with admin as both the username and the password.

The configuration of the HTTP sink and JSON sink mapper to achieve the above is as follows.


CREATE SINK OutputStream WITH (sink.type='http', publisher.url='http://localhost:8005/endpoint', method='POST', headers='Accept-Date:20/02/2017', basic.auth.enabled='true', basic.auth.username='admin', basic.auth.password='admin', map.type='json') (name string, age int, country string);

This publishes a JSON message in the following format:

{
  "event": {
    "name": "Paul",
    "age": 20,
    "country": "UK"
  }
}

Example 2

Publish StockStream events by converting them to user-defined JSON messages and sending them to the HTTP endpoint http://localhost:8005/stocks.

The configuration of the HTTP sink and custom JSON sink mapping to achieve the above is as follows.

CREATE SINK StockStream WITH (sink.type='http', publisher.url='http://localhost:8005/stocks', map.type='json', validate.json='true', enclosing.element='$.Portfolio', map.payload="""{"StockData":{ "Symbol":"{{symbol}}", "Price":{{price}} }}""") (symbol string, price float, volume long);

This publishes a single event as a JSON message in the following format:

{
  "Portfolio": {
    "StockData": {
      "Symbol": "GOOG",
      "Price": 55.6
    }
  }
}

This can also publish multiple events together as a single JSON message in the following format:

{
  "Portfolio": [
    {
      "StockData": {
        "Symbol": "GOOG",
        "Price": 55.6
      }
    },
    {
      "StockData": {
        "Symbol": "FB",
        "Price": 57.0
      }
    }
  ]
}

Example 3

Publish events from the OutputStream stream to multiple HTTP endpoints using a partitioning strategy. Here the events are sent to either http://localhost:8005/endpoint1 or http://localhost:8006/endpoint2 based on the partitioning key country. The sink uses the default JSON mapping, the POST method, and admin as both the username and the password for both endpoints.

The configuration of the distributed HTTP sink and JSON sink mapper to achieve the above is as follows.

CREATE SINK OutputStream WITH (sink.type='http', method='POST', basic.auth.enabled='true', basic.auth.username='admin', basic.auth.password='admin', map.type='json', distribution.strategy='partitioned', partitionKey='country', destination.publisher.url='http://localhost:8005/endpoint1', destination.publisher.url='http://localhost:8006/endpoint2') (name string, age int, country string);

This partitions the outgoing events, publishing all events with the same country attribute value to the same endpoint. The published JSON message has the following format:

{
  "event": {
    "name": "Paul",
    "age": 20,
    "country": "UK"
  }
}

Error Handling

Errors in Stream can be handled at streams and at sinks.

Error Handling at Stream

When errors are thrown by Stream elements subscribed to a stream, the error propagates up to the stream that delivered the event to those elements. By default, the error is logged and dropped at the stream, but this behavior can be altered by adding the OnError property to the corresponding stream definition.

The OnError property can help users capture the error and the associated event, and handle them gracefully by sending them to a fault stream.

The OnError property and the required action are specified as follows.

CREATE STREAM <stream name> WITH (OnError.action='<action>') (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );

The action parameter of the OnError property defines the action to be executed during failure scenarios.

The following actions can be specified for the OnError property to handle erroneous scenarios.

  • STREAM: Creates a fault stream and redirects the event and the error to it. The created fault stream has all the attributes defined in the base stream, to capture the event that caused the error, plus an _error attribute of type object containing the error information. The fault stream can be referred to by prefixing the base stream name with !, as !<stream name>.

Example

Handle errors in TempStream by redirecting the errors to a fault stream.

The configuration of TempStream stream and OnError property is as follows.

CREATE STREAM TempStream WITH(OnError.action="STREAM") (deviceID long, roomNo int, temp double);

Stream infers and automatically defines the fault stream of TempStream as given below.

CREATE STREAM !TempStream (deviceID long, roomNo int, temp double, _error object);

The StreamApp below extends the above use case by adding failure generation and error handling through queries.

Note: Details on writing processing logics via queries will be explained in later sections.

-- Define fault stream to handle error occurred at TempStream subscribers

CREATE STREAM TempStream WITH(OnError.action="STREAM") (deviceID long, roomNo int, temp double);

-- Error generation through a custom function `createError()`
@name('error-generation')
insert into IgnoreStream1
from TempStream#custom:createError();

-- Handling error by simply logging the event and error.
@name('handle-error')
insert into IgnoreStream2
select deviceID, roomNo, temp, _error
from !TempStream#log("Error Occurred!");

Error Handling at Sink

There can be cases where external systems become unavailable or cause errors when events are published to them. By default, sinks log and drop such events, causing event loss. This can be handled gracefully by configuring the on.error parameter of the sink.type annotation.

The on.error parameter of the sink.type annotation can be specified as below.

CREATE SINK <stream name> WITH (sink.type='<sink type>', on.error.action='<on error action>', <key>='<value>', ...) (<attribute name> <attribute type>, <attribute name> <attribute type>, ... );

The following actions can be specified for the on.error parameter of the sink.type annotation to handle erroneous scenarios.

  • WAIT: Publishing threads wait in back-off and retry mode, and only send the events when the connection is re-established. During this time the threads do not consume any new messages, thereby introducing back pressure on the systems that publish to them.

  • STREAM: Pushes the failed events, along with the corresponding error, to the fault stream associated with the sink.

Example 1

Introduce back pressure on the threads that bring events via TempStream when the system cannot connect to Kafka.

The configuration of the TempStream stream with the Kafka sink and the on.error property is as follows.

CREATE SINK TempStream WITH (sink.type='kafka', on.error.action='WAIT', topic='{{roomNo}}', bootstrap.servers='localhost:9092', map.type='xml') (deviceID long, roomNo int, temp double);

Example 2

Send events to the fault stream of TempStream when the system cannot connect to Kafka.

The configuration of the TempStream stream with its associated fault stream, the Kafka sink with the on.error property, and a query to handle the error is as follows.

Note: Details on writing processing logics via queries will be explained in later sections.

CREATE SINK TempStream WITH (sink.type='kafka', on.error.action='STREAM', topic='{{roomNo}}', bootstrap.servers='localhost:9092', map.type='xml') (deviceID long, roomNo int, temp double);

-- Handling error by simply logging the event and error.
@name('handle-error')
insert into IgnoreStream
select deviceID, roomNo, temp, _error
from !TempStream#log("Error Occurred!");