Analytics Zoo Orca data provides data-parallel pre-processing support for Python AI.
It supports pre-processing of data from different sources, such as TensorFlow Dataset, PyTorch DataLoader, and MXNet DataLoader, and in various formats, such as Pandas DataFrame, Numpy, images, and Parquet.
The distributed backend engine can be Spark or Ray. We currently support Spark-based transformations for pre-processing, and provide functionality to seamlessly move data to a Ray cluster for later training/inference on Ray.
XShards is a collection of data in the Orca data API. We provide different backends (Spark and Ray) for XShards.
XShards General Operations
Pre-processing on XShards
You can pre-process an XShards with your own function by calling transform_shard(func, *args):
func is your pre-processing function. In this function, you can pre-process the data using common Python libraries such as Pandas, Numpy, PIL, TensorFlow Dataset, PyTorch DataLoader, etc., then return the processed object.
args are the arguments for the pre-processing function.
This method pre-processes each element in the XShards in parallel with the custom function, and returns a new XShards containing the transformed elements.
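The element-wise behaviour described above can be pictured with a small pure-Python stand-in. This is only a sketch of the semantics, not the Orca implementation: `transform_shards` and `scale` are hypothetical names, the shards are plain lists, and the loop runs sequentially rather than in parallel on a cluster.

```python
from typing import Any, Callable, List


def transform_shards(shards: List[Any], func: Callable[..., Any], *args: Any) -> List[Any]:
    """Toy stand-in: call func(shard, *args) on every shard and return a new list."""
    return [func(shard, *args) for shard in shards]


def scale(rows, factor):
    # Hypothetical pre-processing function: multiply every value in one shard.
    return [value * factor for value in rows]


shards = [[1, 2], [3, 4], [5]]
scaled = transform_shards(shards, scale, 10)
print(scaled)
```

The input list is left untouched, mirroring the fact that the transformation returns a new collection instead of modifying the original one.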
SharedValue can be used to give every node a copy of a large input dataset in an efficient manner. This is an example of using SharedValue:
    def func(df, item_set):
        item_set = item_set.value
        ....

    item_set = ...
    item_set = orca.data.SharedValue(item_set)
    full_data.transform_shard(func, item_set)
Get all the elements in XShards
You can get all of the elements in an XShards with the following API:
This method returns a list that contains all of the elements in this XShards.
You can repartition an XShards into a different number of partitions.
num_partitions is the target number of partitions for the new XShards.
The method returns a new XShards that has exactly num_partitions partitions.
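As a rough illustration of what repartitioning means, the sketch below redistributes elements over a requested number of partitions in plain Python. The function name `repartition_elements` and the round-robin placement are assumptions made for this example; how Spark actually places elements is not described in this guide.

```python
from typing import Any, List


def repartition_elements(partitions: List[List[Any]], num_partitions: int) -> List[List[Any]]:
    """Toy stand-in: spread all elements round-robin over num_partitions lists."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    elements = [element for partition in partitions for element in partition]
    new_partitions: List[List[Any]] = [[] for _ in range(num_partitions)]
    for index, element in enumerate(elements):
        new_partitions[index % num_partitions].append(element)
    return new_partitions


old_partitions = [[1, 2, 3], [4], [5, 6]]
new_partitions = repartition_elements(old_partitions, 2)
print(len(new_partitions))
```

The result always has exactly the requested number of partitions, even if some of them end up empty.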
You can split one XShards into multiple XShards. Each element in the XShards needs to be a list or tuple of the same length.
This method returns the resulting list of XShards. If the elements of the input XShards are not lists or tuples, it returns a list containing only the input XShards.
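The splitting rule above can be sketched in plain Python. This is an illustrative stand-in, not Orca code: `split_elements` is a hypothetical name, and an ordinary list plays the role of the distributed collection.

```python
from typing import Any, List


def split_elements(elements: List[Any]) -> List[List[Any]]:
    """Toy stand-in: turn a list of equal-length tuples into one list per position."""
    if not elements or not all(isinstance(e, (list, tuple)) for e in elements):
        # Nothing to split: hand back the input wrapped in a list.
        return [elements]
    if len({len(e) for e in elements}) != 1:
        raise ValueError("all elements must have the same length")
    return [list(column) for column in zip(*elements)]


pairs = [("x0", "y0"), ("x1", "y1"), ("x2", "y2")]
features, labels = split_elements(pairs)
print(features, labels)
```

A typical use is separating features from labels when every element is a `(feature, label)` pair.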
You can save XShards on Spark as SequenceFiles of serialized objects. The serializer used is pyspark.serializers.PickleSerializer.
path is the target save path.
batchSize is the batch size for each chunk in the sequence file.
You can load pickle files back into an XShards if you used save_pickle() to save the data.
path: The pickle file path/directory.
minPartitions: The minimum partitions for the XShards.
This method returns an XShards object from pickle files.
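To make the idea of batched, pickled chunks concrete, here is a minimal in-memory sketch using only the standard `pickle` module. It does not write SequenceFiles or touch Spark; `dump_in_batches` and `load_from_batches` are hypothetical helper names chosen for this example.

```python
import pickle
from typing import Any, List


def dump_in_batches(elements: List[Any], batch_size: int) -> List[bytes]:
    """Serialize elements into chunks of at most batch_size pickled objects each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        pickle.dumps(elements[start:start + batch_size])
        for start in range(0, len(elements), batch_size)
    ]


def load_from_batches(chunks: List[bytes]) -> List[Any]:
    """Deserialize every chunk and concatenate the elements in order."""
    return [element for chunk in chunks for element in pickle.loads(chunk)]


records = [{"id": i} for i in range(5)]
chunks = dump_in_batches(records, batch_size=2)
restored = load_from_batches(chunks)
print(len(chunks))
```

A larger batch size produces fewer, bigger chunks; a smaller one produces more chunks with less data in each.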
Move XShards on Spark to Ray backend
You can move the data of an XShards on Spark into the Ray cluster object store for later processing on Ray.
This method saves the data of the XShards on Spark to the Ray object store, and returns a new RayXShards which contains the plasma ObjectID, the plasma object_store_address and the node IP for each partition.
XShards with Pandas DataFrame
Read data into XShards
You can read csv/json files or directories into XShards with the following APIs:
    zoo.orca.data.pandas.read_csv(file_path, **kwargs)
    zoo.orca.data.pandas.read_json(file_path, **kwargs)
file_path could be a csv/json file, a list of csv/json file paths, or a directory containing csv/json files. Supported file systems are local file system,
**kwargs is the set of read_csv/read_json options supported by pandas.
- You can use OrcaContext.pandas_read_backend = "pandas" to switch to the pandas backend. Reference: Orca Context
After calling these APIs, you get an XShards of Pandas DataFrame on Spark.
For Cloudera YARN client mode users:
If you use pandas as pandas_read_backend, you should configure ARROW_LIBHDFS_DIR before calling read_csv:
- Run locate libhdfs.so to find libhdfs.so.
- export ARROW_LIBHDFS_DIR=/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib64 (replace the path with the directory found in the previous step).
- Pass --conf "spark.executorEnv.ARROW_LIBHDFS_DIR=/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib64" when submitting the job to export the environment variable to all executors.
Partition by Pandas DataFrame columns
You can re-partition XShards of Pandas DataFrame with specified columns.
cols: DataFrame columns to partition by.
num_partitions: target number of partitions. If not specified, the new XShards keeps the current partition number.
This method returns a new XShards partitioned using the specified columns.
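The effect of partitioning by columns can be sketched without Spark or pandas. In the stand-in below, rows are plain dictionaries and `partition_rows_by` is a hypothetical helper; hashing the key with CRC32 is an assumption made for this example to keep the placement deterministic, not a statement about how Orca or Spark chooses partitions.

```python
import zlib
from typing import Any, Dict, List, Sequence

Row = Dict[str, Any]


def partition_rows_by(rows: List[Row], cols: Sequence[str], num_partitions: int) -> List[List[Row]]:
    """Toy stand-in: rows with equal values in cols always land in the same partition."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    partitions: List[List[Row]] = [[] for _ in range(num_partitions)]
    for row in rows:
        key = tuple(row[col] for col in cols)
        # Deterministic hash of the key columns selects the target partition.
        index = zlib.crc32(repr(key).encode("utf-8")) % num_partitions
        partitions[index].append(row)
    return partitions


rows = [
    {"user": "a", "value": 1},
    {"user": "b", "value": 2},
    {"user": "a", "value": 3},
    {"user": "c", "value": 4},
]
by_user = partition_rows_by(rows, ["user"], 3)
print([len(partition) for partition in by_user])
```

Because all rows sharing a key end up together, per-key operations such as grouping can then run on each partition independently.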
Get unique element list of XShards of Pandas Series
You can get a unique list of the elements of this XShards. This is useful when you want to count or get the unique values of a column in an XShards of Pandas DataFrame.
This method returns a unique list of the elements of the XShards of Pandas Series.
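Conceptually, the unique list is the set of distinct values taken across all shards. The sketch below shows that idea with plain lists standing in for shards of Pandas Series; `unique_across_shards` is a hypothetical name, and the first-seen ordering is a choice made for this example only.

```python
from typing import Hashable, List


def unique_across_shards(shards: List[List[Hashable]]) -> List[Hashable]:
    """Toy stand-in: distinct values over all shards, in first-seen order."""
    seen = set()
    result = []
    for shard in shards:
        for value in shard:
            if value not in seen:
                seen.add(value)
                result.append(value)
    return result


column_shards = [["a", "b", "a"], ["b", "c"]]
distinct = unique_across_shards(column_shards)
print(len(distinct))
```

Taking the length of the returned list gives the distinct count for the column.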
XShards with Numpy
Load local numpy data to XShards
You can partition local in-memory data to form an XShards on Spark.
data: The local data can be numpy.ndarray, a tuple, list, dict of numpy.ndarray, or a nested structure made of tuple, list, dict with ndarray as the leaf value.
This method returns an XShards that distributes the local data across partitions on Spark.
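One way to picture this is that every leaf array is cut along its first axis and each shard receives the same nested structure with a slice of every leaf. The sketch below follows that idea in pure Python under loud assumptions: plain lists stand in for numpy arrays, only dicts and tuples are treated as containers (so a list is always a leaf here, unlike in the description above), and `partition_nested` is a hypothetical name.

```python
from typing import Any, List


def partition_nested(data: Any, num_shards: int) -> List[Any]:
    """Toy stand-in: slice every leaf into num_shards contiguous pieces, keeping the nesting."""
    if num_shards < 1:
        raise ValueError("num_shards must be at least 1")

    def build(node: Any, shard_index: int) -> Any:
        if isinstance(node, dict):
            return {key: build(value, shard_index) for key, value in node.items()}
        if isinstance(node, tuple):
            return tuple(build(value, shard_index) for value in node)
        # Leaf: a list standing in for an array; take this shard's contiguous slice.
        start = shard_index * len(node) // num_shards
        end = (shard_index + 1) * len(node) // num_shards
        return node[start:end]

    return [build(data, shard_index) for shard_index in range(num_shards)]


local_data = {"x": [1, 2, 3, 4], "y": [0, 1, 0, 1]}
local_shards = partition_nested(local_data, 2)
print(local_shards[0])
```

Each shard keeps the `x` and `y` slices aligned, so a sample and its label stay in the same shard.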