Spiral Expressions
The expressions module lets you build and manipulate expressions for data processing.
This guide will walk you through common use cases using GitHub event data as examples.
Data Schema
Let’s take a look at the public GitHub Archive dataset hosted in Spiral:
import spiral as sp
# Use a separate name for the client so the `sp` module alias is not shadowed
db = sp.Spiral()
table = db.project("public").table("gharchive.events")
All examples in this documentation work with the following GitHub events data schema:
table.schema().to_pyarrow()
pa.schema({
'id': pa.int64(),
'created_at': pa.timestamp(),
'actor': pa.struct({
'id': pa.int64(),
'login': pa.string(),
'display_login': pa.string(),
'gravatar_id': pa.string(),
'url': pa.string(),
'avatar_url': pa.string()
}),
'repo': pa.struct({
'id': pa.int64(),
'name': pa.string(),
'url': pa.string()
}),
'payload': pa.struct({
'action': pa.string(),
'issue': pa.struct({
'url': pa.string(),
'number': pa.int64()
})
})
})
Basic Concepts
To access the expression module, import it as follows:
from spiral import expressions as se
Tables
Spiral tables are structured as a series of nested structs. Use Python dict syntax to project columns from the table.
# Create a variable referring to the actor's login
actor_login = table["actor"]["login"]
Scalars
Scalars are constant values that can be used in expressions.
# Compare with scalar
is_octocat = actor_login == "octocat"
Scalars can be explicitly created using the se.scalar function.
# Create a scalar
example_user = se.scalar("octocat")
Common Operations
The full list of common operations is available in the API docs.
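As the examples in this guide suggest, comparison and boolean operators on expressions produce new expressions rather than Python booleans. The toy model below is not Spiral's implementation: the `Expr` class and its node names are hypothetical, written only to illustrate how operator overloading can turn dict-style indexing, comparisons, and `&` into an expression tree.

```python
class Expr:
    """Hypothetical toy expression node: an operator name plus its operands."""

    def __init__(self, op, *args):
        self.op = op
        self.args = args

    @staticmethod
    def wrap(value):
        # Plain Python values become scalar nodes; expressions pass through.
        return value if isinstance(value, Expr) else Expr("scalar", value)

    def __getitem__(self, name):
        # toy_table["actor"]["login"] builds a nested field reference.
        return Expr("field", self, name)

    def __eq__(self, other):
        return Expr("eq", self, Expr.wrap(other))

    def __gt__(self, other):
        return Expr("gt", self, Expr.wrap(other))

    def __and__(self, other):
        return Expr("and", self, Expr.wrap(other))

    def __repr__(self):
        if self.op == "root":
            return "table"
        if self.op == "scalar":
            return repr(self.args[0])
        if self.op == "field":
            parent, name = self.args
            return f"{parent!r}[{name!r}]"
        return f"{self.op}({', '.join(map(repr, self.args))})"


toy_table = Expr("root")
is_octocat = toy_table["actor"]["login"] == "octocat"
high_number = toy_table["payload"]["issue"]["number"] > 100

# Nothing is evaluated here; each operator only records a node in the tree.
combined = is_octocat & high_number
print(combined)

# Written inline, each comparison needs its own parentheses,
# because & binds more tightly than == and >.
inline = (toy_table["actor"]["login"] == "octocat") & (
    toy_table["payload"]["issue"]["number"] > 100
)
print(repr(inline) == repr(combined))
```

The parentheses in the inline form follow from Python's operator precedence, which applies to any library that overloads `&`. Assigning each comparison to a variable first, as this guide does, avoids the issue.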
Filtering
# Find issues with number greater than 100
high_number_issues = table["payload"]["issue"]["number"] > 100
Combining
octocat_high_issues = is_octocat & high_number_issues
This is equivalent to:
# Find issues created by octocat that are high-numbered
octocat_high_issues = se.and_(
table["actor"]["login"] == "octocat",
table["payload"]["issue"]["number"] > 100
)
Struct Operations
Struct operations allow you to manipulate nested data structures.
Although se.struct is a submodule of se, some functions are also available directly on se.
Selecting Fields
# Select only the actor's login and avatar URL.
actor_info = se.select(table["actor"], names=["login", "avatar_url"])
# Exclude the avatar URL.
actor_info_no_avatar = se.select(actor_info, exclude=["avatar_url"])
Merging Structs
# Combine actor and repo information.
actor_and_repo = se.merge(table["actor"], table["repo"])
On a key clash, the field is taken from the rightmost struct.
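In the schema shown earlier, actor and repo both contain id and url fields, so merging them is a case where this rule applies. The sketch below uses plain Python dicts as stand-ins for structs. It is an analogy for the rightmost-wins behavior, not Spiral's implementation, and the placeholder values are made up for illustration.

```python
# Field names copied from the schema shown earlier in this guide.
actor_fields = ["id", "login", "display_login", "gravatar_id", "url", "avatar_url"]
repo_fields = ["id", "name", "url"]

# Fields present in both structs are the ones that clash on merge.
clashing = [name for name in actor_fields if name in repo_fields]
print(clashing)

# Placeholder values that record which struct each field came from.
actor = {name: f"actor.{name}" for name in actor_fields}
repo = {name: f"repo.{name}" for name in repo_fields}

# Dict unpacking also lets the rightmost mapping win on duplicate keys.
merged = {**actor, **repo}
print(merged["id"], merged["url"], merged["login"])
```

If both values need to be kept, one option is to build the struct explicitly with se.pack under distinct names, as in the next section.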
Creating New Structs
# Create a simplified event representation
simple_event = se.pack({
"user": table["actor"]["login"],
"repo_name": table["repo"]["name"],
"action": table["payload"]["action"],
})
Advanced Example
from datetime import datetime
import pyarrow as pa
import spiral as sp
from spiral import expressions as se
# Create the client, then look up the table through it
db = sp.Spiral()
table = db.project("public").table("gharchive.events")
# First, let's create some reusable components
actor = table["actor"]
repo = table["repo"]
payload = table["payload"]
created_at = table["created_at"]
# 1. Filter conditions
is_2024 = created_at >= datetime(2024, 1, 1)
is_issue_event = payload["action"] == "opened"
has_avatar = actor["gravatar_id"] != ""
# 2. Popular repositories (arbitrary threshold for example)
popular_repo = repo["id"] > 1000000
# 3. Specific users of interest
vip_users = ["octocat", "torvalds", "gvanrossum"]
is_vip = se.or_(*[
actor["login"] == user
for user in vip_users
])
# 4. Combine all filters
where_filter = se.and_(
is_2024,
se.or_(
se.and_(is_issue_event, popular_repo),
is_vip
)
)
# 5. Create a scoring system
user_score = se.pack({
"base_score": se.modulo(actor["id"], 100) * 10,
"avatar_bonus": has_avatar.cast(pa.int64()) * 50
})
# 6. Create the final enriched event structure
projection = se.pack({
# Original fields
"timestamp": created_at,
# User info with computed fields
"user": se.pack({
"login": actor["login"],
"is_vip": is_vip,
"has_avatar": has_avatar,
**user_score
}),
# Repository info
"repository": se.pack({
"name": repo["name"],
"is_popular": popular_repo
}),
# Event type info
"event_type": se.pack({
"is_issue": is_issue_event,
# Null for non-issue events
"issue_number": payload["issue"]["number"],
}),
# Computed total score
"total_score": se.add(
user_score["base_score"],
user_score["avatar_bonus"]
)
})
# 7. Scan the table.
scan = db.scan(projection, where=where_filter)
print(scan.schema)
{
'timestamp': pa.timestamp(),
'user': {
'login': pa.string(),
'is_vip': pa.bool_(),
'has_avatar': pa.bool_(),
'base_score': pa.int64(),
'avatar_bonus': pa.int64()
},
'repository': {
'name': pa.string(),
'is_popular': pa.bool_()
},
'event_type': {
'is_issue': pa.bool_(),
'issue_number': pa.int64()
},
'total_score': pa.int64()
}
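To check the filter and scoring logic by hand, the same rules can be written over a single plain Python dict. This is only a sketch of the intended semantics, not how Spiral evaluates expressions. The functions `matches_filter` and `score` and the sample events are hypothetical, and null handling is ignored.

```python
from datetime import datetime

VIP_USERS = ["octocat", "torvalds", "gvanrossum"]


def matches_filter(event):
    """Mirror where_filter: is_2024 AND ((issue AND popular) OR vip)."""
    is_2024 = event["created_at"] >= datetime(2024, 1, 1)
    is_issue_event = event["payload"]["action"] == "opened"
    popular_repo = event["repo"]["id"] > 1_000_000
    is_vip = event["actor"]["login"] in VIP_USERS
    return is_2024 and ((is_issue_event and popular_repo) or is_vip)


def score(event):
    """Mirror user_score and total_score from the example above."""
    has_avatar = event["actor"]["gravatar_id"] != ""
    base_score = (event["actor"]["id"] % 100) * 10
    avatar_bonus = int(has_avatar) * 50
    return {
        "base_score": base_score,
        "avatar_bonus": avatar_bonus,
        "total_score": base_score + avatar_bonus,
    }


# Hypothetical sample events, invented for illustration only.
vip_event = {
    "created_at": datetime(2024, 3, 15),
    "actor": {"id": 12345, "login": "octocat", "gravatar_id": ""},
    "repo": {"id": 500},
    "payload": {"action": "created"},
}
issue_event = {
    "created_at": datetime(2024, 6, 1),
    "actor": {"id": 777, "login": "alice", "gravatar_id": "abc123"},
    "repo": {"id": 2_000_000},
    "payload": {"action": "opened"},
}
old_event = dict(vip_event, created_at=datetime(2023, 12, 31))

for name, event in [("vip", vip_event), ("issue", issue_event), ("old", old_event)]:
    print(name, matches_filter(event), score(event))
```

The VIP event passes through the is_vip branch even though its repository is below the popularity threshold, while the 2023 event is rejected by the date condition regardless of the other checks.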