How To Import Data
TFDataset is the main entry point in TFPark for importing and manipulating data. It represents a distributed collection of elements (backed by an RDD) to be fed into a TensorFlow graph for training, evaluation or inference. It provides a rich set of tools to import data from various data sources and works as a unified interface for interacting with other components of TFPark.
This guide will walk you through some common cases of importing data; you can find a detailed description of TFDataset's API in the Analytics-Zoo API Guide.
TFDataset's job is to take in a dataset, distribute the data across the Spark cluster, and transform each data record into a format that is compatible with TFPark.
Here are a few common features that every TFDataset shares:

1. TFDataset will automatically stack consecutive records into batches. The batch_size argument (for training) or the batch_per_thread argument (for inference or evaluation) should be set when creating the TFDataset. batch_size is used for training and means the total batch size in distributed training; in other words, it equals the total number of records processed in one iteration across the whole cluster. batch_size should be a multiple of the total number of cores allocated to this Spark application so that the workload can be distributed evenly across the cluster, and you may need to adjust your other training hyper-parameters when batch_size is changed. batch_per_thread is used for inference or evaluation and means the number of records processed in one iteration in one partition. batch_per_thread is only an argument for tuning performance and does not affect the correctness or accuracy of the prediction or evaluation, but a batch_per_thread that is too small might slow down the prediction/evaluation (a minimal inference sketch is shown after the validation example below).

2. TFDataset can optionally take a validation data source for validation at the end of each epoch. The validation data source should have the same structure as the main data source used for training. E.g.:
import numpy as np
from zoo.tfpark import TFDataset

feature_data = np.random.randn(100, 28, 28, 1)
label_data = np.random.randint(0, 10, size=(100,))
val_feature_data = np.random.randn(100, 28, 28, 1)
val_label_data = np.random.randint(0, 10, size=(100,))
dataset = TFDataset.from_ndarrays((feature_data, label_data),
                                  batch_size=32,
                                  val_tensors=(val_feature_data, val_label_data))
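For inference or evaluation, batch_per_thread is set instead of batch_size. The following is a minimal sketch of that case; the random data only illustrates the argument, not a real workload.

import numpy as np
from zoo.tfpark import TFDataset

# batch_per_thread controls how many records each partition processes per
# iteration during prediction/evaluation; it only affects performance.
test_data = np.random.randn(100, 28, 28, 1)
predict_dataset = TFDataset.from_ndarrays(test_data, batch_per_thread=4)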
Working with in-memory ndarray
If your input data is quite small, the simplest way to create a TFDataset is to convert it to ndarrays and use TFDataset.from_ndarrays:
import numpy as np
from zoo.tfpark import TFDataset

feature_data = np.random.randn(100, 28, 28, 1)
label_data = np.random.randint(0, 10, size=(100,))
dataset = TFDataset.from_ndarrays((feature_data, label_data), batch_size=32)
Working with data files (including csv files, text files and TFRecord files)
TFDataset supports reading records from a tf.data.Dataset, so you can use tf.data.Dataset to read and process your data files and pass the resulting dataset to TFDataset. TFDataset will automatically ship the dataset to the Spark executors, shard the data and batch the records for further consumption.
If your data files are already in HDFS, you should configure your dataset with a path of the following pattern: "hdfs://namenode:port/path/to/file.txt". TFDataset will then directly access that file in HDFS on each executor.
The HDFS_HDFS_HOME environment variable may need to be set to the location where Hadoop is installed, for both the Spark driver and the Spark executors. More information on the environment variable can be found here.
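The sketch below shows one possible way to set this variable for both the driver and the executors; the Hadoop installation path (/opt/hadoop) is an assumption and should be replaced with the actual location on your nodes.

import os
from pyspark import SparkConf

# Assumed Hadoop installation path; replace with the real location on your nodes.
hadoop_home = "/opt/hadoop"

# Driver side: set the variable in the driver process.
os.environ["HDFS_HDFS_HOME"] = hadoop_home

# Executor side: spark.executorEnv.<NAME> propagates the variable to executors.
conf = SparkConf().set("spark.executorEnv.HDFS_HDFS_HOME", hadoop_home)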
If your data files are in the local file system, you can either upload them to an HDFS cluster and use the above approach, or copy all the data files to exactly the same location on each executor.
More information on tf.data.Dataset can be found here.
import tensorflow as tf
from zoo.tfpark import TFDataset

# COLUMNS (the csv record defaults) and extract_features_labels are user-defined.
ds = tf.data.TextLineDataset("hdfs://path/to/data.csv")
ds = ds.map(lambda line: tf.decode_csv(line, record_defaults=COLUMNS))
ds = ds.map(lambda data: extract_features_labels(data))
dataset = TFDataset.from_tf_data_dataset(ds, batch_size=32)
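The same pattern applies to TFRecord files. Below is a minimal sketch assuming a hypothetical feature specification (the "image" and "label" keys, dtypes and shapes are illustrative only):

import tensorflow as tf
from zoo.tfpark import TFDataset

# Hypothetical feature spec; adjust keys, dtypes and shapes to your records.
feature_spec = {
    "image": tf.FixedLenFeature([28 * 28], tf.float32),
    "label": tf.FixedLenFeature([], tf.int64),
}

def parse_record(serialized):
    parsed = tf.parse_single_example(serialized, feature_spec)
    image = tf.reshape(parsed["image"], [28, 28, 1])
    return image, parsed["label"]

ds = tf.data.TFRecordDataset("hdfs://namenode:port/path/to/data.tfrecord")
ds = ds.map(parse_record)
dataset = TFDataset.from_tf_data_dataset(ds, batch_size=32)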
Working with Analytics Zoo Feature Engineering tools
Analytics Zoo provides a rich set of tools for building complex data engineering pipelines on top of Spark, including FeatureSet. TFPark also supports using those tools to manipulate data. Specifically, you can use TFDataset.from_feature_set to import a data pipeline written with those tools. Details of these APIs can be found in the Analytics-Zoo API Guide.
More information on Analytics Zoo's Feature Engineering tools can be found here.
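The following is a hedged sketch of this path. It assumes a FeatureSet can be built from an RDD of (feature, label) ndarray tuples via FeatureSet.rdd and that from_feature_set takes the same (dtype, shape) descriptors as from_rdd; check the Analytics-Zoo API Guide for the exact signatures.

import numpy as np
import tensorflow as tf
from zoo.feature.common import FeatureSet
from zoo.tfpark import TFDataset

# Assumption: wrap an RDD of (feature, label) ndarray tuples in a FeatureSet.
image_rdd = sc.parallelize(np.random.randn(100, 28, 28, 1))
labels_rdd = sc.parallelize(np.random.randint(0, 10, size=(100,)))
feature_set = FeatureSet.rdd(image_rdd.zip(labels_rdd))

dataset = TFDataset.from_feature_set(feature_set,
                                     features=(tf.float32, [28, 28, 1]),
                                     labels=(tf.int32, []),
                                     batch_size=32)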
Working with RDD or Spark DataFrame data
If the above approaches do not match your use case, you can always transform your data into an RDD or DataFrame using Spark's data processing capabilities.
For an RDD, we assume each record is a tuple of numpy.ndarrays, or a tuple of lists/dicts of numpy.ndarrays. The first element of the tuple will be interpreted as the feature and the second (optional) element as the label. Every record should have the same structure. Details of these APIs can be found in the Analytics-Zoo API Guide.
image_rdd = sc.parallelize(np.random.randn(100, 28, 28, 1))
labels_rdd = sc.parallelize(np.random.randint(0, 10, size=(100,)))
rdd = image_rdd.zip(labels_rdd)
dataset = TFDataset.from_rdd(rdd,
                             features=(tf.float32, [28, 28, 1]),
                             labels=(tf.int32, []),
                             batch_size=32)
For a DataFrame, you should specify which columns are features and which columns are labels (optional). Currently, only numerical types and vectors are supported. Details of these APIs can be found in the Analytics-Zoo API Guide.
from pyspark.ml.linalg import DenseVector

rdd = sc.range(0, 1000)
df = rdd.map(lambda x: (DenseVector(np.random.rand(20).astype(np.float)),
                        x % 10)).toDF(["feature", "label"])
dataset = TFDataset.from_dataframe(df,
                                   feature_cols=["feature"],
                                   labels_cols=["label"],
                                   batch_size=32)