Scio Interpreter for Apache Zeppelin

Overview

Scio is a Scala DSL for Google Cloud Dataflow and Apache Beam inspired by Spark and Scalding. See the current wiki and API documentation for more information.

Configuration

Name Default Value Description
zeppelin.scio.argz --runner=InProcessPipelineRunner Scio interpreter wide arguments. Documentation: https://github.com/spotify/scio/wiki#options and https://cloud.google.com/dataflow/pipelines/specifying-exec-params
zeppelin.scio.maxResult 1000 Max number of SCollection results to display

Enabling the Scio Interpreter

In a notebook, to enable the Scio interpreter, click the Gear icon and select beam (beam.scio).

Using the Scio Interpreter

In a paragraph, use %beam.scio to select the Scio interpreter. You can use it much the same way as vanilla Scala REPL and Scio REPL. State (like variables, imports, execution etc) is shared among all Scio paragraphs. There is a special variable argz which holds arguments from Scio interpreter settings. The easiest way to proceed is to create a Scio context via standard ContextAndArgs.

%beam.scio
val (sc, args) = ContextAndArgs(argz)

Use sc context the way you would in a regular pipeline/REPL.

Example:

%beam.scio
val (sc, args) = ContextAndArgs(argz)
sc.parallelize(Seq("foo", "foo", "bar")).countByValue.closeAndDisplay()

If you close Scio context, go ahead an create a new one using ContextAndArgs. Please refer to Scio wiki for more complex examples. You can close Scio context much the same way as in Scio REPL, and use Zeppelin display helpers to synchronously close and display results - read more below.

Progress

There can be only one paragraph running at once. There is no notion of overall progress, thus progress bar will show 0.

SCollection display helpers

Scio interpreter comes with display helpers to ease working with Zeppelin notebooks. Simply use closeAndDisplay() on SCollection to close context and display the results. The number of results is limited by zeppelin.scio.maxResult (by default 1000).

Supported SCollection types:

  • Scio's typed BigQuery
  • Scala's Products (case classes, tuples)
  • Google BigQuery's TableRow
  • Apache Avro
  • All Scala's AnyVal

Helper methods

There are different helper methods for different objects. You can easily display results from SCollection, Future[Tap] and Tap.

SCollection helper

SCollection has closeAndDisplay Zeppelin helper method for types listed above. Use it to synchronously close Scio context, and once available pull and display results.

Future[Tap] helper

Future[Tap] has waitAndDisplay Zeppelin helper method for types listed above. Use it to synchronously wait for results, and once available pull and display results.

Tap helper

Tap has display Zeppelin helper method for types listed above. Use it to pull and display results.

Examples

BigQuery example:

%beam.scio
@BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
                           |FROM [bigquery-samples:airline_ontime_data.flights]
                           |group by departure_airport
                           |order by 2 desc
                           |limit 10""".stripMargin) class Flights

val (sc, args) = ContextAndArgs(argz)
sc.bigQuerySelect(Flights.query).closeAndDisplay(Flights.schema)

BigQuery typed example:

%beam.scio
@BigQueryType.fromQuery("""|SELECT departure_airport,count(case when departure_delay>0 then 1 else 0 end) as no_of_delays
                           |FROM [bigquery-samples:airline_ontime_data.flights]
                           |group by departure_airport
                           |order by 2 desc
                           |limit 10""".stripMargin) class Flights

val (sc, args) = ContextAndArgs(argz)
sc.typedBigQuery[Flights]().flatMap(_.no_of_delays).mean.closeAndDisplay()

Avro example:

%beam.scio
import com.spotify.data.ExampleAvro

val (sc, args) = ContextAndArgs(argz)
sc.avroFile[ExampleAvro]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay()

Avro example with a view schema:

%beam.scio
import com.spotify.data.ExampleAvro
import org.apache.avro.Schema

val (sc, args) = ContextAndArgs(argz)
val view = Schema.parse("""{"type":"record","name":"ExampleAvro","namespace":"com.spotify.data","fields":[{"name":"track","type":"string"}, {"name":"artist", "type":"string"}]}""")

sc.avroFile[EndSongCleaned]("gs://<bucket>/tmp/my.avro").take(10).closeAndDisplay(view)

Google credentials

Scio Interpreter will try to infer your Google Cloud credentials from its environment, it will take into the account:

  • argz interpreter settings (doc)
  • environment variable (GOOGLE_APPLICATION_CREDENTIALS)
  • gcloud configuration

BigQuery macro credentials

Currently BigQuery project for macro expansion is inferred using Google Dataflow's DefaultProjectFactory().create()