Apache Mahout Interpreter for Apache Zeppelin

Installation

Apache Mahout is a collection of packages that enable machine learning and matrix algebra on underlying engines such as Apache Flink or Apache Spark. A convenience script for creating and configuring two Mahout enabled interpreters exists. The %sparkMahout and %flinkMahout interpreters do not exist by default but can be easily created using this script.

Easy Installation

To quickly and easily get up and running using Apache Mahout, run the following command from the top-level directory of the Zeppelin install:

python scripts/mahout/add_mahout.py

This will create the %sparkMahout and %flinkMahout interpreters, and restart Zeppelin.

Advanced Installation

The add_mahout.py script contains several command line arguments for advanced users.

Argument Description Example
--zeppelin_home This is the path to the Zeppelin installation. This flag is not needed if the script is run from the top-level installation directory or from the zeppelin/scripts/mahout directory. /path/to/zeppelin
--mahout_home If the user has already installed Mahout, this flag can set the path to MAHOUT_HOME. If this is set, downloading Mahout will be skipped. /path/to/mahout_home
--restart_later Restarting is necessary for updates to take effect. By default the script will restart Zeppelin for you. Restart will be skipped if this flag is set. NA
--force_download This flag will force the script to re-download the binary even if it already exists. This is useful for previously failed downloads. NA
--overwrite_existing This flag will force the script to overwrite existing %sparkMahout and %flinkMahout interpreters. Useful when you want to just start over. NA

NOTE 1: Apache Mahout at this time only supports Spark 1.5 and Spark 1.6 and Scala 2.10. If the user is using another version of Spark (e.g. 2.0), the %sparkMahout will likely not work. The %flinkMahout interpreter will still work and the user is encouraged to develop with that engine as the code can be ported via copy and paste, as is evidenced by the tutorial notebook.

NOTE 2: If using Apache Flink in cluster mode, the following libraries will also need to be coppied to ${FLINK_HOME}/lib - mahout-math-0.12.2.jar - mahout-math-scala2.10-0.12.2.jar - mahout-flink2.10-0.12.2.jar - mahout-hdfs-0.12.2.jar - com.google.guava:guava:14.0.1

Overview

The Apache Mahout™ project's goal is to build an environment for quickly creating scalable performant machine learning applications.

Apache Mahout software provides three major features:

  • A simple and extensible programming environment and framework for building scalable algorithms
  • A wide variety of premade algorithms for Scala + Apache Spark, H2O, Apache Flink
  • Samsara, a vector math experimentation environment with R-like syntax which works at scale

In other words:

Apache Mahout provides a unified API for quickly creating machine learning algorithms on a variety of engines.

How to use

When starting a session with Apache Mahout, depending on which engine you are using (Spark or Flink), a few imports must be made and a Distributed Context must be declared. Copy and paste the following code and run once to get started.

Flink

%flinkMahout

import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._

implicit val ctx = new FlinkDistributedContext(benv)

Spark

%sparkMahout

import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._

implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)

Same Code, Different Engines

After importing and setting up the distributed context, the Mahout R-Like DSL is consistent across engines. The following code will run in both %flinkMahout and %sparkMahout

val drmData = drmParallelize(dense(
  (2, 2, 10.5, 10, 29.509541),  // Apple Cinnamon Cheerios
  (1, 2, 12,   12, 18.042851),  // Cap'n'Crunch
  (1, 1, 12,   13, 22.736446),  // Cocoa Puffs
  (2, 1, 11,   13, 32.207582),  // Froot Loops
  (1, 2, 12,   11, 21.871292),  // Honey Graham Ohs
  (2, 1, 16,   8,  36.187559),  // Wheaties Honey Gold
  (6, 2, 17,   1,  50.764999),  // Cheerios
  (3, 2, 13,   7,  40.400208),  // Clusters
  (3, 3, 13,   4,  45.811716)), numPartitions = 2)

drmData.collect(::, 0 until 4)

val drmX = drmData(::, 0 until 4)
val y = drmData.collect(::, 4)
val drmXtX = drmX.t %*% drmX
val drmXty = drmX.t %*% y


val XtX = drmXtX.collect
val Xty = drmXty.collect(::, 0)
val beta = solve(XtX, Xty)

Leveraging Resource Pools and R for Visualization

Resource Pools are a powerful Zeppelin feature that lets us share information between interpreters. A fun trick is to take the output of our work in Mahout and analyze it in other languages.

Setting up a Resource Pool in Flink

In Spark based interpreters resource pools are accessed via the ZeppelinContext API. To put and get things from the resource pool one can be done simple

val myVal = 1
z.put("foo", myVal)
val myFetchedVal = z.get("foo")

To add this functionality to a Flink based interpreter we declare the follwoing

%flinkMahout

import org.apache.zeppelin.interpreter.InterpreterContext

val z = InterpreterContext.get().getResourcePool()

Now we can access the resource pool in a consistent manner from the %flinkMahout interpreter.

Passing a variable from Mahout to R and Plotting

In this simple example, we use Mahout (on Flink or Spark, the code is the same) to create a random matrix and then take the Sin of each element. We then randomly sample the matrix and create a tab separated string. Finally we pass that string to R where it is read as a .tsv file, and a DataFrame is created and plotted using native R plotting libraries.

val mxRnd = Matrices.symmetricUniformView(5000, 2, 1234)
val drmRand = drmParallelize(mxRnd)


val drmSin = drmRand.mapBlock() {case (keys, block) =>  
  val blockB = block.like()
  for (i <- 0 until block.nrow) {
    blockB(i, 0) = block(i, 0)
    blockB(i, 1) = Math.sin((block(i, 0) * 8))
  }
  keys -> blockB
}

z.put("sinDrm", org.apache.mahout.math.drm.drmSampleToTSV(drmSin, 0.85))

And then in an R paragraph...

%spark.r {"imageWidth": "400px"}

library("ggplot2")

sinStr = z.get("flinkSinDrm")

data <- read.table(text= sinStr, sep="\t", header=FALSE)

plot(data,  col="red")