Scalding Interpreter for Apache Zeppelin

Scalding is an open source Scala library for writing MapReduce jobs.

Building the Scalding Interpreter

You have to first build the Scalding interpreter by enable the scalding profile as follows:

mvn clean package -Pscalding -DskipTests

Enabling the Scalding Interpreter

In a notebook, to enable the Scalding interpreter, click on the Gear icon,select Scalding, and hit Save.

Interpreter Binding

Interpreter Selection

Configuring the Interpreter

Scalding interpreter runs in two modes:

  • local
  • hdfs

In the local mode, you can access files on the local server and scalding transformation are done locally.

In hdfs mode you can access files in HDFS and scalding transformation are run as hadoop map-reduce jobs.

Zeppelin comes with a pre-configured Scalding interpreter in local mode.

To run the scalding interpreter in the hdfs mode you have to do the following:

Set the classpath with ZEPPELIN_CLASSPATH_OVERRIDES

In conf/zeppelinenv.sh, you have to set ZEPPELINCLASSPATH_OVERRIDES to the contents of 'hadoop classpath' and directories with custom jar files you need for your scalding commands.

Set arguments to the scalding repl

The default arguments are: --local --repl

For hdfs mode you need to add: --hdfs --repl

If you want to add custom jars, you need to add: -libjars directory/*:directory/*

For reducer estimation, you need to add something like: -Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator

Set max.open.instances

If you want to control the maximum number of open interpreters, you have to select "scoped" interpreter for note option and set max.open.instances argument.

Testing the Interpreter

Local mode

In example, by using the Alice in Wonderland tutorial, we will count words (of course!), and plot a graph of the top 10 words in the book.

%scalding

import scala.io.Source

// Get the Alice in Wonderland book from gutenberg.org:
val alice = Source.fromURL("http://www.gutenberg.org/files/11/11.txt").getLines
val aliceLineNum = alice.zipWithIndex.toList
val alicePipe = TypedPipe.from(aliceLineNum)

// Now get a list of words for the book:
val aliceWords = alicePipe.flatMap { case (text, _) => text.split("\\s+").toList }

// Now lets add a count for each word:
val aliceWithCount = aliceWords.filterNot(_.equals("")).map { word => (word, 1L) }

// let's sum them for each word:
val wordCount = aliceWithCount.group.sum

print ("Here are the top 10 words\n")
val top10 = wordCount
  .groupAll
  .sortBy { case (word, count) => -count }
  .take(10)
top10.dump
%scalding

val table = "words\t count\n" + top10.toIterator.map{case (k, (word, count)) => s"$word\t$count"}.mkString("\n")
print("%table " + table)

If you click on the icon for the pie chart, you should be able to see a chart like this: Scalding - Pie - Chart

HDFS mode

Test mode

%scalding
mode

This command should print:

res4: com.twitter.scalding.Mode = Hdfs(true,Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml)

Test HDFS read

val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
testfile.dump

This command should print the contents of the hdfs file /user/x/testfile.

Test map-reduce job

val testfile = TypedPipe.from(TextLine("/user/x/testfile"))
val a = testfile.groupAll.size.values
a.toList

This command should create a map reduce job.

Future Work

  • Better user feedback (hadoop url, progress updates)
  • Ability to cancel jobs
  • Ability to dynamically load jars without restarting the interpreter
  • Multiuser scalability (run scalding interpreters on different servers)