Queries
Kaskada computes results from the data you’ve loaded by executing queries. A query describes a computation to perform, along with configuration for how it should be executed.
Kaskada queries are written in a language called Fenl, which is designed to make it easy to describe computations over events. To learn more about Fenl:
- The Quick Start is an overview of the query language.
- The Function Catalog documents the operations and functions provided by Fenl.
- The FAQ answers some common questions about using Fenl.
Query syntax quickstart
Kaskada’s query language builds on the lessons of 50+ years of query language design to provide a declarative, composable, easy-to-read, and type-safe way of describing computations related to time. The following is a quick overview of the query language’s main features and syntax.
Viewing and filtering the contents of a table
Kaskada queries are built by composing simple expressions. Every expression returns a timeline.
Purchase | when(Purchase.amount > 10)
In this example we start with the expression Purchase (the timeline of all purchase events), then filter it using when(). The result is a timeline of purchase events whose amount is greater than 10.
Stateful aggregations
Aggregate events to produce a continuous timeline whose value can be observed at arbitrary points in time.
{ max_verified_review_to_date: Review.stars | when(Review.verified) | max() }
What are the curly brackets for? Every query needs to return a Record. Records allow one or more values to be grouped into a single row. You can create a record using the syntax {name: value, ...}.
In this example we first filter the timeline of Review events to only include verified reviews, then aggregate the filtered results using the max() aggregation. The resulting timeline describes the maximum number of stars as of every point in time.
Automatic joins
Every expression is associated with an entity, allowing tables and expressions to be automatically joined. Entities eliminate redundant boilerplate code.
{ purchases_per_page_view: count(Purchase) / count(Pageview) }
Here we’ve used the count() aggregation to divide the number of purchases up to each point in time by the number of pageviews up to the same point in time. The result is a timeline describing how each user’s purchases-per-pageview changes over time.
Since both the Purchase and Pageview tables have the same entity, we can easily combine them.
Event-based windowing
Collect events as you move through time, and aggregate them with respect to other events. Ordered aggregation makes it easy to describe temporal interactions.
{
pageviews_since_last_purchase: count(Pageview, window=since(Purchase)),
spend_since_last_review: sum(Purchase.amount, window=since(Review)),
}
By default, aggregations are applied from the beginning of time, but here we’ve used the since() window function to configure the aggregation to reset each time there’s a Purchase or Review, respectively.
Pipelined operations
Pipe syntax allows multiple operations to be chained together. Write your operations in the same order you think about them. It’s timelines all the way down, making it easy to aggregate the results of aggregations.
{
largest_spend_over_2_purchases: Purchase.amount
| when(Purchase.category == "food")
| sum(window=sliding(2, Purchase.category == "food")) # Inner aggregation
| max() # Outer aggregation
}
Row generators
Pivot from events to time-series. Unlike grouped aggregates, tick generators such as daily() produce rows even when there’s no input, allowing you to react when something doesn’t happen.
{
signups_per_day: count(Signups, window=since(daily()))
| when(daily())
| mean()
}
Continuous expressions
Observe the value of aggregations at arbitrary points in time. Timelines are either “discrete” (instantaneous values or events) or “continuous” (values produced by stateful aggregations). Continuous timelines let you combine aggregates computed from different event sources.
let product_average = Review.stars
| with_key(Review.product_id)
| mean()
in { average_product_review: product_average | lookup(Purchase.product_id) }
In this example the lookup() function is used to observe a value computed for a different entity. The variable product_average computes the average review using the product’s ID as the entity. The lookup starts with each purchase, then looks up the current value of the product’s average review for the product ID specified in the purchase.
Native time travel
Shifting values forward (but not backward) in time allows you to combine different temporal contexts without the risk of temporal leakage. Shifted values make it easy to compare a value “now” to a value from the past.
let purchases_now = count(Purchase)
let purchases_yesterday = purchases_now | shift_by(days(1))
in { purchases_in_last_day: purchases_now - purchases_yesterday }
In this example we take the timeline produced by purchases_now and move it forward in time by one day using the shift_by() function. We then subtract the shifted value from the original, unshifted value.
Simple, composable syntax
It is functions all the way down. No global state, no dependencies to manage, and no spooky action at a distance. Quickly understand what a query is doing, and painlessly refactor to make it DRY.
# How many big purchases happen each hour and where?
let cadence = hourly()
# Anything can be named and re-used
let hourly_big_purchases = Purchase
| when(Purchase.amount > 10)     # Filter anywhere
| count(window=since(cadence))   # Aggregate anything
| when(cadence)                  # No choosing between “when” & “having”
in {hourly_big_purchases}        # Records are just another type
| extend({                       # …modify them sequentially
    last_visit_region: last(Pageview.region)
})
Configuring how queries are computed
A given query can be computed in different ways.
Configuring how timelines are converted into tables
You can either return a table describing each change in the timeline, or a table describing the "final" value of the timeline. The "result behavior" configures which results are produced: queries for historic results return the full history of how the values changed over time for each entity, while queries for final results return the latest result for each entity at a specific time (by default, after all events have been processed).
By default, historic results are returned. To return final results, configure the final-results behavior:
Jupyter:

%%fenl --result-behavior final-results
{
time: Purchase.purchase_time,
entity: Purchase.customer_id,
max_amount: Purchase.amount | max(),
min_amount: Purchase.amount | min(),
}
Python:

from kaskada import compute

query = """{
time: Purchase.purchase_time,
entity: Purchase.customer_id,
max_amount: Purchase.amount | max(),
min_amount: Purchase.amount | min(),
}"""
resp = compute.query(query=query, result_behavior="final-results")
CLI:

# query.fenl
{
time: Purchase.purchase_time,
entity: Purchase.customer_id,
max_amount: Purchase.amount | max(),
min_amount: Purchase.amount | min(),
}
cat query.fenl | kaskada-cli query run --result-behavior final-results
Querying with Python
Using Python directly is one way to write queries.
from kaskada import compute
from kaskada.api.session import LocalBuilder
session = LocalBuilder().build()
query = """{
time: Purchase.purchase_time,
entity: Purchase.customer_id,
max_amount: last(Purchase.amount) | max(),
min_amount: Purchase.amount | min()
}"""
resp = compute.query(query=query)
response_parquet = resp.output_to.object_store.output_paths[0]
# (Optional) view results as a Pandas dataframe.
import pandas
pandas.read_parquet(response_parquet)
This returns a dataframe with the results of the query.
Optional Parameters (with Python)
When querying directly from Python, the following optional parameters are available (a usage sketch follows the list):

- with_tables: A list of tables to use in the query, in addition to the tables stored in the system.
- with_views: A list of views to use in the query, in addition to the views stored in the system.
- result_behavior: Determines which results are returned. Either "all-results" (default) or "final-results", which returns only the final values for each entity.
- response_as: Determines how the response is returned. Either "parquet" (default) or "redis-bulk". If "redis-bulk", result_behavior is assumed to be "final-results".
- data_token_id: Enables repeatable queries. Queries performed against the same data token always run on the same input data.
- limits: Configures limits on the output set.
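As a rough sketch of how these parameters fit together, the call below pins a query to a known data token and requests only final results. The query text is illustrative, and the data token value is the one shown later on this page; substitute your own.

from kaskada import compute

# A sketch combining the optional parameters listed above. The query text
# and data token value are illustrative; substitute your own.
query = "{ max_amount: Purchase.amount | max() }"

resp = compute.query(
    query=query,
    result_behavior="final-results",                       # only the latest value per entity
    data_token_id="bdc9e595-a8a0-448c-9a95-c2e3d886b633",  # repeatable: always the same input data
)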
Querying with fenlmagic
Using the fenlmagic IPython extension makes iterating on queries easier.
The fenlmagic IPython extension is optional and isn’t required to use Kaskada. Feel free to use whichever client interface fits your workflow.
You can make Fenl queries by prefixing a query block with %%fenl. The query results will be computed and returned as a Pandas dataframe. The query content starts on the next line and includes the rest of the code block’s contents:
%%fenl
{
time: Purchase.purchase_time,
entity: Purchase.customer_id,
max_amount: Purchase.amount | max(),
min_amount: Purchase.amount | min(),
}
This returns a dataframe with the results of the query.
Optional Parameters (with fenlmagic)
When querying using fenlmagic, the following optional parameters are available:
- --result-behavior: Determines which results are returned. Either all-results (default) or final-results, which returns only the final values for each entity.
- --output: Output format for the query results. One of df (dataframe, default), json, parquet, or redis-bulk. If redis-bulk, --result-behavior is assumed to be final-results.
- --data-token: Enables repeatable queries. Queries performed against the same data token always run on the same input data.
- --preview-rows: Produces a preview of the data with at least this many rows.
- --var: Assigns the body to a local variable with the given name.
Example use of some of these options can be found in the sections below.
Tables and Views
Most basic queries operate against tables. However, queries can also operate on views or a combination of tables and views.
Here’s an example of using a view to filter the values produced by an expression over a table.
%%fenl
{
time: Purchase.purchase_time,
entity: Purchase.customer_id,
total_purchases: Purchase.amount | sum(),
} | when(PurchaseStats.max_amount > 100)
Views may reference other views, so we could give this expression a name and create a view for it as well if we wanted to.
Views are useful any time you need to share or re-use expressions:
- Cleaning operations
- Common business logic
- Final feature vectors
For more help with tables and views, see Working with Tables and Working with Views.
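Creating a view from the Python client might look roughly like the sketch below. The view name PurchaseStats matches the expression used in the example above; the exact call signature is an assumption and should be checked against the client documentation.

from kaskada import view

# A sketch, assuming the Python client's view module: naming an expression
# lets other queries reference it (here, as PurchaseStats).
view.create_view(
    view_name="PurchaseStats",
    expression="{ max_amount: Purchase.amount | max() }",
)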
Using --data-token
--data-token: Enables repeatable queries. Queries performed against the same data token always run on the same input data.
- Use the data token ID returned after loading the first file; the results only include rows from the first file:
%%fenl --data-token bdc9e595-a8a0-448c-9a95-c2e3d886b633
purchases
data_token_id: "bdc9e595-a8a0-448c-9a95-c2e3d886b633"
request_details {
request_id: "3f737ff336666515a54dd29a9c5ace3a"
}
   | id     | purchase_time       | customer_id | vendor_id   | amount | subsort_id
---|--------|---------------------|-------------|-------------|--------|-----------
 0 | cb_001 | 2020-01-01 00:00:00 | karen       | chum_bucket | 9      | 0
 1 | kk_001 | 2020-01-01 00:00:00 | patrick     | krusty_krab | 3      | 1
 2 | cb_002 | 2020-01-02 00:00:00 | karen       | chum_bucket | 2      | 2
 3 | kk_002 | 2020-01-02 00:00:00 | patrick     | krusty_krab | 5      | 3
 4 | cb_003 | 2020-01-03 00:00:00 | karen       | chum_bucket | 4      | 4
 5 | kk_003 | 2020-01-03 00:00:00 | patrick     | krusty_krab | 12     | 5
 6 | cb_004 | 2020-01-04 00:00:00 | patrick     | chum_bucket | 5000   | 6
 7 | cb_005 | 2020-01-04 00:00:00 | karen       | chum_bucket | 3      | 7
 8 | cb_006 | 2020-01-05 00:00:00 | karen       | chum_bucket | 5      | 8
 9 | kk_004 | 2020-01-05 00:00:00 | patrick     | krusty_krab | 9      | 9
- Use the data token ID returned after loading the second file; the results include rows from both files:
%%fenl --data-token 24c83cac-8cf4-4a45-98f0-dac8d5b303a2
purchases
data_token_id: "24c83cac-8cf4-4a45-98f0-dac8d5b303a2"
request_details {
request_id: "3f737ff336666515a54dd29a9c5ace3a"
}
   | id     | purchase_time       | customer_id | vendor_id   | amount | subsort_id
---|--------|---------------------|-------------|-------------|--------|-----------
 0 | cb_001 | 2020-01-01 00:00:00 | karen       | chum_bucket | 9      | 0
 1 | kk_001 | 2020-01-01 00:00:00 | patrick     | krusty_krab | 3      | 1
 2 | cb_002 | 2020-01-02 00:00:00 | karen       | chum_bucket | 2      | 2
 3 | kk_002 | 2020-01-02 00:00:00 | patrick     | krusty_krab | 5      | 3
 4 | cb_003 | 2020-01-03 00:00:00 | karen       | chum_bucket | 4      | 4
 5 | kk_003 | 2020-01-03 00:00:00 | patrick     | krusty_krab | 12     | 5
 6 | cb_004 | 2020-01-04 00:00:00 | patrick     | chum_bucket | 5000   | 6
 7 | cb_005 | 2020-01-04 00:00:00 | karen       | chum_bucket | 3      | 7
 8 | cb_006 | 2020-01-05 00:00:00 | karen       | chum_bucket | 5      | 8
 9 | kk_004 | 2020-01-05 00:00:00 | patrick     | krusty_krab | 9      | 9
10 | kk_005 | 2020-01-06 00:00:00 | patrick     | krusty_krab | 2      | 0
11 | wh_001 | 2020-01-06 00:00:00 | spongebob   | weenie_hut  | 7      | 1
12 | cb_007 | 2020-01-07 00:00:00 | spongebob   | chum_bucket | 34     | 2
13 | wh_002 | 2020-01-08 00:00:00 | karen       | weenie_hut  | 8      | 3
14 | kk_006 | 2020-01-08 00:00:00 | patrick     | krusty_krab | 9      | 4
Using --result-behavior
--result-behavior: Determines which results are returned.
- Use all-results (default) to return all the results for each entity:
%%fenl --result-behavior all-results
purchases
data_token_id: "7bd4e740-9e63-418e-ba9b-5582db010959"
request_details {
request_id: "1badb8b0e220e26cc15b93b234ac3c14"
}
   | id     | purchase_time       | customer_id | vendor_id   | amount | subsort_id
---|--------|---------------------|-------------|-------------|--------|-----------
 0 | cb_001 | 2020-01-01 00:00:00 | karen       | chum_bucket | 9      | 0
 1 | kk_001 | 2020-01-01 00:00:00 | patrick     | krusty_krab | 3      | 1
 2 | cb_002 | 2020-01-02 00:00:00 | karen       | chum_bucket | 2      | 2
 3 | kk_002 | 2020-01-02 00:00:00 | patrick     | krusty_krab | 5      | 3
 4 | cb_003 | 2020-01-03 00:00:00 | karen       | chum_bucket | 4      | 4
 5 | kk_003 | 2020-01-03 00:00:00 | patrick     | krusty_krab | 12     | 5
 6 | cb_004 | 2020-01-04 00:00:00 | patrick     | chum_bucket | 5000   | 6
 7 | cb_005 | 2020-01-04 00:00:00 | karen       | chum_bucket | 3      | 7
 8 | cb_006 | 2020-01-05 00:00:00 | karen       | chum_bucket | 5      | 8
 9 | kk_004 | 2020-01-05 00:00:00 | patrick     | krusty_krab | 9      | 9
10 | kk_005 | 2020-01-06 00:00:00 | patrick     | krusty_krab | 2      | 0
11 | wh_001 | 2020-01-06 00:00:00 | spongebob   | weenie_hut  | 7      | 1
12 | cb_007 | 2020-01-07 00:00:00 | spongebob   | chum_bucket | 34     | 2
13 | wh_002 | 2020-01-08 00:00:00 | karen       | weenie_hut  | 8      | 3
14 | kk_006 | 2020-01-08 00:00:00 | patrick     | krusty_krab | 9      | 4
- Use final-results to return only the most recent event for each entity:
%%fenl --result-behavior final-results
purchases
data_token_id: "7bd4e740-9e63-418e-ba9b-5582db010959"
request_details {
request_id: "145bc51d9bac47f17fd202e5785e58b7"
}
   | id     | purchase_time       | customer_id | vendor_id   | amount | subsort_id
---|--------|---------------------|-------------|-------------|--------|-----------
 0 | kk_006 | 2020-01-08 00:00:00 | patrick     | krusty_krab | 9      | 4
 1 | wh_002 | 2020-01-08 00:00:00 | karen       | weenie_hut  | 8      | 3
 2 | cb_007 | 2020-01-07 00:00:00 | spongebob   | chum_bucket | 34     | 2
Using --preview-rows
--preview-rows: Produces a preview of the data with at least this many rows.
- Setting a limit allows you to quickly iterate on features and verify your results before running them over your full dataset.
- Set --preview-rows to 50 on the transactions table to return a preview of at least 50 rows:
%%fenl --preview-rows 50
transactions
Returns a dataframe of 71599 rows, instead of the full dataset of 100000 rows.
It may seem odd that many thousands of rows were returned when only 50 were requested. This happens because the query operates on batches and returns the results of all batches processed in order to reach the minimum number of rows requested. In this case, compute processed only a single batch, but that batch contained 71599 rows.
Querying with the CLI
The CLI can be used to make queries with the kaskada-cli query run command. To query Kaskada using the CLI, the query string should be provided on STDIN. An easy way to define a query is to create a text file containing the query.
{
time: Purchase.purchase_time,
entity: Purchase.customer_id,
max_amount: last(Purchase.amount) | max(),
min_amount: Purchase.amount | min()
}
Run the query by piping it to the CLI.
cat query.fenl | kaskada-cli query run
Running this query will return a JSON-formatted query response object. By default, query results are written to Parquet files; their locations are included, along with additional query metadata, in the query response.
{
"state": "STATE_SUCCESS",
"config": {
"dataTokenId": "b5bc4597-47d4-4770-a772-0b94e0d12483"
},
// ...
}
The resulting files are listed at the JSON path outputTo.objectStore.outputPaths.paths as an array of paths. To slice and/or filter the JSON output we can use jq.
cat query.fenl | kaskada-cli query run | jq '.outputTo.objectStore.outputPaths.paths'
This will return a list of result files.
[
"file:///<pwd>/data/results/3024b7ae-1429-4c12-867b-3a1f7e86099f/LfiWn-mIVspEbBE-G7yLYug3V2-dW4ASAGmV6g/6089a024-5d97-4f2c-9472-625791550505.parquet"
]
For queries returning a small number of rows, it may be more convenient to output them to STDOUT formatted as CSV.
cat query.fenl | kaskada-cli query run --stdout --response-as csv
You can see the full list of query arguments for the CLI with kaskada-cli query run --help.
Executes a query on kaskada
Usage:
cli query run "query-text" [flags]
Flags:
--changed-since-time int (Optional) Unix timestamp bound (inclusive) after which results will be output. If 'response-behavior' is 'all-results', this will include rows for changes (events and ticks) after this time (inclusive). If it is 'final-results', this will include a final result for any entity that would be included in the changed results.
--data-token string (Optional) A token to run queries against. Enables repeatable queries.
--dry-run true (Optional) If this is true, then the query is validated and if there are no errors, the resultant schema is returned. No actual computation of results is performed.
--experimental-features true (Optional) If this is true, then experimental features are allowed. Data returned when using this flag is not guaranteed to be correct.
-h, --help help for run
--preview-rows int (Optional) Produces a preview of the results with at least this many rows.
--response-as string (Optional) How to encode the results. Either 'parquet' or 'csv'. (default "parquet")
--result-behavior string (Optional) Determines how results are returned. Either 'all-results' or 'final-results'. (default "all-results")
--stdout true (Optional) If this is true, output results are sent to STDOUT
Global Flags:
--config string config file (default is $HOME/.cli.yaml)
-d, --debug get debug log output
--kaskada-api-server string Kaskada API Server
--kaskada-client-id string Kaskada Client ID
--use-tls Use TLS when connecting to the Kaskada API (default true)
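For example, combining flags from the help output above, you can validate a query and see its result schema without computing any results (query.fenl is the file defined earlier):

cat query.fenl | kaskada-cli query run --dry-run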