TileDB With Dask Delayed
dask.delayed is a Dask feature that lets you build arbitrary task graphs and submit them to Dask's scheduler for execution. With it you can implement sophisticated out-of-core computations (i.e., on datasets larger than RAM) and handle highly distributed workloads.
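To make the idea of a lazily built task graph concrete, here is a minimal sketch that uses only Dask itself. The function names `load`, `total`, and `combine` are hypothetical and chosen for illustration; they are not part of Dask or TileDB.

```python
import dask

@dask.delayed
def load(i):
    # Stand-in for reading the i-th chunk of a dataset
    return list(range(i * 10, (i + 1) * 10))

@dask.delayed
def total(chunk):
    # Per-chunk partial result
    return sum(chunk)

@dask.delayed
def combine(parts):
    # Final reduction over all partial results
    return sum(parts)

# Building the graph runs nothing; each call only records a task
partials = [total(load(i)) for i in range(4)]
result = combine(partials)

# compute() hands the graph to the scheduler and returns the value
value = result.compute()
print(value)
```

Each decorated call returns a `Delayed` object rather than a value, so the graph can be as large or as irregular as the computation requires before anything is executed.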
No special integration with TileDB is needed, because dask.delayed is generic and works with any user-defined task. In particular, a delayed task can slice a TileDB array, which lets you process TileDB arrays that do not fit in memory, whether on your laptop or on a large cluster.
The example below is deliberately simple; the same pattern extends to much more complex algorithms on arbitrarily large TileDB arrays.
Python
import tiledb
import numpy as np
import dask
import dask.array

uri = "<array-uri>"
ctx = tiledb.Ctx()

# Create a simple 1D dense array with 1000 elements
def write_array():
    dom = tiledb.Domain(tiledb.Dim(name="x",
                                   domain=(0, 999),
                                   tile=10,
                                   dtype=np.uint64),
                        ctx=ctx)

    attrs = [tiledb.Attr(name="attr", dtype=np.float64, ctx=ctx)]

    schema = tiledb.ArraySchema(domain=dom, sparse=False,
                                attrs=attrs,
                                ctx=ctx)
    tiledb.DenseArray.create(uri, schema)

    with tiledb.DenseArray(uri, 'w') as A:
        A[:] = np.arange(1000, dtype=np.float64)

# Create the array only if it does not already exist
if not tiledb.VFS().is_dir(uri):
    write_array()

# Read one slice of the array and return the values of attribute 'attr'
def slice_tiledb(path, slc):
    with tiledb.DenseArray(path) as A:
        return A[slc]['attr']

# Partition the array into 20 delayed slices of 50 elements each
partition = 50
delayed_slices = [
    dask.delayed(slice_tiledb)(uri, slice(start, start + partition))
    for start in range(0, 1000, partition)
]

# This creates a Dask array from the delayed slices
darray = dask.array.concatenate([
    dask.array.from_delayed(x, shape=(partition,), dtype=np.float64)
    for x in delayed_slices
])

# Everything up until here is lazy - nothing is really computed

# This triggers the entire computation
mean = darray.mean().compute()
print(mean)
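As a variation on the example above, the reduction can also be expressed directly with dask.delayed, without building a Dask array first. The following is a sketch under stated assumptions: `chunked_mean`, `partial`, and `combine` are hypothetical helpers, and an in-memory NumPy array stands in for the TileDB array so the snippet runs on its own. With a real array, the loader could be `lambda slc: slice_tiledb(uri, slc)`.

```python
import numpy as np
import dask

def chunked_mean(load_slice, length, partition):
    """Build a lazy mean from per-slice partial sums.

    load_slice is any callable that maps a slice to a NumPy array.
    """
    @dask.delayed
    def partial(slc):
        chunk = load_slice(slc)
        return chunk.sum(), chunk.size

    @dask.delayed
    def combine(parts):
        total = sum(s for s, _ in parts)
        count = sum(n for _, n in parts)
        return total / count

    slices = [slice(start, min(start + partition, length))
              for start in range(0, length, partition)]
    return combine([partial(s) for s in slices])

# Stand-in data source (assumption): replace with a TileDB-backed loader
data = np.arange(1000, dtype=np.float64)
lazy_mean = chunked_mean(lambda slc: data[slc], length=1000, partition=50)

# Nothing has been read or summed until compute() is called
mean = lazy_mean.compute()
print(mean)
```

Only one slice per task needs to be in memory at a time, and each task returns just a sum and a count, which is the property that makes this pattern suitable for arrays larger than RAM.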