Data Transformation
Math & Logical Operations
This example shows the use of math or logical operations on events.
-- Source stream of sensor readings: a string sensor ID and a double temperature value.
CREATE STREAM TemperatureStream (sensorId string, temperature double);
@info(name = 'celciusTemperature')
-- Converts each Celsius reading to Fahrenheit using F = (C * 9 / 5) + 32.
-- `sensorId` is passed through unchanged; the converted value keeps the
-- attribute name `temperature`.
insert into FahrenheitTemperatureStream
select sensorId, (temperature * 9 / 5) + 32 as temperature
from TemperatureStream;
@info(name = 'Overall-analysis')
-- Approximates the Fahrenheit temperature by rounding it down with math:floor
-- (for example, 1.4 becomes 1.0) and exposes the result as `approximateTemp`.
-- NOTE(review): `all events` presumably emits both current and expired events;
-- confirm against the stream QL reference.
insert all events into OverallTemperatureStream
select sensorId, math:floor(temperature) as approximateTemp
from FahrenheitTemperatureStream;
@info(name = 'RangeFilter')
-- Keeps only events where `-2 < approximateTemp < 40` (both bounds exclusive);
-- events outside that range are not forwarded to NormalTemperatureStream.
insert into NormalTemperatureStream
select *
from OverallTemperatureStream[ approximateTemp > -2 and approximateTemp < 40];
Input
The following event is sent to `TemperatureStream`:
['SensorId', -17]
Output
After processing, the following events will be arriving at each stream:
- FahrenheitTemperatureStream: ['SensorId', 1.4]
- OverallTemperatureStream: ['SensorId', 1.0]
- NormalTemperatureStream: ['SensorId', 1.0]
Transform JSON
This example shows transforming JSON objects within a stream application.
-- Source stream whose single attribute carries a JSON document as a string.
CREATE STREAM InputStream(jsonString string);
-- Parses the incoming JSON string into a JSON object (`jsonObj`) so that
-- downstream queries can read and modify its elements.
insert into PersonalDetails
select json:toObject(jsonString) as jsonObj
from InputStream ;
-- Emits the parsed JSON object together with values derived from it.
insert into OutputStream
select jsonObj,
-- Get the `name` element (string) from the JSON
json:getString(jsonObj,'$.name') as name,
-- Check whether the `salary` element exists
json:isExists(jsonObj, '$.salary') as isSalaryAvailable,
-- Serialize the JSON object back into a string
json:toString(jsonObj) as jsonString
from PersonalDetails;
-- Set the `salary` element to `0` if it is not available: only events with
-- `isSalaryAvailable == false` pass the filter, and for those a `salary` key
-- with the value `0f` is set at the root (`$`) of the JSON object.
insert into PreprocessedStream
select json:setElement(jsonObj, '$', 0f, 'salary') as jsonObj
from OutputStream[isSalaryAvailable == false];
Input
The following event is sent to `InputStream`:
[
{
"name" : "streamapp.user",
"address" : {
"country": "USA"
},
"contact": "+9xxxxxxxx"
}
]
Output
After processing, the following events will be arriving:
- OutputStream:
[
{
"address": {
"country":"USA"
},
"contact":"+9xxxxxxxx",
"name":"streamapp.user"
}
]
- PreprocessedStream:
[
{
"name" : "streamapp.user",
"salary": 0.0,
"address" : {
"country": "USA"
},
"contact": "+9xxxxxxxx"
}
]