
Status: Example. Progress: Illustrative only; the training operators shown here are target syntax for the roadmap, not implemented engine APIs. Open questions: Final relation API spelling, SQL spelling, transform registry names, runtime binding names, and framework adapter shape remain open.

These examples describe the training data workflows Spiral should support once training batch streams are implemented. The syntax is preliminary and is expected to change. The point of the examples is to define the target behavior: relations stay engine-native, sampling and collation are visible in the logical plan, and the terminal object is a replayable stream consumable by PyTorch or other training frameworks.

The examples assume SpiralDB provides pre-registered catalog relations for the datasets. Users should not need to write raw file readers for common datasets. Catalog relations expose stable sample identity columns, semantic references to external bytes, and useful statistics for planning.

The common shape is:

```text
catalog relation
  -> ordinary relational work
  -> Mix?       # multiple populations or stream proportions
  -> Sample     # order, epoch, rank, replay
  -> Collate    # mini-batch membership and field representation
  -> prepared training stream
```

Sample defines the selected example stream. Collate defines both mini-batch membership and the representation of each batch’s fields. Fetch, decode, normalization, pinned memory, and device placement are planner and physical-lowering concerns unless the user explicitly requests them through the collation output contract.
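To make this division of labor concrete, here is a minimal in-memory sketch of Sample followed by Collate over a Python list. Everything in it (`Row`, `sample`, `collate`, the seed-mixing scheme, and trimming so every rank receives the same number of examples) is a hypothetical illustration, not Spiral's API or planner behavior. A real plan would stream rows rather than materialize the epoch.

```python
import random
from dataclasses import dataclass


# Hypothetical in-memory stand-in for illustration only; not a Spiral API.
@dataclass(frozen=True)
class Row:
    sample_id: str
    label_id: int


def sample(rows, seed, epoch, rank, world_size):
    """Sample: a deterministic per-epoch order, then a rank-local slice."""
    # Start from a canonical identity order so storage order cannot leak in.
    order = sorted(rows, key=lambda r: r.sample_id)
    # A str seed is converted to an int deterministically (not via hash()),
    # so the shuffle is identical across processes and restarts.
    random.Random(f"{seed}:{epoch}").shuffle(order)
    # Trim so every rank sees the same number of examples.
    usable = len(order) - len(order) % world_size
    return order[:usable][rank::world_size]


def collate(stream, batch_size, drop_partial=True):
    """Collate: batch membership plus per-field representation."""
    batches = []
    for start in range(0, len(stream), batch_size):
        group = stream[start:start + batch_size]
        if len(group) < batch_size and drop_partial:
            break  # DropPartial: the short tail never becomes a model-visible batch
        batches.append({
            "label": [r.label_id for r in group],
            "sample_id": [r.sample_id for r in group],
        })
    return batches
```

Because the order depends only on `sample_id`, the seed, and the epoch, passing the same rows in a different storage order yields the same rank streams.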

Image Classification

This example mirrors the role of a DALI-style ImageNet input pipeline: shard the data across training ranks, shuffle deterministically, decode compressed images, apply image transforms, normalize, and emit GPU-ready tensors.

Catalog Relations

vision.imagenet_train is a registered relation over an image training split. It may be backed by Vortex files, local files, S3 objects, or another object store. The relation exposes image bytes through a semantic reference instead of eagerly decoding the image.

```text
vision.imagenet_train
  sample_id:     utf8             # stable example identity
  split:         utf8             # train / validation
  image_ref:     image/jpeg-ref   # semantic handle to compressed image bytes
  label_id:      int64
  width:         int32
  height:        int32
  source_uri:    utf8
  storage_shard: utf8             # file, row group, object, or layout unit
  ingest_ts:     timestamp_ns
```

vision.imagenet_labels is a small metadata relation:

```text
vision.imagenet_labels
  label_id:   int64
  class_name: utf8
```

Target API Sketch

```python
train = (
    spiraldb.table("vision.imagenet_train")
    .join(spiraldb.table("vision.imagenet_labels"), on="label_id")
    .filter(col("split") == "train")
    .sample(
        identity="sample_id",
        order=ShardAwareShuffle(
            seed=17,
            shard_key=["storage_shard"],
            shuffle_block_size=1_000_000,
        ),
        epoch=RuntimeParam("epoch"),
        assignment=DataParallel(
            rank=RuntimeParam("data_parallel_rank"),
            world_size=RuntimeParam("data_parallel_world_size"),
        ),
    )
    .collate(
        batch_size=256,
        last=DropPartial,
        fields={
            "image": Stack(
                "image_ref",
                decode="jpeg",
                transforms=[
                    RandomResizedCrop(size=224),
                    RandomHorizontalFlip(),
                    Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                ],
                dtype="float16",
                layout="nchw",
                device="cuda",
            ),
            "label": Stack("label_id", dtype="int64"),
            "sample_id": List("sample_id"),
        },
    )
    .to_torch(prefetch=4)
)
```

What This Demonstrates

  • Sample does not globally sort by a random key. It creates deterministic rank-local streams from shard-aware shuffle metadata.
  • storage_shard is a locality hint, not a claim that storage order is unbiased. EXPLAIN DATA should report label and source skew by shard when statistics are available.
  • The placement and timing of decode, crop, normalization, tensor layout conversion, and device transfer are planner choices; the Collate output fields specify only the required result.
  • DropPartial is part of collation because it decides whether the final short group becomes a model-visible batch.
  • The prepared stream binds epoch, data_parallel_rank, and data_parallel_world_size at run start or resume.
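One plausible reading of `ShardAwareShuffle` combined with `DataParallel` is sketched below under stated assumptions: the shard order is shuffled per epoch, the shard-ordered stream is cut into fixed-size shuffle blocks that are each shuffled with their own derived seed, and ranks take a strided slice of the result. Because any block can be regenerated from `(seed, epoch, block index)`, resuming at a rank-local position starts by regenerating only the block that contains it instead of replaying the epoch prefix. The function names and the in-memory `shards` dict are hypothetical; shard row counts stand in for catalog statistics.

```python
import random


def shard_order(shards, seed, epoch):
    """Per-epoch shuffled shard order, starting from sorted shard names."""
    names = sorted(shards)
    random.Random(f"shards:{seed}:{epoch}").shuffle(names)
    return names


def shuffle_block(shards, order, block_size, block_index, seed, epoch):
    """Rebuild one block: a contiguous slice of the shard-ordered stream,
    shuffled with a seed derived from (seed, epoch, block_index)."""
    start, stop = block_index * block_size, (block_index + 1) * block_size
    rows, offset = [], 0
    for name in order:
        shard_rows = shards[name]
        lo, hi = offset, offset + len(shard_rows)
        if hi > start and lo < stop:  # this shard overlaps the block
            rows.extend(shard_rows[max(start - lo, 0):min(stop - lo, len(shard_rows))])
        offset = hi
        if offset >= stop:
            break
    random.Random(f"block:{seed}:{epoch}:{block_index}").shuffle(rows)
    return rows


def rank_stream(shards, seed, epoch, block_size, rank, world_size, start=0):
    """Yield rank `rank`'s examples, beginning at rank-local position `start`."""
    order = shard_order(shards, seed, epoch)
    total = sum(len(rows) for rows in shards.values())
    usable = total - total % world_size  # equal-length streams for every rank
    g = start * world_size + rank  # global position of the first example to emit
    while g < usable:
        b = g // block_size
        block = shuffle_block(shards, order, block_size, b, seed, epoch)
        block_start = b * block_size
        while g < usable and g < block_start + block_size:
            yield block[g - block_start]
            g += world_size
```

Larger blocks mix more examples across shards at the cost of more buffered rows. In the extreme where each block covers exactly one shard, examples never mix across shards, which is the throughput/randomness tradeoff that a parameter like `shuffle_block_size` exposes.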

Expected PyTorch batch:

```text
{
  "image": torch.Tensor[B, 3, 224, 224] on cuda,
  "label": torch.Tensor[B],
  "sample_id": list[str],
}
```

Mixed Text Pretraining

This example combines several text populations into one training stream. It is the best example for the Mix -> Sample -> Collate shape because the stream mixture, quality filters, deterministic shuffling, tokenization, token-budget batching, and dynamic padding all matter.
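The sketch below shows one way Mix could turn stream proportions into a per-epoch population before any ordering happens: split `epoch_size` across streams with largest-remainder rounding, then pick that many members from each stream by seeded hash rank, cycling (upsampling) when a stream is smaller than its quota. The rounding rule, the hash-rank selection, and the upsampling behavior are assumptions for illustration, not documented Spiral semantics, and the in-memory sort is not how a streaming engine would do it. The output stays in a canonical grouped order because ordering belongs to Sample.

```python
import hashlib


def allocate(proportions, epoch_size):
    """Split epoch_size across streams with largest-remainder rounding."""
    total = sum(proportions.values())
    exact = {name: epoch_size * p / total for name, p in proportions.items()}
    counts = {name: int(value) for name, value in exact.items()}
    leftover = epoch_size - sum(counts.values())
    # Remaining slots go to the largest fractional parts; names break ties.
    by_remainder = sorted(exact, key=lambda n: (-(exact[n] - counts[n]), n))
    for name in by_remainder[:leftover]:
        counts[name] += 1
    return counts


def epoch_members(ids, count, seed, epoch):
    """Pick `count` ids by seeded hash rank, cycling if the stream is too small."""
    ranked = sorted(ids, key=lambda i: hashlib.sha256(f"{seed}:{epoch}:{i}".encode()).hexdigest())
    return [ranked[k % len(ranked)] for k in range(count)]


def mix(streams, proportions, epoch_size, seed, epoch):
    """Return the epoch population as (stream_id, sample_id) pairs.

    Order is canonical (grouped by stream); Sample decides the training order.
    """
    counts = allocate(proportions, epoch_size)
    return [
        (name, sid)
        for name in sorted(streams)
        for sid in epoch_members(streams[name], counts[name], seed, epoch)
    ]
```

With the proportions from the example (0.70, 0.10, 0.20) and an epoch of 10 samples, `allocate` assigns 7, 1, and 2 slots.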

Catalog Relations

SpiralDB should register one relation per source population. Each relation exposes document text, quality metadata, language metadata, and storage locality. The exact source names are deployment-specific; the schema shape is the contract.

```text
text.fineweb_documents
  sample_id:            utf8
  text:                 utf8
  lang:                 utf8
  quality_score:        float64
  token_count_estimate: int32
  url:                  utf8
  source_uri:           utf8
  storage_shard:        utf8

text.wikipedia_documents
  sample_id:            utf8
  text:                 utf8
  lang:                 utf8
  quality_score:        float64
  token_count_estimate: int32
  page_id:              utf8
  source_uri:           utf8
  storage_shard:        utf8

text.code_documents
  sample_id:            utf8
  text:                 utf8
  lang:                 utf8      # natural language of surrounding metadata
  language:             utf8      # programming language
  quality_score:        float64
  token_count_estimate: int32
  repository:           utf8
  license:              utf8
  source_uri:           utf8
  storage_shard:        utf8
```

Target API Sketch

```python
docs = spiraldb.mix(
    [
        Stream("fineweb", spiraldb.table("text.fineweb_documents"), proportion=0.70),
        Stream("wikipedia", spiraldb.table("text.wikipedia_documents"), proportion=0.10),
        Stream("code", spiraldb.table("text.code_documents"), proportion=0.20),
    ],
    epoch_size=10_000_000_000,
)

train = (
    docs
    .filter((col("lang") == "en") & (col("quality_score") >= 0.80))
    .project(
        "sample_id",
        "stream_id",
        "text",
        "token_count_estimate",
        "source_uri",
        "storage_shard",
    )
    .sample(
        identity="sample_id",
        order=ShardAwareShuffle(
            seed=9176,
            shard_key=["stream_id", "storage_shard"],
            shuffle_block_size=4_000_000,
            sampling_granularity="row_group",
        ),
        epoch=RuntimeParam("epoch"),
        assignment=DataParallel(
            rank=RuntimeParam("data_parallel_rank"),
            world_size=RuntimeParam("data_parallel_world_size"),
        ),
    )
    .map(Tokenize("text", tokenizer="llama-tokenizer") >> "tokens")
    .collate(
        batch_size=None,
        formation=WindowedBucket(
            window_size=8192,
            keys=[BucketKey("len(tokens)")],
            budget=BatchBudget(max_tokens=2_000_000),
        ),
        composition=StratifiedByStream("stream_id"),
        last=DropPartial,
        fields={
            "input_ids": Pad("tokens", pad_to_multiple_of=128),
            "attention_mask": PadMask("tokens", pad_to_multiple_of=128),
            "labels": ShiftedLabels("tokens", ignore_index=-100),
            "sample_id": List("sample_id"),
            "stream_id": List("stream_id"),
        },
    )
    .to_torch(prefetch=8, device="cuda")
)
```

What This Demonstrates

  • Mix is separate from shuffle. It defines the population and per-epoch source proportions before sampling decides order.
  • Relational filters and projections run before tokenization and before remote fetch/decode work where possible.
  • ShardAwareShuffle exposes the throughput/randomness tradeoff instead of implying an online global random sort.
  • WindowedBucket reduces padding waste by grouping similar-length examples within a bounded window, with replayable tie-breaking.
  • StratifiedByStream makes per-batch stream composition explicit instead of relying on whatever order happens to emerge from storage.
  • The stream's state_dict() must let a run resume at the same rank-local sample and batch position without replaying the full epoch prefix.
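A minimal sketch of windowed bucketing under a token budget follows. It assumes that WindowedBucket reads `window_size` consecutive examples from the sampled stream, sorts them by length with stream position as the tie-breaker (so a replay reproduces the same batches), and closes a batch when its padded size, the longest padded length times the batch size, would exceed `max_tokens`. The function names are hypothetical, and stratification by stream and DropPartial handling are omitted.

```python
def padded_len(n, multiple):
    """Round n up to a multiple of `multiple` (pad_to_multiple_of)."""
    return -(-n // multiple) * multiple


def windowed_token_batches(lengths, window_size, max_tokens, multiple=1):
    """Group example positions into batches whose padded size fits max_tokens.

    `lengths` holds token counts in sampled-stream order. An example longer
    than the budget still becomes a singleton batch in this sketch.
    """
    batches = []
    for w in range(0, len(lengths), window_size):
        window = list(range(w, min(w + window_size, len(lengths))))
        # Sort by length; stream position breaks ties, so a replay matches.
        window.sort(key=lambda i: (lengths[i], i))
        batch, longest = [], 0
        for i in window:
            candidate = max(longest, padded_len(lengths[i], multiple))
            if batch and candidate * (len(batch) + 1) > max_tokens:
                batches.append(batch)  # adding i would exceed the budget
                batch, candidate = [], padded_len(lengths[i], multiple)
            batch.append(i)
            longest = candidate
        if batch:
            batches.append(batch)
    return batches


def padding_waste(lengths, batches, multiple=1):
    """Padded token slots minus real tokens across all batches."""
    padded = sum(max(padded_len(lengths[i], multiple) for i in b) * len(b) for b in batches)
    real = sum(lengths[i] for b in batches for i in b)
    return padded - real
```

For lengths `[5, 100, 6, 98, 7, 99, 4, 101]` in one window of 8 with `max_tokens=300`, this yields batches `[[6, 0, 2, 4], [3, 5, 1], [7]]` with 9 padding tokens, whereas fixed pairs in arrival order would pad 376 tokens.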

Expected PyTorch batch:

```text
{
  "input_ids": torch.Tensor[B, T] on cuda,
  "attention_mask": torch.Tensor[B, T] on cuda,
  "labels": torch.Tensor[B, T] on cuda,
  "sample_id": list[str],
  "stream_id": list[str],
}
```
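The padded field shapes can be reproduced with plain lists. This sketch assumes that `Pad` rounds the batch's longest sequence up to a multiple of `pad_to_multiple_of`, that `PadMask` marks real tokens with 1, and that `ShiftedLabels` stores next-token targets (`labels[t] = tokens[t + 1]`) with `ignore_index` at the last real position and at every padded position. The pad id of 0 is also an assumption; some frameworks instead leave the shift to the model.

```python
def pad_batch(token_lists, pad_to_multiple_of=128, pad_id=0, ignore_index=-100):
    """Build input_ids, attention_mask, and labels, each shaped [B, T]."""
    longest = max(len(toks) for toks in token_lists)
    T = -(-longest // pad_to_multiple_of) * pad_to_multiple_of  # ceil to multiple
    batch = {"input_ids": [], "attention_mask": [], "labels": []}
    for toks in token_lists:
        pad = T - len(toks)
        batch["input_ids"].append(toks + [pad_id] * pad)
        batch["attention_mask"].append([1] * len(toks) + [0] * pad)
        # Next-token targets: the last real position has no successor.
        shifted = toks[1:] + [ignore_index] if toks else []
        batch["labels"].append(shifted + [ignore_index] * pad)
    return batch
```

For example, `pad_batch([[5, 6, 7], [8]], pad_to_multiple_of=4)` gives T = 4, `input_ids` `[[5, 6, 7, 0], [8, 0, 0, 0]]`, and `labels` `[[6, 7, -100, -100], [-100, -100, -100, -100]]`.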

Video Clip Training

This example targets video models and multimodal systems. It stresses expensive decode, sequence tensors, shape variance, source skew, and prefetch behavior.

Catalog Relations

video.training_segments is a registered relation where each row is a candidate training clip or segment. A row may point to a full video plus timestamp range, or to a precomputed clip object.

```text
video.training_segments
  sample_id:     utf8
  video_id:      utf8
  clip_ref:      video/h264-ref
  start_time_ms: int64
  duration_ms:   int64
  frame_count:   int32
  width:         int32
  height:        int32
  fps:           float32
  source:        utf8
  creator_id:    utf8
  upload_ts:     timestamp_ns
  storage_shard: utf8
```

video.segment_labels carries supervised labels, weak labels, or multimodal targets:

```text
video.segment_labels
  sample_id:  utf8
  label_id:   int64
  caption:    utf8
  confidence: float32
```

Target API Sketch

```python
train = (
    spiraldb.table("video.training_segments")
    .join(spiraldb.table("video.segment_labels"), on="sample_id")
    .filter((col("duration_ms") >= 2_000) & (col("confidence") >= 0.7))
    .sample(
        identity="sample_id",
        order=HashBucketed(
            seed=123456,
            bucket_count=8192,
            bucket_order=ShuffleBuckets(seed=123456),
        ),
        epoch=RuntimeParam("epoch"),
        assignment=DataParallel(
            rank=RuntimeParam("data_parallel_rank"),
            world_size=RuntimeParam("data_parallel_world_size"),
        ),
    )
    .collate(
        batch_size=32,
        formation=WindowedBucket(
            window_size=2048,
            keys=[
                BucketKey("frame_count"),
                BucketKey("width * height"),
            ],
            budget=BatchBudget(max_frames=32 * 16, max_bytes=8_000_000_000),
        ),
        last=KeepPartial,
        fields={
            "frames": Stack(
                "clip_ref",
                decode="h264",
                sequence_length=16,
                resize_shorter=256,
                crop=224,
                dtype="float16",
                layout="ntchw",
                device="cuda",
            ),
            "label": Stack("label_id", dtype="int64"),
            "caption": List("caption"),
            "video_id": List("video_id"),
            "sample_id": List("sample_id"),
        },
    )
    .to_torch(prefetch=2)
)
```

What This Demonstrates

  • Sampling is keyed on the stable sample_id, not on upload time or time-ordered identifiers such as UUIDv7. Video ingest order is often correlated with creator, event, region, or topic, so ordering by it would produce skewed batches.
  • HashBucketed can provide a deterministic, well-mixed order without a global ORDER BY hash(sample_id) sort.
  • WindowedBucket groups examples with similar frame and resolution costs while preserving a bounded-memory streaming contract.
  • The planner can choose whether clip decode happens before or after batch membership based on which metadata is already available.
  • EXPLAIN DATA should report decode bytes, frame bytes, cache hit rate, prefetch depth, padding or dropped-frame behavior, and skew by source or creator when statistics are present.
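The sketch below illustrates how a hash-bucketed order can be deterministic without a global sort: one pass hash-partitions rows by `sample_id` into buckets, the bucket order is shuffled per epoch, and each bucket is shuffled locally. Because only `sample_id` and the seeds feed the result, ingest order cannot leak into the stream. The hash function (BLAKE2b), the seed derivation, and the per-bucket canonical sort are assumptions, not the documented behavior of `HashBucketed` or `ShuffleBuckets`.

```python
import hashlib
import random
from collections import defaultdict


def stable_hash(seed, key):
    """64-bit hash that, unlike hash(), is identical across processes."""
    digest = hashlib.blake2b(f"{seed}:{key}".encode(), digest_size=8).digest()
    return int.from_bytes(digest, "big")


def hash_bucketed_order(sample_ids, seed, bucket_count, epoch):
    """Deterministic order over sample_ids with no global sort."""
    buckets = defaultdict(list)
    for sid in sample_ids:  # single pass: hash-partition by identity
        buckets[stable_hash(seed, sid) % bucket_count].append(sid)
    bucket_ids = list(range(bucket_count))
    random.Random(f"buckets:{seed}:{epoch}").shuffle(bucket_ids)
    for b in bucket_ids:
        # Canonicalize, then shuffle, only within one bucket (bounded work).
        members = sorted(buckets.get(b, []))
        random.Random(f"bucket:{seed}:{epoch}:{b}").shuffle(members)
        yield from members
```

The only sorting happens inside a single bucket, so its cost is bounded by bucket size rather than by the size of the relation.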

Expected PyTorch batch:

```text
{
  "frames": torch.Tensor[B, T, C, H, W] on cuda,
  "label": torch.Tensor[B],
  "caption": list[str],
  "video_id": list[str],
  "sample_id": list[str],
}
```

Relationship to Existing Systems

These examples intentionally cover common workflows from existing training data systems:

  • PyTorch DataLoader style batching, distributed sampling, and epoch-bound reshuffling.
  • DALI-style decode, augmentation, GPU delivery, and per-device sharding.
  • Mosaic Streaming-style stream mixing, cache-aware shuffling, mid-epoch resumption, and locality-aware sample assignment.
  • Hugging Face-style streaming text datasets, tokenization, dynamic padding, and language-modeling collation.

The Spiral difference is that these behaviors are represented as relation planning and execution contracts. The engine should be able to explain the data path, replay it, resume it, and share the same physical ArrayRef data plane used by ordinary query execution.

