Python API

Spiral

class Spiral()

Main client for interacting with the Spiral data platform.

Configuration is loaded with the following priority (highest to lowest):

  1. Explicit parameters.
  2. Environment variables (SPIRAL__*)
  3. Config file (~/.spiral.toml)
  4. Default values (production URLs)

Examples:

import spiral

# Default configuration
sp = spiral.Spiral()

# With config overrides
sp = spiral.Spiral(overrides={"limits.concurrency": "16"})

Arguments:

  • config - Custom ClientSettings object. Defaults to global settings.
  • overrides - Configuration overrides using dot notation, see the Client Configuration  page for a full list.
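The resolution order can be pictured as a simple lookup chain. The sketch below is illustrative only, not the actual resolution code; the key name and the dot-to-double-underscore environment-variable mapping are assumptions:

```python
import os

def resolve(key: str, explicit: dict, file_cfg: dict, defaults: dict) -> str:
    """Illustrates the documented priority: explicit > env > file > defaults."""
    # 1. Explicit parameters win.
    if key in explicit:
        return explicit[key]
    # 2. Then SPIRAL__* environment variables (assumed dot -> "__" mapping).
    env_key = "SPIRAL__" + key.replace(".", "__").upper()
    if env_key in os.environ:
        return os.environ[env_key]
    # 3. Then the ~/.spiral.toml config file contents.
    if key in file_cfg:
        return file_cfg[key]
    # 4. Finally the built-in defaults.
    return defaults[key]

resolve("limits.concurrency", {"limits.concurrency": "16"}, {}, {"limits.concurrency": "8"})  # "16"
```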

config

@property def config() -> ClientSettings

Returns the client’s configuration.

authn

@property def authn() -> Authn

Get the authentication handler for this client.

list_projects

def list_projects() -> list["Project"]

List the projects in the organization.

create_project

def create_project(*, description: str | None = None, id_prefix: str | None = None, **kwargs) -> "Project"

Create a project in the current, or given, organization.

project

def project(project_id: str) -> "Project"

Open an existing project.

scan

def scan(*projections: ExprLike, where: ExprLike | None = None, asof: datetime | int | None = None, shard: Shard | None = None, limit: int | None = None, hide_progress_bar: bool = False) -> Scan

Starts a read transaction on the Spiral.

Arguments:

  • projections - a set of expressions that return struct arrays.
  • where - a query expression to apply to the data.
  • asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table. The commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in CLI.
  • shard - if provided, opens the scan only for the given shard. While shards can be provided when executing the scan, providing a shard here optimizes the scan planning phase and can significantly reduce metadata download.
  • limit - maximum number of rows to return. When set, the scan will stop reading data once the limit is reached, providing efficient early termination.
  • hide_progress_bar - if True, disables the progress bar during scan building.

scan_keys

def scan_keys(*projections: ExprLike, where: ExprLike | None = None, asof: datetime | int | None = None, shard: Shard | None = None, limit: int | None = None, hide_progress_bar: bool = False) -> Scan

Starts a keys-only read transaction on the Spiral.

To determine which keys are present in at least one column group of the table, key scan the table itself:

sp.scan_keys(table)

Arguments:

  • projections - scan the keys of the column groups referenced by these expressions.
  • where - a query expression to apply to the data.
  • asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table. The commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in CLI.
  • shard - if provided, opens the scan only for the given shard. While shards can be provided when executing the scan, providing a shard here optimizes the scan planning phase and can significantly reduce metadata download.
  • limit - maximum number of rows to return. When set, the scan will stop reading data once the limit is reached, providing efficient early termination.
  • hide_progress_bar - if True, disables the progress bar during scan building.

sample

def sample(*projections: ExprLike, sampler: Callable[[pa.Array], pa.Array], where: ExprLike | None = None, asof: datetime | int | None = None, hide_progress_bar: bool = False) -> SampleScan

Creates a SampleScan that can be inspected before execution.

NOTE: This API is experimental and will likely change in the near future.

For most use cases, prefer using sample() directly. This method is useful when you need to inspect the key_scan and value_scan plans before executing the sample operation. Call to_reader() on the returned SampleScan to execute and get a RecordBatchReader.

Arguments:

  • projections - a set of expressions that return struct arrays.
  • sampler - A function that takes a struct array of keys and returns a boolean array indicating which keys to sample.
  • where - a query expression to apply to the data.
  • asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table. The commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in CLI.
  • hide_progress_bar - if True, disables the progress bar during scan building.

Returns:

A SampleScan object with key_plan(), value_plan(), and to_reader() methods.

search

def search(top_k: int, *rank_by: ExprLike, filters: ExprLike | None = None, freshness_window: timedelta | None = None) -> pa.RecordBatchReader

Queries the index with the given rank by and filters clauses. Returns a stream of scored keys.

Arguments:

  • top_k - The number of top results to return.
  • rank_by - Rank by expressions are combined for scoring. See se.text.find and se.text.boost for scoring expressions.
  • filters - The filters expression is used to filter the results. It must return a boolean value and use only conjunctions (ANDs). Each expression in the filters statement is treated as either a must or a must_not clause, in search terminology.
  • freshness_window - If provided, the index will not be refreshed if its freshness does not exceed this window.

resume_scan

def resume_scan(scan_bytes: bytes) -> Scan

Open a serialized scan in this instance of a client.

Arguments:

  • scan_bytes - The compressed scan bytes returned by a previous scan’s to_bytes_compressed().

compute_shards

def compute_shards(*projections: ExprLike, where: ExprLike | None = None, asof: datetime | int | None = None, batch_size: int | None = None) -> list[Shard]

Computes shards over the given projections and filter.

Arguments:

  • projections - a set of expressions that return struct arrays.
  • where - a query expression to apply to the data.
  • asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table. The commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in CLI.
  • batch_size - a specific batch size, otherwise the shards will be computed based on the fragments in the table.

Project

class Project()

dataset

def dataset(name: str) -> Dataset

Return a dataset with the given name.

table

def table(identifier: str) -> Table

Open a table using a dataset.table identifier, or a bare table name to use the default dataset.

create_table

def create_table(identifier: str, *, key_schema: Schema | pa.Schema | Iterable[pa.Field[pa.DataType]] | Iterable[tuple[str, pa.DataType]] | Mapping[str, pa.DataType], root_uri: Uri | None = None, exist_ok: bool = False) -> Table

Create a new table in the project.

Arguments:

  • identifier - The table identifier, in the form dataset.table or table.
  • key_schema - The schema of the table’s keys.
  • root_uri - The root URI for the table.
  • exist_ok - If True, do not raise an error if the table already exists.

move_table

def move_table(identifier: str, new_dataset: str)

Move a table to a new dataset in the project.

Arguments:

  • identifier - The table identifier, in the form dataset.table or table.
  • new_dataset - The dataset into which to move this table.

rename_table

def rename_table(identifier: str, new_table: str)

Rename a table in the project.

Arguments:

  • identifier - The table identifier, in the form dataset.table or table.
  • new_table - The new name for this table.

drop_table

def drop_table(identifier: str)

Drop a table from the project.

Arguments:

  • identifier - The table identifier, in the form dataset.table or table.

text_index

def text_index(name: str) -> TextIndex

Returns the index with the given name.

create_text_index

def create_text_index(name: str, *projections: ExprLike, where: ExprLike | None = None, root_uri: Uri | None = None, exist_ok: bool = False) -> TextIndex

Creates a text index over the table projection.

Arguments:

  • name - The index name. Must be unique within the project.
  • projections - At least one projection expression is required. All projections must reference the same table.
  • where - An optional filter expression to apply to the index.
  • root_uri - The root URI for the index.
  • exist_ok - If True, do not raise an error if the index already exists.

compute

def compute() -> Compute

Gets compute resources configured for this project.

NOTE: Compute is experimental and will likely change in the near future.

Table

class Table(Expr)

API for interacting with a SpiralDB’s Table.

Spiral Table is a powerful and flexible way for storing, analyzing, and querying massive and/or multimodal datasets. The data model will feel familiar to users of SQL- or DataFrame-style systems, yet is designed to be more flexible, more powerful, and more useful in the context of modern data processing.

Tables are stored and queried directly from object storage.

identifier

@property def identifier() -> str

Returns the fully qualified identifier of the table.

project

@property def project() -> str | None

Returns the project of the table.

dataset

@property def dataset() -> str | None

Returns the dataset of the table.

name

@property def name() -> str | None

Returns the name of the table.

key_schema

@property def key_schema() -> Schema

Returns the key schema of the table.

schema

def schema() -> Schema

Returns the FULL schema of the table.

NOTE: This can be expensive for large tables.

write

def write(table: LazyTableLike, push_down_nulls: bool = False, **kwargs) -> None

Write an item to the table inside a single transaction.

Arguments:

  • push_down_nulls: Whether to push struct-level nulls down into the struct’s children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn’t allow struct-level nullability, so use this option if your data contains nullable structs.
  • table: The table to write.

enrich

def enrich(*projections: ExprLike, where: ExprLike | None = None) -> Enrichment

Returns an Enrichment object that, when applied, produces new columns.

Enrichment can be applied in different ways, e.g. distributed.

Arguments:

  • projections: Projection expressions deriving new columns to write back. Expressions can be over multiple Spiral tables, but all tables including this one must share the same key schema.
  • where: Optional filter expression to apply when reading the input tables.

drop_columns

def drop_columns(column_paths: list[str]) -> None

Drops the specified columns from the table.

Arguments:

  • column_paths: Fully qualified column names (e.g., “column_name” or “nested.field”). All columns must exist; if a column doesn’t exist, the function raises an error.

snapshot

def snapshot(asof: datetime | int | None = None) -> Snapshot

Returns a snapshot of the table at the given timestamp.

Each transaction creates a new version of the table. The commit timestamp of the transaction can be used in asof. See spiral txn for transaction commands in CLI.

txn

def txn(**kwargs) -> Transaction

Begins a new transaction. Transaction must be committed for writes to become visible.

While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction. The behavior is undefined if this is not the case.

to_arrow_dataset

def to_arrow_dataset() -> "ds.Dataset"

Returns a PyArrow Dataset representing the table.

to_polars_lazy_frame

def to_polars_lazy_frame() -> "pl.LazyFrame"

Returns a Polars LazyFrame for the Spiral table.

to_duckdb_relation

def to_duckdb_relation() -> "duckdb.DuckDBPyRelation"

Returns a DuckDB relation for the Spiral table.

key

def key(*parts) -> Key

Creates a Key object for the given parts according to the table’s key schema.

Arguments:

  • parts - Parts of the key. Must be a valid prefix of the table’s key schema.

Returns:

Key object representing the given parts.

column_group

def column_group(*paths: str) -> ColumnGroup

Creates a ColumnGroup object for the given column paths.

Arguments:

  • paths - Path to the column group. List of column names or dot-separated paths.

Returns:

ColumnGroup object representing the given columns.

Snapshot

class Snapshot()

Spiral table snapshot.

A snapshot represents a point-in-time view of a table.

asof

@property def asof() -> Timestamp

Returns the asof timestamp of the snapshot.

schema

def schema() -> Schema

Returns the schema of the snapshot.

table

@property def table() -> "Table"

Returns the table associated with the snapshot.

to_arrow_dataset

def to_arrow_dataset() -> "ds.Dataset"

Returns a PyArrow Dataset representing the table.

to_polars_lazy_frame

def to_polars_lazy_frame() -> "pl.LazyFrame"

Returns a Polars LazyFrame for the Spiral table.

to_duckdb_relation

def to_duckdb_relation() -> "duckdb.DuckDBPyRelation"

Returns a DuckDB relation for the Spiral table.

Scan

class Scan()

Scan object.

limit

@property def limit() -> int | None

Returns the limit set on this scan, if any.

schema

@property def schema() -> Schema

Returns the schema of the scan.

key_schema

@property def key_schema() -> Schema

Returns the key schema of the scan.

table_ids

@property def table_ids() -> list[str]

Returns the set of table IDs referenced by this scan.

plan

def plan() -> "Plan"

Builds the executable plan for this scan.

to_bytes_compressed

def to_bytes_compressed() -> bytes

Get the scan state as compressed bytes.

This state can be used to resume the scan later using Spiral.resume_scan().

Returns:

Compressed bytes representing the internal scan state.

__getstate__

def __getstate__() -> bytes

Serialize scan for pickling.

Enables seamless integration with distributed systems like Ray, Dask, and Python’s multiprocessing without requiring manual serialization.

Returns:

Zstd-compressed bytes containing JSON-serialized config and scan state.

__setstate__

def __setstate__(state: bytes) -> None

Deserialize scan from pickled state.

Arguments:

  • state - Zstd-compressed bytes from getstate.

Plan

class Plan()

Executable plan object.

limit

@property def limit() -> int | None

Returns the limit set on this scan, if any.

metrics

@property def metrics() -> dict[str, Any]

Returns metrics about the plan.

schema

@property def schema() -> Schema

Returns the schema of the plan.

key_schema

@property def key_schema() -> Schema

Returns the key schema of the plan.

is_empty

def is_empty() -> bool

Check if the Spiral is empty for the given key range.

False negatives are possible, but false positives are not: is_empty may return False even though a scan would return zero rows, but it never returns True when data exists.

to_record_batches

def to_record_batches(*, shards: list[Shard] | None = None, key_table: TableLike | None = None, batch_readahead: int | None = None, batch_aligned: bool | None = None, hide_progress_bar: bool | None = None, grouping_prefix: list[str] | None = None, explode: str | None = None) -> pa.RecordBatchReader

Read as a stream of RecordBatches.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
  • key_table - a table of keys to “take” (including aux columns for cell-push-down). Key table must be either a table or a stream of table-like objects (e.g. Arrow’s RecordBatchReader). For optimal performance, each batch should contain sorted and unique keys. Unsorted and duplicate keys are still supported, but performance is less predictable.
  • batch_readahead - the number of batches to prefetch in the background.
  • batch_aligned - if True, ensures that batches are aligned with key_table batches. The stream will yield batches that correspond exactly to the batches in key_table, but may be less efficient and use more memory (aligning batches requires buffering and maybe a copy). Must only be used when key_table is provided.
  • hide_progress_bar - If True, disables the progress bar during reading.
  • grouping_prefix - list of key column names to group by. Must be a prefix of the key schema. Non-group columns are collected into List arrays.
  • explode - name of a List<Struct> column to unnest. The struct fields become top-level columns and other columns are repeated to match. Typically used together with grouping_prefix.

to_unordered_record_batches

def to_unordered_record_batches( *, shards: list[Shard] | None = None, key_table: TableLike | None = None, batch_readahead: int | None = None, hide_progress_bar: bool | None = None) -> pa.RecordBatchReader

Read as a stream of RecordBatches, NOT ordered by key.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
  • key_table - a table of keys to “take” (including aux columns for cell-push-down). Key table must be either a table or a stream of table-like objects (e.g. Arrow’s RecordBatchReader). For optimal performance, each batch should contain sorted and unique keys. Unsorted and duplicate keys are still supported, but performance is less predictable.
  • batch_readahead - the number of batches to prefetch in the background.
  • hide_progress_bar - If True, disables the progress bar during reading.

to_table

def to_table(*, shards: list[Shard] | None = None, key_table: TableLike | None = None, batch_readahead: int | None = None, hide_progress_bar: bool | None = None) -> pa.Table

Read into a single PyArrow Table.

Warnings:

This downloads the entire Spiral Table into memory on this machine.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
  • key_table - a table of keys to “take” (including aux columns for cell-push-down).
  • batch_readahead - the number of batches to prefetch in the background.
  • hide_progress_bar - If True, disables the progress bar during reading.

Returns:

pyarrow.Table

to_dask

def to_dask(*, shards: list[Shard] | None = None, batch_readahead: int | None = None, hide_progress_bar: bool | None = None) -> "dd.DataFrame"

Read into a Dask DataFrame.

Requires the dask package to be installed.

Dask execution has some limitations, e.g. UDFs are not currently supported. These limitations usually manifest as serialization errors when Dask workers attempt to serialize the state. If you encounter such issues, please reach out to support for assistance.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read.
  • batch_readahead - the number of batches to prefetch in the background. Applies to each shard read task.
  • hide_progress_bar - If True, disables the progress bar during reading.

Returns:

dask.dataframe.DataFrame

to_ray

def to_ray(*, shards: list[Shard] | None = None, batch_readahead: int | None = None, hide_progress_bar: bool | None = None) -> "ray.data.Dataset"

Read into a Ray Dataset.

Requires the ray package to be installed.

Warnings:

The output row order is not guaranteed. Shards are read concurrently and Ray does not guarantee inter-block ordering. Sort the resulting dataset if order matters.

If the Scan returns zero rows, the resulting Ray Dataset will have an empty schema.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read.
  • batch_readahead - the number of batches to prefetch in the background.
  • hide_progress_bar - If True, disables the progress bar during reading.

Returns:

  • ray.data.Dataset - A Ray Dataset distributed across shards.

to_pandas

def to_pandas(*, shards: list[Shard] | None = None, key_table: TableLike | None = None, batch_readahead: int | None = None, hide_progress_bar: bool | None = None) -> "pd.DataFrame"

Read into a Pandas DataFrame.

Requires the pandas package to be installed.

Warnings:

This downloads the entire Spiral Table into memory on this machine.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
  • key_table - a table of keys to “take” (including aux columns for cell-push-down).
  • batch_readahead - the number of batches to prefetch in the background.
  • hide_progress_bar - If True, disables the progress bar during reading.

Returns:

pandas.DataFrame

to_polars

def to_polars(*, shards: list[Shard] | None = None, key_table: TableLike | None = None, batch_readahead: int | None = None, hide_progress_bar: bool | None = None) -> "pl.DataFrame"

Read into a Polars DataFrame.

Requires the polars package to be installed.

Warnings:

This downloads the entire Spiral Table into memory on this machine. To lazily interact with a Spiral Table try Table.to_polars_lazy_frame.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
  • key_table - a table of keys to “take” (including aux columns for cell-push-down).
  • batch_readahead - the number of batches to prefetch in the background.
  • hide_progress_bar - If True, disables the progress bar during reading.

Returns:

polars.DataFrame

to_data_loader

def to_data_loader(seed: int = 42, shuffle_buffer_size: int = 0, batch_size: int = 32, **kwargs) -> "SpiralDataLoader"

Read into a Torch-compatible DataLoader for single-node training.

Arguments:

  • seed - Random seed for reproducibility.
  • shuffle_buffer_size - Size of shuffle buffer. Zero means no shuffling.
  • batch_size - Batch size.
  • **kwargs - Additional arguments passed to SpiralDataLoader constructor.

Returns:

SpiralDataLoader with shuffled shards.
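shuffle_buffer_size refers to the standard streaming-shuffle technique: keep a fixed-size buffer and emit a random buffered element as each new one arrives. A generic sketch of the technique (not SpiralDataLoader internals):

```python
import random
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

def shuffle_buffer(items: Iterable[T], buffer_size: int, seed: int = 42) -> Iterator[T]:
    # buffer_size == 0 means no shuffling: pass items through unchanged.
    if buffer_size == 0:
        yield from items
        return
    rng = random.Random(seed)  # seeded for reproducibility
    buf: list[T] = []
    for item in items:
        buf.append(item)
        if len(buf) > buffer_size:
            # Emit a random buffered element once the buffer overflows.
            idx = rng.randrange(len(buf))
            buf[idx], buf[-1] = buf[-1], buf[idx]
            yield buf.pop()
    # Drain the remaining buffer in random order.
    rng.shuffle(buf)
    yield from buf
```

A larger buffer gives a better shuffle at the cost of memory and startup latency.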

to_distributed_data_loader

def to_distributed_data_loader(world: "World | None" = None, shards: list[Shard] | None = None, seed: int = 42, shuffle_buffer_size: int = 0, batch_size: int = 32, **kwargs) -> "SpiralDataLoader"

Read into a Torch-compatible DataLoader for distributed training.

Arguments:

  • world - World configuration with rank and world_size. If None, auto-detects from torch.distributed.
  • shards - Optional sharding. Sharding is global, i.e. the world will be used to select the shards for this rank. If None, uses scan’s natural sharding.
  • seed - Random seed for reproducibility.
  • shuffle_buffer_size - Size of shuffle buffer. Zero means no shuffling.
  • batch_size - Batch size.
  • **kwargs - Additional arguments passed to SpiralDataLoader constructor.

Returns:

SpiralDataLoader with shards partitioned for this rank.

Auto-detect from PyTorch distributed:

import spiral
from spiral.dataloader import SpiralDataLoader, World
from spiral.demo import fineweb

sp = spiral.Spiral()
fineweb_table = fineweb(sp)
plan = sp.scan(fineweb_table[["text"]]).plan()
loader: SpiralDataLoader = plan.to_distributed_data_loader(batch_size=32)

Explicit world configuration:

world = World(rank=0, world_size=4)
loader: SpiralDataLoader = plan.to_distributed_data_loader(world=world, batch_size=32)
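Since sharding is global, each rank ends up with a disjoint subset of the shards. One common scheme, shown purely as an illustration of the idea (not necessarily the actual partitioning), is strided selection:

```python
def shards_for_rank(shards: list, rank: int, world_size: int) -> list:
    # Rank r out of w takes shards r, r + w, r + 2w, ...
    return shards[rank::world_size]

# With 10 shards and world_size=4:
# rank 0 -> [0, 4, 8], rank 1 -> [1, 5, 9], rank 2 -> [2, 6], rank 3 -> [3, 7]
shards_for_rank(list(range(10)), 0, 4)
```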

resume_data_loader

def resume_data_loader(state: dict[str, Any], **kwargs) -> "SpiralDataLoader"

Create a DataLoader from checkpoint state, resuming from where it left off.

This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.

Arguments:

  • state - Checkpoint state from state_dict().
  • **kwargs - Additional arguments to pass to SpiralDataLoader constructor. These will override values in the state dict where applicable.

Returns:

New SpiralDataLoader instance configured to resume from the checkpoint.

Save checkpoint during training:

import spiral
from spiral.dataloader import SpiralDataLoader, World
from spiral.demo import images, fineweb

sp = spiral.Spiral()
table = images(sp)
fineweb_table = fineweb(sp)
plan = sp.scan(fineweb_table[["text"]]).plan()
loader = plan.to_distributed_data_loader(batch_size=32, seed=42)
checkpoint = loader.state_dict()

Resume later - uses same shards from checkpoint:

resumed_loader = plan.resume_data_loader(
    checkpoint,
    batch_size=32,
    # An optional transform_fn may be provided:
    # transform_fn=my_transform,
)

to_iterable_dataset

def to_iterable_dataset(shards: list[Shard] | None = None, seed: int = 42, shuffle_buffer_size: int = 0, batch_readahead: int | None = None, infinite: bool = False) -> "hf.IterableDataset"

Returns a Hugging Face IterableDataset.

Requires the datasets package to be installed.

Note: For new code, consider using SpiralDataLoader instead.

Arguments:

  • shards - Optional list of shards to read. If None, uses scan’s natural sharding.
  • seed - Base random seed for deterministic shuffling and checkpointing.
  • shuffle_buffer_size - Size of shuffle buffer for within-shard shuffling. 0 means no shuffling.
  • batch_readahead - Controls how many batches to read ahead concurrently. If the pipeline includes work after reading (e.g. decoding, transforming, …) this can be set higher. Otherwise, it should be kept low to reduce next-batch latency. Defaults to min(number of CPU cores, 64), or to shuffle_buffer_size / 16 when shuffling is enabled.
  • infinite - If True, the returned IterableDataset will loop infinitely over the data, re-shuffling ranges after exhausting all data.

shards

def shards() -> list[Shard]

Get list of shards for this plan.

The shards are based on the scan’s physical data layout (file fragments). Each shard contains a key range and cardinality (set to None when unknown).

Returns:

List of Shard objects with key range and cardinality (if known).

Transaction

class Transaction()

Spiral table transaction.

While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction.

status

@property def status() -> str

The status of the transaction.

table

@property def table() -> Table

The table associated with this transaction.

is_empty

def is_empty() -> bool

Check if the transaction has no operations.

write

def write(table: LazyTableLike, push_down_nulls: bool = False)

Write an item to the table as part of this transaction.

Arguments:

  • push_down_nulls: Whether to push struct-level nulls down into the struct’s children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn’t allow struct-level nullability, so use this option if your data contains nullable structs.
  • table: The table to write.

writeback

def writeback(plan: Plan, *, shards: list[Shard] | None = None)

Write back the results of a plan to the table.

Arguments:

  • plan: The plan to write back. The plan does NOT need to be over the same table as transaction, but it does need to have the same key schema.
  • shards: The shards to read from. If not provided, all shards are read.

to_ray_datasink

def to_ray_datasink() -> ray.data.Datasink

Returns a Ray Datasink which writes into this transaction.

drop_columns

def drop_columns(column_paths: list[str])

Drops the specified columns from the table.

Arguments:

  • column_paths: Fully qualified column names (e.g., “column_name” or “nested.field”). All columns must exist; if a column doesn’t exist, the function raises an error.

compact_key_space

def compact_key_space()

Compact the key space of the table.

compact_column_group

def compact_column_group(column_group: ColumnGroup, strategy: CompactionStrategy, *, key_range: KeyRange | Shard | None = None)

Compact the specified column group of the table.

This is a convenience method that combines planning, execution, and progress submission when there is no distributed context, i.e. compaction runs on a single node. See https://docs.spiraldb.com/config for configuration options.

Arguments:

  • column_group: The column group to compact. Can be obtained from Table.column_groups.
  • strategy: The compaction strategy to use.
  • key_range: Optional key range or shard to limit the range of compaction. Requires that every fragment overlapping the given key range is fully covered by it.

column_group_compaction

def column_group_compaction( column_group: ColumnGroup, strategy: CompactionStrategy, *, key_range: KeyRange | Shard | None = None) -> Compaction

Plan a compaction for the specified column group.

Called by the “driver”.

Arguments:

  • column_group: The column group to compact. Can be obtained from Table.column_groups.
  • strategy: The compaction strategy to use.
  • key_range: Optional key range or shard to limit the range of compaction. Requires that every fragment overlapping the given key range is fully covered by it.

Returns:

A Compaction object representing the planned compaction.

column_group_compaction_execute_tasks

def column_group_compaction_execute_tasks(tasks: CompactionTasks) -> None

Execute the given compaction tasks inside the transaction.

Called by “worker” inside of a worker transaction.

Operations can be collected by calling take() after this method.

Arguments:

  • tasks: The compaction tasks to execute.

take

def take() -> TransactionOps

Take the operations from the transaction.

The transaction can no longer be committed or aborted after calling this method.

include

def include(ops: TransactionOps)

Include the given operations in the transaction.

Checks for conflicts between the included operations and any existing operations.

IMPORTANT: The self transaction must be started at or before the timestamp of the included operations.

commit

def commit(*, txn_dump: str | None = None)

Commit the transaction.

load_dumps

@staticmethod def load_dumps(*txn_dump: str) -> list[Operation]

Load transaction operations from one or more dump files.

abort

def abort()

Abort the transaction.

Compaction

class Compaction()

Compaction is used to optimize the physical layout of data in a table’s column group.

column_group

@property def column_group() -> ColumnGroup

The column group being compacted.

run

def run() -> None

Run the compaction to completion by repeatedly getting tasks and executing them.

run_dask

def run_dask(client: dask.distributed.Client) -> None

Run the compaction using distributed Dask workers.

Tasks are partitioned and distributed across Dask workers for parallel execution.

Arguments:

  • client - Dask distributed client. Required. Create with: from dask.distributed import Client; client = Client()

run_ray

def run_ray() -> None

Run the compaction using distributed Ray workers.

Ray must be initialized before calling this method. To initialize Ray run ray.init() for a local cluster or ray.init(address="ray://<address>:<port>") to connect to an existing cluster.

Tasks are partitioned and distributed across Ray workers for parallel execution.

Enrichment

class Enrichment()

An enrichment derives new columns from existing ones, for example by fetching data from object storage with se.s3.get or by computing embeddings. Because the column group design supports hundreds of thousands of columns, horizontally expanding tables are a powerful primitive. Spiral optimizes enrichments where the source and destination table are the same.

table

@property def table() -> Table

The table to write back into.

projection

@property def projection() -> Expr

The projection expression.

where

@property def where() -> Expr | None

The filter expression.

run

def run(*, shards: list[Shard] | None = None) -> None

Apply the enrichment onto the table in a streaming fashion.

For large tables, consider using run_dask() or run_ray() for distributed execution.

Arguments:

  • shards - Optional list of shards to process. If not provided, processes all data.

run_dask

def run_dask(client: dask.distributed.Client, *, shards: list[Shard] | None = None, checkpoint: str | None = None, udfs: list[UDF] | None = None) -> None

Use distributed Dask to apply the enrichment. Requires dask[distributed] to be installed.

How shards are determined:

  • If shards is provided, those will be used directly.
  • Else, if checkpoint is provided, shards will be loaded from the checkpoint file.
  • Else, the scan’s default sharding will be used.
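The precedence above amounts to a simple fallback chain. The helper below is a hypothetical illustration, with the checkpoint's shards already loaded into a list (the real method takes a file path):

```python
def resolve_shards(shards, checkpoint_shards, default_shards):
    """Illustrative shard-resolution precedence for run_dask/run_ray.

    checkpoint_shards stands in for shards loaded from the checkpoint
    file; hypothetical helper, not the library's implementation."""
    if shards is not None:
        return shards             # 1. explicit shards win
    if checkpoint_shards is not None:
        return checkpoint_shards  # 2. resume from the checkpoint
    return default_shards         # 3. the scan's default sharding
```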

Arguments:

  • client - Dask distributed client. Required. Create with: from dask.distributed import Client; client = Client()
  • shards - Optional list of shards to process. If not provided, uses default sharding or checkpoint sharding if available.
  • checkpoint - Optional path to checkpoint file for incremental progress. If the file exists, processing will resume from failed shards. Failed shards are written back to this file.
  • udfs - Optional list of UDFs used in the projection or where clause. Required if any UDFs are used.

run_ray

def run_ray(*, shards: list[Shard] | None = None, checkpoint: str | None = None, udfs: list[UDF] | None = None) -> None

Use distributed Ray to apply the enrichment. Requires ray to be installed and initialized.

Ray must be initialized before calling this method. To initialize Ray run ray.init() for a local cluster or ray.init(address="ray://<address>:<port>") to connect to an existing cluster.

Ray execution has some limitations; for example, UDFs are not currently supported. These limitations usually manifest as serialization errors when Ray workers attempt to serialize state. If you encounter such issues, consider splitting the enrichment into a UDF-only derivation executed in a streaming fashion, followed by a Ray enrichment for the rest of the computation. If that is not possible, please reach out to support for assistance.

How shards are determined:

  • If shards is provided, those will be used directly.
  • Else, if checkpoint is provided, shards will be loaded from the checkpoint file.
  • Else, the scan’s default sharding will be used.

Arguments:

  • shards - Optional list of shards to process. If not provided, uses default sharding or checkpoint sharding if available.
  • checkpoint - Optional path to checkpoint file for incremental progress. If the file exists, processing will resume from failed shards. Failed shards are written back to this file.
  • udfs - Optional list of UDFs used in the projection or where clause. Required if any UDFs are used.

World

@dataclass(frozen=True) class World()

Distributed training configuration.

Attributes:

  • rank - Process rank (0 to world_size-1).
  • world_size - Total number of processes.

shards

def shards(shards: list[Shard], shuffle_seed: int | None = None) -> list[Shard]

Partition shards for distributed training.

Arguments:

  • shards - List of Shard objects to partition.
  • shuffle_seed - Optional seed to shuffle before partitioning.

Returns:

Subset of shards for this rank (round-robin partitioning).
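The round-robin partitioning with an optional seeded pre-shuffle can be sketched as follows. This illustrates the documented behavior and is not the library's implementation:

```python
import random

def partition_round_robin(shards, rank, world_size, shuffle_seed=None):
    """Give each rank every world_size-th shard, optionally shuffling
    first with a fixed seed so that all ranks agree on the order."""
    if shuffle_seed is not None:
        shards = list(shards)  # copy so the caller's list is untouched
        random.Random(shuffle_seed).shuffle(shards)
    return shards[rank::world_size]
```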

from_torch

@classmethod def from_torch(cls) -> World

Auto-detect world configuration from PyTorch distributed.
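When torch.distributed is not initialized, the same information can often be recovered from the RANK/WORLD_SIZE environment variables that launchers such as torchrun conventionally set. This fallback is an assumption for illustration, not part of from_torch itself:

```python
import os

def detect_world_from_env():
    """Fallback detection via the RANK / WORLD_SIZE environment
    variables set by common torch.distributed launchers.
    (Illustrative assumption; from_torch queries torch.distributed.)"""
    rank = int(os.environ.get("RANK", "0"))
    world_size = int(os.environ.get("WORLD_SIZE", "1"))
    return rank, world_size
```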

SpiralDataLoader

class SpiralDataLoader()

DataLoader optimized for Spiral’s multi-threaded streaming architecture.

Unlike PyTorch’s DataLoader, which uses multiprocessing for I/O (num_workers), SpiralDataLoader leverages Spiral’s efficient Rust-based streaming and uses multiprocessing only for CPU-bound post-processing transforms.

Key differences from PyTorch DataLoader:

  • No num_workers for I/O (Spiral’s Rust layer is already multi-threaded)
  • map_workers for parallel post-processing (tokenization, decoding, etc.)
  • Built-in checkpoint support via skip_samples
  • Explicit shard-based architecture for distributed training

Simple usage:

def train_step(batch):
    ...

loader = SpiralDataLoader(plan, batch_size=32)
for batch in loader:
    train_step(batch)

With parallel transforms:

def tokenize_batch(batch):
    # ...
    return batch

loader = SpiralDataLoader(
    plan,
    batch_size=32,
    transform_fn=tokenize_batch,
    map_workers=4,
)

__init__

def __init__(plan: Plan, *, shards: list[Shard] | None = None, shuffle_shards: bool = True, seed: int = 42, skip_samples: int = 0, shuffle_buffer_size: int = 0, batch_size: int = 32, batch_readahead: int | None = None, transform_fn: Callable[[pa.RecordBatch], Any] | None = None, map_workers: int = 0, infinite: bool = False)

Initialize SpiralDataLoader.

Arguments:

  • plan - Spiral plan to load data from.
  • shards - Optional list of Shard objects to read. If None, uses plan’s natural sharding based on physical layout.
  • shuffle_shards - Whether to shuffle the list of shards. Uses the provided seed.
  • seed - Base random seed for deterministic shuffling and checkpointing.
  • skip_samples - Number of samples to skip at the beginning (for resuming from checkpoint).
  • shuffle_buffer_size - Size of shuffle buffer for within-shard shuffling. 0 means no shuffling.
  • batch_size - Number of rows per batch.
  • batch_readahead - Number of batches to prefetch in background. If None, uses a sensible default based on whether transforms are applied.
  • transform_fn - Optional function to transform each batch. Takes a PyArrow RecordBatch and returns any type. Users can call batch.to_pydict() inside the function if they need a dict. If map_workers > 0, this function must be picklable.
  • map_workers - Number of worker processes for parallel transform_fn application. 0 means single-process (no parallelism). Use this for CPU-bound transforms like tokenization or audio decoding.
  • infinite - Whether to cycle through the dataset infinitely. If True, the dataloader will repeat the dataset indefinitely. If False, the dataloader will stop after going through the dataset once.
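The shuffle_buffer_size behavior can be illustrated with a small bounded-buffer generator. This is a sketch of the usual reservoir-style buffer technique; assuming Spiral's within-shard shuffling follows it, the key properties are that memory is bounded by the buffer size and that a size of 0 disables shuffling:

```python
import random

def shuffle_with_buffer(items, buffer_size, seed=42):
    """Within-stream shuffling via a bounded buffer: hold up to
    buffer_size items and yield a randomly chosen one as each new
    item arrives. buffer_size == 0 disables shuffling, matching
    the documented default. Illustrative sketch only."""
    if buffer_size <= 0:
        yield from items
        return
    rng = random.Random(seed)
    buffer = []
    for item in items:
        buffer.append(item)
        if len(buffer) > buffer_size:
            yield buffer.pop(rng.randrange(len(buffer)))
    while buffer:  # drain whatever remains at end of stream
        yield buffer.pop(rng.randrange(len(buffer)))
```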

__iter__

def __iter__() -> Iterator[Any]

Iterate over batches.

state_dict

def state_dict() -> dict[str, Any]

Get checkpoint state for resuming.

Returns:

Dictionary containing samples_yielded, seed, and shards.

Example checkpoint:

loader = SpiralDataLoader(plan, batch_size=32, seed=42)
for i, batch in enumerate(loader):
    if i == 10:
        checkpoint = loader.state_dict()
        break

Example resume:

loader = SpiralDataLoader.from_state_dict(plan, checkpoint, batch_size=32)

from_state_dict

@classmethod def from_state_dict(cls, plan: Plan, state: dict[str, Any], **kwargs) -> SpiralDataLoader

Create a DataLoader from checkpoint state, resuming from where it left off.

This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.

Arguments:

  • plan - Spiral plan to load data from.
  • state - Checkpoint state from state_dict().
  • **kwargs - Additional arguments to pass to SpiralDataLoader constructor. These will override values in the state dict where applicable.

Returns:

New SpiralDataLoader instance configured to resume from the checkpoint.
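The extraction described above maps checkpoint fields onto constructor arguments roughly as follows. resume_kwargs is a hypothetical helper for illustration; the exact field handling inside from_state_dict is an assumption:

```python
def resume_kwargs(state, **overrides):
    """Map a state_dict() checkpoint onto SpiralDataLoader arguments.

    Hypothetical sketch: the checkpoint's seed, shard list, and
    samples_yielded (used as skip_samples) seed the constructor,
    and explicit keyword arguments override them."""
    kwargs = {
        "seed": state["seed"],
        "shards": state["shards"],
        "skip_samples": state["samples_yielded"],
    }
    kwargs.update(overrides)  # caller-supplied kwargs take precedence
    return kwargs
```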

Save checkpoint during training:

loader = plan.to_distributed_data_loader(batch_size=32, seed=42)
checkpoint = loader.state_dict()

Resume later using the same shards from checkpoint:

resumed_loader = SpiralDataLoader.from_state_dict(
    plan,
    checkpoint,
    batch_size=32,
    # An optional transform_fn may be provided:
    # transform_fn=my_transform,
)