TileDB-VCF's Spark API offers a
DataSourceV2 data source to read from TileDB-VCF datasets into a Spark dataframe. To begin, launch a Spark shell with:
spark-shell --jars build/libs/TileDB-VCF-Spark-0.1.0-SNAPSHOT.jar
Depending on the size of the dataset and the size of the query results, you may need to increase the configured memory from the defaults, e.g.:
spark-shell --jars build/libs/TileDB-VCF-Spark-0.1.0-SNAPSHOT.jar --driver-memory 16g --executor-memory 16g
While the Spark API offers mostly the same functionality as the CLI, the main considerations when using the Spark API are the use of dataframe partitioning, and memory overheads.
There are two ways to partition a TileDB-VCF dataframe, which can be used separately or together:
Partitioning over samples (
Partitioning over genomic regions (
Conceptually, these correspond to partitioning over rows and columns, respectively, in the underlying TileDB array. For example, if you are reading a subset of 200 samples in a dataset, and specify 10 sample partitions, Spark will create 10 jobs, each of which will be responsible for handling the export from 20 of the 200 selected samples.
Similarly, if the BED file you specify for export contains 1,000,000 regions, and you specify 10 region partitions, Spark will create 10 jobs, each of which will be responsible for handling the export of 100,000 of the 1,000,000 regions (across all samples).
These two partitioning parameters can be composed to form rectangular regions of work to distribute across the available Spark executors.
Because TileDB-VCF is implemented as a native library, the Spark API makes use of both JVM on-heap and off-heap memory. This can make it challenging to correctly tune the memory consumption of each Spark job.
.option("memory", mb) option is used as a best-effort memory budget that any particular job is allowed to consume, across on-heap and off-heap allocations. Increasing the available memory with this option can result in large performance improvements, provided the executors are configured with sufficient memory to avoid OOM failures.
The details of how this parameter is used can be seen in the reading section on buffer allocation.
To export a few genomic regions from a dataset (which must be accessible by the Spark jobs; here we assume it is located on S3):
val df = spark.read.format("io.tiledb.vcf")\.option("uri", "s3://my-bucket/my-dataset")\.option("regions", "chr1:1000-2000,chr2:500-501")\.option("samples", "sampleA,sampleB,sampleD")\.option("range_partitions", 2).load()df.createOrReplaceTempView("vcf")spark.sql("select contig, posStart, alleles, genotype from vcf").show
Because there are only two regions specified, and two region partitions, each of the two Spark jobs started will read one of the given regions.
We can also place a list of sample names and a list of genomic regions to read in explicit text files, and use those during reading:
val df = spark.read.format("io.tiledb.vcf")\.option("uri", "s3://my-bucket/my-dataset")\.option("bedfile", "s3://my-bucket/query.bed")\.option("samplefile", "s3://my-bucket/sample-names.txt")\.option("memory", 8*1024)\.option("sample_partitions", 10)\.option("range_partitions", 2).load()df.createOrReplaceTempView("vcf")spark.sql("select contig, posStart, alleles, genotype from vcf").show
Here, we've also increased the memory budget to 8GB, and added partitioning over samples.