Distributed Compute
Both Spiral Table scans and transactions can be row-wise partitioned for use in distributed query engines such as PySpark, Dask, and Ray. We provide first-class integrations for the engines listed below. For any other engine, see Others for instructions on integrating a Spiral scan or transaction with an arbitrary distributed compute system that exposes a Python API.
Ray
A Spiral Table can be read into a Ray Dataset:
import ray.data
import spiral
from spiral.demo import gharchive, demo_project

sp = spiral.Spiral()
table = gharchive(sp)

scan = sp.scan(table)
ds: ray.data.Dataset = scan.to_ray_dataset()
A Ray Dataset can be written into a Spiral Table:
ds: ray.data.Dataset = sp.scan(table).to_ray_dataset()

gharchive_events_copy = demo_project(sp).create_table(
    "gharchive.events_copy",
    key_schema=table.key_schema
)
with gharchive_events_copy.txn() as txn:
    ds.write_datasink(txn.to_ray_datasink())
Others
Spiral’s distribution primitives should integrate with any Python-based distributed compute
platform. In this section, we use Python’s ThreadPoolExecutor to demonstrate this integration.
Reading
All scan functions (e.g. to_table(), to_pandas(), to_record_batches()) accept a shards argument: row-wise partitions of the table defined by non-overlapping key ranges. There are two ways to generate a set of shards for a given scan:
shards(), which is cheap to compute but derives shards from the physical data layout.
compute_shards(), which scans the key columns to compute a set of shards with roughly equal numbers of keys (and thus rows) per shard.
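As a minimal sketch (assuming both methods are called on a scan with no required arguments, as shards() is in the example further below):
scan = sp.scan(table)

# Cheap: shard boundaries follow the physical data layout.
layout_shards = scan.shards()

# Scans the key columns so each shard holds roughly the same number of keys.
balanced_shards = scan.compute_shards()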
When distributing work across processes or nodes, one must ensure a consistent table state. The
state of a table can be fixed (i.e. later writes are ignored) by constructing a
snapshot(). A Snapshot’s
asof, a pseudo-timestamp, is sufficient to uniquely identify the
state of a Spiral table. Any Scan which specifies the same asof will see the same state of the
table:
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

asof = table.snapshot().asof
table_id = table.table_id
predicate = table["created_at"] > datetime(2020, 3, 1)

driver_scan = sp.scan(table, where=predicate, asof=asof)
shards = driver_scan.shards()

def worker_fn(shard):
    """Code executed on another node."""
    from spiral import Spiral

    # Each worker constructs its own client and re-opens the table by ID.
    sp = Spiral()
    table = sp.table(table_id)
    worker_scan = sp.scan(table, where=predicate, asof=asof, shard=shard)
    return worker_scan.to_table()

with ThreadPoolExecutor() as executor:
    results = executor.map(worker_fn, shards)
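The per-shard results can then be combined on the driver. A minimal sketch, assuming to_table() returns a pyarrow.Table (as its siblings to_pandas() and to_record_batches() suggest):
import pyarrow as pa

# Each worker returned one table per shard; concatenate them on the driver.
combined = pa.concat_tables(results)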
Writing
Spiral Transactions are also horizontally scalable. The orchestration / driver node should start a transaction, execute the workers, collect the transaction operations produced by each one, compose them into one transaction, and commit the entire transaction at once. Instead of serializing the transaction object itself, call Transaction.take() to obtain the list of operations and serialize those with Operation.to_json. Operation.from_json deserializes an operation, and Transaction.include() subsumes one or more operations into another transaction.
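The ThreadPoolExecutor example below shares memory with the driver, so the operations are handed over directly. Across a real process or node boundary they would be serialized first. A minimal sketch, assuming take() yields Operation objects, that Operation is importable from the spiral package, and that include() accepts a list of deserialized operations (worker_txn and driver_txn are hypothetical names for a worker-side and a driver-side transaction):
# Worker side: extract and serialize the operations (assumed API, see above).
ops = worker_txn.take()
payload = [op.to_json() for op in ops]  # ship these strings back to the driver

# Driver side: deserialize and fold the operations into the driver transaction.
from spiral import Operation  # assumed import path

restored = [Operation.from_json(j) for j in payload]
driver_txn.include(restored)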
from concurrent.futures import ThreadPoolExecutor
from spiral import TransactionOps
import pyarrow as pa

table = demo_project(sp).create_table("squares", key_schema={"key": pa.int64()})
table_id = table.table_id

def worker_fn(index: int) -> TransactionOps:
    """Code executed on another node."""
    from spiral import Spiral

    txn = Spiral().table(table_id).txn()
    txn.write({
        "key": [index],
        "square": [index * index],
    })
    return txn.take()

with table.txn() as txn:
    # The driver-side transaction must start before the workers!
    with ThreadPoolExecutor() as executor:
        worker_results = list(executor.map(worker_fn, [0, 1, 2, 3]))
    for worker_tx in worker_results:
        txn.include(worker_tx)
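Once the driver's with block exits (which, as these examples imply, commits the combined transaction), the rows written by every worker are visible to a subsequent scan:
# Read the committed result back; each worker contributed one row.
squares = sp.scan(table).to_table()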