Skip to main content

Streaming ML

This extension provides streaming machine learning (clustering, classification and regression) on event streams.

Features

  • bayesianRegression (StreamProcessor)

    This extension predicts using a Bayesian linear regression model.Bayesian linear regression allows determining the uncertainty of each prediction by estimating the full-predictive distribution

  • kMeansIncremental (StreamProcessor)

    Performs K-Means clustering on a streaming data set. Data points can be of any dimension and the dimensionality is calculated from number of parameters. All data points to be processed by a query should be of the same dimensionality. The Euclidean distance is taken as the distance metric. The algorithm resembles Sequential K-Means Clustering at https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm

  • kMeansMiniBatch (StreamProcessor)

    Performs K-Means clustering on a streaming data set. Data points can be of any dimension and the dimensionality is calculated from number of parameters. All data points to be processed in a single query should be of the same dimensionality. The Euclidean distance is taken as the distance metric. The algorithm resembles mini-batch K-Means. (refer Web-Scale K-Means Clustering by D.Sculley, Google, Inc.).

  • perceptronClassifier (StreamProcessor)

    This extension predicts using a linear binary classification Perceptron model.

  • updateBayesianRegression (StreamProcessor)

    This extension builds/updates a linear Bayesian regression model. This extension uses an improved version of stochastic variational inference.

  • updatePerceptronClassifier (StreamProcessor)

    This extension builds/updates a linear binary classification Perceptron model.

bayesianRegression

This extension predicts using a Bayesian linear regression model.Bayesian linear regression allows determining the uncertainty of each prediction by estimating the full-predictive distribution

Syntax

