Parallel Computing

Through serverless UDFs, TileDB-Cloud lets you execute parallel and distributed computations on your arrays without setting up a distributed computing environment.

For example, suppose you want to compute the arithmetic mean of an attribute a on a very large array. One option is to perform the computation locally in a block-based, out-of-core fashion. With TileDB-Cloud, you can instead write a UDF that computes the partial mean of a particular chunk of the array, and submit one such UDF per chunk to execute simultaneously.
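This works because the mean decomposes cleanly across chunks: if each chunk contributes sum(chunk) / N, where N is the total number of cells, the overall mean is simply the sum of those partial results. Here is a minimal local sketch of the decomposition, using plain numpy with a hypothetical 1-D array standing in for the partitions:

import numpy as np

a = np.random.rand(1_000_000)
chunks = np.array_split(a, 8)                     # stand-ins for the per-UDF partitions
partials = [np.sum(c) / a.size for c in chunks]   # each "UDF" returns sum(chunk) / N
assert np.isclose(sum(partials), a.mean())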

The Python module below implements a parallel_mean() function that uses TileDB-Cloud UDFs to compute the arithmetic mean of a subarray. It assumes the array is registered at the URI tiledb://<namespace>/my_dense_array and that the attribute is named a.

import math
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor

import numpy as np

import tiledb
import tiledb.cloud


def generate_partitions(max_row, max_col, partition_grid):
    """Split the given domain into a grid of partitions.

    :param max_row: (0-based, inclusive) max row coordinate of domain
    :param max_col: (0-based, inclusive) max col coordinate of domain
    :param partition_grid: Tuple of (row_parts, col_parts) that defines the
        number of row and column partitions.

    Examples:
      generate_partitions(9, 9, (1, 1)) -> [((0, 10), (0, 10))]
      generate_partitions(9, 9, (1, 2)) ->
          [((0, 10), (0, 5)), ((0, 10), (5, 10))]
      generate_partitions(9, 9, (2, 2)) ->
          [((0, 5), (0, 5)), ((0, 5), (5, 10)), ((5, 10), (0, 5)),
           ((5, 10), (5, 10))]
      generate_partitions(9, 9, (3, 2)) ->
          [((0, 4), (0, 5)),
           ((0, 4), (5, 10)),
           ((4, 8), (0, 5)),
           ((4, 8), (5, 10)),
           ((8, 10), (0, 5)),
           ((8, 10), (5, 10))]
    """
    nrows, ncols = max_row + 1, max_col + 1
    # Rows/columns per partition; the last partition in each dimension
    # may be smaller than the rest.
    sr = int(math.ceil(float(nrows) / partition_grid[0]))
    sc = int(math.ceil(float(ncols) / partition_grid[1]))
    partitions = []
    num_partitions = partition_grid[0] * partition_grid[1]
    for r in range(num_partitions):
        rmin = sr * (r // partition_grid[1])
        rmax = min(max_row, rmin + sr - 1) + 1
        cmin = sc * (r % partition_grid[1])
        cmax = min(max_col, cmin + sc - 1) + 1
        partitions.append(((rmin, rmax), (cmin, cmax)))
    return partitions


def gather(futures):
    """Waits for all futures to complete, and returns their results."""
    return [f.result() for f in concurrent.futures.as_completed(futures)]


def compute_partition(num_cells, row_bounds, col_bounds):
    """Computes the partial arithmetic mean of one partition of the array."""
    array_uri = 'tiledb://<namespace>/my_dense_array'
    with tiledb.DenseArray(array_uri, mode='r', ctx=tiledb.cloud.Ctx()) as A:
        # Each partition contributes sum(partition) / num_cells; the overall
        # mean is then just the sum of these partial results.
        return A.apply(lambda d: np.sum(d['a']) / num_cells,
                       [row_bounds, col_bounds])


def parallel_mean(max_row, max_col, partition_grid):
    """Computes the arithmetic mean, in parallel, over a subarray.

    :param max_row: (0-based, inclusive) max row coordinate of subarray
    :param max_col: (0-based, inclusive) max col coordinate of subarray
    :param partition_grid: Tuple of (row_parts, col_parts) that defines the
        number of row and column partitions.
    """
    num_cells = (max_row + 1) * (max_col + 1)
    num_partitions = partition_grid[0] * partition_grid[1]
    # Start a pool of local executors, one per partition.
    executor = ProcessPoolExecutor(max_workers=num_partitions)
    # Partition the domain across the configured "grid".
    partitions = generate_partitions(max_row, max_col, partition_grid)
    # Submit each partition as a separate UDF task.
    results = gather(
        [executor.submit(compute_partition, num_cells, p[0], p[1])
         for p in partitions])
    # The overall mean is the sum of the partial results.
    return sum(results)

Once you've loaded the module above into your Python session, you can dispatch a parallel mean computation with configurable partitioning. For example:

rowmax, colmax = 9999, 9999
print('The mean of attribute "a" in subarray [0:{}, 0:{}] is {:.3f}'.format(
    rowmax + 1, colmax + 1, parallel_mean(rowmax, colmax, (4, 2))))

This example computes the mean of attribute a on the subarray [0:10000, 0:10000] using a partitioning scheme of (4, 2). This means that 4 partitions are formed over rows and 2 over columns, for a total of 8 partitions; one UDF task is submitted per partition.
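If you want to see the exact bounds each UDF receives, you can call generate_partitions directly (assuming the module above is loaded); for the (4, 2) grid it produces eight 2500 x 5000 partitions:

for p in generate_partitions(9999, 9999, (4, 2)):
    print(p)
# ((0, 2500), (0, 5000))
# ((0, 2500), (5000, 10000))
# ((2500, 5000), (0, 5000))
# ...
# ((7500, 10000), (5000, 10000))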

Because floating-point addition is not associative, the arithmetic mean as implemented above may give slightly different results under different partitionings. This is often acceptable, but consider whether it matters for your particular use case.
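Here is a quick illustration of the effect, along with one possible mitigation for the final reduction (a sketch, not part of the module above): Python's math.fsum computes a correctly rounded sum of the partial results regardless of their order, although the per-partition sums themselves still depend on the partitioning.

import math

a, b, c = 0.1, 0.2, 0.3
print((a + b) + c == a + (b + c))   # False: 0.6000000000000001 vs 0.6
print(math.fsum([a, b, c]))         # 0.6, independent of summation order

# In parallel_mean(), 'return sum(results)' could become:
# return math.fsum(results)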