Task Graphs

Task Graphs

Delayed objects can be combined into a task graph, which is typically a directed acyclic graph (DAG). The output from one function or query can be passed into another, and dependencies are automatically determined.

from tiledb.cloud.compute import DelayedArrayUDF, Delayed, DelayedSQL
import numpy as np

# Build several delayed objects to define a graph
# Note that package numpy is aliased as np in the UDFs
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse", 
        lambda x: np.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(local, array_apply, sql):
    return np.mean([local, array_apply, sql.iloc(0)[0]])

# This is essentially a task graph that looks like
#                 mean
#          /       |      \
#         /        |       \    
#      local  array_apply  sql 
#
# The `local`, `array_apply` and `sql` tasks will computed first,
# and once all three are finished, `mean` will computed on their results
res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)
print(res.compute())

Modes of Operation

Realtime

The default mode of operation, realtime, is designed to return results directly to the client emphasis low latency. Realtime task graphs are scheduled and executed immediately and are well suited for fast distributed workloads.

Batch

In contrast to realtime task graphs, batch task graphs are designed for large, resource intensive asynchronous workloads. Batch task graphs are defined, uploaded, and scheduled for execution and are well suited for ingestion-style workloads.

Setting the Mode

The mode can be set for any of the APIs by passing in a mode parameter. Accepted values are BATCH or REALTIME

Delayed API mode:

batch_function = Delayed(numpy.median, mode=tiledb.cloud.dag.Mode.BATCH)

realtime_function = Delayed(numpy.median, mode=tiledb.cloud.dag.Mode.REALTIME)

Task Graph API mode:

batch_dag = tiledb.cloud.dag.DAG(mode=tiledb.cloud.dag.Mode.BATCH)

realtime_dag = dag = tiledb.cloud.dag.DAG(mode=tiledb.cloud.dag.Mode.REALTIME)

Generic Functions

Any Python/R function can be wrapped in a Delayed object, making the function executable as a future.

from tiledb.cloud.compute import Delayed
import numpy

# Wrap numpy median in a delayed object
x = Delayed(numpy.median)

# It can be called like a normal function to set the parameters
# Note at this point the function does not get executed since it
# is of "delayed" type
x([1,2,3,4,5])

# To force execution and get the result call `compute()`
print(x.compute())

SQL and Arrays

Besides arbitrary Python/R functions, serverless SQL queries and array UDFs can also be called with the delayed API.

from tiledb.cloud.compute import DelayedSQL, DelayedArrayUDF
import numpy

# SQL
y = DelayedSQL("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")

# Run query
print(y.compute())

# Array
z = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
        lambda x: numpy.average(x["a"]))([(1, 4), (1, 4)])

# Run the UDF on the array
z.compute()

Local Functions

It is also possible to include a generic Python function as delayed, but have it run locally instead of serverless on TileDB Cloud. This is useful for testing or for saving finalized results to your local machine, e.g., saving an image.

from tiledb.cloud.compute import Delayed
import numpy

# Set the `local` argument to `True`
local = Delayed(numpy.median, local=True)([1,2,3])

# This will compute locally
local.compute()

Visualization

Any task graph created using the delayed API can be visualized with visualize(). The graph will be auto-updated by default as the computation progresses. If you wish to disable auto-updating, then simply set auto_update=False as a parameter to visualize(). If you are inside a Jupyter notebook, the graph will render as a widget. If you are not on the notebook, you can set notebook=False as a parameter to render in a normal Python window.

res.visualize()

Retrying Functions

If a function fails or you cancel it, you can manually retry the given node with the .retry method, or retry all failed nodes in a DAG with .retry_all(). Each retry call retries a node once.

from tiledb.cloud.compute import Delayed

# Retry one node:

flaky_node = Delayed(flaky_func)(my_data)
final_output = Delayed(process_output)(flaky_node)

data = final_output.compute()
# -> Raises an exception since flaky_node failed.

flaky_node.retry()
data = final_output.result()

# Retry many nodes:

flaky_inputs = [Delayed(flaky_func)(shard) for shard in input_shards]
combined = Delayed(merge_outputs)(flaky_inputs)

combined.compute()
# -> Raises an exception since some of the flaky inputs failed.

combined.dag.retry_all()
combined.dag.wait()

data = combined.result()

Canceling Task Graph

If you have a task graph that is running, you can cancel it with the the .cancel() function on the dag or delayed object.

import tiledb.cloud

def my_func():
  import time
  time.sleep(120)
  return

batch_dag = tiledb.cloud.dag.DAG(mode=tiledb.cloud.dag.Mode.BATCH)
result = batch_dag.submit(my_func)
# Start task graph
batch_dag.compute()


# Cancel Task Graph
batch_dag.cancel()

Advanced Usage

Manually Setting Delayed Task Dependencies

There are cases where you might have one function to depend on another without using its results directly. A common case is when one function manipulates data stored somewhere else (on S3 or a database). To facilitate this, we provide function depends_on.

