Skip to main content

Summarizing Data

Introduction

Summarizing data refers to obtaining aggregates in an incremental manner for a specified set of time periods.

Summarization by clock-time

Performing clock-time based based summarization involves calculating, storing, and then retrieving aggregations for a selected range of time granularities. This process is carried out in two parts:

  1. Calculating the aggregations for the selected time granularities and storing the results.
  2. Retrieving previously calculated aggregations for selected time granularities.

To understand data summarization further, consider the scenario where a business that sells multiple brands stores its sales data in a physical database for the purpose of retrieving them later to perform sales analysis. Each sales transaction is received with the following details:

`symbol`: The symbol that represents the brand of the items sold.
`price`: the price at which each item was sold.
`amount`: The number of items sold.

The Sales Analyst needs to retrieve the total number of items sold of each brand per month, per week, per day etc., and then retrieve these totals for specific time durations to prepare sales analysis reports.

info

It is not always required to maintain a physical database for incremental analysis, but it enables you to try out your aggregations with ease.

The following sections explain how to calculate and store time-based aggregations for this scenarios, and then retrieve them.

Calculate and store clock-time-based aggregate values

To calculate and store time-based aggregation values for the scenario explained above, follow the procedure below.

  1. Start creating a new stream application. You can name it TradeApp For instructions, see Creating a Stream Application.

    @App:name("TradeApp");
    @App:qlVersion("2")
  2. To capture the input events based on which the aggregations are calculated, define an input stream as follows.

    CREATE STREAM TradeStream (symbol string, price double, quantity long, timestamp long);
    info
    In addition to the `symbol`, `price`, and `quantity` attributes to capture the input details already mentioned, the above stream definition includes an attribute named timestamp to capture the time at which the sales transaction occurs. The aggregations are executed based on this time. This attribute's value could either be a long value (reflecting the Unix timestamp in milliseconds), or a string value adhering to one of the following formats.

    * **`<YYYY>-<MM>-<dd> <HH>:<mm>:<ss> <Z>`**: This format can be used if the timezone needs to be specified explicitly. Here the ISO 8601 UTC offset must be provided for <Z> . e.g., +05:30 reflects the India Time Zone. If time is not in GMT, this value must be provided.)
    * **`<yyyy>-<MM>-<dd> <HH>:<mm>:<ss>`**: This format can be used if the timezone is in GMT.
  3. Create an aggregation as follows. You can name it TradeAggregation.

    info
    The system uses the aggregation name you define here as part of the database table name. Table name is `<Aggregation_Name>_<Granularity>`. System will automatically create a collection `TradeAggregation_HOUR` in `c8db` as we will be calculating the aggregation hourly in the next step.
    CREATE AGGREGATION TradeAggregation
  4. To calculate aggregations, include a query as follows:

    1. To select attributes to be included in the output event, add a select clause as follows.

      select symbol, avg(price) as avgPrice, sum(quantity) as total

      Here, the avg() fuction is applied to the price attribute to derive the average price. The sum() function is applied to the quantity attribute to derive the total quantity.

    2. To get input events from the TradeStream stream that you previously defined, add a from clause as follows.

      from TradeStream
    3. To group the output by the symbol, add a group by clause as follows.

      group by symbol
    4. The timestamp included in each input event allows you to calculate aggregates for the range of time granularities seconds-years. Therefore, to calculate aggregates for each time granularity within this range, add the aggregate by clause to this aggregate query as follows.

      aggregate by timestamp every hour;
  5. The completed stream application is as follows.

    @App:name("TradeApp")
    @App:qlVersion("2")

    CREATE STREAM TradeStream (symbol string, price double, quantity long, timestamp long);

    CREATE AGGREGATION TradeAggregation

    @info(name = 'CalculatingAggregates')
    select symbol, avg(price) as avgPrice, sum(quantity) as total
    from TradeStream
    group by symbol
    aggregate by timestamp every hour;

Retrieve the stored aggregate values

This section involves retrieving the aggregate values that you calculated and persisted in the Calculate and store clock-time-based aggregate values subsection.

To do this, let's add the definitions and queries required for retrieval to the TradeApp stream application that you have already created in the previous section.

  1. Open the TradeApp stream application.

  2. To retrieve aggregations, you need to make retrieval requests. To capture these requests as events, let's define a stream as follows.

    CREATE STREAM TradeSummaryRetrievalStream (symbol string);
  3. To process the events captured via the TradeSummaryRetrievalStream stream you defined, add a new query as follows.

    insert into TradeSummaryStream
    select a.symbol, a.total, a.avgPrice
    from TradeSummaryRetrievalStream as b join TradeAggregation as a
    on a.symbol == b.symbol
    within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
    per "days" ;
  4. The completed stream application is as follows.

    @App:name("TradeApp")
    @App:qlVersion("2")

    CREATE STREAM TradeStream (symbol string, price double, quantity long, timestamp long);

    CREATE STREAM TradeSummaryRetrievalStream (symbol string);

    CREATE AGGREGATION TradeAggregation
    select symbol, avg(price) as avgPrice, sum(quantity) as total
    from TradeStream
    group by symbol
    aggregate by timestamp every hour;

    @info(name = 'RetrievingAggregates')
    insert into TradeSummaryStream
    select a.symbol, a.total, a.avgPrice
    from TradeSummaryRetrievalStream as b join TradeAggregation as a
    on a.symbol == b.symbol
    within "2014-02-15 00:00:00 +05:30", "2014-03-16 00:00:00 +05:30"
    per "days" ;

