Spiral

class Spiral()

Main client for interacting with the Spiral data platform.

Configuration is loaded with the following priority (highest to lowest):
- Explicit parameters.
- Environment variables (SPIRAL__*).
- Config file (~/.spiral.toml).
- Default values (production URLs).
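The precedence above can be pictured as a layered merge, lowest priority applied first. This is an illustrative pure-Python sketch, not the actual loader; the key names are hypothetical and the real client uses a ClientSettings object:

```python
# Illustrative sketch of layered configuration, lowest priority first.
defaults = {"server.url": "https://prod.example", "limits.concurrency": "8"}
config_file = {"limits.concurrency": "12"}   # e.g. from ~/.spiral.toml
env = {"limits.concurrency": "16"}           # e.g. from a SPIRAL__* variable
explicit = {}                                # constructor parameters/overrides

# Later dicts win, so explicit > env > config file > defaults.
merged = {**defaults, **config_file, **env, **explicit}
print(merged["limits.concurrency"])  # "16" - env beats file, file beats defaults
```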
Examples:
import spiral
# Default configuration
sp = spiral.Spiral()
# With config overrides
sp = spiral.Spiral(overrides={"limits.concurrency": "16"})

Arguments:
- config - Custom ClientSettings object. Defaults to global settings.
- overrides - Configuration overrides using dot notation; see the Client Configuration page for a full list.
config

@property
def config() -> ClientSettings

Returns the client's configuration.

authn

@property
def authn() -> Authn

Get the authentication handler for this client.

list_projects

def list_projects() -> list["Project"]

List project IDs.

create_project

def create_project(*,
                   description: str | None = None,
                   id_prefix: str | None = None,
                   **kwargs) -> "Project"

Create a project in the current, or given, organization.

project

def project(project_id: str) -> "Project"

Open an existing project.
scan

def scan(*projections: ExprLike,
         where: ExprLike | None = None,
         asof: datetime | int | None = None,
         shard: Shard | None = None,
         limit: int | None = None,
         hide_progress_bar: bool = False) -> Scan

Starts a read transaction on the Spiral.

Arguments:
- projections - a set of expressions that return struct arrays.
- where - a query expression to apply to the data.
- asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used as asof. See spiral txn for transaction commands in the CLI.
- shard - if provided, opens the scan only for the given shard. While shards can be provided when executing the scan, providing a shard here optimizes the scan planning phase and can significantly reduce metadata download.
- limit - maximum number of rows to return. When set, the scan stops reading data once the limit is reached, providing efficient early termination.
- hide_progress_bar - if True, disables the progress bar during scan building.
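A minimal usage sketch, assuming a project, a table identifier, and column names that are all hypothetical, and assuming expressions support bracket projection and comparison operators (only `table[["text"]]`-style projection appears in the examples elsewhere on this page). It requires a live Spiral connection, so it is illustration only:

```python
import spiral

sp = spiral.Spiral()
# "my-project" and "web.pages" are hypothetical names.
table = sp.project("my-project").table("web.pages")

scan = sp.scan(
    table[["title", "views"]],      # projection producing a struct array
    where=table["views"] > 1_000,   # filter expression (assumed operator support)
    limit=10_000,                   # stop reading once 10k rows are produced
)
reader = scan.plan().to_record_batches()  # stream results as RecordBatches
```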
scan_keys

def scan_keys(*projections: ExprLike,
              where: ExprLike | None = None,
              asof: datetime | int | None = None,
              shard: Shard | None = None,
              limit: int | None = None,
              hide_progress_bar: bool = False) -> Scan

Starts a keys-only read transaction on the Spiral.

To determine which keys are present in at least one column group of the table, key-scan the table itself:

sp.scan_keys(table)

Arguments:
- projections - scan the keys of the column groups referenced by these expressions.
- where - a query expression to apply to the data.
- asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used as asof. See spiral txn for transaction commands in the CLI.
- shard - if provided, opens the scan only for the given shard. While shards can be provided when executing the scan, providing a shard here optimizes the scan planning phase and can significantly reduce metadata download.
- limit - maximum number of rows to return. When set, the scan stops reading data once the limit is reached, providing efficient early termination.
- hide_progress_bar - if True, disables the progress bar during scan building.
sample

def sample(*projections: ExprLike,
           sampler: Callable[[pa.Array], pa.Array],
           where: ExprLike | None = None,
           asof: datetime | int | None = None,
           hide_progress_bar: bool = False) -> SampleScan

Creates a SampleScan that can be inspected before execution.

NOTE: This API is experimental and will likely change in the near future.

For most use cases, prefer using sample() directly. This method is useful when you need to inspect the key_scan and value_scan plans before executing the sample operation. Call to_reader() on the returned SampleScan to execute and get a RecordBatchReader.

Arguments:
- projections - a set of expressions that return struct arrays.
- sampler - a function that takes a struct array of keys and returns a boolean array indicating which keys to sample.
- where - a query expression to apply to the data.
- asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used as asof. See spiral txn for transaction commands in the CLI.
- hide_progress_bar - if True, disables the progress bar during scan building.

Returns:
A SampleScan object with key_plan(), value_plan(), and to_reader() methods.
search

def search(top_k: int,
           *rank_by: ExprLike,
           filters: ExprLike | None = None,
           freshness_window: timedelta | None = None) -> pa.RecordBatchReader

Queries the index with the given rank-by and filters clauses. Returns a stream of scored keys.

Arguments:
- top_k - the number of top results to return.
- rank_by - rank-by expressions are combined for scoring. See se.text.find and se.text.boost for scoring expressions.
- filters - the filters expression is used to filter the results. It must return a boolean value and use only conjunctions (ANDs). Expressions in the filters statement are considered either a must or must_not clause in search terminology.
- freshness_window - if provided, the index will not be refreshed if its freshness does not exceed this window.
resume_scan

def resume_scan(scan_bytes: bytes) -> Scan

Open a serialized scan in this instance of a client.

Arguments:
- scan_bytes - the compressed scan bytes returned by a previous scan's to_bytes_compressed().
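A sketch of the serialize/resume round trip. The project and table names are hypothetical, and the code needs a live Spiral connection, so it is illustration only:

```python
import spiral

sp = spiral.Spiral()
table = sp.project("my-project").table("web.pages")  # hypothetical names

scan = sp.scan(table[["title"]])
state = scan.to_bytes_compressed()   # compact, transportable scan state

# ...later, possibly in another process or on another machine...
resumed = spiral.Spiral().resume_scan(state)
```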
compute_shards

def compute_shards(*projections: ExprLike,
                   where: ExprLike | None = None,
                   asof: datetime | int | None = None,
                   batch_size: int | None = None) -> list[Shard]

Computes shards over the given projections and filter.

Arguments:
- projections - a set of expressions that return struct arrays.
- where - a query expression to apply to the data.
- asof - execute the scan on the version of the table as of the given timestamp. Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used as asof. See spiral txn for transaction commands in the CLI.
- batch_size - a specific batch size; otherwise the shards are computed based on the fragments in the table.
Project

class Project()

dataset

def dataset(name: str) -> Dataset

Return a dataset with the given name.

table

def table(identifier: str) -> Table

Open a table with a dataset.table identifier, or a table name using the default dataset.

create_table

def create_table(identifier: str,
                 *,
                 key_schema: Schema
                 | pa.Schema
                 | Iterable[pa.Field[pa.DataType]]
                 | Iterable[tuple[str, pa.DataType]]
                 | Mapping[str, pa.DataType],
                 root_uri: Uri | None = None,
                 exist_ok: bool = False) -> Table

Create a new table in the project.

Arguments:
- identifier - the table identifier, in the form dataset.table or table.
- key_schema - the schema of the table's keys.
- root_uri - the root URI for the table.
- exist_ok - if True, do not raise an error if the table already exists.
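A sketch of creating a table with a two-column key, passing key_schema as a mapping of names to Arrow types (one of the accepted forms above). The project ID and table identifier are hypothetical, and a live Spiral connection is required:

```python
import pyarrow as pa
import spiral

sp = spiral.Spiral()
project = sp.project("my-project")  # hypothetical project ID

table = project.create_table(
    "web.pages",                    # dataset.table identifier
    key_schema={"domain": pa.string(), "path": pa.string()},
    exist_ok=True,                  # no error if the table already exists
)
```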
move_table

def move_table(identifier: str, new_dataset: str)

Move a table to a new dataset in the project.

Arguments:
- identifier - the table identifier, in the form dataset.table or table.
- new_dataset - the dataset into which to move this table.
rename_table

def rename_table(identifier: str, new_table: str)

Rename a table within its dataset.

Arguments:
- identifier - the table identifier, in the form dataset.table or table.
- new_table - the new name for this table.
drop_table

def drop_table(identifier: str)

Drop a table from the project.

Arguments:
- identifier - the table identifier, in the form dataset.table or table.
text_index

def text_index(name: str) -> TextIndex

Returns the index with the given name.

create_text_index

def create_text_index(name: str,
                      *projections: ExprLike,
                      where: ExprLike | None = None,
                      root_uri: Uri | None = None,
                      exist_ok: bool = False) -> TextIndex

Creates a text index over the table projection.

Arguments:
- name - the index name. Must be unique within the project.
- projections - at least one projection expression is required. All projections must reference the same table.
- where - an optional filter expression to apply to the index.
- root_uri - the root URI for the index.
- exist_ok - if True, do not raise an error if the index already exists.
compute

def compute() -> Compute

Gets compute resources configured for this project.

NOTE: Compute is experimental and will likely change in the near future.
Table

class Table(Expr)

API for interacting with a SpiralDB Table.

A Spiral Table is a powerful and flexible way to store, analyze, and query massive and/or multimodal datasets. The data model will feel familiar to users of SQL- or DataFrame-style systems, yet it is designed to be more flexible, more powerful, and more useful in the context of modern data processing.

Tables are stored and queried directly from object storage.
identifier

@property
def identifier() -> str

Returns the fully qualified identifier of the table.

project

@property
def project() -> str | None

Returns the project of the table.

dataset

@property
def dataset() -> str | None

Returns the dataset of the table.

name

@property
def name() -> str | None

Returns the name of the table.

key_schema

@property
def key_schema() -> Schema

Returns the key schema of the table.

schema

def schema() -> Schema

Returns the FULL schema of the table.

NOTE: This can be expensive for large tables.
write

def write(table: LazyTableLike,
          push_down_nulls: bool = False,
          **kwargs) -> None

Write an item to the table inside a single transaction.

Arguments:
- table - the table to write.
- push_down_nulls - whether to push nullable structs down into their children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn't allow struct-level nullability, so use this option if your data contains nullable structs.
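The null push-down transformation can be sketched in pure Python over a list of dicts. This is illustrative only; Spiral performs the equivalent rewrite on Arrow struct arrays:

```python
def push_down_nulls(rows, fields):
    """Replace each null struct with a struct whose fields are all null.

    Pure-Python sketch of the rewrite the push_down_nulls option performs.
    """
    return [row if row is not None else {f: None for f in fields} for row in rows]

rows = [{"a": 1}, None]
print(push_down_nulls(rows, fields=["a"]))  # [{'a': 1}, {'a': None}]
```

The result has no struct-level nulls, which is what makes it writable to SpiralDB.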
enrich

def enrich(*projections: ExprLike,
           where: ExprLike | None = None) -> Enrichment

Returns an Enrichment object that, when applied, produces new columns.

An Enrichment can be applied in different ways, e.g. distributed.

Arguments:
- projections - projection expressions deriving new columns to write back. Expressions can be over multiple Spiral tables, but all tables, including this one, must share the same key schema.
- where - optional filter expression to apply when reading the input tables.
drop_columns

def drop_columns(column_paths: list[str]) -> None

Drops the specified columns from the table.

Arguments:
- column_paths - fully qualified column names (e.g. "column_name" or "nested.field"). All columns must exist; if a column doesn't exist, an error is raised.
snapshot

def snapshot(asof: datetime | int | None = None) -> Snapshot

Returns a snapshot of the table at the given timestamp.

Each transaction creates a new version of the table, and the commit timestamp of the transaction can be used as asof. See spiral txn for transaction commands in the CLI.
txn

def txn(**kwargs) -> Transaction

Begins a new transaction. A transaction must be committed for writes to become visible.

While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction. The behavior is undefined if this is not the case.
to_arrow_dataset

def to_arrow_dataset() -> "ds.Dataset"

Returns a PyArrow Dataset representing the table.

to_polars_lazy_frame

def to_polars_lazy_frame() -> "pl.LazyFrame"

Returns a Polars LazyFrame for the Spiral table.

to_duckdb_relation

def to_duckdb_relation() -> "duckdb.DuckDBPyRelation"

Returns a DuckDB relation for the Spiral table.

key

def key(*parts) -> Key

Creates a Key object for the given parts according to the table's key schema.

Arguments:
- parts - parts of the key. Must be a valid prefix of the table's key schema.

Returns:
Key object representing the given parts.

column_group

def column_group(*paths: str) -> ColumnGroup

Creates a ColumnGroup object for the given column paths.

Arguments:
- paths - path to the column group. List of column names or dot-separated paths.

Returns:
ColumnGroup object representing the given columns.
Snapshot

class Snapshot()

Spiral table snapshot.

A snapshot represents a point-in-time view of a table.

asof

@property
def asof() -> Timestamp

Returns the asof timestamp of the snapshot.

schema

def schema() -> Schema

Returns the schema of the snapshot.

table

@property
def table() -> "Table"

Returns the table associated with the snapshot.

to_arrow_dataset

def to_arrow_dataset() -> "ds.Dataset"

Returns a PyArrow Dataset representing the table.

to_polars_lazy_frame

def to_polars_lazy_frame() -> "pl.LazyFrame"

Returns a Polars LazyFrame for the Spiral table.

to_duckdb_relation

def to_duckdb_relation() -> "duckdb.DuckDBPyRelation"

Returns a DuckDB relation for the Spiral table.
Scan

class Scan()

Scan object.

limit

@property
def limit() -> int | None

Returns the limit set on this scan, if any.

schema

@property
def schema() -> Schema

Returns the schema of the scan.

key_schema

@property
def key_schema() -> Schema

Returns the key schema of the scan.

table_ids

@property
def table_ids() -> list[str]

Returns the set of table IDs referenced by this scan.

plan

def plan() -> "Plan"

Builds the executable plan for this scan.

to_bytes_compressed

def to_bytes_compressed() -> bytes

Get the scan state as compressed bytes.

This state can be used to resume the scan later using Spiral.resume_scan().

Returns:
Compressed bytes representing the internal scan state.
__getstate__

def __getstate__() -> bytes

Serialize scan for pickling.

Enables seamless integration with distributed systems like Ray, Dask, and Python's multiprocessing without requiring manual serialization.

Returns:
Zstd-compressed bytes containing JSON-serialized config and scan state.

__setstate__

def __setstate__(state: bytes) -> None

Deserialize scan from pickled state.

Arguments:
- state - Zstd-compressed bytes from __getstate__.
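The pattern these two hooks implement is standard pickle protocol: __getstate__ returns an opaque bytes blob, and pickle passes that blob to __setstate__ on the reconstructed object. A stdlib-only toy version (zlib stands in for zstd, and the config dict is hypothetical):

```python
import json
import pickle
import zlib

class Resumable:
    """Toy class showing the __getstate__/__setstate__ pattern Scan uses."""

    def __init__(self, config=None):
        self.config = config or {}

    def __getstate__(self) -> bytes:
        # Compress a JSON-serialized snapshot of the state.
        return zlib.compress(json.dumps(self.config).encode())

    def __setstate__(self, state: bytes) -> None:
        # pickle calls this with the bytes returned by __getstate__.
        self.config = json.loads(zlib.decompress(state))

obj = Resumable({"limits.concurrency": "16"})
clone = pickle.loads(pickle.dumps(obj))  # works across Ray/Dask workers too
print(clone.config)  # {'limits.concurrency': '16'}
```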
Plan

class Plan()

Executable plan object.

limit

@property
def limit() -> int | None

Returns the limit set on this scan, if any.

metrics

@property
def metrics() -> dict[str, Any]

Returns metrics about the plan.

schema

@property
def schema() -> Schema

Returns the schema of the plan.

key_schema

@property
def key_schema() -> Schema

Returns the key schema of the plan.

is_empty

def is_empty() -> bool

Check if the Spiral is empty for the given key range.

False negatives are possible, but false positives are not, i.e. is_empty can return False and the scan can still return zero rows.
to_record_batches

def to_record_batches(*,
                      shards: list[Shard] | None = None,
                      key_table: TableLike | None = None,
                      batch_readahead: int | None = None,
                      batch_aligned: bool | None = None,
                      hide_progress_bar: bool | None = None,
                      grouping_prefix: list[str] | None = None,
                      explode: str | None = None) -> pa.RecordBatchReader

Read as a stream of RecordBatches.

Arguments:
- shards - optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
- key_table - a table of keys to "take" (including aux columns for cell-push-down). The key table must be either a table or a stream of table-like objects (e.g. Arrow's RecordBatchReader). For optimal performance, each batch should contain sorted and unique keys. Unsorted and duplicate keys are still supported, but performance is less predictable.
- batch_readahead - the number of batches to prefetch in the background.
- batch_aligned - if True, ensures that batches are aligned with key_table batches. The stream will yield batches that correspond exactly to the batches in key_table, but may be less efficient and use more memory (aligning batches requires buffering and maybe a copy). Must only be used when key_table is provided.
- hide_progress_bar - if True, disables the progress bar during reading.
- grouping_prefix - list of key column names to group by. Must be a prefix of the key schema. Non-group columns are collected into List arrays.
- explode - name of a List<Struct> column to unnest. The struct fields become top-level columns and other columns are repeated to match. Typically used together with grouping_prefix.
to_unordered_record_batches

def to_unordered_record_batches(
        *,
        shards: list[Shard] | None = None,
        key_table: TableLike | None = None,
        batch_readahead: int | None = None,
        hide_progress_bar: bool | None = None) -> pa.RecordBatchReader

Read as a stream of RecordBatches, NOT ordered by key.

Arguments:
- shards - optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
- key_table - a table of keys to "take" (including aux columns for cell-push-down). The key table must be either a table or a stream of table-like objects (e.g. Arrow's RecordBatchReader). For optimal performance, each batch should contain sorted and unique keys. Unsorted and duplicate keys are still supported, but performance is less predictable.
- batch_readahead - the number of batches to prefetch in the background.
- hide_progress_bar - if True, disables the progress bar during reading.
to_table

def to_table(*,
             shards: list[Shard] | None = None,
             key_table: TableLike | None = None,
             batch_readahead: int | None = None,
             hide_progress_bar: bool | None = None) -> pa.Table

Read into a single PyArrow Table.

Warnings:
This downloads the entire Spiral Table into memory on this machine.

Arguments:
- shards - optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
- key_table - a table of keys to "take" (including aux columns for cell-push-down).
- batch_readahead - the number of batches to prefetch in the background.
- hide_progress_bar - if True, disables the progress bar during reading.

Returns:
pyarrow.Table
to_dask

def to_dask(*,
            shards: list[Shard] | None = None,
            batch_readahead: int | None = None,
            hide_progress_bar: bool | None = None) -> "dd.DataFrame"

Read into a Dask DataFrame.

Requires the dask package to be installed.

Dask execution has some limitations, e.g. UDFs are not currently supported. These limitations usually manifest as serialization errors when Dask workers attempt to serialize the state. If you are encountering such issues, please reach out to support for assistance.

Arguments:
- shards - optional list of shards to evaluate. If provided, only the specified shards will be read.
- batch_readahead - the number of batches to prefetch in the background. Applies to each shard read task.
- hide_progress_bar - if True, disables the progress bar during reading.

Returns:
dask.dataframe.DataFrame
to_ray

def to_ray(*,
           shards: list[Shard] | None = None,
           batch_readahead: int | None = None,
           hide_progress_bar: bool | None = None) -> "ray.data.Dataset"

Read into a Ray Dataset.

Requires the ray package to be installed.

Warnings:
The output row order is not guaranteed. Shards are read concurrently and Ray does not guarantee inter-block ordering. Sort the resulting dataset if order matters.
If the Scan returns zero rows, the resulting Ray Dataset will have an empty schema.

Arguments:
- shards - optional list of shards to evaluate. If provided, only the specified shards will be read.
- batch_readahead - the number of batches to prefetch in the background.
- hide_progress_bar - if True, disables the progress bar during reading.

Returns:
ray.data.Dataset - a Ray Dataset distributed across shards.
to_pandas

def to_pandas(*,
              shards: list[Shard] | None = None,
              key_table: TableLike = None,
              batch_readahead: int | None = None,
              hide_progress_bar: bool | None = None) -> "pd.DataFrame"

Read into a Pandas DataFrame.

Requires the pandas package to be installed.

Warnings:
This downloads the entire Spiral Table into memory on this machine.

Arguments:
- shards - optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
- key_table - a table of keys to "take" (including aux columns for cell-push-down).
- batch_readahead - the number of batches to prefetch in the background.
- hide_progress_bar - if True, disables the progress bar during reading.

Returns:
pandas.DataFrame
to_polars

def to_polars(*,
              shards: list[Shard] | None = None,
              key_table: TableLike = None,
              batch_readahead: int | None = None,
              hide_progress_bar: bool | None = None) -> "pl.DataFrame"

Read into a Polars DataFrame.

Requires the polars package to be installed.

Warnings:
This downloads the entire Spiral Table into memory on this machine. To lazily interact with a Spiral Table, try Table.to_polars_lazy_frame.

Arguments:
- shards - optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
- key_table - a table of keys to "take" (including aux columns for cell-push-down).
- batch_readahead - the number of batches to prefetch in the background.
- hide_progress_bar - if True, disables the progress bar during reading.

Returns:
polars.DataFrame
to_data_loader

def to_data_loader(seed: int = 42,
                   shuffle_buffer_size: int = 0,
                   batch_size: int = 32,
                   **kwargs) -> "SpiralDataLoader"

Read into a Torch-compatible DataLoader for single-node training.

Arguments:
- seed - random seed for reproducibility.
- shuffle_buffer_size - size of the shuffle buffer. Zero means no shuffling.
- batch_size - batch size.
- **kwargs - additional arguments passed to the SpiralDataLoader constructor.

Returns:
SpiralDataLoader with shuffled shards.
to_distributed_data_loader

def to_distributed_data_loader(world: "World | None" = None,
                               shards: list[Shard] | None = None,
                               seed: int = 42,
                               shuffle_buffer_size: int = 0,
                               batch_size: int = 32,
                               **kwargs) -> "SpiralDataLoader"

Read into a Torch-compatible DataLoader for distributed training.

Arguments:
- world - World configuration with rank and world_size. If None, auto-detects from torch.distributed.
- shards - optional sharding. Sharding is global, i.e. the world will be used to select the shards for this rank. If None, uses the scan's natural sharding.
- seed - random seed for reproducibility.
- shuffle_buffer_size - size of the shuffle buffer. Zero means no shuffling.
- batch_size - batch size.
- **kwargs - additional arguments passed to the SpiralDataLoader constructor.

Returns:
SpiralDataLoader with shards partitioned for this rank.
Auto-detect from PyTorch distributed:

import spiral
from spiral.dataloader import SpiralDataLoader, World
from spiral.demo import fineweb

sp = spiral.Spiral()
fineweb_table = fineweb(sp)
plan = sp.scan(fineweb_table[["text"]]).plan()
loader: SpiralDataLoader = plan.to_distributed_data_loader(batch_size=32)

Explicit world configuration:

world = World(rank=0, world_size=4)
loader: SpiralDataLoader = plan.to_distributed_data_loader(world=world, batch_size=32)

resume_data_loader
def resume_data_loader(state: dict[str, Any], **kwargs) -> "SpiralDataLoader"

Create a DataLoader from checkpoint state, resuming from where it left off.

This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.

Arguments:
- state - checkpoint state from state_dict().
- **kwargs - additional arguments to pass to the SpiralDataLoader constructor. These will override values in the state dict where applicable.

Returns:
New SpiralDataLoader instance configured to resume from the checkpoint.
Save checkpoint during training:

import spiral
from spiral.dataloader import SpiralDataLoader, World
from spiral.demo import images, fineweb

sp = spiral.Spiral()
table = images(sp)
fineweb_table = fineweb(sp)
plan = sp.scan(fineweb_table[["text"]]).plan()
loader = plan.to_distributed_data_loader(batch_size=32, seed=42)
checkpoint = loader.state_dict()

Resume later, using the same shards from the checkpoint:

resumed_loader = plan.resume_data_loader(
    checkpoint,
    batch_size=32,
    # An optional transform_fn may be provided:
    # transform_fn=my_transform,
)

to_iterable_dataset
def to_iterable_dataset(shards: list[Shard] | None = None,
                        seed: int = 42,
                        shuffle_buffer_size: int = 0,
                        batch_readahead: int | None = None,
                        infinite: bool = False) -> "hf.IterableDataset"

Returns a Huggingface IterableDataset.

Requires the datasets package to be installed.

Note: For new code, consider using SpiralDataLoader instead.

Arguments:
- shards - optional list of shards to read. If None, uses the scan's natural sharding.
- seed - base random seed for deterministic shuffling and checkpointing.
- shuffle_buffer_size - size of the shuffle buffer for within-shard shuffling. 0 means no shuffling.
- batch_readahead - controls how many batches to read ahead concurrently. If the pipeline includes work after reading (e.g. decoding, transforming, ...) this can be set higher. Otherwise, it should be kept low to reduce next-batch latency. Defaults to min(number of CPU cores, 64), or to shuffle_buffer_size/16 when shuffling is enabled.
- infinite - if True, the returned IterableDataset will loop infinitely over the data, re-shuffling ranges after exhausting all data.
shards

def shards() -> list[Shard]

Get the list of shards for this plan.

The shards are based on the scan's physical data layout (file fragments). Each shard contains a key range and cardinality (set to None when unknown).

Returns:
List of Shard objects with key range and cardinality (if known).
Transaction

class Transaction()

Spiral table transaction.

While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction.
status

@property
def status() -> str

The status of the transaction.

table

@property
def table() -> Table

The table associated with this transaction.

is_empty

def is_empty() -> bool

Check if the transaction has no operations.

write

def write(table: LazyTableLike, push_down_nulls: bool = False)

Write an item to the table inside a single transaction.

Arguments:
- table - the table to write.
- push_down_nulls - whether to push nullable structs down into their children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn't allow struct-level nullability, so use this option if your data contains nullable structs.
writeback

def writeback(plan: Plan, *, shards: list[Shard] | None = None)

Write back the results of a plan to the table.

Arguments:
- plan - the plan to write back. The plan does NOT need to be over the same table as the transaction, but it does need to have the same key schema.
- shards - the shards to read from. If not provided, all shards are read.
to_ray_datasink

def to_ray_datasink() -> ray.data.Datasink

Returns a Ray Datasink which writes into this transaction.

drop_columns

def drop_columns(column_paths: list[str])

Drops the specified columns from the table.

Arguments:
- column_paths - fully qualified column names (e.g. "column_name" or "nested.field"). All columns must exist; if a column doesn't exist, an error is raised.
compact_key_space

def compact_key_space()

Compact the key space of the table.

compact_column_group

def compact_column_group(column_group: ColumnGroup,
                         strategy: CompactionStrategy,
                         *,
                         key_range: KeyRange | Shard | None = None)

Compact the specified column group of the table.

This is a convenience method that combines planning, execution, and progress submission when there is no distributed context, i.e. compaction is run on a single node. See https://docs.spiraldb.com/config for configuration options.

Arguments:
- column_group - the column group to compact. Can be obtained from Table.column_groups.
- strategy - the compaction strategy to use.
- key_range - optional key range or shard to limit the range of compaction. Requires that no fragment overlaps the given key range without being covered by it.
column_group_compaction

def column_group_compaction(
        column_group: ColumnGroup,
        strategy: CompactionStrategy,
        *,
        key_range: KeyRange | Shard | None = None) -> Compaction

Plan a compaction for the specified column group.

Called by the "driver".

Arguments:
- column_group - the column group to compact. Can be obtained from Table.column_groups.
- strategy - the compaction strategy to use.
- key_range - optional key range or shard to limit the range of compaction. Requires that no fragment overlaps the given key range without being covered by it.

Returns:
A Compaction object representing the planned compaction.
column_group_compaction_execute_tasks

def column_group_compaction_execute_tasks(tasks: CompactionTasks) -> None

Execute the given compaction tasks inside the transaction.

Called by a "worker" inside of a worker transaction.

Operations can be collected by calling take() after this method.

Arguments:
- tasks - the compaction tasks to execute.
take

def take() -> TransactionOps

Take the operations from the transaction.

The transaction can no longer be committed or aborted after calling this method.

include

def include(ops: TransactionOps)

Include the given operations in the transaction.

Checks for conflicts between the included operations and any existing operations.

IMPORTANT: The self transaction must be started at or before the timestamp of the included operations.
commit

def commit(*, txn_dump: str | None = None)

Commit the transaction.

load_dumps

@staticmethod
def load_dumps(*txn_dump: str) -> list[Operation]

Load a transaction from a dump file.

abort

def abort()

Abort the transaction.
Compaction

class Compaction()

Compaction is used to optimize the physical layout of data in a table's column group.

column_group

@property
def column_group() -> ColumnGroup

The column group being compacted.

run

def run() -> None

Run the compaction to completion by repeatedly getting tasks and executing them.

run_dask

def run_dask(client: dask.distributed.Client) -> None

Run the compaction using distributed Dask workers.

Tasks are partitioned and distributed across Dask workers for parallel execution.

Arguments:
- client - Dask distributed client. Required. Create with: from dask.distributed import Client; client = Client()

run_ray

def run_ray() -> None

Run the compaction using distributed Ray workers.

Ray must be initialized before calling this method. To initialize Ray, run ray.init() for a local cluster, or ray.init(address="ray://<address>:<port>") to connect to an existing cluster.

Tasks are partitioned and distributed across Ray workers for parallel execution.
Enrichment

class Enrichment()

An enrichment is used to derive new columns from existing ones, such as fetching data from object storage with se.s3.get or computing embeddings. With a column group design supporting hundreds of thousands of columns, horizontally expanding tables are a powerful primitive. Spiral optimizes enrichments where the source and destination table are the same.

table

@property
def table() -> Table

The table to write back into.

projection

@property
def projection() -> Expr

The projection expression.

where

@property
def where() -> Expr | None

The filter expression.
run

def run(*, shards: list[Shard] | None = None) -> None

Apply the enrichment onto the table in a streaming fashion.

For large tables, consider using run_dask() or run_ray() for distributed execution.

Arguments:
- shards - optional list of shards to process. If not provided, processes all data.

run_dask

def run_dask(client: dask.distributed.Client,
             *,
             shards: list[Shard] | None = None,
             checkpoint: str | None = None,
             udfs: list[UDF] | None = None) -> None

Use distributed Dask to apply the enrichment. Requires dask[distributed] to be installed.

How shards are determined:
- If shards is provided, those will be used directly.
- Else, if checkpoint is provided, shards will be loaded from the checkpoint file.
- Else, the scan's default sharding will be used.

Arguments:
- client - Dask distributed client. Required. Create with: from dask.distributed import Client; client = Client()
- shards - optional list of shards to process. If not provided, uses default sharding, or checkpoint sharding if available.
- checkpoint - optional path to a checkpoint file for incremental progress. If the file exists, processing will resume from failed shards. Failed shards are written back to this file.
- udfs - optional list of UDFs used in the projection or where clause. Required if any UDFs are used.
run_ray
def run_ray(*,
shards: list[Shard] | None = None,
checkpoint: str | None = None,
        udfs: list[UDF] | None = None) -> None

Use distributed Ray to apply the enrichment. Requires ray to be installed and initialized.
Before calling this method, initialize Ray with ray.init() for a local cluster, or ray.init(address="ray://<address>:<port>")
to connect to an existing cluster.
Ray execution has some limitations, e.g. UDFs are not currently supported. These limitations usually manifest as serialization errors when Ray workers attempt to serialize the state. If you are encountering such issues, consider splitting the enrichment into UDF-only derivation that will be executed in a streaming fashion, followed by a Ray enrichment for the rest of the computation. If that is not possible, please reach out to support for assistance.
How shards are determined:

- If shards is provided, those are used directly.
- Else, if checkpoint is provided, shards are loaded from the checkpoint file.
- Else, the scan’s default sharding is used.
Arguments:
- shards - Optional list of shards to process. If not provided, uses default sharding, or checkpoint sharding if available.
- checkpoint - Optional path to a checkpoint file for incremental progress. If the file exists, processing resumes from failed shards. Failed shards are written back to this file.
- udfs - Optional list of UDFs used in the projection or where clause. Required if any UDFs are used.
World
@dataclass(frozen=True)
class World()

Distributed training configuration.
Attributes:
- rank - Process rank (0 to world_size-1).
- world_size - Total number of processes.
shards
def shards(shards: list[Shard],
           shuffle_seed: int | None = None) -> list[Shard]

Partition shards for distributed training.
Arguments:
- shards - List of Shard objects to partition.
- shuffle_seed - Optional seed to shuffle before partitioning.
Returns:
Subset of shards for this rank (round-robin partitioning).
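Round-robin partitioning for a given rank can be illustrated with plain list slicing. This is a sketch of the idea, not the library’s implementation:

```python
import random

def partition_round_robin(shards, rank, world_size, shuffle_seed=None):
    """Return the subset of shards assigned to `rank` (round-robin)."""
    if shuffle_seed is not None:
        shards = shards[:]  # copy to avoid mutating the caller's list
        random.Random(shuffle_seed).shuffle(shards)
    return shards[rank::world_size]

shards = ["s0", "s1", "s2", "s3", "s4"]
print(partition_round_robin(shards, rank=0, world_size=2))  # ['s0', 's2', 's4']
print(partition_round_robin(shards, rank=1, world_size=2))  # ['s1', 's3']
```

Every shard is assigned to exactly one rank, and a shared shuffle_seed guarantees all ranks agree on the shuffled order before slicing.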
from_torch
@classmethod
def from_torch(cls) -> World

Auto-detect world configuration from PyTorch distributed.
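One plausible detection strategy (an assumption for illustration, not necessarily what from_torch does internally) reads the standard RANK and WORLD_SIZE environment variables that torchrun sets, falling back to a single-process world:

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class World:
    rank: int
    world_size: int

def world_from_env() -> World:
    """Hypothetical helper: build a World from torchrun's RANK/WORLD_SIZE env vars."""
    return World(
        rank=int(os.environ.get("RANK", "0")),
        world_size=int(os.environ.get("WORLD_SIZE", "1")),
    )

os.environ["RANK"] = "1"
os.environ["WORLD_SIZE"] = "4"
print(world_from_env())  # World(rank=1, world_size=4)
```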
SpiralDataLoader
class SpiralDataLoader()

DataLoader optimized for Spiral’s multi-threaded streaming architecture.
Unlike PyTorch’s DataLoader which uses multiprocessing for I/O (num_workers), SpiralDataLoader leverages Spiral’s efficient Rust-based streaming and only uses multiprocessing for CPU-bound post-processing transforms.
Key differences from PyTorch DataLoader:
- No num_workers for I/O (Spiral’s Rust layer is already multi-threaded)
- map_workers for parallel post-processing (tokenization, decoding, etc.)
- Built-in checkpoint support via skip_samples
- Explicit shard-based architecture for distributed training
Simple usage:
def train_step(batch):
    pass
loader = SpiralDataLoader(plan, batch_size=32)
for batch in loader:
    train_step(batch)

With parallel transforms:
def tokenize_batch(batch):
    # ...
    return batch
loader = SpiralDataLoader(
    plan,
    batch_size=32,
    transform_fn=tokenize_batch,
    map_workers=4,
)

__init__
def __init__(plan: Plan,
*,
shards: list[Shard] | None = None,
shuffle_shards: bool = True,
seed: int = 42,
skip_samples: int = 0,
shuffle_buffer_size: int = 0,
batch_size: int = 32,
batch_readahead: int | None = None,
transform_fn: Callable[[pa.RecordBatch], Any] | None = None,
map_workers: int = 0,
             infinite: bool = False)

Initialize SpiralDataLoader.
Arguments:
- plan - Spiral plan to load data from.
- shards - Optional list of Shard objects to read. If None, uses the plan’s natural sharding based on physical layout.
- shuffle_shards - Whether to shuffle the list of shards. Uses the provided seed.
- seed - Base random seed for deterministic shuffling and checkpointing.
- skip_samples - Number of samples to skip at the beginning (for resuming from a checkpoint).
- shuffle_buffer_size - Size of the shuffle buffer for within-shard shuffling. 0 means no shuffling.
- batch_size - Number of rows per batch.
- batch_readahead - Number of batches to prefetch in the background. If None, uses a sensible default based on whether transforms are applied.
- transform_fn - Optional function to transform each batch. Takes a PyArrow RecordBatch and returns any type. Call batch.to_pydict() inside the function if you need a dict. If map_workers > 0, this function must be picklable.
- map_workers - Number of worker processes for parallel transform_fn application. 0 means single-process (no parallelism). Use this for CPU-bound transforms like tokenization or audio decoding.
- infinite - Whether to cycle through the dataset indefinitely. If True, the dataloader repeats the dataset; if False, it stops after one pass.
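The within-shard shuffle controlled by shuffle_buffer_size is the classic streaming buffer shuffle: hold up to N items in a buffer and emit a random one as each new item arrives. A minimal sketch of the idea (not Spiral’s Rust implementation):

```python
import random

def buffered_shuffle(stream, buffer_size, seed):
    """Yield items in a pseudo-random order using a bounded buffer."""
    if buffer_size <= 0:
        yield from stream  # 0 means no shuffling
        return
    rng = random.Random(seed)
    buf = []
    for item in stream:
        buf.append(item)
        if len(buf) >= buffer_size:
            yield buf.pop(rng.randrange(len(buf)))
    rng.shuffle(buf)  # drain the remainder in random order
    yield from buf

out = list(buffered_shuffle(range(10), buffer_size=4, seed=42))
print(sorted(out) == list(range(10)))  # True: same items, different order
```

The trade-off is memory versus randomness: a larger buffer approaches a full shuffle, while buffer_size=0 preserves stream order exactly.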
__iter__
def __iter__() -> Iterator[Any]

Iterate over batches.
state_dict
def state_dict() -> dict[str, Any]

Get checkpoint state for resuming.
Returns:
Dictionary containing samples_yielded, seed, and shards.
Example checkpoint:
loader = SpiralDataLoader(plan, batch_size=32, seed=42)
for i, batch in enumerate(loader):
    if i == 10:
        checkpoint = loader.state_dict()
        break

Example resume:
loader = SpiralDataLoader.from_state_dict(plan, checkpoint, batch_size=32)

from_state_dict
@classmethod
def from_state_dict(cls, plan: Plan, state: dict[str, Any],
                    **kwargs) -> SpiralDataLoader

Create a DataLoader from checkpoint state, resuming from where it left off.
This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.
Arguments:
- plan - Spiral plan to load data from.
- state - Checkpoint state from state_dict().
- **kwargs - Additional arguments to pass to the SpiralDataLoader constructor. These override values in the state dict where applicable.
Returns:
New SpiralDataLoader instance configured to resume from the checkpoint.
Save checkpoint during training:
loader = plan.to_distributed_data_loader(batch_size=32, seed=42)
checkpoint = loader.state_dict()

Resume later using the same shards from checkpoint:
resumed_loader = SpiralDataLoader.from_state_dict(
    plan,
    checkpoint,
    batch_size=32,
    # An optional transform_fn may be provided:
    # transform_fn=my_transform,
)
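A plausible sketch of what resuming involves: the field names samples_yielded, seed, and shards come from state_dict() above, but the merge logic here is an assumption for illustration, not the library’s code:

```python
def resume_kwargs(state, **overrides):
    """Hypothetical helper: turn a state_dict() checkpoint into constructor kwargs."""
    kwargs = {
        "seed": state["seed"],
        "shards": state["shards"],
        "skip_samples": state["samples_yielded"],  # skip already-seen samples
        "shuffle_shards": False,  # shard order is already fixed by the checkpoint
    }
    kwargs.update(overrides)  # explicit kwargs override checkpoint values
    return kwargs

state = {"samples_yielded": 320, "seed": 42, "shards": ["s0", "s1"]}
print(resume_kwargs(state, batch_size=32)["skip_samples"])  # 320
```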