Aggregation and Windowing
Temporal computation matters most for aggregations, because an aggregation combines values associated with different times.
Basic Aggregations
We can think of an aggregation as consuming a stream of input values and producing a stream of output values. By default, each time an aggregation consumes an input, it produces an output. The time associated with each output is the same as the time of the corresponding input, and the output's value is the result of applying the aggregation to all inputs consumed up to that time.
Purchase.amount | sum()
Time | Purchase.amount | … \| sum() |
---|---|---|
2012-02-23 | 5 | 5 |
2012-05-10 | 2 | 7 |
2018-11-03 | 13 | 20 |
2019-10-26 | 4 | 24 |
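To make the default behavior concrete, here is a minimal Python sketch that models an aggregation as a function from a timestamped input stream to a timestamped output stream. It is not Fenl and not how Fenl is implemented; the `running_sum` helper and the list-of-tuples representation are illustrative assumptions. The data is the purchase history from the table above.

```python
from datetime import date
from typing import Iterable, Iterator, Tuple

Event = Tuple[date, float]  # (time, value) for a single entity


def running_sum(inputs: Iterable[Event]) -> Iterator[Event]:
    """Emit one output per input: (input time, sum of all inputs so far)."""
    total: float = 0
    for time, amount in inputs:
        total += amount
        yield time, total


purchases = [
    (date(2012, 2, 23), 5),
    (date(2012, 5, 10), 2),
    (date(2018, 11, 3), 13),
    (date(2019, 10, 26), 4),
]

for time, total in running_sum(purchases):
    print(time, total)
```

Each output reuses the time of the input that produced it, which is why the result has exactly one row per purchase.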
Windowed Aggregations
By default, each time an aggregation consumes an input, it produces an output whose value aggregates all inputs seen so far. Windowed aggregations control two aspects of this behavior: what is aggregated, and when results are produced.
Controlling What is Aggregated
The first aspect is the set of input values used in an aggregation. By default, every input value contributes. In some cases it may be preferable to include only the N most recent inputs, or every input since a particular event occurred.
Controlling When Output is Produced
The second aspect is when the result of the aggregation is produced. By default, an output value is produced every time an input value is consumed. In some cases it may be preferable to produce an output value at the end of each day, or when a particular event occurs.
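The two aspects are independent, which a small Python model makes visible. In this sketch, which is not Fenl, the hypothetical `last_n` parameter controls what is aggregated and the hypothetical `emit` predicate controls when a result is produced. As a simplification, it can only emit at input times, so time-based triggers such as "end of each day" are outside its scope.

```python
from collections import deque
from typing import Callable, Deque, Iterable, Iterator, Optional, Tuple

Event = Tuple[int, float]  # (time, value); integer times keep the sketch short


def windowed_sum(
    inputs: Iterable[Event],
    last_n: Optional[int] = None,
    emit: Callable[[int, float], bool] = lambda time, value: True,
) -> Iterator[Event]:
    # "What": deque(maxlen=None) keeps every input (the default behavior);
    # maxlen=N drops the oldest input once N are held.
    window: Deque[float] = deque(maxlen=last_n)
    for time, value in inputs:
        window.append(value)
        # "When": by default emit after every input; a predicate can restrict it.
        if emit(time, value):
            yield time, sum(window)


events = [(1, 5), (2, 2), (3, 13), (4, 4)]
print(list(windowed_sum(events)))                             # default what and when
print(list(windowed_sum(events, last_n=2)))                   # only the 2 most recent inputs
print(list(windowed_sum(events, emit=lambda t, v: v > 10)))   # emit only after large inputs
```

Changing `last_n` alters the values but not the output times; changing `emit` alters the output times but not how each value is computed.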
Windowing Examples
Aggregations may be windowed by passing a window generator as the aggregation's window parameter. For example, passing the sliding(2, is_valid(Purchase)) window generator to sum computes the sum of the two most recent valid purchases. The sliding(n, bool) window generator affects what is aggregated but keeps the default "when" behavior of producing an output for each input.
Purchase.amount | sum(window = sliding(2, is_valid(Purchase)))
Time | Purchase.amount | … \| sum(window = sliding(2, is_valid(Purchase))) |
---|---|---|
2012-02-23 | 5 | null |
2012-05-10 | 2 | 7 |
2018-11-03 | 13 | 15 |
2019-10-26 | 4 | 17 |
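The following Python sketch reproduces the sliding-window table. The `sliding_sum` name is hypothetical, a None amount stands in for a purchase that fails is_valid, and returning None until n valid inputs have been seen is an assumption chosen to match the table's first row rather than a statement about Fenl's implementation.

```python
from collections import deque
from datetime import date
from typing import Deque, Iterable, Iterator, Optional, Tuple


def sliding_sum(
    inputs: Iterable[Tuple[date, Optional[float]]], n: int
) -> Iterator[Tuple[date, Optional[float]]]:
    """Sum of the n most recent valid inputs, one output per input.

    Assumption (mirrors the table): the result is None (null) until
    n valid inputs have been seen.
    """
    recent: Deque[float] = deque(maxlen=n)
    for time, amount in inputs:
        if amount is not None:  # stand-in for is_valid(Purchase)
            recent.append(amount)
        # "When" is unchanged: every input produces an output.
        yield time, (sum(recent) if len(recent) == n else None)


purchases = [
    (date(2012, 2, 23), 5),
    (date(2012, 5, 10), 2),
    (date(2018, 11, 3), 13),
    (date(2019, 10, 26), 4),
]

for time, total in sliding_sum(purchases, 2):
    print(time, total)
```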
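Combining the since window generator with yearly() computes the total of each year's purchases, reported at the end of each yearly window, that is, at the beginning of the following year. Because the window resets at every year boundary, years without purchases report 0.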
Purchase.amount | sum(window = since(yearly()))
Time | … \| sum(window = since(yearly())) |
---|---|
2013-01-01 | 7 |
2014-01-01 | 0 |
2015-01-01 | 0 |
2016-01-01 | 0 |
2017-01-01 | 0 |
2018-01-01 | 0 |
2019-01-01 | 13 |
2020-01-01 | 4 |
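A resetting window like since(yearly()) can be modeled in Python as "at each boundary, report the sum of inputs since the previous boundary, then start over." This is a sketch, not Fenl's implementation: the `sum_since_boundaries` name is hypothetical, the boundaries are passed in explicitly, only boundary outputs are produced (matching the table), and assigning an input that falls exactly on a boundary to the next window is an assumption.

```python
from datetime import date
from typing import Iterable, List, Tuple


def sum_since_boundaries(
    inputs: Iterable[Tuple[date, float]], boundaries: List[date]
) -> List[Tuple[date, float]]:
    """At each boundary, emit the sum of inputs since the previous boundary, then reset."""
    events = sorted(inputs)
    results = []
    i = 0
    for boundary in sorted(boundaries):
        total: float = 0  # reset: each window starts empty
        # Inputs strictly before this boundary belong to the window it closes.
        while i < len(events) and events[i][0] < boundary:
            total += events[i][1]
            i += 1
        results.append((boundary, total))
    return results


purchases = [
    (date(2012, 2, 23), 5),
    (date(2012, 5, 10), 2),
    (date(2018, 11, 3), 13),
    (date(2019, 10, 26), 4),
]
boundaries = [date(year, 1, 1) for year in range(2013, 2021)]

for time, total in sum_since_boundaries(purchases, boundaries):
    print(time, total)
```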
Going Deeper
Yearly windows produce values at the end of each window, but when should windows stop being produced? The set of event times is finite and known when a computation runs, but there are unboundedly many year boundaries. To avoid producing unbounded results, Fenl limits "cron-style" windows to time intervals that begin before the newest event and end after the oldest event in the dataset, across all entities.
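The limiting rule can be checked against the yearly table with a short Python sketch. The `bounded_year_ends` name is hypothetical, and reading "begin before the newest event" as "begin at or before" (so the window containing the newest event is always kept) is an assumption. For each kept window the function returns its end, since that is when the window produces a value.

```python
from datetime import date
from typing import List


def bounded_year_ends(oldest: date, newest: date) -> List[date]:
    """End times of yearly windows [Jan 1 of y, Jan 1 of y+1) that overlap the data.

    A window is kept when it begins at or before the newest event and ends
    after the oldest event; that holds exactly for oldest.year <= y <= newest.year.
    """
    return [date(year + 1, 1, 1) for year in range(oldest.year, newest.year + 1)]


# Oldest and newest purchases across all entities in the example data.
print(bounded_year_ends(date(2012, 2, 23), date(2019, 10, 26)))
```

For the example purchases this yields the eight boundaries from 2013-01-01 through 2020-01-01, the same rows as the since(yearly()) table.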
Repeated Aggregation
Events may be aggregated multiple times. The events themselves are a sequence of timestamped data for each entity, and the result of an aggregation has the same shape: a sequence of timestamped data for each entity. Applying another aggregation simply aggregates over those results at their associated times. For example, we can compute the mean of the running sum of purchase amounts.
Purchase.amount | sum() | mean()
Time | Purchase.amount | … \| sum() | … \| mean() |
---|---|---|---|
2012-02-23 | 5 | 5 | 5 |
2012-05-10 | 2 | 7 | 6 |
2018-11-03 | 13 | 20 | 10.667 |
2019-10-26 | 4 | 24 | 14 |
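Chaining works because each aggregation consumes and produces the same kind of timestamped stream. In this Python sketch (hypothetical helper names, not Fenl), the output of `running_sum` is fed directly into `running_mean`, reproducing the last column of the table: the mean at 2018-11-03, for example, is (5 + 7 + 20) / 3 = 32 / 3.

```python
from datetime import date
from typing import Iterable, Iterator, Tuple

Event = Tuple[date, float]


def running_sum(inputs: Iterable[Event]) -> Iterator[Event]:
    """One output per input: the sum of all inputs so far."""
    total: float = 0
    for time, value in inputs:
        total += value
        yield time, total


def running_mean(inputs: Iterable[Event]) -> Iterator[Event]:
    """One output per input: the mean of all inputs so far."""
    total: float = 0
    count = 0
    for time, value in inputs:
        total += value
        count += 1
        yield time, total / count


purchases = [
    (date(2012, 2, 23), 5),
    (date(2012, 5, 10), 2),
    (date(2018, 11, 3), 13),
    (date(2019, 10, 26), 4),
]

# Equivalent in spirit to: Purchase.amount | sum() | mean()
for time, mean in running_mean(running_sum(purchases)):
    print(time, round(mean, 3))
```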