TFPark Overview


Analytics-Zoo provides a set of APIs for running TensorFlow models on Spark in a distributed fashion.

TFPark API

TFPark is a set of high-level APIs modeled after tf.keras and tf.estimator that help users train and evaluate TensorFlow models on Spark and BigDL. Users can define their models using the tf.keras API, or using a model_fn similar to tf.estimator.

TFDataset

TFDataset represents a distributed collection of elements (backed by an RDD) to be fed into a TensorFlow graph. A TFDataset can be created from numpy.ndarrays, an RDD of numpy.ndarrays, as well as ImageSet, TextSet and FeatureSet. It acts as an interface connecting RDD data to TensorFlow models.

from zoo import init_nncontext
from zoo.tfpark import TFDataset
import tensorflow as tf

sc = init_nncontext()

# Each record in train_rdd consists of a list of NumPy ndarrays
train_rdd = sc.parallelize(file_list) \
    .map(lambda x: read_image_and_label(x)) \
    .map(lambda image_label: decode_to_ndarrays(image_label))

# TFDataset represents a distributed set of elements,
# in which each element contains one or more TensorFlow Tensor objects. 
dataset = TFDataset.from_rdd(train_rdd,
                             features=(tf.float32, [28, 28, 1]),
                             labels=(tf.int32, []),
                             batch_size=BATCH_SIZE)
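
A TFDataset can also be created directly from in-memory numpy.ndarrays. The snippet below is a minimal sketch assuming hypothetical arrays x_train and y_train with the same shapes as above:

import numpy as np
from zoo.tfpark import TFDataset

# Hypothetical in-memory data: 1000 28x28x1 images with integer class labels
x_train = np.random.rand(1000, 28, 28, 1).astype(np.float32)
y_train = np.random.randint(0, 10, size=(1000,)).astype(np.int32)

# Create a TFDataset from a (features, labels) tuple of ndarrays
train_dataset = TFDataset.from_ndarrays((x_train, y_train), batch_size=256)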

More on TFDataset API: API Guide

KerasModel

KerasModel enables users to use the tf.keras API to define TensorFlow models and perform training or evaluation on top of Spark and BigDL in a distributed fashion.

  1. Create a KerasModel
from zoo.tfpark import KerasModel, TFDataset
import tensorflow as tf

model = tf.keras.Sequential(
    [tf.keras.layers.Flatten(input_shape=(28, 28, 1)),
     tf.keras.layers.Dense(64, activation='relu'),
     tf.keras.layers.Dense(10, activation='softmax'),
     ]
)

model.compile(optimizer=tf.keras.optimizers.RMSprop(),
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
keras_model = KerasModel(model)

  2. Perform training on a TFDataset and save the model
keras_model.fit(training_dataset, epochs=max_epoch)

keras_model.save_weights("/tmp/model.h5")

  3. Load the saved model and perform evaluation or inference
keras_model.load_weights("/tmp/model.h5")

evaluation_results = keras_model.evaluate(eval_dataset)

predictions = keras_model.predict(pred_dataset)

More on KerasModel API: API Guide

TFEstimator

TFEstimator wraps a model defined by a model_fn. The model_fn is almost identical to TensorFlow's, except that users are required to use ZooOptimizer, which takes a tf.train.Optimizer as input, to derive the train_op.

  1. Define a model_fn and create a TFEstimator. Note that ZooOptimizer must be used.
import tensorflow as tf
from zoo.tfpark import TFEstimator, ZooOptimizer
def model_fn(features, labels, mode):

    hidden = tf.layers.dense(features, 32, activation=tf.nn.relu)

    logits = tf.layers.dense(hidden, 10)

    if mode == tf.estimator.ModeKeys.EVAL or mode == tf.estimator.ModeKeys.TRAIN:
        loss = tf.reduce_mean(
            tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=labels))
        train_op = ZooOptimizer(tf.train.AdamOptimizer()).minimize(loss)
        return tf.estimator.EstimatorSpec(mode, train_op=train_op, predictions=logits, loss=loss)
    else:
        return tf.estimator.EstimatorSpec(mode, predictions=logits)

estimator = TFEstimator.from_model_fn(model_fn, model_dir="/tmp/estimator")

Or use a pre-made Estimator from TensorFlow and create a TFEstimator. Note that ZooOptimizer must be used.

import tensorflow as tf
from zoo.tfpark import TFEstimator, ZooOptimizer

linear = tf.estimator.LinearClassifier(feature_columns=feature_columns,
                                       optimizer=ZooOptimizer(tf.train.FtrlOptimizer(0.2)))
estimator = TFEstimator(linear)
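
The snippet above assumes feature_columns has already been defined. A minimal sketch with two hypothetical numeric features might look like:

import tensorflow as tf

# Hypothetical numeric input features; replace with the columns of your own data
feature_columns = [
    tf.feature_column.numeric_column("age"),
    tf.feature_column.numeric_column("income"),
]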
  2. Define an input_fn.
import tensorflow as tf
from zoo import init_nncontext
from zoo.tfpark import TFDataset

sc = init_nncontext()
def input_fn(mode):
    if mode == tf.estimator.ModeKeys.TRAIN:
        training_rdd = get_data_rdd("train", sc)
        dataset = TFDataset.from_rdd(training_rdd,
                                     features=(tf.float32, [28, 28, 1]),
                                     labels=(tf.int32, []),
                                     batch_size=320)
    elif mode == tf.estimator.ModeKeys.EVAL:
        validation_rdd = get_data_rdd("validation", sc)
        dataset = TFDataset.from_rdd(validation_rdd,
                                     features=(tf.float32, [28, 28, 1]),
                                     labels=(tf.int32, []),
                                     batch_size=320)
    else:
        testing_rdd = get_data_rdd("test", sc)
        dataset = TFDataset.from_rdd(testing_rdd,
                                     features=(tf.float32, [28, 28, 1]),
                                     batch_per_thread=80)
    return dataset
  3. Perform training, evaluation or inference
estimator.train(input_fn, steps=10000)
evaluation_result = estimator.evaluate(input_fn, ["acc"])
predictions = estimator.predict(input_fn)

More on TFEstimator API: API Guide

Low level API

Concepts

  * TFDataset: a distributed collection of elements (backed by an RDD) to be fed into a TensorFlow graph, as described above.
  * TFOptimizer: drives distributed training on Spark and BigDL; it can be created from a TensorFlow loss tensor or from a compiled Keras model.
  * TFPredictor: feeds the elements of a TFDataset through a TensorFlow graph and returns the specified output tensors as a Spark RDD of predictions.

Training

1. Data wrangling and analysis using PySpark

from zoo import init_nncontext
from zoo.tfpark import TFDataset
import tensorflow as tf

sc = init_nncontext()

# Each record in train_rdd consists of a list of NumPy ndarrays
train_rdd = sc.parallelize(file_list) \
    .map(lambda x: read_image_and_label(x)) \
    .map(lambda image_label: decode_to_ndarrays(image_label))

# TFDataset represents a distributed set of elements,
# in which each element contains one or more TensorFlow Tensor objects. 
dataset = TFDataset.from_rdd(train_rdd,
                             features=(tf.float32, [28, 28, 1]),
                             labels=(tf.int32, []),
                             batch_size=BATCH_SIZE)

2. Deep learning model development using TensorFlow

import tensorflow as tf

slim = tf.contrib.slim

images, labels = dataset.tensors
squeezed_labels = tf.squeeze(labels)
# lenet refers to a LeNet model definition (e.g. from the TF-Slim model zoo); substitute your own model function here
with slim.arg_scope(lenet.lenet_arg_scope()):
    logits, end_points = lenet.lenet(images, num_classes=10, is_training=True)

loss = tf.reduce_mean(tf.losses.sparse_softmax_cross_entropy(logits=logits, labels=squeezed_labels))

You can also construct your model using the Keras API provided by TensorFlow.

from tensorflow.keras.models import Model
from tensorflow.keras.layers import *

data = Input(shape=[28, 28, 1])

x = Flatten()(data)
x = Dense(64, activation='relu')(x)
x = Dense(64, activation='relu')(x)
predictions = Dense(10, activation='softmax')(x)

model = Model(inputs=data, outputs=predictions)

model.compile(optimizer='rmsprop',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

3. Distributed training on Spark and BigDL

from zoo.tfpark import TFOptimizer
from bigdl.optim.optimizer import MaxIteration, Adam, MaxEpoch, TrainSummary

optimizer = TFOptimizer.from_loss(loss, Adam(1e-3))
optimizer.set_train_summary(TrainSummary("/tmp/az_lenet", "lenet"))
optimizer.optimize(end_trigger=MaxEpoch(5))

For a Keras model:

from zoo.tfpark import TFOptimizer
from bigdl.optim.optimizer import MaxIteration, MaxEpoch, TrainSummary

optimizer = TFOptimizer.from_keras(keras_model=model, dataset=dataset)
optimizer.set_train_summary(TrainSummary("/tmp/az_lenet", "lenet"))
optimizer.optimize(end_trigger=MaxEpoch(5))

4. Save the variables to a checkpoint

saver = tf.train.Saver()
saver.save(optimizer.sess, "/tmp/lenet/")

For a Keras model, you can also use Keras' save_weights API.

model.save_weights("/tmp/keras.h5")

Inference

1. Data processing using PySpark

from zoo import init_nncontext
from zoo.tfpark import TFDataset
import tensorflow as tf

sc = init_nncontext()

# Each record in testing_rdd consists of a list of NumPy ndarrays
testing_rdd = sc.parallelize(file_list) \
    .map(lambda x: read_image_and_label(x)) \
    .map(lambda image_label: decode_to_ndarrays(image_label))

# TFDataset represents a distributed set of elements,
# in which each element contains one or more TensorFlow Tensor objects. 
dataset = TFDataset.from_rdd(testing_rdd,
                             features=(tf.float32, [28, 28, 1]),
                             batch_per_thread=4)

2. Reconstruct the model for inference and load the checkpoint

import tensorflow as tf

slim = tf.contrib.slim

# The inference dataset only contains features, so dataset.tensors holds just the image tensor
images = dataset.tensors[0]
with slim.arg_scope(lenet.lenet_arg_scope()):
    logits, end_points = lenet.lenet(images, num_classes=10, is_training=False)

sess = tf.Session()
saver = tf.train.Saver()
saver.restore(sess, "/tmp/lenet/")

As before, you can also construct and restore your model using the Keras API provided by TensorFlow.

from tensorflow.keras.models import Model
from tensorflow.keras.layers import *

data = Input(shape=[28, 28, 1])

x = Flatten()(data)
x = Dense(64, activation='relu')(x)
x = Dense(64, activation='relu')(x)
predictions = Dense(10, activation='softmax')(x)

model = Model(inputs=data, outputs=predictions)

model.load_weights("/tmp/keras.h5")

3. Run predictions

from zoo.tfpark import TFPredictor

predictor = TFPredictor.from_outputs(sess, [logits])
predictions_rdd = predictor.predict()

For a Keras model:

predictor = TFPredictor.from_keras(model, dataset)
predictions_rdd = predictor.predict()

Relationship to TFNet

TFNet is a layer representing a TensorFlow sub-graph (specified by the input and output TensorFlow tensors). It implements the standard BigDL layer API, and can be used with other Analytics-Zoo/BigDL layers to construct more complex models for training or inference using the standard Analytics-Zoo/BigDL API.

You can think of TFDataset, TFOptimizer and TFPredictor as a set of APIs for training/testing TensorFlow models on Spark/BigDL, while TFNet is an Analytics-Zoo/BigDL layer initialized from a TensorFlow graph.
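
As a minimal sketch (assuming the sess, images and logits variables from the inference example above, and that TFNet is imported from zoo.pipeline.api.net):

from zoo.pipeline.api.net import TFNet

# Wrap the TensorFlow sub-graph between `images` and `logits` as a BigDL layer;
# the resulting `net` can then be combined with other Analytics-Zoo/BigDL layers
# or used through the standard Analytics-Zoo/BigDL API
net = TFNet.from_session(sess, [images], [logits])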

For more information on TFNet, please refer to the API Guide