Task Graphs

Generic Functions

Any Python/R function can be wrapped in a Delayed object, making the function executable as a future.
Python
from tiledb.cloud.compute import Delayed
import numpy

# Wrap numpy median in a delayed object
x = Delayed(numpy.median)

# It can be called like a normal function to set the parameters.
# Note that at this point the function does not get executed, since it
# is of "delayed" type
x([1, 2, 3, 4, 5])

# To force execution and get the result, call `compute()`
print(x.compute())

R
library(tiledbcloud)

# Wrap median in a delayed object
x = delayed(median)

# You can set the parameters. Note that at this point the function does not
# get executed, since it is of "delayed" type.
delayed_args(x) <- list(c(1,2,3,4,5))

# To force execution and get the result, call `compute()`
print(compute(x))
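To make the deferred-execution idea concrete, here is a minimal, library-independent sketch. `ToyDelayed` is a hypothetical stand-in written for illustration only. It is not TileDB Cloud's `Delayed` class and it runs everything in the local process, but it follows the same pattern: wrap a function, record its arguments, and run it only when `compute()` is called.

```python
import statistics


class ToyDelayed:
    """Hypothetical stand-in for a delayed object (illustration only)."""

    def __init__(self, func):
        self.func = func
        self.args = ()
        self.kwargs = {}
        self.executed = False

    def __call__(self, *args, **kwargs):
        # Calling the object only records the parameters; nothing runs yet.
        self.args = args
        self.kwargs = kwargs
        return self

    def compute(self):
        # Execution happens here, on demand.
        self.executed = True
        return self.func(*self.args, **self.kwargs)


x = ToyDelayed(statistics.median)
x([1, 2, 3, 4, 5])
ran_before_compute = x.executed  # still False: setting parameters ran nothing
result = x.compute()
print(result)
```

The `executed` flag exists only so the sketch can show that setting the parameters does not trigger the call.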

SQL and Arrays

Besides arbitrary Python/R functions, serverless SQL queries and array UDFs can also be called with the delayed API.
Python
from tiledb.cloud.compute import DelayedSQL, DelayedArrayUDF
import numpy

# SQL
y = DelayedSQL("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")

# Run query
print(y.compute())

# Array
z = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                    lambda x: numpy.average(x["a"]))([(1, 4), (1, 4)])

# Run the UDF on the array
z.compute()

R
library(tiledbcloud)

y = delayed_sql("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")

# Run query
print(compute(y))

# Array
z = delayed_array_udf(
  "tiledb://TileDB-Inc/quickstart_sparse",
  function(x) mean(x[["a"]]),
  selectedRanges=list(cbind(1,4), cbind(1,4)),
  attrs=c("a")
)

# Run the UDF on the array
print(compute(z))

Local Functions

A generic Python or R function can also be wrapped as delayed but run locally instead of serverlessly on TileDB Cloud. This is useful for testing or for saving final results to your local machine, e.g., saving an image.
Python
from tiledb.cloud.compute import Delayed
import numpy

# Set the `local` argument to `True`
local = Delayed(numpy.median, local=True)([1, 2, 3])

# This will compute locally
local.compute()

R
library(tiledbcloud)

# Wrap median in a delayed object
x = delayed(median)

# You can set the parameters. Note that at this point the function does not
# get executed, since it is of "delayed" type.
delayed_args(x) <- list(c(1,2,3))

# To force local execution and get the result, call `compute()`
# with `force_all_local=TRUE`
print(compute(x, force_all_local=TRUE))

Task Graphs

Delayed objects can be combined into a task graph, which is typically a directed acyclic graph (DAG). The output from one function or query can be passed into another, and dependencies are automatically determined.
Python
from tiledb.cloud.compute import DelayedArrayUDF, Delayed, DelayedSQL
import numpy as np

# Build several delayed objects to define a graph
# Note that package numpy is aliased as np in the UDFs
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                              lambda x: np.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(local, array_apply, sql):
    # `sql` arrives as a dataframe; take the single value in its first row and column
    return np.mean([local, array_apply, sql.iloc[0, 0]])

# This is essentially a task graph that looks like
#                 mean
#              /   |   \
#            /     |     \
#      local  array_apply  sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `mean` will be computed on their results
res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)
print(res.compute())

R
library(tiledbcloud)

# Build several delayed objects to define a graph

# Locally executed; simple enough
local = delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)

# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives
array_apply <- delayed_array_udf(
  array="TileDB-Inc/quickstart_dense",
  udf=function(df) { sum(as.vector(df[["a"]])) },
  selectedRanges=list(cbind(1,4), cbind(1,4)),
  attrs=c("a")
)

# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later
sql = delayed_sql(
  "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
  name="sql"
)

# Custom function for averaging all the results we are passing in
ourmean <- function(local, array_apply, sql) {
  mean(c(local, array_apply, sql))
}

# This is essentially a task graph that looks like
#               ourmean
#              /   |   \
#            /     |     \
#      local  array_apply  sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `ourmean` will be computed on their results.
# Note here we slot out the answer from the SQL dataframe using `[[...]]`,
# and also cast to numeric.
res <- delayed(ourmean, args=list(local, array_apply, as.numeric(sql[["a"]])))
print(compute(res, verbose=TRUE))
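The way dependencies follow from passing one delayed object into another can be sketched without any cloud service. `ToyNode` below is a hypothetical class written for illustration, not part of the TileDB Cloud API, and the input values are made up. It treats every argument that is itself a node as an upstream dependency and resolves those first. Unlike a real task graph executor, this sketch runs the upstream tasks one after another rather than concurrently.

```python
class ToyNode:
    """Hypothetical delayed node whose arguments may be other nodes."""

    def __init__(self, func, *args, name=None):
        self.func = func
        self.args = args
        self.name = name or func.__name__
        self._done = False
        self._result = None

    def dependencies(self):
        # Any argument that is itself a node is an upstream dependency.
        return [a for a in self.args if isinstance(a, ToyNode)]

    def compute(self, log=None):
        # Compute each node at most once, dependencies first.
        if not self._done:
            resolved = [
                a.compute(log) if isinstance(a, ToyNode) else a
                for a in self.args
            ]
            self._result = self.func(*resolved)
            self._done = True
            if log is not None:
                log.append(self.name)
        return self._result


def mean(*values):
    return sum(values) / len(values)


# Made-up stand-ins for the local, array UDF, and SQL tasks
local = ToyNode(lambda x: x * 2, 100, name="local")
array_apply = ToyNode(lambda values: sum(values), [1, 2, 3], name="array_apply")
sql = ToyNode(lambda: 10, name="sql")
res = ToyNode(mean, local, array_apply, sql, name="mean")

order = []
result = res.compute(order)
print(order)   # upstream nodes are listed before "mean"
print(result)
```

Nothing in the sketch declares the graph explicitly. Its shape is implied entirely by which nodes appear as arguments to other nodes.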

Visualization

Any task graph created using the delayed API can be visualized with visualize(). By default, the graph auto-updates as the computation progresses; to disable this, pass auto_update=False to visualize(). Inside a Jupyter notebook, the graph renders as a widget. Outside a notebook, pass notebook=False to render it in a normal Python window.
Python
res.visualize()

Manually Setting Task Dependencies

There are cases where one function must depend on another without using its results directly. A common case is when one function manipulates data stored somewhere else (for example, on S3 or in a database). To facilitate this, we provide the function depends_on.
Python
# A few base functions:
import random
import numpy
from tiledb.cloud.compute import Delayed

# Set three initial nodes
node_1 = Delayed(numpy.median, local=True, name="node_1")([1, 2, 3])
node_2 = Delayed(lambda x: x * 2, local=True, name="node_2")(node_1)
node_3 = Delayed(lambda x: x * 2, local=True, name="node_3")(node_2)

# Create a dictionary to hold the nodes so we can randomly pick dependencies
nodes_by_name = {'node_1': node_1, 'node_2': node_2, 'node_3': node_3}

# Function which sleeps for some time so we can see the graph in different states
def f():
    import time
    import random
    delay = random.randrange(0, 30)
    time.sleep(delay)
    # Return the number of seconds slept
    return delay

# Randomly add 96 other nodes to the graph. All of these will use the sleep function
for i in range(4, 100):
    name = "node_{}".format(i)
    node = Delayed(f, local=True, name=name)()

    # Randomly set dependency on one other node
    dep = random.randrange(1, i-1)
    node_dep = nodes_by_name["node_{}".format(dep)]
    # Force the dependency to be set
    node.depends_on(node_dep)

    nodes_by_name[name] = node

# You can call visualize on any member node and see the whole graph
node_1.visualize()

# Get the last function's results
node_99 = nodes_by_name["node_99"]
node_99.compute()
The above code, after the call to node_1.visualize(), produces a task graph similar to that shown below:

[Figure: task graph rendered by node_1.visualize()]
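As a library-independent illustration of an ordering-only dependency, the sketch below uses the standard library's `graphlib.TopologicalSorter`. The task names (`upload`, `load`, `report`) and the `side_effects` list are hypothetical and stand in for work against external storage. None of the tasks consumes another's return value, yet each must run after the task it is declared to depend on. This shows the concept only and is not how TileDB Cloud schedules tasks.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each task name to the set of tasks it must wait for.
# "load" never consumes a value from "upload"; it only has to run after it.
depends_on = {
    "upload": set(),
    "load": {"upload"},
    "report": {"load"},
}

side_effects = []  # stands in for external storage such as S3 or a database

tasks = {
    "upload": lambda: side_effects.append("file written"),
    "load": lambda: side_effects.append("file read"),
    "report": lambda: side_effects.append("report built"),
}

# static_order() yields every task after all of its declared dependencies
order = list(TopologicalSorter(depends_on).static_order())
for name in order:
    tasks[name]()

print(order)
print(side_effects)
```

Because the ordering cannot be inferred from function arguments here, it has to be stated explicitly, which is the role `depends_on` plays in the delayed API.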

Selecting Who to Charge

If you are a member of an organization, then by default the organization is charged for your Delayed tasks. If you would like to charge a task to yourself, you just need to add one extra argument, namespace.
Python
import tiledb.cloud
from tiledb.cloud.compute import DelayedSQL

tiledb.cloud.login(username="my_username", password="my_password")
# or tiledb.cloud.login(token="my_token")

res = DelayedSQL(
    "select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
    namespace="my_username",  # who to charge the query to
)

R
library(tiledbcloud)

tiledbcloud::login(username="my_username", password="my_password")
# or tiledbcloud::login(token="my_token")

namespace <- "my_username"  # who to charge the query to

res <- delayed_sql(
  query="select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
  namespace=namespace)
out <- compute(res, namespace=namespace) # You can also put the namespace here
str(out)
You can also set who to charge for the entire task graph instead of individual Delayed objects. This is often useful when building a large task graph, to avoid having to set the extra parameter on every object. Taking the example above, you just pass namespace="my_username" to the compute call.
Python
from tiledb.cloud.compute import Delayed, DelayedArrayUDF, DelayedSQL
import numpy

# Build several delayed objects to define a graph
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                              lambda x: numpy.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function to use to average all the results we are passing in
def mean(local, array_apply, sql):
    import numpy
    # `sql` arrives as a dataframe; take the single value in its first row and column
    return numpy.mean([local, array_apply, sql.iloc[0, 0]])

res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)

# Set all tasks to run under your username
print(res.compute(namespace="my_username"))

R
library(tiledbcloud)

namespace <- "my_username"  # who to charge the task graph to

# Build several delayed objects to define a graph

# Locally executed; simple enough
local = delayed(function(x) { x*2 }, local=TRUE)
delayed_args(local) <- list(100)

# Array UDF -- we specify selected ranges and attributes, then do some R on the
# dataframe which the UDF receives
array_apply <- delayed_array_udf(
  array="TileDB-Inc/quickstart_dense",
  udf=function(df) { sum(as.vector(df[["a"]])) },
  selectedRanges=list(cbind(1,4), cbind(1,4)),
  attrs=c("a")
)

# SQL -- note the output is a dataframe, and values are all strings (MariaDB
# "decimal values") so we'll cast them to numeric later
sql = delayed_sql(
  "select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`",
  name="sql"
)

# Custom function for averaging all the results we are passing in
ourmean <- function(local, array_apply, sql) {
  mean(c(local, array_apply, sql))
}

# This is essentially a task graph that looks like
#               ourmean
#              /   |   \
#            /     |     \
#      local  array_apply  sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `ourmean` will be computed on their results.
# Note here we slot out the answer from the SQL dataframe using `[[...]]`,
# and also cast to numeric.
res <- delayed(ourmean, args=list(local, array_apply, as.numeric(sql[["a"]])))

# Set all tasks to run under your username
print(compute(res, namespace=namespace, verbose=TRUE))