Batch Inference and Inference Jobs API

In this notebook we walk through the batch inference API for a model endpoint. Towards the end we also create an inference job.

Below we build a simple MNIST model.

In [ ]:
from sbrain.learning.experiment import *
from sbrain.dataset.dataset import *
import time
import os


def input_function(mode, batch_size, params):
    from tensorflow.examples.tutorials.mnist import input_data
    import tensorflow as tf

    local_dir = "/workspace/shared-dir/sample-notebooks/demo-data/learning/mnist/"

    if mode == "train":
        mnist = input_data.read_data_sets(local_dir, one_hot=True)

        dataset = tf.data.Dataset.from_tensor_slices(({"data" : mnist.train.images}, mnist.train.labels))
        dataset = dataset.shuffle(1000).batch(batch_size).repeat()
        return dataset
    else:
        mnist = input_data.read_data_sets(local_dir, one_hot=True)

        dataset = tf.data.Dataset.from_tensor_slices(({"data" : mnist.test.images}, mnist.test.labels))
        dataset = dataset.batch(batch_size)
        return dataset

Below is the model function.

In [ ]:
def my_model_function_with_mnist(features, labels, mode, params):
    import tensorflow as tf

    net = tf.feature_column.input_layer(features, [tf.feature_column.numeric_column("data", shape=(784))])
    # labels = tf.one_hot(labels, 2) ## either or
    for units in [20, 20]:
        net = tf.layers.dense(net, units=units, activation=tf.nn.relu)

    # Compute logits (1 per class).
    # logits = tf.layers.dense(net, 2, activation=None) ## either or
    logits = tf.layers.dense(net, 10, activation=None)

    # Compute predictions.
    predicted_classes = tf.argmax(logits, 1)
    if mode == tf.estimator.ModeKeys.PREDICT:
        predictions = {
            'class_ids': predicted_classes[:, tf.newaxis],
            'probabilities': tf.nn.softmax(logits),
            'logits': logits,
        }
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)

    # Compute loss.
    loss = tf.losses.softmax_cross_entropy(onehot_labels=labels, logits=logits)

    # Compute evaluation metrics.
    labels_to_compare = tf.argmax(labels, 1)
    accuracy = tf.metrics.accuracy(labels=labels_to_compare,
                                   predictions=predicted_classes,
                                   name='acc_op')
    metrics = {'accuracy': accuracy}
    tf.summary.scalar('accuracy', accuracy[1])

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(
            mode, loss=loss, eval_metric_ops=metrics)

    # Create training op.
    assert mode == tf.estimator.ModeKeys.TRAIN

    global_step = tf.train.get_global_step()
    optimizer = tf.train.AdagradOptimizer(learning_rate=0.001)
    train_op = optimizer.minimize(loss, global_step=global_step)

    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op,
                                      training_chief_hooks=None,
                                      training_hooks=[StopIfHigherHook(metric_name="accuracy",
                                                                       threshold=0.50, min_steps=20, run_every_secs=15)]
                                      )

We submit the job and wait for it to finish.

In [ ]:
estimator = Estimator.NewClassificationEstimator(model_fn=my_model_function_with_mnist)
name = "BestModelEstimator" + str(time.time()).replace(".", "")
estimator = Estimator.create(name, "Hello", estimator)

hyper_parameters = HParams(iterations=5000, batch_size=10)
rc = RunConfig(no_of_ps=1, no_of_workers=1, summary_save_frequency=5000, run_eval=True, use_gpu=False, checkpoint_frequency_in_steps=500)

exper = Experiment.run(experiment_name="BestModelEstimator" + str(time.time()).replace(".", ""),
                       description="Really first model",
                       estimator=estimator,
                       hyper_parameters=hyper_parameters,
                       run_config=rc,
                       dataset_version_split=None,
                       input_function=input_function)
job = exper.get_single_job()
print(job.__dict__)
print("")
print("tensorboard url")
print(job.get_tensorboard_url())

print("Has the job finished? : {}".format(job.has_finished()))

job.wait_until_finish()

print("Did the job succeed? : {}".format(job.is_success()))

print()
print("Model metrics:")
print(job.get_model().model_metrics)

We get the model.

In [ ]:
model = job.get_model()

We deploy a model endpoint.

In [ ]:
me = model.deploy("MyEndpoint" + str(time.time()).replace(".", ""),
                  "myDesc", 1)

In order to run batch inference, we define two functions.

  1. Predict input function: This function has the same semantics as the input function of a TensorFlow Estimator; it is not an SBrain input function. It is handed off to the TensorFlow Estimator directly, without any modification on the SBrain side, so you have full control over how the data is fed into the model.
  2. Predict output function: The results of the prediction are passed into this function for further processing. Further processing could mean (a) writing them to a shared directory for the application to process, or (b) serializing them in some format so that they are written to the response stream of the REST call. This function is expected to return a string, which is passed back in the response of the REST call. A sketch of option (a) appears after the note below.

NOTE: Let us know if you need any additional libraries to be available for the serialization format you choose.
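
This is a minimal sketch of option (a): an output function that writes the collected results to a pickle file under shared-dir and returns the file path as the response string. The sub-folder name used here is only an illustration, not a path SBrain requires.

In [ ]:
def predict_output_function_to_file(results):
    import pickle
    import os

    ## Illustrative output location under shared-dir; adjust to your own sub-folder.
    out_dir = "/workspace/shared-dir/sample-notebooks/demo-output"
    os.makedirs(out_dir, exist_ok=True)

    ret = []  ## collecting all results of the model inference in a list
    for each in results:
        ret.append(each)

    out_path = os.path.join(out_dir, "predictions.pkl")
    with open(out_path, "wb") as f:
        pickle.dump(ret, f)

    ## Return the path as the REST response so the application knows where to find the results.
    return out_path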

In [ ]:
def predict_input_function():
    from tensorflow.examples.tutorials.mnist import input_data
    import tensorflow as tf
    local_dir = "/workspace/shared-dir/sample-notebooks/demo-data/learning/mnist/"
    print("In my predict input function")
    mnist = input_data.read_data_sets(local_dir, one_hot=True)

    dataset = tf.data.Dataset.from_tensor_slices(({"data" : mnist.train.images[0:10, :]}))
    # dataset = tf.data.Dataset.from_tensor_slices(({"data" : mnist.train.images}, mnist.train.labels))
    return dataset.batch(1000)



def predict_output_function(results):
    import pickle
    import base64
    print("BEFORE")
    ret = [] ## collecting all results of the model inference in an array
    bc = 1
    for each in results:
        ret.append(each)
        bc += 1
    print("AFTER")
    serialized = pickle.dumps(ret, protocol=0) # protocol 0 is printable ASCII
    s = base64.b64encode(serialized)
    s = s.decode("UTF-8")
    ## WRITE RESULT TO /workspace/shared-dir/<some sub folder> to be used from outside.
    
    return s ## returning the serialized string to be returned to the application. You can also pass empty string response,
    ## if you write directly to file system instead of the REST response.

Below is the new raw_predict() API, which takes these two functions and applies them directly to the model without any processing from SBrain.

The response string that predict_output_function returns is also the return value of raw_predict(). You can choose any kind of serialization in the output function (an alternative JSON serialization is sketched after the deserialization step below).

In [ ]:
res = me.raw_predict(predict_input_function, predict_output_function)

Checking the serialized string; in this case it is base64 encoded.

In [ ]:
print(res)

We deserialize the base64-encoded string to get back the original NumPy arrays.

In [ ]:
import pickle
import base64
result_numpy_array = pickle.loads(base64.b64decode(res))  ## b64decode replaces the deprecated base64.decodestring
print(result_numpy_array)
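
Instead of base64-encoded pickles, any other serialization works in the output function. Below is a minimal sketch that returns only the predicted class ids as JSON; it assumes each element yielded by results is a dict containing a 'class_ids' array, as produced by the PREDICT branch of the model function above.

In [ ]:
def predict_output_function_json(results):
    import json

    class_ids = []
    for each in results:
        ## Each prediction dict carries a 'class_ids' array of shape (1,); take the scalar value.
        class_ids.append(int(each["class_ids"][0]))

    ## The returned JSON string becomes the REST response of raw_predict().
    return json.dumps({"class_ids": class_ids})

## e.g. res_json = me.raw_predict(predict_input_function, predict_output_function_json)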

Batch inference jobs

Below is the API to submit an inference job. An inference job has the same semantics as raw_predict(): the input function is handed off directly to the estimator without any modification from SBrain. For a job, the inference instance comes up as a separate pod that runs the prediction and then goes down. It comes up on a GPU node if the gpu_required flag is set.

From the predict_output_function you can write the prediction output to any shared-dir folder for later use. A REST response will not work, since this is a job; a sketch of such a file-writing output function follows the job functions below.

In [ ]:
def predict_input_function_job(params):
    from tensorflow.examples.tutorials.mnist import input_data
    import tensorflow as tf
    print(params)
    local_dir = "/workspace/shared-dir/sample-notebooks/demo-data/learning/mnist/"
    print("In my predict input function")
    mnist = input_data.read_data_sets(local_dir, one_hot=True)

    dataset = tf.data.Dataset.from_tensor_slices(({"data" : mnist.train.images[0:10, :]}))
    # dataset = tf.data.Dataset.from_tensor_slices(({"data" : mnist.train.images}, mnist.train.labels))
    return dataset.batch(1000)



def predict_output_function_job(results, params):
    import pickle
    import base64
    print("BEFORE")
    print(params)
    ret = [] ## collecting all results of the model inference in an array
    bc = 1
    for each in results:
        print("Processed batch {}".format(bc))
        ret.append(each)
        bc += 1
    print("AFTER")
    serialized = pickle.dumps(ret, protocol=0) # protocol 0 is printable ASCII
    s = base64.b64encode(serialized)
    s = s.decode("UTF-8")
    ## WRITE RESULT TO /workspace/shared-dir/<some sub folder> to be used from outside.
    
    return s ## returning the serialized string to be returned to the application. You can also pass empty string response,
    ## if you write directly to file system instead of the REST response.
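
The output function above returns a base64 string for illustration, but for a job you would typically write the results to shared-dir instead. Here is a minimal sketch of that pattern; the output folder and file name are only illustrative, and it assumes each result is a dict containing a 'class_ids' array, as produced by the model function above.

In [ ]:
def predict_output_function_job_to_file(results, params):
    import numpy as np
    import os

    ## Illustrative output location under shared-dir; adjust to your own sub-folder.
    out_dir = "/workspace/shared-dir/sample-notebooks/demo-output"
    os.makedirs(out_dir, exist_ok=True)

    ## Collect the predicted class ids from each result dict.
    class_ids = np.array([int(each["class_ids"][0]) for each in results])
    np.save(os.path.join(out_dir, "job_class_ids.npy"), class_ids)

    ## There is no REST response to fill for a job, so return an empty string.
    return ""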
In [ ]:
inf_job = model.submit_inference_job(job_name="TestInfJob" + str(time.time()).replace(".", ""),
                                     description="Test Job",
                                     input_function=predict_input_function_job,
                                     output_function=predict_output_function_job,
                                     model_function=my_model_function_with_mnist,  ## Try with and without the model function
                                     best_model=False,
                                     gpu_required=True,
                                     params={"testparam": "test_val_1"})


inf_job.wait_until_finish()
In [ ]:
print(ModelInferenceJob.retrieve(inf_job.model_inference_job_id).is_success())
In [ ]:
## Create one more job using the above cells.

inf_job.request_cancellation() ## Returns immediately; the job will be cancelled later.


inf_job.cancel() ## Requests cancellation and waits for the cancellation to finish.

Cheers!!
