Task Graphs

Generic Functions

Any Python function can be wrapped in a `Delayed` object, making the function executable as a future.

Python

from tiledb.cloud.compute import Delayed
import numpy

# Wrap numpy median in a delayed object
x = Delayed(numpy.median)

# It can be called like a normal function to set the parameters.
# Note that at this point the function does not get executed, since it
# is of "delayed" type
x([1, 2, 3, 4, 5])

# To force execution and get the result, call `compute()`
print(x.compute())
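To make the deferred-execution idea concrete, here is a minimal, library-independent sketch. `MiniDelayed` is a hypothetical stand-in written for this illustration. It is not TileDB Cloud's `Delayed` class and it runs everything in a local thread, but it mirrors the pattern described above: calling the object only records the arguments, and `compute()` triggers execution and returns the result.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import median


class MiniDelayed:
    """Hypothetical stand-in for a delayed object (not the TileDB Cloud class)."""

    def __init__(self, func):
        self.func = func
        self.args = ()
        self.kwargs = {}
        self.calls = 0  # how many times the wrapped function actually ran

    def __call__(self, *args, **kwargs):
        # Record the parameters only; nothing is executed here.
        self.args, self.kwargs = args, kwargs
        return self

    def _run(self):
        self.calls += 1
        return self.func(*self.args, **self.kwargs)

    def compute(self):
        # Submit the call as a future and block until its result is ready.
        with ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(self._run).result()


x = MiniDelayed(median)
x([1, 2, 3, 4, 5])
ran_before_compute = x.calls  # the call above only stored arguments
result = x.compute()          # executes median([1, 2, 3, 4, 5])
print(ran_before_compute, result)
```

The `calls` counter exists only so the sketch can show that no work happens before `compute()`.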

SQL and Arrays

Besides arbitrary Python functions, serverless SQL queries and array UDFs can also be called with the delayed API.

Python

from tiledb.cloud.compute import DelayedSQL, DelayedArrayUDF
import numpy

# SQL
y = DelayedSQL("select AVG(`a`) FROM `tiledb://TileDB-Inc/quickstart_sparse`")

# Run query
print(y.compute())

# Array
z = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                    lambda x: numpy.average(x["a"]))([(1, 4), (1, 4)])

# Run the UDF on the array
z.compute()

Local Functions

A generic Python function can also be wrapped as delayed but run locally instead of serverlessly on TileDB Cloud. This is useful for testing or for saving finalized results to your local machine, e.g., saving an image.

Python

from tiledb.cloud.compute import Delayed
import numpy

# Set the `local` argument to `True`
local = Delayed(numpy.median, local=True)([1, 2, 3])

# This will compute locally
local.compute()

Task Graphs

Delayed objects can be combined into a *task graph*, which is typically a directed acyclic graph (DAG). The output from one function or query can be passed into another, and dependencies are automatically determined.

Python

from tiledb.cloud.compute import DelayedArrayUDF, Delayed, DelayedSQL
import numpy

# Build several delayed objects to define a graph
# Note that package numpy is aliased as np in the UDFs
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                              lambda x: np.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function for averaging all the results we are passing in
def mean(local, array_apply, sql):
    # `sql` arrives as a dataframe; take the single value in its first row and column
    return np.mean([local, array_apply, sql.iloc[0, 0]])

# This is essentially a task graph that looks like
#                 mean
#              /   |   \
#             /    |    \
#     local  array_apply  sql
#
# The `local`, `array_apply` and `sql` tasks will be computed first,
# and once all three are finished, `mean` will be computed on their results
res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)
print(res.compute())
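The automatic dependency handling described above can be sketched without TileDB Cloud. The following is a minimal illustration using Python's standard `graphlib` module (Python 3.9+). The `tasks` dictionary and the `run_graph` helper are hypothetical names invented here, and the constant leaf values stand in for the real array and SQL results. It is not how TileDB Cloud schedules work internally, only the same idea of running a node once all of its inputs are ready.

```python
from graphlib import TopologicalSorter
from statistics import mean

# Each task maps to (function, names of the tasks whose results it consumes).
# The leaf values are placeholders, not real TileDB results.
tasks = {
    "local": (lambda: 100 * 2, []),
    "array_apply": (lambda: 10, []),
    "sql": (lambda: 30, []),
    "mean": (lambda a, b, c: mean([a, b, c]), ["local", "array_apply", "sql"]),
}


def run_graph(tasks):
    """Run every task after its dependencies; return (results, execution order)."""
    deps = {name: set(inputs) for name, (_, inputs) in tasks.items()}
    results, order = {}, []
    for name in TopologicalSorter(deps).static_order():
        func, inputs = tasks[name]
        # All inputs are guaranteed to be in `results` by the topological order.
        results[name] = func(*(results[i] for i in inputs))
        order.append(name)
    return results, order


results, order = run_graph(tasks)
print(order[-1], results["mean"])
```

This sketch runs tasks one at a time; a real scheduler could run the three independent leaves concurrently, since none of them depends on another.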

Visualization

Any task graph created using the delayed API can be visualized with `visualize()`. The graph is auto-updated by default as the computation progresses. To disable auto-updating, pass `auto_update=False` to `visualize()`. Inside a Jupyter notebook, the graph renders as a widget. Outside a notebook, pass `notebook=False` to render in a normal Python window.

Python

res.visualize()

Manually Setting Task Dependencies

There are cases where you might want one function to depend on another without using its results directly. A common case is when one function manipulates data stored somewhere else (on S3 or in a database). To facilitate this, we provide the `depends_on` function.

Python

# A few base functions:
import random
from tiledb.cloud.compute import Delayed
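Separately from the TileDB Cloud code above, the idea of a dependency that carries ordering but no data can be sketched with the standard library alone. In this hypothetical illustration, `write_data` and `read_data` communicate only through a shared `store` dictionary (standing in for S3 or a database). The `depends_on` mapping here is a name invented for the sketch, not the TileDB Cloud method itself.

```python
from graphlib import TopologicalSorter

store = {}  # stands in for external storage such as S3 or a database
log = []    # records the order in which tasks ran


def write_data():
    store["value"] = 42
    log.append("write_data")


def read_data():
    log.append("read_data")
    return store["value"] + 1  # raises KeyError if run before write_data


funcs = {"write_data": write_data, "read_data": read_data}

# read_data takes no argument from write_data, so the edge is declared by hand.
depends_on = {"read_data": {"write_data"}, "write_data": set()}

outputs = {}
for name in TopologicalSorter(depends_on).static_order():
    outputs[name] = funcs[name]()

print(log, outputs["read_data"])
```

Without the hand-declared edge, nothing in the function signatures would tell a scheduler that `read_data` must wait for `write_data`.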

The TileDB Cloud code above, after the call to `node_1.visualize()`, produces a task graph similar to that shown below:

Selecting Who to Charge

If you are a member of an organization, then by default the organization is charged for your Delayed tasks. To charge a task to yourself instead, add one extra argument, `namespace`.

Python

import tiledb.cloud
from tiledb.cloud.compute import DelayedSQL

tiledb.cloud.login(username="my_username", password="my_password")
# or tiledb.cloud.login(token="my_token")

res = DelayedSQL(
    "select `rows`, AVG(a) as avg_a from `tiledb://TileDB-Inc/quickstart_dense` GROUP BY `rows`",
    namespace="my_username",  # who to charge the query to
)

You can also set who to charge for the entire task graph instead of individual Delayed objects. This is often useful when building a large task graph, to avoid having to set the extra parameter on every object. Taking the example above, you just pass `namespace="my_username"` to the `compute` call.

Python

from tiledb.cloud.compute import Delayed, DelayedArrayUDF, DelayedSQL
import numpy

# Build several delayed objects to define a graph
local = Delayed(lambda x: x * 2, local=True)(100)
array_apply = DelayedArrayUDF("tiledb://TileDB-Inc/quickstart_sparse",
                              lambda x: numpy.sum(x["a"]), name="array_apply")([(1, 4), (1, 4)])
sql = DelayedSQL("select SUM(`a`) as a from `tiledb://TileDB-Inc/quickstart_dense`", name="sql")

# Custom function to average all the results we are passing in
def mean(local, array_apply, sql):
    import numpy
    # `sql` arrives as a dataframe; take the single value in its first row and column
    return numpy.mean([local, array_apply, sql.iloc[0, 0]])

res = Delayed(func_exec=mean, name="node_exec")(local, array_apply, sql)

# Set all tasks to run under your username
print(res.compute(namespace="my_username"))

