# Task Graphs

Delayed objects can be combined into a *task graph*, which is typically a directed acyclic graph (DAG). The output from one function or query can be passed into another, and dependencies are automatically determined.

**Python**

```python
from tiledb.cloud.compute import DelayedArrayUDF, Delayed, DelayedSQL
import numpy as np

# Build several delayed objects to define a graph
# Note that package numpy is aliased as np in the UDFs
local = Delayed(lambda x: x * 2, local=True)(100)

array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                              lambda x: np.sum(x["a"]),
                              name="array_apply")([(1, 4), (1, 4)])

sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(local, array_apply, sql):
    return np.mean([local, array_apply, sql.iloc[0][0]])

# This is essentially a task graph that looks like
#
#            mean
#           /  |  \
#          /   |   \
#     local array_apply sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `mean` will be computed on their results
res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)

print(res.compute())
```

**R**

```r
library(tiledbcloud)

# Build several delayed objects to define a graph

# Locally executed; simple enough
local <- delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)

# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives
array_apply <- delayed_array_udf(
  array="TileDB-Inc/quickstart_sparse",
  udf=function(df) { sum(as.vector(df[["a"]])) },
  selectedRanges=list(cbind(1, 4), cbind(1, 4)),
  attrs=c("a")
)

# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later
sql <- delayed_sql(
  "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
  name="sql"
)

# Custom function for averaging all the results we are passing in.
# Note here we slot out the answer from the SQL dataframe using `[[...]]`,
# and also cast to numeric.
ourmean <- function(local, array_apply, sql) {
  mean(c(local, array_apply, as.numeric(sql[["a"]])))
}

# This is essentially a task graph that looks like
#
#           ourmean
#           /  |  \
#          /   |   \
#     local array_apply sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `ourmean` will be computed on their results.
res <- delayed(ourmean, args=list(local, array_apply, sql))

print(compute(res, verbose=TRUE))
```

The **default** mode of operation, realtime, is designed to return results directly to the client with an emphasis on low latency. Realtime task graphs are scheduled and executed immediately, and are well suited for fast distributed workloads.

In contrast, batch task graphs are designed for large, resource-intensive, asynchronous workloads. Batch task graphs are defined, uploaded, and scheduled for execution, and are well suited for ingestion-style workloads.

The mode can be set for any of the APIs by passing in a `mode` parameter. Accepted values are `BATCH` or `REALTIME`.

**Python**

```python
batch_function = Delayed(numpy.median, mode=tiledb.cloud.dag.Mode.BATCH)
realtime_function = Delayed(numpy.median, mode=tiledb.cloud.dag.Mode.REALTIME)
```

**R**

```r
batch_function <- delayed(median, mode=tiledbcloud::BATCH)
realtime_function <- delayed(median, mode=tiledbcloud::REALTIME)
```

**Python**

```python
batch_dag = tiledb.cloud.dag.DAG(mode=tiledb.cloud.dag.Mode.BATCH)
realtime_dag = tiledb.cloud.dag.DAG(mode=tiledb.cloud.dag.Mode.REALTIME)
```

**R**

```r
batch_dag <- tiledbcloud::DAG(mode=tiledbcloud::BATCH)
realtime_dag <- tiledbcloud::DAG(mode=tiledbcloud::REALTIME)
```

Any Python/R function can be wrapped in a `Delayed` object, making the function executable as a future.

**Python**

```python
from tiledb.cloud.compute import Delayed
import numpy

# Wrap numpy median in a delayed object
x = Delayed(numpy.median)

# It can be called like a normal function to set the parameters.
# Note at this point the function does not get executed, since it
# is of "delayed" type.
x([1, 2, 3, 4, 5])

# To force execution and get the result, call `compute()`
print(x.compute())
```

**R**

```r
library(tiledbcloud)

# Wrap median in a delayed object
x <- delayed(median)

# You can set the parameters. Note at this point the function does not
# get executed, since it is of "delayed" type.
delayed_args(x) <- list(c(1, 2, 3, 4, 5))

# To force execution and get the result, call `compute()`
print(compute(x))
```
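The laziness itself is easy to sketch in plain Python, independent of TileDB Cloud. The `MiniDelayed` class below is a hypothetical toy for illustration, not the library's implementation: calling the wrapper only records the arguments, and nothing runs until `compute()` is invoked.

```python
import statistics

class MiniDelayed:
    """Toy sketch of a delayed wrapper (not the TileDB Cloud implementation)."""

    def __init__(self, func):
        self.func = func
        self.args = None

    def __call__(self, *args):
        # Record the arguments; do not execute yet
        self.args = args
        return self

    def compute(self):
        # Execute the stored function with the recorded arguments
        return self.func(*self.args)

x = MiniDelayed(statistics.median)
x([1, 2, 3, 4, 5])   # nothing executes here
print(x.compute())   # -> 3
```

The real `Delayed` objects follow the same shape, with the added ability to ship the function to TileDB Cloud and to chain other delayed objects as arguments.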

Besides arbitrary Python/R functions, serverless SQL queries and array UDFs can also be called through the delayed API.

**Python**

```python
from tiledb.cloud.compute import DelayedSQL, DelayedArrayUDF
import numpy

# SQL
y = DelayedSQL("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")

# Run query
print(y.compute())

# Array
z = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                    lambda x: numpy.average(x["a"]))([(1, 4), (1, 4)])

# Run the UDF on the array
z.compute()
```

**R**

```r
library(tiledbcloud)

# SQL
y <- delayed_sql("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")

# Run query
print(compute(y))

# Array
z <- delayed_array_udf(
  "tiledb://TileDB-Inc/quickstart_sparse",
  function(x) mean(x[["a"]]),
  selectedRanges=list(cbind(1, 4), cbind(1, 4)),
  attrs=c("a")
)

# Run the UDF on the array
print(compute(z))
```

It is also possible to include a generic Python function as delayed, but have it run locally instead of serverlessly on TileDB Cloud. This is useful for testing, or for saving finalized results to your local machine, e.g., saving an image.

**Python**

```python
from tiledb.cloud.compute import Delayed
import numpy

# Set the `local` argument to `True`
local = Delayed(numpy.median, local=True)([1, 2, 3])

# This will compute locally
local.compute()
```

**R**

```r
library(tiledbcloud)

# Wrap median in a delayed object
x <- delayed(median)

# You can set the parameters. Note at this point the function does not
# get executed, since it is of "delayed" type.
delayed_args(x) <- list(c(1, 2, 3))

# To force local execution and get the result, call `compute()`
# with `force_all_local=TRUE`
print(compute(x, force_all_local=TRUE))
```

Any task graph created using the delayed API can be visualized with `visualize()`. The graph will be auto-updated by default as the computation progresses. If you wish to disable auto-updating, pass `auto_update=False` as a parameter to `visualize()`. If you are inside a Jupyter notebook, the graph will render as a widget. If you are not in a notebook, you can pass `notebook=False` as a parameter to render in a normal Python window.

**Python**

```python
res.visualize()
```

If a function fails or you cancel it, you can manually retry the given node with the `.retry()` method, or retry all failed nodes in a DAG with `.retry_all()`. Each retry call retries a node once.

**Python**

```python
from tiledb.cloud.compute import Delayed

# Retry one node:
flaky_node = Delayed(flaky_func)(my_data)
final_output = Delayed(process_output)(flaky_node)
data = final_output.compute()
# -> Raises an exception, since flaky_node failed.
flaky_node.retry()
data = final_output.result()

# Retry many nodes:
flaky_inputs = [Delayed(flaky_func)(shard) for shard in input_shards]
combined = Delayed(merge_outputs)(flaky_inputs)
combined.compute()
# -> Raises an exception, since some of the flaky inputs failed.
combined.dag.retry_all()
combined.dag.wait()
data = combined.result()
```
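The retry semantics can be sketched in plain Python. `MiniNode` below is a hypothetical toy, not TileDB Cloud's implementation: a failed node keeps its inputs, so a single `retry()` simply re-runs the same function once.

```python
class MiniNode:
    """Toy sketch of a retryable task node (not the TileDB Cloud implementation)."""

    def __init__(self, func, *args):
        self.func, self.args = func, args
        self.result, self.error = None, None

    def compute(self):
        try:
            self.result = self.func(*self.args)
            self.error = None
        except Exception as e:
            # Record the failure instead of losing the node's inputs
            self.error = e
        return self.result

    def retry(self):
        # Each retry call re-runs the node exactly once
        return self.compute()

attempts = {"n": 0}

def flaky(x):
    # Fails on the first attempt, succeeds afterwards
    attempts["n"] += 1
    if attempts["n"] == 1:
        raise RuntimeError("transient failure")
    return x * 2

node = MiniNode(flaky, 21)
node.compute()      # first attempt fails; the error is recorded
node.retry()        # second attempt succeeds
print(node.result)  # -> 42
```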

There are cases where you might want one function to depend on another without using its results directly. A common case is when one function manipulates data stored somewhere else (on S3 or in a database). To facilitate this, we provide the `depends_on` function.

**Python**

```python
# A few base functions:
import random

import numpy
from tiledb.cloud.compute import Delayed

# Set three initial nodes
node_1 = Delayed(numpy.median, local=True, name="node_1")([1, 2, 3])
node_2 = Delayed(lambda x: x * 2, local=True, name="node_2")(node_1)
node_3 = Delayed(lambda x: x * 2, local=True, name="node_3")(node_2)

# Create a dictionary to hold the nodes so we can randomly pick dependencies
nodes_by_name = {"node_1": node_1, "node_2": node_2, "node_3": node_3}

# Function which sleeps for some time so we can see the graph in different states
def f():
    import time
    import random
    time.sleep(random.randrange(0, 30))

# Randomly add 96 other nodes to the graph. All of these will use the sleep function
for i in range(4, 100):
    name = "node_{}".format(i)
    node = Delayed(f, local=True, name=name)()

    # Randomly set a dependency on one other node
    dep = random.randrange(1, i - 1)
    node_dep = nodes_by_name["node_{}".format(dep)]

    # Force the dependency to be set
    node.depends_on(node_dep)
    nodes_by_name[name] = node

# You can call visualize on any member node and see the whole graph
node_1.visualize()

# Get the last function's results
node_99 = nodes_by_name["node_99"]
node_99.compute()
```

The above code, after the call to `node_1.visualize()`, produces a task graph similar to that shown below.

A lower-level Task Graph API is also provided, which gives full control for building out arbitrary task graphs.

**Python**

```python
from tiledb.cloud.dag import dag
import numpy as np

# This is the same implementation which backs `Delayed`, but this interface
# is better suited to more advanced use cases where full control is desired.
graph = dag.DAG()

# Define a graph
# Note that package numpy is aliased as np in the UDFs
local = graph.submit(lambda x: x * 2, 100, local=True)

array_apply = graph.submit_array_udf(lambda x: np.sum(x["a"]),
                                     "tiledb://TileDB-Inc/quickstart_sparse",
                                     name="array_apply", ranges=[(1, 4), (1, 4)])

sql = graph.submit_sql("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(local, array_apply, sql):
    return np.mean([local, array_apply, sql.iloc[0][0]])

# This is essentially a task graph that looks like
#
#            mean
#           /  |  \
#          /   |   \
#     local array_apply sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `mean` will be computed on their results
res = graph.submit(func_exec=mean, name="node_exec", local=local, array_apply=array_apply, sql=sql)

graph.compute()
graph.wait()

print(res.result())
```
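The scheduling idea behind a DAG executor of this kind can be sketched in plain Python with the standard library's `graphlib`: run each node only after all of its dependencies have produced results. This toy is not TileDB Cloud's implementation, and the node functions and values are made up for illustration.

```python
from graphlib import TopologicalSorter

# node -> set of nodes it depends on (mirrors the mean/local/array_apply/sql shape)
deps = {
    "local": set(),
    "array_apply": set(),
    "sql": set(),
    "mean": {"local", "array_apply", "sql"},
}

# Each function receives the results gathered so far (values are illustrative)
funcs = {
    "local": lambda r: 200,
    "array_apply": lambda r: 10,
    "sql": lambda r: 30,
    "mean": lambda r: (r["local"] + r["array_apply"] + r["sql"]) / 3,
}

results = {}
for name in TopologicalSorter(deps).static_order():
    # All dependencies of `name` are guaranteed to be in `results` here
    results[name] = funcs[name](results)

print(results["mean"])  # -> 80.0
```

A real executor additionally runs independent nodes concurrently and ships each one to a serverless worker, but the dependency ordering is the same.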

If you are a member of an organization, then by default the organization is charged for your Delayed tasks. If you would like to charge the task to yourself, you just need to add one extra argument, `namespace`.

**Python**

```python
import tiledb.cloud
from tiledb.cloud.compute import DelayedSQL

tiledb.cloud.login(username="my_username", password="my_password")
# or tiledb.cloud.login(token="my_token")

res = DelayedSQL(
    "select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
    namespace="my_username",  # who to charge the query to
)

# When using the Task Graph API, set the namespace on the DAG object
dag = tiledb.cloud.dag.DAG(namespace="my_username")
dag.submit_sql("select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`")
```

**R**

```r
library(tiledbcloud)

tiledbcloud::login(username="my_username", password="my_password")
# or tiledbcloud::login(token="my_token")

res <- delayed_sql(
  query="select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
  namespace="my_username"  # who to charge the query to
)

out <- compute(res, namespace="my_username")  # You can also put the namespace here
str(out)
```

You can also set who to charge for the entire task graph instead of individual Delayed objects. This is often useful when building a large task graph, to avoid having to set the extra parameter on every object. Taking the example above, you just pass `namespace="my_username"` to the `compute` call.

**Python**

```python
from tiledb.cloud.compute import Delayed, DelayedArrayUDF, DelayedSQL
import numpy

# Build several delayed objects to define a graph
local = Delayed(lambda x: x * 2, local=True)(100)

array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                              lambda x: numpy.sum(x["a"]),
                              name="array_apply")([(1, 4), (1, 4)])

sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function to average all the results we are passing in
def mean(local, array_apply, sql):
    import numpy
    return numpy.mean([local, array_apply, sql.iloc[0][0]])

res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)

# Set all tasks to run under your username
print(res.compute(namespace="my_username"))
```

**R**

```r
library(tiledbcloud)

# Build several delayed objects to define a graph

# Locally executed; simple enough
local <- delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)

# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives
array_apply <- delayed_array_udf(
  array="TileDB-Inc/quickstart_dense",
  udf=function(df) { sum(as.vector(df[["a"]])) },
  selectedRanges=list(cbind(1, 4), cbind(1, 4)),
  attrs=c("a")
)

# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later
sql <- delayed_sql(
  "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
  name="sql"
)

# Custom function for averaging all the results we are passing in.
# Note here we slot out the answer from the SQL dataframe using `[[...]]`,
# and also cast to numeric.
ourmean <- function(local, array_apply, sql) {
  mean(c(local, array_apply, as.numeric(sql[["a"]])))
}

# This is essentially a task graph that looks like
#
#           ourmean
#           /  |  \
#          /   |   \
#     local array_apply sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `ourmean` will be computed on their results.
res <- delayed(ourmean, args=list(local, array_apply, sql))

# Set all tasks to run under your username
print(compute(res, namespace="my_username", verbose=TRUE))
```

Batch task graphs allow you to specify resource requirements for CPU, memory, and GPU for every individual task. In TileDB Cloud SaaS, GPU tasks leverage NVIDIA V100 GPUs.

Resources can be passed directly to any of the Delayed or Task Graph submission APIs.

**Python**

```python
Delayed(numpy.median, mode=tiledb.cloud.dag.Mode.BATCH, resources={"cpu": "8", "memory": "12Gi", "gpus": "0"})
```

**R**

```r
delayed(median, mode=tiledbcloud::BATCH, resources=list(cpu="8", memory="12Gi", gpus="0"))
```

**Python**

```python
dag.submit(numpy.median, mode=tiledb.cloud.dag.Mode.BATCH, resources={"cpu": "8", "memory": "12Gi", "gpus": "0"})
```

**R**

```r
delayed(median, mode=tiledbcloud::BATCH, resources=list(cpu="8", memory="12Gi", gpus="0"))
```
