Collections
SpiralDB stores your data as a tree of collections — repeated groups of records, like JSON arrays of objects, but backed by columnar storage. Each collection is physically a separate sorted table. Queries across collections are sort-merge joins on shared key prefixes.
Schema
Here is a dataset of scene recordings for world modeling, where each scene contains clips from different cameras, and each clip contains video frames, object detections, and audio as sibling collections:
```python
from spiral import Spiral

sp = Spiral()
ds = sp.dataset("recordings")
print(ds.schema)
```
```
recordings
├── scenes {_id: utf8}
│   ├── location: utf8
│   ├── clips {_id: utf8}
│   │   ├── camera: utf8
│   │   ├── start_time: f64
│   │   ├── duration: f64
│   │   ├── frames []                 one per video frame (e.g. 30fps)
│   │   │   ├── rgb: tensor<u8>[H, W, 3]
│   │   │   └── depth: tensor<f32>[H, W]
│   │   ├── detections []             one per detected object (variable rate)
│   │   │   ├── label: utf8
│   │   │   ├── confidence: f32
│   │   │   └── ts: f64
│   │   └── audio []                  one per chunk (e.g. 512 samples at 16kHz)
│   │       ├── pcm: tensor<i16>[512]
│   │       └── level_db: f32
│   └── metadata: struct
│       ├── weather: utf8
│       └── lighting: utf8
└── models {_id: utf8}
    ├── name: utf8
    └── config: struct
        ├── lr: f64
        └── batch_size: i32
```

Three kinds of things appear in this tree:
- **Collections** (`scenes`, `clips`, `frames`, `detections`, `audio`, `models`). Records you can iterate over. Each has its own key — either keyed (`{_id: type}` — upserts match on `_id`) or positional (`[]` — rows are ordered, with a virtual `_pos` field, `u64`, starting at 0).
- **Structs** (`metadata`, `config`). Single nested objects — one per parent record. Dot access navigates through them. They don't introduce new rows.
- **Scalars** (`location`, `duration`, `rgb`, `confidence`). Leaf values.
Note that frames, detections, and audio are sibling collections
within clips. They have independent row counts — a 30-second clip
might have 900 frames, 12 detections, and 960 audio chunks.
Physical Model
Each collection is stored as its own sorted columnar table. A child table’s sort key is always a prefix extension of its parent’s key:
```
scenes:      sorted by (_id)
clips:       sorted by (scenes._id, _id)
frames:      sorted by (scenes._id, clips._id, _pos)
detections:  sorted by (scenes._id, clips._id, _pos)
audio:       sorted by (scenes._id, clips._id, _pos)
```

Because every parent-child pair shares a key prefix, joins between them are sort-merge joins — the cheapest possible join. When two streams have identical key sets (not just the same key schema, but the same actual rows), the join degenerates to a zip: walk both streams in lockstep with no searching.
All query operations map to seven physical primitives:
| Primitive | What it does | Effect on keys |
|---|---|---|
| Scan | Read a table | Establishes the key |
| Zip | Lockstep walk of two streams | Same key |
| Join | Sort-merge join on shared prefix (inner) | Result has the longer key |
| GroupBy | Streaming group-by + aggregation | Shortens the key |
| Explode | Expand a list column into rows | Extends the key |
| Filter | Remove rows matching a predicate | Same key, fewer rows |
| Resample | Streaming outer merge via monotonic binning | Replaces the key |
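The join-vs-zip distinction can be sketched in a few lines of plain Python. This is a conceptual model of the two primitives, not SpiralDB code; tables are modeled as lists of `(key_tuple, value)` pairs sorted by key, and `sort_merge_join` is a hypothetical helper name.

```python
def sort_merge_join(parent, child, prefix_len):
    """Inner sort-merge join: match each child row to the parent row
    whose key equals the child's key prefix. Both inputs are sorted,
    so the parent cursor only ever moves forward."""
    out, i = [], 0
    for ckey, cval in child:
        prefix = ckey[:prefix_len]
        while i < len(parent) and parent[i][0] < prefix:
            i += 1  # advance the parent stream, never rewind
        if i < len(parent) and parent[i][0] == prefix:
            out.append((ckey, parent[i][1], cval))
    return out

scenes = [(("s1",), "downtown"), (("s2",), "highway")]
clips  = [(("s1", "c1"), 30.0), (("s1", "c2"), 15.0), (("s2", "c3"), 20.0)]
print(sort_merge_join(scenes, clips, prefix_len=1))
# [(('s1', 'c1'), 'downtown', 30.0), (('s1', 'c2'), 'downtown', 15.0),
#  (('s2', 'c3'), 'highway', 20.0)]

# When two streams have identical key sets, the join degenerates to zip():
durations = [(("s1", "c1"), 30.0), (("s1", "c2"), 15.0)]
cameras   = [(("s1", "c1"), "front"), (("s1", "c2"), "front")]
zipped = [(k, d, c) for (k, d), (_, c) in zip(durations, cameras)]
```

The while-loop is the "searching" that a zip avoids: with identical key sets, both cursors advance together on every step.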
Navigating the Tree
Dot access walks the tree and returns a lazy reference. Nothing is read until you scan.
```python
scenes = ds.scenes
clips = scenes.clips
frames = clips.frames
detections = clips.detections
audio = clips.audio

frames.rgb               # <Expr recordings/scenes/clips/frames/rgb>
scenes.metadata.weather  # <Expr recordings/scenes/metadata/weather>
```

Save any part of the tree to a variable and keep navigating. Path variables make queries readable — especially when referencing ancestors deep in the tree.
Expression Levels
Every expression has a level — the collection whose key columns define its row domain. The level is determined by the expression’s source and is always unambiguous.
Field references have the level of their collection:
```python
scenes.location        # level: scenes — one value per scene
clips.duration         # level: clips — one value per clip
detections.confidence  # level: detections — one value per detection
```

Scalar operations inherit the deepest level of their operands:

```python
clips.duration * 2                # level: clips
detections.confidence * 100       # level: detections
detections.ts - clips.start_time  # level: detections (clips.start_time broadcasts)
```

Reductions move one level up (to the immediate parent):

```python
clips.duration.sum()                # level: scenes (clips → scenes)
detections.confidence.mean()        # level: clips (detections → clips)
detections.confidence.mean().sum()  # level: scenes (detections → clips → scenes)
```

These rules mean you can always determine an expression's level by inspection, without knowing the scan context.
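As a toy illustration of these rules (hypothetical helper functions, not part of the SpiralDB API): fields carry their collection's level, scalar operations take the deepest operand level, and each reduction steps one level toward the root.

```python
# Toy model of level inference. PARENT maps each collection to its
# parent collection; depth is distance from the root.
PARENT = {"scenes": None, "clips": "scenes", "frames": "clips", "detections": "clips"}

def depth(level):
    return 0 if PARENT[level] is None else 1 + depth(PARENT[level])

def binop_level(a, b):
    # Scalar ops inherit the deepest operand level; the shallower
    # operand must be an ancestor (it broadcasts).
    return a if depth(a) >= depth(b) else b

def reduce_level(level):
    # A reduction moves one level up, to the immediate parent.
    return PARENT[level]

assert binop_level("detections", "clips") == "detections"    # ts - start_time
assert reduce_level("detections") == "clips"                 # .mean()
assert reduce_level(reduce_level("detections")) == "scenes"  # .mean().sum()
```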
Scanning
sp.scan() reads data and produces rows. Pass one or more
expressions — they must all lie on the same ancestor path (no
siblings or unrelated branches). Ancestor expressions are
broadcast to the scan level automatically.
```python
sp.scan(clips.duration, clips.camera).to_arrow()
```
```
┌──────────┬────────┐
│ duration │ camera │
├──────────┼────────┤
│ 30.0     │ front  │
│ 15.0     │ front  │
│ 20.0     │ rear   │
└──────────┴────────┘
```

The scan level is the deepest result level among all expressions. Shallower expressions are broadcast automatically.
```python
# Scan level = clips (deepest result level)
sp.scan(
    clips.duration,                # level: clips
    scenes.location,               # level: scenes → broadcasts to clips
    detections.confidence.mean(),  # level: clips (reduced from detections)
).to_arrow()
```
```
┌──────────┬──────────┬─────────────────┐
│ duration │ location │ confidence_mean │
├──────────┼──────────┼─────────────────┤
│ 30.0     │ downtown │ 0.85            │
│ 15.0     │ downtown │ 0.92            │
│ 20.0     │ highway  │ 0.78            │
└──────────┴──────────┴─────────────────┘
```

**Projection forms.** Several shorthand forms work inside `sp.scan()`:
```python
# Bare collection — all scalar and struct fields from that level
sp.scan(frames, clips.duration)

# Select/exclude
sp.scan(frames.select(exclude=['depth']), clips.duration)

# Dict — keys become output column names, nested dicts become structs
sp.scan({
    'image': frames.rgb,
    'meta': {
        'dur': clips.duration,
        'loc': scenes.location,
    },
})
```

Implicit Broadcasting
When an ancestor expression appears in a scan at a deeper level, its values are automatically repeated for every child row. This is the only implicit operation — it is always unambiguous because each child row has exactly one parent.
```python
sp.scan(
    frames.rgb,       # level: frames
    clips.duration,   # level: clips → broadcast to frames
    scenes.location,  # level: scenes → broadcast to frames
).to_arrow()
```
```
┌─────┬──────────┬──────────┐
│ rgb │ duration │ location │
├─────┼──────────┼──────────┤
│ f0  │ 30.0     │ downtown │
│ f1  │ 30.0     │ downtown │
│ f2  │ 15.0     │ downtown │
│ f3  │ 20.0     │ highway  │
└─────┴──────────┴──────────┘
```

Physically, this is a sort-merge join between the frames table and the clips/scenes tables on their shared key prefix.
Siblings and unrelated collections are errors:
```python
# ✗ frames and detections are siblings — neither is an ancestor of the other
sp.scan(frames.rgb, detections.label)
# LevelError: cannot combine 'frames' and 'detections' — they are siblings

# ✗ models is on a different branch entirely
sp.scan(clips.duration, ds.models.name)
# LevelError: 'clips' and 'models' share no ancestor path
```

Reduction
Referencing a child collection’s fields from a parent level requires an explicit reduction — you must specify how many-to-one values collapse. Reduction methods move the expression one level up to the immediate parent.
```python
# At scenes level: one value per scene
clips.duration.sum()                  # total recording time per scene
clips.duration.mean()                 # average clip duration per scene
clips.duration.list()                 # list of clip durations per scene
clips.duration.count()                # number of clips per scene
clips.duration.first()                # first clip's duration per scene
clips.duration.any(lambda d: d > 60)  # any clip longer than 60s?
clips.duration.agg(my_func)           # custom aggregation
```

Reductions compose — each step moves one level up:

```python
detections.confidence.mean()        # level: clips (mean per clip)
detections.confidence.mean().sum()  # level: scenes (sum of per-clip means)
```

**Multi-level reduction with `.per()`.**
By default, reduction targets the immediate parent. To skip
intermediate levels, use .per(target):
```python
# Direct mean of ALL detections per scene (not per-clip, then per-scene):
detections.confidence.per(scenes).mean()  # level: scenes

# Per-clip means, collected as a list per scene:
detections.confidence.mean().list()       # level: scenes — list of per-clip means

# Compare: mean of all detections per scene vs mean of per-clip means
detections.confidence.per(scenes).mean()  # grand mean (correct for uniform weighting)
detections.confidence.mean().mean()       # mean of means (not the same thing!)
```

`.per()` works with any reduction method; the `per=` keyword form is equivalent:

```python
detections.confidence.sum(per=scenes)    # sum all confidences per scene
detections.confidence.list(per=scenes)   # flat list per scene
detections.confidence.count(per=scenes)  # total detection count per scene
```

Filtering
Filter narrows a collection’s row domain without changing its level. It is a first-class operation on collections.
.where() on a collection returns a filtered collection that can
be reused:
```python
long_clips = clips.where(clips.duration > 10)

# Use the filtered collection in scans:
sp.scan(long_clips.duration, scenes.location).to_arrow()
```
```
┌──────────┬──────────┐
│ duration │ location │
├──────────┼──────────┤
│ 30.0     │ downtown │
│ 15.0     │ downtown │
│ 20.0     │ highway  │
└──────────┴──────────┘
```

`where=` in a scan is a convenience for inline filtering:
```python
sp.scan(
    clips.duration,
    scenes.location,
    where=clips.duration > 10,
).to_arrow()
```

Ancestor predicates are valid in filters. The optimizer pushes them down into the ancestor's table scan:
```python
sp.scan(
    clips.duration,
    where=scenes.location == "downtown",
)
# Physical: Filter(Scan(scenes), location=="downtown") → Join(Scan(clips))
```

**Filters affect reductions.** When a filtered collection is aggregated, only the matching rows participate:
```python
high_conf = detections.where(detections.confidence > 0.9)

# Count only high-confidence detections per clip:
sp.scan(clips.camera, high_conf.confidence.count())
```

**Filter placement is an optimizer concern.** The user specifies what to filter; the system decides where in the plan to apply it. When two streams have identical row domains, the join becomes a zip (lockstep walk, no searching). The optimizer weighs this against the benefit of pushing filters down to reduce data volume.
Lists and Collections
Lists and child collections are the same abstraction viewed from different levels.
From the clips level, detections is conceptually a list — for
each clip, there is a variable-length sequence of detection rows.
Reducing that list is the same as a streaming GroupBy over the
detections table:
```python
# These are equivalent:
detections.confidence.sum()                           # streaming: one pass, constant memory
detections.confidence.list().agg(lambda xs: sum(xs))  # materializes the list, then reduces
```

Both produce one scalar per clip. The first streams; the second materializes the list then applies a function. A smart optimizer converts the second into the first when possible.
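The streaming form can be sketched in plain Python (a conceptual model of the GroupBy primitive, not engine code; `streaming_sum` is a hypothetical name). Because rows arrive sorted by parent key, one pass with O(1) state suffices:

```python
def streaming_sum(rows):
    """rows: (parent_key, value) pairs, sorted by parent_key.
    Yields (parent_key, sum) in a single forward pass."""
    cur_key, acc = None, 0.0
    for key, val in rows:
        if key != cur_key:
            if cur_key is not None:
                yield cur_key, acc  # parent key changed: emit the finished group
            cur_key, acc = key, 0.0
        acc += val
    if cur_key is not None:
        yield cur_key, acc  # flush the last group

dets = [("c1", 0.9), ("c1", 0.8), ("c2", 0.7)]
print(list(streaming_sum(dets)))  # one sum per clip, no list ever materialized
```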
Going the other direction, if a table has a list-typed column, you can explode it to create a child level:
```python
# Suppose scenes has a tags: list<utf8> column
scenes.tags            # level: scenes, type: list<utf8>
scenes.tags.len()      # level: scenes, type: u64
scenes.tags.explode()  # level: (_id, $idx) — one row per tag
```

`.explode()` is the inverse of `.list()`. A list column IS a pre-materialized child collection.
This duality means the same methods work on both child collection fields and list columns:
```python
# Child collection field at parent level:
detections.confidence.sum()    # sum of confidences per clip
detections.confidence.list()   # list of confidences per clip
detections.confidence.count()  # number of detections per clip

# List column in the schema:
scenes.tags.sum()    # (if numeric) sum of elements
scenes.tags.list()   # identity — it's already a list
scenes.tags.count()  # number of elements
```

Window Functions
A window function computes a value for each row using its neighboring rows within the same parent group. It is a reduction that preserves the input’s structure — the result stays at the same level as the input.
```python
# Rolling 3-frame window: for each frame, collect [frame-2, frame-1, frame]
frames.rgb.rolling(before=2, agg="list")  # "list" is the default aggregation for rolling windows
# level: frames, type: list<binary>

# Rolling mean over a 5-detection window:
detections.confidence.rolling(before=2, after=2, agg="mean")
# level: detections, type: f32
```
# level: detections, type: f32Windows never cross parent boundaries. A rolling window on frames is implicitly partitioned by clips — frame 0 of clip c2 never includes frames from clip c1. This is because a window function is fundamentally a reduction grouped by the parent level:
- Group frames by parent clip
- Within each group, compute a sliding aggregate
- Result: one value per frame (same cardinality as input)
Internally, this is equivalent to:
```python
# Conceptual decomposition (not how you write it):
frames.rgb.per(clips).rolling(before=2).explode()
```

The system optimizes this as a streaming window — no materialization of intermediate lists.
The agg parameter accepts the same functions as reduction:
"list", "sum", "mean", "first", "count", or a callable.
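In plain Python, the partitioned rolling window amounts to the following sketch (conceptual; `rolling_mean` is a hypothetical helper, and rows are assumed sorted by parent key):

```python
from collections import deque

def rolling_mean(rows, before):
    """rows: (parent_key, value) pairs sorted by parent_key. For each
    row, average over the window [row-before, row], never crossing a
    parent boundary."""
    out, win, cur = [], deque(), None
    for key, val in rows:
        if key != cur:  # the window resets at each parent boundary
            win.clear()
            cur = key
        win.append(val)
        if len(win) > before + 1:
            win.popleft()
        out.append((key, sum(win) / len(win)))  # one output per input row
    return out

frames = [("c1", 1.0), ("c1", 2.0), ("c1", 3.0), ("c2", 10.0)]
print(rolling_mean(frames, before=2))
# [('c1', 1.0), ('c1', 1.5), ('c1', 2.0), ('c2', 10.0)]
```

Note the output has the same cardinality as the input — the defining property of a window function — and the first row of clip `c2` sees none of `c1`'s frames.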
Aligning Siblings
frames, detections, and audio are siblings — they share a parent
(clips) but have independent row domains. To combine them in a
single scan, you need to align them to a shared index.
Timelines
A timeline is a scalar function that returns a list of time steps. It takes parameters (frame rate, duration) and produces a list per parent row:
```python
from spiral import expressions as se

# For each clip, generate time steps at 10fps over its duration
tl = se.timeline(fps=10, stop=clips.duration)
# level: clips, type: list<struct{step: u64, time: f64}>
```

This is an ordinary scalar expression — no special primitive. Exploding it creates a child level:
```python
tl.explode()
# level: (scenes._id, clips._id, tl_step) — one row per time step per clip
```

Timelines are compared by object identity. Two expressions that resample to the same timeline object produce identical row domains and can be zipped together. Different timeline objects — even with identical parameters — are treated as incompatible.
Resampling
.resample() aligns a source collection to a timeline. It is a
physical primitive — a streaming outer merge driven by a
monotonic binning function.
The on expression maps each source row to a position on the
timeline’s time axis. Because both the source and the timeline
are sorted by the same key prefix, and on increases
monotonically with the sort order, the bin assignment is a
streaming walk — both streams advance forward together with no
random access.
Unlike every other join in the model (which are inner — every
child has a parent, every zip row has a match), Resample uses
outer semantics: the timeline drives the output domain.
Timeline steps with no matching source rows still appear in the
output as nulls (or are forward-filled, depending on method).
This is essential for sibling alignment — both sides of a zip
must have exactly the same rows.
The two parameters:

- `on` — an expression that maps each source row to a position on the timeline's time axis. For frames at 30fps, `frames._pos / 30` converts frame index to seconds. For detections with timestamps, `detections.ts - clips.start_time` gives the offset from clip start.
- `method` — the aggregation applied within each timeline step:
  - `"first"` — take the first matching value
  - `"ffill"` — forward-fill from the most recent preceding value
  - `"list"` — collect all matching values into a list
  - `"mean"`, `"sum"`, etc. — numeric aggregations
  - A callable — custom aggregation function (e.g. `se.audio.concat`)
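Conceptually, the primitive reduces to the following plain-Python sketch (hypothetical `resample_first` helper, not the real operator, which streams over columnar batches). The timeline drives the output, each source row is binned by its `on` value, and both cursors only move forward:

```python
def resample_first(timeline, source, on):
    """timeline: sorted list of step times. source: sorted rows.
    on(row) -> time, monotonically increasing with the sort order.
    Outer semantics: every timeline step appears; unmatched -> None."""
    out, i = [], 0
    for k, t in enumerate(timeline):
        end = timeline[k + 1] if k + 1 < len(timeline) else float("inf")
        first = None
        # Consume all source rows that fall before the next step.
        while i < len(source) and on(source[i]) < end:
            if first is None and on(source[i]) >= t:
                first = source[i]
            i += 1
        out.append((t, first))
    return out

# 10 steps/sec timeline; frames recorded at 30fps (frame index -> seconds)
tl = [0.0, 0.1, 0.2, 0.4]
frames = [(pos, f"f{pos}") for pos in range(9)]
print(resample_first(tl, frames, on=lambda row: row[0] / 30))
# [(0.0, (0, 'f0')), (0.1, (3, 'f3')), (0.2, (6, 'f6')), (0.4, None)]
```

The final `(0.4, None)` row shows the outer semantics: the step appears even though no source row lands in it, which is what lets differently-rated siblings end up with identical row domains.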
```python
scenes = ds.scenes
clips = scenes.clips
frames = clips.frames
detections = clips.detections
audio = clips.audio

tl = se.timeline(fps=10, stop=clips.duration)

# Resample video frames (30fps) → 10fps
resampled_rgb = frames.rgb.resample(tl, on=frames._pos / 30, method="first")

# Resample detections (variable rate) → 10fps
resampled_dets = detections.resample(
    tl,
    on=detections.ts - clips.start_time,
    method="list",
)

# Resample audio (e.g. 16kHz samples) → 10fps, concatenating chunks
resampled_audio = audio.pcm.resample(
    tl,
    on=audio._pos * 512 / 16000,
    method=se.audio.concat,
)

# All three share the same tl → zip
sp.scan(
    resampled_rgb,
    resampled_dets,
    resampled_audio,
    clips._id,
).to_arrow()
```
```
┌─────┬─────┬─────────────────────────────────┬───────┐
│ _id │ rgb │ detections                      │ audio │
├─────┼─────┼─────────────────────────────────┼───────┤
│ c1  │ f0  │ [{label: "car", conf: 0.9}]     │ a0    │
│ c1  │ f1  │ []                              │ a1    │
│ c2  │ f2  │ [{label: "person", conf: 0.85}] │ a2    │
│ c3  │ f3  │ []                              │ a3    │
└─────┴─────┴─────────────────────────────────┴───────┘
```

Row Selection
The [] operator steps into a collection’s key prefix,
pinning one or more leading key values. Each index removes the
leftmost unbound key dimension from the row domain — you are
walking into the tree, not filtering arbitrary rows.
```python
ds.scenes["s1"]                        # pin scenes._id = "s1"
ds.scenes["s1"].clips["c1"]            # pin scenes._id = "s1", clips._id = "c1"
ds.scenes["s1"].clips["c1"].frames[0]  # fully specified — one frame
```

Because the data is sorted by key prefix, stepping into a prefix is a seek, not a scan. All downstream expressions and joins see the narrowed domain:
```python
s1_clips = ds.scenes["s1"].clips
sp.scan(s1_clips.duration, scenes.location).to_arrow()
```
```
┌──────────┬──────────┐
│ duration │ location │
├──────────┼──────────┤
│ 30.0     │ downtown │
│ 15.0     │ downtown │
└──────────┴──────────┘
```

When every collection in the path is scalar-indexed, you have a fully-specified path to a single value:
```python
>>> ds.scenes["s1"].location.to_py()
"downtown"
>>> ds.scenes["s1"].clips["c1"].duration.to_py()
30.0
```

Selection forms
Scalar index — pins one key value at one level:
```python
ds.scenes["s1"]                        # one scene
ds.scenes["s1"].clips["c1"]            # one clip within one scene
ds.scenes["s1"].clips["c1"].frames[0]  # one frame (by _pos)
```

Array of keys — selects specific rows (must be sorted):
```python
ds.scenes[["s1", "s2", "s5"]]
# level: scenes, row domain: 3 scenes
```

Key table — multi-level row selection. Each column corresponds to a level in the hierarchy, rows correspond element-wise:
```python
import pyarrow as pa

ds.scenes.clips[pa.table({
    "scenes": ["s1", "s1", "s2"],
    "scenes.clips": ["c1", "c2", "c3"],
})]
# level: clips, row domain: 3 specific clips
```

Key tables are useful for sampling, precomputed batch indices, or feeding an externally-computed selection into a training loop.
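In plain Python terms, a prefix seek is two binary searches over the sorted key tuples rather than a walk over every row (a sketch of the idea, not SpiralDB internals; the sentinel trick below assumes string key components):

```python
import bisect

def seek_prefix(keys, prefix):
    """Return the [lo, hi) slice of rows whose key starts with prefix.
    keys must be a sorted list of tuples."""
    lo = bisect.bisect_left(keys, prefix)
    # A sentinel greater than any real component bounds the prefix range.
    hi = bisect.bisect_left(keys, prefix + (chr(0x10FFFF),))
    return lo, hi

clips_keys = [("s1", "c1"), ("s1", "c2"), ("s2", "c3")]  # sorted by (scene, clip)
lo, hi = seek_prefix(clips_keys, ("s1",))
print(clips_keys[lo:hi])  # [('s1', 'c1'), ('s1', 'c2')]
```

Each `[]` index in a path pins one more leading component, shrinking this range — which is why a fully-specified path resolves to a single row in O(log n).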
Execution Model
Physical Storage
Each collection is a separate sorted table. Conceptually, child tables store parent keys for joining:
```
scenes — sorted by (_id)
┌─────┬──────────┐
│ _id │ location │
├─────┼──────────┤
│ s1  │ downtown │
│ s2  │ highway  │
└─────┴──────────┘

clips — sorted by (scenes._id, _id)
┌────────────┬─────┬──────────┐
│ scenes._id │ _id │ duration │
├────────────┼─────┼──────────┤
│ s1         │ c1  │ 30.0     │
│ s1         │ c2  │ 15.0     │
│ s2         │ c3  │ 20.0     │
└────────────┴─────┴──────────┘

frames — sorted by (scenes._id, clips._id, _pos)
┌────────────┬───────────┬─────┬───────┐
│ scenes._id │ clips._id │ rgb │ depth │
├────────────┼───────────┼─────┼───────┤
│ s1         │ c1        │ f0  │ d0    │
│ s1         │ c1        │ f1  │ d1    │
│ s1         │ c2        │ f2  │ d2    │
│ s2         │ c3        │ f3  │ d3    │
└────────────┴───────────┴─────┴───────┘
```

In practice, SpiralDB maintains efficient indexes to optimize cross-level joins, and the Vortex columnar format makes broadcasting a zero-copy operation. But the logical model above is equivalent.
Execution Plan
Consider the same resampling query from the previous section, but with a filter on detections:
```python
ped_dets = detections.where(detections.label == "pedestrian")

sp.scan(
    frames.rgb.resample(tl, on=frames._pos / 30, method="first"),
    ped_dets.resample(tl, on=ped_dets.ts - clips.start_time, method="list"),
    clips._id,
    clips.duration,
).to_arrow()
```

Each node below is a streaming operator; data flows top-to-bottom from table scans to the final output:
```
Scan(scenes)  Scan(clips)   Scan(frames)        Scan(detections)
     │             │             │                      │
     └─── Join ────┘             │          Filter(label=="pedestrian")
            │                    │                      │
            │              Resample(tl,           Resample(tl,
            │              method=first)          method=list)
            │                    │                      │
            │                    └──────── Zip ─────────┘
            │                               │
            └────────────── Join ───────────┘
                              │
                            Output
```

Four table scans feed into the plan. The filter on label is pushed down before resampling. Each Resample is a streaming outer merge — it walks the source and the timeline forward together, binning source rows into timeline steps via the monotonic `on` expression. Because both sides use the same `tl` object, the output domains are identical and the combine is a zip (lockstep walk, no searching). The ancestor fields (scenes, clips) are joined via sort-merge on the shared key prefix.
pushed down before resampling. Each Resample is a streaming outer
merge — it walks the source and the timeline forward together,
binning source rows into timeline steps via the monotonic on
expression. Because both sides use the same tl object, the
output domains are identical and the combine is a zip
(lockstep walk, no searching). The ancestor fields (scenes, clips)
are joined via sort-merge on the shared key prefix.
Filter placement is an optimizer choice. The filter could instead be applied after the zip — this preserves the cheap lockstep walk but processes more rows through the resample. Pushing the filter before the zip reduces data volume but may break the row-domain identity, turning the zip into a join. The optimizer weighs filter selectivity against join cost.
Sideways information passing. When the filter is pushed to
the detections scan, it can also accelerate the frames scan. Both
streams are sorted by the same key prefix (scenes._id,
clips._id). As the filtered detections stream skips past clips
with no matching detections, it can pass updated key bounds to the
frames stream, allowing it to seek forward rather than scan
through frames that will be discarded after the zip. This
cross-operator communication — where one scan’s progress
informs another’s seek position — is known as sideways
information passing.
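A toy model of the idea (hypothetical in-memory lists, not the actual operator protocol): the filtered detections stream publishes each surviving clip key as a bound, and the frames stream seeks to that key instead of scanning through intervening clips.

```python
import bisect

# Both streams are sorted by the shared key prefix (scene_id, clip_id).
det_clips    = [("s1", "c1"), ("s3", "c9")]  # clips surviving the detections filter
frame_keys   = [("s1", "c1"), ("s1", "c2"), ("s2", "c3"), ("s3", "c9")]
frame_counts = {k: 3 for k in frame_keys}    # toy data: 3 frames per clip

scanned = []
for bound in det_clips:
    # The frames stream seeks forward to the published bound rather than
    # scanning every clip; frames for ("s1","c2") and ("s2","c3") are
    # never read, because they would be discarded after the zip anyway.
    i = bisect.bisect_left(frame_keys, bound)
    if i < len(frame_keys) and frame_keys[i] == bound:
        scanned.append((bound, frame_counts[bound]))

print(scanned)  # only the two clips that survive the filter were touched
```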
Rules Summary
| Expression | Behavior | Physical operation |
|---|---|---|
| Same-level field | Direct column read | Scan |
| Ancestor field | Implicit broadcast | Sort-merge join on prefix |
| Descendant field (no reduction) | Error | — |
| Descendant field + `.sum()` etc. | Explicit reduction | Streaming GroupBy |
| Sibling field (no alignment) | Error | — |
| Sibling field + `.resample(tl)` | Temporal alignment | Resample (outer) + Zip |
| `.where(pred)` | Filter | Filter (optimizer places it) |
| `.rolling(before=, agg=)` | Window function | Streaming window within parent groups |
| `.explode()` | List → child rows | Explode |
Writing Data
All writes are upserts. Writing to an existing key replaces the record; writing to a new key creates it.
Prefer batch writes for best performance — columnar storage is most efficient when writing many records at once.
Single records — setitem with a dict:
```python
ds.scenes["s3"] = {
    "location": "park",
    "metadata": {"weather": "sunny", "lighting": "natural"},
    "clips": [
        {
            "_id": "c10",
            "camera": "front",
            "start_time": 0.0,
            "duration": 60.0,
            "frames": [{"rgb": b"f10", "depth": b"d10"}],
            "detections": [{"label": "car", "confidence": 0.9, "ts": 0.5}],
            "audio": [{"pcm": b"a10", "level_db": -20.0}],
        },
    ],
}
```

Nested records:
```python
ds.scenes["s1"].clips["c1"].frames[0] = {
    "rgb": b"f0_v2",
    "depth": b"d0_v2",
}
```

Scalars — when the path is fully indexed:
```python
ds.scenes["s1"].location = "downtown_v2"
ds.scenes["s1"].clips["c1"].duration = 45.0
```

Batch writes — setitem with an Arrow table or list of dicts:
```python
import pyarrow as pa

ds.scenes["s1"].clips["c1"].frames = pa.table({
    "rgb": [b"f20", b"f21", b"f22"],
    "depth": [b"d20", b"d21", b"d22"],
})
```

Note that positional collections (like frames) do not support upserting individual rows by index.
You can only replace the entire collection at once.
Scoped writes — the client batches them for you:
```python
with ds.scenes.write() as w:
    for scene in scene_dicts:
        w[scene['_id']] = scene
# all writes flushed as a single batch on exit
```

Deleting Data
```python
del ds.scenes["s3"]
del ds.scenes["s1"].clips["c2"]
del ds.scenes["s1"].clips["c1"].frames[0]

# Bulk delete
del ds.scenes[["s1", "s2", "s5"]]
```