TFPark Overview
Analytics-Zoo provides a set of APIs for running TensorFlow models on Spark in a distributed fashion.
Remarks:
- You need to install tensorflow==1.15.0 on your driver node.
- Your operating system (OS) must be one of the following 64-bit systems: Ubuntu 16.04 or later, or macOS 10.12.6 or later.
- To run on other systems, you need to manually compile the TensorFlow source code. Instructions can be found here.
TFPark API
TFPark is a set of high-level APIs, modeled after tf.keras and tf.estimator, that help users train and evaluate TensorFlow models on Spark and BigDL. Users can define their model using the tf.keras API or using a model_fn similar to tf.estimator.
TFDataset
TFDataset represents a distributed collection of elements (backed by an RDD) to be fed into a TensorFlow graph. A TFDataset can be created from a numpy.ndarray, an RDD of numpy.ndarrays, as well as an ImageSet, TextSet or FeatureSet. It acts as an interface connecting RDD data to TensorFlow models.
from zoo import init_nncontext
from zoo.tfpark import TFDataset
import tensorflow as tf

sc = init_nncontext()

# Each record in train_rdd consists of a list of NumPy ndarrays.
train_rdd = sc.parallelize(file_list) \
    .map(lambda x: read_image_and_label(x)) \
    .map(lambda image_label: decode_to_ndarrays(image_label))

# TFDataset represents a distributed set of elements,
# in which each element contains one or more TensorFlow Tensor objects.
dataset = TFDataset.from_rdd(train_rdd,
                             features=(tf.float32, [28, 28, 1]),
                             labels=(tf.int32, []),
                             batch_size=BATCH_SIZE)
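A TFDataset can also be created directly from in-memory NumPy arrays via TFDataset.from_ndarrays. Below is a minimal sketch; the two arrays are hypothetical stand-ins for real data:

import numpy as np

# Hypothetical in-memory data: 100 MNIST-sized images with integer labels.
feature_arr = np.random.rand(100, 28, 28, 1).astype(np.float32)
label_arr = np.random.randint(0, 10, size=(100,)).astype(np.int32)

# from_ndarrays distributes the (features, labels) pair as an RDD under the hood.
ndarray_dataset = TFDataset.from_ndarrays((feature_arr, label_arr),
                                          batch_size=BATCH_SIZE)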
More on the TFDataset API: API Guide
KerasModel
KerasModel enables users to use the tf.keras API to define TensorFlow models and perform training or evaluation on top of Spark and BigDL in a distributed fashion.
- Create a KerasModel
from zoo.tfpark import KerasModel, TFDataset
import tensorflow as tf
model = tf.keras.Sequential(
    [tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
     tf.keras.layers.Dense(64, activation='relu'),
     tf.keras.layers.Dense(10, activation='softmax')]
)

model.compile(optimizer=tf.keras.optimizers.RMSprop(),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
keras_model = KerasModel(model)
- Perform training on the TFDataset and save the model
keras_model.fit(training_dataset, epochs=max_epoch)
keras_model.save_weights("/tmp/model.h5")
- Load the saved model and perform evaluation or inference
keras_model.load_weights("/tmp/model.h5")
evaluation_results = keras_model.evaluate(eval_dataset)
predictions = keras_model.predict(pred_dataset)
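When pred_dataset is backed by an RDD, predict returns a Spark RDD holding one model output per input element. A minimal sketch for inspecting a few results on the driver:

# Each record is a NumPy ndarray of softmax probabilities;
# print the predicted class index for the first few elements.
for probs in predictions.take(5):
    print(probs.argmax())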
More on the KerasModel API: API Guide
TFEstimator
TFEstimator wraps a model defined by a model_fn. The model_fn is almost identical to TensorFlow's model_fn, except that users are required to use ZooOptimizer, which takes a tf.train.Optimizer as input, to derive the train_op.
- Define a model_fn and create a TFEstimator. Note that ZooOptimizer must be used.
import tensorflow as tf
from zoo.tfpark import TFEstimator, ZooOptimizer
def model_fn(features, labels, mode):
    hidden = tf.layers.dense(features, 32, activation=tf.nn.relu)
    logits = tf.layers.dense(hidden, 10)
    if mode == tf.estimator.ModeKeys.EVAL or mode == tf.estimator.ModeKeys.TRAIN:
        loss = tf.reduce_mean(
            tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels))
        train_op = ZooOptimizer(tf.train.AdamOptimizer()).minimize(loss)
        return tf.estimator.EstimatorSpec(mode, train_op=train_op, predictions=logits, loss=loss)
    else:
        return tf.estimator.EstimatorSpec(mode, predictions=logits)
estimator = TFEstimator.from_model_fn(model_fn, model_dir="/tmp/estimator")
Or use a pre-made Estimator from TensorFlow and create a TFEstimator. Note that ZooOptimizer must be used.
import tensorflow as tf
from zoo.tfpark import TFEstimator, ZooOptimizer

linear = tf.estimator.LinearClassifier(feature_columns=feature_columns,
                                       optimizer=ZooOptimizer(tf.train.FtrlOptimizer(0.2)))
estimator = TFEstimator(linear)
- Define an input_fn.
import tensorflow as tf
from zoo import init_nncontext
from zoo.tfpark import TFDataset

sc = init_nncontext()

def input_fn(mode):
    if mode == tf.estimator.ModeKeys.TRAIN:
        training_rdd = get_data_rdd("train", sc)
        dataset = TFDataset.from_rdd(training_rdd,
                                     features=(tf.float32, [28, 28, 1]),
                                     labels=(tf.int32, []),
                                     batch_size=320)
    elif mode == tf.estimator.ModeKeys.EVAL:
        validation_rdd = get_data_rdd("validation", sc)
        dataset = TFDataset.from_rdd(validation_rdd,
                                     features=(tf.float32, [28, 28, 1]),
                                     labels=(tf.int32, []),
                                     batch_size=320)
    else:
        testing_rdd = get_data_rdd("test", sc)
        dataset = TFDataset.from_rdd(testing_rdd,
                                     features=(tf.float32, [28, 28, 1]),
                                     batch_per_thread=80)
    return dataset
- Perform training, evaluation or inference
estimator.train(input_fn, steps=10000)
evaluation_result = estimator.evaluate(input_fn, ["acc"])
predictions = estimator.predict(input_fn)
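evaluate returns the values of the requested metrics (here "acc"), and predict returns a Spark RDD of model outputs. A small sketch for inspecting both, assuming the model_fn above:

# Metric values for the evaluation run.
print(evaluation_result)

# Pull a few raw predictions (logits) back to the driver.
for logits in predictions.take(3):
    print(logits)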
More on the TFEstimator API: API Guide
Low-level API
Concepts
- TFOptimizer is the class that does all the hard work in distributed training, such as model distribution and parameter synchronization. It takes the user-specified loss (a TensorFlow scalar tensor) as an argument and runs stochastic gradient descent using the given optimMethod on all the Variables that contribute to this loss.
- TFPredictor takes a list of user-specified TensorFlow tensors as the model outputs, and feeds all the elements of the TFDataset to produce those outputs; it returns a Spark RDD with each of its records representing the model prediction for the corresponding input element.
Training
1. Data wrangling and analysis using PySpark
from zoo import init_nncontext
from zoo.tfpark import TFDataset
import tensorflow as tf

sc = init_nncontext()

# Each record in train_rdd consists of a list of NumPy ndarrays.
train_rdd = sc.parallelize(file_list) \
    .map(lambda x: read_image_and_label(x)) \
    .map(lambda image_label: decode_to_ndarrays(image_label))

# TFDataset represents a distributed set of elements,
# in which each element contains one or more TensorFlow Tensor objects.
dataset = TFDataset.from_rdd(train_rdd,
                             features=(tf.float32, [28, 28, 1]),
                             labels=(tf.int32, []),
                             batch_size=BATCH_SIZE)
2. Deep learning model development using TensorFlow
import tensorflow as tf

# lenet here refers to a LeNet model definition from the TF-Slim
# model library (an assumption based on the slim usage below).
slim = tf.contrib.slim

images, labels = dataset.tensors
squeezed_labels = tf.squeeze(labels)
with slim.arg_scope(lenet.lenet_arg_scope()):
    logits, end_points = lenet.lenet(images, num_classes=10, is_training=True)

loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=squeezed_labels))
You can also construct your model using the Keras API provided by TensorFlow.
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *
data = Input(shape=[28, 28, 1])
x = Flatten()(data)
x = Dense(64, activation='relu')(x)
x = Dense(64, activation='relu')(x)
predictions = Dense(10, activation='softmax')(x)
model = Model(inputs=data, outputs=predictions)
model.compile(optimizer='rmsprop',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
3. Distributed training on Spark and BigDL
from zoo.tfpark import TFOptimizer
from bigdl.optim.optimizer import Adam, MaxEpoch, TrainSummary

optimizer = TFOptimizer.from_loss(loss, Adam(1e-3))
optimizer.set_train_summary(TrainSummary("/tmp/az_lenet", "lenet"))
optimizer.optimize(end_trigger=MaxEpoch(5))
For a Keras model:
from zoo.tfpark import TFOptimizer
from bigdl.optim.optimizer import MaxEpoch, TrainSummary

optimizer = TFOptimizer.from_keras(keras_model=model, dataset=dataset)
optimizer.set_train_summary(TrainSummary("/tmp/az_lenet", "lenet"))
optimizer.optimize(end_trigger=MaxEpoch(5))
4. Save the variables to a checkpoint
saver = tf.train.Saver()
saver.save(optimizer.sess, "/tmp/lenet/")
For a Keras model, you can also use Keras' save_weights API.
model.save_weights("/tmp/keras.h5")
Inference
1. Data processing using PySpark
from zoo import init_nncontext
from zoo.tfpark import TFDataset
import tensorflow as tf

sc = init_nncontext()

# Each record in testing_rdd consists of a list of NumPy ndarrays.
testing_rdd = sc.parallelize(file_list) \
    .map(lambda x: read_image_and_label(x)) \
    .map(lambda image_label: decode_to_ndarrays(image_label))

# TFDataset represents a distributed set of elements,
# in which each element contains one or more TensorFlow Tensor objects.
dataset = TFDataset.from_rdd(testing_rdd,
                             features=(tf.float32, [28, 28, 1]),
                             batch_per_thread=4)
2. Reconstruct the model for inference and load the checkpoint
import tensorflow as tf

slim = tf.contrib.slim

# The inference dataset above has features only, so dataset.tensors
# holds just the image tensors.
images = dataset.tensors[0]
with slim.arg_scope(lenet.lenet_arg_scope()):
    logits, end_points = lenet.lenet(images, num_classes=10, is_training=False)

sess = tf.Session()
saver = tf.train.Saver()
saver.restore(sess, "/tmp/lenet/")
As before, you can also construct and restore your model using the Keras API provided by TensorFlow.
from tensorflow.keras.models import Model
from tensorflow.keras.layers import *
data = Input(shape=[28, 28, 1])
x = Flatten()(data)
x = Dense(64, activation='relu')(x)
x = Dense(64, activation='relu')(x)
predictions = Dense(10, activation='softmax')(x)
model = Model(inputs=data, outputs=predictions)
model.load_weights("/tmp/keras.h5")
3. Run predictions
from zoo.tfpark import TFPredictor

predictor = TFPredictor.from_outputs(sess, [logits])
predictions_rdd = predictor.predict()
For a Keras model:
predictor = TFPredictor.from_keras(model, dataset)
predictions_rdd = predictor.predict()
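Since predictions_rdd is an ordinary Spark RDD, standard transformations apply. For example, a sketch that maps each logits vector to a predicted class index (assuming each record is a single logits ndarray):

import numpy as np

# Convert each logits vector into a predicted class index.
class_ids = predictions_rdd.map(lambda logits: int(np.argmax(logits)))
print(class_ids.take(10))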
Relationship to TFNet
TFNet is a layer representing a TensorFlow sub-graph (specified by the input and output TensorFlow tensors). It implements the standard BigDL layer API, and can be used with other Analytics-Zoo/BigDL layers to construct more complex models for training or inference using the standard Analytics-Zoo/BigDL API.
You can think of TFDataset, TFOptimizer and TFPredictor as a set of APIs for training/testing TensorFlow models on Spark/BigDL, while TFNet is an Analytics-Zoo/BigDL layer initialized from a TensorFlow graph.
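For example, here is a minimal sketch that exports the sub-graph between the images and logits tensors from the inference example above as a TFNet layer; sample_rdd is a hypothetical RDD of BigDL Sample objects prepared by the user:

from zoo.pipeline.api.net import TFNet

# Export the sub-graph between the given input and output tensors as a
# BigDL layer, using the variable values in the current session.
net = TFNet.from_session(sess, inputs=[images], outputs=[logits])

# `net` behaves like any other Analytics-Zoo/BigDL layer, e.g. it can run
# distributed inference on an RDD of Sample objects:
result_rdd = net.predict(sample_rdd)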
For more information on TFNet, please refer to the API Guide.