Cascading 1.1.0 Released
Cascading is an open source abstraction for MapReduce that allows applications to work with Hadoop through a straightforward Java API. Cascading provides finer-grained ways to define applications and glue them together, as well as abstractions to work with external systems.
This release features many performance and usability enhancements while remaining backwards compatible with 1.0.
- Performance optimizations with all join types
- Numerous job planner optimizations
- Dynamic optimizations when running in Amazon Elastic MapReduce and S3
- API usability improvements around large numbers of field names
- Support for TSV, CSV, and custom delimited text files
- Support for manipulating and serializing non-Comparable custom Java types
- Debug levels supported by the job planner
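As an illustration of the new delimited-text support, a source Tap for a tab-separated file can be declared with the TextLine-style Scheme API. The sketch below is a minimal fragment, not a complete program; the field names and input path are hypothetical, and the full set of TextDelimited constructors (headers, type coercion, custom delimiters) is documented in the 1.1 API.

```java
// Sketch: reading a TSV file with the 1.1 TextDelimited scheme.
// Field names and the input path are hypothetical examples.
Scheme tsvScheme = new TextDelimited( new Fields( "id", "name", "score" ), "\t" );
Tap tsvSource = new Hfs( tsvScheme, "input/scores.tsv" );
```

A CSV source would be declared the same way with "," as the delimiter.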
For a detailed list of changes see: CHANGES.txt
Along with this release are a number of extensions created by the Cascading user community.
Among these extensions are:
- Bixo – a data mining toolkit
- DBMigrate – a tool for migrating data between RDBMSs and Hadoop
- Apache HBase, Amazon SimpleDB, and JDBC integration
- JRuby- and Clojure-based scripting languages for Cascading
- Cascalog – a robust, interactive, extensible query language
Here is an excerpt on getting started from the Cascading User Guide: http://www.cascading.org/1.1/userguide/html/ch02.html
Counting words in a document is the most common example presented to new Hadoop (and MapReduce) developers; it is the Hadoop equivalent of the “Hello World” application.
In word counting, a document is parsed into individual words and the frequency of each word is counted. For example, if we counted the last paragraph, “is” would be counted twice and “document” counted once.
In the code example below, we will use Cascading to read each line of text from a file (our document), parse it into words, then count the number of times each word is encountered.
Example 2.1. Word Counting
// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete
flow.complete();
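The word-splitting regular expression passed to RegexGenerator is ordinary Java regex syntax and can be exercised on its own, without Hadoop or Cascading on the classpath. The sketch below (class and method names are our own, for illustration) shows what it extracts from a line of text: runs of non-space characters that begin and end on a letter, so trailing punctuation is trimmed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexWordSplit
{
  // the same pattern used in the example above:
  // a run of non-space characters that starts and ends on a letter
  // (\pL matches any Unicode letter)
  static final Pattern WORD = Pattern.compile( "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)" );

  public static List<String> splitWords( String line )
    {
    List<String> words = new ArrayList<String>();
    Matcher matcher = WORD.matcher( line );

    // emit one "word" per match, as RegexGenerator does per Tuple
    while( matcher.find() )
      words.add( matcher.group() );

    return words;
    }

  public static void main( String[] args )
    {
    // punctuation attached to a word is trimmed by the look-arounds
    System.out.println( splitWords( "Hello World, hello again" ) );
    // prints: [Hello, World, hello, again]
    }
}
```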
There are a couple of things to take away from this example.
First, the pipe assembly is not coupled to the data (the Tap instances) until the last moment before execution. That is, file paths or references are not embedded in the pipe assembly; the pipe assembly remains independent of which data it processes until execution. The only dependency is what the data looks like: its “scheme”, or the field names that make it up.
That brings up fields. Every input and output file has field names associated with it, and every processing element of the pipe assembly either expects certain fields or creates new fields. This allows developers to self-document their code, and allows the Cascading planner to “fail fast” during planning if a dependency between elements isn’t satisfied (for example, a missing or misspelled field name).
It is also important to point out that pipe assemblies are assembled through constructor chaining. This may seem odd, but it is done for two reasons: it keeps the code more concise, and it prevents developers from creating “cycles” in the resulting pipe assembly. Pipe assemblies are directed acyclic graphs (DAGs); the Cascading planner cannot handle processes that feed themselves, that is, processes with cycles (which is not to say there are no safer ways to achieve similar effects).
Notice that the very first Pipe instance has a name. That instance is the “head” of this particular pipe assembly. Pipe assemblies can have any number of heads and any number of tails. This example does not name the tail assembly, but for complex assemblies, tails must be named for reasons described below.
Heads and tails of pipe assemblies generally need names; this is how sources and sinks are “bound” to them during planning. In our example above, there is only one head and one tail, and consequently only one source and one sink, so naming in this case is optional — it’s obvious what goes where. Naming is also useful for self-documenting pipe assemblies, especially where there are splits, joins, and merges in the assembly.
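For assemblies with more than one head or tail, the binding by name is done through Maps passed to the FlowConnector. The fragment below is a hedged sketch of that pattern only: the Tap instances (lhsTap, rhsTap, sinkTap), the "id" join field, and the pipe names are hypothetical, and the surrounding setup is omitted.

```java
// Sketch: two named heads joined into one named tail.
Pipe lhs = new Pipe( "lhs" );   // first head
Pipe rhs = new Pipe( "rhs" );   // second head

// join the two streams on a hypothetical "id" field
Pipe tail = new CoGroup( lhs, new Fields( "id" ), rhs, new Fields( "id" ) );
tail = new Pipe( "join-result", tail ); // name the tail so a sink can bind to it

// sources and sinks are bound to heads and tails by pipe name
Map<String, Tap> sources = new HashMap<String, Tap>();
sources.put( "lhs", lhsTap );
sources.put( "rhs", rhsTap );

Map<String, Tap> sinks = new HashMap<String, Tap>();
sinks.put( "join-result", sinkTap );

Flow flow = new FlowConnector( properties ).connect( sources, sinks, tail );
```

With a single head and tail, as in the word-count example, the simpler connect( source, sink, assembly ) form does this binding implicitly.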
To paraphrase, our example will:
- read each line of text from a file and give it the field name “line”,
- parse each “line” into words with the RegexGenerator object, which in turn returns each word in the field named “word”,
- group the Tuple stream on the field named “word” using the GroupBy object,
- count the number of elements in each grouping using the Count() object and store this value in the “count” field,
- finally, write out the “word” and “count” fields.