Queries

Kaskada computes results from the data you’ve loaded by executing queries. A query describes a computation to perform and configures how the query should be executed.

Kaskada queries are written in a language called Fenl, which is designed to make it easy to describe computations over events. To learn more about Fenl:

  • The Quick Start is an overview of the query language.

  • The Function Catalog documents the operations and functions provided by Fenl.

  • The FAQ answers some common questions about using Fenl.

Query syntax quickstart

Kaskada’s query language builds on the lessons of 50+ years of query language design to provide a declarative, composable, easy-to-read, and type-safe way of describing computations related to time. The following is a quick overview of the query language’s main features and syntax.

Viewing and filtering the contents of a table

Kaskada queries are built by composing simple expressions. Every expression returns a timeline.

Purchase | when(Purchase.amount > 10)

In this example we start with the expression Purchase (the timeline of all purchase events) then filter it using when(). The result is a timeline of purchase events whose amount is greater than 10.

Stateful aggregations

Aggregate events to produce a continuous timeline whose value can be observed at arbitrary points in time.

{ max_verified_review_to_date: Review.stars | when(Review.verified) | max() }
What are the curly brackets for?

Every query needs to return a Record. Records allow one or more values to be grouped into a single row. You can create a record using the syntax {key: value, key2: value2}.
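For example, a record can group several aggregates into a single row. A sketch using the Review table from the example above:

{ min_stars: Review.stars | min(), max_stars: Review.stars | max() }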

In this example we first filter the timeline of Review events to include only verified reviews, then aggregate the filtered results using the max() aggregation. The resulting timeline describes the maximum number of stars as of each point in time.

Automatic joins

Every expression is associated with an entity, allowing tables and expressions to be automatically joined. Entities eliminate redundant boilerplate code.

{ purchases_per_page_view: count(Purchase) / count(Pageview) }

Here we’ve used the count() aggregation to compute the number of purchases up to each point in time and divided it by the number of pageviews up to the same point in time. The result is a timeline describing how each user’s purchases-per-pageview ratio changes over time. Since both the Purchase and Pageview tables have the same entity, we can easily combine them.

Event-based windowing

Collect events as you move through time, and aggregate them with respect to other events. Ordered aggregation makes it easy to describe temporal interactions.

{
  pageviews_since_last_purchase:  count(Pageview, window=since(Purchase)),
  spend_since_last_review:        sum(Purchase.amount, window=since(Review)),
}

By default, aggregations are applied from the beginning of time, but here we’ve used the since() window function to configure the aggregation to reset each time there’s a Purchase or Review, respectively.

Pipelined operations

Pipe syntax allows multiple operations to be chained together. Write your operations in the same order you think about them. It’s timelines all the way down, making it easy to aggregate the results of aggregations.

{
  largest_spend_over_2_purchases: Purchase.amount
  | when(Purchase.category == "food")
  | sum(window=sliding(2, Purchase.category == "food")) # Inner aggregation
  | max()                                               # Outer aggregation
}

Row generators

Pivot from events to time-series. Unlike grouped aggregates, tick generators such as hourly() produce rows even when there’s no input, allowing you to react when something doesn’t happen.

{
  signups_per_hour: count(Signups, window=since(hourly()))
| when(hourly())
  | mean()
}

Continuous expressions

Observe the value of aggregations at arbitrary points in time. Timelines are either “discrete” (instantaneous values or events) or “continuous” (values produced by stateful aggregations). Continuous timelines let you combine aggregates computed from different event sources.

let product_average = Review.stars
| with_key(Review.product_id)
| mean()

in { average_product_review: product_average | lookup(Purchase.product_id) }

In this example the lookup() function is used to observe a value computed for a different entity. The variable product_average computes the average review using the product’s ID as the entity. The lookup starts with each purchase, then looks up the current value of the product’s average review for the product ID specified in the purchase.

Native time travel

Shifting values forward (but not backward) in time allows you to combine different temporal contexts without the risk of temporal leakage. Shifted values make it easy to compare a value “now” to a value from the past.

let purchases_now = count(Purchase)
let purchases_yesterday =
   purchases_now | shift_by(days(1))

in { purchases_in_last_day: purchases_now - purchases_yesterday }

In this example we take the timeline produced by purchases_now and move it forward in time by one day using the shift_by() function. We then subtract the shifted value from the original, unshifted value, producing the number of purchases made within the last day.

Simple, composable syntax

It is functions all the way down. No global state, no dependencies to manage, and no spooky action at a distance. Quickly understand what a query is doing, and painlessly refactor to make it DRY.

# How many big purchases happen each hour and where?
let cadence = hourly()

# Anything can be named and re-used
let hourly_big_purchases = Purchase
  # Filter anywhere
  | when(Purchase.amount > 10)
  # Aggregate anything
  | count(window=since(cadence))
  # No choosing between “when” & “having”
  | when(cadence)

# Records are just another type
in { hourly_big_purchases }
  # …modify them sequentially
  | extend({
    last_visit_region: last(Pageview.region)
  })

Configuring how queries are computed

A given query can be computed in different ways.

Configuring how timelines are converted into tables

Every query produces a timeline, which can be converted into a table in two ways: a table describing each change in the timeline (historic results), or a table describing the "final" value of the timeline (final results). The "result behavior" configures which results are produced. Queries for historic results return the full history of how the values changed over time for each entity. Queries for final results return the latest result for each entity at a specific time (by default, after all events have been processed).

You choose which type of query to execute using the "result behavior" configuration at query time. By default, historic results are returned. To return final results, configure the final-results behavior:

Jupyter:

%%fenl --result-behavior final-results
{
    time: Purchase.purchase_time,
    entity: Purchase.customer_id,
    max_amount: Purchase.amount | max(),
    min_amount: Purchase.amount | min(),
}

Python:

from kaskada import compute

query = """{
  time: Purchase.purchase_time,
  entity: Purchase.customer_id,
  max_amount: last(Purchase.amount) | max(),
  min_amount: Purchase.amount | min()
}"""

resp = compute.query(query=query, result_behavior="final-results")

CLI:

# query.fenl
{
  time: Purchase.purchase_time,
  entity: Purchase.customer_id,
  max_amount: last(Purchase.amount) | max(),
  min_amount: Purchase.amount | min()
}

cat query.fenl | kaskada-cli query run --result-behavior final-results

Querying with Python

Using Python directly is one way to write queries.

from kaskada import compute
from kaskada.api.session import LocalBuilder

session = LocalBuilder().build()

query = """{
  time: Purchase.purchase_time,
  entity: Purchase.customer_id,
  max_amount: last(Purchase.amount) | max(),
  min_amount: Purchase.amount | min()
}"""

response_parquet = compute.query(query=query).output_to.object_store.output_paths[0]

# (Optional) view results as a Pandas dataframe.
import pandas
pandas.read_parquet(response_parquet)

This returns a dataframe with the results of the query.

Optional Parameters (with Python)

When querying directly from Python, the following optional parameters are available (a usage sketch follows the list):

  • with_tables: A list of tables to use in the query, in addition to the tables stored in the system.

  • with_views: A list of views to use in the query, in addition to the views stored in the system.

  • result_behavior: Determines which results are returned. Either "all-results" (default), or "final-results" which returns only the final values for each entity.

  • response_as: Determines how the response is returned. Either "parquet" (default) or "redis-bulk".

    • If "redis-bulk", result_behavior is assumed to be "final-results".

  • data_token_id: Enables repeatable queries. Queries performed against the same data token always run on the same input data.

  • limits: Configures limits on the output set.
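As a rough sketch, these are passed as keyword arguments to compute.query(); the exact signature may vary between client versions, so treat the following as illustrative rather than definitive:

from kaskada import compute

# Sketch: passing optional parameters to compute.query().
# The keyword names follow the list above; verify them against your
# installed client version.
resp = compute.query(
    query="{ max_amount: Purchase.amount | max() }",
    result_behavior="final-results",  # only the latest value per entity
    data_token_id="bdc9e595-a8a0-448c-9a95-c2e3d886b633",  # repeatable query
)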

Querying with fenlmagic

Using the fenlmagic IPython extension makes iterating on queries easier.

The fenlmagic IPython extension is optional and isn’t required to use Kaskada. Feel free to use whichever client interface fits your workflow.
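Like other IPython extensions, fenlmagic must be loaded before use. Assuming the extension is installed alongside the Python client, loading it looks like this:

%load_ext fenlmagic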

You can make Fenl queries by prefixing a query block with %%fenl. The query results will be computed and returned as a Pandas dataframe. The query content starts on the next line and includes the rest of the code block’s contents:

%%fenl
{
    time: Purchase.purchase_time,
    entity: Purchase.customer_id,
    max_amount: Purchase.amount | max(),
    min_amount: Purchase.amount | min(),
}

This returns a dataframe with the results of the query.

Optional Parameters (with fenlmagic)

When querying using fenlmagic, the following optional parameters are available:

  • --result-behavior: Determines which results are returned. Either all-results (default), or final-results which returns only the final values for each entity.

  • --output: Output format for the query results. One of df (a Pandas dataframe, the default), json, parquet, or redis-bulk.

    • If redis-bulk, --result-behavior is assumed to be final-results.

  • --data-token: Enables repeatable queries. Queries performed against the same data token always run on the same input data.

  • --preview-rows: Produces a preview of the data with at least this many rows.

  • --var: Assigns the body to a local variable with the given name (see the sketch after this list).
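For instance, --var can be combined with other flags. In this sketch the variable name results is our own choice:

%%fenl --var results --preview-rows 10
{
  max_amount: Purchase.amount | max(),
}

Later cells can then refer to results; the attributes it exposes (such as the result dataframe) depend on your fenlmagic version.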

Example use of some of these options can be found in the next section: Example Queries.

Tables and Views

Most basic queries operate against tables. However, queries can also operate on views or a combination of tables and views.

Here’s an example of using a view (PurchaseStats) to filter the values produced by an expression over the Purchase table.

%%fenl
{
  time: Purchase.purchase_time,
  entity: Purchase.customer_id,
  total_purchases: Purchase.amount | sum(),
} | when(PurchaseStats.max_amount > 100)

Views may reference other views, so we could give this expression a name and create a view for it as well if we wanted to.
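As a sketch, creating such a view with the Python client might look like the following (this assumes the client’s view module and its create_view function; the view name is our own invention):

from kaskada import view

# Sketch: register the expression above under a name so other queries
# and views can reference it. Verify create_view's signature against
# your installed client version.
view.create_view(
    view_name="BigSpenderPurchases",
    expression="""
      {
        time: Purchase.purchase_time,
        entity: Purchase.customer_id,
        total_purchases: Purchase.amount | sum(),
      } | when(PurchaseStats.max_amount > 100)
    """,
)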

Views are useful any time you need to share or re-use expressions:

  • Cleaning operations

  • Common business logic

  • Final feature vectors

For more help with tables and views, see Working with Tables and Working with Views.

Using --data-token

--data-token: Enables repeatable queries. Queries performed against the same data token always run on the same input data.

  • Use the data token ID returned after loading the first file, and the results will only include rows from the first file:

%%fenl --data-token bdc9e595-a8a0-448c-9a95-c2e3d886b633
purchases
data_token_id: "bdc9e595-a8a0-448c-9a95-c2e3d886b633"
request_details {
  request_id: "3f737ff336666515a54dd29a9c5ace3a"
}
   id      purchase_time        customer_id  vendor_id    amount  subsort_id
0  cb_001  2020-01-01 00:00:00  karen        chum_bucket       9           0
1  kk_001  2020-01-01 00:00:00  patrick      krusty_krab       3           1
2  cb_002  2020-01-02 00:00:00  karen        chum_bucket       2           2
3  kk_002  2020-01-02 00:00:00  patrick      krusty_krab       5           3
4  cb_003  2020-01-03 00:00:00  karen        chum_bucket       4           4
5  kk_003  2020-01-03 00:00:00  patrick      krusty_krab      12           5
6  cb_004  2020-01-04 00:00:00  patrick      chum_bucket    5000           6
7  cb_005  2020-01-04 00:00:00  karen        chum_bucket       3           7
8  cb_006  2020-01-05 00:00:00  karen        chum_bucket       5           8
9  kk_004  2020-01-05 00:00:00  patrick      krusty_krab       9           9

  • Use the data token ID returned after loading the second file, and the results will include rows from both files:

%%fenl --data-token 24c83cac-8cf4-4a45-98f0-dac8d5b303a2
purchases
data_token_id: "24c83cac-8cf4-4a45-98f0-dac8d5b303a2"
request_details {
  request_id: "3f737ff336666515a54dd29a9c5ace3a"
}
    id      purchase_time        customer_id  vendor_id    amount  subsort_id
0   cb_001  2020-01-01 00:00:00  karen        chum_bucket       9           0
1   kk_001  2020-01-01 00:00:00  patrick      krusty_krab       3           1
2   cb_002  2020-01-02 00:00:00  karen        chum_bucket       2           2
3   kk_002  2020-01-02 00:00:00  patrick      krusty_krab       5           3
4   cb_003  2020-01-03 00:00:00  karen        chum_bucket       4           4
5   kk_003  2020-01-03 00:00:00  patrick      krusty_krab      12           5
6   cb_004  2020-01-04 00:00:00  patrick      chum_bucket    5000           6
7   cb_005  2020-01-04 00:00:00  karen        chum_bucket       3           7
8   cb_006  2020-01-05 00:00:00  karen        chum_bucket       5           8
9   kk_004  2020-01-05 00:00:00  patrick      krusty_krab       9           9
10  kk_005  2020-01-06 00:00:00  patrick      krusty_krab       2           0
11  wh_001  2020-01-06 00:00:00  spongebob    weenie_hut        7           1
12  cb_007  2020-01-07 00:00:00  spongebob    chum_bucket      34           2
13  wh_002  2020-01-08 00:00:00  karen        weenie_hut        8           3
14  kk_006  2020-01-08 00:00:00  patrick      krusty_krab       9           4

Using --result-behavior

--result-behavior: Determines which results are returned.

  • Use all-results (default) to return all the results for each entity:

%%fenl --result-behavior all-results
purchases
data_token_id: "7bd4e740-9e63-418e-ba9b-5582db010959"
request_details {
  request_id: "1badb8b0e220e26cc15b93b234ac3c14"
}
    id      purchase_time        customer_id  vendor_id    amount  subsort_id
0   cb_001  2020-01-01 00:00:00  karen        chum_bucket       9           0
1   kk_001  2020-01-01 00:00:00  patrick      krusty_krab       3           1
2   cb_002  2020-01-02 00:00:00  karen        chum_bucket       2           2
3   kk_002  2020-01-02 00:00:00  patrick      krusty_krab       5           3
4   cb_003  2020-01-03 00:00:00  karen        chum_bucket       4           4
5   kk_003  2020-01-03 00:00:00  patrick      krusty_krab      12           5
6   cb_004  2020-01-04 00:00:00  patrick      chum_bucket    5000           6
7   cb_005  2020-01-04 00:00:00  karen        chum_bucket       3           7
8   cb_006  2020-01-05 00:00:00  karen        chum_bucket       5           8
9   kk_004  2020-01-05 00:00:00  patrick      krusty_krab       9           9
10  kk_005  2020-01-06 00:00:00  patrick      krusty_krab       2           0
11  wh_001  2020-01-06 00:00:00  spongebob    weenie_hut        7           1
12  cb_007  2020-01-07 00:00:00  spongebob    chum_bucket      34           2
13  wh_002  2020-01-08 00:00:00  karen        weenie_hut        8           3
14  kk_006  2020-01-08 00:00:00  patrick      krusty_krab       9           4

  • Use final-results to return only the most recent event for each entity:

%%fenl --result-behavior final-results
purchases
data_token_id: "7bd4e740-9e63-418e-ba9b-5582db010959"
request_details {
  request_id: "145bc51d9bac47f17fd202e5785e58b7"
}
   id      purchase_time        customer_id  vendor_id    amount  subsort_id
0  kk_006  2020-01-08 00:00:00  patrick      krusty_krab       9           4
1  wh_002  2020-01-08 00:00:00  karen        weenie_hut        8           3
2  cb_007  2020-01-07 00:00:00  spongebob    chum_bucket      34           2

Using --preview-rows

--preview-rows: Produces a preview of the data with at least this many rows.

  • Setting a limit allows you to quickly iterate on features and verify your results before running them over your full dataset.

  • Set to 50 on the transactions table to return a preview of at least 50 rows:

%%fenl --preview-rows 50
transactions

Returns a dataframe of 71599 rows, instead of the full dataset of 100000 rows.

It may seem odd that many thousands of rows were returned when only 50 were requested. This happens because the query operates on batches and returns the results of all batches processed in order to reach the minimum number of rows requested. In this case, compute processed only a single batch, but that batch contained 71599 rows. Note: using --preview-rows with --result-behavior final-results will cause the full dataset to be processed, as all inputs must be processed to produce final results.

Querying with the CLI

The CLI can be used to make queries with the kaskada-cli query run command. To query Kaskada using the CLI, the query string should be provided on STDIN.

An easy way to define a query is to create a text file containing the query.

query.fenl
{
  time: Purchase.purchase_time,
  entity: Purchase.customer_id,
  max_amount: last(Purchase.amount) | max(),
  min_amount: Purchase.amount | min()
}

Run the query by piping it to the CLI.

cat query.fenl | kaskada-cli query run

Running this query will return a JSON-formatted query response object. By default, query results are written to a Parquet file: the locations of these files are included along with additional query metadata in the query response.

{
	"state": "STATE_SUCCESS",
	"config": {
		"dataTokenId": "b5bc4597-47d4-4770-a772-0b94e0d12483"
	},
  // ...
}

The paths of the resulting files are listed in the response at the JSON path outputTo.objectStore.outputPaths.paths as an array of paths.

To slice and/or filter JSON output we can use jq.

cat query.fenl | kaskada-cli query run | jq '.outputTo.objectStore.outputPaths.paths'

This will return a list of result files.

[
  "file:///<pwd>/data/results/3024b7ae-1429-4c12-867b-3a1f7e86099f/LfiWn-mIVspEbBE-G7yLYug3V2-dW4ASAGmV6g/6089a024-5d97-4f2c-9472-625791550505.parquet"
]

For queries returning a small number of rows, it may be more convenient to output them to STDOUT formatted as CSV.

cat query.fenl | kaskada-cli query run --stdout --response-as csv

You can see the full list of query arguments for the CLI with kaskada-cli query run --help.

Executes a query on kaskada

Usage:
  cli query run "query-text" [flags]

Flags:
      --changed-since-time int       (Optional) Unix timestamp bound (inclusive) after which results will be output. If 'response-behavior' is 'all-results', this will include rows for changes (events and ticks) after this time (inclusive). If it is 'final-results', this will include a final result for any entity that would be included in the changed results.
      --data-token string            (Optional) A token to run queries against. Enables repeatable queries.
      --dry-run true                 (Optional) If this is true, then the query is validated and if there are no errors, the resultant schema is returned. No actual computation of results is performed.
      --experimental-features true   (Optional) If this is true, then experimental features are allowed.  Data returned when using this flag is not guaranteed to be correct.
  -h, --help                         help for run
      --preview-rows int             (Optional) Produces a preview of the results with at least this many rows.
      --response-as string           (Optional) How to encode the results.  Either 'parquet' or 'csv'. (default "parquet")
      --result-behavior string       (Optional) Determines how results are returned.  Either 'all-results' or 'final-results'. (default "all-results")
      --stdout true                  (Optional) If this is true, output results are sent to STDOUT

Global Flags:
      --config string               config file (default is $HOME/.cli.yaml)
  -d, --debug                       get debug log output
      --kaskada-api-server string   Kaskada API Server
      --kaskada-client-id string    Kaskada Client ID
      --use-tls                     Use TLS when connecting to the Kaskada API (default true)