
Archive for the ‘MapReduce’ Category

Hadoop Streaming Made Simple using Joins and Keys with Python

December 16, 2010

There are a lot of different ways to write MapReduce jobs!!!

I find streaming scripts a good way to interrogate data sets (especially when I have not worked with them yet or am creating new ones), and I enjoy the lifecycle in which the initial exploration of the data sets leads to the finalized scripts for an entire job (or a series of jobs, as is often the case).

When doing streaming with Hadoop you do have a few library options. If you are a Ruby programmer, then wukong is awesome! For Python programmers, you can use dumbo and the more recently released mrjob.

I like working under the hood myself and getting down and dirty with the data, and here is how you can too.

Let's start by defining two simple sample data sets.

Data set 1: countries.dat

Fields: name|key (this field list is for reference only; it is not a line in the file)

United States|US
Canada|CA
United Kingdom|UK
Italy|IT

Data set 2: customers.dat

Fields: name|type|country (this field list is for reference only; it is not a line in the file)

Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA

The requirement: for each country, find out how many customers of each type it has, showing the country's full name from countries.dat in the final result (not the 2-digit country code).

To do this you need to:

1) Join the data sets
2) Key on country
3) Count type of customer per country
4) Output the results
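Before writing any Hadoop code, it can help to know exactly what the answer should be. The following is a plain-Python sketch of the four steps on the sample data held in memory, useful as a reference to check the streaming output against. The function name `expected_counts` and the in-memory lists are illustrative assumptions, not part of the streaming job, and the "Unkown" spelling deliberately mirrors the reducer's output string so the rows can be compared directly.

```python
from collections import Counter

COUNTRIES = ["United States|US", "Canada|CA", "United Kingdom|UK", "Italy|IT"]
CUSTOMERS = [
    "Alice Bob|not bad|US",
    "Sam Sneed|valued|CA",
    "Jon Sneed|valued|CA",
    "Arnold Wesise|not so good|UK",
    "Henry Bob|not bad|US",
    "Yo Yo Ma|not so good|CA",
    "Jon York|valued|CA",
    "Alex Ball|valued|UK",
    "Jim Davis|not so bad|JA",
]


def expected_counts(countries, customers):
    """Hypothetical helper: join, key on country, count types, return rows."""
    # 1) join: build a lookup from 2-digit code to full country name
    names = {}
    for line in countries:
        name, code = line.split("|")
        names[code] = name
    # 2) + 3) key on (country name, customer type) and count
    counts = Counter()
    for line in customers:
        _, customer_type, code = line.split("|")
        country = names.get(code, "%s - Unkown Country" % code)
        counts[(country, customer_type)] += 1
    # 4) output tab-separated rows, sorted for easy comparison
    return sorted("%s\t%s\t%d" % (c, t, n) for (c, t), n in counts.items())


for row in expected_counts(COUNTRIES, CUSTOMERS):
    print(row)
```

Because this runs entirely in memory it only works for small samples, but it gives you something to diff the MapReduce output against.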

So first, let's code up a quick mapper called smplMapper.py (you can decide if smpl is short for simple or sample).

The basics of coding a mapper and reducer in Python are explained nicely here: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/. I am going to dive a bit deeper, though, and tackle our example with some more tactics.

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    try:  # bad data can raise errors; decide here how you want to deal with lint and bad records
        personName = "-1"     # default "-1" sorts before real values
        personType = "-1"     # default "-1" sorts before real values
        countryName = "-1"    # default "-1" sorts before real values
        country2digit = "-1"  # default "-1" sorts before real values

        # remove leading and trailing whitespace
        line = line.strip()

        splits = line.split("|")

        if len(splits) == 2:  # country data: name|key
            countryName = splits[0]
            country2digit = splits[1]
        else:  # people data: name|type|country
            personName = splits[0]
            personType = splits[1]
            country2digit = splits[2]

        # emit the country code first so it can be used as the partition key
        print('%s^%s^%s^%s' % (country2digit, personType, personName, countryName))
    except Exception:  # errors would make your job fail, which you may or may not want
        pass

Don’t forget:

chmod a+x smplMapper.py
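Because the mapper reads straight from STDIN, it is a little awkward to unit-test. One option, sketched here, is to pull the per-line logic into a function. `map_line` is a hypothetical name, and returning `None` for unparseable lines is an assumption that mirrors the mapper's "skip bad data" behavior; unlike the script, this sketch also skips lines with more than three fields.

```python
def map_line(line):
    """Turn one countries.dat or customers.dat line into a '^'-separated record.

    Returns None for lines that cannot be parsed.
    """
    splits = line.strip().split("|")
    if len(splits) == 2:  # country data: name|key
        country_name, country2digit = splits
        return "%s^-1^-1^%s" % (country2digit, country_name)
    if len(splits) == 3:  # people data: name|type|country
        person_name, person_type, country2digit = splits
        return "%s^%s^%s^-1" % (country2digit, person_type, person_name)
    return None  # blank or malformed line


print(map_line("Canada|CA"))                # → CA^-1^-1^Canada
print(map_line("Yo Yo Ma|not so good|CA"))  # → CA^not so good^Yo Yo Ma^-1
print(map_line(""))                         # → None
```

The script's loop then reduces to printing `map_line(line)` whenever it is not `None`, and the function can be tested without Hadoop or a shell pipeline.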

Great! That takes care of #1, so now it is time to test and see what is going to the reducer.

From the command line run:

cat customers.dat countries.dat|./smplMapper.py|sort

Which will result in:

CA^-1^-1^Canada
CA^not so good^Yo Yo Ma^-1
CA^valued^Jon Sneed^-1
CA^valued^Jon York^-1
CA^valued^Sam Sneed^-1
IT^-1^-1^Italy
JA^not so bad^Jim Davis^-1
UK^-1^-1^United Kingdom
UK^not so good^Arnold Wesise^-1
UK^valued^Alex Ball^-1
US^-1^-1^United States
US^not bad^Alice Bob^-1
US^not bad^Henry Bob^-1

Notice how this is sorted: each country's mapping line comes first, followed by the people in that country (so we can grab the correct country name as we loop), and the customer types are sorted within each country so we can count them properly. =8^)
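The reducer depends on each country's mapping line sorting ahead of that country's people, which works because "-" sorts before digits and letters in byte order. This small sketch uses Python's `sorted`, which for ASCII text compares the same way as `LC_ALL=C sort` and as Hadoop's byte-wise comparison of `Text` keys, on a few mapper lines. The shell's `sort` under a non-C locale may apply different collation rules, so prefixing the local pipeline's `sort` with `LC_ALL=C` is a cautious choice.

```python
# A few lines as the mapper would emit them, deliberately out of order
mapped = [
    "CA^valued^Sam Sneed^-1",
    "US^not bad^Alice Bob^-1",
    "CA^-1^-1^Canada",
    "CA^not so good^Yo Yo Ma^-1",
    "US^-1^-1^United States",
]

# '-' (0x2D) sorts before digits and letters, so each country's mapping
# line ("XX^-1^-1^Name") lands directly in front of that country's people
for line in sorted(mapped):
    print(line)
```

The sorted result starts with `CA^-1^-1^Canada`, then the Canadian customers by type, then `US^-1^-1^United States` followed by the US customers.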

Let us hold off on #2 for a moment (just hang in there; it will all come together soon, I promise) and get smplReducer.py working first.

#!/usr/bin/env python

import sys

# state carried across the sorted input lines
foundKey = ""                # "country name<TAB>customer type" currently being counted
isFirst = 1                  # 1 until the first person line has been seen
currentCount = 0
currentCountry2digit = "-1"
currentCountryName = "-1"
isCountryMappingLine = False

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    try:
        # parse the input we got from smplMapper.py
        country2digit, personType, personName, countryName = line.split('^')

        # the first line for each country should be its mapping line; otherwise the country name is not known
        if personName == "-1":  # a new country, which may or may not have people in it
            currentCountryName = countryName
            currentCountry2digit = country2digit
            isCountryMappingLine = True
        else:
            isCountryMappingLine = False  # this is a person we want to count

        if not isCountryMappingLine:  # count only people; the country line just supplies the name

            # check that the 2-digit code matches the last mapping line; if not, the country is unknown
            if currentCountry2digit != country2digit:
                currentCountry2digit = country2digit
                currentCountryName = '%s - Unkown Country' % currentCountry2digit

            currentKey = '%s\t%s' % (currentCountryName, personType)

            if foundKey != currentKey:  # new combination of keys to count
                if isFirst == 0:
                    print('%s\t%s' % (foundKey, currentCount))
                    currentCount = 0  # reset the count
                else:
                    isFirst = 0

                foundKey = currentKey  # remember this key so the next line either increments or prints

            currentCount += 1  # count every person line
    except ValueError:  # skip malformed lines that do not have exactly four fields
        pass

# print the last key, but only if at least one person was counted
if isFirst == 0:
    print('%s\t%s' % (foundKey, currentCount))

Don’t forget:

chmod a+x smplReducer.py

And then run:

cat customers.dat countries.dat|./smplMapper.py|sort|./smplReducer.py

And voila!

Canada	not so good	1
Canada	valued	3
JA - Unkown Country	not so bad	1
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
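For comparison, here is a sketch of the same counting logic written with `itertools.groupby`, which groups consecutive lines that share a country code. This is a swapped-in technique rather than the approach used in smplReducer.py, and `reduce_sorted` is a hypothetical name. Like the reducer, it assumes its input is already sorted, and it keeps the reducer's "Unkown" spelling so the output matches.

```python
from itertools import groupby


def reduce_sorted(lines):
    """Yield 'country<TAB>type<TAB>count' rows from sorted mapper output."""
    records = (line.strip().split("^") for line in lines)
    records = (r for r in records if len(r) == 4)  # skip blank/malformed lines
    # group consecutive records that share the same 2-digit country code
    for code, group in groupby(records, key=lambda r: r[0]):
        country_name = "%s - Unkown Country" % code  # used if no mapping line
        counts = {}
        for _, person_type, person_name, name in group:
            if person_name == "-1":  # mapping line: remember the full name
                country_name = name
            else:  # a person: count their customer type
                counts[person_type] = counts.get(person_type, 0) + 1
        for person_type in sorted(counts):
            yield "%s\t%s\t%d" % (country_name, person_type, counts[person_type])


sorted_input = [
    "CA^-1^-1^Canada",
    "CA^not so good^Yo Yo Ma^-1",
    "CA^valued^Jon Sneed^-1",
    "CA^valued^Jon York^-1",
    "CA^valued^Sam Sneed^-1",
    "IT^-1^-1^Italy",
    "JA^not so bad^Jim Davis^-1",
    "UK^-1^-1^United Kingdom",
    "UK^not so good^Arnold Wesise^-1",
    "UK^valued^Alex Ball^-1",
    "US^-1^-1^United States",
    "US^not bad^Alice Bob^-1",
    "US^not bad^Henry Bob^-1",
]

for row in reduce_sorted(sorted_input):
    print(row)
```

Grouping on the country code alone means a country's mapping line and its people always land in the same group, the same property that partitioning on the first field preserves across reducers. A country with no customers (Italy here) produces no rows.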

So now #3 and #4 are done but what about #2? 

First put the files into Hadoop:

hadoop fs -put ~/mayo/customers.dat .
hadoop fs -put ~/mayo/countries.dat .

And now run it like this (assuming you are running as hadoop in the bin directory):

hadoop jar ../contrib/streaming/hadoop-0.20.1+169.89-streaming.jar \
  -D mapred.reduce.tasks=4 \
  -file ~/mayo/smplMapper.py -mapper smplMapper.py \
  -file ~/mayo/smplReducer.py -reducer smplReducer.py \
  -input customers.dat -input countries.dat \
  -output mayo \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -jobconf stream.map.output.field.separator=^ \
  -jobconf stream.num.map.output.key.fields=4 \
  -jobconf map.output.key.field.separator=^ \
  -jobconf num.key.fields.for.partition=1

Let us look at what we did:

hadoop fs -cat mayo/part*

Which results in:

Canada	not so good	1
Canada	valued	3
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
JA - Unkown Country	not so bad	1

So #2 is handled by the partitioner, KeyFieldBasedPartitioner, explained further on the Hadoop Wiki On Streaming. It lets the partition key be whatever set of leading columns you output (in our case, the country), configured through the command-line options. Every line with the same country goes to the same reducer, and the rest of the fields are sorted within that key.
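To see why the Hadoop output comes back in a different order from the local run, here is a rough simulation of partitioning on the first "^"-separated field across 4 reduce tasks and then sorting each partition. `partition_for` is hypothetical and uses `zlib.crc32` as a stand-in hash; KeyFieldBasedPartitioner computes its own hash, so the real partition numbers (and therefore the order of the part files) will differ. What carries over is the guarantee that every line for a given country lands in the same partition.

```python
import zlib

NUM_REDUCE_TASKS = 4  # matches -D mapred.reduce.tasks=4 in the command


def partition_for(line, num_partitions=NUM_REDUCE_TASKS):
    """Hypothetical stand-in: choose a partition from the first '^' field only."""
    first_field = line.split("^", 1)[0]
    # crc32 is NOT Hadoop's hash; it is used here only because it is stable
    return (zlib.crc32(first_field.encode("utf-8")) & 0x7FFFFFFF) % num_partitions


mapped = [
    "US^not bad^Alice Bob^-1",
    "CA^valued^Sam Sneed^-1",
    "CA^valued^Jon Sneed^-1",
    "UK^not so good^Arnold Wesise^-1",
    "US^not bad^Henry Bob^-1",
    "CA^not so good^Yo Yo Ma^-1",
    "CA^valued^Jon York^-1",
    "UK^valued^Alex Ball^-1",
    "JA^not so bad^Jim Davis^-1",
    "US^-1^-1^United States",
    "CA^-1^-1^Canada",
    "UK^-1^-1^United Kingdom",
    "IT^-1^-1^Italy",
]

# Shuffle: route each line by its first field, then sort each partition on
# the whole line (stream.num.map.output.key.fields=4 makes all four fields
# part of the sort key)
partitions = {p: [] for p in range(NUM_REDUCE_TASKS)}
for line in mapped:
    partitions[partition_for(line)].append(line)
for p in partitions:
    partitions[p].sort()

for p in sorted(partitions):
    codes = sorted({ln.split("^")[0] for ln in partitions[p]})
    print("partition %d: %s" % (p, codes))
```

Each reducer then sees complete, sorted runs for its countries, and `hadoop fs -cat mayo/part*` simply concatenates the reducers' outputs in partition order.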

And there you go … Simple Python Scripting Implementing Streaming in Hadoop.  

Grab the tar here and give it a spin.

/*
Joe Stein
Twitter: @allthingshadoop
Connect: On LinkedIn
*/

Categories: Hadoop, MapReduce, Python

Making Hadoop and MapReduce easier with Karmasphere

For those folks either just getting started or already in the daily trenches of M/R development, Karmasphere has come about to help developers and technical professionals make Hadoop MapReduce easier: http://www.karmasphere.com/. Karmasphere Studio is a desktop IDE for graphically prototyping MapReduce jobs and deploying, monitoring and debugging them on Hadoop clusters in private and public clouds.

* Runs on Linux, Apple Mac OS and Windows.
* Works with all major distributions and versions of Hadoop including Apache, Yahoo! and Cloudera.
* Works with Amazon Elastic MapReduce.
* Supports local, networked, HDFS and Amazon S3 file systems.
* Supports Cascading.
* Enables job submission from all major platforms including Windows.
* Operates with clusters and file systems behind firewalls.

So, what can you do with it?

  • Prototype on the desktop: Get going with MapReduce job development quickly. No need for a cluster since Hadoop emulation is included.
  • Deploy to a private or cloud-based cluster: Whether you’re using a cluster in your own network or a cloud, deploy your job/s easily.
  • Debug on the cluster: One of the most challenging areas in MapReduce programming is debugging your job on the cluster. Visual tools deliver real-time insight into your job, including support for viewing and charting Hadoop job and task counters.
  • Graphically visualize and manipulate: Whether it’s clusters, file systems, job configuration, counters, log files or other debugging information, save time and get better insight by accessing it all in one place.
  • Monitor and analyze your jobs in real-time: Get a real-time workflow view of inputs, outputs and intermediate results, including the map, partition, sort and reduce phases.

Whether you’re new to Hadoop and want to easily explore MapReduce programming or you like the sound of something that helps you prototype, deploy and manage in an integrated environment or you’re already using Hadoop but could use a lot more insight into your jobs running on a cluster, there’s something here for you.

All you need is NetBeans (version 6.7 or 6.8) and Java 1.6 and you’ll be ready to give Karmasphere Studio a whirl.

You do NOT need any kind of Hadoop cluster set up to begin prototyping. But when you are ready to deploy your job on a large data set, you’ll need a virtual or real cluster in your data center or a public cloud such as Amazon Web Services.

An Eclipse version is in progress.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Categories: Hadoop, MapReduce

Running Hadoop MapReduce With Cassandra NoSQL

April 24, 2010

So if you are looking for a good NoSQL read on HBase vs. Cassandra, you can check out http://ria101.wordpress.com/2010/02/24/hbase-vs-cassandra-why-we-moved/. In short, HBase is good for reads and Cassandra for writes. Cassandra does a great job on reads too, so please do not think I am shooting either down in any way. I am just saying that both HBase and Cassandra have great value and a useful purpose in their own right, and use cases even exist for running both. HBase was recently promoted to a top-level Apache project, graduating out of Hadoop.

Having worked with Cassandra a bit, I often see/hear of folks asking about running Map/Reduce jobs against the data stored in Cassandra instances. Well, Hadoopers & Hadooperettes, the Cassandra folks provide a very nice way to do this in the 0.6 release. It is VERY straightforward and well thought through. If you want to see the evolution, check out the JIRA issue https://issues.apache.org/jira/browse/CASSANDRA-342

So how do you do it? Very simply: Cassandra provides an implementation of InputFormat. In case you are new to Hadoop, the InputFormat is (basically) what the mapper uses to load your data. Their subclass connects your mapper so it pulls the data in from Cassandra. What is also great here is that the Cassandra folks have spent the time implementing the integration in the classic “Word Count” example.

See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for this example.  Cassandra rows or row fragments (that is, pairs of key + SortedMap of columns) are input to Map tasks for processing by your job, as specified by a SlicePredicate that describes which columns to fetch from each row. Here’s how this looks in the word_count example, which selects just one configurable columnName from each row:

ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
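Conceptually, each map call receives one row key plus a sorted map of the columns the SlicePredicate selected. The following plain-Python sketch imitates that shape with dictionaries to show what a word-count job does with a single configured column. It does not use any Cassandra or Hadoop API; the row data, `COLUMN_NAME`, `slice_row` and `word_count` are made up for illustration.

```python
from collections import Counter

COLUMN_NAME = "text"  # hypothetical column, playing the role of columnName

# Hypothetical rows: row key -> map of column name -> value
rows = {
    "row1": {"text": "hello cassandra", "other": "ignored"},
    "row2": {"text": "hello hadoop"},
    "row3": {"other": "no text column here"},
}


def slice_row(columns, wanted):
    """Keep only the requested columns, like a SlicePredicate on column names."""
    return {name: value for name, value in sorted(columns.items()) if name in wanted}


def word_count(rows, column_name):
    counts = Counter()
    for _key, columns in sorted(rows.items()):
        # "map": look only at the sliced column and emit each word
        for value in slice_row(columns, {column_name}).values():
            for word in value.split():
                counts[word] += 1  # "reduce": sum the occurrences per word
    return counts


print(sorted(word_count(rows, COLUMN_NAME).items()))
```

Rows without the selected column simply contribute nothing, which matches the idea that the predicate decides what each map task sees.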

Cassandra also provides a Pig LoadFunc for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
