Archive for the ‘Cascading’ Category

Hadoop, Mesos, Cascading, Scalding, Cascalog and Data Science with Paco Nathan

July 30, 2013 Leave a comment

Episode #9 of the podcast is a talk with Paco Nathon.  Available also on iTunes

We talked about how he got started with Hadoop with Natural Language Processing back in 2007 with text analytics.

And then starting talking about Mesos

Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. It can run Hadoop, MPI, Hypertable, Spark, and other applications on a dynamically shared pool of nodes.

We talked a little about the difference between YARN and Mesos.  Paco talked about how Mesos is lower in the stack and part of the operating system where YARN is higher up in the stack and built to support the Hadoop ecosystem in the JVM.  He talked about the future of Mesos and touched on its contrast to Google Borg … for some more information on Google Borg and Mesos here is a great article

Then we got into Cascading which was started by Chris Wensel – and talked about the enterprise use cases for Cascading.  He talked about how Cascading has always been geared to satisfy enterprise use cases and not slice and dice but build an application on top of it and be able to debug it to see where it is running because it is deterministic. He talked about how this contrasts to Hive and Pig. He brought up Steve Yegeg’s post “Notes from the Mystery Machine Bus” and talked a bit how Cascading applied.

We got into design patterns for the enterprise with big batch workflow breaking it up into five parts:

1) Different data sources (structured and unstructured data)
2) ETL
3) Custom data preparation and business logic to clean up the data
4) Analytics or predictive modeling to enrich the data
5) integration with end use cases that consume the data products

Cascading addresses all of these points and Paco talked in more detail about them.

We finished up the podcast with him talking about the future of these technologies and also data science.

Subscribe to the podcast and listen to what Paco had to say.  Available also on iTunes

Joe Stein
Big Data Open Source Security LLC


Cascading 1.1.0 Released

April 23, 2010 2 comments

Cascading is an open source abstraction for MapReduce that allows applications to work with Hadoop through a straightforward Java API. Cascading provides finer-grained ways to define applications and glue them together, as well as abstractions to work with external systems.

This release features many performance and usability enhancements while remaining backwards compatible with 1.0.


  • Performance optimizations with all join types
  • Numerous job planner optimizations
  • Dynamic optimizations when running in Amazon Elastic MapReduce and S3
  • API usability improvements around large number of field names
  • Support for TSV, CSV, and custom delimited text files
  • Support for manipulating and serializing non-Comparable custom Java types
  • Debug levels supported by the job planner

For a detailed list of changes see: CHANGES.txt

Along with this release are a number of extensions created by the Cascading user community.

Among these extension are:

  • Bixo – a data mining toolkit
  • DBMigrate – a tool for migrating data to/from RDBMSs into Hadoop
  • Apache HBase, Amazon SimpleDB, and JDBC integration
  • JRuby and Clojure based scripting languages for Cascading
  • Cascalog – a robust interactive extensible query language

Here is an excerpt from their User Guide for getting started

Counting words in a document is the most common example presented to
new Hadoop (and MapReduce) developers, it is the Hadoop equivalent to the
“Hello World” application.

Word counting is where a document is parsed into individual words,
and the frequency of those words are counted.

For example, if we counted the last paragraph “is” would be counted
twice, and “document” counted once.

In the code example below, we will use Cascading to read each line
of text from a file (our document), parse it into words, then count the
number of time the word is encountered.

Example 2.1. Word Counting

// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete

There are a couple things to take away from this example.

First, the pipe assembly is not coupled to the data (the Tap instances) until the last moment before execution. That is, file paths or references are not embedded in the pipe assembly. The pipe assembly remains independent of which data it processes until execution. The only dependency is what the data looks
like, its “scheme”, or the field names that make it up.

That brings up fields. Every input and output file has field names associated with it, and every processing element of the pipe assembly either expects certain fields, or creates new fields. This allows the developer to self document their code, and allows the Cascading planner to “fail fast” during planning if a dependency between elements isn’t satisfied (used a missing or wrong field name).

It is also important to point out that pipe assemblies are assembled through constructor chaining. This may seem odd but is done for two reasons. It keeps the code more concise. And it prevents developers from creating “cycles” in the resulting pipe assembly. Pipe assemblies are Directed Acyclic Graphs (or DAGs). The Cascading planner cannot handle processes that feed themselves, that have cycles (not to say there are ways around this that are much safer).

Notice the very first Pipe instance has a name. That instance is the “head” of this particular pipe assembly. Pipe assemblies can have any number of heads, and any number of tails. This example does not name the tail assembly, but for complex assemblies, tails must be named for reasons described below.

Heads and tails of pipe assemblies generally need names, this is how sources and sinks are “bound” to them during planning. In our example above, there is only one head and one tail, and subsequently only one source and one sink, respectively. So naming in this case is optional, it’s obvious what goes where. Naming is also useful for self documenting pipe assemblies, especially where there are splits, joins, and merges in the assembly.

To paraphrase, our example will:

  • read each line of text from a file and give it the field name “line”,
  • parse each “line” into words by the RegexGenerator object which in turn returns each word in the field named “word”,
  • groups on the field named “word” using the GroupBy object,
  • then counts the number of elements in each grouping using the Count() object and stores this value in the “count” field,
  • finally the “word” and “count” fields are written out.


Categories: Cascading