
Collections

Collections are not yet available. Please contact us if you are interested.

SpiralDB stores your data as a tree of collections — like JSON arrays of objects, but backed by columnar storage. Each collection is physically a separate sorted table. Queries across collections are sort-merge joins — the cheapest possible join!

Schema

Here is a dataset of scene recordings for world modeling, where each scene contains clips from different cameras, and each clip contains video frames, object detections, and audio:

```python
from spiral import Spiral

sp = Spiral()
ds = sp.dataset("recordings")
print(ds.schema)
```
```
recordings
├── scenes {_id: utf8}
│   ├── location: utf8
│   ├── clips {_id: utf8}
│   │   ├── camera: utf8
│   │   ├── start_time: f64
│   │   ├── duration: f64
│   │   ├── frames []                  one per video frame (e.g. 30fps)
│   │   │   ├── rgb: tensor<u8>[H, W, 3]
│   │   │   └── depth: tensor<f32>[H, W]
│   │   ├── detections []              one per detected object (variable rate)
│   │   │   ├── label: utf8
│   │   │   ├── confidence: f32
│   │   │   └── ts: f64
│   │   └── audio []                   one per chunk (e.g. 512 samples at 16kHz)
│   │       ├── pcm: tensor<i16>[512]
│   │       └── level_db: f32
│   └── metadata: struct
│       ├── weather: utf8
│       └── lighting: utf8
└── models {_id: utf8}
    ├── name: utf8
    └── config: struct
        ├── lr: f64
        └── batch_size: i32
```

Three kinds of things appear in this tree:

  • Collections (scenes, clips, frames, detections, audio, models) — records you can iterate over. Each is either keyed ({_id: type}; upserts match on _id) or positional ([]; rows are ordered and carry a virtual u64 _pos field starting at 0).

  • Structs (metadata, config). Single nested objects — one per parent record. Dot access navigates through them. They don’t introduce new rows.

  • Scalars (location, duration, rgb, confidence). Leaf values.

Note that frames, detections, and audio are sibling collections within clips. They have independent row counts — a 30-second clip might have 900 frames, 12 detections, and 960 audio chunks.

Physical Model

Each collection is stored as its own sorted columnar table. A child table’s sort key is always a prefix extension of its parent’s key:

```
scenes:     sorted by (scenes_id)
clips:      sorted by (scenes_id, clips_id)
frames:     sorted by (scenes_id, clips_id, frames_pos)
detections: sorted by (scenes_id, clips_id, detections_pos)
audio:      sorted by (scenes_id, clips_id, audio_pos)
```

Because every parent-child pair shares a key prefix, joins between them are sort-merge joins. When two streams have identical key sets (not just the same key schema, but the same actual rows), the join degenerates to a zip: walk both streams in lockstep with no searching.
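To see why identical key sets make the join trivial, here is a plain-Python sketch of a sort-merge join (illustrative only, not the SpiralDB implementation):

```python
def sort_merge_join(left, right):
    # Join two streams of (key, value) pairs, both sorted by key. When the
    # key sets are identical, the `lk == rk` branch is taken on every step,
    # so the loop degenerates into walking both streams in lockstep — a zip.
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        (lk, lv), (rk, rv) = left[i], right[j]
        if lk == rk:
            out.append((lk, lv, rv))
            i += 1
            j += 1
        elif lk < rk:
            i += 1   # left key has no partner yet — advance left
        else:
            j += 1   # right key has no partner yet — advance right
    return out

# Two columns of the clips table share the exact same key set:
durations = [(("s1", "c1"), 30.0), (("s1", "c2"), 15.0), (("s2", "c3"), 20.0)]
cameras   = [(("s1", "c1"), "front"), (("s1", "c2"), "front"), (("s2", "c3"), "rear")]
joined = sort_merge_join(durations, cameras)
```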

Dot access walks the tree and returns a lazy reference. Nothing is read until you scan.

```python
scenes = ds.scenes
clips = scenes.clips
frames = clips.frames
detections = clips.detections
audio = clips.audio

frames.rgb               # references recordings/scenes/clips/frames/rgb
scenes.metadata.weather  # references recordings/scenes/metadata/weather
```

Save any part of the tree to a variable and keep navigating. Path variables make queries readable — especially when referencing ancestors deep in the tree.

Expression Levels

Every expression has a level — the collection whose key columns define its row domain. The level is determined by the expression’s source and is always unambiguous.

Field references have the level of their collection:

```python
scenes.location        # level: scenes — one value per scene
clips.duration         # level: clips — one value per clip
detections.confidence  # level: detections — one value per detection
```

Scalar operations inherit the deepest level of their operands:

```python
clips.duration * 2                # level: clips
detections.confidence * 100       # level: detections
detections.ts - clips.start_time  # level: detections (clips.start_time broadcasts)
```

Reductions move one level up (to the immediate parent):

```python
clips.duration.sum()                # level: scenes (clips → scenes)
detections.confidence.mean()        # level: clips (detections → clips)
detections.confidence.mean().sum()  # level: scenes (detections → clips → scenes)
```
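The level arithmetic can be mimicked with plain key tuples: a reduction drops the last key component and aggregates within the parent key. A minimal sketch (illustrative, not SpiralDB internals):

```python
from itertools import groupby

def reduce_up(rows, fn):
    # Drop the last key component and aggregate per parent key:
    # the expression's level moves one step up the tree.
    return [(parent, fn([v for _, v in grp]))
            for parent, grp in groupby(rows, key=lambda r: r[0][:-1])]

# detections rows, sorted by key (scenes_id, clips_id, detections_pos)
detections = [
    (("s1", "c1", 0), 0.9),
    (("s1", "c1", 1), 0.8),
    (("s1", "c2", 0), 0.92),
    (("s2", "c3", 0), 0.7),
]

per_clip = reduce_up(detections, lambda vs: sum(vs) / len(vs))  # detections → clips
per_scene = reduce_up(per_clip, sum)                            # clips → scenes
```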

Scanning

ds.scan() reads data and produces rows. Pass one or more expressions — they must all lie on the same ancestor path (no siblings or unrelated branches). Ancestor expressions are broadcast to the scan level automatically.

```python
ds.scan(duration=clips.duration, camera=clips.camera).to_arrow()
```
```
┌──────────┬────────┐
│ duration │ camera │
├──────────┼────────┤
│ 30.0     │ front  │
│ 15.0     │ front  │
│ 20.0     │ rear   │
└──────────┴────────┘
```

The scan level is the deepest result level among all expressions. Shallower expressions are broadcast automatically.

```python
# Scan level = clips (deepest result level)
ds.scan(
    duration=clips.duration,                       # level: clips
    location=scenes.location,                      # level: scenes → broadcasts to clips
    confidence_mean=detections.confidence.mean(),  # level: clips (reduced from detections)
).to_arrow()
```
```
┌──────────┬──────────┬─────────────────┐
│ duration │ location │ confidence_mean │
├──────────┼──────────┼─────────────────┤
│ 30.0     │ downtown │ 0.85            │
│ 15.0     │ downtown │ 0.92            │
│ 20.0     │ highway  │ 0.78            │
└──────────┴──────────┴─────────────────┘
```

Projection forms. Several shorthand forms work inside ds.scan():

```python
# Bare collections
ds.scan(frames)

# Kwargs — keys become output column names
ds.scan(rgb=frames.rgb, dur=clips.duration)

# Dict — keys become output column names, nested dicts become structs
ds.scan({
    'image': frames.rgb,
    'meta': {
        'dur': clips.duration,
        'loc': scenes.location,
    },
})
```

Implicit Broadcasting

When an ancestor expression appears in a scan at a deeper level, its values are automatically repeated for every child row. This is the only implicit operation — it is always unambiguous because each child row has exactly one parent.

```python
ds.scan(
    rgb=frames.rgb,            # level: frames
    duration=clips.duration,   # level: clips → broadcast to frames
    location=scenes.location,  # level: scenes → broadcast to frames
).to_arrow()
```
```
┌─────┬──────────┬──────────┐
│ rgb │ duration │ location │
├─────┼──────────┼──────────┤
│ f0  │ 30.0     │ downtown │
│ f1  │ 30.0     │ downtown │
│ f2  │ 15.0     │ downtown │
│ f3  │ 20.0     │ highway  │
└─────┴──────────┴──────────┘
```

Physically, this is a sort-merge join between the frames table and the clips/scenes tables on their shared key prefix.
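Because both streams are sorted and every child has exactly one parent, the broadcast is a single forward pass; a plain-Python sketch of the mechanics (illustrative, not the SpiralDB implementation):

```python
def broadcast(parent, child):
    # Attach each parent's value to every child row whose key starts
    # with the parent's key. Both streams are sorted by the shared key
    # prefix, so the parent cursor only ever moves forward.
    out, j = [], 0
    for ckey, cval in child:
        while ckey[:len(parent[j][0])] != parent[j][0]:
            j += 1  # child has moved past this parent — advance
        out.append((ckey, cval, parent[j][1]))
    return out

clips  = [(("s1", "c1"), 30.0), (("s1", "c2"), 15.0), (("s2", "c3"), 20.0)]
frames = [(("s1", "c1", 0), "f0"), (("s1", "c1", 1), "f1"),
          (("s1", "c2", 0), "f2"), (("s2", "c3", 0), "f3")]
rows = broadcast(clips, frames)
```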

Siblings and unrelated collections are errors:

```python
# ✗ frames and detections are siblings — neither is an ancestor of the other
ds.scan(rgb=frames.rgb, label=detections.label)
# Error: cannot combine 'frames' and 'detections' — they are siblings

# ✗ models is on a different branch entirely
ds.scan(dur=clips.duration, name=ds.models.name)
# Error: 'clips' and 'models' share no ancestor path
```

Siblings must be explicitly aligned to be combined in the same scan — see Aligning Siblings below.

Reduction

Referencing a child collection’s fields from a parent level requires an explicit reduction — you must specify how many-to-one values collapse. Reduction methods move the expression one level up to the immediate parent.

```python
# At scenes level: one value per scene
clips.duration.sum()    # total recording time per scene
clips.duration.mean()   # average clip duration per scene
clips.duration.list()   # list of clip durations per scene
clips.duration.count()  # number of clips per scene
clips.duration.min()    # shortest clip per scene
clips.duration.max()    # longest clip per scene
clips.duration.first()  # first clip's duration per scene
clips.duration.last()   # last clip's duration per scene
```

Reductions compose — each step moves one level up:

```python
detections.confidence.mean()        # level: clips (mean per clip)
detections.confidence.mean().sum()  # level: scenes (sum of per-clip means)
```

Filtering

Filter narrows a collection’s row domain without changing its level. It is a first-class operation on collections.

.where() on a collection returns a filtered collection that can be reused:

```python
from spiral import _

# `_` refers to the collection being filtered:
long_clips = clips.where(_.duration > 10)

# Use the filtered collection in scans:
ds.scan(duration=long_clips.duration).to_arrow()
```
```
┌──────────┐
│ duration │
├──────────┤
│ 30.0     │
│ 15.0     │
│ 20.0     │
└──────────┘
```

_ supports arithmetic, comparisons, and boolean operators (&, |, ~):

```python
from spiral import _

# Compound predicates:
high_conf_cars = detections.where((_.confidence > 0.8) & (_.label == "car"))
cars_or_peds = detections.where((_.label == "car") | (_.label == "pedestrian"))
not_bicycles = detections.where(~(_.label == "bicycle"))
```

Filters affect reductions. When a filtered collection is aggregated, only the matching rows participate:

```python
high_conf = detections.where(_.confidence > 0.9)

# Count only high-confidence detections per clip:
ds.scan(cnt=high_conf.count())
```

Filter placement is an optimizer concern. The user specifies what to filter; the system decides where in the plan to apply it. When two streams have identical row domains, the join becomes a zip (lockstep walk, no searching). The optimizer weighs this against the benefit of pushing filters down to reduce data volume.

Lists and Collections

Lists and child collections are the same abstraction viewed from different levels.

From the clips level, detections is conceptually a list — for each clip, there is a variable-length sequence of detection rows. Reducing that list is the same as a streaming GroupBy over the detections collection:

```python
detections.confidence.sum()   # streaming: one pass, constant memory
detections.confidence.list()  # materializes all values as a list per clip
```

Both produce one value per clip. The first streams and reduces in a single pass; the second materializes the list for later processing. Use .sum(), .mean(), etc. when you want a scalar; use .list() when you need the raw values.
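The difference shows up in a plain-Python sketch: both shapes are a grouped pass over the sorted detections stream, but one keeps only a running aggregate while the other buffers every value (illustrative, not SpiralDB internals):

```python
from itertools import groupby

# detections rows sorted by clip key
detections = [
    (("s1", "c1"), 0.9), (("s1", "c1"), 0.8),
    (("s1", "c2"), 0.92),
]

# Streaming reduce — one running total per clip, nothing buffered:
sums = [(clip, sum(v for _, v in grp))
        for clip, grp in groupby(detections, key=lambda r: r[0])]

# Materialize — build the per-clip list explicitly for later processing:
lists = [(clip, [v for _, v in grp])
         for clip, grp in groupby(detections, key=lambda r: r[0])]
```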

Going the other direction, if a table has a list-typed column, you can explode it to create a child level:

```python
# Suppose scenes has a tags: list<utf8> column
scenes.tags            # level: scenes, type: list<utf8>
scenes.tags.len()      # level: scenes, type: u64
scenes.tags.explode()  # level: (_id, $idx) — one row per tag
```

.explode() is the inverse of .list(). A list column IS a pre-materialized child collection.
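The inverse relationship can be sketched with plain key tuples (illustrative, not the SpiralDB implementation): exploding appends an index to the key, and collecting back groups on everything but that index:

```python
from itertools import groupby

def explode(rows):
    # list<T> at the parent level → one child row per element,
    # keyed by (parent_key..., idx).
    return [(key + (i,), v) for key, vals in rows for i, v in enumerate(vals)]

def collect(rows):
    # Inverse: group child rows (sorted by key) back into one list per parent.
    return [(k, [v for _, v in g])
            for k, g in groupby(rows, key=lambda r: r[0][:-1])]

tags = [(("s1",), ["rain", "night"]), (("s2",), ["day"])]
assert collect(explode(tags)) == tags  # round-trip: explode inverts list
```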

This duality means the same methods work on both child collection fields and list columns:

```python
# Child collection field at parent level:
detections.confidence.sum()    # sum of confidences per clip
detections.confidence.list()   # list of confidences per clip
detections.confidence.count()  # number of detections per clip

# List column in the schema:
scenes.tags.sum()    # (if numeric) sum of elements
scenes.tags.list()   # identity — it's already a list
scenes.tags.count()  # number of elements
```

Window Functions

A window function computes a value for each row using its neighboring rows within the same parent group. It is a reduction that preserves the input’s structure — the result stays at the same level as the input.

```python
# Rolling 3-frame window: for each frame, collect [frame-2, frame-1, frame]
frames.rgb.window(frame=(-2, 0)).list()             # level: frames, type: list<binary>

# Rolling mean over a 5-detection window:
detections.confidence.window(frame=(-2, 2)).mean()  # level: detections, type: f32
```

Windows never cross parent boundaries. A rolling window on frames is implicitly partitioned by clips — frame 0 of clip c2 never includes frames from clip c1. This is because a window function is fundamentally a reduction grouped by the parent level:

  1. Group frames by parent clip
  2. Within each group, compute a sliding aggregate
  3. Result: one value per frame (same cardinality as input)

The system optimizes this as a streaming window — no materialization of intermediate lists.
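The grouping behavior can be sketched in plain Python — group by the parent key, slide within each group, emit one value per row (illustrative, not the streaming operator itself):

```python
from itertools import groupby

def window_list(rows, lo, hi):
    # For each row, collect neighbours [i+lo, i+hi] within its parent
    # group — the window is clamped at group edges and never crosses
    # into a sibling group.
    out = []
    for _, grp in groupby(rows, key=lambda r: r[0]):
        vals = [v for _, v in grp]
        for i in range(len(vals)):
            out.append(vals[max(0, i + lo): i + hi + 1])
    return out

frames = [
    (("s1", "c1"), "f0"), (("s1", "c1"), "f1"), (("s1", "c1"), "f2"),
    (("s1", "c2"), "f3"), (("s1", "c2"), "f4"),
]
windows = window_list(frames, -2, 0)  # like frames.rgb.window(frame=(-2, 0)).list()
```

Note that the first window of clip c2 is just ["f3"]: it never reaches back into c1.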

Aligning Siblings

frames, detections, and audio are siblings — they share a parent (clips) but have independent row domains. To combine them in a single scan, you need to align them to a shared index.

Resampling

.resample(by, target) is a bucket-join: each row in self is assigned to a row in target by evaluating the by expression, which must produce a value in target’s positional key space (i.e. a valid _pos index into target).

Because both self and target are sorted by the same key prefix, and by must be monotonically non-decreasing, the assignment is a streaming walk — both sides advance forward together with no random access.

Multiple source rows may land in the same bucket, so resample pushes a virtual _pos component onto the result key. Chain a reduction (.mean(), .count(), .list(), etc.) to collapse back to target’s key level.

The two parameters:

  • by — an expression evaluated over self that maps each source row to a target position. Must be monotonically non-decreasing.
  • target — a sibling collection (same parent as self) that defines the output row domain.
```python
clips = ds.scenes.clips
detections = clips.detections
frames = clips.frames

# Convert detection timestamp (seconds) → frame index at clip fps.
# At 10fps: ts=0.05 → frame 0, ts=0.15 → frame 1.
frame_idx = (detections.ts * clips.fps).floor()
resampled = detections.resample(by=frame_idx, target=frames)
# resampled key: (scenes_id, clips_id, frames_pos, _pos)

# Chain reductions to get back to frames level.
ds.scan(
    rgb       = frames.rgb,
    avg_conf  = resampled.confidence.mean(),
    det_count = resampled.count(),
).to_arrow()
```
```
┌─────┬─────┬──────────┬───────────┐
│ _id │ rgb │ avg_conf │ det_count │
├─────┼─────┼──────────┼───────────┤
│ c1  │ f0  │ 0.90     │ 1         │
│ c1  │ f1  │ 0.85     │ 1         │
│ c2  │ f2  │ 0.70     │ 1         │
│ c3  │ f3  │ 0.775    │ 2         │
└─────┴─────┴──────────┴───────────┘
```
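A plain-Python sketch of the bucket assignment for a single clip (illustrative only; the real operator streams over the full key prefix):

```python
import math
from collections import Counter

def resample(source, by):
    # Map each source row to a target position with a monotonically
    # non-decreasing `by`, pushing a per-bucket _pos onto the key so
    # multiple rows landing in one bucket stay addressable.
    out, prev, pos = [], None, 0
    for val in source:
        tgt = by(val)
        assert prev is None or tgt >= prev, "`by` must be non-decreasing"
        pos = pos + 1 if tgt == prev else 0
        out.append(((tgt, pos), val))
        prev = tgt
    return out

# Detections as (ts, confidence) within one clip at 10fps:
dets = [(0.05, 0.90), (0.15, 0.85), (0.31, 0.80), (0.33, 0.75)]
buckets = resample(dets, by=lambda d: math.floor(d[0] * 10))

# Chain a reduction to collapse back to the frames level, e.g. a count:
det_count = Counter(key[0] for key, _ in buckets)
```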

Row Selection

The [] operator steps into a collection’s key prefix, pinning one or more leading key values. Each index removes the leftmost unbound key dimension from the row domain — you are walking into the tree, not filtering arbitrary rows.

```python
ds.scenes["s1"]                        # pin scenes._id = "s1"
ds.scenes["s1"].clips["c1"]            # pin scenes._id = "s1", clips._id = "c1"
ds.scenes["s1"].clips["c1"].frames[0]  # fully specified — one frame
```

Because the data is sorted by key prefix, stepping into a prefix is a seek, not a scan. All downstream expressions and joins see the narrowed domain:
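Sorted keys make the prefix step a binary search; a sketch with Python's bisect (the "\x00" upper bound is an illustrative trick for string keys, not the real index structure):

```python
import bisect

# clips key column, sorted by (scenes_id, clips_id)
keys = [("s1", "c1"), ("s1", "c2"), ("s2", "c3"), ("s2", "c4")]

def seek_prefix(keys, prefix):
    # Stepping into e.g. ds.scenes["s2"].clips: binary-search for the
    # half-open range covered by the pinned prefix — a seek, not a scan.
    lo = bisect.bisect_left(keys, prefix)
    hi = bisect.bisect_left(keys, prefix[:-1] + (prefix[-1] + "\x00",))
    return lo, hi

lo, hi = seek_prefix(keys, ("s2",))  # keys[lo:hi] is the narrowed domain
```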

```python
s1_clips = ds.scenes["s1"].clips
ds.scan(duration=s1_clips.duration, location=scenes.location).to_arrow()
```
```
┌──────────┬──────────┐
│ duration │ location │
├──────────┼──────────┤
│ 30.0     │ downtown │
│ 15.0     │ downtown │
└──────────┴──────────┘
```

When every collection in the path is scalar-indexed, you have a fully-specified path to a single value:

```python
>>> ds.scenes["s1"].location.to_py()
"downtown"
>>> ds.scenes["s1"].clips["c1"].duration.to_py()
30.0
```

Selection forms

Scalar index — pins one key value at one level:

```python
ds.scenes["s1"]                        # one scene
ds.scenes["s1"].clips["c1"]            # one clip within one scene
ds.scenes["s1"].clips["c1"].frames[0]  # one frame (by _pos)
```

Array of keys — selects specific rows (must be sorted):

```python
ds.scenes[["s1", "s2", "s5"]]  # level: scenes, row domain: 3 scenes
```

Key table — multi-level row selection. Each column corresponds to a level in the hierarchy, rows correspond element-wise:

```python
import pyarrow as pa

ds.scenes.clips[pa.table({
    "scenes": ["s1", "s1", "s2"],
    "scenes.clips": ["c1", "c2", "c3"],
})]
# level: clips, row domain: 3 specific clips
```

Key tables are useful for sampling, precomputed batch indices, or feeding an externally-computed selection into a training loop.

Execution Model

Physical Storage

Each collection is a separate sorted table. Conceptually, child tables store parent keys for joining:

scenes — sorted by (scenes_id)

```
┌───────────┬──────────┐
│ scenes_id │ location │
├───────────┼──────────┤
│ s1        │ downtown │
│ s2        │ highway  │
└───────────┴──────────┘
```

clips — sorted by (scenes_id, clips_id)

```
┌───────────┬──────────┬──────────┐
│ scenes_id │ clips_id │ duration │
├───────────┼──────────┼──────────┤
│ s1        │ c1       │ 30.0     │
│ s1        │ c2       │ 15.0     │
│ s2        │ c3       │ 20.0     │
└───────────┴──────────┴──────────┘
```

frames — sorted by (scenes_id, clips_id, frames_pos)

```
┌───────────┬──────────┬────────────┬─────┬───────┐
│ scenes_id │ clips_id │ frames_pos │ rgb │ depth │
├───────────┼──────────┼────────────┼─────┼───────┤
│ s1        │ c1       │ 0          │ f0  │ d0    │
│ s1        │ c1       │ 1          │ f1  │ d1    │
│ s1        │ c2       │ 0          │ f2  │ d2    │
│ s2        │ c3       │ 0          │ f3  │ d3    │
└───────────┴──────────┴────────────┴─────┴───────┘
```

In practice, SpiralDB maintains efficient indexes to optimize cross-level joins, and the Vortex columnar format makes broadcasting a zero-copy operation. But the logical model above is equivalent.

Execution Plan

Consider the same resampling query from the previous section, but with a filter on detections:

```python
dts = detections.where(detections.label == "pedestrian")

ds.scan(
    frames.rgb,
    dts.resample(by=(dts.ts * clips.fps).floor(), target=frames).list(),
    clips._id,
    clips.duration,
).to_arrow()
```

Each node below is a streaming operator; data flows top-to-bottom from table scans to the final output:

```
Scan(scenes)   Scan(clips)   Scan(frames)     Scan(detections)
     │              │             │                   │
     └──── Join ────┘             │      Filter(label=="pedestrian")
            │                     │                   │
            │                     │           Resample(agg=list)
            │                     │                   │
            │                     └──────── Join ─────┘
            │                               │
            └─────────── Join ──────────────┘
                          │
                       Output
```

Four table scans feed the plan. The filter on label is pushed down below the resample. Resample is a bucket-join, and its output key aligns with frame positions, enabling a sort-merge join with the frames branch. The ancestor fields (scenes, clips) are joined via sort-merge on the shared key prefix.

Filter placement is an optimizer choice. The filter could instead be applied after the zip — this preserves the cheap lockstep walk but processes more rows through the resample. Pushing the filter before the zip reduces data volume but may break the row-domain identity, turning the zip into a join. The optimizer weighs filter selectivity against join cost.

Sideways information passing. When the filter is pushed to the detections scan, it can also accelerate the frames scan. Both streams are sorted by the same key prefix (scenes_id, clips_id). As the filtered detections stream skips past clips with no matching detections, it can pass updated key bounds to the frames stream, allowing it to seek forward rather than scan through frames that will be discarded after the zip. This cross-operator communication — where one scan’s progress informs another’s seek position — is known as sideways information passing.
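A plain-Python sketch of the idea: the detections side publishes the key it is currently at, and the frames side seeks forward to it instead of emitting rows the zip would discard (all names illustrative):

```python
def zip_with_sip(frames, det_keys):
    # `det_keys`: sorted clip prefixes that survived the detections
    # filter. Each acts as a lower bound passed sideways to the frames
    # stream, which seeks forward rather than scanning every row.
    out, skipped, i = [], 0, 0
    for dk in det_keys:
        while i < len(frames) and frames[i][0] < dk:
            i += 1          # seek: these frame rows are never emitted
            skipped += 1
        while i < len(frames) and frames[i][0] == dk:
            out.append(frames[i])
            i += 1
    return out, skipped

frames = [(("s1", "c1"), "f0"), (("s1", "c2"), "f1"), (("s2", "c3"), "f2"),
          (("s2", "c4"), "f3"), (("s3", "c7"), "f4")]
ped_clips = [("s1", "c1"), ("s3", "c7")]  # clips left after the label filter
rows, skipped = zip_with_sip(frames, ped_clips)
```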

Writing Data

All writes are upserts. Writing to an existing key replaces the record; writing to a new key creates it.

Prefer batch writes for best performance — columnar storage is most efficient when writing many records at once.

Single records — setitem with a dict:

```python
ds.scenes["s3"] = {
    "location": "park",
    "metadata": {"weather": "sunny", "lighting": "natural"},
    "clips": [
        {
            "_id": "c10",
            "camera": "front",
            "start_time": 0.0,
            "duration": 60.0,
            "frames": [{"rgb": b"f10", "depth": b"d10"}],
            "detections": [{"label": "car", "confidence": 0.9, "ts": 0.5}],
            "audio": [{"pcm": b"a10", "level_db": -20.0}],
        },
    ],
}
```

Scalars — when the path is fully indexed:

```python
ds.scenes["s1"].location = "downtown_v2"
ds.scenes["s1"].clips["c1"].duration = 45.0
```

Batch writes — setitem with an Arrow table or list of dicts:

```python
import pyarrow as pa

ds.scenes["s1"].clips["c1"].frames = pa.table({
    "rgb": [b"f20", b"f21", b"f22"],
    "depth": [b"d20", b"d21", b"d22"],
})
```

Note that positional collections (like frames) do not support upserting individual rows by index. You can only replace the entire collection at once.

Scoped writes — the client batches them for you:

```python
with ds.write() as w:
    for scene in scene_dicts:
        w.scenes[scene['_id']] = scene
# all writes flushed as a single batch on exit
```