dask.delayed is a Dask feature that lets you build arbitrary task graphs and submit them to Dask's scheduler for execution. With it you can implement sophisticated out-of-core computations (i.e., on datasets larger than RAM) and handle highly distributed workloads.
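To make the idea of a lazily built task graph concrete, here is a minimal sketch that does not involve TileDB at all. The functions `load`, `chunk_sum`, and `combine` are hypothetical stand-ins invented for this illustration; only `dask.delayed` and `.compute()` come from Dask itself.

```python
import dask

@dask.delayed
def load(i):
    # Hypothetical stand-in for reading one chunk of data
    return list(range(i * 3, i * 3 + 3))

@dask.delayed
def chunk_sum(chunk):
    # Reduce one chunk to a partial result
    return sum(chunk)

@dask.delayed
def combine(parts):
    # Merge the partial results into the final value
    return sum(parts)

# Building the graph is lazy: none of the function bodies has run yet
partials = [chunk_sum(load(i)) for i in range(4)]
total = combine(partials)

# compute() hands the graph to the scheduler and returns the result
result = total.compute()
print(result)
```

Each call to a delayed function only records a node in the graph, so the four chunks can be loaded and summed independently once `compute()` is called.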
No special integration with TileDB is needed, since dask.delayed is generic and works with any user-defined task. The point to note is that you can slice a TileDB array inside a delayed task, so each task reads only its own portion of the array. This lets you process very large TileDB arrays on your laptop or on a large cluster.
The example below is deliberately simple; the same pattern extends to much more complex algorithms on arbitrarily large TileDB arrays.
import tiledb
import numpy as np
import dask, dask.array

uri = "<array-uri>"
ctx = tiledb.Ctx()

# Create a simple 1D array with 1000 elements
def write_array():
    dom = tiledb.Domain(
        tiledb.Dim(name="x", domain=(0, 999), tile=10, dtype=np.uint64),
        ctx=ctx,
    )
    attrs = [tiledb.Attr(name="attr", dtype=np.float64, ctx=ctx)]
    schema = tiledb.ArraySchema(domain=dom, sparse=False, attrs=attrs, ctx=ctx)
    tiledb.DenseArray.create(uri, schema)
    with tiledb.DenseArray(uri, 'w') as A:
        A[:] = np.arange(1000, dtype=np.float64)

# Create the array only if it does not already exist
if not tiledb.VFS().is_dir(uri):
    write_array()

# This produces an array slice
def slice_tiledb(path, slc):
    with tiledb.DenseArray(path) as A:
        return A[slc]['attr']

# Partition the array into 20 delayed slices of 50 elements each
partition = 50
delayed_slices = [
    dask.delayed(slice_tiledb)(uri, slice(start, start + partition))
    for start in np.arange(0, 1001 - partition, step=partition)
]

# This creates a Dask array from the delayed slices
darray = dask.array.concatenate(
    [
        dask.array.from_delayed(x, shape=(partition,), dtype=np.float64)
        for x in delayed_slices
    ]
)

# Everything up until here is lazy - nothing is really computed
# This triggers the entire computation
mean = darray.mean().compute()
print(mean)
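The example above relies on the array length (1000) being an exact multiple of the partition size (50). As a rough sketch of how the slice boundaries could be computed when that is not the case, the helper below returns slices whose last element may be shorter. `partition_slices` is a hypothetical name introduced here for illustration; it is not part of TileDB or Dask.

```python
def partition_slices(length, size):
    """Return slices of at most `size` elements that together cover [0, length)."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [
        slice(start, min(start + size, length))
        for start in range(0, length, size)
    ]

# Evenly divisible case, matching the example above: 20 slices of 50 elements
slices = partition_slices(1000, 50)

# Uneven case: the final slice holds only the 5 leftover elements
uneven = partition_slices(1005, 50)
print(len(slices), len(uneven), uneven[-1])
```

With uneven partitions, each delayed slice would need its own length (`s.stop - s.start`) passed as the `shape` argument of `dask.array.from_delayed`, rather than a single fixed `partition` value.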