
Collections

Collections are not yet available.

SpiralDB stores your data as a tree of collections — repeated groups of records, like JSON arrays of objects, but backed by columnar storage. Each collection is physically a separate sorted table. Queries across collections are sort-merge joins on shared key prefixes.

Schema

Here is a dataset of scene recordings for world modeling, where each scene contains clips from different cameras, and each clip contains video frames, object detections, and audio as sibling collections:

from spiral import Spiral

sp = Spiral()
ds = sp.dataset("recordings")
print(ds.schema)
recordings
├── scenes {_id: utf8}
│   ├── location: utf8
│   ├── clips {_id: utf8}
│   │   ├── camera: utf8
│   │   ├── start_time: f64
│   │   ├── duration: f64
│   │   ├── frames []          one per video frame (e.g. 30fps)
│   │   │   ├── rgb: tensor<u8>[H, W, 3]
│   │   │   └── depth: tensor<f32>[H, W]
│   │   ├── detections []      one per detected object (variable rate)
│   │   │   ├── label: utf8
│   │   │   ├── confidence: f32
│   │   │   └── ts: f64
│   │   └── audio []           one per chunk (e.g. 512 samples at 16kHz)
│   │       ├── pcm: tensor<i16>[512]
│   │       └── level_db: f32
│   └── metadata: struct
│       ├── weather: utf8
│       └── lighting: utf8
└── models {_id: utf8}
    ├── name: utf8
    └── config: struct
        ├── lr: f64
        └── batch_size: i32

Three kinds of things appear in this tree:

  • Collections (scenes, clips, frames, detections, audio, models). Records you can iterate over. Each has its own key — either keyed ({_id: type} — upserts match on _id) or positional ([] — rows are ordered, with a virtual _pos field, u64, starting at 0).

  • Structs (metadata, config). Single nested objects — one per parent record. Dot access navigates through them. They don’t introduce new rows.

  • Scalars (location, duration, rgb, confidence). Leaf values.

Note that frames, detections, and audio are sibling collections within clips. They have independent row counts — a 30-second clip might have 900 frames, 12 detections, and 960 audio chunks.

Physical Model

Each collection is stored as its own sorted columnar table. A child table’s sort key is always a prefix extension of its parent’s key:

scenes:     sorted by (_id)
clips:      sorted by (scenes._id, _id)
frames:     sorted by (scenes._id, clips._id, _pos)
detections: sorted by (scenes._id, clips._id, _pos)
audio:      sorted by (scenes._id, clips._id, _pos)

Because every parent-child pair shares a key prefix, joins between them are sort-merge joins — the cheapest possible join. When two streams have identical key sets (not just the same key schema, but the same actual rows), the join degenerates to a zip: walk both streams in lockstep with no searching.
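The distinction can be sketched in plain Python (a conceptual model, not the SpiralDB implementation): a sort-merge join advances the parent stream only when its key falls behind the child's key prefix, while a zip assumes identical key sets and advances both streams unconditionally.

```python
def sort_merge_join(parent, child):
    """Inner join of two key-sorted streams, where each child key is a
    prefix extension of some parent key. Conceptual sketch only."""
    out, i = [], 0
    for ckey, cval in child:
        # Advance the parent stream until its key matches the child's prefix.
        while i < len(parent) and parent[i][0] < ckey[: len(parent[i][0])]:
            i += 1
        if i < len(parent) and parent[i][0] == ckey[: len(parent[i][0])]:
            out.append((ckey, parent[i][1], cval))
    return out

def zip_streams(a, b):
    """When both streams have identical key sets, the join degenerates
    to a lockstep walk: no comparisons, no searching."""
    return [(ka, va, vb) for (ka, va), (_, vb) in zip(a, b)]

scenes = [(("s1",), "downtown"), (("s2",), "highway")]
clips = [(("s1", "c1"), 30.0), (("s1", "c2"), 15.0), (("s2", "c3"), 20.0)]
print(sort_merge_join(scenes, clips))
```

The join body does per-row comparisons to find matches; the zip body does none — which is why the optimizer works to keep row domains identical.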

All query operations map to seven physical primitives:

Primitive   What it does                                 Effect on keys
─────────   ───────────────────────────────────────────  ─────────────────────────
Scan        Read a table                                 Establishes the key
Zip         Lockstep walk of two streams                 Same key
Join        Sort-merge join on shared prefix (inner)     Result has the longer key
GroupBy     Streaming group-by + aggregation             Shortens the key
Explode     Expand a list column into rows               Extends the key
Filter      Keep rows matching a predicate               Same key, fewer rows
Resample    Streaming outer merge via monotonic binning  Replaces the key

Dot access walks the tree and returns a lazy reference. Nothing is read until you scan.

scenes = ds.scenes
clips = scenes.clips
frames = clips.frames
detections = clips.detections
audio = clips.audio

frames.rgb               # <Expr recordings/scenes/clips/frames/rgb>
scenes.metadata.weather  # <Expr recordings/scenes/metadata/weather>

Save any part of the tree to a variable and keep navigating. Path variables make queries readable — especially when referencing ancestors deep in the tree.

Expression Levels

Every expression has a level — the collection whose key columns define its row domain. The level is determined by the expression’s source and is always unambiguous.

Field references have the level of their collection:

scenes.location        # level: scenes — one value per scene
clips.duration         # level: clips — one value per clip
detections.confidence  # level: detections — one value per detection

Scalar operations inherit the deepest level of their operands:

clips.duration * 2                # level: clips
detections.confidence * 100       # level: detections
detections.ts - clips.start_time  # level: detections (clips.start_time broadcasts)

Reductions move one level up (to the immediate parent):

clips.duration.sum()                # level: scenes (clips → scenes)
detections.confidence.mean()        # level: clips (detections → clips)
detections.confidence.mean().sum()  # level: scenes (detections → clips → scenes)

These rules mean you can always determine an expression’s level by inspection, without knowing the scan context.
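The level rules can be sketched in a few lines of plain Python (a conceptual model of the rules above, not the SpiralDB API): a level is the tuple of key columns defining a row domain, scalar operations take the longest tuple on the shared path, and a reduction drops the last key column.

```python
# Conceptual sketch: model a level as its tuple of key columns.
SCENES = ("scenes._id",)
CLIPS = SCENES + ("clips._id",)
DETECTIONS = CLIPS + ("detections._pos",)

def combine(*levels):
    """Scalar operations inherit the deepest operand level. All operands
    must lie on one ancestor path, so the deepest is the longest tuple."""
    deepest = max(levels, key=len)
    assert all(deepest[: len(lvl)] == lvl for lvl in levels), "not on one ancestor path"
    return deepest

def reduce_level(level):
    """A reduction moves one level up: drop the last key column."""
    return level[:-1]

# detections.ts - clips.start_time → level: detections
print(combine(DETECTIONS, CLIPS))
# detections.confidence.mean().sum() → level: scenes
print(reduce_level(reduce_level(DETECTIONS)))
```

The `assert` inside `combine` is the same check that makes siblings an error: neither sibling's key tuple is a prefix of the other's.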

Scanning

sp.scan() reads data and produces rows. Pass one or more expressions — they must all lie on the same ancestor path (no siblings or unrelated branches). Ancestor expressions are broadcast to the scan level automatically.

sp.scan(clips.duration, clips.camera).to_arrow()
┌──────────┬────────┐
│ duration │ camera │
├──────────┼────────┤
│ 30.0     │ front  │
│ 15.0     │ front  │
│ 20.0     │ rear   │
└──────────┴────────┘

The scan level is the deepest result level among all expressions. Shallower expressions are broadcast automatically.

# Scan level = clips (deepest result level)
sp.scan(
    clips.duration,                # level: clips
    scenes.location,               # level: scenes → broadcasts to clips
    detections.confidence.mean(),  # level: clips (reduced from detections)
).to_arrow()
┌──────────┬──────────┬─────────────────┐
│ duration │ location │ confidence_mean │
├──────────┼──────────┼─────────────────┤
│ 30.0     │ downtown │ 0.85            │
│ 15.0     │ downtown │ 0.92            │
│ 20.0     │ highway  │ 0.78            │
└──────────┴──────────┴─────────────────┘

Projection forms. Several shorthand forms work inside sp.scan():

# Bare collection — all scalar and struct fields from that level
sp.scan(frames, clips.duration)

# Select/exclude
sp.scan(frames.select(exclude=['depth']), clips.duration)

# Dict — keys become output column names, nested dicts become structs
sp.scan({
    'image': frames.rgb,
    'meta': {
        'dur': clips.duration,
        'loc': scenes.location,
    },
})

Implicit Broadcasting

When an ancestor expression appears in a scan at a deeper level, its values are automatically repeated for every child row. This is the only implicit operation — it is always unambiguous because each child row has exactly one parent.

sp.scan(
    frames.rgb,       # level: frames
    clips.duration,   # level: clips → broadcast to frames
    scenes.location,  # level: scenes → broadcast to frames
).to_arrow()
┌─────┬──────────┬──────────┐
│ rgb │ duration │ location │
├─────┼──────────┼──────────┤
│ f0  │ 30.0     │ downtown │
│ f1  │ 30.0     │ downtown │
│ f2  │ 15.0     │ downtown │
│ f3  │ 20.0     │ highway  │
└─────┴──────────┴──────────┘

Physically, this is a sort-merge join between the frames table and the clips/scenes tables on their shared key prefix.

Siblings and unrelated collections are errors:

# ✗ frames and detections are siblings — neither is an ancestor of the other
sp.scan(frames.rgb, detections.label)
# LevelError: cannot combine 'frames' and 'detections' — they are siblings

# ✗ models is on a different branch entirely
sp.scan(clips.duration, ds.models.name)
# LevelError: 'clips' and 'models' share no ancestor path

Reduction

Referencing a child collection’s fields from a parent level requires an explicit reduction — you must specify how many-to-one values collapse. Reduction methods move the expression one level up to the immediate parent.

# At scenes level: one value per scene
clips.duration.sum()    # total recording time per scene
clips.duration.mean()   # average clip duration per scene
clips.duration.list()   # list of clip durations per scene
clips.duration.count()  # number of clips per scene
clips.duration.first()  # first clip's duration per scene
clips.duration.any(lambda d: d > 60)  # any clip longer than 60s?
clips.duration.agg(my_func)           # custom aggregation

Reductions compose — each step moves one level up:

detections.confidence.mean()        # level: clips (mean per clip)
detections.confidence.mean().sum()  # level: scenes (sum of per-clip means)

Multi-level reduction with .per(). By default, reduction targets the immediate parent. To skip intermediate levels, use .per(target):

# Direct mean of ALL detections per scene (not per-clip, then per-scene):
detections.confidence.per(scenes).mean()  # level: scenes

# Per-clip means, collected as a list per scene:
detections.confidence.mean().list()  # level: scenes — list of per-clip means

# Compare: mean of all detections per scene vs mean of per-clip means
detections.confidence.per(scenes).mean()  # grand mean (correct for uniform weighting)
detections.confidence.mean().mean()       # mean of means (not the same thing!)

The target can also be passed as a per= keyword argument to any reduction method:

detections.confidence.sum(per=scenes)    # sum all confidences per scene
detections.confidence.list(per=scenes)   # flat list per scene
detections.confidence.count(per=scenes)  # total detection count per scene

Filtering

Filter narrows a collection’s row domain without changing its level. It is a first-class operation on collections.

.where() on a collection returns a filtered collection that can be reused:

long_clips = clips.where(clips.duration > 10)

# Use the filtered collection in scans:
sp.scan(long_clips.duration, scenes.location).to_arrow()
┌──────────┬──────────┐
│ duration │ location │
├──────────┼──────────┤
│ 30.0     │ downtown │
│ 15.0     │ downtown │
│ 20.0     │ highway  │
└──────────┴──────────┘

where= in a scan is a convenience for inline filtering:

sp.scan(
    clips.duration,
    scenes.location,
    where=clips.duration > 10,
).to_arrow()

Ancestor predicates are valid in filters. The optimizer pushes them down into the ancestor’s table scan:

sp.scan(
    clips.duration,
    where=scenes.location == "downtown",
)
# Physical: Filter(Scan(scenes), location=="downtown") → Join(Scan(clips))

Filters affect reductions. When a filtered collection is aggregated, only the matching rows participate:

high_conf = detections.where(detections.confidence > 0.9)

# Count only high-confidence detections per clip:
sp.scan(clips.camera, high_conf.confidence.count())

Filter placement is an optimizer concern. The user specifies what to filter; the system decides where in the plan to apply it. When two streams have identical row domains, the join becomes a zip (lockstep walk, no searching). The optimizer weighs this against the benefit of pushing filters down to reduce data volume.

Lists and Collections

Lists and child collections are the same abstraction viewed from different levels.

From the clips level, detections is conceptually a list — for each clip, there is a variable-length sequence of detection rows. Reducing that list is the same as a streaming GroupBy over the detections table:

# These are equivalent:
detections.confidence.sum()                           # streaming: one pass, constant memory
detections.confidence.list().agg(lambda xs: sum(xs))  # materializes the list, then reduces

Both produce one scalar per clip. The first streams; the second materializes the list then applies a function. A smart optimizer converts the second into the first when possible.
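The equivalence can be sketched with itertools.groupby over a key-sorted stream (a conceptual model in plain Python, not the SpiralDB engine): both forms group by the parent key, but only one builds intermediate lists.

```python
from itertools import groupby

# Key-sorted detections stream: (parent clip key, confidence).
detections = [("c1", 0.9), ("c1", 0.8), ("c2", 0.92), ("c3", 0.7), ("c3", 0.86)]

# Streaming: one pass, constant memory per group — the sum consumes the
# group iterator directly, so the per-clip list is never materialized.
streaming = {k: sum(v for _, v in g)
             for k, g in groupby(detections, key=lambda r: r[0])}

# Materialize-then-reduce: build each list, then apply the aggregate.
lists = {k: [v for _, v in g]
         for k, g in groupby(detections, key=lambda r: r[0])}
materialized = {k: sum(xs) for k, xs in lists.items()}

assert streaming == materialized  # same result, different memory profile
```

This only works because the stream is sorted by the parent key — exactly the invariant the physical layout guarantees.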

Going the other direction, if a table has a list-typed column, you can explode it to create a child level:

# Suppose scenes has a tags: list<utf8> column
scenes.tags            # level: scenes, type: list<utf8>
scenes.tags.len()      # level: scenes, type: u64
scenes.tags.explode()  # level: (_id, $idx) — one row per tag

.explode() is the inverse of .list(). A list column IS a pre-materialized child collection.

This duality means the same methods work on both child collection fields and list columns:

# Child collection field at parent level:
detections.confidence.sum()    # sum of confidences per clip
detections.confidence.list()   # list of confidences per clip
detections.confidence.count()  # number of detections per clip

# List column in the schema:
scenes.tags.sum()    # (if numeric) sum of elements
scenes.tags.list()   # identity — it's already a list
scenes.tags.count()  # number of elements

Window Functions

A window function computes a value for each row using its neighboring rows within the same parent group. It is a reduction that preserves the input’s structure — the result stays at the same level as the input.

# Rolling 3-frame window: for each frame, collect [frame-2, frame-1, frame]
frames.rgb.rolling(before=2, agg="list")  # list is the default aggregation for rolling windows
# level: frames, type: list<binary>

# Rolling mean over a 5-detection window:
detections.confidence.rolling(before=2, after=2, agg="mean")
# level: detections, type: f32

Windows never cross parent boundaries. A rolling window on frames is implicitly partitioned by clips — frame 0 of clip c2 never includes frames from clip c1. This is because a window function is fundamentally a reduction grouped by the parent level:

  1. Group frames by parent clip
  2. Within each group, compute a sliding aggregate
  3. Result: one value per frame (same cardinality as input)

Internally, this is equivalent to:

# Conceptual decomposition (not how you write it):
frames.rgb.per(clips).rolling(before=2).explode()

The system optimizes this as a streaming window — no materialization of intermediate lists.
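The three steps can also be sketched directly in plain Python (a conceptual model, not the SpiralDB implementation): walk the key-sorted stream, reset the window buffer at every parent boundary, and emit one aggregate per input row.

```python
def rolling_within_groups(rows, before, agg):
    """Parent-partitioned rolling window over key-sorted (parent_key, value)
    pairs. One output value per input row; windows never cross a boundary."""
    out, group_key, buf = [], None, []
    for key, value in rows:
        if key != group_key:      # parent boundary: reset the window
            group_key, buf = key, []
        buf.append(value)
        out.append(agg(buf[-(before + 1):]))  # [row-before .. row]
    return out

frame_rows = [("c1", 1), ("c1", 2), ("c1", 3), ("c2", 10), ("c2", 20)]
print(rolling_within_groups(frame_rows, before=2, agg=list))
# → [[1], [1, 2], [1, 2, 3], [10], [10, 20]]
```

Note how the first frame of clip c2 sees only `[10]` — the buffer reset is what "windows never cross parent boundaries" means operationally.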

The agg parameter accepts the same functions as reduction: "list", "sum", "mean", "first", "count", or a callable.

Aligning Siblings

frames, detections, and audio are siblings — they share a parent (clips) but have independent row domains. To combine them in a single scan, you need to align them to a shared index.

Timelines

A timeline is a scalar function that returns a list of time steps. It takes parameters (frame rate, duration) and produces a list per parent row:

from spiral import expressions as se

# For each clip, generate time steps at 10fps over its duration
tl = se.timeline(fps=10, stop=clips.duration)
# level: clips, type: list<struct{step: u64, time: f64}>

This is an ordinary scalar expression — no special primitive. Exploding it creates a child level:

tl.explode()
# level: (scenes._id, clips._id, tl_step) — one row per time step per clip

Timelines are compared by object identity. Two expressions that resample to the same timeline object produce identical row domains and can be zipped together. Different timeline objects — even with identical parameters — are treated as incompatible.

Resampling

.resample() aligns a source collection to a timeline. It is a physical primitive — a streaming outer merge driven by a monotonic binning function.

The on expression maps each source row to a position on the timeline’s time axis. Because both the source and the timeline are sorted by the same key prefix, and on increases monotonically with the sort order, the bin assignment is a streaming walk — both streams advance forward together with no random access.

Unlike every other join in the model (which are inner — every child has a parent, every zip row has a match), Resample uses outer semantics: the timeline drives the output domain. Timeline steps with no matching source rows still appear in the output as nulls (or are forward-filled, depending on method). This is essential for sibling alignment — both sides of a zip must have exactly the same rows.
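The outer semantics can be sketched as a streaming binning walk in plain Python (a conceptual model, not the SpiralDB primitive): the timeline drives the output, source rows are binned by their monotonic `on` value, and empty steps still produce a row.

```python
def resample(timeline, source, on, agg):
    """Conceptual Resample: `timeline` is a sorted list of step times;
    `source` rows are ordered so that on(row) is monotonically increasing.
    Outer semantics: every timeline step appears in the output."""
    out, i = [], 0
    for step, next_step in zip(timeline, timeline[1:] + [float("inf")]):
        while i < len(source) and on(source[i]) < step:
            i += 1            # rows before the first step are dropped
        bucket = []
        while i < len(source) and on(source[i]) < next_step:
            bucket.append(source[i])   # forward walk only — no random access
            i += 1
        out.append((step, agg(bucket)))
    return out

# Frames at 30fps resampled onto a 2-step timeline (1 step per second),
# keeping the first frame in each step:
frame_rows = [(pos, f"f{pos}") for pos in range(60)]
steps = resample([0.0, 1.0], frame_rows, on=lambda r: r[0] / 30,
                 agg=lambda b: b[0][1] if b else None)
print(steps)  # → [(0.0, 'f0'), (1.0, 'f30')]
```

Because both loops only ever move `i` forward, the whole operation is a single pass over the source — the "streaming walk" the text describes.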

The two parameters:

  • on — an expression that maps each source row to a position on the timeline’s time axis. For frames at 30fps, frames._pos / 30 converts frame index to seconds. For detections with timestamps, detections.ts - clips.start_time gives the offset from clip start.

  • method — the aggregation applied within each timeline step:

    • "first" — take the first matching value
    • "ffill" — forward-fill from the most recent preceding value
    • "list" — collect all matching values into a list
    • "mean", "sum", etc. — numeric aggregations
    • A callable — custom aggregation function (e.g. se.audio.concat)
scenes = ds.scenes
clips = scenes.clips
frames = clips.frames
detections = clips.detections
audio = clips.audio

tl = se.timeline(fps=10, stop=clips.duration)

# Resample video frames (30fps) → 10fps
resampled_rgb = frames.rgb.resample(tl, on=frames._pos / 30, method="first")

# Resample detections (variable rate) → 10fps
resampled_dets = detections.resample(
    tl,
    on=detections.ts - clips.start_time,
    method="list",
)

# Resample audio (e.g. 16kHz samples) → 10fps, concatenating chunks
resampled_audio = audio.pcm.resample(
    tl,
    on=audio._pos * 512 / 16000,
    method=se.audio.concat,
)

# All three share the same tl → zip
sp.scan(
    resampled_rgb,
    resampled_dets,
    resampled_audio,
    clips._id,
).to_arrow()
┌─────┬─────┬─────────────────────────────────┬───────┐
│ _id │ rgb │ detections                      │ audio │
├─────┼─────┼─────────────────────────────────┼───────┤
│ c1  │ f0  │ [{label: "car", conf: 0.9}]     │ a0    │
│ c1  │ f1  │ []                              │ a1    │
│ c2  │ f2  │ [{label: "person", conf: 0.85}] │ a2    │
│ c3  │ f3  │ []                              │ a3    │
└─────┴─────┴─────────────────────────────────┴───────┘

Row Selection

The [] operator steps into a collection’s key prefix, pinning one or more leading key values. Each index removes the leftmost unbound key dimension from the row domain — you are walking into the tree, not filtering arbitrary rows.

ds.scenes["s1"]                        # pin scenes._id = "s1"
ds.scenes["s1"].clips["c1"]            # pin scenes._id = "s1", clips._id = "c1"
ds.scenes["s1"].clips["c1"].frames[0]  # fully specified — one frame

Because the data is sorted by key prefix, stepping into a prefix is a seek, not a scan. All downstream expressions and joins see the narrowed domain:

s1_clips = ds.scenes["s1"].clips
sp.scan(s1_clips.duration, scenes.location).to_arrow()
┌──────────┬──────────┐
│ duration │ location │
├──────────┼──────────┤
│ 30.0     │ downtown │
│ 15.0     │ downtown │
└──────────┴──────────┘
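Because the key columns are lexicographically sorted, a prefix pin is just two binary searches for the block bounds. A minimal sketch with Python's bisect module (conceptual — the sentinel-based upper bound is an illustration, not the SpiralDB index format):

```python
import bisect

# clips table key column, sorted by (scenes._id, _id):
keys = [("s1", "c1"), ("s1", "c2"), ("s2", "c3")]

def pin_prefix(keys, prefix):
    """Seek to the contiguous block whose keys start with `prefix`."""
    lo = bisect.bisect_left(keys, prefix)
    # A max-codepoint sentinel sorts just past every real continuation
    # of the prefix (sketch only):
    hi = bisect.bisect_left(keys, prefix + (chr(0x10FFFF),))
    return keys[lo:hi]

print(pin_prefix(keys, ("s1",)))  # → [('s1', 'c1'), ('s1', 'c2')]
```

Two O(log n) seeks replace a full scan, which is exactly why `ds.scenes["s1"]` narrows the domain cheaply.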

When every collection in the path is scalar-indexed, you have a fully-specified path to a single value:

>>> ds.scenes["s1"].location.to_py()
"downtown"
>>> ds.scenes["s1"].clips["c1"].duration.to_py()
30.0

Selection forms

Scalar index — pins one key value at one level:

ds.scenes["s1"]                        # one scene
ds.scenes["s1"].clips["c1"]            # one clip within one scene
ds.scenes["s1"].clips["c1"].frames[0]  # one frame (by _pos)

Array of keys — selects specific rows (must be sorted):

ds.scenes[["s1", "s2", "s5"]] # level: scenes, row domain: 3 scenes

Key table — multi-level row selection. Each column corresponds to a level in the hierarchy, rows correspond element-wise:

import pyarrow as pa

ds.scenes.clips[pa.table({
    "scenes": ["s1", "s1", "s2"],
    "scenes.clips": ["c1", "c2", "c3"],
})]
# level: clips, row domain: 3 specific clips

Key tables are useful for sampling, precomputed batch indices, or feeding an externally-computed selection into a training loop.

Execution Model

Physical Storage

Each collection is a separate sorted table. Conceptually, child tables store parent keys for joining:

scenes — sorted by (_id)

┌─────┬──────────┐
│ _id │ location │
├─────┼──────────┤
│ s1  │ downtown │
│ s2  │ highway  │
└─────┴──────────┘

clips — sorted by (scenes._id, _id)

┌────────────┬─────┬──────────┐
│ scenes._id │ _id │ duration │
├────────────┼─────┼──────────┤
│ s1         │ c1  │ 30.0     │
│ s1         │ c2  │ 15.0     │
│ s2         │ c3  │ 20.0     │
└────────────┴─────┴──────────┘

frames — sorted by (scenes._id, clips._id, _pos)

┌────────────┬───────────┬─────┬───────┐
│ scenes._id │ clips._id │ rgb │ depth │
├────────────┼───────────┼─────┼───────┤
│ s1         │ c1        │ f0  │ d0    │
│ s1         │ c1        │ f1  │ d1    │
│ s1         │ c2        │ f2  │ d2    │
│ s2         │ c3        │ f3  │ d3    │
└────────────┴───────────┴─────┴───────┘

In practice, SpiralDB maintains efficient indexes to optimize cross-level joins, and the Vortex columnar format makes broadcasting a zero-copy operation. But the logical model above is equivalent.

Execution Plan

Consider the same resampling query from the previous section, but with a filter on detections:

ped_dets = detections.where(detections.label == "pedestrian")

sp.scan(
    frames.rgb.resample(tl, on=frames._pos / 30, method="first"),
    ped_dets.resample(tl, on=ped_dets.ts - clips.start_time, method="list"),
    clips._id,
    clips.duration,
).to_arrow()

Each node below is a streaming operator; data flows top-to-bottom from table scans to the final output:

Scan(scenes)  Scan(clips)    Scan(frames)       Scan(detections)
     │             │              │                    │
     └─── Join ────┘              │        Filter(label=="pedestrian")
           │                      │                    │
           │               Resample(tl,         Resample(tl,
           │                method=first)        method=list)
           │                      │                    │
           │                      └──────── Zip ───────┘
           │                                 │
           └───────────── Join ──────────────┘
                            │
                          Output

Four table scans feed into the plan. The filter on label is pushed down before resampling. Each Resample is a streaming outer merge — it walks the source and the timeline forward together, binning source rows into timeline steps via the monotonic on expression. Because both sides use the same tl object, the output domains are identical and the combine is a zip (lockstep walk, no searching). The ancestor fields (scenes, clips) are joined via sort-merge on the shared key prefix.

Filter placement is an optimizer choice. The filter could instead be applied after the zip — this preserves the cheap lockstep walk but processes more rows through the resample. Pushing the filter before the zip reduces data volume but may break the row-domain identity, turning the zip into a join. The optimizer weighs filter selectivity against join cost.

Sideways information passing. When the filter is pushed to the detections scan, it can also accelerate the frames scan. Both streams are sorted by the same key prefix (scenes._id, clips._id). As the filtered detections stream skips past clips with no matching detections, it can pass updated key bounds to the frames stream, allowing it to seek forward rather than scan through frames that will be discarded after the zip. This cross-operator communication — where one scan’s progress informs another’s seek position — is known as sideways information passing.
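The mechanism can be sketched in plain Python (a conceptual model, not the SpiralDB planner): the filtered detections stream yields the surviving clip keys, and the frames stream seeks forward to each of them instead of scanning every frame row.

```python
import bisect

def seek_frames(surviving_clips, frame_keys):
    """Consume only the frame rows whose clip key survived the filter.
    `frame_keys` is the key-sorted frames stream; `surviving_clips` are
    the key bounds passed sideways from the detections scan."""
    i, out = 0, []
    for key in surviving_clips:
        i = bisect.bisect_left(frame_keys, key, i)  # seek forward, not scan
        while i < len(frame_keys) and frame_keys[i] == key:
            out.append(frame_keys[i])
            i += 1
    return out

# Frames sorted by clip key; the filter left only clips c2 and c4:
frame_keys = ["c1", "c1", "c2", "c2", "c3", "c4"]
print(seek_frames(["c2", "c4"], frame_keys))  # → ['c2', 'c2', 'c4']
```

The frame rows for c1 and c3 are never touched — they would have been discarded after the zip anyway, so the seek skips the wasted work up front.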

Rules Summary

Expression                       Behavior            Physical operation
───────────────────────────────  ──────────────────  ─────────────────────────────────────
Same-level field                 Direct column read  Scan
Ancestor field                   Implicit broadcast  Sort-merge join on prefix
Descendant field (no reduction)  Error               —
Descendant field + .sum() etc.   Explicit reduction  Streaming GroupBy
Sibling field (no alignment)     Error               —
Sibling field + .resample(tl)    Temporal alignment  Resample (outer) + Zip
.where(pred)                     Filter              Filter (optimizer places it)
.rolling(before=, agg=)          Window function     Streaming window within parent groups
.explode()                       List → child rows   Explode

Writing Data

All writes are upserts. Writing to an existing key replaces the record; writing to a new key creates it.

Prefer batch writes for best performance — columnar storage is most efficient when writing many records at once.

Single records — setitem with a dict:

ds.scenes["s3"] = {
    "location": "park",
    "metadata": {"weather": "sunny", "lighting": "natural"},
    "clips": [
        {
            "_id": "c10",
            "camera": "front",
            "start_time": 0.0,
            "duration": 60.0,
            "frames": [{"rgb": b"f10", "depth": b"d10"}],
            "detections": [{"label": "car", "confidence": 0.9, "ts": 0.5}],
            "audio": [{"pcm": b"a10", "level_db": -20.0}],
        },
    ],
}

Nested records:

ds.scenes["s1"].clips["c1"].frames[0] = {
    "rgb": b"f0_v2",
    "depth": b"d0_v2",
}

Scalars — when the path is fully indexed:

ds.scenes["s1"].location = "downtown_v2"
ds.scenes["s1"].clips["c1"].duration = 45.0

Batch writes — setitem with an Arrow table or list of dicts:

import pyarrow as pa

ds.scenes["s1"].clips["c1"].frames = pa.table({
    "rgb": [b"f20", b"f21", b"f22"],
    "depth": [b"d20", b"d21", b"d22"],
})

Note that positional collections (like frames) do not support inserting new rows at arbitrary indices. You can overwrite an existing position (as above) or replace the entire collection at once.

Scoped writes — the client batches them for you:

with ds.scenes.write() as w:
    for scene in scene_dicts:
        w[scene['_id']] = scene
# all writes flushed as a single batch on exit

Deleting Data

del ds.scenes["s3"]
del ds.scenes["s1"].clips["c2"]
del ds.scenes["s1"].clips["c1"].frames[0]

# Bulk delete
del ds.scenes[["s1", "s2", "s5"]]