
Enrichment

The easiest way to build multimodal Spiral tables is to start with metadata that contains pointers, such as file paths or URLs, and then use enrichment to ingest the actual data bytes: images, audio, video, etc. Because the column-group design supports hundreds of thousands of columns, horizontally expanding a table is a powerful primitive.
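Before reaching for Spiral's API, the pointer-first pattern itself can be sketched in plain Python (the dicts and fetch function below are illustrative stand-ins, not Spiral code): start with rows that carry only pointers, then widen each row with the fetched bytes.

```python
# Illustrative stand-in for the pointer-first pattern (not Spiral code).
# Start with lightweight metadata rows that only point at the data...
rows = [
    {"idx": 0, "url": "https://example.com/a.jpg", "size": 12345},
    {"idx": 1, "url": "https://example.com/b.jpg", "size": 67890},
]

def enrich(rows, column, fetch):
    """'Enrich' horizontally: add one new column per row, leaving rows intact."""
    for row in rows:
        row[column] = fetch(row["url"])
    return rows

# A fake fetch function standing in for the real byte download.
enriched = enrich(rows, "image", fetch=lambda url: b"bytes-for-" + url.encode())
print(sorted(enriched[0]))  # ['idx', 'image', 'size', 'url']
```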

We’ll explore this using the Open Images dataset, a large-scale collection of images with associated metadata.

Starting with Metadata

Let’s begin by creating a table and loading the image URLs from the Open Images dataset.

```python
import spiral
from spiral.demo import images
import pandas as pd
import pyarrow as pa

sp = spiral.Spiral()
table = images(sp)
```

At this point, our table contains only lightweight metadata:

```python
table.schema().to_arrow()
```

```python
pa.schema({
    "idx": pa.int64(),
    "url": pa.string_view(),
    "size": pa.int64(),
    "etag": pa.string_view(),
})
```

Ingesting Data

We can enrich the table by fetching the actual image data from the URLs.

```python
from spiral import expressions as se

# Create an enrichment that fetches images from their URLs
enrichment = table.enrich(
    se.pack({"image": se.http.get(table["url"])})
)

# Apply the enrichment in a streaming fashion
enrichment.apply()
```

After enrichment, our table contains the actual image data, as well as metadata about each request:

```python
table.schema().to_arrow()
```

```python
pa.schema({
    "idx": pa.int64(),
    "url": pa.string_view(),
    "size": pa.int64(),
    "etag": pa.string_view(),
    "image": pa.struct({
        "bytes": pa.binary_view(),
        "meta": pa.struct({
            "location": pa.string_view(),
            "last_modified": pa.timestamp("ms"),
            "size": pa.uint64(),
            "e_tag": pa.string_view(),
            "version": pa.string_view(),
            "status_code": pa.uint16(),
        }),
    }),
})
```

Incremental Progress

Rows can be selectively enriched using the where parameter.

The se.http.get() expression fetches data over HTTP(S). Spiral handles parallel fetching, retries, and error handling automatically, but some requests may still fail (e.g., 404 Not Found). Errors can be inspected using the status_code metadata field.
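When deciding which rows to re-enrich, a common policy (an assumption here, not built-in Spiral behavior) is to retry transient 5xx server errors while treating 4xx responses as permanent. Sketched in plain Python:

```python
def should_retry(status_code: int) -> bool:
    """Retry transient server errors; treat client errors (4xx) as permanent."""
    return 500 <= status_code <= 599

# e.g. status codes read back from the image.meta.status_code column:
codes = [200, 404, 500, 503]
print([c for c in codes if should_retry(c)])  # [500, 503]
```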

Let’s retry fetching images that previously failed with a 500 status code.

```python
enrichment = table.enrich(
    se.pack({"image": se.http.get(table["url"])}),
    where=(table["image.meta.status_code"] == pa.scalar(500, pa.uint16())),
)
enrichment.apply()
```

Other Data Sources

In addition to se.http.get(), Spiral supports other data sources, such as se.s3.get() for object storage and se.file.get() for local filesystem reads. Note that the client environment must have the appropriate permissions to access these resources.
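se.s3.get() expects URLs of the form s3://bucket_name/path/to/object. Building those URLs is plain string work; the bucket name below is made up for illustration:

```python
def s3_url(bucket: str, key: str) -> str:
    """Build an s3:// URL from a bucket name and an object key."""
    return f"s3://{bucket}/{key.lstrip('/')}"

# Hypothetical bucket name, for illustration only.
print(s3_url("my-images-bucket", "/6/5606/15611395595_f51465687d_o.jpg"))
# s3://my-images-bucket/6/5606/15611395595_f51465687d_o.jpg
```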

Deriving Columns with UDFs

It’s also possible to enrich with User-Defined Functions (UDFs) for custom processing logic. Suppose the images were also mirrored to an S3 bucket under the same object paths, and we wanted to fetch them with se.s3.get(), which expects URLs in the form s3://bucket_name/path/to/object. To build those URLs, we first need each object’s path, which we can derive from the existing url column.

First, we define a UDF that extracts the path component from each URL.

```python
import pyarrow as pa
from spiral.expressions.udf import UDF


class GetPath(UDF):
    """
    Turns URLs like 'https://c2.staticflickr.com/6/5606/15611395595_f51465687d_o.jpg'
    into paths like '/6/5606/15611395595_f51465687d_o.jpg'.
    """

    def __init__(self):
        super().__init__("get_path")

    def return_type(self, input_type: pa.DataType):
        return pa.string()

    def invoke(self, urls: pa.Array):
        from urllib.parse import urlparse

        paths = [urlparse(url).path for url in urls.to_pylist()]
        return pa.array(paths, type=pa.string())
```
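The core of the UDF is just urllib.parse.urlparse, so the extraction can be sanity-checked outside Spiral:

```python
from urllib.parse import urlparse

url = "https://c2.staticflickr.com/6/5606/15611395595_f51465687d_o.jpg"
print(urlparse(url).path)  # /6/5606/15611395595_f51465687d_o.jpg
```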

Now we can use this UDF in an enrichment to derive the path column.

```python
get_path = GetPath()

enrichment = table.enrich(
    se.pack({"path": get_path(table["url"])})
)
enrichment.apply()
```

Distributing Work

For large datasets, you can distribute the enrichment using Dask.

```python
enrichment.apply_dask()
```

Or if you have an existing Dask cluster, provide its address:

```python
enrichment.apply_dask(address="your-cluster:8786")
```

The apply_dask() method will:

  1. Connect to a Dask cluster (or create a local one if none is specified)
  2. Process each shard in parallel across Dask workers
  3. Write results back to the table atomically
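The shard-per-worker pattern behind apply_dask() can be sketched with the standard library (a toy stand-in for illustration, not Spiral's actual implementation):

```python
from concurrent.futures import ThreadPoolExecutor

def process_shard(shard):
    """Toy stand-in for enriching one shard of rows."""
    return [row * 2 for row in shard]

shards = [[1, 2], [3, 4], [5, 6]]

# Each shard is processed independently, so shards parallelize cleanly.
with ThreadPoolExecutor() as pool:
    results = list(pool.map(process_shard, shards))

print(results)  # [[2, 4], [6, 8], [10, 12]]
```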

UDFs are currently not supported with Dask enrichment. Reach out if you require this functionality.

Sharding

Distributed enrichment defaults to the table's shards, but that might not yield enough parallelism. When the metadata is small, a default shard may contain far too many rows for a single distributed task. The max_task_size parameter controls the maximum number of rows per Dask task; note that computing these smaller shards adds work at startup. Shards can also be provided explicitly through the shards parameter.
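The effect of max_task_size is simple ceiling division: a shard with more rows than the cap splits into multiple tasks (the numbers below are illustrative):

```python
import math

def num_tasks(num_rows: int, max_task_size: int) -> int:
    """How many tasks a shard of num_rows rows splits into under max_task_size."""
    return math.ceil(num_rows / max_task_size)

print(num_tasks(1_000_000, 50_000))  # 20
print(num_tasks(10, 3))              # 4
```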
