Apache Mahout Interpreter for Apache Zeppelin
Installation
Apache Mahout is a collection of packages that enable machine learning and matrix algebra on underlying engines such as Apache Flink or Apache Spark. A convenience script creates and configures two Mahout-enabled interpreters, %sparkMahout and %flinkMahout. These interpreters do not exist by default but can be created easily with this script.
Easy Installation
To quickly and easily get up and running using Apache Mahout, run the following command from the top-level directory of the Zeppelin install:
python scripts/mahout/add_mahout.py
This will create the %sparkMahout and %flinkMahout interpreters and restart Zeppelin.
Advanced Installation
The add_mahout.py script accepts several command-line arguments for advanced users.
Argument | Description | Example |
---|---|---|
--zeppelin_home | This is the path to the Zeppelin installation. This flag is not needed if the script is run from the top-level installation directory or from the zeppelin/scripts/mahout directory. | /path/to/zeppelin |
--mahout_home | If the user has already installed Mahout, this flag can set the path to MAHOUT_HOME. If this is set, downloading Mahout will be skipped. | /path/to/mahout_home |
--restart_later | Restarting is necessary for updates to take effect. By default the script will restart Zeppelin for you. Restart will be skipped if this flag is set. | NA |
--force_download | This flag will force the script to re-download the binary even if it already exists. This is useful for previously failed downloads. | NA |
--overwrite_existing | This flag will force the script to overwrite existing %sparkMahout and %flinkMahout interpreters. Useful when you want to just start over. | NA |
NOTE 1: At this time, Apache Mahout supports only Spark 1.5 and Spark 1.6 with Scala 2.10. If you are using another version of Spark (e.g. 2.0), the %sparkMahout interpreter will likely not work. The %flinkMahout interpreter will still work, and users are encouraged to develop with that engine, since the code can be ported by copy and paste, as the tutorial notebook shows.
NOTE 2: If using Apache Flink in cluster mode, the following libraries must also be copied to ${FLINK_HOME}/lib:
- mahout-math-0.12.2.jar
- mahout-math-scala_2.10-0.12.2.jar
- mahout-flink_2.10-0.12.2.jar
- mahout-hdfs-0.12.2.jar
- com.google.guava:guava:14.0.1
Overview
The Apache Mahout™ project's goal is to build an environment for quickly creating scalable, performant machine learning applications.
Apache Mahout software provides three major features:
- A simple and extensible programming environment and framework for building scalable algorithms
- A wide variety of premade algorithms for Scala + Apache Spark, H2O, Apache Flink
- Samsara, a vector math experimentation environment with R-like syntax which works at scale
In other words:
Apache Mahout provides a unified API for quickly creating machine learning algorithms on a variety of engines.
How to use
When starting a session with Apache Mahout, you must make a few imports and declare a distributed context. The exact code depends on which engine you are using (Spark or Flink). Copy and paste the matching block below and run it once to get started.
Flink
%flinkMahout
import org.apache.flink.api.scala._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.flinkbindings._
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
implicit val ctx = new FlinkDistributedContext(benv)
Spark
%sparkMahout
import org.apache.mahout.math._
import org.apache.mahout.math.scalabindings._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.scalabindings.RLikeOps._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.mahout.sparkbindings._
implicit val sdc: org.apache.mahout.sparkbindings.SparkDistributedContext = sc2sdc(sc)
Same Code, Different Engines
After importing and setting up the distributed context, the Mahout R-Like DSL is consistent across engines. The following code will run in both %flinkMahout and %sparkMahout:
// One row per cereal: four feature columns followed by the target column
val drmData = drmParallelize(dense(
  (2, 2, 10.5, 10, 29.509541), // Apple Cinnamon Cheerios
  (1, 2, 12, 12, 18.042851), // Cap'n'Crunch
  (1, 1, 12, 13, 22.736446), // Cocoa Puffs
  (2, 1, 11, 13, 32.207582), // Froot Loops
  (1, 2, 12, 11, 21.871292), // Honey Graham Ohs
  (2, 1, 16, 8, 36.187559), // Wheaties Honey Gold
  (6, 2, 17, 1, 50.764999), // Cheerios
  (3, 2, 13, 7, 40.400208), // Clusters
  (3, 3, 13, 4, 45.811716)), numPartitions = 2)

// Collect the four feature columns in-core to inspect them
drmData.collect(::, 0 until 4)

// Split into distributed features X and an in-core target vector y
val drmX = drmData(::, 0 until 4)
val y = drmData.collect(::, 4)

// Distributed products X'X and X'y
val drmXtX = drmX.t %*% drmX
val drmXty = drmX.t %*% y

// Both results are small, so collect them and solve (X'X) beta = X'y in-core
val XtX = drmXtX.collect
val Xty = drmXty.collect(::, 0)
val beta = solve(XtX, Xty)
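The code above is ordinary least squares solved through the normal equations: it forms X^T X and X^T y on the distributed matrix, collects the small results, and solves (X^T X) beta = X^T y in-core. As a minimal, engine-free sketch of that last step, the plain Scala below does the same on a three-row toy dataset. The object NormalEquations, its helpers gram, xty and solveLinear, and the toy numbers are hypothetical illustrations, not part of the Mahout API; inside a Mahout session, Mahout's own solve is the thing to use.

```scala
// Engine-free illustration of the normal equations (X^T X) beta = X^T y.
// Plain Scala only; none of these helpers belong to the Mahout API.
object NormalEquations {
  type Matrix = Array[Array[Double]]

  // X^T X for a row-major matrix x
  def gram(x: Matrix): Matrix = {
    val p = x.head.length
    Array.tabulate(p, p)((i, j) => x.map(row => row(i) * row(j)).sum)
  }

  // X^T y
  def xty(x: Matrix, y: Array[Double]): Array[Double] = {
    val p = x.head.length
    Array.tabulate(p)(j => x.indices.map(r => x(r)(j) * y(r)).sum)
  }

  // Solve a * beta = b by Gaussian elimination with partial pivoting
  def solveLinear(a: Matrix, b: Array[Double]): Array[Double] = {
    val n = b.length
    // Augmented matrix [a | b]; :+ copies each row, so the inputs stay untouched
    val m = Array.tabulate(n)(i => a(i) :+ b(i))
    for (col <- 0 until n) {
      // Swap the row with the largest pivot candidate into place
      val pivot = (col until n).maxBy(r => math.abs(m(r)(col)))
      val tmp = m(col); m(col) = m(pivot); m(pivot) = tmp
      // Eliminate the entries below the pivot
      for (r <- col + 1 until n) {
        val f = m(r)(col) / m(col)(col)
        for (c <- col to n) m(r)(c) -= f * m(col)(c)
      }
    }
    // Back substitution
    val beta = new Array[Double](n)
    for (i <- n - 1 to 0 by -1) {
      val s = (i + 1 until n).map(j => m(i)(j) * beta(j)).sum
      beta(i) = (m(i)(n) - s) / m(i)(i)
    }
    beta
  }
}

// Toy data (made up for illustration): y = 2 * x0 + 3 * x1 exactly
val toyX = Array(Array(1.0, 0.0), Array(0.0, 1.0), Array(1.0, 1.0))
val toyY = Array(2.0, 3.0, 5.0)
val toyBeta = NormalEquations.solveLinear(
  NormalEquations.gram(toyX), NormalEquations.xty(toyX, toyY))
// toyBeta is approximately (2.0, 3.0)
```

The sketch can be pasted into any Scala paragraph or REPL. The names toyX, toyY and toyBeta are chosen so that they do not shadow y and beta from the Mahout code.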
Leveraging Resource Pools and R for Visualization
Resource Pools are a powerful Zeppelin feature that lets us share information between interpreters. A fun trick is to take the output of our work in Mahout and analyze it in other languages.
Setting up a Resource Pool in Flink
In Spark-based interpreters, resource pools are accessed via the ZeppelinContext API. Putting values into the resource pool and getting them back is simple:
val myVal = 1
z.put("foo", myVal)
val myFetchedVal = z.get("foo")
To add this functionality to a Flink-based interpreter, we declare the following:
%flinkMahout
import org.apache.zeppelin.interpreter.InterpreterContext
val z = InterpreterContext.get().getResourcePool()
Now we can access the resource pool in a consistent manner from the %flinkMahout interpreter.
Passing a variable from Mahout to R and Plotting
In this simple example, we use Mahout (on Flink or Spark, the code is the same) to create a random two-column matrix, keep the first column, and replace the second with the sine of eight times the first. We then randomly sample the matrix and create a tab-separated string. Finally, we pass that string to R, where it is read as tab-separated text into a data frame and plotted using native R plotting.
val mxRnd = Matrices.symmetricUniformView(5000, 2, 1234)
val drmRand = drmParallelize(mxRnd)
val drmSin = drmRand.mapBlock() { case (keys, block) =>
  val blockB = block.like()
  for (i <- 0 until block.nrow) {
    blockB(i, 0) = block(i, 0)
    blockB(i, 1) = Math.sin((block(i, 0) * 8))
  }
  keys -> blockB
}
z.put("sinDrm", org.apache.mahout.math.drm.drmSampleToTSV(drmSin, 0.85))
And then in an R paragraph...
%spark.r {"imageWidth": "400px"}
library("ggplot2")
sinStr = z.get("sinDrm")
data <- read.table(text= sinStr, sep="\t", header=FALSE)
plot(data, col="red")
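The value handed from Mahout to R is just a string of tab-separated rows, which is why read.table with sep="\t" can parse it. As a rough sketch of that format, assuming each row is kept with a given probability and the survivors are joined with tabs and newlines, the plain Scala below builds such a string. TsvSketch and sampleToTsv are hypothetical names, and Mahout's drmSampleToTSV may sample and format its output differently.

```scala
import scala.util.Random

// Hypothetical helper, not Mahout's implementation.
object TsvSketch {
  // Keep each row with probability `fraction`, then join columns with tabs
  // and rows with newlines.
  def sampleToTsv(rows: Seq[Array[Double]], fraction: Double, seed: Long = 1234L): String = {
    val rng = new Random(seed)
    rows.filter(_ => rng.nextDouble() < fraction)
      .map(_.mkString("\t"))
      .mkString("\n")
  }
}

// With fraction = 1.0 every row is kept, because nextDouble() is always below 1.0
val tsv = TsvSketch.sampleToTsv(Seq(Array(0.5, 0.25), Array(-1.0, 2.0)), fraction = 1.0)
// tsv == "0.5\t0.25\n-1.0\t2.0"
```

A string in this shape has one line per sampled row and no header, which matches the header=FALSE argument in the R paragraph.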