TileDB Cloud allows you to build arbitrary directed acyclic graphs (DAGs) of tasks to combine any number of computations into one workflow. You can combine serverless UDFs, serverless SQL queries and array UDFs, along with local execution of functions.
TileDB Cloud currently supports serverless task graphs only through Python, but support for more languages will be added soon.
The graph is currently driven by the client. The client can be in a hosted notebook, on your local laptop, or even inside a serverless UDF itself. The client manages the graph and dispatches the execution of serverless jobs or local functions. Currently there is no node-to-node communication in a task graph; all results are serialized and returned to the client. If a subsequent task needs the results of a previous one, those results are first returned to the client and then dispatched as parameters to the following task. Over the next months we plan to eliminate this round trip and offer server-side handling of results between tasks.
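For example, in this minimal sketch (using the Delayed API described below), the result of the first serverless task travels back to the client before being dispatched as a parameter to the second:

from tiledb.cloud.compute import Delayed

# Both tasks run serverlessly; the intermediate result of `double` is
# returned to the client, which then passes it to `add_one`
double = Delayed(lambda x: x * 2)(2)
add_one = Delayed(lambda x: x + 1)(double)

# The client drives both executions and collects the final result (5)
print(add_one.compute())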
The local driver uses the Python ThreadPoolExecutor by default to drive the tasks. The default number of workers is 4 * the number of cores on the client machine. Multiple serverless tasks can run concurrently because they are dispatched as asynchronous HTTP requests. Serverless tasks scale elastically: as you request more tasks to be run, TileDB Cloud launches more resources to accommodate them.
Local functions are subject to the Python GIL (global interpreter lock) when the task graph uses the default ThreadPoolExecutor. This limits the concurrency of local functions; serverless functionality, however, is minimally affected. To avoid this, see below for how to use the ProcessPoolExecutor.
Two APIs are offered for task graphs: a high-level Delayed API and a lower-level API for manipulating the DAG directly.
The Delayed API is a high-level API involving delayed objects that allow for asynchronous execution of functions or serverless SQL. Dependencies are automatically computed and managed for you.
Any Python function can be wrapped in a Delayed object, making the function executable as a future.
from tiledb.cloud.compute import Delayed
import numpy

# Wrap numpy median in a delayed object
x = Delayed(numpy.median)

# It can be called like a normal function to set the parameters.
# Note at this point the function does not get executed, since it
# is a delayed type
x([1, 2, 3, 4, 5])

# To force execution and get the result, call `compute()`
print(x.compute())
Besides arbitrary Python functions, serverless SQL queries and array-based UDFs can also be called with the Delayed API.
from tiledb.cloud.compute import DelayedSQL, DelayedArrayUDF
import numpy

# SQL
y = DelayedSQL("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")

# Run the query
print(y.compute())

# Array
z = DelayedArrayUDF(
    "tiledb://TileDB-Inc/quickstart_sparse",
    lambda x: numpy.average(x["a"]),
)([(1, 4), (1, 4)])

# Run the UDF on the array
z.compute()
Lastly, it is also possible to wrap a generic Python function as delayed but have it run locally instead of serverlessly. This is useful for testing, or for saving finalized results to your local machine, e.g., saving an image.
from tiledb.cloud.compute import Delayed
import numpy

local = Delayed(numpy.median, local=True)([1, 2, 3])
local.compute()
Delayed objects can be combined into a task graph. The output from one function or query can be passed into another, and dependencies are automatically determined.
from tiledb.cloud.compute import Delayed, DelayedArrayUDF, DelayedSQL
import numpy

# Build several delayed objects to define a graph
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF(
    "tiledb://TileDB-Inc/quickstart_sparse",
    lambda x: numpy.sum(x["a"]),
    name="array_apply",
)([(1, 4), (1, 4)])
sql = DelayedSQL(
    "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
    name="sql",
)

# Custom function to average all the results we are passing in
def mean(local, array_apply, sql):
    import numpy
    # The SQL result is a dataframe; take the scalar in the first row/column
    return numpy.mean([local, array_apply, sql.iloc[0, 0]])

res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)
print(res.compute())
The DAG created by delayed objects can be visualized with a call to visualize(). The graph will be auto-updated by default. If you are inside a Jupyter notebook, the graph will render as a widget. If you are not in a notebook, you can pass notebook=False as a parameter to render in a normal Python window.
res.visualize()
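Outside a notebook, the same call might look like this, using the notebook=False parameter mentioned above:

# Render the graph in a normal Python window instead of a Jupyter widget
res.visualize(notebook=False)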
There are cases where you might want one function to depend on another without using its results directly. A common case is when one function manipulates data stored somewhere else (e.g., S3 or a database). To facilitate this, an additional function is offered: depends_on.
# A few base functions:
import random
import numpy
from tiledb.cloud.compute import Delayed

# Set three initial nodes
node_1 = Delayed(numpy.median, local=True, name="node_1")([1, 2, 3])
node_2 = Delayed(lambda x: x * 2, local=True, name="node_2")(node_1)
node_3 = Delayed(lambda x: x * 2, local=True, name="node_3")(node_2)

# Create a dictionary to hold the nodes so we can randomly pick dependencies
nodes_by_name = {"node_1": node_1, "node_2": node_2, "node_3": node_3}

# Function which sleeps for some time so we can see the graph in different states
def f():
    import time
    import random
    x = random.randrange(0, 30)
    time.sleep(x)
    return x

# Randomly add 96 other nodes to the graph. All of these will use the sleep function
for i in range(4, 100):
    name = "node_{}".format(i)
    node = Delayed(f, local=True, name=name)()

    # Randomly set dependency on one other, earlier node
    dep = random.randrange(1, i)
    node_dep = nodes_by_name["node_{}".format(dep)]

    # Force the dependency to be set
    node.depends_on(node_dep)
    nodes_by_name[name] = node

# You can call visualize on any member node and see the whole graph
node_1.visualize()

# Get the last function's results
node_99 = nodes_by_name["node_99"]
node_99.compute()
If you are a member of an organization, then by default the organization is charged for your Delayed tasks. If you would like to charge the task to yourself, you just need to add one extra argument, namespace.
import tiledb.cloud
from tiledb.cloud.compute import DelayedSQL

tiledb.cloud.login(username="my_username", password="my_password")
# or tiledb.cloud.login(token="my_token")

res = DelayedSQL(
    "select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
    namespace="my_username",  # who to charge the query to
)
You can also set who to charge for the entire DAG instead of on individual Delayed objects. This is often useful when building a large DAG, to avoid having to set the extra parameter on every object. Taking the example above, you just pass namespace="my_username" to the compute call.
from tiledb.cloud.compute import Delayed, DelayedArrayUDF, DelayedSQL
import numpy

# Build several delayed objects to define a graph
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF(
    "tiledb://TileDB-Inc/quickstart_sparse",
    lambda x: numpy.sum(x["a"]),
    name="array_apply",
)([(1, 4), (1, 4)])
sql = DelayedSQL(
    "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
    name="sql",
)

# Custom function to average all the results we are passing in
def mean(local, array_apply, sql):
    import numpy
    return numpy.mean([local, array_apply, sql.iloc[0, 0]])

res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)

# Set all tasks to run under your username
print(res.compute(namespace="my_username"))
For advanced usage, the underlying DAG data structure is exposed. It is unlikely you will need to use this unless you require functionality beyond what the Delayed API offers. Below is an example using the DAG API to perform the same computations as in the section Task Graphs With Delayed API.
import numpy
import tiledb.cloud
from tiledb.cloud import dag

uri_sparse = "tiledb://TileDB-Inc/quickstart_sparse"
uri_dense = "tiledb://TileDB-Inc/quickstart_dense"

d = dag.DAG()

# To run a local function, set the function as the delayed func
local = d.add_node(lambda x: x * 2, 100, name="local")

# Here we add a node that uses tiledb.cloud.array.apply
# as the delayed function
array_apply = d.add_node(
    tiledb.cloud.array.apply,
    uri_sparse,
    lambda x: numpy.sum(x["a"]),
    [(1, 4), (1, 4)],
    name="array_apply",
)

# To add serverless SQL we must set the function to tiledb.cloud.sql.exec
sql = d.add_node(
    tiledb.cloud.sql.exec,
    "select SUM(`a`) as a from `{}`".format(uri_dense),
    name="sql",
)

# Custom function to average all the results we are passing in
def mean(local, array_apply, sql):
    import numpy
    return numpy.mean([local, array_apply, sql.iloc[0, 0]])

# For a UDF we must set tiledb.cloud.udf.exec
mean_results = d.add_node(
    tiledb.cloud.udf.exec, mean, local, array_apply, sql, name="mean_results"
)

# Compared to the Delayed API, you call compute on the DAG itself.
# This does not return results and is not blocking
d.compute()

# Visualize can still be called
d.visualize()

# To get results you must call `result()` on one of the nodes
print(mean_results.result())
If you would like to increase the DAG's worker count, you can set the max_workers parameter.
from tiledb.cloud import dag

# Set the max number of workers for the thread or process pool to 32
d = dag.DAG(max_workers=32)
If you need to run many local functions in parallel, you might be interested in using the ProcessPoolExecutor. This will fork each function into its own process, serializing the function and results. For most functions and data this will not cause any problems, but some functions are not easily serializable, which is why the ThreadPoolExecutor is the default for the graph engine.
from tiledb.cloud import dag

# Use the ProcessPoolExecutor instead of the default ThreadPoolExecutor
d = dag.DAG(use_processes=True)
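As a sketch of when this helps, consider several CPU-bound local functions. With use_processes=True each function runs in its own process and can use a separate core, assuming the functions and their results are picklable (the function and argument values below are illustrative):

from tiledb.cloud import dag

# CPU-bound local work that would serialize under the GIL with threads
def busy_sum(n):
    return sum(i * i for i in range(n))

d = dag.DAG(use_processes=True)
nodes = [
    d.add_node(busy_sum, 10_000_000, name="busy_{}".format(i)) for i in range(8)
]

# Dispatch the graph, then block on each node's result
d.compute()
print([node.result() for node in nodes])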
Through serverless UDFs, TileDB Cloud offers you a way to easily execute parallel and distributed computations on your arrays, without having to set up a distributed computing environment.
For example, suppose you want to compute the arithmetic mean of an attribute a on a very large array. One option would be to perform the computation locally in a block-based or out-of-core fashion. With TileDB Cloud, you can instead write a UDF that computes the mean of a particular chunk of the array, and submit all UDFs simultaneously to be computed.
The Python module below implements a parallel_mean() function that uses TileDB Cloud UDFs and task graphs to compute the arithmetic mean of a subarray. It assumes the array is at the URI tiledb://TileDB-Inc/quickstart_dense, and that the attribute is named a.
import math

import numpy as np
from tiledb.cloud.compute import Delayed, DelayedArrayUDF


def generate_partitions(min_row, min_col, max_row, max_col, partition_grid):
    """Split the given domain into a grid of partitions.

    :param min_row: (inclusive) min row coordinate of domain
    :param min_col: (inclusive) min col coordinate of domain
    :param max_row: (inclusive) max row coordinate of domain
    :param max_col: (inclusive) max col coordinate of domain
    :param partition_grid: Tuple of (row_parts, col_parts) that defines the
        number of row and column partitions.

    All returned ranges are inclusive. Examples:

    generate_partitions(0, 0, 9, 9, (1, 1)) -> [((0, 9), (0, 9))]
    generate_partitions(0, 0, 9, 9, (1, 2)) ->
        [((0, 9), (0, 4)), ((0, 9), (5, 9))]
    generate_partitions(0, 0, 9, 9, (2, 2)) ->
        [((0, 4), (0, 4)), ((0, 4), (5, 9)),
         ((5, 9), (0, 4)), ((5, 9), (5, 9))]
    generate_partitions(0, 0, 9, 9, (3, 2)) ->
        [((0, 3), (0, 4)), ((0, 3), (5, 9)),
         ((4, 7), (0, 4)), ((4, 7), (5, 9)),
         ((8, 9), (0, 4)), ((8, 9), (5, 9))]
    """
    nrows, ncols = max_row + 1 - min_row, max_col + 1 - min_col
    sr, sc = (
        int(math.ceil(float(nrows) / partition_grid[0])),
        int(math.ceil(float(ncols) / partition_grid[1])),
    )
    partitions = []
    num_partitions = partition_grid[0] * partition_grid[1]
    for r in range(0, num_partitions):
        rmin = sr * (r // partition_grid[1]) + min_row
        rmax = min(max_row, rmin + sr - 1)
        cmin = sc * (r % partition_grid[1]) + min_col
        cmax = min(max_col, cmin + sc - 1)
        if rmin > rmax or cmin > cmax:
            break
        partition = ((rmin, rmax), (cmin, cmax))
        partitions.append(partition)
    return partitions


def parallel_mean(min_row, min_col, max_row, max_col, partition_grid):
    """Computes the arithmetic mean, in parallel, over a subarray.

    :param min_row: (0-based, inclusive) min row coordinate of subarray
    :param min_col: (0-based, inclusive) min col coordinate of subarray
    :param max_row: (0-based, inclusive) max row coordinate of subarray
    :param max_col: (0-based, inclusive) max col coordinate of subarray
    :param partition_grid: Tuple of (row_parts, col_parts) that defines the
        number of row and column partitions.
    """
    num_cells = (max_row - min_row + 1) * (max_col - min_col + 1)
    print("num_cells={}".format(num_cells))

    def compute_partition(data):
        """Computes one partition's contribution to the overall mean."""
        return np.sum(data["a"]) / num_cells

    # Partition domain across the configured "grid"
    partitions = generate_partitions(min_row, min_col, max_row, max_col, partition_grid)

    # Array
    array_uri = "tiledb://TileDB-Inc/quickstart_dense"

    # Submit each partition as a separate serverless UDF task
    results = []
    for p in partitions:
        results.append(DelayedArrayUDF(array_uri, compute_partition)([p[0], p[1]]))

    # The overall mean is the sum of the partial results
    overall_mean = Delayed(np.sum)(results)
    return overall_mean
Once you've loaded the above module into your Python session, you can use it to dispatch a parallel mean computation with configurable partitioning. For example:
rowmin, colmin, rowmax, colmax = 1, 1, 4, 4
res = parallel_mean(rowmin, colmin, rowmax, colmax, (4, 2))
res = res.compute()
print(
    'The mean of attribute "a" in subarray [{}:{}, {}:{}] is {:.3f}'.format(
        rowmin, rowmax, colmin, colmax, res
    )
)
This example computes the mean of attribute a on the subarray [1:4, 1:4] using a partitioning scheme of (4, 2). This means that 4 partitions are formed over rows and 2 over columns, for a total of 8 partitions. One UDF task will be submitted per partition.
Because floating-point arithmetic is not associative, the arithmetic mean as implemented above may give slightly different results under different partitionings. This is often acceptable, but take care that it is acceptable for your particular use case.
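A quick illustration of the non-associativity:

# Floating-point addition is not associative, so grouping partial sums
# differently can change the final digits
a, b, c = 0.1, 0.2, 0.3
print((a + b) + c == a + (b + c))  # False
print((a + b) + c, a + (b + c))    # 0.6000000000000001 0.6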