Summarization by Windowing Criteria

This section explains how to apply stream processing logic to process a subset of events received to a stream based on time or the number of events. This is achieved via stream windows.

The window can apply to a batch of events or in a sliding manner. This is further explained in the following sections.

Performing a time-based summarization in a sliding manner

This subsection demonstrates how to summarize data for a short term based on time and well as how to do a summarization in a sliding manner.

To demonstrate this, consider a factory manager who wants to be able to check the production for the last hour at any given time. Every event represents a production run. For this purpose, a Stream application can be created as follows:

  1. Start creating a new stream application. You can name it PastHourProductionApp For instructions, see Creating a Stream Application.

    @App:name('PastHourProductionApp');
    @App:qlVersion("2")
  2. To capture details about each production run, define an input stream as follows.

    CREATE STREAM ProductionStream (name string, amount long, timestamp long);
  3. To publish the production for the last hour, define the output stream as follows.

    CREATE STREAM PastHourProductionStream WITH (type='log', prefix='Production totals over the past hour:') (name string, pastHourTotal long);
    note
    A sink annotation is connected to the output stream to log the output events. For more information about adding sinks to publish events, see the [Publishing Data](/cep/tutorials/publishing-data).
  4. To define how the output is derived, add the select statement as follows:

    select name, sum(amount) as pastHourTotal

    Here, the total is derived by applying the sum() function to the amount attribute of the ProductionStream input stream.

  5. To specify that the processing done as defined via the select statement applies to a time window, add the from clause and include the time window as shown below. This must be added above the select clause.

    from ProductionStream#window.time(1 hour)
    note
    `window.time` indicates that the window added is a time window. The time considered is one hour. The window is a sliding window which considers the last hour at any given time.

    (For example, when the stream processor calculates the total production during the time 13.00-14.00, next it calculates the total production during the time 13.01-14.01 after the 13.01 minute as elapsed.)

    For details about other window types supported, see [Plugins- Unique](/cep/reference/extensions/execution/unique).
  6. To group by the product name, add the group by clause as follows.

    group by name
  7. To insert the results into the PastHourProductionStream output stream, add the insert into clause as follows.

    insert into PastHourProductionStream
  8. The completed stream application is as follows:

    @App:name('PastHourProductionApp')
    @App:qlVersion("2")

    CREATE STREAM ProductionStream (name string, amount long, timestamp long);

    CREATE STREAM PastHourProductionStream WITH (type='log', prefix='Production totals over the past hour:') (name string, pastHourTotal long);

    insert into PastHourProductionStream
    select name, sum(amount) as pastHourTotal
    from ProductionStream#window.time(1 hour)
    group by name;

Performing a length-based summarization to a batch of events

This subsection demonstrates how to summarize data for a specific number of events as well as how to do that summarization for batches of events.

To demonstrate this, assume that a factory manager wants to track the maximum production in every 10 production runs. IOn order to do so, let's create a Stream application as follows:

  1. Start creating a new stream application. You can name it ProductionApp For instructions, see Creating a Stream Application.

    @App:name('MaximumProductionApp')
    @App:qlVersion("2")
  2. Define an input stream as follows to capture details about the production.

    CREATE STREAM ProductionStream (name string, amount long);
  3. To output the maximum production detected every 10 production runs, define an output stream as follows.

    CREATE STREAM DetectedMaximumProductionStream WITH (type='log', prefix='Maximum production in last 10 runs') (name string, maximumValue long);
    note
    A sink annotation is connected to the output stream to log the output events. For more information about adding sinks to publish events, see the [Publishing Data](/cep/tutorials/publishing-data).
  4. To define the subset of events to be considered based on the number of events, add the from clause with a lengthBatch window as follows.

    from ProductionStream#window.lengthBatch(10)

    window.lengthBatch indicates that the window added is a length window that considers events in batches when determin ing subsets. The number of events in each batch is 10. For details about other window types supported, see Plugins - Unique.

  5. To derive the values for the DetectedMaximumProductionStream output stream, add the select statement as follows.

    select name, max(amount) as maximumValue

    Here, the max() function is applied to the amount attribute to derive the maximum value.

  6. To group by the product name, add the group by clause as follows.

    group by name
  7. To insert the maximum production detected into the DetectedMaximumProductionStream output stream, add the insert into clause as follows.

    insert into DetectedMaximumProductionStream

The completed stream application is as follows.

@App:name('MaximumProductionApp') 
@App:qlVersion("2")

CREATE STREAM ProductionStream (name string, amount long, timestamp long);

CREATE STREAM DetectedMaximumProductionStream WITH (type='log', prefix='Maximum production in last 10 runs') (name string, maximumValue long);

insert into DetectedMaximumProductionStream
select name, max(amount) as maximumValue
from ProductionStream#window.lengthBatch(10)
group by name
;