Distributed Compute

Both Spiral Table scans and transactions can be partitioned row-wise for use in distributed query engines such as PySpark, Dask, and Ray. For the engines listed below, we provide first-class integrations. For any other engine, see Others for instructions on integrating a Spiral scan or transaction with an arbitrary distributed compute system that has a Python API.

Ray

A Spiral Table can be read by a Ray Dataset:

```python
import ray
import spiral
from spiral.demo import gharchive, demo_project

sp = spiral.Spiral()
table = gharchive(sp)

# Scan the whole table and expose it as a Ray Dataset.
scan = sp.scan(table)
ds: ray.data.Dataset = scan.to_ray_dataset()
```

A Ray Dataset can be written into a Spiral Table:

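```python
ds: ray.data.Dataset = sp.scan(table).to_ray_dataset()

# Create an empty table with the same key schema as the source.
gharchive_events_copy = demo_project(sp).create_table(
    "gharchive.events_copy", key_schema=table.key_schema
)

# Stream the Ray Dataset into the new table inside a single transaction.
with gharchive_events_copy.txn() as txn:
    ds.write_datasink(txn.to_ray_datasink())
```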

Others

Spiral’s distribution primitives should integrate with any Python-based distributed compute platform. In this section, we use Python’s ThreadPoolExecutor to demonstrate this integration.

Reading

All scan functions (e.g. to_table(), to_pandas(), to_record_batches()) accept a shards argument, i.e. row-wise partitions of the table, each defined by a non-overlapping key range. There are two ways to generate a set of shards for a given scan:

  1. shards(), which is cheap to compute but partitions according to the physical data layout, so shards may vary in size.
  2. compute_shards(), which scans the key columns to compute a set of shards with roughly equal numbers of keys (and thus rows) per shard.
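To make the contrast concrete, here is a small, self-contained illustration of the idea behind equal-count sharding: pick boundary keys at evenly spaced positions in the sorted key column so that each half-open range holds about the same number of keys. This is only a sketch of the concept, not Spiral's implementation; equal_count_shards, keys_in, and the (start, end) tuple representation are hypothetical, and keys are assumed to be unique.

```python
from bisect import bisect_left
from typing import List, Optional, Sequence, Tuple

# Hypothetical shard representation: half-open key range [start, end),
# where None means "unbounded" on that side.
Shard = Tuple[Optional[int], Optional[int]]


def equal_count_shards(sorted_keys: Sequence[int], num_shards: int) -> List[Shard]:
    """Split sorted, unique keys into contiguous, non-overlapping ranges
    that each contain roughly the same number of keys."""
    n = len(sorted_keys)
    num_shards = max(1, min(num_shards, n))
    # Boundary keys sit at evenly spaced positions in the sorted key list.
    boundaries = [sorted_keys[(i * n) // num_shards] for i in range(1, num_shards)]
    starts: List[Optional[int]] = [None] + boundaries
    ends: List[Optional[int]] = boundaries + [None]
    return list(zip(starts, ends))


def keys_in(shard: Shard, sorted_keys: Sequence[int]) -> List[int]:
    """Return the keys that fall inside one shard's key range."""
    start, end = shard
    lo = 0 if start is None else bisect_left(sorted_keys, start)
    hi = len(sorted_keys) if end is None else bisect_left(sorted_keys, end)
    return list(sorted_keys[lo:hi])


keys = list(range(0, 100, 3))  # 34 keys
shards = equal_count_shards(keys, 4)
print(shards)
print([len(keys_in(s, keys)) for s in shards])
```

Because the ranges are derived from the keys themselves rather than from file boundaries, shard sizes differ by at most one key here, which is the property that makes computing shards worth its extra scan over the key columns.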

When distributing work across processes or nodes, every worker must read the same table state. Constructing a snapshot() fixes the state of a table (i.e. later writes are ignored). A Snapshot’s asof, a pseudo-timestamp, is sufficient to uniquely identify the state of a Spiral table: any scan that specifies the same asof sees the same state of the table.

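```python
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

# Pin the table state so that every worker reads the same data.
asof = table.snapshot().asof
table_id = table.table_id
predicate = table["created_at"] > datetime(2020, 3, 1)

# The driver plans the shards for the filtered scan.
driver_scan = sp.scan(table, where=predicate, asof=asof)
shards = driver_scan.shards()


def worker_fn(shard):
    """Code executed on another node."""
    from spiral import Spiral

    # Each worker builds its own client and looks the table up by ID.
    sp = Spiral()
    table = sp.table(table_id)
    worker_scan = sp.scan(table, where=predicate, asof=asof, shard=shard)
    return worker_scan.to_table()


with ThreadPoolExecutor() as executor:
    # executor.map returns a lazy iterator; list() collects every shard's result.
    results = list(executor.map(worker_fn, shards))
```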
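The asof guarantee that the driver and workers rely on here can be modeled with a toy multi-version table: every write is stamped with an increasing version, and a read "as of" a version ignores anything written later. This is a conceptual sketch only; VersionedTable and its methods are hypothetical and say nothing about how Spiral actually stores or versions data.

```python
from typing import Dict, List, Tuple


class VersionedTable:
    """Hypothetical toy table: writes get increasing versions, and reads
    as of a version ignore every later write."""

    def __init__(self) -> None:
        self._version = 0
        self._rows: List[Tuple[int, int, str]] = []  # (version, key, value)

    def write(self, key: int, value: str) -> None:
        self._version += 1
        self._rows.append((self._version, key, value))

    def snapshot_asof(self) -> int:
        # The current version pins the table state, like a Snapshot's asof.
        return self._version

    def read(self, asof: int) -> Dict[int, str]:
        state: Dict[int, str] = {}
        for version, key, value in self._rows:
            if version <= asof:
                state[key] = value  # later versions overwrite earlier ones
        return state


t = VersionedTable()
t.write(1, "a")
t.write(2, "b")
asof = t.snapshot_asof()

# Writes that land after the snapshot are invisible to readers using `asof`.
t.write(1, "a2")
t.write(3, "c")
print(t.read(asof))
```

Any number of readers that pass the same asof get identical results no matter when they run, which is why shipping asof (rather than a live table handle) to each worker is enough to keep a distributed scan consistent.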

Writing

Spiral Transactions are also horizontally scalable. The orchestration (driver) node starts a transaction, collects the transaction operations produced on each worker node, composes them into that one transaction, and commits the entire transaction at once. Instead of serializing the transaction object itself, a worker calls Transaction.take() to extract its list of operations and serializes them with Operation.to_json. On the driver, Operation.from_json deserializes an operation, and Transaction.include() subsumes one or more operations into the driver's transaction.

```python
from concurrent.futures import ThreadPoolExecutor

import pyarrow as pa
from spiral import TransactionOps

table = demo_project(sp).create_table("squares", key_schema={"key": pa.int64()})
table_id = table.table_id


def worker_fn(index: int) -> TransactionOps:
    """Code executed on another node."""
    from spiral import Spiral

    txn = Spiral().table(table_id).txn()
    txn.write({
        "key": [index],
        "square": [index * index],
    })
    # Hand the operations back to the driver instead of committing here.
    return txn.take()


with table.txn() as txn:
    # The driver-side transaction must start before the workers!
    with ThreadPoolExecutor() as executor:
        worker_results = list(executor.map(worker_fn, [0, 1, 2, 3]))
    # Threads share memory, so no JSON round-trip is needed in this demo.
    for worker_tx in worker_results:
        txn.include(worker_tx)
    # All included operations are committed together when the block exits.
```
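To see why only operations, not the transaction object, need to cross the worker boundary, the take / serialize / include / commit protocol can be modeled in plain Python. Op and Txn are hypothetical stand-ins that only mimic the roles of Spiral's Operation and Transaction (to_json, from_json, take, include); they are not Spiral's API, and the real signatures may differ. Unlike the threaded demo above, this sketch sends each worker's operations back as JSON strings, as a real multi-node setup would.

```python
import json
from concurrent.futures import ThreadPoolExecutor
from dataclasses import asdict, dataclass
from typing import List


@dataclass
class Op:
    """Hypothetical stand-in for a Spiral Operation."""
    kind: str
    payload: dict

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @staticmethod
    def from_json(s: str) -> "Op":
        return Op(**json.loads(s))


class Txn:
    """Hypothetical stand-in for a transaction that buffers ops until commit."""

    def __init__(self) -> None:
        self._ops: List[Op] = []

    def write(self, payload: dict) -> None:
        self._ops.append(Op("write", payload))

    def take(self) -> List[Op]:
        # Hand over the buffered ops and leave this transaction empty.
        ops, self._ops = self._ops, []
        return ops

    def include(self, ops: List[Op]) -> None:
        self._ops.extend(ops)

    def commit(self, table_log: List[Op]) -> None:
        # Every buffered op becomes visible in a single step.
        table_log.extend(self._ops)
        self._ops = []


def worker(index: int) -> List[str]:
    """Runs on a worker: write, take the ops, ship them as JSON strings."""
    txn = Txn()
    txn.write({"key": [index], "square": [index * index]})
    return [op.to_json() for op in txn.take()]


def driver(indices: List[int]) -> List[Op]:
    table_log: List[Op] = []
    txn = Txn()  # the driver-side transaction starts before the workers
    with ThreadPoolExecutor() as executor:
        for shipped in executor.map(worker, indices):
            txn.include([Op.from_json(s) for s in shipped])
    txn.commit(table_log)
    return table_log


print([op.payload for op in driver([0, 1, 2, 3])])
```

Workers never commit anything themselves: their writes only reach the table through the driver's single commit, so either all workers' rows become visible together or none do.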
