
Scan & Query

Scanning tables is the process of reading data row by row, optionally applying row-based scalar transformations or row-based filtering operations.

Scan Object

The scan method returns a Scan object that encapsulates a specific query. This object can then return rows of data, or be used to perform further operations.

import spiral
import pyarrow as pa
from spiral.demo import gharchive

sp = spiral.Spiral()
events_table = gharchive(sp)

scan = sp.scan(events_table[["id", "type", "public", "actor", "payload.*", "repo"]])

# The result schema of the scan
schema: pa.Schema = scan.schema

# Whether the scan is definitely empty
is_empty: bool = scan.is_empty()

# Read as a stream of RecordBatches
record_batches: pa.RecordBatchReader = scan.to_record_batches()

# Read into a single PyArrow Table
arrow_table: pa.Table = scan.to_table()

# Read into a Dask DataFrame
dask_df: dd.DataFrame = scan.to_dask()

# Read into a Ray Dataset
# ray_ds: ray.data.Dataset = scan.to_ray_dataset()

# Read into a Pandas DataFrame
pandas_df: pd.DataFrame = scan.to_pandas()

# Read into a Polars DataFrame
polars_df: pl.DataFrame = scan.to_polars()

# Read into a Torch-compatible DataLoader for model training
data_loader: torch.utils.data.DataLoader = scan.to_data_loader()

# Read into a Torch-compatible DataLoader for distributed model training
distributed_data_loader: torch.utils.data.DataLoader = scan.to_distributed_data_loader()

# Read into a Torch-compatible IterableDataset for model training
iterable_dataset: torch.utils.data.IterableDataset = scan.to_iterable_dataset()

Scan’s to_record_batches method returns rows sorted by the table’s key. If ordering is not important, consider using to_unordered_record_batches which can be faster.
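
For example, when downstream processing does not depend on row order, the unordered reader is a drop-in replacement. A minimal sketch (process is a hypothetical placeholder for your own per-batch logic, and it is assumed both readers can be iterated batch by batch):

# Ordered: batches arrive sorted by the table's key.
for batch in scan.to_record_batches():
    process(batch)

# Unordered: may be faster when ordering does not matter.
for batch in scan.to_unordered_record_batches():
    process(batch)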

Filtering

Filtering is the process of selecting rows that meet a certain condition. For example, to find events with a specific event type:

pull_request_events = sp.scan(
    events_table,
    where=events_table['type'] == 'PullRequestEvent'
)

Any expression that resolves to a boolean value can be used as a filter. See the Expressions documentation for more information.
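
For example, a compound filter might combine several conditions. The sketch below assumes boolean expressions compose with the & operator, as is common in expression APIs; consult the Expressions documentation for the exact syntax:

# Hypothetical compound filter: public pull-request events only.
# The `&` composition is an assumption; see the Expressions docs.
public_prs = sp.scan(
    events_table,
    where=(events_table['type'] == 'PullRequestEvent') & events_table['public'],
)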

Projection

Projection is the process of applying a transformation function to each row of a table. This can be as simple as selecting a subset of columns, or as involved as passing a string column through an LLM API.

A projection expression must resolve to a struct value. See Expressions documentation for more information.
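
For example, a projection can select and rename columns by packing them into a struct with a dictionary (this form is covered in more detail under Nested Data below):

# Project each row into a struct with renamed fields.
renamed = sp.scan({
    "event_type": events_table["type"],
    "repo": events_table["repo"],
})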

Nested Data

Scanning a table selects all columns, including nested column groups. Here the large payload column group is excluded for brevity:

sp.scan(events_table.select(exclude=['payload'])).schema.to_arrow()
pa.schema({
    "created_at": pa.timestamp("ms"),
    "id": pa.string(),
    "public": pa.bool_(),
    "type": pa.string(),
    "actor": pa.struct({
        "avatar_url": pa.string(),
        "display_login": pa.string(),
        "gravatar_id": pa.string(),
        "id": pa.int64(),
        "login": pa.string(),
        "url": pa.string(),
    }),
    "repo": pa.struct({
        "id": pa.int64(),
        "name": pa.string(),
        "url": pa.string(),
    }),
})

You can select an entire column group using bracket notation. Remember, the result must always be a struct; a column group is itself a struct, so this is valid:

sp.scan(events_table["repo"]).schema.to_arrow()
pa.schema({ "id": pa.int64(), "name": pa.string(), "url": pa.string(), })

Selecting a single column this way is not valid, because a plain column is not a struct. When selecting columns, use double brackets:

sp.scan(events_table[["public"]]).schema.to_arrow()
pa.schema({ "public": pa.bool_(), })

Or multiple columns:

sp.scan(events_table[["public", "repo"]]).schema.to_arrow()
pa.schema({ "public": pa.bool_(), "repo": pa.struct({ "id": pa.int64(), "name": pa.string(), "url": pa.string(), }), })

You can pack columns with custom names using a dictionary:

sp.scan({ "is_public": events_table["public"], "repo": events_table["repo"], }).schema.to_arrow()
pa.schema({ "is_public": pa.bool_(), "repo": pa.struct({ "id": pa.int64(), "name": pa.string(), "url": pa.string(), }), })

Double brackets are syntactic sugar for the .select() method:

sp.scan(events_table.select("public")).schema.to_arrow()
pa.schema({ "public": pa.bool_(), })

Selection applies over columns and nested column groups:

sp.scan(events_table.select("public", "repo")).schema.to_arrow()
pa.schema({ "public": pa.bool_(), "repo": pa.struct({ "id": pa.int64(), "name": pa.string(), "url": pa.string(), }), })

It is possible to “select into” a column group to get specific fields within nested structures:

sp.scan(events_table.select("public", "repo.id", "repo.url")).schema.to_arrow()
pa.schema({ "public": pa.bool_(), "repo": pa.struct({ "id": pa.int64(), "url": pa.string(), }), })

You can select from multiple nested structures:

sp.scan(events_table.select("public", "repo", "actor.login")).schema.to_arrow()
pa.schema({ "public": pa.bool_(), "repo": pa.struct({ "id": pa.int64(), "name": pa.string(), "url": pa.string(), }), "actor": pa.struct({ "login": pa.string(), }), })

Note that selecting “into” a column group returns a nested structure. To flatten, “step into” the column group first:

sp.scan(events_table[["public"]], events_table["repo"][["id", "url"]]).schema.to_arrow()
pa.schema({ "public": pa.bool_(), "id": pa.int64(), "url": pa.string(), })

Column groups support the same selection operations. This is equivalent to the flattened example above:

sp.scan(events_table["repo"].select("id", "url")).schema.to_arrow()
pa.schema({ "id": pa.int64(), "url": pa.string(), })

Use exclude to remove specific columns, column groups, or keys:

sp.scan(events_table.select(exclude=["payload", "repo", "actor"])).schema.to_arrow()
pa.schema({ "created_at": pa.timestamp("ms"), "id": pa.string(), "public": pa.bool_(), "type": pa.string(), })

The "*" wildcard allows you to select or exclude the columns of a specific column group without including or excluding its nested column groups:

sp.scan(events_table.select("*")).schema.to_arrow()
pa.schema({ "created_at": pa.timestamp("ms"), "id": pa.string(), "public": pa.bool_(), "type": pa.string(), })

Exclude all top-level columns, together with the payload column group, to keep only the remaining nested groups:

sp.scan(events_table.select(exclude=["*", "payload"])).schema.to_arrow()
pa.schema({
    "actor": pa.struct({
        "avatar_url": pa.string(),
        "display_login": pa.string(),
        "gravatar_id": pa.string(),
        "id": pa.int64(),
        "login": pa.string(),
        "url": pa.string(),
    }),
    "repo": pa.struct({
        "id": pa.int64(),
        "name": pa.string(),
        "url": pa.string(),
    }),
})

Wildcards can be mixed with other selections:

sp.scan(events_table.select("*", "actor.login")).schema.to_arrow()
pa.schema({ "created_at": pa.timestamp("ms"), "id": pa.string(), "public": pa.bool_(), "type": pa.string(), "actor": pa.struct({ "login": pa.string(), }), })

Querying

Recent years have seen the emergence of several excellent query engines. Spiral tables integrate with the following:

Polars

Tables can also be queried with Polars LazyFrames:

tbl = gharchive(sp)

df = tbl.to_polars_lazy_frame()
result = df.collect()
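
Once you have a LazyFrame, the usual Polars operations apply. A small sketch reusing df from above (standard Polars API; whether filters are pushed down into the Spiral scan is not asserted here):

import polars as pl

# Filter and select with ordinary LazyFrame operations, then collect.
result = (
    df.filter(pl.col("type") == "PullRequestEvent")
      .select("id", "repo")
      .collect()
)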

DuckDB

Tables can be turned into PyArrow Datasets with to_arrow_dataset(), which in turn enables the DuckDB Python API.

import duckdb

tbl = gharchive(sp)
ds = tbl.to_arrow_dataset()

result = duckdb.execute("SELECT type, COUNT(*) FROM ds GROUP BY type")

Or using the DuckDB Relational API:

result = tbl.to_duckdb_relation().filter("public is not null").to_arrow_table()
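
The relation supports further chained operations from the standard DuckDB relational API, for example (a sketch; the column names come from the gharchive demo table):

# Project two columns, cap the row count, and materialize as Arrow.
preview = (
    tbl.to_duckdb_relation()
       .project("type, public")
       .limit(10)
       .to_arrow_table()
)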

Key Table

A key table scan is the equivalent of a primary key lookup. Any scan can be evaluated against a sorted, unique table of keys to return only the rows that match those keys, with columns defined by the scan’s projection. The result always contains all the rows from the input key table; if the scan specifies any filters, rows that do not match the filter are returned as nulls.

import pyarrow as pa

scan = sp.scan(events_table)

key_table = pa.table({
    "created_at": ['2025-11-20 13:00:00'],
    "id": ['4807206745'],
})

results = scan.to_record_batches(key_table=key_table)

It is possible to stream the keys into the scan using an Arrow RecordBatchReader. When streaming, each batch must be sorted and must not contain duplicates, but the stream as a whole does not need to be sorted.

import pyarrow as pa
from typing import Iterable
from spiral.demo import abc

table_abc = abc(sp)
scan = sp.scan(table_abc.select("a", "b"))

def batches() -> Iterable[pa.RecordBatch]:
    yield from [
        pa.record_batch({
            "a": pa.array([3, 5, 8], type=pa.int64()),
        }),
        pa.record_batch({
            "a": pa.array([0, 1, 2, 6, 7, 9], type=pa.int64()),
        }),
        pa.record_batch({
            "a": pa.array([1, 2, 3, 4, 5], type=pa.int64()),
        }),
    ]

scan.to_record_batches(
    key_table=pa.RecordBatchReader.from_batches(
        table_abc.key_schema.to_arrow(),
        batches(),
    )
).read_all().to_pylist()
[
    {"a": 3, "b": 103},
    {"a": 5, "b": 105},
    {"a": 8, "b": 108},
    {"a": 0, "b": 100},
    {"a": 1, "b": 101},
    {"a": 2, "b": 102},
    {"a": 6, "b": 106},
    {"a": 7, "b": 107},
    {"a": 9, "b": 109},
    {"a": 1, "b": 101},
    {"a": 2, "b": 102},
    {"a": 3, "b": 103},
    {"a": 4, "b": 104},
    {"a": 5, "b": 105},
]

Note that when streaming scan results with to_record_batches, the result stream can contain more batches than the input key stream; the scan is optimized to return rows as soon as possible. A batch in the result stream will never contain rows that span multiple input key batches. to_record_batches supports a batch_aligned parameter that ensures each output batch corresponds exactly to an input key batch. This flag can have a performance impact, since it may require buffering rows until the end of the input key batch is reached, and possibly an extra copy. It is recommended to use it only when the full result for a given input batch is needed before processing can continue.
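
For example, to request aligned batches for the streaming example above (a minimal sketch; the exact keyword call shape is assumed):

# batch_aligned=True: each output batch corresponds exactly to an input key
# batch, at the cost of buffering until that key batch is fully processed.
aligned = scan.to_record_batches(
    key_table=pa.RecordBatchReader.from_batches(
        table_abc.key_schema.to_arrow(),
        batches(),
    ),
    batch_aligned=True,
)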

Serialization

Scan objects support Python’s pickle protocol, enabling seamless integration with distributed systems like Ray and multiprocessing without requiring manual serialization.

Pickle

import pickle
from spiral.demo import abc

table_abc = abc(sp)
scan = sp.scan(table_abc[["a", "b"]], where=table_abc["a"] < 3)

# Serialize
pickled = pickle.dumps(scan)

# Deserialize
restored = pickle.loads(pickled)
restored.to_table().to_pylist()
[{"a": 0, "b": 100}, {"a": 1, "b": 101}, {"a": 2, "b": 102}]

Ray

import ray
from spiral.demo import abc

table_abc = abc(sp)

@ray.remote
def process_scan(scan):
    return scan.to_table().num_rows

scan = sp.scan(table_abc)
count = ray.get(process_scan.remote(scan))

Manual State (JSON-compatible)

For non-pickle contexts, use state_bytes() and resume_scan():

from spiral.demo import abc

table_abc = abc(sp)
scan = sp.scan(table_abc[["a", "b"]], where=table_abc["a"] < 3)

# Serialize scan state
state = scan.state_bytes()  # bytes, can be base64-encoded for JSON

# Resume on another process or machine
restored = sp.resume_scan(state)
restored.to_table().to_pylist()
[{"a": 0, "b": 100}, {"a": 1, "b": 101}, {"a": 2, "b": 102}]
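
For example, the state can be embedded in a JSON payload by base64-encoding it (a sketch using only the standard library):

import base64
import json

# Encode the scan state for a JSON-compatible transport.
payload = json.dumps({"scan_state": base64.b64encode(state).decode("ascii")})

# ...later, on another process or machine:
state = base64.b64decode(json.loads(payload)["scan_state"])
restored = sp.resume_scan(state)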

Cross-table

It is possible to jointly scan any tables that share a common key schema. This has the same behaviour as an outer join, but is more efficient since the tables are all sorted by their key columns.

sp.scan(
    {
        "events": events_table,
        "other_events": other_events_table,
    }
)
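
The joint scan is a regular Scan object, so the usual readers apply. A sketch (other_events_table stands in for any table with the same key schema; the exact layout of the output columns is not asserted here):

joint = sp.scan({
    "events": events_table,
    "other_events": other_events_table,
})

# Read the outer-join-like result over the shared key as a single table.
joint_table = joint.to_table()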