tomasatdatabricks/spark-deep-learning

 
 

Deep Learning Pipelines for Apache Spark

Deep Learning Pipelines provides high-level APIs for scalable deep learning in Python with Apache Spark.

Overview

The library comes from Databricks and leverages Spark for its two strongest facets:

  1. In the spirit of Spark and Spark MLlib, it provides easy-to-use APIs that enable deep learning in very few lines of code.
  2. It uses Spark's powerful distributed engine to scale out deep learning on massive datasets.

Currently, TensorFlow and TensorFlow-backed Keras workflows are supported, with a focus on model inference/scoring and transfer learning on image data at scale, with hyper-parameter tuning in the works.

Furthermore, it provides tools for data scientists and machine learning experts to turn deep learning models into SQL functions that can be used by a much wider group of users. It does not perform single-model distributed training; that is an area of active research, and here we aim to provide the most practical solutions for the majority of deep learning use cases.

For an overview of the library, see the Databricks blog post introducing Deep Learning Pipelines. For the various use cases the package serves, see the Quick user guide section below.

The library is in its early days, and we welcome everyone's feedback and contribution.

Maintainers: Bago Amirbekian, Joseph Bradley, Sue Ann Hong, Tim Hunter, Philip Yang

Building and running unit tests

To compile this project, run build/sbt assembly from the project home directory. This will also run the Scala unit tests.

To run the Python unit tests, run the run-tests.sh script from the python/ directory. You will need to set a few environment variables, e.g.

# Be sure to run build/sbt assembly before running the Python tests
sparkdl$ SPARK_HOME=/usr/local/lib/spark-2.1.1-bin-hadoop2.7 PYSPARK_PYTHON=python2 SCALA_VERSION=2.11.8 SPARK_VERSION=2.1.1 ./python/run-tests.sh

Spark version compatibility

Spark 2.1.1 and Python 2.7 are recommended.

Support

You can ask questions and join the development discussion on the DL Pipelines Google group.

You can also post bug reports and feature requests in GitHub issues.

Quick user guide

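The current version of Deep Learning Pipelines provides a suite of tools for working with and processing images using deep learning. The tools can be categorized as follows:

  • Working with images in Spark
  • Transfer learning
  • Applying deep learning models at scale
  • Deploying models as SQL functions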

To try running the examples below, check out the Databricks notebook DeepLearning Pipelines on Databricks.

Working with images in Spark

The first step in applying deep learning to images is loading them. Spark and Deep Learning Pipelines include utility functions that can load millions of images into a Spark DataFrame and decode them automatically in a distributed fashion, allowing manipulation at scale.

Using Spark's ImageSchema

from sparkdl.image.image import ImageSchema
image_df = ImageSchema.readImages("/data/myimages")

Or, if a custom image library is needed:

from sparkdl.image import imageIO
image_df = imageIO.readImagesWithCustomFn("/data/myimages", decode_f=<your image library, see imageIO.PIL_decode>)

The resulting DataFrame contains a column named "image" that holds an image struct with schema == ImageSchema.

image_df.show()
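
To make the image struct more concrete, here is a minimal sketch in plain Python. It assumes the struct mirrors the fields of Spark's ImageSchema (origin, height, width, nChannels, mode, data), with pixel bytes stored row by row. ImageRow, pixel_at, and the example path are hypothetical names used only for illustration; they are not part of sparkdl.

```python
from collections import namedtuple

# Hypothetical stand-in for one image struct. The field names follow Spark's
# ImageSchema and are an assumption about the struct this library produces.
ImageRow = namedtuple(
    "ImageRow", ["origin", "height", "width", "nChannels", "mode", "data"]
)


def pixel_at(image, row, col):
    """Return the channel values of one pixel from the flat, row-major buffer."""
    start = (row * image.width + col) * image.nChannels
    return tuple(image.data[start:start + image.nChannels])


# A tiny 2x2 image with 3 channels and one byte per channel value.
example = ImageRow(
    origin="file:/data/myimages/example.jpg",  # hypothetical path
    height=2,
    width=2,
    nChannels=3,
    mode=16,  # OpenCV type code CV_8UC3 (8-bit unsigned, 3 channels)
    data=bytearray(range(12)),
)

print(pixel_at(example, 1, 0))
```

The buffer length is always height * width * nChannels for 8-bit images, which is a quick sanity check when writing a custom decode function.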

The goal is to add support for more data types, such as text and time series, as there is interest.

Transfer learning

Deep Learning Pipelines provides utilities to perform transfer learning on images, which is one of the fastest ways (in both code and run time) to start using deep learning. Using Deep Learning Pipelines, it can be done in just a few lines of code.

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

model = p.fit(train_images_df)    # train_images_df is a dataset of images and labels

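# Inspect training accuracy on a small sample
# ("prediction" must be selected here because the evaluator reads it below)
df = model.transform(train_images_df.limit(10)).select("image", "probability", "prediction", "uri", "label")
predictionAndLabels = df.select("prediction", "label")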
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Training set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))
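
For readers unfamiliar with the metric, the "accuracy" reported by the evaluator is the fraction of rows whose prediction equals the label. The following plain-Python sketch shows that computation on a handful of made-up (prediction, label) pairs. The accuracy helper and the sample values are illustrative only and do not come from the library.

```python
def accuracy(prediction_label_pairs):
    """Fraction of pairs in which the prediction equals the label."""
    pairs = list(prediction_label_pairs)
    if not pairs:
        raise ValueError("need at least one (prediction, label) pair")
    correct = sum(1 for prediction, label in pairs if prediction == label)
    return correct / float(len(pairs))


# Made-up sample: three of the four predictions match their labels.
sample = [(0.0, 0.0), (1.0, 1.0), (1.0, 0.0), (2.0, 2.0)]
print("accuracy = " + str(accuracy(sample)))
```

Because the example above evaluates on the training images themselves, the number is a sanity check rather than an estimate of how the model performs on unseen data.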

Applying deep learning models at scale

Spark DataFrames are a natural construct for applying deep learning models to a large-scale dataset. Deep Learning Pipelines provides a set of (Spark MLlib) Transformers for applying TensorFlow Graphs and TensorFlow-backed Keras Models at scale. In addition, popular image models can be applied out of the box, without requiring any TensorFlow or Keras code. The Transformers, backed by the Tensorframes library, efficiently handle the distribution of models and data to Spark workers.
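
As a rough mental model of applying one model across distributed data, the plain-Python sketch below loads a model once per partition and then scores every row in that partition. This is not how Tensorframes or the Transformers are implemented. It only illustrates the general pattern, and apply_model_to_partition, load_doubling_model, and the toy partitions are hypothetical.

```python
def apply_model_to_partition(rows, load_model):
    """Load the model once, then score every row in this partition."""
    model = load_model()
    return [model(row) for row in rows]


load_calls = []


def load_doubling_model():
    # Hypothetical stand-in for deserializing a real model on a worker.
    load_calls.append(1)
    return lambda x: 2 * x


# Three toy "partitions" standing in for the slices of a DataFrame.
partitions = [[1, 2, 3], [4, 5], [6]]
results = [apply_model_to_partition(p, load_doubling_model) for p in partitions]

print(results)
print("model loads: " + str(len(load_calls)))
```

The point of the pattern is that the cost of loading the model is paid once per partition rather than once per row.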

  1. Applying popular image models

    There are many well-known deep learning models for images. If the task at hand is very similar to what the models provide (e.g. object recognition with ImageNet classes), or for pure exploration, one can use the Transformer DeepImagePredictor by simply specifying the model name.

    from sparkdl.image.image import ImageSchema
    
    from sparkdl import DeepImagePredictor
    
    predictor = DeepImagePredictor(inputCol="image", outputCol="predicted_labels",
                                   modelName="InceptionV3", decodePredictions=True, topK=10)
    image_df = ImageSchema.readImages("/data/myimages")
    predictions_df = predictor.transform(image_df)
  2. For TensorFlow users

    Deep Learning Pipelines provides a Transformer that will apply the given TensorFlow Graph to a DataFrame containing a column of images (e.g. loaded using the utilities described in the previous section). Here is a very simple example of how a TensorFlow Graph can be used with the Transformer. In practice, the TensorFlow Graph will likely be restored from files before calling TFImageTransformer.

    from sparkdl.image.image import ImageSchema
    from sparkdl import TFImageTransformer
    import sparkdl.graph.utils as tfx
    from sparkdl.transformers import utils
    import tensorflow as tf
    
    graph = tf.Graph()
    with tf.Session(graph=graph) as sess:
        image_arr = utils.imageInputPlaceholder()
        resized_images = tf.image.resize_images(image_arr, (299, 299))
        frozen_graph = tfx.strip_and_freeze_until([resized_images], graph, sess,
                                                  return_graph=True)
    
    transformer = TFImageTransformer(inputCol="image", outputCol="predictions", graph=frozen_graph,
                                     inputTensor=image_arr, outputTensor=resized_images,
                                     outputMode="image")
    image_df = ImageSchema.readImages("/data/myimages")
    processed_image_df = transformer.transform(image_df)
  3. For Keras users

    Images

    For applying Keras models to images in a distributed manner using Spark, KerasImageFileTransformer works on TensorFlow-backed Keras models. It

    1. Internally creates a DataFrame containing a column of images by applying the user-specified image loading and processing function to the input DataFrame containing a column of image URIs
    2. Loads a Keras model from the given model file path
    3. Applies the model to the image DataFrame

    The difference in the API from TFImageTransformer above stems from the fact that usual Keras workflows have very specific ways to load and resize images that are not part of the TensorFlow Graph.

    To use the transformer, we first need to have a Keras model stored as a file. For this example we'll just save the Keras built-in InceptionV3 model instead of training one.

    from keras.applications import InceptionV3
    
    model = InceptionV3(weights="imagenet")
    model.save('/tmp/model-full.h5')

    Now on the prediction side, we can do:

    from keras.applications.inception_v3 import preprocess_input
    from keras.preprocessing.image import img_to_array, load_img
    import numpy as np
    import os
    from pyspark.sql.types import StringType
    from sparkdl import KerasImageFileTransformer
    
    def loadAndPreprocessKerasInceptionV3(uri):
        # this is a typical way to load and prep images in keras
        image = img_to_array(load_img(uri, target_size=(299, 299)))
        image = np.expand_dims(image, axis=0)
        return preprocess_input(image)
    
    transformer = KerasImageFileTransformer(inputCol="uri", outputCol="predictions",
                                            modelFile="/tmp/model-full.h5",
                                            imageLoader=loadAndPreprocessKerasInceptionV3,
                                            outputMode="vector")
    
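    # List the JPEG files to score; sqlContext is assumed to be predefined
    # (e.g. in a Databricks notebook or the pyspark shell)
    image_dir = "/data/myimages"
    files = [os.path.abspath(os.path.join(image_dir, f)) for f in os.listdir(image_dir) if f.endswith('.jpg')]
    uri_df = sqlContext.createDataFrame(files, StringType()).toDF("uri")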
    
    final_df = transformer.transform(uri_df)

    Tensor Inputs

    KerasTransformer applies a TensorFlow-backed Keras model to inputs of up to 2 dimensions. It loads a Keras model from a given model file path and applies the model to a column of arrays (where an array corresponds to a Tensor), outputting a column of arrays.

    from sparkdl import KerasTransformer
    from keras.models import Sequential
    from keras.layers import Dense
    import numpy as np
    
    # Generate random input data
    num_features = 10
    num_examples = 100
    input_data = [{"features" : np.random.randn(num_features).tolist()} for i in range(num_examples)]
    input_df = sqlContext.createDataFrame(input_data)
    
    # Create and save a single-hidden-layer Keras model for binary classification
    # NOTE: In a typical workflow, we'd train the model before exporting it to disk,
    # but we skip that step here for brevity
    model = Sequential()
    model.add(Dense(units=20, input_shape=[num_features], activation='relu'))
    model.add(Dense(units=1, activation='sigmoid'))
    model_path = "/tmp/simple-binary-classification"
    model.save(model_path)
    
    # Create transformer and apply it to our input data
    transformer = KerasTransformer(inputCol="features", outputCol="predictions",
                                   modelFile=model_path)
    final_df = transformer.transform(input_df)
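
For the DeepImagePredictor example earlier in this section, decodePredictions=True with topK=10 asks for human-readable labels for the highest-scoring classes instead of a raw score vector. The plain-Python sketch below illustrates the idea on a four-class toy vector. The top_k_labels helper, the class names, and the (name, score) output format are assumptions made for illustration; the actual output struct of the Transformer may differ.

```python
def top_k_labels(scores, class_names, k=10):
    """Pair each score with its class name and keep the k highest-scoring pairs."""
    if len(scores) != len(class_names):
        raise ValueError("scores and class_names must have the same length")
    ranked = sorted(zip(class_names, scores), key=lambda pair: pair[1], reverse=True)
    return ranked[:k]


# Toy score vector over four made-up classes.
scores = [0.05, 0.70, 0.10, 0.15]
class_names = ["tabby", "golden_retriever", "teapot", "beagle"]

top2 = top_k_labels(scores, class_names, k=2)
print(top2)
```

A real ImageNet model produces a much longer score vector, but the ranking and truncation step works the same way.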

Deploying models as SQL functions

One way to productionize a model is to deploy it as a Spark SQL User Defined Function, which allows anyone who knows SQL to use it. Deep Learning Pipelines provides mechanisms to take a deep learning model and register a Spark SQL User Defined Function (UDF).

The resulting UDF takes a column (formatted as an image struct "SpImage") and produces the output of the given Keras model (e.g. for Inception V3, it produces a real-valued score vector over the ImageNet object categories). For other models, the output could have different meanings; please consult the specification of the model in question.

We can register any Keras model that works on images as follows.

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

registerKerasImageUDF("my_keras_inception_udf", InceptionV3(weights="imagenet"))

To use a customized Keras model, we can save it and pass the file path as a parameter.

# Assume we have a compiled and trained Keras model
model.save('path/to/my/model.h5')
registerKerasImageUDF("my_custom_keras_model_udf", "path/to/my/model.h5")

Once the UDF is registered as described above, it can be used in a SQL query.

SELECT my_custom_keras_model_udf(image) as predictions from my_spark_image_table

If further preprocessing steps are required to prepare the images, the user can provide a preprocessing function, preprocessor, which converts a file path into an image array. Such a function is common in Keras workflows, as in the following example.

from keras.applications import InceptionV3
from sparkdl.udf.keras_image_model import registerKerasImageUDF

def keras_load_img(fpath):
    from keras.preprocessing.image import load_img, img_to_array
    import numpy as np
    img = load_img(fpath, target_size=(299, 299))
    return img_to_array(img).astype(np.uint8)

registerKerasImageUDF("my_keras_inception_udf", InceptionV3(weights="imagenet"), keras_load_img)

Releases

  • 0.1.0 Alpha release
  • 0.2.0 release:
    1. KerasImageFileEstimator API (train a Keras model on image files)
    2. SQL UDF support for Keras models
    3. Added Xception and ResNet50 models to DeepImageFeaturizer/DeepImagePredictor.

License

  • The Deep Learning Pipelines source code is released under the Apache License 2.0 (see the LICENSE file).
  • Models marked as provided by Keras (used by DeepImageFeaturizer and DeepImagePredictor) are provided subject to the MIT license located at https://github.com/fchollet/keras/blob/master/LICENSE and subject to any additional copyrights and licenses specified in the code or documentation. Also see the Keras applications page for more on the individual model licensing information.
