Python API

Spiral

class Spiral()

Main client for interacting with the Spiral data platform.

Configuration is loaded with the following priority (highest to lowest):

  1. Explicit parameters.
  2. Environment variables (SPIRAL__*)
  3. Config file (~/.spiral.toml)
  4. Default values (production URLs)

Examples:

import spiral

# Default configuration
sp = spiral.Spiral()

# With config overrides
sp = spiral.Spiral(overrides={"limits.concurrency": "16"})

Arguments:

  • config - Custom ClientSettings object. Defaults to global settings.
  • overrides - Configuration overrides using dot notation; see the Client Configuration page for a full list.

config

@property def config() -> ClientSettings

Returns the client’s configuration.

authn

@property def authn() -> Authn

Get the authentication handler for this client.

list_projects

def list_projects() -> list["Project"]

List projects.

create_project

def create_project(id_prefix: str | None = None, *, name: str | None = None) -> "Project"

Create a project in the current, or given, organization.

project

def project(project_id: str) -> "Project"

Open an existing project.

table

def table(table_id: str) -> "Table"

Open a table using an ID.

text_index

def text_index(index_id: str) -> "TextIndex"

Open a text index using an ID.

key_space_index

def key_space_index(index_id: str) -> "KeySpaceIndex"

Open a key space index using an ID.

scan

def scan(*projections: ExprLike, where: ExprLike | None = None, asof: datetime | int | None = None) -> Scan

Starts a read transaction on the Spiral.

Arguments:

  • projections - a set of expressions that return struct arrays.
  • where - a query expression to apply to the data.
  • asof - execute the scan on the version of the table as of the given timestamp.
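
A minimal usage sketch; the table ID and the "text" column are hypothetical:

import spiral

sp = spiral.Spiral()
t = sp.table("my-table-id")  # hypothetical table ID

# Project a single column and materialize the result as a PyArrow Table.
result = sp.scan(t[["text"]]).to_table()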

search

def search(top_k: int, *rank_by: ExprLike, filters: ExprLike | None = None, freshness_window: timedelta | None = None) -> pa.RecordBatchReader

Queries the index with the given rank-by and filter clauses. Returns a stream of scored keys.

Arguments:

  • top_k - The number of top results to return.
  • rank_by - Rank-by expressions are combined for scoring. See se.text.find and se.text.boost for scoring expressions.
  • filters - The filters expression is used to filter the results. It must return a boolean value and use only conjunctions (ANDs). Expressions in the filters statement are treated as must or must_not clauses in search terminology.
  • freshness_window - If provided, the index will not be refreshed as long as its last refresh falls within this window.

resume_scan

def resume_scan(state_json: str) -> Scan

Resumes a previously started scan using its scan state.

Arguments:

  • state_json - The scan state returned by a previous scan.
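
A hedged sketch, assuming sp is a Spiral client and t is an open table with a hypothetical "text" column:

scan = sp.scan(t[["text"]])
state = scan.state_json()  # serialize the scan state (see Scan.state_json)

# Later, possibly in another process:
resumed = sp.resume_scan(state)
for batch in resumed.to_record_batches():
    ...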

compute_shards

def compute_shards(max_batch_size: int, *projections: ExprLike, where: ExprLike | None = None, asof: datetime | int | None = None, stream: bool = False) -> list[Shard]

Computes shards over the given projections and filter.

Arguments:

  • max_batch_size - The maximum number of rows per shard.
  • projections - a set of expressions that return struct arrays.
  • where - a query expression to apply to the data.
  • asof - execute the scan on the version of the table as of the given timestamp.
  • stream - if true, builds shards in a streaming fashion, suitable for very large tables.
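
A sketch of sharded reading, assuming sp is a Spiral client and t is a table handle with a hypothetical "text" column:

# Shard the projection into batches of at most 100k rows.
shards = sp.compute_shards(100_000, t[["text"]])

# Each shard can be read independently, e.g. on a different worker.
scan = sp.scan(t[["text"]])
reader = scan.to_record_batches(shards=[shards[0]])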

iceberg

@property def iceberg() -> "Iceberg"

Apache Iceberg is a powerful open-source table format designed for high-performance data lakes. Iceberg brings reliability, scalability, and advanced features like time travel, schema evolution, and ACID transactions to your warehouse.

Project

class Project()

table

def table(identifier: str) -> Table

Open a table by its dataset.table identifier, or by table name using the default dataset.

create_table

def create_table(identifier: str, *, key_schema: Schema | pa.Schema | Iterable[pa.Field[pa.DataType]] | Iterable[tuple[str, pa.DataType]] | Mapping[str, pa.DataType], root_uri: Uri | None = None, exist_ok: bool = False) -> Table

Create a new table in the project.

Arguments:

  • identifier - The table identifier, in the form dataset.table or table.
  • key_schema - The schema of the table’s keys.
  • root_uri - The root URI for the table.
  • exist_ok - If True, do not raise an error if the table already exists.
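
A minimal sketch; the project ID, table identifier, and key column are hypothetical:

import pyarrow as pa

project = sp.project("my-project-id")  # hypothetical project ID
table = project.create_table(
    "my_dataset.events",
    key_schema=pa.schema([("id", pa.int64())]),
    exist_ok=True,
)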

move_table

def move_table(identifier: str, new_dataset: str)

Move a table to a new dataset in the project.

Arguments:

  • identifier - The table identifier, in the form dataset.table or table.
  • new_dataset - The dataset into which to move this table.

rename_table

def rename_table(identifier: str, new_table: str)

Rename a table in the project.

Arguments:

  • identifier - The table identifier, in the form dataset.table or table.
  • new_table - The new name for the table.

text_index

def text_index(name: str) -> TextIndex

Returns the index with the given name.

create_text_index

def create_text_index(name: str, *projections: ExprLike, where: ExprLike | None = None, root_uri: Uri | None = None, exist_ok: bool = False) -> TextIndex

Creates a text index over the table projection.

See se.text.field for how to create and configure indexable fields.

Arguments:

  • name - The index name. Must be unique within the project.
  • projections - At least one projection expression is required. All projections must reference the same table.
  • where - An optional filter expression to apply to the index.
  • root_uri - The root URI for the index.
  • exist_ok - If True, do not raise an error if the index already exists.
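
A hedged sketch, assuming a table handle t with a hypothetical "body" column; see se.text.field for how to configure indexable fields:

index = project.create_text_index(
    "docs-index",   # hypothetical index name
    t[["body"]],    # projection over the column(s) to index
    exist_ok=True,
)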

key_space_index

def key_space_index(name: str) -> KeySpaceIndex

Returns the index with the given name.

create_key_space_index

def create_key_space_index(name: str, granularity: int, *projections: ExprLike, where: ExprLike | None = None, root_uri: Uri | None = None, exist_ok: bool = False) -> KeySpaceIndex

Creates a key space index over the table projection.

Arguments:

  • name - The index name. Must be unique within the project.
  • granularity - The granularity at which to store keys, i.e. the size of desired key ranges. The key ranges will not be greater than 2x the granularity, but may be smaller.
  • projections - At least one projection expression is required. All projections must reference the same table.
  • where - An optional filter expression to apply to the index.
  • root_uri - The root URI for the index.
  • exist_ok - If True, do not raise an error if the index already exists.
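
A hedged sketch, assuming a table handle t with a hypothetical "text" column:

ks_index = project.create_key_space_index(
    "keys-index",   # hypothetical index name
    1_000_000,      # granularity: target roughly 1M keys per key range
    t[["text"]],
    exist_ok=True,
)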

Table

class Table(Expr)

API for interacting with a SpiralDB’s Table.

A Spiral Table is a powerful and flexible way to store, analyze, and query massive and/or multimodal datasets. The data model will feel familiar to users of SQL- or DataFrame-style systems, yet is designed to be more flexible, more powerful, and more useful in the context of modern data processing.

Tables are stored and queried directly from object storage.

identifier

@property def identifier() -> str

Returns the fully qualified identifier of the table.

project

@property def project() -> str | None

Returns the project of the table.

dataset

@property def dataset() -> str | None

Returns the dataset of the table.

name

@property def name() -> str | None

Returns the name of the table.

key_schema

@property def key_schema() -> Schema

Returns the key schema of the table.

schema

def schema() -> Schema

Returns the FULL schema of the table.

NOTE: This can be expensive for large tables.

write

def write(expr: ExprLike, push_down_nulls: bool = False, **kwargs) -> None

Write an item to the table inside a single transaction.

Arguments:

  • expr: The expression to write. Must evaluate to a struct array.
  • push_down_nulls: Whether to push the nullability of structs down into their children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn’t allow struct-level nullability, so use this option if your data contains nullable structs.
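
A hedged sketch, assuming the table has a key column "id" and that a PyArrow table is accepted as an ExprLike evaluating to a struct array:

import pyarrow as pa

table.write(pa.table({
    "id": [1, 2, 3],          # hypothetical key column
    "text": ["a", "b", "c"],  # hypothetical value column
}))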

enrich

def enrich(*projections: ExprLike, where: ExprLike | None = None) -> Enrichment

Returns an Enrichment object that, when applied, produces new columns.

An Enrichment can be applied in different ways, e.g. in a distributed fashion.

Arguments:

  • projections: Projection expressions deriving new columns to write back. Expressions can be over multiple Spiral tables, but all tables including this one must share the same key schema.
  • where: Optional filter expression to apply when reading the input tables.

drop_columns

def drop_columns(column_paths: list[str]) -> None

Drops the specified columns from the table.

Arguments:

  • column_paths: Fully qualified column names (e.g., “column_name” or “nested.field”). All columns must exist; if a column doesn’t exist, the function will return an error.

snapshot

def snapshot(asof: datetime | int | None = None) -> Snapshot

Returns a snapshot of the table at the given timestamp.

txn

def txn(**kwargs) -> Transaction

Begins a new transaction. Transaction must be committed for writes to become visible.

While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction. The behavior is undefined if this is not the case.
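
A hedged sketch of a multi-write transaction; "id" and "text" are hypothetical columns, and a PyArrow table is assumed to be a valid ExprLike:

import pyarrow as pa

txn = table.txn()
txn.write(pa.table({"id": [1, 2], "text": ["a", "b"]}))
txn.write(pa.table({"id": [3, 4], "text": ["c", "d"]}))
txn.commit()  # writes become visible only after commit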

to_dataset

def to_dataset() -> "ds.Dataset"

Returns a PyArrow Dataset representing the table.

to_polars

def to_polars() -> "pl.LazyFrame"

Returns a Polars LazyFrame for the Spiral table.

to_duckdb

def to_duckdb() -> "duckdb.DuckDBPyRelation"

Returns a DuckDB relation for the Spiral table.

to_streaming

def to_streaming(index: "KeySpaceIndex", *, projection: Expr | None = None, cache_dir: str | None = None, shard_row_block_size: int | None = None) -> "SpiralStream"

Returns a stream to be used with MosaicML’s StreamingDataset.

Requires streaming package to be installed.

Arguments:

  • index - Prebuilt KeySpaceIndex to use when creating the stream. The index’s asof will be used when scanning.
  • projection - Optional projection to use when scanning the table instead of the index’s projection. The projection must be compatible with the index’s projection for correctness.
  • cache_dir - Directory to use for caching data. If None, a temporary directory will be used.
  • shard_row_block_size - Number of rows per segment of a shard file. Defaults to 8192. Use a lower value for larger rows.

Snapshot

class Snapshot()

Spiral table snapshot.

A snapshot represents a point-in-time view of a table.

asof

@property def asof() -> Timestamp

Returns the asof timestamp of the snapshot.

schema

def schema() -> Schema

Returns the schema of the snapshot.

table

@property def table() -> "Table"

Returns the table associated with the snapshot.

to_dataset

def to_dataset() -> "ds.Dataset"

Returns a PyArrow Dataset representing the table.

to_polars

def to_polars() -> "pl.LazyFrame"

Returns a Polars LazyFrame for the Spiral table.

to_duckdb

def to_duckdb() -> "duckdb.DuckDBPyRelation"

Returns a DuckDB relation for the Spiral table.

Scan

class Scan()

Scan object.

metrics

@property def metrics() -> dict[str, Any]

Returns metrics about the scan.

schema

@property def schema() -> Schema

Returns the schema of the scan.

key_schema

@property def key_schema() -> Schema

Returns the key schema of the scan.

is_empty

def is_empty() -> bool

Check if the Spiral is empty for the given key range.

False negatives are possible but false positives are not, i.e. is_empty may return False even though the scan returns zero rows, but it will never return True when the scan would return rows.

to_record_batches

def to_record_batches(*, shards: list[Shard] | None = None, key_table: pa.Table | pa.RecordBatchReader | None = None, batch_size: int | None = None, batch_readahead: int | None = None, hide_progress_bar: bool = False) -> pa.RecordBatchReader

Read as a stream of RecordBatches.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
  • key_table - a table of keys to “take” (including aux columns for cell-push-down). If None, the scan will be executed without a key table.
  • batch_size - the maximum number of rows per returned batch. This is currently only respected when key_table is used. If the key table is a RecordBatchReader, batch_size must be None and the reader’s existing batching is respected.
  • batch_readahead - the number of batches to prefetch in the background.
  • hide_progress_bar - If True, disables the progress bar during reading.

to_unordered_record_batches

def to_unordered_record_batches( *, shards: list[Shard] | None = None, key_table: pa.Table | pa.RecordBatchReader | None = None, batch_size: int | None = None, batch_readahead: int | None = None, hide_progress_bar: bool = False) -> pa.RecordBatchReader

Read as a stream of RecordBatches, NOT ordered by key.

Arguments:

  • shards - Optional list of shards to evaluate. If provided, only the specified shards will be read. Must not be provided together with key_table.
  • key_table - a table of keys to “take” (including aux columns for cell-push-down). If None, the scan will be executed without a key table.
  • batch_size - the maximum number of rows per returned batch. This is currently only respected when key_table is used. If the key table is a RecordBatchReader, batch_size must be None and the reader’s existing batching is respected.
  • batch_readahead - the number of batches to prefetch in the background.
  • hide_progress_bar - If True, disables the progress bar during reading.

to_table

def to_table(**kwargs) -> pa.Table

Read into a single PyArrow Table.

to_dask

def to_dask() -> "dd.DataFrame"

Read into a Dask DataFrame.

Requires the dask package to be installed.

Dask execution has some limitations, e.g. UDFs are not currently supported. These limitations usually manifest as serialization errors when Dask workers attempt to serialize the state. If you are encountering such issues, please reach out to support for assistance.

to_pandas

def to_pandas(**kwargs) -> "pd.DataFrame"

Read into a Pandas DataFrame.

Requires the pandas package to be installed.

to_polars

def to_polars(**kwargs) -> "pl.DataFrame"

Read into a Polars DataFrame.

Requires the polars package to be installed.

to_data_loader

def to_data_loader(seed: int = 42, shuffle_buffer_size: int = 0, batch_size: int = 32, **kwargs) -> "SpiralDataLoader"

Read into a Torch-compatible DataLoader for single-node training.

Arguments:

  • seed - Random seed for reproducibility.
  • shuffle_buffer_size - Size of shuffle buffer. Zero means no shuffling.
  • batch_size - Batch size.
  • **kwargs - Additional arguments passed to SpiralDataLoader constructor.

Returns:

SpiralDataLoader with shuffled shards.
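
A minimal single-node training sketch; train_step is a hypothetical function:

loader = scan.to_data_loader(batch_size=64, shuffle_buffer_size=10_000, seed=0)
for batch in loader:
    train_step(batch)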

to_distributed_data_loader

def to_distributed_data_loader(world: Optional["World"] = None, shards: list[Shard] | None = None, seed: int = 42, shuffle_buffer_size: int = 0, batch_size: int = 32, **kwargs) -> "SpiralDataLoader"

Read into a Torch-compatible DataLoader for distributed training.

Arguments:

  • world - World configuration with rank and world_size. If None, auto-detects from torch.distributed.
  • shards - Optional sharding. Sharding is global, i.e. the world will be used to select the shards for this rank. If None, uses scan’s natural sharding.
  • seed - Random seed for reproducibility.
  • shuffle_buffer_size - Size of shuffle buffer. Zero means no shuffling.
  • batch_size - Batch size.
  • **kwargs - Additional arguments passed to SpiralDataLoader constructor.

Returns:

SpiralDataLoader with shards partitioned for this rank.

Auto-detect from PyTorch distributed:

import spiral
from spiral.dataloader import SpiralDataLoader, World
from spiral.demo import fineweb

sp = spiral.Spiral()
fineweb_table = fineweb(sp)
scan = sp.scan(fineweb_table[["text"]])

loader: SpiralDataLoader = scan.to_distributed_data_loader(batch_size=32)

Explicit world configuration:

world = World(rank=0, world_size=4)
loader: SpiralDataLoader = scan.to_distributed_data_loader(world=world, batch_size=32)

resume_data_loader

def resume_data_loader(state: dict[str, Any], **kwargs) -> "SpiralDataLoader"

Create a DataLoader from checkpoint state, resuming from where it left off.

This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.

Arguments:

  • state - Checkpoint state from state_dict().
  • **kwargs - Additional arguments to pass to SpiralDataLoader constructor. These will override values in the state dict where applicable.

Returns:

New SpiralDataLoader instance configured to resume from the checkpoint.

Save checkpoint during training:

import spiral
from spiral.dataloader import SpiralDataLoader, World
from spiral.demo import images, fineweb

sp = spiral.Spiral()
table = images(sp)
fineweb_table = fineweb(sp)
scan = sp.scan(fineweb_table[["text"]])

loader = scan.to_distributed_data_loader(batch_size=32, seed=42)
checkpoint = loader.state_dict()

Resume later - uses same shards from checkpoint:

resumed_loader = scan.resume_data_loader(
    checkpoint,
    batch_size=32,
    # An optional transform_fn may be provided:
    # transform_fn=my_transform,
)

to_iterable_dataset

def to_iterable_dataset(shards: list[Shard] | None = None, shuffle: ShuffleConfig | None = None, batch_readahead: int | None = None, infinite: bool = False) -> "hf.IterableDataset"

Returns a Huggingface’s IterableDataset.

Requires datasets package to be installed.

Note: For new code, consider using SpiralDataLoader instead.

Arguments:

  • shards - Optional list of shards to read. If None, uses scan’s natural sharding.
  • shuffle - Optional ShuffleConfig for configuring within-shard sample shuffling. If None, no shuffling is performed.
  • batch_readahead - Controls how many batches to read ahead concurrently. If the pipeline includes work after reading (e.g. decoding, transforming, …), this can be set higher. Otherwise, it should be kept low to reduce next-batch latency. Defaults to 2.
  • infinite - If True, the returned IterableDataset will loop infinitely over the data, re-shuffling ranges after exhausting all data.
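
A minimal sketch, assuming the datasets package is installed:

ds = scan.to_iterable_dataset(batch_readahead=2)
for example in ds:
    ...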

shards

def shards() -> list[Shard]

Get list of shards for this scan.

The shards are based on the scan’s physical data layout (file fragments). Each shard contains a key range and cardinality (set to None when unknown).

Returns:

List of Shard objects with key range and cardinality (if known).

state_json

def state_json() -> str

Get the scan state as a JSON string.

This state can be used to resume the scan later using Spiral.resume_scan().

Returns:

JSON string representing the internal scan state.

Transaction

class Transaction()

Spiral table transaction.

While a transaction can be used to atomically write data to the table, the primary key columns must be unique within the transaction.

status

@property def status() -> str

The status of the transaction.

is_empty

def is_empty() -> bool

Check if the transaction has no operations.

write

def write(expr: ExprLike, push_down_nulls: bool = False)

Write an item to the table inside a single transaction.

Arguments:

  • expr: The expression to write. Must evaluate to a struct array.
  • push_down_nulls: Whether to push the nullability of structs down into their children. E.g. [{"a": 1}, null] would become [{"a": 1}, {"a": null}]. SpiralDB doesn’t allow struct-level nullability, so use this option if your data contains nullable structs.

writeback

def writeback(scan: Scan, *, shards: list[Shard] | None = None)

Write back the results of a scan to the table.

Arguments:

  • scan: The scan to write back. The scan does NOT need to be over the same table as transaction, but it does need to have the same key schema.
  • shards: The shards to read from. If not provided, all shards are read.

drop_columns

def drop_columns(column_paths: list[str])

Drops the specified columns from the table.

Arguments:

  • column_paths: Fully qualified column names (e.g., “column_name” or “nested.field”). All columns must exist; if a column doesn’t exist, the function will return an error.

compact_key_space

def compact_key_space()

Compact the key space of the table.

take

def take() -> list[Operation]

Take the operations from the transaction.

The transaction can no longer be committed or aborted after calling this method.

include

def include(ops: list[Operation])

Include the given operations in the transaction.

Checks for conflicts between the included operations and any existing operations.

commit

def commit(*, txn_dump: str | None = None, compact: bool = False)

Commit the transaction.

load_dumps

@staticmethod def load_dumps(*txn_dump: str) -> list[Operation]

Load transaction operations from one or more dump files.

abort

def abort()

Abort the transaction.

Enrichment

class Enrichment()

An enrichment is used to derive new columns from existing ones, such as by fetching data from object storage with se.s3.get or computing embeddings. With a column-group design that supports hundreds of thousands of columns, horizontally expanding tables is a powerful primitive.

NOTE: Spiral aims to optimize enrichments where source and destination table are the same.

table

@property def table() -> Table

The table to write back into.

projection

@property def projection() -> Expr

The projection expression.

where

@property def where() -> Expr | None

The filter expression.

apply

def apply(*, txn_dump: str | None = None) -> None

Apply the enrichment onto the table in a streaming fashion.

For large tables, consider using apply_dask for distributed execution.

Arguments:

  • txn_dump - Optional path to dump the transaction JSON for debugging.

apply_dask

def apply_dask(*, max_task_size: int | None = None, checkpoint_dump: str | None = None, shards: list[Shard] | None = None, txn_dump: str | None = None, client: dask.distributed.Client | None = None, **kwargs) -> None

Use distributed Dask to apply the enrichment. Requires dask[distributed] to be installed.

If the "address" of an existing Dask cluster is not provided in kwargs, a local cluster will be created.

Dask execution has some limitations, e.g. UDFs are not currently supported. These limitations usually manifest as serialization errors when Dask workers attempt to serialize the state. If you are encountering such issues, consider splitting the enrichment into a UDF-only derivation that is executed in a streaming fashion, followed by a Dask enrichment for the rest of the computation. If that is not possible, please reach out to support for assistance.

How shards are determined:

  • If shards is provided, those will be used directly.
  • Else, if checkpoint_dump is provided, shards will be loaded from the checkpoint.
  • Else, if max_task_size is provided, shards will be created based on the task size.
  • Else, the scan’s default sharding will be used.

Arguments:

  • max_task_size - Optional task size limit, in number of rows, used for sharding. If a checkpoint is present, the checkpoint shards will be used instead. If not provided, the scan’s default sharding will be used.
  • checkpoint_dump - Optional path to dump intermediate checkpoints for incremental progress.
  • shards - Optional list of shards to process. If provided, max_task_size and checkpoint_dump are ignored.
  • txn_dump - Optional path to dump the transaction JSON for debugging.
  • client - Optional Dask distributed client. If not provided, a new client will be created.
  • **kwargs - Additional keyword arguments to pass to dask.distributed.Client such as address to connect to an existing cluster.
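
A hedged sketch, assuming enrichment was created via Table.enrich(...); the task size and checkpoint path are hypothetical:

enrichment.apply_dask(
    max_task_size=500_000,               # shard into tasks of at most ~500k rows
    checkpoint_dump="enrich.ckpt.json",  # hypothetical checkpoint path for incremental progress
)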

World

@dataclass(frozen=True) class World()

Distributed training configuration.

Attributes:

  • rank - Process rank (0 to world_size-1).
  • world_size - Total number of processes.

shards

def shards(shards: list[Shard], shuffle_seed: int | None = None) -> list[Shard]

Partition shards for distributed training.

Arguments:

  • shards - List of Shard objects to partition.
  • shuffle_seed - Optional seed to shuffle before partitioning.

Returns:

Subset of shards for this rank (round-robin partitioning).
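
A minimal sketch, assuming an existing scan:

world = World(rank=0, world_size=4)
my_shards = world.shards(scan.shards(), shuffle_seed=42)  # roughly 1/4 of the shards, for rank 0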

from_torch

@classmethod def from_torch(cls) -> World

Auto-detect world configuration from PyTorch distributed.

SpiralDataLoader

class SpiralDataLoader()

DataLoader optimized for Spiral’s multi-threaded streaming architecture.

Unlike PyTorch’s DataLoader which uses multiprocessing for I/O (num_workers), SpiralDataLoader leverages Spiral’s efficient Rust-based streaming and only uses multiprocessing for CPU-bound post-processing transforms.

Key differences from PyTorch DataLoader:

  • No num_workers for I/O (Spiral’s Rust layer is already multi-threaded)
  • map_workers for parallel post-processing (tokenization, decoding, etc.)
  • Built-in checkpoint support via skip_samples
  • Explicit shard-based architecture for distributed training

Simple usage:

def train_step(batch):
    pass

loader = SpiralDataLoader(scan, batch_size=32)
for batch in loader:
    train_step(batch)

With parallel transforms:

def tokenize_batch(batch):
    # ...
    return batch

loader = SpiralDataLoader(
    scan,
    batch_size=32,
    transform_fn=tokenize_batch,
    map_workers=4,
)

__init__

def __init__(scan: Scan, *, shards: list[Shard] | None = None, shuffle_shards: bool = True, seed: int = 42, skip_samples: int = 0, shuffle_buffer_size: int = 0, batch_size: int = 32, batch_readahead: int | None = None, transform_fn: Callable[[pa.RecordBatch], Any] | None = None, map_workers: int = 0, infinite: bool = False)

Initialize SpiralDataLoader.

Arguments:

  • scan - Spiral scan to load data from.
  • shards - Optional list of Shard objects to read. If None, uses scan’s natural sharding based on physical layout.
  • shuffle_shards - Whether to shuffle the list of shards. Uses the provided seed.
  • seed - Base random seed for deterministic shuffling and checkpointing.
  • skip_samples - Number of samples to skip at the beginning (for resuming from checkpoint).
  • shuffle_buffer_size - Size of shuffle buffer for within-shard shuffling. 0 means no shuffling.
  • batch_size - Number of rows per batch.
  • batch_readahead - Number of batches to prefetch in background. If None, uses a sensible default based on whether transforms are applied.
  • transform_fn - Optional function to transform each batch. Takes a PyArrow RecordBatch and returns any type. Users can call batch.to_pydict() inside the function if they need a dict. If map_workers > 0, this function must be picklable.
  • map_workers - Number of worker processes for parallel transform_fn application. 0 means single-process (no parallelism). Use this for CPU-bound transforms like tokenization or audio decoding.
  • infinite - Whether to cycle through the dataset infinitely. If True, the dataloader will repeat the dataset indefinitely. If False, the dataloader will stop after going through the dataset once.

__iter__

def __iter__() -> Iterator[Any]

Iterate over batches.

state_dict

def state_dict() -> dict[str, Any]

Get checkpoint state for resuming.

Returns:

Dictionary containing samples_yielded, seed, and shards.

Example checkpoint:

loader = SpiralDataLoader(scan, batch_size=32, seed=42)
for i, batch in enumerate(loader):
    if i == 10:
        checkpoint = loader.state_dict()
        break

Example resume:

loader = SpiralDataLoader.from_state_dict(scan, checkpoint, batch_size=32)

from_state_dict

@classmethod def from_state_dict(cls, scan: Scan, state: dict[str, Any], **kwargs) -> SpiralDataLoader

Create a DataLoader from checkpoint state, resuming from where it left off.

This is the recommended way to resume training from a checkpoint. It extracts the seed, samples_yielded, and shards from the state dict and creates a new DataLoader that will skip the already-processed samples.

Arguments:

  • scan - Spiral scan to load data from.
  • state - Checkpoint state from state_dict().
  • **kwargs - Additional arguments to pass to SpiralDataLoader constructor. These will override values in the state dict where applicable.

Returns:

New SpiralDataLoader instance configured to resume from the checkpoint.

Save checkpoint during training:

loader = scan.to_distributed_data_loader(batch_size=32, seed=42)
checkpoint = loader.state_dict()

Resume later using the same shards from checkpoint:

resumed_loader = SpiralDataLoader.from_state_dict(
    scan,
    checkpoint,
    batch_size=32,
    # An optional transform_fn may be provided:
    # transform_fn=my_transform,
)

Iceberg

class Iceberg()

Apache Iceberg is a powerful open-source table format designed for high-performance data lakes. Iceberg brings reliability, scalability, and advanced features like time travel, schema evolution, and ACID transactions to your warehouse.

catalog

def catalog() -> "Catalog"

Open the Iceberg catalog.
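
A minimal sketch, assuming sp is a Spiral client:

catalog = sp.iceberg.catalog()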
