Spark is a very popular analytics engine for large-scale data processing. It allows users to create distributed arrays and dataframes, use machine learning libraries, perform SQL queries, and more. TileDB-Spark is TileDB's datasource driver for Spark, which allows users to create distributed Spark dataframes from TileDB arrays and, thus, process TileDB data with familiar tooling at scale and with minimal code changes.
TileDB offers a prebuilt uber jar that contains all dependencies. This can be used on most Spark clusters to enable the TileDB-Spark datasource driver.
The latest jars can be downloaded from GitHub.
Compiling TileDB-Spark from source is simple:
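For example, assuming a checkout of the TileDB-Spark repository and its Gradle wrapper (the exact task name may vary between versions):

```bash
git clone https://github.com/TileDB-Inc/TileDB-Spark.git
cd TileDB-Spark
# Build the project; the resulting jar is placed under build/libs/
./gradlew assemble
```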
This will create a jar file at /path/to/TileDB-Spark/build/libs/tiledb-spark-<version>.jar.
To launch a Spark shell with TileDB-Spark enabled, simply point Spark to the jar you have obtained:
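For example, using the jar built or downloaded above (adjust the path and version to match your build):

```bash
spark-shell --jars /path/to/TileDB-Spark/build/libs/tiledb-spark-<version>.jar
```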
We have created two handy bootstrap scripts for setting up TileDB-Spark and Apache Arrow on an EMR cluster. Arrow is optional, but it will increase performance if you use PySpark or SparkR.
EMR requires that the bootstrap scripts be copied to an S3 bucket. You can sync the scripts from our repo to S3 as follows:
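A sketch using the AWS CLI, assuming the bootstrap scripts live in an emr_bootstrap/ directory of your local checkout and that s3://my_bucket/path/ is a bucket you own:

```bash
aws s3 sync /path/to/TileDB-Spark/emr_bootstrap/ s3://my_bucket/path/emr_bootstrap/
```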
Create the EMR cluster as follows:
1. From the AWS EMR console, click on "Create Cluster".
2. Click on the link "Go to advanced options".
3. In Step 1, make sure Spark is selected.
4. In Step 3, click on "Bootstrap Actions", select a custom action, and click on "Configure and add". For the "Script location", point to where you uploaded the bootstrap scripts, e.g., s3://my_bucket/path/emr_bootstrap/install-tiledb-spark.sh.
5. Continue to create the cluster. It typically takes 10-20 minutes for the cluster to be ready.
Follow the same procedure as above, but in Step 3 add one more bootstrap action, providing the location of our CRAN packages script, e.g., s3://my_bucket/path/emr_bootstrap/install-cran-packages.sh. Moreover, under "Optional arguments" you must add --packages arrow (potentially adding any other CRAN package of your choice).
TileDB-Spark provides a metric source to collect timing and input metric details. This can be helpful in tracking performance of TileDB and the TileDB-Spark driver.
In Step 1 of the EMR launch cluster console, there is a section "Edit software settings". Paste the following json config, which will enable the spark metrics source from TileDB:
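A sketch of that configuration, assuming EMR's spark-metrics classification and the org.apache.spark.metrics.TileDBMetricsSource class described further below; verify the classification name against your EMR release:

```json
[
  {
    "Classification": "spark-metrics",
    "Properties": {
      "*.source.tiledb.class": "org.apache.spark.metrics.TileDBMetricsSource"
    }
  }
]
```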
Spark and TileDB have slight variations in their supported datatypes. The table below shows a mapping between the (core) TileDB and Spark datatypes for easy reference.
You can create a new TileDB array from an existing Spark dataframe as follows. See the write options table further below for a summary of the options you can use.
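A minimal sketch in Scala (run, e.g., in a Spark shell where `spark` is available); the array URI, column names and dimension option are illustrative and follow the write options table further below:

```scala
// Any existing dataframe; here a toy one with a "rows" column used as the dimension
val df = spark.createDataFrame(Seq((1L, 10.5), (2L, 20.0))).toDF("rows", "a1")

df.write
  .format("io.tiledb.spark")
  .option("uri", "s3://my_bucket/arrays/my_array")   // where to create the new array
  .option("schema.dim.0.name", "rows")               // column that becomes dimension 0
  .save()
```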
You can write a Spark dataframe to an existing TileDB array by simply adding an "append" mode.
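For example, reusing the dataframe from the previous example (the array URI is illustrative):

```scala
df.write
  .format("io.tiledb.spark")
  .option("uri", "s3://my_bucket/arrays/my_array")   // existing TileDB array
  .mode("append")                                    // append instead of creating a new array
  .save()
```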
You can run SQL queries with Spark on TileDB arrays as follows:
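A sketch that registers the dataframe as a temporary view first (array URI and column names are illustrative):

```scala
val df = spark.read
  .format("io.tiledb.spark")
  .option("uri", "s3://my_bucket/arrays/my_array")
  .load()

df.createOrReplaceTempView("tiledb_array")
spark.sql("SELECT rows, AVG(a1) FROM tiledb_array GROUP BY rows").show()
```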
The TileDB-Spark data source allows you to specify a partition count when reading a TileDB array into a distributed Spark dataframe, via the partition_count option. An example is shown below. This creates evenly sized partitions across all array dimensions, based on the volume of the subarray, in order to balance the computational load across the workers.
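For example, requesting 8 partitions (the array URI is illustrative):

```scala
val df = spark.read
  .format("io.tiledb.spark")
  .option("uri", "s3://my_bucket/arrays/my_array")
  .option("partition_count", 8)   // split the subarray into 8 evenly sized partitions
  .load()
```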
You can read a TileDB array into a Spark dataframe as follows. See the read options table further below for a summary of the options you can use.
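A minimal sketch (the array URI is illustrative):

```scala
val df = spark.read
  .format("io.tiledb.spark")
  .option("uri", "s3://my_bucket/arrays/my_array")
  .load()

df.printSchema()
df.show(10)
```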
| TileDB DataType | Spark SQL Datatype |
| :--- | :--- |
| `TILEDB_INT8` | `ByteType` |
| `TILEDB_UINT8` | `ShortType` |
| `TILEDB_INT16` | `ShortType` |
| `TILEDB_UINT16` | `IntegerType` |
| `TILEDB_INT32` | `IntegerType` |
| `TILEDB_UINT32` | `LongType` |
| `TILEDB_INT64` | `LongType` |
| `TILEDB_UINT64` | `LongType` |
| `TILEDB_FLOAT32` | `FloatType` |
| `TILEDB_FLOAT64` | `DoubleType` |
| `TILEDB_CHAR` | `StringType` |
| `TILEDB_STRING_ASCII` | `StringType` |
| `TILEDB_STRING_UTF8` | `StringType` |
| `TILEDB_DATETIME_DAY` | `DateType` |
| `TILEDB_DATETIME_MS` | `TimestampType` |
| Option | Required | Description |
| :--- | :--- | :--- |
| `order` | No | Result layout order. It can be `"row-major"`, `"col-major"` or `"unordered"`. |
| `read_buffer_size` | No | Set the TileDB read buffer size in bytes per attribute/coordinates. Defaults to 10MB. |
| `allow_read_buffer_realloc` | No | If the read buffer size is too small, allow reallocation. Default: `true`. |
| `partition_count` | No | Number of partitions. |
Reporting metrics is supported via Dropwizard and the default Spark metrics setup. The metrics can be enabled by adding the following lines to your /etc/spark/conf/metrics.properties file:
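A minimal sketch, using the org.apache.spark.metrics.TileDBMetricsSource class mentioned below (the source label `tiledb` is an arbitrary name):

```properties
# Register the TileDB metrics source for all instances (master, driver, executors)
*.source.tiledb.class=org.apache.spark.metrics.TileDBMetricsSource
```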
When loading an application jar (e.g., via the --jars CLI flag when launching a Spark shell), the metrics source is available on the master node and the driver metrics will be reported. However, the executors will fail with a class-not-found error, because on each worker node a jar containing org.apache.spark.metrics.TileDBMetricsSource must be provided on the classpath. To address this, copy our dedicated path/to/TileDB-Spark/build/libs/tiledb-spark-metrics-<version>.jar to $SPARK_HOME/jars/ on every worker.
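For example, on each worker node (the jar path and version are placeholders):

```bash
cp /path/to/TileDB-Spark/build/libs/tiledb-spark-metrics-<version>.jar "$SPARK_HOME/jars/"
```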
| Option | Required | Description |
| :--- | :--- | :--- |
| `uri` | Yes | URI of a TileDB sparse or dense array (required). |
| `tiledb.<config>` | No | Set a TileDB configuration parameter, e.g., `tiledb.vfs.num_threads`. |
| Option | Required | Description |
| :--- | :--- | :--- |
| `read_buffer_size` | No | Set the TileDB read buffer size in bytes per attribute/coordinates. Defaults to 10MB. |
| `schema.dim.<i>.name` | Yes | Specify which of the Spark dataframe columns will be dimension `i`. |
| `schema.dim.<i>.min` | No | Specify the lower bound for the TileDB domain on dimension `i`. |
| `schema.dim.<i>.max` | No | Specify the upper bound for the TileDB domain on dimension `i`. |
| `schema.dim.<i>.extent` | No | Specify the tile extent on dimension `i`. |
| `schema.attr.<name>.filter_list` | No | Specify a filter list for attribute `<name>`. |
| `schema.capacity` | No | Specify the tile capacity for sparse fragments. |
| `schema.tile_order` | No | Specify the tile order. |
| `schema.cell_order` | No | Specify the cell order. |
| `schema.coords_filter_list` | No | Specify the coordinates filter list. The filter list is a list of tuples of the form `(filter, option)`. |
| `schema.offsets_filter_list` | No | Specify the offsets filter list. The filter list is a list of tuples of the form `(filter, option)`. |
Spark has a large number of configuration parameters that can affect the performance of both the TileDB driver and the user application. On this page we provide some performance tuning tips.
TileDB-Spark uses TileDB-Java and the underlying C++ libtiledb for parallel IO, compression and encryption. As such, for optimized read performance you should limit Spark to one executor per machine and give that single executor all the resources of the machine.
Set the following Spark configuration parameters:
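A sketch of the corresponding spark-defaults.conf entries, assuming a 10-machine cluster with 16 cores and 64 GB of RAM per machine (adjust to your hardware):

```properties
spark.executor.instances   10
spark.executor.cores       16
spark.task.cpus            16    # one task (read partition) per machine
spark.executor.memory      51g   # roughly 80% of the available memory
```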
It is important to set the Spark task CPUs to be the same as the number of executor cores. This prevents Spark from putting more than one read partition on each machine. The executor memory is set to 80% of available memory to allow for overhead on the host itself.
If using Yarn, the above configuration parameters are likely not enough. You will need to also configure Yarn similarly.
There are two main TileDB options to tweak for optimizing reads: partition_count and read_buffer_size. The partition_count should be set to the number of executors, which is the number of machines in the cluster. The read_buffer_size should be set to at least 104857600 (100MB). Larger read buffers are critical to reduce the number of incomplete queries. The maximum size of the read buffers is limited by the available RAM. If you use Yarn, the maximum buffer is also constrained by spark.yarn.executor.memoryOverhead, since TileDB read/write buffers are stored off-heap.
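For example, on a 10-machine cluster (the array URI and values are illustrative):

```scala
val df = spark.read
  .format("io.tiledb.spark")
  .option("uri", "s3://my_bucket/arrays/my_array")
  .option("partition_count", 10)           // one partition per executor/machine
  .option("read_buffer_size", 104857600)   // 100MB per attribute/coordinates
  .load()
```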
There are applications that rely on using multiple Spark tasks for parallelism, constraining each task to run on a single thread. This is common for most PySpark and SparkR applications. Below we describe how to configure Spark and the TileDB data source for optimized performance in this case.
Set the following Spark configuration parameters:
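A sketch, again assuming 16-core machines (adjust the memory settings to your hardware and overhead requirements):

```properties
spark.executor.cores    1    # one single-threaded task per executor
spark.task.cpus         1
spark.executor.memory   3g   # leave room for TileDB's off-heap buffers
```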
If you use Yarn, the above configuration parameters are likely not enough. You will need to also configure Yarn similarly.
The read_buffer_size should be set to the largest value possible given the executor's available memory. TileDB typically has a memory overhead of 3x, and therefore 3 * read_buffer_size should be less than Spark's maximum off-heap memory. If you use Yarn, this value is defined in spark.yarn.executor.memoryOverhead. The default value of 10MB is usually sufficient.
The partition_count should be set to a value of (data size being read) / read_buffer_size. If the data size is not known, set the partition count to the number of executors. This might lead to over-partitioning, so you might want to try different values until you find one that is optimal for your dataset.
Finally, it is important to set several of the TileDB parallelism configuration parameters in the Spark option() dataframe commands upon reading. For example, set the number of VFS threads, e.g., option("tiledb.vfs.num_threads", 4).
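Putting it together, a sketch of a read tuned this way (the array URI and values are illustrative):

```scala
val df = spark.read
  .format("io.tiledb.spark")
  .option("uri", "s3://my_bucket/arrays/my_array")
  .option("tiledb.vfs.num_threads", 4)     // TileDB VFS thread pool, as above
  .option("read_buffer_size", 104857600)
  .option("partition_count", 32)
  .load()
```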