Data


Introduction

Analytics Zoo Orca data provides data-parallel pre-processing support for Python AI.

It supports data pre-processing from different data sources (such as TensorFlow Dataset, PyTorch DataLoader and MXNet DataLoader) and in various data formats (such as Pandas DataFrame, NumPy, images and Parquet).

The distributed backend engine can be Spark or Ray. We currently support Spark-based transformations for pre-processing, and provide functionality to seamlessly move the data to a Ray cluster for later training/inference on Ray.


XShards

XShards is a collection of data in the Orca data API. We provide different backends (Spark and Ray) for XShards.

XShards General Operations

Pre-processing on XShards

You can do pre-processing on an XShards with your customized function using the API below:

transform_shard(func, *args)

This method pre-processes each element in the XShards in parallel with the customized function, and returns a new XShards after the transformation.
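
For example, here is a minimal sketch that negates one column of every shard (the file path, column name and function are hypothetical, assuming an XShards of Pandas DataFrames):

import zoo.orca.data.pandas

def negative(df, column_name):
    # negate the given column in this Pandas DataFrame shard
    df[column_name] = df[column_name] * (-1)
    return df

data_shard = zoo.orca.data.pandas.read_csv("data.csv")
data_shard = data_shard.transform_shard(negative, "value")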

SharedValue

SharedValue can be used to give every node a copy of a large input dataset in an efficient manner. This is an example of using SharedValue:

def func(df, item_set):
    # retrieve the shared value on each node
    item_set = item_set.value
    ...

item_set = ...
item_set = orca.data.SharedValue(item_set)
full_data.transform_shard(func, item_set)

Get all the elements in XShards

You can get all the elements in an XShards with the following API:

collect()

This method returns a list that contains all of the elements in this XShards.
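
For instance, assuming data_shard is an XShards of Pandas DataFrames (a hypothetical variable from an earlier read_csv call):

data_list = data_shard.collect()
# each element is the data of one shard, e.g. a Pandas DataFrame
print(len(data_list), type(data_list[0]))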

Repartition XShards

You can repartition an XShards to a different number of partitions.

repartition(num_partitions)

The method returns a new XShards that has exactly num_partitions partitions.
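
A minimal sketch (the partition count here is arbitrary):

# redistribute the underlying data into 4 partitions
data_shard = data_shard.repartition(4)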

Split XShards

You can split one XShards into multiple XShards. Each element in the XShards needs to be a list or tuple of the same length.

split()

This method returns the split XShards. If the elements in the input XShards are not lists or tuples, a list containing the input XShards itself is returned.
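
A sketch under the assumption that each element has first been turned into a (features, labels) tuple via transform_shard (the column names are hypothetical):

def to_feature_label(df):
    # turn each Pandas DataFrame shard into a (features, labels) tuple
    return df[["f1", "f2"]], df["label"]

tuple_shard = data_shard.transform_shard(to_feature_label)
feature_shard, label_shard = tuple_shard.split()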

Save/Load XShards

You can save XShards on Spark as SequenceFiles of serialized objects. The serializer used is pyspark.serializers.PickleSerializer.

save_pickle(path, batchSize=10)

And you can load the pickle files back into an XShards if you used save_pickle() to save the data.

zoo.orca.data.XShards.load_pickle(path, minPartitions=None)

This method returns an XShards object from pickle files.
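
A round-trip sketch (the save path is hypothetical):

from zoo.orca.data import XShards

data_shard.save_pickle("hdfs://path/to/pickle")
loaded_shard = XShards.load_pickle("hdfs://path/to/pickle")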

Move XShards on Spark to Ray backend

You can put the data of an XShards on Spark into the Ray cluster object store for later processing on Ray.

to_ray()

This method saves the data of the XShards on Spark to the Ray object store, and returns a new RayXShards which contains the plasma ObjectID, the plasma object_store_address and the node IP for each partition.
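
For example:

# move the data of the Spark XShards into the Ray object store
ray_shard = data_shard.to_ray()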

XShards with Pandas DataFrame

Read data into XShards

You can read CSV/JSON files or directories into an XShards with the following APIs:

zoo.orca.data.pandas.read_csv(file_path, **kwargs)

zoo.orca.data.pandas.read_json(file_path, **kwargs)

After calling these APIs, you get an XShards of Pandas DataFrames on Spark.

For Cloudera YARN client mode users: if you use pandas as the pandas_read_backend, you should configure ARROW_LIBHDFS_DIR before calling read_csv:

1. Use locate libhdfs.so to find libhdfs.so.
2. export ARROW_LIBHDFS_DIR=/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib64 (replace with the result of locate libhdfs.so).
3. Use --conf "spark.executorEnv.ARROW_LIBHDFS_DIR=/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib64" to export the environment variable to all executors.
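
A minimal read sketch (the paths are hypothetical):

import zoo.orca.data.pandas

csv_shard = zoo.orca.data.pandas.read_csv("hdfs://path/to/data.csv")
json_shard = zoo.orca.data.pandas.read_json("hdfs://path/to/data.json")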

Partition by Pandas DataFrame columns

You can re-partition an XShards of Pandas DataFrames by specified columns.

partition_by(cols, num_partitions=None)

This method returns a new XShards partitioned by the specified columns.
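
For example, assuming a hypothetical column named "user":

# co-locate rows with the same "user" value in the same partition
data_shard = data_shard.partition_by(cols="user", num_partitions=8)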

Get unique element list of XShards of Pandas Series

You can get a list of the unique elements in this XShards. This is useful when you want to count or obtain the unique set of values of some column in an XShards of Pandas DataFrames.

unique()

This method returns a list of the unique elements in the XShards of Pandas Series.
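
For example, to get the distinct values of a hypothetical "user" column, first select that column so that each element is a Pandas Series:

# reduce each shard to a single column (a Pandas Series)
user_shard = data_shard.transform_shard(lambda df: df["user"])
unique_users = user_shard.unique()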

XShards with Numpy

Load local numpy data to XShards

You can partition local in-memory data and form an XShards on Spark.

zoo.orca.data.XShards.partition(data)

This method returns an XShards which dispatches the local data in parallel on Spark.
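
A minimal sketch with random local NumPy data:

import numpy as np
from zoo.orca.data import XShards

local_data = np.random.randn(100, 2)
data_shard = XShards.partition(local_data)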