Status: Example. Progress: Illustrative only; the training operators shown here are target syntax for the roadmap, not implemented engine APIs. Open questions: Final relation API spelling, SQL spelling, transform registry names, runtime binding names, and framework adapter shape remain open.
These examples describe the training data workflows Spiral should support once training batch streams are implemented. The syntax is preliminary and is expected to change. The point of the examples is to define the target behavior: relations stay engine-native, sampling and collation are visible in the logical plan, and the terminal object is a replayable stream consumable by PyTorch or other training frameworks.
The examples assume SpiralDB provides pre-registered catalog relations for the datasets. Users should not need to write raw file readers for common datasets. Catalog relations expose stable sample identity columns, semantic references to external bytes, and useful statistics for planning.
The common shape is:
catalog relation
-> ordinary relational work
-> Mix? # multiple populations or stream proportions
-> Sample # order, epoch, rank, replay
-> Collate # mini-batch membership and field representation
-> prepared training stream
Sample defines the selected example stream. Collate defines both
mini-batch membership and the representation of each batch’s fields. Fetch,
decode, normalization, pinned memory, and device placement are planner and
physical-lowering concerns unless the user explicitly requests them through the
collation output contract.
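Here is a minimal sketch of this division of labor. It assumes plain Python lists stand in for relations, so both stages can be written as ordinary generators. `sample` fixes a deterministic order from a seed and epoch, and `collate` fixes batch membership and the per-field representation. The function names and the `drop_partial` flag are hypothetical illustrations, not the proposed Spiral API.

```python
import random
from typing import Iterable, Iterator, List, Sequence

# Hypothetical stand-ins for the Sample and Collate stages over in-memory rows; not Spiral APIs.


def sample(rows: Sequence[dict], identity: str, seed: int, epoch: int) -> Iterator[dict]:
    """Sample: a deterministic example order that depends only on (seed, epoch, identity)."""
    # Start from identity order so the result does not depend on storage order.
    ordered = sorted(rows, key=lambda row: row[identity])
    random.Random(seed * 1_000_003 + epoch).shuffle(ordered)
    yield from ordered


def to_batch(rows: List[dict]) -> dict:
    # Field representation: one column list per output field.
    return {"label": [r["label_id"] for r in rows],
            "sample_id": [r["sample_id"] for r in rows]}


def collate(stream: Iterable[dict], batch_size: int, drop_partial: bool) -> Iterator[dict]:
    """Collate: fixes mini-batch membership, including what happens to the short tail."""
    pending: List[dict] = []
    for row in stream:
        pending.append(row)
        if len(pending) == batch_size:
            yield to_batch(pending)
            pending = []
    if pending and not drop_partial:
        yield to_batch(pending)
```

With 10 rows and a batch size of 4, `drop_partial=True` yields two batches. `drop_partial=False` yields a third batch of two rows, so the flag changes which examples the model sees.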
Image Classification
This example mirrors the role of a DALI-style ImageNet input pipeline: shard the data across training ranks, shuffle deterministically, decode compressed images, apply image transforms, normalize, and emit GPU-ready tensors.
Catalog Relations
vision.imagenet_train is a registered relation over an image training split.
It may be backed by Vortex files, local files, S3 objects, or another object
store. The relation exposes image bytes through a semantic reference instead of
eagerly decoding the image.
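A semantic reference can be sketched as a typed handle that names compressed bytes without holding them. This assumes a reference is just a URI plus a media type, and that bytes are read through a caller-supplied reader. `ImageRef`, `read_bytes`, and the example URIs are hypothetical, not Spiral types.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class ImageRef:
    """Hypothetical semantic reference: it names compressed bytes but does not hold them."""
    uri: str
    media_type: str = "image/jpeg"

    def fetch(self, read_bytes: Callable[[str], bytes]) -> bytes:
        # Bytes are read only when a downstream stage (for example, decode) asks for them.
        return read_bytes(self.uri)
```

Filters, joins, and sampling can run over rows that carry `ImageRef` values. Only rows that survive to collation pay the fetch and decode cost.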
vision.imagenet_train
sample_id: utf8 # stable example identity
split: utf8 # train / validation
image_ref: image/jpeg-ref # semantic handle to compressed image bytes
label_id: int64
width: int32
height: int32
source_uri: utf8
storage_shard: utf8 # file, row group, object, or layout unit
ingest_ts: timestamp_ns
vision.imagenet_labels is a small metadata relation:
vision.imagenet_labels
label_id: int64
class_name: utf8
Target API Sketch
train = (
spiraldb.table("vision.imagenet_train")
.join(spiraldb.table("vision.imagenet_labels"), on="label_id")
.filter(col("split") == "train")
.sample(
identity="sample_id",
order=ShardAwareShuffle(
seed=17,
shard_key=["storage_shard"],
shuffle_block_size=1_000_000,
),
epoch=RuntimeParam("epoch"),
assignment=DataParallel(
rank=RuntimeParam("data_parallel_rank"),
world_size=RuntimeParam("data_parallel_world_size"),
),
)
.collate(
batch_size=256,
last=DropPartial,
fields={
"image": Stack(
"image_ref",
decode="jpeg",
transforms=[
RandomResizedCrop(size=224),
RandomHorizontalFlip(),
Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
],
dtype="float16",
layout="nchw",
device="cuda",
),
"label": Stack("label_id", dtype="int64"),
"sample_id": List("sample_id"),
},
)
.to_torch(prefetch=4)
)
What This Demonstrates
- `Sample` does not globally sort by a random key. It creates deterministic rank-local streams from shard-aware shuffle metadata.
- `storage_shard` is a locality hint, not a claim that storage order is unbiased.
- `EXPLAIN DATA` should report label and source skew by shard when statistics are available.
- Decode, crop, normalization, tensor layout, and device placement are planner choices driven by the `Collate` output fields.
- `DropPartial` is part of collation because it decides whether the final short group becomes a model-visible batch.
- The prepared stream binds `epoch`, `data_parallel_rank`, and `data_parallel_world_size` at run start or resume.
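The sketch below illustrates the shard-aware shuffle and rank assignment described in these points. It makes two simplifying assumptions. First, `ShardAwareShuffle` permutes whole storage shards per epoch and then shuffles samples only within blocks of roughly `shuffle_block_size` samples. Second, `DataParallel` assigns global positions to ranks by striding. `shard_aware_order` and `rank_stream` are hypothetical helpers. The sketch also materializes the epoch order in memory, where an engine would derive it from shard metadata.

```python
import random
from typing import Dict, Iterator, List, Tuple

# Hypothetical simplification of ShardAwareShuffle + DataParallel; not the Spiral implementation.


def shard_aware_order(shards: Dict[str, List[str]], seed: int, epoch: int,
                      shuffle_block_size: int) -> List[str]:
    """Epoch order: shuffle the shard order, then shuffle samples inside blocks of shards."""
    rng = random.Random(seed * 1_000_003 + epoch)
    shard_keys = sorted(shards)  # stable start, independent of how shards were listed
    rng.shuffle(shard_keys)
    order: List[str] = []
    block: List[str] = []
    for key in shard_keys:
        block.extend(sorted(shards[key]))
        if len(block) >= shuffle_block_size:
            rng.shuffle(block)  # randomness is local to a block of whole shards
            order.extend(block)
            block = []
    rng.shuffle(block)
    order.extend(block)
    return order


def rank_stream(order: List[str], rank: int, world_size: int,
                start: int = 0) -> Iterator[Tuple[int, str]]:
    """DataParallel assignment: rank r takes global positions r, r + W, r + 2W, ...

    `start` is a rank-local position, so resuming jumps straight to it instead of
    replaying the earlier part of the epoch.
    """
    per_rank = len(order) // world_size  # drop the remainder so every rank has equal length
    for local in range(start, per_rank):
        yield local, order[local * world_size + rank]
```

The order is a pure function of the seed, epoch, and shard metadata. Resuming therefore needs only the stored rank-local position once `epoch`, `data_parallel_rank`, and `data_parallel_world_size` are rebound.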
Expected PyTorch batch:
{
"image": torch.Tensor[B, 3, 224, 224] on cuda,
"label": torch.Tensor[B],
"sample_id": list[str],
}
Mixed Text Pretraining
This example combines several text populations into one training stream. It is
the best example for the Mix -> Sample -> Collate shape because the stream
mixture, quality filters, deterministic shuffling, tokenization, token-budget
batching, and dynamic padding all matter.
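Because the mixture is declared separately from ordering, its per-epoch effect can be reasoned about on its own. The sketch below assumes one plausible reading of `proportion` and `epoch_size`. Each stream contributes its share of `epoch_size` samples, counts are rounded by largest remainder so they sum exactly, and a stream smaller than its share is up-sampled by whole passes. `mix_counts` and `mix_population` are hypothetical helpers, not Spiral APIs.

```python
import math
import random
from typing import Dict, List, Tuple

# Hypothetical reading of Mix(proportion=..., epoch_size=...); not the Spiral implementation.


def mix_counts(proportions: Dict[str, float], epoch_size: int) -> Dict[str, int]:
    """Split epoch_size across streams by largest remainder so counts sum exactly."""
    total = sum(proportions.values())
    exact = {name: epoch_size * p / total for name, p in proportions.items()}
    counts = {name: math.floor(x) for name, x in exact.items()}
    leftover = epoch_size - sum(counts.values())
    # Remaining slots go to the largest fractional parts; ties break by stream name.
    for name in sorted(exact, key=lambda n: (counts[n] - exact[n], n))[:leftover]:
        counts[name] += 1
    return counts


def mix_population(streams: Dict[str, List[str]], proportions: Dict[str, float],
                   epoch_size: int, seed: int, epoch: int) -> List[Tuple[str, str]]:
    """One epoch's (stream_id, sample_id) population; ordering is left to Sample."""
    counts = mix_counts(proportions, epoch_size)
    population: List[Tuple[str, str]] = []
    for name in sorted(streams):
        ids = sorted(streams[name])
        if not ids:
            raise ValueError(f"stream {name!r} is empty")
        # Whole passes up-sample a small stream; a seeded subset fills the rest.
        full_passes, extra = divmod(counts[name], len(ids))
        rng = random.Random(f"{seed}:{epoch}:{name}")
        chosen = ids * full_passes + rng.sample(ids, extra)
        population.extend((name, sid) for sid in chosen)
    return population
```

Under this reading, the 0.70 / 0.10 / 0.20 mixture in this example's target API sketch takes those shares of `epoch_size` from the three streams regardless of how many documents each source holds.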
Catalog Relations
SpiralDB should register one relation per source population. Each relation exposes document text, quality metadata, language metadata, and storage locality. The exact source names are deployment-specific; the schema shape is the contract.
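One way to read "the schema shape is the contract" is as a column-and-type check that every source relation must pass. The check tolerates source-specific extras. This minimal sketch assumes schemas are plain name-to-type mappings. The contract it encodes is the set of columns shared by the text relations listed in this section, and `TEXT_SOURCE_CONTRACT` and `contract_violations` are hypothetical names.

```python
from typing import Dict, List

# Hypothetical contract: the columns every text source relation in this section shares.
TEXT_SOURCE_CONTRACT: Dict[str, str] = {
    "sample_id": "utf8",
    "text": "utf8",
    "lang": "utf8",
    "quality_score": "float64",
    "token_count_estimate": "int32",
    "source_uri": "utf8",
    "storage_shard": "utf8",
}


def contract_violations(schema: Dict[str, str],
                        contract: Dict[str, str] = TEXT_SOURCE_CONTRACT) -> List[str]:
    """List missing or mistyped columns; source-specific extras (url, license, ...) are allowed."""
    problems: List[str] = []
    for column, dtype in contract.items():
        if column not in schema:
            problems.append(f"missing column {column}")
        elif schema[column] != dtype:
            problems.append(f"{column}: expected {dtype}, found {schema[column]}")
    return problems
```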
text.fineweb_documents
sample_id: utf8
text: utf8
lang: utf8
quality_score: float64
token_count_estimate: int32
url: utf8
source_uri: utf8
storage_shard: utf8
text.wikipedia_documents
sample_id: utf8
text: utf8
lang: utf8
quality_score: float64
token_count_estimate: int32
page_id: utf8
source_uri: utf8
storage_shard: utf8
text.code_documents
sample_id: utf8
text: utf8
lang: utf8 # natural language of surrounding metadata
language: utf8 # programming language
quality_score: float64
token_count_estimate: int32
repository: utf8
license: utf8
source_uri: utf8
storage_shard: utf8
Target API Sketch
docs = spiraldb.mix(
[
Stream("fineweb", spiraldb.table("text.fineweb_documents"),
proportion=0.70),
Stream("wikipedia", spiraldb.table("text.wikipedia_documents"),
proportion=0.10),
Stream("code", spiraldb.table("text.code_documents"),
proportion=0.20),
],
epoch_size=10_000_000_000,
)
train = (
docs
.filter((col("lang") == "en") & (col("quality_score") >= 0.80))
.project(
"sample_id",
"stream_id",
"text",
"token_count_estimate",
"source_uri",
"storage_shard",
)
.sample(
identity="sample_id",
order=ShardAwareShuffle(
seed=9176,
shard_key=["stream_id", "storage_shard"],
shuffle_block_size=4_000_000,
sampling_granularity="row_group",
),
epoch=RuntimeParam("epoch"),
assignment=DataParallel(
rank=RuntimeParam("data_parallel_rank"),
world_size=RuntimeParam("data_parallel_world_size"),
),
)
.map(Tokenize("text", tokenizer="llama-tokenizer") >> "tokens")
.collate(
batch_size=None,
formation=WindowedBucket(
window_size=8192,
keys=[BucketKey("len(tokens)")],
budget=BatchBudget(max_tokens=2_000_000),
),
composition=StratifiedByStream("stream_id"),
last=DropPartial,
fields={
"input_ids": Pad("tokens", pad_to_multiple_of=128),
"attention_mask": PadMask("tokens", pad_to_multiple_of=128),
"labels": ShiftedLabels("tokens", ignore_index=-100),
"sample_id": List("sample_id"),
"stream_id": List("stream_id"),
},
)
.to_torch(prefetch=8, device="cuda")
)
What This Demonstrates
- `Mix` is separate from shuffle. It defines the population and per-epoch source proportions before sampling decides order.
- Relational filters and projections run before tokenization and, where possible, before remote fetch or decode work.
- `ShardAwareShuffle` exposes the throughput/randomness tradeoff instead of implying an online global random sort.
- `WindowedBucket` reduces padding waste with bounded memory and replayable tie-breaking.
- `StratifiedByStream` makes per-batch stream composition explicit instead of relying on whatever order happens to emerge from storage.
- `state_dict()` must resume at the same rank-local sample and batch position without replaying the full epoch prefix.
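Token-budget batching and dynamic padding can be illustrated with a small, self-contained sketch. It assumes `WindowedBucket` sorts each bounded window by its bucket key and breaks ties by `sample_id`. It closes a batch when the padded token count (rows times the longest length, rounded up to `pad_to_multiple_of`) would exceed `max_tokens`. It also assumes `ShiftedLabels` means `labels[i] = tokens[i + 1]` and uses a pad id of 0. All function names are hypothetical, and per-stream stratification is not modeled.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

# Hypothetical sketch of WindowedBucket + Pad/PadMask/ShiftedLabels; not Spiral APIs.
Example = Tuple[str, List[int]]  # (sample_id, tokens)


def round_up(n: int, multiple: int) -> int:
    return -(-n // multiple) * multiple


def windowed_token_batches(examples: Iterable[Example], window_size: int,
                           max_tokens: int, pad_multiple: int) -> Iterator[List[Example]]:
    """Bounded-memory length bucketing under a padded-token budget."""

    def split(chunk: List[Example]) -> Iterator[List[Example]]:
        # Sort by (length, sample_id): similar lengths share a batch and ties replay identically.
        chunk.sort(key=lambda ex: (len(ex[1]), ex[0]))
        batch: List[Example] = []
        for ex in chunk:
            # Sorted ascending, so ex would be the longest row if it joined the batch.
            padded_len = round_up(len(ex[1]), pad_multiple)
            if batch and (len(batch) + 1) * padded_len > max_tokens:
                yield batch
                batch = []
            batch.append(ex)
        if batch:
            yield batch

    window: List[Example] = []
    for ex in examples:
        window.append(ex)
        if len(window) == window_size:
            yield from split(window)
            window = []
    if window:
        yield from split(window)


def pad_batch(batch: List[Example], pad_multiple: int, pad_id: int = 0,
              ignore_index: int = -100) -> Dict[str, List[List[int]]]:
    """Dynamic padding to the batch's own longest row, rounded up to pad_multiple."""
    width = round_up(max(len(tokens) for _, tokens in batch), pad_multiple)
    return {
        "input_ids": [t + [pad_id] * (width - len(t)) for _, t in batch],
        "attention_mask": [[1] * len(t) + [0] * (width - len(t)) for _, t in batch],
        # Assumed ShiftedLabels semantics: labels[i] = tokens[i + 1], padding ignored.
        "labels": [(t[1:] + [ignore_index] * width)[:width] for _, t in batch],
    }
```

In this sketch, an example longer than the budget still forms a batch of one. A real engine would need an explicit policy for such examples.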
Expected PyTorch batch:
{
"input_ids": torch.Tensor[B, T] on cuda,
"attention_mask": torch.Tensor[B, T] on cuda,
"labels": torch.Tensor[B, T] on cuda,
"sample_id": list[str],
"stream_id": list[str],
}
Video Clip Training
This example targets video models and multimodal systems. It stresses expensive decode, sequence tensors, shape variance, source skew, and prefetch behavior.
Catalog Relations
video.training_segments is a registered relation where each row is a
candidate training clip or segment. A row may point to a full video plus
timestamp range, or to a precomputed clip object.
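When a row points to a full video plus a timestamp range, something has to turn `start_time_ms`, `duration_ms`, and `fps` into the fixed number of frames the collation contract asks for. The sketch below uses one possible policy, an assumption rather than anything this document specifies. It splits the segment into `sequence_length` equal slices and takes each slice's middle frame. `clip_frame_indices` is a hypothetical helper.

```python
from typing import List


def clip_frame_indices(start_time_ms: int, duration_ms: int, fps: float,
                       sequence_length: int) -> List[int]:
    """Pick `sequence_length` source-frame indices spread evenly over a segment.

    Assumed policy (hypothetical): split the segment into equal slices and take each
    slice's middle frame; segments shorter than `sequence_length` frames repeat frames.
    """
    first = int(start_time_ms * fps // 1000)
    available = max(1, int(duration_ms * fps // 1000))
    return [first + int((i + 0.5) * available / sequence_length)
            for i in range(sequence_length)]
```

For a 2-second segment at 30 fps with `sequence_length=16`, this policy selects 16 of the segment's 60 frames.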
video.training_segments
sample_id: utf8
video_id: utf8
clip_ref: video/h264-ref
start_time_ms: int64
duration_ms: int64
frame_count: int32
width: int32
height: int32
fps: float32
source: utf8
creator_id: utf8
upload_ts: timestamp_ns
storage_shard: utf8
video.segment_labels carries supervised labels, weak labels, or multimodal targets:
video.segment_labels
sample_id: utf8
label_id: int64
caption: utf8
confidence: float32
Target API Sketch
train = (
spiraldb.table("video.training_segments")
.join(spiraldb.table("video.segment_labels"), on="sample_id")
.filter((col("duration_ms") >= 2_000) & (col("confidence") >= 0.7))
.sample(
identity="sample_id",
order=HashBucketed(
seed=123456,
bucket_count=8192,
bucket_order=ShuffleBuckets(seed=123456),
),
epoch=RuntimeParam("epoch"),
assignment=DataParallel(
rank=RuntimeParam("data_parallel_rank"),
world_size=RuntimeParam("data_parallel_world_size"),
),
)
.collate(
batch_size=32,
formation=WindowedBucket(
window_size=2048,
keys=[
BucketKey("frame_count"),
BucketKey("width * height"),
],
budget=BatchBudget(max_frames=32 * 16, max_bytes=8_000_000_000),
),
last=KeepPartial,
fields={
"frames": Stack(
"clip_ref",
decode="h264",
sequence_length=16,
resize_shorter=256,
crop=224,
dtype="float16",
layout="ntchw",
device="cuda",
),
"label": Stack("label_id", dtype="int64"),
"caption": List("caption"),
"video_id": List("video_id"),
"sample_id": List("sample_id"),
},
)
.to_torch(prefetch=2)
)
What This Demonstrates
- Sampling is based on the stable `sample_id`, not upload time or UUIDv7 order, because video ingest order is often correlated with creator, event, region, or topic.
- `HashBucketed` can provide a deterministic distribution without a global `ORDER BY hash(sample_id)` sort.
- `WindowedBucket` groups examples with similar frame and resolution costs while preserving a bounded-memory streaming contract.
- The planner can choose whether clip decode happens before or after batch membership is fixed, based on which metadata is already available.
- `EXPLAIN DATA` should report decode bytes, frame bytes, cache hit rate, prefetch depth, padding or dropped-frame behavior, and skew by source or creator when statistics are present.
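Here is a minimal sketch of hash-bucketed ordering. It assumes `HashBucketed` assigns each `sample_id` to a bucket with a seeded content hash, and that `ShuffleBuckets` permutes the bucket order per epoch. `stable_bucket` and `hash_bucketed_order` are hypothetical helpers. BLAKE2b is an arbitrary choice of stable hash; Python's built-in `hash()` is salted per process for strings, so it would not replay.

```python
import hashlib
import random
from collections import defaultdict
from typing import Dict, Iterable, List

# Hypothetical sketch of HashBucketed + ShuffleBuckets; not the Spiral implementation.


def stable_bucket(sample_id: str, seed: int, bucket_count: int) -> int:
    # Content hash of (seed, sample_id): independent of upload time, file order, or process.
    digest = hashlib.blake2b(f"{seed}:{sample_id}".encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big") % bucket_count


def hash_bucketed_order(sample_ids: Iterable[str], seed: int, bucket_count: int,
                        epoch: int) -> List[str]:
    buckets: Dict[int, List[str]] = defaultdict(list)
    for sid in sample_ids:  # one streaming pass assigns buckets; no global sort
        buckets[stable_bucket(sid, seed, bucket_count)].append(sid)
    rng = random.Random(seed * 1_000_003 + epoch)
    bucket_order = list(range(bucket_count))
    rng.shuffle(bucket_order)  # ShuffleBuckets: permute whole buckets per epoch
    order: List[str] = []
    for b in bucket_order:
        # Only each small bucket is sorted, so the input order does not matter.
        members = sorted(buckets.get(b, []))
        rng.shuffle(members)
        order.extend(members)
    return order
```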
Expected PyTorch batch:
{
"frames": torch.Tensor[B, T, C, H, W] on cuda,
"label": torch.Tensor[B],
"caption": list[str],
"video_id": list[str],
"sample_id": list[str],
}
Relationship to Existing Systems
These examples intentionally cover common workflows from existing training data systems:
- PyTorch `DataLoader`-style batching, distributed sampling, and epoch-bound reshuffling.
- DALI-style decode, augmentation, GPU delivery, and per-device sharding.
- Mosaic Streaming-style stream mixing, cache-aware shuffling, mid-epoch resumption, and locality-aware sample assignment.
- Hugging Face-style streaming text datasets, tokenization, dynamic padding, and language-modeling collation.
The Spiral difference is that these behaviors are represented as relation
planning and execution contracts. The engine should be able to explain the data
path, replay it, resume it, and share the same physical ArrayRef data plane
used by ordinary query execution.
Reference workflows:
- PyTorch data loading, samplers, distributed samplers, batching, and collation: PyTorch data docs
- NVIDIA DALI image pipelines and PyTorch integration: PyTorch basic example
- NVIDIA DALI video reader pipeline: video reader example
- Mosaic Streaming dataset mixing: mixing data sources
- Mosaic Streaming shuffling and fast resumption: shuffling and fast resumption
- Hugging Face streaming datasets and language modeling collation: streaming datasets and language modeling