Enrichment
The easiest way to build multimodal Spiral tables is to start with metadata containing pointers, such as file paths or URLs, and then use enrichment to ingest the actual data bytes: images, audio, video, and so on. Because the column groups design supports hundreds of thousands of columns, horizontally expanding tables is a powerful primitive.
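Concretely, pointer-style metadata is just a narrow table of identifiers and references. Here is a minimal illustration in pyarrow (an invented two-row example, not the demo dataset used below):

import pyarrow as pa

# Lightweight pointer metadata: rows reference the heavy bytes (images)
# by URL instead of embedding them.
metadata = pa.table({
    "idx": pa.array([0, 1], type=pa.int64()),
    "url": ["https://example.com/a.jpg", "https://example.com/b.jpg"],
})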
We’ll explore this using the Open Images dataset, a large-scale collection of images with associated metadata.
Starting with Metadata
Let’s begin by creating a table and loading the image URLs from the Open Images dataset.
import spiral
from spiral.demo import images
import pandas as pd
import pyarrow as pa
sp = spiral.Spiral()
table = images(sp)

At this point, our table contains only lightweight metadata:
table.schema().to_arrow()

pa.schema({
"idx": pa.int64(),
"url": pa.string_view(),
"size": pa.int64(),
"etag": pa.string_view(),
})

Ingesting Data
We can enrich the table by fetching the actual image data from the stored URLs.
from spiral import expressions as se
# Create an enrichment that fetches images over HTTP
enrichment = table.enrich(
se.pack({
"image": se.http.get(table["url"])
})
)
# Apply the enrichment in a streaming fashion
enrichment.apply()

After enrichment, our table contains the actual image data, along with metadata about each request:
table.schema().to_arrow()

pa.schema({
"idx": pa.int64(),
"url": pa.string_view(),
"size": pa.int64(),
"etag": pa.string_view(),
"image": pa.struct({
"bytes": pa.binary_view(),
"meta": pa.struct({
"location": pa.string_view(),
"last_modified": pa.timestamp("ms"),
"size": pa.uint64(),
"e_tag": pa.string_view(),
"version": pa.string_view(),
"status_code": pa.uint16(),
}),
}),
})

Incremental Progress
Rows can be selectively enriched using the where parameter.
The se.http.get() expression reads data from HTTP(S) URLs. Spiral handles parallel fetching, retries, and error handling automatically, but some requests may still fail (e.g., 404 Not Found). The errors can be inspected using the status_code metadata field.
Let’s retry fetching images that previously failed with a 500 status code.
enrichment = table.enrich(
se.pack({
"image": se.http.get(table["url"])
}),
where=(table["image.meta.status_code"] == pa.scalar(500, pa.uint16()))
)
enrichment.apply()

Other Data Sources
In addition to se.http.get(), Spiral supports other data sources such as se.s3.get() for object storage ingestion
and se.file.get() for local filesystem reads. Note that the client environment must have appropriate permissions
to access these resources.
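These sources follow the same pattern as se.http.get(). As a minimal sketch, assuming a hypothetical s3_url column holding URLs of the form s3://bucket_name/path/to/object:

# Sketch: ingest objects referenced by a hypothetical "s3_url" column.
enrichment = table.enrich(
    se.pack({
        "image": se.s3.get(table["s3_url"])
    })
)
enrichment.apply()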
Deriving Columns with UDFs
It’s also possible to enrich with User-Defined Functions (UDFs) for custom processing logic. Suppose the images were also stored in a specific S3 bucket, keyed by the path component of each URL, and we wanted to ingest the full image data with se.s3.get(). That requires URLs in the form s3://bucket_name/path/to/object, so let’s first derive a path column from the existing url column.
First, we define a UDF that extracts the path component from each URL.
import pyarrow as pa
from spiral.expressions.udf import UDF
class GetPath(UDF):
"""
turns urls like 'https://c2.staticflickr.com/6/5606/15611395595_f51465687d_o.jpg'
into paths like '/6/5606/15611395595_f51465687d_o.jpg'
"""
def __init__(self):
super().__init__("get_path")
def return_type(self, input_type: pa.DataType):
return pa.string()
def invoke(self, urls: pa.Array):
from urllib.parse import urlparse
paths = [
urlparse(url).path
for url in urls.to_pylist()
]
return pa.array(paths, type=pa.string())

Now we can use this UDF in an enrichment to derive the path column:
get_path = GetPath()
enrichment = table.enrich(
se.pack({
"path": get_path(table["url"]),
})
)
enrichment.apply()

With the path column populated, the paths can be combined with the bucket name into full s3:// URLs and the images fetched with se.s3.get(), following the pattern sketched above.

Distributing Work
For large datasets, you can distribute the enrichment using Dask.
enrichment.apply_dask()

Or if you have an existing Dask cluster, provide its address:
enrichment.apply_dask(address="your-cluster:8786")

The apply_dask() method will:
- Connect to a Dask cluster (or create a local one if none is specified)
- Process each shard in parallel across Dask workers
- Write results back to the table atomically
UDFs are currently not supported with Dask enrichment. Reach out if you require this functionality.
Sharding
Distributed enrichment defaults to the table's existing shards, but that might not provide sufficient parallelism. When metadata is small, the default shards may contain far too many rows for a single distributed task. You can use the max_task_size parameter to control the maximum number of rows per Dask task. Note that this leads to more work on startup to create the shards. Shards can also be provided explicitly through the shards parameter.
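For example, a sketch of capping task sizes (the row count is illustrative, and we assume max_task_size is accepted by apply_dask() as shown):

# Cap each Dask task at roughly 100k rows (illustrative value).
enrichment.apply_dask(
    address="your-cluster:8786",
    max_task_size=100_000,
)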