Spiral
class Spiral()
Main client for interacting with the Spiral data platform.
Configuration is loaded with the following priority (highest to lowest):
- Explicit parameters.
- Environment variables (SPIRAL__*).
- Config file (~/.spiral.toml).
- Default values (production URLs).
Examples:
import spiral
# Default configuration
sp = spiral.Spiral()
# With config overrides
sp = spiral.Spiral(overrides={"limits.read_max_concurrency": "16"})
Arguments:
config - Custom ClientSettings object. Defaults to global settings.
overrides - Configuration overrides using dot notation; see the Client Configuration page for a full list.
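The priority order above can be pictured as a layered lookup, where the first layer that defines a key wins. The following is a minimal sketch, not Spiral's implementation. `resolve_setting` and `env_var_name` are hypothetical helpers, and the config file is modelled as an already-flattened dict of dotted keys. The mapping from a dotted key such as limits.read_max_concurrency to a SPIRAL__LIMITS__READ_MAX_CONCURRENCY environment variable is an assumption based only on the SPIRAL__* prefix.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def env_var_name(dotted_key: str) -> str:
    # ASSUMPTION: each dotted level becomes an upper-case segment joined by "__".
    return "SPIRAL__" + "__".join(part.upper() for part in dotted_key.split("."))


def resolve_setting(
    dotted_key: str,
    *,
    overrides: Mapping[str, str] | None = None,
    environ: Mapping[str, str] | None = None,
    config_file: Mapping[str, str] | None = None,
    defaults: Mapping[str, str] | None = None,
) -> str | None:
    """Hypothetical lookup: overrides > environment > config file > defaults."""
    environ = os.environ if environ is None else environ
    if overrides and dotted_key in overrides:
        return overrides[dotted_key]          # 1. explicit parameters
    env_key = env_var_name(dotted_key)
    if env_key in environ:
        return environ[env_key]               # 2. SPIRAL__* environment variables
    if config_file and dotted_key in config_file:
        return config_file[dotted_key]        # 3. ~/.spiral.toml (flattened here)
    if defaults and dotted_key in defaults:
        return defaults[dotted_key]           # 4. built-in defaults
    return None
```

All values in a call such as `resolve_setting("limits.read_max_concurrency", overrides={...})` are illustrative. This sketch does not define which keys exist or what their defaults are. The Client Configuration page does.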
config
@property
def config() -> ClientSettings
Returns the client’s configuration.
authn
@property
def authn() -> Authn
Get the authentication handler for this client.
list_projects
def list_projects() -> list["Project"]
List project IDs.
create_project
def create_project(*,
description: str | None = None,
id_prefix: str | None = None,
**kwargs) -> "Project"
Create a project in the current, or given, organization.
project
def project(project_id: str) -> "Project"
Open an existing project.
scan
def scan(*projections: ExprLike,
where: ExprLike | None = None,
asof: datetime | int | None = None,
limit: int | None = None,
hide_progress_bar: bool = False) -> Scan
Starts a read transaction on the Spiral.
Arguments:
projections - a set of expressions that return struct arrays.
where - a query expression to apply to the data.
asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in the CLI.
limit - maximum number of rows to return. When set, the scan stops reading data once the limit is reached, providing efficient early termination.
hide_progress_bar - if True, disables the progress bar during scan building.
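The asof semantics can be illustrated with a sorted list of commit timestamps: the scan sees the newest version committed at or before the requested time. This is a minimal sketch under that as-of assumption. `version_asof` is a hypothetical helper. Timestamps are plain integers here because this page does not specify how datetime values map to integer commit timestamps.

```python
from __future__ import annotations

from bisect import bisect_right


def version_asof(commit_timestamps: list[int], asof: int) -> int | None:
    """Return the index of the newest version committed at or before `asof`.

    `commit_timestamps` must be sorted ascending (one entry per transaction).
    Returns None when `asof` predates the first commit.
    """
    position = bisect_right(commit_timestamps, asof)
    return position - 1 if position > 0 else None
```

With commits at 100, 200 and 300, an asof of 250 selects the version written by the second transaction (index 1).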
scan_keys
def scan_keys(*projections: ExprLike,
where: ExprLike | None = None,
asof: datetime | int | None = None,
limit: int | None = None,
hide_progress_bar: bool = False) -> Scan
Starts a keys-only read transaction on the Spiral.
To determine which keys are present in at least one column group of the table, key scan the table itself:
sp.scan_keys(table)
Arguments:
projections - scan the keys of the column groups referenced by these expressions.
where - a query expression to apply to the data.
asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in the CLI.
limit - maximum number of rows to return. When set, the scan stops reading data once the limit is reached, providing efficient early termination.
hide_progress_bar - if True, disables the progress bar during scan building.
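A key scan of the table itself returns the keys that appear in at least one column group, which is the union of the groups' key sets. The dependency-free sketch below shows those set semantics. Column groups are modelled as a hypothetical dict from group name to key tuples, and the result is sorted only for readability.

```python
def keys_in_any_column_group(column_groups: dict[str, list[tuple]]) -> list[tuple]:
    """Union of the keys of every column group, in sorted key order."""
    present: set[tuple] = set()
    for keys in column_groups.values():
        present.update(keys)
    return sorted(present)
```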
open
def open(uri: str, *, key_columns: list[str] | None = None) -> Relation
Open a data source as a Relation.
Arguments:
uri - Currently a URI to a Vortex file. The API will support more inputs, including in-memory tables, soon.
key_columns - If provided, only these columns will be used as keys. The Relation has a primary key and is sorted by it.
sample
def sample(*projections: ExprLike,
sampler: Callable[[pa.Array], pa.Array],
where: ExprLike | None = None,
asof: datetime | int | None = None,
hide_progress_bar: bool = False) -> SampleScan
Creates a SampleScan that can be inspected before execution.
NOTE: This API is experimental and will likely change in the near future.
For most use cases, prefer using sample() directly. This method is useful
when you need to inspect the key_scan and value_scan plans before executing
the sample operation. Call to_record_batches() on the returned SampleScan to
execute and get a RecordBatchReader.
Arguments:
projections - a set of expressions that return struct arrays.
sampler - a function that takes a struct array of keys and returns a boolean array indicating which keys to sample.
where - a query expression to apply to the data.
asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in the CLI.
hide_progress_bar - if True, disables the progress bar during scan building.
Returns:
A SampleScan object with key_plan(), value_plan(), and to_record_batches() methods.
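A sampler is any function that maps a struct array of keys to a boolean mask of the same length. The sketch below shows one common design, a deterministic hash-based sampler. It uses plain Python lists of dicts in place of pyarrow arrays so it runs without dependencies. To pass it to sample(), you would adapt it to accept a pa.Array (for example via to_pylist()) and return pa.array(mask). `make_hash_sampler` is a hypothetical helper, not part of the Spiral API.

```python
import hashlib


def make_hash_sampler(fraction: float, seed: int = 0):
    """Return a sampler that keeps roughly `fraction` of keys, deterministically.

    The same key always gets the same decision for a given seed, so repeated
    runs sample the same rows.
    """
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1]")
    threshold = int(fraction * 2**64)

    def sampler(keys: list[dict]) -> list[bool]:
        mask = []
        for key in keys:
            # Hash the seed and the key's fields into a 64-bit integer.
            payload = repr((seed, sorted(key.items()))).encode()
            digest = hashlib.blake2b(payload, digest_size=8).digest()
            mask.append(int.from_bytes(digest, "big") < threshold)
        return mask

    return sampler
```

Hashing rather than drawing random numbers keeps the sample stable across runs and machines, which makes sampled experiments reproducible.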
search
def search(top_k: int,
*rank_by: ExprLike,
filters: ExprLike | None = None,
freshness_window: timedelta | None = None) -> pa.RecordBatchReader
Queries the index with the given rank_by and filters clauses. Returns a stream of scored keys.
Arguments:
top_k - The number of top results to return.
rank_by - Rank-by expressions, which are combined for scoring. See se.text.find and se.text.boost for scoring expressions.
filters - An expression used to filter the results. It must return a boolean value and use only conjunctions (ANDs). Each expression in the filters clause is treated as either a must or a must_not clause, in search terminology.
freshness_window - If provided, the index will not be refreshed if its freshness does not exceed this window.
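As a mental model of how top_k, rank_by, and filters interact, the sketch below does three things. It keeps only documents that satisfy every must predicate and no must_not predicate. It scores the survivors with the rank-by scorers. It returns the best top_k. This is an illustration only. Summing the scorers is an assumption, since this page says only that they are combined. `search_top_k` and its arguments are hypothetical plain-Python stand-ins for se.text expressions.

```python
import heapq
from collections.abc import Callable, Iterable

Doc = str
Scorer = Callable[[Doc], float]
Predicate = Callable[[Doc], bool]


def search_top_k(
    docs: dict[str, Doc],
    top_k: int,
    rank_by: Iterable[Scorer],
    must: Iterable[Predicate] = (),
    must_not: Iterable[Predicate] = (),
) -> list[tuple[float, str]]:
    """Return up to `top_k` (score, key) pairs, best first."""
    rank_by, must, must_not = list(rank_by), list(must), list(must_not)
    scored = []
    for key, doc in docs.items():
        # Filters are a pure conjunction: every must passes, no must_not matches.
        if not all(pred(doc) for pred in must) or any(pred(doc) for pred in must_not):
            continue
        scored.append((sum(scorer(doc) for scorer in rank_by), key))
    return heapq.nlargest(top_k, scored)
```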
resume_scan
def resume_scan(scan_bytes: bytes) -> Scan
Open a Scan from a serialized logical plan in this instance of a client.
Arguments:
scan_bytes - The compressed bytes returned by a previous scan’s to_bytes_compressed(). These bytes encode the serialized logical plan.
compute_shards
def compute_shards(*projections: ExprLike,
where: ExprLike | None = None,
asof: datetime | int | None = None,
batch_size: int | None = None) -> list[Shard]
Computes shards over the given projections and filter.
Arguments:
projections - a set of expressions that return struct arrays.
where - a query expression to apply to the data.
asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in the CLI.
batch_size - a specific batch size; otherwise, the shards are computed based on the fragments in the table.
Project
class Project()
table
def table(identifier: str) -> Table
Open a table with a dataset.table identifier, or a table name using the default dataset.
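Several Project methods accept either dataset.table or a bare table name that resolves against the default dataset. The hypothetical helper below shows that resolution rule. The default dataset's name is passed in because this page does not state it. Rejecting identifiers with more than one dot is an assumption of the sketch.

```python
def split_identifier(identifier: str, default_dataset: str) -> tuple[str, str]:
    """Split "dataset.table" or "table" into (dataset, table)."""
    parts = identifier.split(".")
    if len(parts) == 1:
        return default_dataset, parts[0]     # bare name: use the default dataset
    if len(parts) == 2 and all(parts):
        return parts[0], parts[1]
    # ASSUMPTION: anything else is treated as malformed in this sketch.
    raise ValueError(f"expected 'dataset.table' or 'table', got {identifier!r}")
```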
create_table
def create_table(identifier: str,
*,
key_schema: Schema
| pa.Schema
| Iterable[pa.Field[pa.DataType]]
| Iterable[tuple[str, pa.DataType | str]]
| Mapping[str, pa.DataType | str],
root_uri: Uri | None = None,
exist_ok: bool = False) -> Table
Create a new table in the project.
Arguments:
identifier - The table identifier, in the form dataset.table or table.
key_schema - The schema of the table’s keys. Field types may be passed as pyarrow DataType instances or as string aliases (e.g. “int64”).
root_uri - The root URI for the table.
exist_ok - If True, do not raise an error if the table already exists.
move_table
def move_table(identifier: str, new_dataset: str)
Move a table to a new dataset in the project.
Arguments:
identifier - The table identifier, in the form dataset.table or table.
new_dataset - The dataset into which to move this table.
rename_table
def rename_table(identifier: str, new_table: str)
Rename a table in the project.
Arguments:
identifier - The table identifier, in the form dataset.table or table.
new_table - The new name for the table.
drop_table
def drop_table(identifier: str)
Drop a table from the project.
Arguments:
identifier - The table identifier, in the form dataset.table or table.
text_index
def text_index(name: str) -> TextIndex
Returns the index with the given name.
create_text_index
def create_text_index(name: str,
*projections: ExprLike,
where: ExprLike | None = None,
root_uri: Uri | None = None,
exist_ok: bool = False) -> TextIndex
Creates a text index over the table projection.
Arguments:
name - The index name. Must be unique within the project.
projections - At least one projection expression is required. All projections must reference the same table.
where - An optional filter expression to apply to the index.
root_uri - The root URI for the index.
exist_ok - If True, do not raise an error if the index already exists.
compute
def compute() -> Compute
Gets compute resources configured for this project.
NOTE: Compute is experimental and will likely change in the near future.
deployments
def deployments() -> Deployments
Long-running predefined deployments on this project’s compute cluster.
NOTE: Deployments are experimental and will likely change in the near future.
Table
class Table(Expr)
API for interacting with a SpiralDB Table.
Spiral Table is a powerful and flexible way to store, analyze, and query massive and/or multimodal datasets. The data model will feel familiar to users of SQL- or DataFrame-style systems, yet is designed to be more flexible, more powerful, and more useful in the context of modern data processing.
Tables are stored and queried directly from object storage.
identifier
@property
def identifier() -> str
Returns the fully qualified identifier of the table.
project
@property
def project() -> str | None
Returns the project of the table.
dataset
@property
def dataset() -> str | None
Returns the dataset of the table.
name
@property
def name() -> str | None
Returns the name of the table.
key_schema
@property
def key_schema() -> Schema
Returns the key schema of the table.
schema
def schema() -> Schema
Returns the FULL schema of the table.
NOTE: This can be expensive for large tables.
write
def write(table: Union[TableLike, "pl.LazyFrame", Scan],
push_down_nulls: bool = False,
**kwargs) -> None
Write an item to the table inside a single transaction.
Arguments:
push_down_nulls: Whether to push null structs down into their children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn’t allow struct-level nullability, so use this option if your data contains nullable structs.
table: The table, or a Scan over one or more other tables, to write. When a Scan is passed, its record batches are streamed into a single transaction; the Scan’s projection must produce this table’s key columns.
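The push_down_nulls rewrite from the example above can be sketched in plain Python. This illustrates the documented transformation and is not SpiralDB's code. `push_down_nulls` here is a hypothetical helper. The schema is modelled as a dict that maps each field to either a nested dict (a struct) or None (a leaf).

```python
def push_down_nulls(value, schema):
    """Replace null structs with structs whose children are null, recursively.

    `schema` is a dict for a struct field (child name -> child schema) or
    None for a leaf field. Leaf values, including nulls, are kept unchanged.
    """
    if schema is None:
        return value
    fields = value if value is not None else {}
    return {name: push_down_nulls(fields.get(name), child) for name, child in schema.items()}
```

Applied to the column [{"a": 1}, None] with schema {"a": None}, it yields [{"a": 1}, {"a": None}], which matches the example.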
enrich
def enrich(*projections: ExprLike,
where: ExprLike | None = None) -> Enrichment
Returns an Enrichment object that, when applied, produces new columns.
Enrichment can be applied in different ways, e.g. distributed.
Arguments:
projections: Projection expressions deriving new columns to write back. Expressions can be over multiple Spiral tables, but all tables, including this one, must share the same key schema.
where: Optional filter expression to apply when reading the input tables.
drop_columns
def drop_columns(column_paths: list[str]) -> None
Drops the specified columns from the table.
Arguments:
column_paths: Fully qualified column names (e.g., “column_name” or “nested.field”). All columns must exist; if any column doesn’t exist, the call fails with an error.
snapshot
def snapshot(asof: datetime | int | None = None) -> Snapshot
Returns a snapshot of the table at the given timestamp.
Each transaction creates a new version of the table. The commit timestamp
of the transaction can be used in asof. See spiral txn for transaction
commands in the CLI.
txn
def txn(**kwargs) -> Transaction
Begins a new transaction. The transaction must be committed for writes to become visible.
While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction. The behavior is undefined if they are not.
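Duplicate primary keys within one transaction lead to undefined behavior, so it can be worth checking data before writing it. If you write several batches in one transaction, check them together. The minimal sketch below works over rows represented as dicts. `find_duplicate_keys` is a hypothetical helper, not part of the Spiral API.

```python
from collections.abc import Iterable, Mapping, Sequence


def find_duplicate_keys(rows: Iterable[Mapping], key_columns: Sequence[str]) -> list[tuple]:
    """Return every key tuple that appears more than once, once per extra occurrence."""
    seen: set[tuple] = set()
    duplicates: list[tuple] = []
    for row in rows:
        key = tuple(row[column] for column in key_columns)
        if key in seen:
            duplicates.append(key)
        else:
            seen.add(key)
    return duplicates
```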
to_arrow_dataset
def to_arrow_dataset(*,
column_group: ColumnGroup | None = None,
skip_children: bool = False) -> "ds.Dataset"
Returns a PyArrow Dataset representing the table.
When column_group is provided, the dataset is scoped to that column
group. By default the group’s descendants are included; pass
skip_children=True to read only the fields directly within the
column group.
to_polars_lazy_frame
def to_polars_lazy_frame(*,
column_group: ColumnGroup | None = None,
skip_children: bool = False) -> "pl.LazyFrame"
Returns a Polars LazyFrame for the Spiral table.
See to_arrow_dataset for the meaning of column_group and
skip_children.
to_duckdb_relation
def to_duckdb_relation(
*,
column_group: ColumnGroup | None = None,
skip_children: bool = False) -> "duckdb.DuckDBPyRelation"
Returns a DuckDB relation for the Spiral table.
See to_arrow_dataset for the meaning of column_group and
skip_children.
key
def key(*parts) -> Key
Creates a Key object for the given parts according to the table’s key schema.
Arguments:
parts - Parts of the key. Must be a valid prefix of the table’s key schema.
Returns:
Key object representing the given parts.
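The prefix rule for key() means you may supply the first n key columns but may not skip any. The hypothetical sketch below shows that validation. The key schema is modelled as an ordered list of (name, Python type) pairs rather than a Spiral Schema.

```python
def make_key(key_schema: list[tuple[str, type]], *parts) -> tuple[tuple[str, object], ...]:
    """Pair each part with its key column, requiring `parts` to be a schema prefix."""
    if len(parts) > len(key_schema):
        raise ValueError(f"got {len(parts)} parts for a {len(key_schema)}-column key")
    for (name, expected), part in zip(key_schema, parts):
        if not isinstance(part, expected):
            raise TypeError(f"{name}: expected {expected.__name__}, got {type(part).__name__}")
    return tuple((name, part) for (name, _), part in zip(key_schema, parts))
```

For a key schema of (user, ts), both ("u1",) and ("u1", 5) are valid prefixes. A value for ts alone is not.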
set_metadata
def set_metadata(metadata: dict[str, bytes]) -> None
Set metadata on the table.
Entries are upserted: existing keys are overwritten, keys not mentioned are left unchanged.
Warnings:
The metadata is not versioned. Parallel calls to set_metadata (on the same machine or on different machines) race to set a value. Ordering is not guaranteed.
Each key + value pair can be at most 8 KiB, keys can be at most 255 characters, and a table can have at most 1024 entries.
Examples:
table.set_metadata({"source": b"sensor", "version": b"2"})
If you need to store more complex data, use json to serialize it:
import json
table.set_metadata({"config": json.dumps({"f1": 123, "f2": "abc"}).encode()})
Arguments:
metadata - A dict mapping string keys to bytes values.
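The upsert semantics and size limits above can be checked on the client before calling set_metadata. The minimal sketch below does that. `upsert_metadata` is hypothetical. Measuring the 8 KiB limit as UTF-8 key bytes plus value bytes is an assumption.

```python
MAX_ENTRY_BYTES = 8 * 1024   # key + value, per entry
MAX_KEY_CHARS = 255
MAX_ENTRIES = 1024


def upsert_metadata(current: dict[str, bytes], updates: dict[str, bytes]) -> dict[str, bytes]:
    """Return `current` with `updates` upserted, enforcing the documented limits."""
    merged = dict(current)                  # keys not mentioned are left unchanged
    for key, value in updates.items():
        if len(key) > MAX_KEY_CHARS:
            raise ValueError(f"key {key[:20]!r}... exceeds {MAX_KEY_CHARS} characters")
        # ASSUMPTION: the limit counts UTF-8 key bytes plus value bytes.
        if len(key.encode("utf-8")) + len(value) > MAX_ENTRY_BYTES:
            raise ValueError(f"entry {key!r} exceeds {MAX_ENTRY_BYTES} bytes")
        merged[key] = value                 # existing keys are overwritten
    if len(merged) > MAX_ENTRIES:
        raise ValueError(f"table metadata would have {len(merged)} entries (max {MAX_ENTRIES})")
    return merged
```

A client-side check like this cannot prevent the race described in the warning, because metadata is not versioned.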
get_metadata
def get_metadata() -> dict[str, bytes]
Get metadata for the table.
Returns:
Dict mapping string keys to bytes values.
drop_metadata
def drop_metadata(keys: list[str]) -> None
Remove the given keys from the table’s metadata.
Keys that do not exist are silently ignored.
Arguments:
keys - List of metadata keys to remove.
mount_file_system
def mount_file_system(file_system_project: "Project", directory: str) -> str
Mount a file system from another project to this table.
Allows table scans to access files stored in the bucket configured for the given project.
Arguments:
file_system_project - Project for which a bucket is configured; that bucket is the one being mounted.
directory - Directory path within the configured file system project (e.g. /data/).
Returns:
Mount ID of the newly created mount.
show_file_systems
def show_file_systems() -> "list[Mount]"
Return all file system mounts for this table.
Returns:
List of Mount objects. The primary mount (if configured) is first, followed by any additional secondary mounts.
column_group
def column_group(*paths: str) -> ColumnGroup
Creates a ColumnGroup object for the given column paths.
Arguments:
paths - Path to the column group, given as column names or dot-separated paths.
Returns:
ColumnGroup object representing the given columns.
Snapshot
class Snapshot()
Spiral table snapshot.
A snapshot represents a point-in-time view of a table.
asof
@property
def asof() -> Timestamp
Returns the asof timestamp of the snapshot.
table
@property
def table() -> "Table"
Returns the table associated with the snapshot.
schema
def schema() -> Schema
Returns the schema of the snapshot.
lazy
def lazy() -> "Relation"
Start a lazy query rooted at this snapshot. Returns a lazy computation graph as a Relation.
to_arrow_dataset
def to_arrow_dataset(*,
column_group: ColumnGroup | None = None,
skip_children: bool = False) -> "ds.Dataset"
Returns a PyArrow Dataset representing the table.
When column_group is provided, the dataset is scoped to that column
group. By default the group’s descendants are included; pass
skip_children=True to read only the fields directly within the
column group.
to_polars_lazy_frame
def to_polars_lazy_frame(*,
column_group: ColumnGroup | None = None,
skip_children: bool = False) -> "pl.LazyFrame"
Returns a Polars LazyFrame for the Spiral table.
See to_arrow_dataset for the meaning of column_group and
skip_children.
to_duckdb_relation
def to_duckdb_relation(
*,
column_group: ColumnGroup | None = None,
skip_children: bool = False) -> "duckdb.DuckDBPyRelation"
Returns a DuckDB relation for the Spiral table.
See to_arrow_dataset for the meaning of column_group and
skip_children.
set_column_metadata
def set_column_metadata(column_path: str, metadata: dict[str, bytes]) -> None
Set metadata on a column, identified by its dotted path.
Entries are upserted: existing keys are overwritten, keys not mentioned are left unchanged.
Arguments:
column_path - Dotted path to the column (e.g. “color” or “video.frames”).
metadata - A dict mapping string keys to bytes values.
get_column_metadata
def get_column_metadata(column_path: str) -> dict[str, bytes]
Get metadata for a column, identified by its dotted path.
Arguments:
column_path - Dotted path to the column (e.g. “color” or “video.frames”).
Returns:
Dict mapping string keys to bytes values.
drop_column_metadata
def drop_column_metadata(column_path: str, keys: list[str]) -> None
Remove the given keys from a column’s metadata.
Keys that do not exist are silently ignored.
Arguments:
column_path - Dotted path to the column (e.g. “color” or “video.frames”).
keys - List of metadata keys to remove.
Transaction
@final
class Transaction()
Spiral table transaction.
While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction.
status
@property
def status() -> str
The status of the transaction.
table
@property
def table() -> Table
The table associated with this transaction.
is_empty
def is_empty() -> bool
Check if the transaction has no operations.
write
def write(table: TableLike | pl.LazyFrame, push_down_nulls: bool = False)
Write an item to the table inside a single transaction.
Arguments:
push_down_nulls: Whether to push null structs down into their children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn’t allow struct-level nullability, so use this option if your data contains nullable structs.
table: The table to write.
writeback
def writeback(scan: Scan, *, shards: Iterable[Shard] | None = None)
Write back the results of a scan to the table.
Arguments:
scan: The scan to write back. The plan does NOT need to be over the same table as the transaction, but it does need to have the same key schema.
shards: The shards to read from. If not provided, all shards are read.
to_ray_datasink
def to_ray_datasink() -> ray.data.Datasink[TransactionOps]
Returns a Ray Datasink which writes into this transaction.
drop_columns
def drop_columns(column_paths: list[str])
Drops the specified columns from the table.
Arguments:
column_paths: Fully qualified column names (e.g., “column_name” or “nested.field”). All columns must exist; if any column doesn’t exist, the call fails with an error.
compact_column_group
def compact_column_group(column_group: ColumnGroup,
strategy: CompactionStrategy,
*,
key_range: KeyRange | Shard | None = None)
Compact the specified column group of the table.
This convenience method combines planning, execution, and progress submission for use when there is no distributed context, i.e. when compaction runs on a single node. See https://docs.spiraldb.com/config for configuration options.
Arguments:
column_group: The column group to compact. Can be obtained from Table.column_groups.
strategy: The compaction strategy to use.
key_range: Optional key range or shard that limits the range of compaction. Requires that no fragment partially overlaps the given key range: every fragment that overlaps the range must be fully covered by it.
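The key_range requirement says no fragment may straddle the range boundary. The sketch below finds such fragments. Keys are modelled as integers and ranges as half-open [start, end) intervals. Both that representation and the helper name are assumptions for illustration.

```python
def straddling_fragments(
    fragments: list[tuple[int, int]], key_range: tuple[int, int]
) -> list[tuple[int, int]]:
    """Return fragments that overlap `key_range` without being fully covered by it."""
    start, end = key_range
    offending = []
    for f_start, f_end in fragments:
        overlaps = f_start < end and start < f_end
        covered = start <= f_start and f_end <= end
        if overlaps and not covered:
            offending.append((f_start, f_end))
    return offending
```

With fragments (0, 10), (10, 20) and (15, 30), a compaction over (0, 20) violates the requirement, because (15, 30) is only partly inside it. A compaction over (0, 30) satisfies it.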
column_group_compaction
def column_group_compaction(
column_group: ColumnGroup,
strategy: CompactionStrategy,
*,
key_range: KeyRange | Shard | None = None) -> Compaction
Plan a compaction for the specified column group.
Called by the “driver”.
Arguments:
column_group: The column group to compact. Can be obtained from Table.column_groups.
strategy: The compaction strategy to use.
key_range: Optional key range or shard that limits the range of compaction. Requires that no fragment partially overlaps the given key range: every fragment that overlaps the range must be fully covered by it.
Returns:
A Compaction object representing the planned compaction.
column_group_compaction_execute_tasks
def column_group_compaction_execute_tasks(
tasks: CompactionTasks) -> CompactionOutputs
Execute the given compaction tasks inside the transaction.
Called by a “worker” inside a worker transaction.
Operations can be collected by calling take() after this method.
Arguments:
tasks: The compaction tasks to execute.
take
def take() -> TransactionOps
Take the operations from the transaction.
The transaction can no longer be committed or aborted after calling this method.
include
def include(ops: TransactionOps)
Include the given operations in the transaction.
Checks for conflicts between the included operations and any existing operations.
IMPORTANT: This transaction must have been started at or before the timestamp of the included operations.
commit
def commit(*, txn_dump: str | None = None)
Commit the transaction.
load_dumps
@staticmethod
def load_dumps(client: Spiral, *txn_dump: str) -> list[Operation]
Load a transaction from a dump file.
abort
def abort()
Abort the transaction.
Scan
class Scan()
Executable scan object.
limit
@property
def limit() -> int | None
Returns the limit set on this scan, if any.
metrics
@property
def metrics() -> dict[str, Any]
Returns metrics about the plan.
schema
@property
def schema() -> Schema
Returns the schema of the scan.
key_schema
@property
def key_schema() -> Schema
Returns the key schema of the scan.
is_empty
def is_empty() -> bool
Check if the Spiral is empty for the given key range.
False negatives are possible, but false positives are not: is_empty can return False even when the scan returns zero rows.
to_bytes_compressed
def to_bytes_compressed() -> bytes
Get the serialized logical plan as compressed bytes.
These bytes contain the serialized logical plan and can be used to resume
the scan later using Spiral.resume_scan().
Returns:
Compressed bytes representing the serialized logical plan.
__getstate__
def __getstate__() -> bytes
Serialize the scan for pickling by encoding its logical plan and client state.
Enables seamless integration with distributed systems like Ray, Dask, and Python’s multiprocessing without requiring manual serialization.
Returns:
Zstd-compressed bytes containing JSON-serialized config and logical plan state.
__setstate__
def __setstate__(state: bytes) -> None
Deserialize the scan from pickled logical plan state.
Arguments:
state - Zstd-compressed bytes from __getstate__.
to_record_batches
def to_record_batches(*,
shards: Iterable[Shard] | None = None,
key_table: TableLike | None = None,
batch_readahead: int | None = None,
batch_aligned: bool | None = None,
grouping_prefix: list[str] | None = None,
explode: str | None = None,
hide_progress_bar: bool | None = None,
disable_view_types: bool = True) -> pa.RecordBatchReader
Read as a stream of RecordBatches.
Arguments:
shards - Optional iterable of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
key_table - a table of keys to “take” (including aux columns for cell-push-down). The key table must be either a table or a stream of table-like objects (e.g. Arrow’s RecordBatchReader). For optimal performance, each batch should contain sorted and unique keys. Unsorted and duplicate keys are still supported, but performance is less predictable.
batch_readahead - the number of batches to prefetch in the background.
batch_aligned - if True, emit exactly one record batch per element of the input. When shards is provided, that means one batch per user-supplied shard; when key_table is provided, that means one batch per input key-table batch. Aligning batches requires buffering and a possible copy, so it is less efficient and uses more memory. Requires shards or key_table.
grouping_prefix - list of key column names to group by. Must be a prefix of the key schema. Non-group columns are collected into List arrays.
explode - name of a List<Struct> column to unnest. The struct fields become top-level columns and other columns are repeated to match. Typically used together with grouping_prefix.
hide_progress_bar - DEPRECATED; the progress bar can be hidden when opening a scan instead.
disable_view_types - if True (default), return Utf8/LargeBinary instead of view types, which is needed for Ray (https://github.com/ray-project/ray/issues/59951) and for pyarrow kernels that lack view-type support. Set to False to opt into Vortex’s zero-copy view types.
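grouping_prefix and explode reshape rows after they are read. The plain-Python sketch below models one plausible reading of those descriptions, with rows as dicts already sorted by key. The exact output layout Spiral produces is an assumption here, for example whether each non-group column becomes its own list. Both helpers are hypothetical.

```python
from itertools import groupby


def group_by_prefix(rows: list[dict], prefix: list[str]) -> list[dict]:
    """Group key-sorted rows on `prefix`; every other column becomes a list."""
    grouped = []
    for group_key, group_rows in groupby(rows, key=lambda row: tuple(row[c] for c in prefix)):
        group_rows = list(group_rows)
        out = dict(zip(prefix, group_key))
        for column in group_rows[0]:
            if column not in prefix:
                out[column] = [row[column] for row in group_rows]
        grouped.append(out)
    return grouped


def explode(rows: list[dict], column: str) -> list[dict]:
    """Unnest a list-of-struct column; the other columns repeat per element."""
    exploded = []
    for row in rows:
        repeated = {name: value for name, value in row.items() if name != column}
        for item in row[column]:            # in this sketch, an empty list yields no rows
            exploded.append({**repeated, **item})
    return exploded
```

itertools.groupby only merges adjacent rows, which is why the sketch relies on key-sorted input.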
to_unordered_record_batches
def to_unordered_record_batches(
*,
shards: Iterable[Shard] | None = None,
key_table: TableLike | None = None,
batch_readahead: int | None = None,
batch_aligned: bool | None = None,
hide_progress_bar: bool | None = None,
disable_view_types: bool = True) -> pa.RecordBatchReader
Read as a stream of RecordBatches, NOT ordered by key.
Arguments:
shards - Optional iterable of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
key_table - a table of keys to “take” (including aux columns for cell-push-down). The key table must be either a table or a stream of table-like objects (e.g. Arrow’s RecordBatchReader). For optimal performance, each batch should contain sorted and unique keys. Unsorted and duplicate keys are still supported, but performance is less predictable.
batch_readahead - the number of batches to prefetch in the background.
batch_aligned - see to_record_batches.
hide_progress_bar - DEPRECATED; the progress bar can be hidden when opening a scan instead.
disable_view_types - see to_record_batches.
to_table
def to_table(*,
shards: Iterable[Shard] | None = None,
key_table: TableLike | None = None,
batch_readahead: int | None = None,
batch_aligned: bool | None = None,
hide_progress_bar: bool | None = None,
disable_view_types: bool = True) -> pa.Table
Read into a single PyArrow Table.
Warnings:
This downloads the entire Spiral Table into memory on this machine.
Arguments:
shards - Optional iterable of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
key_table - a table of keys to “take” (including aux columns for cell-push-down).
batch_readahead - the number of batches to prefetch in the background.
batch_aligned - see to_record_batches.
hide_progress_bar - If True, disables the progress bar during reading.
disable_view_types - see to_record_batches.
Returns:
pyarrow.Table
to_dask
def to_dask(*,
shards: Iterable[Shard] | None = None,
batch_readahead: int | None = None,
hide_progress_bar: bool | None = None) -> "dd.DataFrame"
Read into a Dask DataFrame.
Requires the dask package to be installed.
Dask execution has some limitations; for example, UDFs are not currently supported. These limitations usually manifest as serialization errors when Dask workers attempt to serialize the state. If you encounter such issues, please contact support for assistance.
Arguments:
shards - Optional iterable of shards to evaluate. If provided, only the specified shards will be read.
batch_readahead - the number of batches to prefetch in the background. Applies to each shard read task.
hide_progress_bar - If True, disables the progress bar during reading.
Returns:
dask.dataframe.DataFrame
to_ray
def to_ray(*,
shards: Iterable[Shard] | None = None,
batch_readahead: int | None = None,
hide_progress_bar: bool | None = None) -> "ray.data.Dataset"
Read into a Ray Dataset.
Requires the ray package to be installed.
Warnings:
The output row order is not guaranteed. Shards are read concurrently and Ray does not guarantee inter-block ordering. Sort the resulting dataset if order matters.
If the Scan returns zero rows, the resulting Ray Dataset will have an empty schema.
Arguments:
shards - Optional iterable of shards to evaluate. If provided, only the specified shards will be read.
batch_readahead - the number of batches to prefetch in the background.
hide_progress_bar - If True, disables the progress bar during reading.
Returns:
ray.data.Dataset - A Ray Dataset distributed across shards.
to_pandas
def to_pandas(*,
shards: Iterable[Shard] | None = None,
key_table: TableLike | None = None,
batch_readahead: int | None = None,
hide_progress_bar: bool | None = None) -> "pd.DataFrame"
Read into a Pandas DataFrame.
Requires the pandas package to be installed.
Warnings:
This downloads the entire Spiral Table into memory on this machine.
Arguments:
shards - Optional iterable of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
key_table - a table of keys to “take” (including aux columns for cell-push-down).
batch_readahead - the number of batches to prefetch in the background.
hide_progress_bar - If True, disables the progress bar during reading.
Returns:
pandas.DataFrame
to_polars
def to_polars(*,
shards: Iterable[Shard] | None = None,
key_table: TableLike | None = None,
batch_readahead: int | None = None,
hide_progress_bar: bool | None = None) -> "pl.DataFrame"
Read into a Polars DataFrame.
Requires the polars package to be installed.
Warnings:
This downloads the entire Spiral Table into memory on this machine. To lazily interact with a Spiral Table try Table.to_polars_lazy_frame.
Arguments:
shards - Optional iterable of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
key_table - a table of keys to “take” (including aux columns for cell-push-down).
batch_readahead - the number of batches to prefetch in the background.
hide_progress_bar - If True, disables the progress bar during reading.
Returns:
polars.DataFrame
to_data_loader
def to_data_loader(seed: int = 42,
shuffle_buffer_size: int = 0,
batch_size: int = 32,
disable_view_types: bool = True,
**kwargs) -> "SpiralDataLoader"
Read into a Torch-compatible DataLoader for single-node training.
Arguments:
seed - Random seed for reproducibility.
shuffle_buffer_size - Size of the shuffle buffer. Zero means no shuffling.
batch_size - Batch size.
disable_view_types - If True (default), convert Vortex view types to Arrow Utf8/LargeBinary for compatibility with consumers that don’t accept view types (e.g. Ray, some pyarrow kernels). Set to False to keep Utf8View/BinaryView for zero-copy.
**kwargs - Additional arguments passed to the SpiralDataLoader constructor.
Returns:
SpiralDataLoader with shuffled shards.
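shuffle_buffer_size refers to buffered shuffling, a technique in which a fixed-size buffer trades memory for randomness. The sketch below shows the common form of that technique. It is not SpiralDataLoader's implementation, and `buffered_shuffle` is a hypothetical name.

```python
import random
from collections.abc import Iterable, Iterator


def buffered_shuffle(items: Iterable, buffer_size: int, seed: int = 42) -> Iterator:
    """Yield `items` in a locally shuffled order using at most `buffer_size` slots."""
    if buffer_size <= 0:                     # zero means no shuffling
        yield from items
        return
    rng = random.Random(seed)
    buffer = []
    for item in items:
        buffer.append(item)
        if len(buffer) >= buffer_size:
            # Emit a random element from the full buffer (swap it to the end, then pop).
            index = rng.randrange(len(buffer))
            buffer[index], buffer[-1] = buffer[-1], buffer[index]
            yield buffer.pop()
    rng.shuffle(buffer)                      # drain whatever is left
    yield from buffer
```

In this sketch, an item can move earlier by at most buffer_size - 1 positions. Larger buffers therefore mix batches better, at the cost of memory.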
to_distributed_data_loader
def to_distributed_data_loader(world: "World | None" = None,
shards: Iterable[Shard] | None = None,
seed: int = 42,
shuffle_buffer_size: int = 0,
batch_size: int = 32,
disable_view_types: bool = True,
**kwargs) -> "SpiralDataLoader"
Read into a Torch-compatible DataLoader for distributed training.
Arguments:
world - World configuration with rank and world_size. If None, auto-detects from torch.distributed.
shards - Optional sharding. Sharding is global, i.e. the world is used to select the shards for this rank. If None, uses the scan’s natural sharding.
seed - Random seed for reproducibility.
shuffle_buffer_size - Size of the shuffle buffer. Zero means no shuffling.
batch_size - Batch size.
disable_view_types - If True (default), convert Vortex view types to Arrow Utf8/LargeBinary for compatibility with consumers that don’t accept view types (e.g. Ray, some pyarrow kernels). Set to False to keep Utf8View/BinaryView for zero-copy.
**kwargs - Additional arguments passed to the SpiralDataLoader constructor.
Returns:
SpiralDataLoader with shards partitioned for this rank.
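Global sharding means every rank starts from the same shard list and deterministically picks its own subset. One common way to do that is sketched below: shuffle with a shared seed, then stride by rank. The strided assignment is an assumption, because this page does not say how SpiralDataLoader partitions shards. `shards_for_rank` is hypothetical.

```python
import random


def shards_for_rank(shards: list, rank: int, world_size: int, seed: int = 42) -> list:
    """Deterministically assign a disjoint subset of the global shards to `rank`."""
    if not 0 <= rank < world_size:
        raise ValueError(f"rank {rank} is outside world_size {world_size}")
    order = list(shards)
    random.Random(seed).shuffle(order)      # identical on every rank for the same seed
    return order[rank::world_size]
```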
Auto-detect from PyTorch distributed:
import spiral
from spiral.dataloader import SpiralDataLoader, World
from spiral.demo import fineweb
sp = spiral.Spiral()
fineweb_table = fineweb(sp)
scan = sp.scan(fineweb_table[["text"]])
loader: SpiralDataLoader = scan.to_distributed_data_loader(batch_size=32)
Explicit world configuration:
world = World(rank=0, world_size=4)
loader: SpiralDataLoader = scan.to_distributed_data_loader(world=world, batch_size=32)
resume_data_loader
def resume_data_loader(state: dict[str, Any], **kwargs) -> "SpiralDataLoader"
Create a DataLoader from checkpoint state, resuming from where it left off.
This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.
Arguments:
state- Checkpoint state from state_dict().**kwargs- Additional arguments to pass to SpiralDataLoader constructor. These will override values in the state dict where applicable.
Returns:
New SpiralDataLoader instance configured to resume from the checkpoint.
Save checkpoint during training:
import spiral
from spiral.demo import fineweb
sp = spiral.Spiral()
fineweb_table = fineweb(sp)
scan = sp.scan(fineweb_table[["text"]])
loader = scan.to_distributed_data_loader(batch_size=32, seed=42)
# ... train on some batches, then capture the loader's position:
checkpoint = loader.state_dict()
Resume later. The resumed loader reuses the shards stored in the checkpoint:
resumed_loader = scan.resume_data_loader(
checkpoint,
batch_size=32,
# An optional transform_fn may be provided:
# transform_fn=my_transform,
)
to_iterable_dataset
def to_iterable_dataset(shards: Iterable[Shard] | None = None,
seed: int = 42,
shuffle_buffer_size: int = 0,
batch_readahead: int | None = None,
infinite: bool = False) -> "hf.IterableDataset"
Returns a Hugging Face IterableDataset.
Requires the datasets package to be installed.
Note: For new code, consider using SpiralDataLoader instead.
Arguments:
- shards - Optional iterable of shards to read. If None, uses scan’s natural sharding.
- seed - Base random seed for deterministic shuffling and checkpointing.
- shuffle_buffer_size - Size of shuffle buffer for within-shard shuffling. 0 means no shuffling.
- batch_readahead - Controls how many batches to read ahead concurrently. If the pipeline includes work after reading (e.g. decoding, transforming, …), this can be set higher. Otherwise, keep it low to reduce next-batch latency. Defaults to min(number of CPU cores, 64), or to shuffle_buffer_size / 16 when shuffling is enabled.
- infinite - If True, the returned IterableDataset loops over the data indefinitely, re-shuffling ranges after exhausting all data.
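With infinite=True, iteration restarts once the data is exhausted and the ranges are reshuffled. The sketch below shows that epoch loop in plain Python. It assumes a per-epoch seed of seed + epoch. That choice is hypothetical: this page does not document how Spiral derives per-epoch randomness. The shard names are also placeholders.

```python
import itertools
import random
from typing import Iterator, Sequence


def iterate_shards(shards: Sequence[str], seed: int = 42, infinite: bool = False) -> Iterator[str]:
    """Yield shards epoch by epoch, reshuffling the order at each epoch boundary."""
    for epoch in itertools.count():
        order = list(shards)
        # Hypothetical per-epoch seed: deterministic, but different every epoch.
        random.Random(seed + epoch).shuffle(order)
        yield from order
        if not infinite:
            return


# An infinite stream must be bounded by the consumer, e.g. with islice.
first_three_epochs = list(itertools.islice(iterate_shards(["a", "b", "c", "d"], infinite=True), 12))
```

Each epoch still visits every shard exactly once. Only the order changes between epochs.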
shards
def shards() -> list[Shard]
Get list of shards for this scan.
The shards are based on the scan’s physical data layout (file fragments). Each shard contains a key range and cardinality (set to None when unknown).
Returns:
List of Shard objects with key range and cardinality (if known).
Plan
class Plan()
Physical execution plan ready to run and produce results.
to_record_batches
def to_record_batches(*,
batch_readahead: int | None = None,
disable_view_types: bool = True) -> pa.RecordBatchReader
Read the plan incrementally as a stream of RecordBatches.
Relation
class Relation()
Representation of a lazy computation graph (query) against one or more Spiral tables.
Relation objects are the building blocks of the scanning API.
You obtain them from a Table and compose them with arithmetic, comparisons,
reductions, window functions, and filters before converting to a Plan for execution.
For users familiar with Polars, Relation is conceptually similar to a Polars LazyFrame.
abstract
@property
def abstract() -> _AbstractNode
The abstract, untyped plan tree backing this relation.
bound
@property
def bound() -> _BoundNode | None
The bound, typed plan tree, or None while this relation contains free scope.
explain
def explain() -> str
Return a human-readable representation of the logical plan.
Useful for understanding how a query will be executed before calling plan().
print(clips.select(camera=_.camera, n_detections=_.detections.label.count()).explain())
Raises:
ValueError: if the node cannot be resolved (contains unbound placeholders).
plan
def plan(*, hide_progress_bar: bool = False) -> Plan
Plan this node into an executable plan.
key_schema
@property
def key_schema() -> KeySchema
The key schema for this table.
The key schema defines the primary key and the sort order of the result.
Raises:
ValueError: if the Relation cannot be resolved to a LogicalNode.
schema
@property
def schema() -> Schema
The data schema for this table.
When executed, the result will have this schema.
Raises:
ValueError: if the Relation cannot be resolved to a LogicalNode.
__getattr__
def __getattr__(name: str) -> Relation
Navigate to a column group or field named name.
Raises AttributeError if name is not a known column group or field.
clips = tbl.scenes.clips # child column group
detections = tbl.scenes.clips.detections # grandchild column group
camera = tbl.scenes.clips.camera # utf8 field
duration = tbl.scenes.clips.duration # f64 field
__getitem__
def __getitem__(name: str) -> Relation
Navigate to a column group or field by subscript.
Equivalent to attribute access but avoids conflicts with Python properties
like domain, schema, and Python keywords:
tbl.scenes["clips"] # same as tbl.scenes.clips
__add__
def __add__(other: RelationInput) -> Relation
Element-wise addition. clips.start_time + clips.duration.
__sub__
def __sub__(other: RelationInput) -> Relation
Element-wise subtraction. clips.duration - 1.0.
__mul__
def __mul__(other: RelationInput) -> Relation
Element-wise multiplication. detections.confidence * 100.
__truediv__
def __truediv__(other: RelationInput) -> Relation
Element-wise division. clips.duration / 60.
__mod__
def __mod__(other: RelationInput) -> Relation
Element-wise modulo. clips.duration % 10.
__neg__
def __neg__() -> Relation
Negate every element. -clips.duration.
__eq__
def __eq__(other: RelationInput) -> Relation
Element-wise equality, returning a boolean Relation.
Commonly used to build filter predicates:
clips.where(_.camera == "rear")
clips.where(_.detections.label == "car")
__ne__
def __ne__(other: RelationInput) -> Relation
Element-wise inequality. clips.camera != "rear".
__lt__
def __lt__(other: RelationInput) -> Relation
Element-wise less-than. detections.confidence < 0.5.
__gt__
def __gt__(other: RelationInput) -> Relation
Element-wise greater-than. clips.duration > 30.0.
__le__
def __le__(other: RelationInput) -> Relation
Element-wise less-than-or-equal. detections.ts <= clips.duration.
__ge__
def __ge__(other: RelationInput) -> Relation
Element-wise greater-than-or-equal. detections.confidence >= 0.9.
__and__
def __and__(other: RelationInput) -> Relation
Logical AND. (_.camera == "rear") & (_.duration > 30.0).
__or__
def __or__(other: RelationInput) -> Relation
Logical OR. (_.camera == "front") | (_.camera == "rear").
__invert__
def __invert__() -> Relation
Logical NOT. ~(_.camera == "front").
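The parentheses in these predicates are required, not stylistic. Python gives &, | and ~ higher precedence than ==, !=, <, <=, > and >=, so an unparenthesized predicate groups differently than it reads. The standard-library ast module makes the difference visible without importing Spiral:

```python
import ast

# Parse two spellings of the same intended predicate. Only the syntax tree is
# inspected, so the name _ does not need to exist.
bare = ast.parse('_.camera == "rear" & _.duration > 30.0', mode="eval").body
grouped = ast.parse('(_.camera == "rear") & (_.duration > 30.0)', mode="eval").body

# Without parentheses, & binds first, producing the chained comparison
#   _.camera == ("rear" & _.duration) > 30.0
print(type(bare).__name__, [type(op).__name__ for op in bare.ops])  # Compare ['Eq', 'Gt']
# With parentheses, the top-level node is the intended bitwise AND.
print(type(grouped).__name__, type(grouped.op).__name__)  # BinOp BitAnd
```

Python evaluates the unparenthesized form as a chained comparison. That means two comparisons joined by an implicit and, which calls bool() on an intermediate result. This page does not describe how Spiral's Relation objects respond to that, so parenthesize each comparison.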
cast
def cast(dtype: pa.DataType) -> Relation
Cast each element to dtype.
clips.duration.cast(pa.int32()) # f64 → int32
detections.confidence.cast(pa.float64()) # f32 → f64
round
def round() -> Relation
Round each element to the nearest integer. detections.ts.round().
floor
def floor() -> Relation
Round each element down to the nearest integer.
ceil
def ceil() -> Relation
Round each element up to the nearest integer. detections.ts.ceil().
abs
def abs() -> Relation
Absolute value of each element. (clips.start_time - clips.duration).abs().
sum
def sum() -> Aggregate
Sum all values. clips.detections.confidence.sum().
mean
def mean() -> Aggregate
Arithmetic mean. clips.detections.confidence.mean().
count
def count() -> Aggregate
Count non-null values. clips.detections.label.count().
min
def min() -> Aggregate
Minimum value. clips.detections.ts.min().
max
def max() -> Aggregate
Maximum value. clips.detections.ts.max().
first
def first() -> Aggregate
First value in order. clips.detections.label.first().
last
def last() -> Aggregate
Last value in order. clips.detections.label.last().
list
def list() -> Aggregate
Collect values into a list. clips.detections.label.list().
any
def any() -> Aggregate
True if any value is true. (clips.detections.confidence > 0.9).any().
all
def all() -> Aggregate
True if all values are true. (clips.detections.confidence > 0.5).all().
select
def select(*args: dict[str, RelationInput] | RelationInput,
**kwargs: RelationInput) -> Relation
Build a table from field expressions over the row domain of self.
self defines the row domain — the result has the same rows as self,
semantically a left join on self’s key. Ancestor fields are broadcast
down; aggregates are reduced up; non-aggregate descendants are an error.
Pass keyword arguments to project named fields, pass one or more strings to project fields by name, pass exactly one dictionary mapping field names to expressions, or pass one expression to project a single value.
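The row-domain rules can be illustrated without Spiral by modeling a scenes → clips → detections hierarchy as nested Python dictionaries. This is a conceptual sketch only. The data, the function name select_clip_rows, and the use of None for null are all hypothetical. The sketch produces exactly one row per clip. The scene's location is broadcast down and the detection confidences are reduced up. A clip with no detections keeps its row with a null average, in line with the left-join semantics described above.

```python
from statistics import mean
from typing import Optional

# Hypothetical data: scene -> clips -> detection confidences.
scenes = {
    "s1": {"location": "SF", "clips": {
        "c1": {"camera": "rear", "confidences": [0.9, 0.7]},
        "c2": {"camera": "front", "confidences": []},
    }},
    "s2": {"location": "NYC", "clips": {
        "c3": {"camera": "rear", "confidences": [0.5]},
    }},
}


def select_clip_rows(scenes: dict) -> dict:
    """One output row per clip: the clip level is the row domain."""
    rows = {}
    for scene_id, scene in scenes.items():
        for clip_id, clip in scene["clips"].items():
            confs = clip["confidences"]
            avg_conf: Optional[float] = mean(confs) if confs else None  # None stands in for null
            rows[(scene_id, clip_id)] = {
                "camera": clip["camera"],        # own field
                "location": scene["location"],   # ancestor field, broadcast down
                "avg_conf": avg_conf,            # descendant aggregate, reduced up
            }
    return rows
```

Spiral expresses these projections directly: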
from spiral import _
clips = tbl.scenes.clips
# Own fields — attribute access or string subscript are both fine
clips.select(camera=_.camera, duration=_.duration)
clips.select(camera=_["camera"], duration=_["duration"])
clips.select("camera", "duration")
# Computed expressions — arithmetic, comparisons, any Expr
clips.select(
duration_ms = _.duration * 1000,
is_long = _.duration > 30.0,
)
# Child aggregations reduced to clip level
clips.select(
n_detections = _.detections.label.count(),
avg_conf = _.detections.confidence.mean(),
)
# Mix of own fields, child aggregations, and ancestor broadcast
clips.select(
camera = _.camera,
duration = _.duration,
location = tbl.scenes.location, # ancestor field, broadcast down
avg_conf = _.detections.confidence.mean(), # descendant aggregate, reduced up
)
# A dictionary is accepted when the field set is already dynamic.
clips.select({"camera": _.camera, "duration": _.duration})
# A single expression projects one value over the row domain.
clips.select(1)
where
def where(predicate: RelationInput) -> Relation
Filter rows where predicate is true.
The predicate may reference fields at any depth. Ancestor fields are broadcast down
to the current depth; aggregates are reduced up to the shallowest depth in the
predicate. It is an error if the predicate is deeper than self.
If predicate is a pure expression it is evaluated as a boolean mask over self.
If it is a Relation at a different key domain, a join-filter is performed.
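A join-filter keeps only the rows of self whose key maps to a true value in the mask relation. The sketch below mirrors the scene-level example that follows, using plain dictionaries keyed by tuples. The names and data are hypothetical. Treating keys that are missing from the mask as false is an assumption of this sketch.

```python
# Hypothetical keyed data: detection confidences keyed by (scene, clip, detection).
detections = {
    ("s1", "c1", 0): 0.95,
    ("s1", "c1", 1): 0.40,
    ("s2", "c3", 0): 0.60,
}
scenes = {"s1": {"location": "SF"}, "s2": {"location": "NYC"}, "s3": {"location": "LA"}}


def max_per_scene(values: dict) -> dict:
    """Reduce detection-level values to the scene level with max()."""
    out: dict = {}
    for (scene_id, _clip, _det), value in values.items():
        out[scene_id] = max(value, out.get(scene_id, value))
    return out


def join_filter(rows: dict, mask: dict) -> dict:
    """Keep rows whose key maps to True in the mask; missing keys are dropped."""
    return {key: row for key, row in rows.items() if mask.get(key, False)}


# Scene-level boolean mask, then a filter at the scene key domain.
mask = {scene_id: m >= 0.9 for scene_id, m in max_per_scene(detections).items()}
high_conf_scenes = join_filter(scenes, mask)
```

The same patterns in Spiral: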
from spiral import _
clips = tbl.scenes.clips
# Filter by own field
rear_clips = clips.where(_.camera == "rear")
# Filter by compound predicate
long_rear = clips.where((_.camera == "rear") & (_.duration > 30.0))
# Filter detections by confidence threshold
confident = clips.detections.where(_.confidence >= 0.9)
# Filter scenes by a pre-computed per-scene mask (different key domain → join-filter)
high_conf = clips.detections.confidence.max().per(tbl.scenes) >= 0.9
scenes = tbl.scenes.where(high_conf)
window
def window(frame: tuple[int, int]) -> Relation
Expand each row to include its neighbors within frame.
frame is a (before, after) offset relative to each row,
applied within the current key group. Pushes a virtual position
component onto the key. Chain a reduction to aggregate the window:
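As a library-independent illustration of the frame semantics, the plain-Python sketch below truncates each (before, after) window at the edges of its parent group and reduces it in one step. Spiral instead pushes a virtual position component onto the key and lets the chained reduction collapse it. The function name rolling, the row layout, and the edge truncation are assumptions of this sketch.

```python
from itertools import groupby
from statistics import mean
from typing import Callable, Hashable, Sequence


def rolling(
    rows: Sequence[tuple[tuple[Hashable, int], float]],
    frame: tuple[int, int],
    reduce: Callable[[list[float]], float],
) -> dict:
    """Reduce a (before, after) window around each row, within its parent group.

    rows: ((parent_key, position), value) pairs, sorted by key.
    Windows are truncated at group edges rather than padded (an assumption).
    """
    before, after = frame
    out = {}
    for _parent, group in groupby(rows, key=lambda row: row[0][0]):
        group = list(group)
        values = [value for _, value in group]
        for i, (key, _) in enumerate(group):
            lo = max(0, i + before)
            hi = min(len(values), i + after + 1)
            out[key] = reduce(values[lo:hi])
    return out


# Clip durations keyed by (scene, clip position); a (-1, 1) frame spans 3 clips.
durations = [(("s1", 0), 10.0), (("s1", 1), 20.0), (("s1", 2), 30.0), (("s2", 0), 5.0)]
rolling_dur = rolling(durations, (-1, 1), mean)
```

In Spiral itself, the same kind of rolling aggregate is written with the Relation API: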
from spiral import _
clips = tbl.scenes.clips
# 3-clip rolling mean of duration per scene
rolling_dur = clips.duration.window((-1, 1)).mean()
# Rolling max confidence over a 5-detection window
rolling_conf = clips.detections.confidence.window((-2, 2)).max()
clips.select(
camera = _.camera,
duration = _.duration,
rolling_dur = rolling_dur,
)
regroup
def regroup(key: RelationInput) -> Relation
Group rows by a monotonic key expression.
Creates a new [parent..., __id, __idx] key schema. The key
expression is evaluated per row to determine the group; rows with the
same key value get sequential __idx values within the group.
Typically followed by rekey() to rename the synthetic key component
to match an existing collection’s domain.
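The (__id, __idx) assignment can be sketched for the rows of a single parent with itertools.groupby. This sketch makes three assumptions. It assumes that __id is simply the key value. It omits the parent key components. It assumes the key is monotonic over the existing row order, so equal keys are contiguous. The function name regroup here is a hypothetical stand-in, not the Spiral method.

```python
from itertools import groupby
from math import floor
from typing import Any, Callable, Iterable


def regroup(rows: Iterable[Any], key_fn: Callable[[Any], Any]) -> list:
    """Assign an (__id, __idx) key to each row of one parent collection.

    Because key_fn is assumed monotonic over the rows' existing order, equal
    key values are contiguous and a single groupby pass finds every group.
    """
    out = []
    for group_id, group in groupby(rows, key=key_fn):
        for idx, row in enumerate(group):  # __idx restarts at 0 in each group
            out.append(((group_id, idx), row))
    return out


# Detections of one clip, sorted by timestamp; group by integer second.
detections = [{"ts": 0.2}, {"ts": 0.7}, {"ts": 1.1}, {"ts": 2.5}, {"ts": 2.9}]
keyed = regroup(detections, lambda d: floor(d["ts"]))
```

With a non-monotonic key such as 0, 1, 0, groupby would emit group 0 twice. That is one plausible reason the key must be monotonic for single-pass grouping. In Spiral, the corresponding calls look like this: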
from spiral import _
clips = tbl.scenes.clips
detections = clips.detections
# Group detections by integer second (floor of ts)
detections.regroup(_.ts.floor())
# Group detections by confidence decile, then rekey onto clip domain
detections.regroup((_.confidence * 10).floor()).rekey(clips)
rekey
def rekey(onto: Relation) -> Relation
Replace the leading key components of this table with onto’s key schema.
onto must have a key depth ≤ self’s key depth, and each replaced
component must have a cast-compatible type. Physical execution is a pure
column rename/cast; no re-sorting is performed.
Typically used after regroup() to rename the synthetic key component
to match an existing collection’s domain:
from spiral import _
clips = tbl.scenes.clips
detections = clips.detections
# Group detections by integer-second bucket, then rename that key to match clips
detections.regroup(_.ts.floor()).rekey(clips)
# Group detections by confidence decile, rekey to a hypothetical "buckets" domain
buckets = ...
detections.regroup((_.confidence * 10).floor()).rekey(buckets)
join
def join(other: Relation, method: str = "outer") -> Relation
Join this table with other at the same key domain.
Both tables must share the same key schema. The result merges all value columns from both sides. Use method to control join semantics:
- "outer" (default): all rows from both sides; null for mismatches.
- "left": all rows from self; null for missing other rows.
- "inner": only rows present in both.
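The three join methods can be modeled on plain dictionaries keyed by tuples. In this hypothetical sketch, None stands in for null and keys are returned sorted. When both sides share a column name, the right-hand side wins. The documentation above does not say how Spiral handles such name collisions.

```python
from typing import Any, Dict, Hashable

Keyed = Dict[Hashable, Dict[str, Any]]  # key -> {column: value}


def join(left: Keyed, right: Keyed, method: str = "outer") -> Keyed:
    """Join two keyed tables that share a key domain; None stands in for null."""
    if method == "outer":
        keys = left.keys() | right.keys()
    elif method == "left":
        keys = left.keys()
    elif method == "inner":
        keys = left.keys() & right.keys()
    else:
        raise ValueError(f"unknown join method: {method!r}")
    columns = {c for t in (left, right) for row in t.values() for c in row}
    result: Keyed = {}
    for key in sorted(keys):
        row = dict.fromkeys(columns)     # every column starts as null
        row.update(left.get(key, {}))
        row.update(right.get(key, {}))   # right wins on name clashes (sketch only)
        result[key] = row
    return result
```

The same joins in Spiral: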
clips = tbl.scenes.clips
# Merge camera and duration columns (both at clip level)
merged = clips.camera.join(clips.duration)
# Outer-join clips with a per-clip detection count
enriched = clips.join(clips.detections.label.count().per(clips))
Aggregate
class Aggregate()
An unresolved aggregate expression.
Use inside select() or call .per() to resolve.
per
def per(target: Relation) -> Relation
Reduce to target’s domain.
The result contains only keys that exist in the aggregate’s source domain (no null fill).
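This is the key difference from select(). select() keeps every row of self (left-join semantics), so parents without children get nulls. per() keeps only the keys produced by the source. The sketch below models per() as a prefix reduction over tuple keys. The function, key layout, and data are hypothetical.

```python
from collections import defaultdict
from statistics import mean
from typing import Callable, Dict, Tuple

Key = Tuple[str, ...]


def per(values: Dict[Key, float], depth: int, reduce: Callable[[list], float]) -> Dict[Key, float]:
    """Reduce child-level values to the key prefix of length `depth`.

    Only prefixes that occur in `values` appear in the result: parents
    without children are absent rather than null-filled.
    """
    groups = defaultdict(list)
    for key, value in values.items():
        groups[key[:depth]].append(value)
    return {prefix: reduce(vals) for prefix, vals in groups.items()}


# Detection confidences keyed by (scene, clip, detection); scene "s3" has none.
confidence = {("s1", "c1", "d0"): 0.9, ("s1", "c2", "d0"): 0.5, ("s2", "c3", "d0"): 0.4}
avg_conf_per_scene = per(confidence, depth=1, reduce=mean)
```

In Spiral: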
clips = tbl.scenes.clips
# Average confidence per scene (only scenes that have detections)
avg_conf_per_scene = clips.detections.confidence.mean().per(tbl.scenes)
# Max audio level per scene
max_level_per_scene = clips.audio.level_db.max().per(tbl.scenes)
lit
def lit(value: RelationInput) -> Relation
Create a literal lazy value.
Compaction
class Compaction()
Compaction is used to optimize the physical layout of data in a table’s column group.
column_group
@property
def column_group() -> ColumnGroup
The column group being compacted.
run
def run() -> None
Run the compaction to completion by repeatedly getting tasks and executing them.
run_dask
def run_dask(client: dask.distributed.Client) -> None
Run the compaction using distributed Dask workers.
Tasks are partitioned and distributed across Dask workers for parallel execution.
Arguments:
- client - Dask distributed client. Required. Create with: from dask.distributed import Client; client = Client()
run_ray
def run_ray() -> None
Run the compaction using distributed Ray workers.
Ray must be initialized before calling this method.
To initialize Ray, run ray.init() for a local cluster or ray.init(address="ray://<address>:<port>")
to connect to an existing cluster.
Tasks are partitioned and distributed across Ray workers for parallel execution.
Enrichment
class Enrichment()
An enrichment is used to derive new columns from existing ones, such as fetching data from object storage
with se.s3.get or computing embeddings. Because the column group design supports hundreds of thousands of columns,
horizontally expanding tables is a powerful primitive. Spiral optimizes enrichments where the source and destination
table are the same.
table
@property
def table() -> Table
The table to write back into.
projection
@property
def projection() -> Expr
The projection expression.
where
@property
def where() -> Expr | None
The filter expression.
run
def run(*, shards: Iterable[Shard] | None = None) -> None
Apply the enrichment onto the table in a streaming fashion.
For large tables, consider using run_dask() or run_ray() for distributed execution.
Arguments:
- shards - Optional iterable of shards to process. If not provided, processes all data.
run_dask
def run_dask(client: dask.distributed.Client,
*,
shards: Collection[Shard] | None = None,
checkpoint: str | None = None) -> None
Use distributed Dask to apply the enrichment. Requires dask[distributed] to be installed.
How shards are determined:
- If shards is provided, those will be used directly.
- Else, if checkpoint is provided, shards will be loaded from the checkpoint file.
- Else, the scan’s default sharding will be used.
Arguments:
- client - Dask distributed client. Required. Create with: from dask.distributed import Client; client = Client()
- shards - Optional iterable of shards to process. If not provided, uses default sharding or checkpoint sharding if available.
- checkpoint - Optional path to checkpoint file for incremental progress. If the file exists, processing will resume from failed shards. Failed shards are written back to this file.
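The shard-selection precedence and the checkpoint-based retry loop can be sketched in plain Python. This is a hypothetical model. Shards are represented as strings and the checkpoint as a JSON list of failed shard names. Spiral's actual Shard type and checkpoint file format are not documented here. A checkpoint path whose file does not exist yet is treated as a first run, following "If the file exists" above.

```python
import json
import os
import tempfile
from typing import Callable, List, Optional, Sequence


def resolve_shards(
    shards: Optional[Sequence[str]],
    checkpoint: Optional[str],
    default_shards: Callable[[], List[str]],
) -> List[str]:
    """Choose which shards to process, following the documented precedence."""
    if shards is not None:
        return list(shards)                      # 1. explicit shards win
    if checkpoint is not None and os.path.exists(checkpoint):
        with open(checkpoint) as f:              # 2. resume from failed shards
            return json.load(f)
    return default_shards()                      # 3. scan's default sharding


def write_failed(checkpoint: str, failed: List[str]) -> None:
    """Persist failed shards so the next run retries only those."""
    with open(checkpoint, "w") as f:
        json.dump(failed, f)


# Usage: the first run finds no checkpoint file and uses default sharding;
# after one shard fails, a rerun picks up only that shard.
with tempfile.TemporaryDirectory() as tmp:
    ckpt = os.path.join(tmp, "enrichment.ckpt.json")
    first_run = resolve_shards(None, ckpt, lambda: ["a", "b", "c"])
    write_failed(ckpt, ["b"])
    second_run = resolve_shards(None, ckpt, lambda: ["a", "b", "c"])
```

Passing the same checkpoint path on every rerun shrinks the work to the shards that failed last time.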
run_ray
def run_ray(*,
shards: Collection[Shard] | None = None,
checkpoint: str | None = None) -> None
Use distributed Ray to apply the enrichment. Requires ray to be installed and initialized.
Ray must be initialized before calling this method.
To initialize Ray, run ray.init() for a local cluster or ray.init(address="ray://<address>:<port>")
to connect to an existing cluster.
Ray execution has some limitations. These limitations usually manifest as serialization errors when Ray workers attempt to serialize the state. If you encounter such issues, consider splitting the enrichment into a UDF-only derivation that is executed in a streaming fashion, followed by a Ray enrichment for the rest of the computation. If that is not possible, please reach out to support for assistance.
How shards are determined:
- If shards is provided, those will be used directly.
- Else, if checkpoint is provided, shards will be loaded from the checkpoint file.
- Else, the scan’s default sharding will be used.
Arguments:
- shards - Optional iterable of shards to process. If not provided, uses default sharding or checkpoint sharding if available.
- checkpoint - Optional path to checkpoint file for incremental progress. If the file exists, processing will resume from failed shards. Failed shards are written back to this file.
World
@dataclass(frozen=True)
class World()
Distributed training configuration.
Attributes:
- rank - Process rank (0 to world_size-1).
- world_size - Total number of processes.
shards
def shards(shards: list[Shard],
shuffle_seed: int | None = None) -> list[Shard]
Partition shards for distributed training.
Arguments:
- shards - List of Shard objects to partition.
- shuffle_seed - Optional seed to shuffle before partitioning.
Returns:
Subset of shards for this rank (round-robin partitioning).
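Round-robin partitioning with an optional pre-shuffle can be sketched with a strided slice. ToyWorld is a hypothetical stand-in. This page states only that partitioning is round-robin and may be preceded by a seeded shuffle, so the exact assignment order in Spiral may differ.

```python
import random
from dataclasses import dataclass
from typing import List, Optional, TypeVar

S = TypeVar("S")


@dataclass(frozen=True)
class ToyWorld:
    """Hypothetical stand-in for World: a rank within world_size processes."""

    rank: int
    world_size: int

    def shards(self, shards: List[S], shuffle_seed: Optional[int] = None) -> List[S]:
        ordered = list(shards)  # copy so the caller's list is not mutated
        if shuffle_seed is not None:
            # Every rank must pass the same seed so all ranks agree on one order.
            random.Random(shuffle_seed).shuffle(ordered)
        # Round-robin: rank r takes items r, r + world_size, r + 2 * world_size, ...
        return ordered[self.rank::self.world_size]


# Ten shards split across four ranks.
parts = [ToyWorld(rank, 4).shards(list(range(10))) for rank in range(4)]
```

The slices are disjoint and together cover every shard, so each shard is read by exactly one rank per pass. With more ranks than shards, some ranks receive none.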
from_torch
@classmethod
def from_torch(cls) -> World
Auto-detect world configuration from PyTorch distributed.
SpiralDataLoader
class SpiralDataLoader()
DataLoader optimized for Spiral’s multi-threaded streaming architecture.
Unlike PyTorch’s DataLoader, which uses multiprocessing for I/O (num_workers), SpiralDataLoader leverages Spiral’s efficient Rust-based streaming and uses multiprocessing only for CPU-bound post-processing transforms.
Key differences from PyTorch DataLoader:
- No num_workers for I/O (Spiral’s Rust layer is already multi-threaded)
- map_workers for parallel post-processing (tokenization, decoding, etc.)
- Built-in checkpoint support via skip_samples
- Explicit shard-based architecture for distributed training
Simple usage:
def train_step(batch):
pass
loader = SpiralDataLoader(scan, batch_size=32)
for batch in loader:
    train_step(batch)
With parallel transforms:
def tokenize_batch(batch):
# ...
return batch
loader = SpiralDataLoader(
scan,
batch_size=32,
transform_fn=tokenize_batch,
map_workers=4,
)
__init__
def __init__(scan: Scan,
*,
shards: list[Shard] | None = None,
shuffle_shards: bool = True,
seed: int = 42,
skip_samples: int = 0,
shuffle_buffer_size: int = 0,
batch_size: int = 32,
batch_readahead: int | None = None,
transform_fn: Callable[[pa.RecordBatch], Any] | None = None,
map_workers: int = 0,
infinite: bool = False,
disable_view_types: bool = True)
Initialize SpiralDataLoader.
Arguments:
- scan - Spiral scan to load data from.
- shards - Optional list of Shard objects to read. If None, uses scan’s natural sharding based on physical layout.
- shuffle_shards - Whether to shuffle the list of shards. Uses the provided seed.
- seed - Base random seed for deterministic shuffling and checkpointing.
- skip_samples - Number of samples to skip at the beginning (for resuming from checkpoint).
- shuffle_buffer_size - Size of shuffle buffer for within-shard shuffling. 0 means no shuffling.
- batch_size - Number of rows per batch.
- batch_readahead - Number of batches to prefetch in background. If None, uses a sensible default based on whether transforms are applied.
- transform_fn - Optional function to transform each batch. Takes a PyArrow RecordBatch and returns any type. Users can call batch.to_pydict() inside the function if they need a dict. If map_workers > 0, this function must be picklable.
- map_workers - Number of worker processes for parallel transform_fn application. 0 means single-process (no parallelism). Use this for CPU-bound transforms like tokenization or audio decoding.
- infinite - Whether to cycle through the dataset infinitely. If True, the dataloader will repeat the dataset indefinitely. If False, the dataloader will stop after going through the dataset once.
- disable_view_types - If True (default), convert Vortex view types to Arrow Utf8/LargeBinary for compatibility with consumers that don’t accept view types (e.g. Ray, some pyarrow kernels). Set to False to keep Utf8View/BinaryView for zero-copy.
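When map_workers > 0, transform_fn is sent to worker processes, which is why it must be picklable. The sketch below assumes the standard-library pickle module is used. Spiral could use something else, such as cloudpickle, and this page does not say. Under that assumption, functions are serialized by their importable module and qualified name. Module-level def functions therefore work, while lambdas and nested functions do not. is_picklable is a hypothetical helper for checking a transform before a long run, and tokenize_batch is an illustrative transform.

```python
import pickle
from typing import Any, Callable, Dict, List


def tokenize_batch(batch: Dict[str, List[str]]) -> Dict[str, List[List[str]]]:
    """A module-level transform; worker processes can import it by name.

    In a real transform_fn the argument is a pyarrow.RecordBatch, and
    batch.to_pydict() yields a dict shaped like the one used here.
    """
    return {"tokens": [text.split() for text in batch["text"]]}


def is_picklable(fn: Callable[..., Any]) -> bool:
    """Return True if the standard pickle module can serialize fn."""
    try:
        pickle.dumps(fn)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Lambdas cannot be looked up by name, so pickling them fails; use a def instead.
print(is_picklable(lambda batch: batch))
```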
__iter__
def __iter__() -> Iterator[Any]
Iterate over batches.
state_dict
def state_dict() -> dict[str, Any]
Get checkpoint state for resuming.
Returns:
Dictionary containing samples_yielded, seed, and shards.
Example checkpoint:
loader = SpiralDataLoader(scan, batch_size=32, seed=42)
for i, batch in enumerate(loader):
if i == 10:
checkpoint = loader.state_dict()
        break
Example resume:
loader = SpiralDataLoader.from_state_dict(scan, checkpoint, batch_size=32)
from_state_dict
@classmethod
def from_state_dict(cls, scan: Scan, state: dict[str, Any],
**kwargs) -> SpiralDataLoader
Create a DataLoader from checkpoint state, resuming from where it left off.
This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.
Arguments:
- scan - Spiral scan to load data from.
- state - Checkpoint state from state_dict().
- **kwargs - Additional arguments to pass to SpiralDataLoader constructor. These will override values in the state dict where applicable.
Returns:
New SpiralDataLoader instance configured to resume from the checkpoint.
Save checkpoint during training:
loader = scan.to_distributed_data_loader(batch_size=32, seed=42)
checkpoint = loader.state_dict()
Resume later using the same shards from checkpoint:
resumed_loader = SpiralDataLoader.from_state_dict(
scan,
checkpoint,
batch_size=32,
# An optional transform_fn may be provided:
# transform_fn=my_transform,
)
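Resuming by skipping samples presupposes that the read order can be reproduced exactly from the checkpointed seed (and, in Spiral, the checkpointed shards). Skipping samples_yielded rows then lands on the first unseen row. The toy pure-Python model below illustrates that idea. ToyLoader is hypothetical. It omits shards, the shuffle buffer, and worker processes, and it mirrors only the seed and samples_yielded fields described above.

```python
import random
from typing import Any, Dict, Iterator, List, Sequence


class ToyLoader:
    """Hypothetical loader showing seed + samples_yielded checkpointing."""

    def __init__(self, data: Sequence[int], batch_size: int = 2, seed: int = 42, skip_samples: int = 0):
        self.data = list(data)
        self.batch_size = batch_size
        self.seed = seed
        self.skip_samples = skip_samples
        self.samples_yielded = skip_samples

    def __iter__(self) -> Iterator[List[int]]:
        order = list(self.data)
        random.Random(self.seed).shuffle(order)  # same seed -> same order on resume
        for start in range(self.skip_samples, len(order), self.batch_size):
            batch = order[start:start + self.batch_size]
            self.samples_yielded = start + len(batch)  # counted before the batch is handed out
            yield batch

    def state_dict(self) -> Dict[str, Any]:
        return {"seed": self.seed, "samples_yielded": self.samples_yielded}

    @classmethod
    def from_state_dict(cls, data: Sequence[int], state: Dict[str, Any], **kwargs: Any) -> "ToyLoader":
        params = {"seed": state["seed"], "skip_samples": state["samples_yielded"]}
        params.update(kwargs)  # explicit arguments override checkpointed values
        return cls(data, **params)


data = list(range(10))
loader = ToyLoader(data, seed=7)
batches = iter(loader)
seen = next(batches) + next(batches)  # "train" on two batches
checkpoint = loader.state_dict()
resumed = ToyLoader.from_state_dict(data, checkpoint)
rest = [x for batch in resumed for x in batch]
```

Concatenating the batches seen before the checkpoint with those produced after resuming reproduces one uninterrupted pass over the data.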