TileDB with Dask Delayed

dask.delayed is a powerful feature of Dask that allows you to create arbitrary task graphs and submit them to Dask's scheduler for execution. You can be truly creative with that functionality and implement sophisticated out-of-core computations (i.e., on larger than RAM datasets) and handle highly distributed workloads.

There is no special integration needed with TileDB, as dask.delayed is quite generic and can work with any user-defined task. We just point out here that you can use TileDB array slicing in a delayed task, which allows you to process truly large TileDB arrays on your laptop or on a large cluster.

We include a very simple example below, stressing though that one can implement much more complex algorithms on arbitrarily large TileDB arrays.

import tiledb
import numpy as np
import dask, dask.array

uri = "<array-uri>"
ctx = tiledb.Ctx()

# Create a simple 1D array with 1000 elements
def write_array():
    dom = tiledb.Domain(tiledb.Dim(name="x",
                                   domain=(0, 999),
                                   tile=10,
                                   dtype=np.uint64),
         	                       ctx=ctx)

    attrs = [tiledb.Attr(name="attr", dtype=np.float64, ctx=ctx),]

    schema = tiledb.ArraySchema(domain=dom, sparse=False,
                                attrs=attrs,
                                ctx=ctx)
    tiledb.DenseArray.create(uri, schema)

    with tiledb.DenseArray(uri, 'w') as A:
        A[:] = np.arange(1000,dtype=np.float64)

# Create the array only if it does not already exist
if not tiledb.VFS().is_dir(uri):
    write_array()

# This produces an array slice
def slice_tiledb(path, slc):
    with tiledb.DenseArray(path) as A:
        return A[slc]['attr']

# Partition the array into 50 delayed slices
partition = 50
delayed_slices = list(
    dask.delayed(slice_tiledb)(uri, slice(start, start+partition)) for 
                               start in 
                               np.arange(0,1001-partition,step=partition))

# This creates a Dask array from the delayed slices
darray = dask.array.concatenate(
    dask.array.from_delayed(x,
                            shape=(partition,), dtype=np.float64)
                            for x in delayed_slices)

#Everything up until here is lazy - nothing is really computed

# This triggers the entire computation
mean = darray.mean().compute()
print(mean)

Last updated