# A few base functions:
import random
from tiledb.cloud.compute import Delayed

# Set three initial nodes
node_1 = Delayed(numpy.median, local=True, name="node_1")([1, 2, 3])
node_2 = Delayed(lambda x: x * 2, local=True, name="node_2")(node_1)
node_3 = Delayed(lambda x: x * 2, local=True, name="node_3")(node_2)

# Create a dictionary to hold the nodes so we can ranodmly pick dependencies
nodes_by_name= {'node_1': node_1, 'node_2': node_2, 'node_3': node_3}

#Function which sleeps for some time so we can see the graph in different states
def f():
    import time
    import random
    time.sleep(random.randrange(0, 30))
    return x

# Randomly add 96 other nodes to the graph. All of these will use the sleep function
for i in range(4, 100):
    name = "node_{}".format(i)
    node = Delayed(f, local=True, name=name)()
    
    dep = random.randrange(1, i-1)
    # Randomly set dependency on one other node
    node_dep = nodes_by_name["node_{}".format(dep)]
    # Force the dependency to be set
    node.depends_on(node_dep)
    
    nodes_by_name[name] = node

# You can call visualize on any member node and see the whole graph
node_1.visualize()

# Get the last function's results
node_99 = nodes_by_name["node_99"]
node_99.compute()

The above code, after the call to node_1.visualize(), produces a task graph similar to that shown below:

Low Level Task Graph API

A lower level Task Graph API is provided which gives full control of building out arbitrary task graphs.

from tiledb.cloud.dag import dag
import numpy as np

# This is the same implementation which backs `Delayed`, but this interface
# is better suited more advanced use cases where full control is desired.
graph = dag.DAG()

# Define a graph 
# Note that package numpy is aliased as np in the UDFs
local = graph.submit(lambda x: x * 2, local=True, 100)
array_apply = graph.submit_array_udf(lambda x: np.sum(x["a"]),
       "tiledb://TileDB-Inc/quickstart_sparse", 
       name="array_apply", ranges=[(1, 4), (1, 4)])
sql = graph.submit_sql("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`"), name="sql")

# Custom function for averaging all the results we are passing in
def mean(local, array_apply, sql):
    return np.mean([local, array_apply, sql.iloc(0)[0]])

# This is essentially a task graph that looks like
#                 mean
#          /       |      \
#         /        |       \    
#      local  array_apply  sql 
#
# The `local`, `array_apply` and `sql` tasks will computed first,
# and once all three are finished, `mean` will computed on their results
res = graph.submit(func_exec=mean, name="node_exec", local=local, array_apply=array_apply, sql=sql)
graph.compute()
graph.wait()

print(res.result())

Selecting Who To Charge

If you are a member of an organization, then by default the organization is changed for your Delayed tasks. If you would like to charge the task to yourself, you just need to add one extra argument namespace.

import tiledb.cloud

tiledb.cloud.login(username="my_username", password="my_password")
# or tiledb.cloud.login(token="my_token")

res = DelayedSQL("select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`"
      namespace ="my_username", # who to charge the query to
      )

# When using the Task Graph API set the namespace on the DAG object
dag = tiledb.cloud.dag.DAG(namespace="my_username")
dag.submit_sql("select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`")

You can also set who to charge for the entire task graph instead of individual Delayed objects. This is often useful when building a large task graph, to avoid having to set the extra parameter on every object. Taking the example above, you just pass namespace="my_username" to the compute call.

import tiledb.cloud.compute
import numpy

# Build several delayed objects to define a graph
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse", 
        lambda x: numpy.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`"), name="sql")

# Custom function to use to average all the results we are passing in
def mean(local, array_apply, sql):
    import numpy
    return numpy.mean([local, array_apply, sql.iloc(0)[0]])

res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)

# Set all tasks to run under your username
print(res.compute(namespace="my_username"))

Controlling the Number of Realtime Workers

Realtime Task Graphs are driven by the client. The client dispatches each task as a separate request and potentially will fetch and return results. These requests are all in parallel and the maximum number of requests is controlled by defining how many threads are allowed to execute. This defaults to min(32, os.cpu_count() + 4) in python. A function is provided to global configure this and allow a larger number of parallel requests and downloading of results to the client.

tiledb.cloud.client.client.set_threads(100)

Resource Specification

Batch task graphs allow you to specify resource requirements for CPU, Memory and GPUs for every individual task. In TileDB Cloud SaaS, GPUs leverage Nvidia V100 GPUs.

Resources can be passed directly to any of the Delayed or Task Graph submission APIs.

Delayed API

Delayed(numpy.median, mode=tiledb.cloud.dag.Mode.BATCH, resources={"cpu": "6", "memory": "12Gi", "gpus": "0"}_

Task Graph API

# Create batch dag
dag = tiledb.cloud.dag.DAG(mode=tiledb.cloud.dag.Mode.BATCH)

# Submit function specifying resoures
dag.submit(numpy.median, resources={"cpu": "6", "memory": "12Gi", "gpus": "0"}_

Last updated