streamingml:bayesianRegression(<STRING> model.name, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:bayesianRegression(<STRING> model.name, <INT> prediction.samples, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
model.nameThe name of the model to be usedSTRINGNoNo
prediction.samplesThe number of samples to be drawn to estimate the prediction1000INTYesNo
model.featureThe features of the model that need to be attributes of the streamDOUBLE FLOAT INT LONGNoYes

Extra Return Attributes

NameDescriptionPossible Types
predictionThe predicted value (double)DOUBLE
confidenceInverse of the standard deviation of the predictive distributionDOUBLE

EXAMPLE 1

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);

from StreamA#streamingml:bayesianRegression('model1', attribute_0, attribute_1, attribute_2, attribute_3)
insert all events into OutputStream;

This query uses a Bayesian regression model named model1 to predict the label of the feature vector represented by attribute_0, attribute_1, attribute_2, and attribute_3. The predicted value is emitted to the OutputStream streamalong with the prediction confidence (std of predictive distribution) and the feature vector. As a result, the OutputStream stream is defined as follows: (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, prediction double, confidence double).

kMeansIncremental

Performs K-Means clustering on a streaming data set. Data points can be of any dimension and the dimensionality is calculated from number of parameters. All data points to be processed by a query should be of the same dimensionality. The Euclidean distance is taken as the distance metric. The algorithm resembles Sequential K-Means Clustering at https://www.cs.princeton.edu/courses/archive/fall08/cos436/Duda/C/sk_means.htm

Syntax

streamingml:kMeansIncremental(<INT> no.of.clusters, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansIncremental(<INT> no.of.clusters, <DOUBLE> decay.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
no.of.clustersThe assumed number of natural clusters in the data set.INTNoNo
decay.ratethis is the decay rate of old data compared to new data. Value of this will be in [0,1]. 0 means only old data used and1 will mean that only new data is used0.01DOUBLEYesNo
model.featureThis is a variable length argument. Depending on the dimensionality of data points we will receive coordinates as features along each axis.DOUBLE FLOAT INT LONGNoYes

Extra Return Attributes

NameDescriptionPossible Types
euclideanDistanceToClosestCentroidRepresents the Euclidean distance between the current data point and the closest centroid.DOUBLE
closestCentroidCoordinateThis is a variable length attribute. Depending on the dimensionality(D) we will return closestCentroidCoordinate1, closestCentroidCoordinate2,... closestCentroidCoordinateD which are the d dimensional coordinates of the closest centroid from the model to the current event. This is the prediction result and this represents the cluster to which the current event belongs to.DOUBLE

EXAMPLE 1

CREATE STREAM InputStream (x double, y double);
@info(name = 'query1')
insert into OutputStream
select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
from InputStream#streamingml:kMeansIncremental(2, 0.2, x, y);

This is an example where user provides the decay rate. First two events will be used to initiate the model since the required number of clusters is specified as 2. After the first event itself prediction would start.

EXAMPLE 2

CREATE STREAM InputStream (x double, y double);
@info(name = 'query1')
insert into OutputStream
select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
from InputStream#streamingml:kMeansIncremental(2, x, y);

This is an example where user doesnt give the decay rate so the default value will be used

kMeansMiniBatch

Performs K-Means clustering on a streaming data set. Data points can be of any dimension and the dimensionality is calculated from number of parameters. All data points to be processed in a single query should be of the same dimensionality. The Euclidean distance is taken as the distance metric. The algorithm resembles mini-batch K-Means. (refer Web-Scale K-Means Clustering by D.Sculley, Google, Inc.).

Syntax

streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansMiniBatch(<INT> no.of.clusters, <INT> maximum.iterations, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansMiniBatch(<INT> no.of.clusters, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <INT> maximum.iterations, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansMiniBatch(<INT> no.of.clusters, <INT> maximum.iterations, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:kMeansMiniBatch(<INT> no.of.clusters, <DOUBLE> decay.rate, <INT> maximum.iterations, <INT> no.of.events.to.retrain, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
no.of.clustersThe assumed number of natural clusters in the data set.INTNoNo
decay.ratethis is the decay rate of old data compared to new data. Value of this will be in [0,1]. 0 means only old data used and1 will mean that only new data is used0.01DOUBLEYesNo
maximum.iterationsNumber of iterations, the process iterates until the number of maximum iterations is reached or the centroids do not change50INTYesNo
no.of.events.to.retrainnumber of events to recalculate cluster centers.20INTYesNo
model.featureThis is a variable length argument. Depending on the dimensionality of data points we will receive coordinates as features along each axis.DOUBLE FLOAT INT LONGNoYes

Extra Return Attributes

NameDescriptionPossible Types
euclideanDistanceToClosestCentroidRepresents the Euclidean distance between the current data point and the closest centroid.DOUBLE
closestCentroidCoordinateThis is a variable length attribute. Depending on the dimensionality(d) we will return closestCentroidCoordinate1 to closestCentroidCoordinated which are the d dimensional coordinates of the closest centroid from the model to the current event. This is the prediction result and this represents the cluster towhich the current event belongs to.DOUBLE

EXAMPLE 1

CREATE STREAM InputStream (x double, y double);
@info(name = 'query1')
insert into OutputStream
select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
from InputStream#streamingml:kMeansMiniBatch(2, 0.2, 10, 20, x, y);

This is an example where user gives all three hyper parameters. first 20 events will be consumed to build the model and from the 21st event prediction would start

EXAMPLE 2

CREATE STREAM InputStream (x double, y double);
@info(name = 'query1')
insert into OutputStream
select closestCentroidCoordinate1, closestCentroidCoordinate2, x, y
from InputStream#streamingml:kMeansMiniBatch(2, x, y);

This is an example where user has not specified hyper params. So default values will be used.

perceptronClassifier

This extension predicts using a linear binary classification Perceptron model.

Syntax

streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE> model.bias, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE> model.threshold, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:perceptronClassifier(<STRING> model.name, <DOUBLE> model.bias, <DOUBLE> model.threshold, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
model.nameThe name of the model to be used.STRINGNoNo
model.biasThe bias of the Perceptron algorithm.0.0DOUBLEYesNo
model.thresholdThe threshold that separates the two classes. The value specified must be between zero and one.0.5DOUBLEYesNo
model.featureThe features of the model that need to be attributes of the stream.DOUBLE FLOAT INT LONGNoYes

Extra Return Attributes

NameDescriptionPossible Types
predictionThe predicted value (true/false)BOOL
confidenceLevelThe probability of the predictionDOUBLE

EXAMPLE 1

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);

insert all events into OutputStream
from StreamA#streamingml:perceptronClassifier('model1',0.0,0.5, attribute_0, attribute_1, attribute_2, attribute_3);

This query uses a Perceptron model named model1 with a 0.0 bias and a 0.5 threshold learning rate to predict the label of the feature vector represented by attribute_0, attribute_1, attribute_2, and attribute_3. The predicted label (true/false) is emitted to the OutputStream streamalong with the prediction confidence level(probability) and the feature vector. As a result, the OutputStream stream is defined as follows: (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, prediction bool, confidenceLevel double).

EXAMPLE 2

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);

insert all events into OutputStream
from StreamA#streamingml:perceptronClassifier('model1',0.0, attribute_0, attribute_1, attribute_2, attribute_3);

This query uses a Perceptron model named model1 with a 0.0 bias to predict the label of the feature vector represented by attribute_0, attribute_1, attribute_2, and attribute_3. The prediction(true/false) is emitted to the OutputStreamstream along with the prediction confidence level(probability) and the feature. As a result, the OutputStream stream is defined as follows: (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, prediction bool, confidenceLevel double).

EXAMPLE 3

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double);

insert all events into OutputStream
from StreamA#streamingml:perceptronClassifier(`model1`, attribute_0, attribute_1, attribute_2);

This query uses a Perceptron model named model1 with a default 0.0 bias to predict the label of the feature vector represented by attribute_0, attribute_1, and attribute_2. The predicted probability is emitted to the OutputStream stream along with the feature vector. As a result, the OutputStream is defined as follows: (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, prediction bool, confidenceLevel double).

updateBayesianRegression

This extension builds/updates a linear Bayesian regression model. This extension uses an improved version of stochastic variational inference.

Syntax

streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <INT> model.samples, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <STRING> model.optimizer, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <DOUBLE> learning.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <INT> model.samples, <STRING> model.optimizer, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <INT> model.samples, <DOUBLE> learning.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <STRING> model.optimizer, <DOUBLE> learning.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updateBayesianRegression(<STRING> model.name, <INT|DOUBLE|LONG|FLOAT> model.target, <INT> model.samples, <STRING> model.optimizer, <DOUBLE> learning.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
model.nameThe name of the model to be built.STRINGNoNo
model.targetThe target attribute (dependant variable) of the input stream.INT DOUBLE LONG FLOATNoYes
model.samplesNumber of samples used to construct the gradients.1INTYesNo
model.optimizerThe type of optimization usedADAMSTRINGYesNo
learning.rateThe learning rate of the updater0.05DOUBLEYesNo
model.featureFeatures of the model that need to be attributes of the stream.DOUBLE FLOAT INT LONGNoYes

Extra Return Attributes

NameDescriptionPossible Types
lossloss of the model.DOUBLE

EXAMPLE 1

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, attribute_4 double );

insert all events into outputStream
from StreamA#streamingml:updateBayesianRegression('model1', attribute_4, attribute_0, attribute_1, attribute_2, attribute_3);

This query builds/updates a Bayesian Linear regression model named model1 using attribute_0, attribute_1, attribute_2, and attribute_3 as features, and attribute_4 as the label. Updated weights of the model are emitted to the OutputStream stream.

EXAMPLE 2

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, attribute_4 double );

insert all events into outputStream
from StreamA#streamingml:updateBayesianRegression('model1', attribute_4, 2, 'NADAM', 0.01, attribute_0, attribute_1, attribute_2, attribute_3);

This query builds/updates a Bayesian Linear regression model named model1 with a 0.01 learning rate using attribute_0, attribute_1, attribute_2, and attribute_3 as features, and attribute_4 as the label. Updated weights of the model are emitted to the OutputStream stream. This model draws two samples during monte-carlo integration and uses NADAM optimizer.

updatePerceptronClassifier

This extension builds/updates a linear binary classification Perceptron model.

Syntax

streamingml:updatePerceptronClassifier(<STRING> model.name, <BOOL|STRING> model.label, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)
streamingml:updatePerceptronClassifier(<STRING> model.name, <BOOL|STRING> model.label, <DOUBLE> learning.rate, <DOUBLE|FLOAT|INT|LONG> model.feature, <DOUBLE|FLOAT|INT|LONG> ...)

QUERY PARAMETERS

NameDescriptionDefault ValuePossible Data TypesOptionalDynamic
model.nameThe name of the model to be built/updated.STRINGNoNo
model.labelThe attribute of the label or the class of the dataset.BOOL STRINGNoYes
learning.rateThe learning rate of the Perceptron algorithm.0.1DOUBLEYesNo
model.featureFeatures of the model that need to be attributes of the stream.DOUBLE FLOAT INT LONGNoYes

Extra Return Attributes

NameDescriptionPossible Types
featureWeightWeight of the feature.name of the model.DOUBLE

EXAMPLE 1

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double, attribute_3 double, attribute_4 string );

insert all events into outputStream
from StreamA#streamingml:updatePerceptronClassifier('model1', attribute_4, 0.01, attribute_0, attribute_1, attribute_2, attribute_3);

This query builds/updates a Perceptron model named model1 with a 0.01 learning rate using attribute_0, attribute_1, attribute_2, and attribute_3 as features, and attribute_4 as the label. Updated weights of the model are emitted to the OutputStream stream.

EXAMPLE 2

CREATE STREAM StreamA (attribute_0 double, attribute_1 double, attribute_2 double,attribute_3 double, attribute_4 string );

insert all events into outputStream
from StreamA#streamingml:updatePerceptronClassifier('model1', attribute_4, attribute_0, attribute_1, attribute_2, attribute_3);

This query builds/updates a Perceptron model named model1 with a default 0.1 learning rate using attribute_0, attribute_1, attribute_2, and attribute_3 as features, and attribute_4 as the label. The updated weights of the model are appended to the outputStream.