Write Tables
We’re going to explore the Spiral Tables write API using the ever-classic GitHub Archive dataset.
Create a Table
Currently, tables can only be created using the Spiral Python API.
import pyarrow as pa
import spiral
from spiral.demo import demo_project
sp = spiral.Spiral()
project = demo_project(sp)
# Define the key schema for the table.
# Random keys such as UUIDs should be avoided, so we're using `created_at` as part of the primary key.
key_schema = pa.schema([('created_at', pa.timestamp('ms')), ('id', pa.string_view())])
# Create the 'events-v1' table in the 'gharchive' dataset
events = project.create_table('gharchive.events-v1', key_schema=key_schema, exist_ok=True)
To open the table in the future:
events = project.table("gharchive.events-v1")
We’ll define a function that downloads data from the GitHub Archive and use it to fetch an hour of events.
import pyarrow as pa
import pandas as pd
import duckdb
def get_events(period: pd.Period, limit=100) -> pa.Table:
    # Download one hour of GitHub Archive events and normalize the schema.
    json_gz_url = f"https://data.gharchive.org/{period.strftime('%Y-%m-%d')}-{period.hour}.json.gz"
    arrow_table = (
        duckdb.read_json(json_gz_url, union_by_name=True)
        .limit(limit)  # remove this line to load more rows
        .select("""
            * REPLACE (
                cast(created_at AS TIMESTAMP_MS) AS created_at,
                to_json(payload) AS payload
            )
        """)
        .to_arrow_table()
    )
    # Sort by the key columns and drop duplicate rows.
    events = duckdb.from_arrow(arrow_table).order("created_at, id").distinct().to_arrow_table()
    # Cast the key columns and move them to the front; drop the unused `org` column.
    events = (
        events.drop_columns("id")
        .add_column(0, "id", events["id"].cast(pa.large_string()))
        .drop_columns("created_at")
        .add_column(0, "created_at", events["created_at"].cast(pa.timestamp("ms")))
        .drop_columns("org")
    )
    return events
data: pa.Table = get_events(pd.Period("2023-01-01T00:00:00Z", freq="h"))
Write to a Table
All write operations to a Spiral table must include the key columns.
Writing nested data such as struct arrays or dictionaries into a table creates column groups. See Best Practices for how to design your schema for optimal performance.
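As an illustration (not a step in this walkthrough), even a write that touches only the type column must carry both key columns:
# Illustrative only: a write of a single value column must still include the
# key columns ("created_at" and "id") so Spiral can merge it by key.
events.write(data.select(["created_at", "id", "type"]))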
The write API accepts a variety of data formats, including PyArrow tables and dictionaries. Since our function returns a PyArrow table, we can write it directly to the Spiral table.
events.write(data)
We could also have performed multiple writes atomically within a transaction:
with events.txn() as tx:
    for period in pd.period_range("2023-01-01T00:00:00Z", "2023-01-01T01:00:00Z", freq="h"):
        data = get_events(period)
        tx.write(data)
Or in parallel, useful when ingesting large amounts of data:
from concurrent.futures import ThreadPoolExecutor

def write_period(tx, period: pd.Period):
    data = get_events(period)
    tx.write(data)

with events.txn() as tx:
    with ThreadPoolExecutor() as executor:
        for period in pd.period_range("2023-01-01T00:00:00Z", "2023-01-01T01:00:00Z", freq="h"):
            executor.submit(write_period, tx, period)
It is important that the primary key columns are unique within the transaction, because all writes in a transaction share the same timestamp.
If a different value is written for the same column and the same key in the same transaction, the result when reading the table is undefined.
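If your source data might contain duplicate keys, it can help to verify uniqueness before writing. A minimal sketch using plain PyArrow (not part of the Spiral API):
# Count distinct (created_at, id) pairs; if the count differs from the row count,
# the batch contains duplicate keys and should be deduplicated before writing.
unique_keys = data.group_by(["created_at", "id"]).aggregate([]).num_rows
assert unique_keys == data.num_rows, "duplicate keys within one transaction"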
Our table schema now looks like this:
events.schema().to_arrow()
pa.schema({
    "created_at": pa.timestamp("ms"),
    "id": pa.string_view(),
    "type": pa.string_view(),
    "payload": pa.string_view(),
    "public": pa.bool_(),
    "actor": pa.struct({
        "id": pa.int64(),
        "login": pa.string_view(),
        "display_login": pa.string_view(),
        "gravatar_id": pa.string_view(),
        "url": pa.string_view(),
        "avatar_url": pa.string_view(),
    }),
    "repo": pa.struct({
        "id": pa.int64(),
        "name": pa.string_view(),
        "url": pa.string_view(),
    }),
})
Recall that column groups are defined as sibling columns in the same node of the schema tree.
Our schema, in addition to the root column group, contains the actor and repo column groups.
While columnar storage formats are quite efficient at pruning unused columns, the distance in the file between each column's data can introduce seek operations that significantly slow down queries over object storage. If you know upfront that you will always query a set of columns together, you can often improve query performance by grouping them into a single column group.
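For example, if two fields are always read together, you can nest them into a single struct column before writing so they land in the same column group. A minimal sketch in plain PyArrow, using hypothetical flat repo_id and repo_name columns:
import pyarrow as pa

# Hypothetical flat table in which repo_id and repo_name are always queried together.
flat = pa.table({
    "created_at": pa.array([0], pa.timestamp("ms")),
    "id": ["1"],
    "repo_id": pa.array([42], pa.int64()),
    "repo_name": ["apache/arrow"],
})

# Nest the two columns into one struct; written to Spiral, the struct becomes
# its own column group, so reading both fields avoids extra seeks.
grouped = flat.drop_columns(["repo_id", "repo_name"]).append_column(
    "repo",
    pa.StructArray.from_arrays(
        [flat["repo_id"].combine_chunks(), flat["repo_name"].combine_chunks()],
        names=["id", "name"],
    ),
)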
Appending Columns
Spiral Tables are merged across writes, making it easy to append columns to an existing table.
Suppose we wanted to derive an action column group. We could append it to the existing table without rewriting the entire dataset.
This example uses some syntax from the spiral.expressions module. For more detailed examples, please refer to Expressions.
action = sp.scan({
    "action": {
        "repo_id": events["repo.id"],
        "actor_id": events["actor.id"],
        "type": events["type"],
        "payload": events["payload"],
    }
}, where=events["repo.name"] == "apache/arrow").to_table()
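Assuming the scan result also carries the table's key columns (every write must include them), the derived table can be written back; a minimal sketch:
# Sketch: write the derived `action` table back into the same Spiral table.
# Assumes `action` includes the key columns ("created_at", "id").
events.write(action)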
Our table schema now contains an action column group:
events.schema().to_arrow()
pa.schema({
    "created_at": pa.timestamp("ms"),
    "id": pa.string_view(),
    "type": pa.string_view(),
    "payload": pa.string_view(),
    "public": pa.bool_(),
    "actor": pa.struct({
        "id": pa.int64(),
        "login": pa.string_view(),
        "display_login": pa.string_view(),
        "gravatar_id": pa.string_view(),
        "url": pa.string_view(),
        "avatar_url": pa.string_view(),
    }),
    "repo": pa.struct({
        "id": pa.int64(),
        "name": pa.string_view(),
        "url": pa.string_view(),
    }),
    "action": pa.struct({
        "repo_id": pa.int64(),
        "actor_id": pa.int64(),
        "type": pa.string_view(),
        "payload": pa.string_view(),
    }),
})