Scan & Query
Scanning a table is the process of reading data row by row, optionally applying row-based scalar transformations or row-based filtering operations.
Scan Object
The scan method returns a Scan object that encapsulates a specific query. This object can then return
rows of data, or be used to perform further operations.
import spiral
import pyarrow as pa
# Optional dependencies, only needed for the corresponding readers below
import dask.dataframe as dd
import pandas as pd
import polars as pl
import torch.utils.data
from spiral.demo import gharchive
sp = spiral.Spiral()
events_table = gharchive(sp)
scan = sp.scan(events_table[["id", "type", "public", "actor", "payload.*", "repo"]])
# The result schema of the scan
schema: pa.Schema = scan.schema
# Whether the scan is definitely empty
is_empty: bool = scan.is_empty()
# Read as a stream of RecordBatches
record_batches: pa.RecordBatchReader = scan.to_record_batches()
# Read into a single PyArrow Table
arrow_table: pa.Table = scan.to_table()
# Read into a Dask DataFrame
dask_df: dd.DataFrame = scan.to_dask()
# Read into a Ray Dataset
# ray_ds: ray.data.Dataset = scan.to_ray_dataset()
# Read into a Pandas DataFrame
pandas_df: pd.DataFrame = scan.to_pandas()
# Read into a Polars DataFrame
polars_df: pl.DataFrame = scan.to_polars()
# Read into a Torch-compatible DataLoader for model training
data_loader: torch.utils.data.DataLoader = scan.to_data_loader()
# Read into a Torch-compatible DataLoader for distributed model training
distributed_data_loader: torch.utils.data.DataLoader = scan.to_distributed_data_loader()
# Read into a Torch-compatible IterableDataset for model training
iterable_dataset: torch.utils.data.IterableDataset = scan.to_iterable_dataset()
Scan’s to_record_batches method returns rows sorted by the table’s key. If ordering is not important, consider using to_unordered_record_batches, which can be faster.
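For example, a minimal sketch of the unordered reader (assuming it takes the same no-argument form and returns a RecordBatchReader, like to_record_batches above):
# Assumption: to_unordered_record_batches mirrors to_record_batches, minus the key-ordering guarantee
unordered_batches: pa.RecordBatchReader = scan.to_unordered_record_batches()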
Filtering
Filtering is the process of selecting rows that meet a certain condition. For example, to find events with a specific event type:
insertion_events = sp.scan(
events_table,
where=events_table['type'] == 'PullRequestEvent'
)
Any expression that resolves to a boolean value can be used as a filter. See the Expressions documentation for more information.
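A filter can also be combined with a projection in the same scan; for example, keeping only a few columns of the matching events (selection syntax is covered under Nested Data below):
# Filter and projection combined; column names follow the gharchive schema used above
pr_event_repos = sp.scan(
    events_table[["id", "type", "repo"]],
    where=events_table['type'] == 'PullRequestEvent',
)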
Projection
Projection is the process of applying a transformation function to each row of a table. This can be as simple as selecting a subset of columns, or as complex as passing a string column through an LLM API.
A projection expression must resolve to a struct value. See Expressions documentation for more information.
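For example, a simple projection that repacks two columns into a struct with new names, using the same dictionary form shown under Nested Data below:
# Packing columns into a dictionary produces the required struct result
renamed_events = sp.scan({
    "event_id": events_table["id"],
    "event_type": events_table["type"],
})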
Nested Data
Scanning a table selects all columns including nested column groups:
sp.scan(events_table.select(exclude=['payload'])).schema.to_arrow()
pa.schema({
"created_at": pa.timestamp("ms"),
"id": pa.string(),
"public": pa.bool_(),
"type": pa.string(),
"actor": pa.struct({
"avatar_url": pa.string(),
"display_login": pa.string(),
"gravatar_id": pa.string(),
"id": pa.int64(),
"login": pa.string(),
"url": pa.string(),
}),
"repo": pa.struct({
"id": pa.int64(),
"name": pa.string(),
"url": pa.string(),
})
})
You can select an entire column group using single-bracket notation. Remember, the result must always be a struct; a column group is a struct, so this is valid:
sp.scan(events_table["repo"]).schema.to_arrow()pa.schema({
"id": pa.int64(),
"name": pa.string(),
"url": pa.string(),
})
Selecting a single column this way is not valid, because a column is not a struct. To select columns, use double brackets:
sp.scan(events_table[["public"]]).schema.to_arrow()pa.schema({
"public": pa.bool_(),
})
Or multiple columns:
sp.scan(events_table[["public", "repo"]]).schema.to_arrow()pa.schema({
"public": pa.bool_(),
"repo": pa.struct({
"id": pa.int64(),
"name": pa.string(),
"url": pa.string(),
}),
})
You can pack columns with custom names using a dictionary:
sp.scan({
"is_public": events_table["public"],
"repo": events_table["repo"],
}).schema.to_arrow()
pa.schema({
"is_public": pa.bool_(),
"repo": pa.struct({
"id": pa.int64(),
"name": pa.string(),
"url": pa.string(),
}),
})
Double brackets are syntactic sugar for the .select() method:
sp.scan(events_table.select("public")).schema.to_arrow()pa.schema({
"public": pa.bool_(),
})
Selection applies over columns and nested column groups:
sp.scan(events_table.select("public", "repo")).schema.to_arrow()pa.schema({
"public": pa.bool_(),
"repo": pa.struct({
"id": pa.int64(),
"name": pa.string(),
"url": pa.string(),
}),
})
It is possible to “select into” a column group to get specific fields within nested structures:
sp.scan(events_table.select("public", "repo.id", "repo.url")).schema.to_arrow()pa.schema({
"public": pa.bool_(),
"repo": pa.struct({
"id": pa.int64(),
"url": pa.string(),
}),
})
You can select from multiple nested structures:
sp.scan(events_table.select("public", "repo", "actor.login")).schema.to_arrow()pa.schema({
"public": pa.bool_(),
"repo": pa.struct({
"id": pa.int64(),
"name": pa.string(),
"url": pa.string(),
}),
"actor": pa.struct({
"login": pa.string(),
}),
})
Note that selecting “into” a column group returns a nested structure. To flatten, “step into” the column group first:
sp.scan(events_table[["public"]], events_table["repo"][["id", "url"]]).schema.to_arrow()pa.schema({
"public": pa.bool_(),
"id": pa.int64(),
"url": pa.string(),
})
Column groups support the same selection operations. This is equivalent to the flattened example above:
sp.scan(events_table["repo"].select("id", "url")).schema.to_arrow()pa.schema({
"id": pa.int64(),
"url": pa.string(),
})
Use exclude to remove specific columns, column groups, or keys:
sp.scan(events_table.select(exclude=["payload", "repo", "actor"])).schema.to_arrow()pa.schema({
"created_at": pa.timestamp("ms"),
"id": pa.string(),
"public": pa.bool_(),
"type": pa.string(),
})
The wildcard "*" lets you select or exclude the columns of a specific column group without including or excluding its nested column groups:
sp.scan(events_table.select("*")).schema.to_arrow()pa.schema({
"created_at": pa.timestamp("ms"),
"id": pa.string(),
"public": pa.bool_(),
"type": pa.string(),
})
Exclude all columns, plus the payload column group, to keep only the remaining nested groups:
sp.scan(events_table.select(exclude=["*", "payload"])).schema.to_arrow()pa.schema({
"actor": pa.struct({
"avatar_url": pa.string(),
"display_login": pa.string(),
"gravatar_id": pa.string(),
"id": pa.int64(),
"login": pa.string(),
"url": pa.string(),
}),
"repo": pa.struct({
"id": pa.int64(),
"name": pa.string(),
"url": pa.string(),
}),
})
Wildcards can be mixed with other selections:
sp.scan(events_table.select("*", "actor.login")).schema.to_arrow()pa.schema({
"created_at": pa.timestamp("ms"),
"id": pa.string(),
"public": pa.bool_(),
"type": pa.string(),
"actor": pa.struct({
"login": pa.string(),
}),
})
Querying
Recent years have seen several excellent query engines emerge, including:
- Polars - with Python and Rust DataFrame APIs
- DuckDB - with a SQL oriented API
- DataFusion - SQL as well as a Rust DataFrame API
Polars
Tables can also be queried with Polars LazyFrames:
tbl = gharchive(sp)
df = tbl.to_polars_lazy_frame()
result = df.collect()
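For instance, a sketch of a lazy aggregation over the event type column (plain Polars API; whether predicate and projection pushdown reach Spiral depends on the integration):
import polars as pl
# Lazy group-by over the "type" column from the gharchive schema above
type_counts = (
    tbl.to_polars_lazy_frame()
    .group_by("type")
    .agg(pl.len().alias("n"))
    .collect()
)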
DuckDB
Tables can be turned into PyArrow Datasets with to_arrow_dataset(), which in turn enables the DuckDB Python API.
import duckdb
tbl = gharchive(sp)
ds = tbl.to_arrow_dataset()
result = duckdb.execute("SELECT type, COUNT(*) FROM ds GROUP BY type")
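Here ds is picked up by DuckDB’s replacement scans; materializing the result uses plain DuckDB API, for example:
# Fetch the result of the executed query as a PyArrow Table (standard DuckDB call)
counts = result.fetch_arrow_table()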
Or using the DuckDB Relational API:
result = tbl.to_duckdb_relation().filter("public is not null").to_arrow_table()
Key Table
A key table scan is the equivalent of a primary key lookup. Any scan can be evaluated against a sorted, duplicate-free table of keys to return only the rows that match those keys, with columns defined by the scan’s projection. The result always contains every row from the input key table; if the scan specifies any filters, rows that do not match the filter are returned as nulls.
import pyarrow as pa
scan = sp.scan(events_table)
key_table = pa.table({
"created_at": ['2025-11-20 13:00:00'],
"id": ['4807206745'],
})
results = scan.to_record_batches(key_table=key_table)
It is possible to stream the keys into the scan using an Arrow RecordBatchReader. When streaming, each batch must be sorted and free of duplicates, but the stream as a whole does not need to be sorted.
import pyarrow as pa
from typing import Iterable
from spiral.demo import abc
table_abc = abc(sp)
scan = sp.scan(table_abc.select("a", "b"))
def batches() -> Iterable[pa.RecordBatch]:
yield from [
pa.record_batch(
{
"a": pa.array([3, 5, 8], type=pa.int64()),
}
),
pa.record_batch(
{
"a": pa.array([0, 1, 2, 6, 7, 9], type=pa.int64()),
}
),
pa.record_batch(
{
"a": pa.array([1, 2, 3, 4, 5], type=pa.int64()),
}
),
]
scan.to_record_batches(
key_table=pa.RecordBatchReader.from_batches(
table_abc.key_schema.to_arrow(),
batches(),
)
).read_all().to_pylist()
[
{"a": 3, "b": 103},
{"a": 5, "b": 105},
{"a": 8, "b": 108},
{"a": 0, "b": 100},
{"a": 1, "b": 101},
{"a": 2, "b": 102},
{"a": 6, "b": 106},
{"a": 7, "b": 107},
{"a": 9, "b": 109},
{"a": 1, "b": 101},
{"a": 2, "b": 102},
{"a": 3, "b": 103},
{"a": 4, "b": 104},
{"a": 5, "b": 105},
]
Note that when streaming scan results with to_record_batches, the result stream can contain more batches than the input key stream; the scan is optimized to return rows as soon as possible. A batch in the result stream will never contain rows that span multiple input key batches. to_record_batches supports a batch_aligned parameter that ensures each output batch corresponds exactly to an input key batch. The flag can have a performance impact, since it may require buffering rows until the end of the input key batch is reached, and possibly an extra copy. It is recommended to use this flag only when the full result for a given input batch is needed before processing can continue.
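A sketch of the aligned variant, reusing the key stream above (assuming batch_aligned is passed as a keyword argument; check the API reference for the exact signature):
aligned = scan.to_record_batches(
    key_table=pa.RecordBatchReader.from_batches(
        table_abc.key_schema.to_arrow(),
        batches(),
    ),
    # Assumption: batch_aligned is a keyword argument; it trades throughput for
    # one-output-batch-per-input-key-batch alignment
    batch_aligned=True,
)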
Serialization
Scan objects support Python’s pickle protocol, enabling seamless integration with distributed systems like Ray and multiprocessing without requiring manual serialization.
Pickle
import pickle
from spiral.demo import abc
table_abc = abc(sp)
scan = sp.scan(table_abc[["a", "b"]], where=table_abc["a"] < 3)
# Serialize
pickled = pickle.dumps(scan)
# Deserialize
restored = pickle.loads(pickled)
restored.to_table().to_pylist()
[{"a": 0, "b": 100}, {"a": 1, "b": 101}, {"a": 2, "b": 102}]
Ray
import ray
from spiral.demo import abc
table_abc = abc(sp)
@ray.remote
def process_scan(scan):
return scan.to_table().num_rows
scan = sp.scan(table_abc)
count = ray.get(process_scan.remote(scan))
Manual State (JSON-compatible)
For non-pickle contexts, use state_bytes() and resume_scan():
from spiral.demo import abc
table_abc = abc(sp)
scan = sp.scan(table_abc[["a", "b"]], where=table_abc["a"] < 3)
# Serialize scan state
state = scan.state_bytes() # bytes, can be base64-encoded for JSON
# Resume on another process or machine
restored = sp.resume_scan(state)
restored.to_table().to_pylist()
[{"a": 0, "b": 100}, {"a": 1, "b": 101}, {"a": 2, "b": 102}]
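As noted above, the state bytes can be base64-encoded for JSON transport; a small sketch using only the standard library (the Spiral calls are the same as above):
import base64
import json
# Ship the opaque scan state inside a JSON payload
payload = json.dumps({"scan_state": base64.b64encode(state).decode("ascii")})
# On the receiving side, decode and resume
received = base64.b64decode(json.loads(payload)["scan_state"])
restored = sp.resume_scan(received)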
Cross-table
It is possible to jointly scan any tables that share a common key schema. This has the same behaviour as an outer join, but is more efficient because the tables are both sorted by their key columns.
sp.scan(
{
"events": events_table,
"other_events": other_events_table,
}
)
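Assuming other_events_table is another table with the same key schema, the joint scan is read like any other scan; a minimal sketch:
# Read the joint scan like a single-table scan; per the text above it behaves like an
# outer join over the shared key columns
joint_table = sp.scan(
    {
        "events": events_table,
        "other_events": other_events_table,
    }
).to_table()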