PDAL

Installation

You can run PDAL with TileDB support using the prebuilt Docker image as follows:

docker run -it --rm -u 0 -v /local/path:/data -w /data tiledb/tiledb-geospatial /bin/bash
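Once inside the container, you can verify that the TileDB plugin is available to PDAL. If it is, the following command lists readers.tiledb and writers.tiledb:

pdal --drivers | grep tiledb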

Ingesting LAS Files

First, create a TileDB config file tiledb.config where you can set any TileDB configuration parameter (e.g., AWS keys if you would like to write to a TileDB array on S3). Make sure you also add the following, as currently TileDB does not handle duplicate points (this will change in a future version).

sm.dedup_coords true
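For example, a tiledb.config that also targets an array on S3 might look like the sketch below, with one parameter per line. The S3 key names are standard TileDB configuration parameters; the values are placeholders you would replace with your own credentials.

sm.dedup_coords true
vfs.s3.aws_access_key_id <your-access-key-id>
vfs.s3.aws_secret_access_key <your-secret-access-key>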

Then create a PDAL pipeline to translate some LAS data to a TileDB array, by storing the following in a file called pipeline.json:

[
    {
      "type":"readers.las",
      "filename":"lrf_ws_epsg6341_nad832011utmz12_navd88_epoch2010_264000_4909000.laz"
    },
    {
      "type":"writers.tiledb",
      "config_file":"tiledb.config",
      "compression":"zstd",
      "compression_level": 75,
      "chunk_size": 50000,
      "array_name":"sample_array"
    }
]

You can execute the pipeline with PDAL, which will carry out the ingestion, as follows:

pdal pipeline -i pipeline.json

We now have points and attributes stored in an array called sample_array. This write uses the streaming mode of PDAL.
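If you prefer to drive the ingestion from Python rather than the command line, the same pipeline can be executed through the python-pdal bindings. This is a minimal sketch, assuming python-pdal is installed in the container and pipeline.json is in the working directory; depending on the python-pdal version, execution may not use streaming mode.

import pdal

# Load the same pipeline definition used above
with open('pipeline.json') as f:
    pipeline = pdal.Pipeline(f.read())

# Execute the pipeline; returns the number of points written
count = pipeline.execute()
print(f"Ingested {count} points into sample_array")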

You can view this sample_array directly from TileDB as follows (we demonstrate using TileDB's Python API, but any other API would work as well):

import numpy as np
import pptk
import tiledb

ctx = tiledb.Ctx()

# Open the array and read from it.
with tiledb.SparseArray('sample_array', ctx=ctx, mode='r') as arr:
    # Print the non-empty domain (the bounding box of the stored points)
    print(arr.nonempty_domain())
    # Dump the array schema; note that the attributes have different types
    arr.dump()
    # Read all points and attributes into a dict keyed by attribute name
    data = arr[:]

# Inspect the returned coordinates and attributes
print(data)

# Visualize the point cloud with pptk, coloring points by elevation (Z)
coords = np.array([np.asarray(list(t)) for t in data['coords']])
v = pptk.viewer(coords, coords[:, 2])
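Because the array is spatially indexed, you can also read just a subset of the points instead of the full array. Continuing with the same imports, the following is a small sketch using the domain_index indexer (also used in the parallel-read example below); the slice bounds are illustrative only.

# Read only the lower half of the X extent (bounds chosen for illustration)
with tiledb.SparseArray('sample_array', ctx=ctx, mode='r') as arr:
    (x1, x2), (y1, y2), (z1, z2) = arr.nonempty_domain()
    subset = arr.domain_index[x1:(x1 + x2) / 2, y1:y2, z1:z2]
    print(subset['coords'].shape)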

Parallel Writes

PDAL itself is single-threaded, but coupled with TileDB's parallel write support it becomes a powerful tool for ingesting enormous amounts of point cloud data into TileDB. The PDAL driver supports appending to an existing TileDB array, and we use this with Dask to perform parallel updates.

We demonstrate parallel ingestion with the code below. Make sure to remove or move the sample_array created in the previous example.

import glob

from dask.distributed import Client
import pdal


def update(pipeline_json):
    # Build and execute a PDAL pipeline from its JSON definition
    pipeline = pdal.Pipeline(pipeline_json)
    pipeline.loglevel = 8  # really noisy
    return pipeline.execute()


pipeline_template = """
[
    {
      "type":"readers.las",
      "filename":"%s"
    },
    {
      "type":"writers.tiledb",
      "config_file":"tiledb.config",
      "chunk_size": 50000,
      "array_name":"sample_array",
      "append": %s
    }
]
"""

client = Client(threads_per_worker=6, n_workers=1)
point_clouds = glob.glob('*.laz')
append = False
futures = []

for f in point_clouds:
    manifest = pipeline_template % (f, str(append).lower())
    future = client.submit(update, manifest)
    if not append:
        # block for schema creation and initial load
        future.result()
    else:
        futures.append(future)

    append = True

# wait for the remaining appends to complete
client.gather(futures)

Parallel Reads

Although the TileDB driver is parallel (i.e., it uses multiple threads for decompression and IO), PDAL itself is single-threaded, so some tasks may benefit from additional parallelism. Take for instance the following PDAL command, which counts the number of points in the dataset using the TileDB driver.

pdal info --driver readers.tiledb --readers.tiledb.array_name=sample_array -i sample_array

We can write a simple script in Python with Dask and direct access to TileDB to perform the same operation completely in parallel:

from dask.distributed import Client
import math
import numpy as np
import tiledb

def count(array_name, x1, x2, y1, y2, z1, z2):
    # Count the points that fall within the given X/Y/Z bounds
    with tiledb.SparseArray(array_name, 'r') as arr:
        return arr.domain_index[x1:x2, y1:y2, z1:z2]['coords'].shape[0]

if __name__ == "__main__":
    client = Client(threads_per_worker=6, n_workers=1)
    array_name = 'sample_array'
    tile_div = 6
    jobs = []

    with tiledb.SparseArray(array_name, 'r') as arr:
        # Split the non-empty X extent into tile_div slabs and count each in parallel
        xs, ys, zs = arr.nonempty_domain()
        tile_x = math.ceil((xs[1] - xs[0]) / tile_div)
        x1 = xs[0]

        for i in range(tile_div):
            x2 = min(xs[0] + ((i + 1) * tile_x), xs[1])

            if x1 > x2:
                continue

            f = client.submit(count, array_name, x1, x2, *ys, *zs)
            jobs.append(f)

            # advance past x2 so consecutive slabs do not overlap
            x1 = np.nextafter(x2, x2 + 1)

        results = client.gather(jobs)
        total = sum(results)
        print(f"Total points: {total}")

In both cases we get the answer of 31530863 points (for a 750MB compressed array). With single-threaded PDAL, the output of the time command on an m5a.2xlarge machine on AWS is the following:

real	1m15.267s
user	1m46.927s
sys	  0m2.410s

The above Python script using Dask is significantly faster:

real	0m9.902s
user	0m1.465s
sys	  0m0.216s
