Faster Datanodes with less I/O wait using df instead of du
I have often noticed that the check Hadoop uses to calculate disk usage on the datanodes causes a fair amount of I/O wait on them, driving up load.
Every cycle we can get from every spindle we want!
So I came up with a nice little hack to use df instead of du.
Here is basically what I did so you can do it too.
mv /usr/bin/du /usr/bin/bak_du
vi /usr/bin/du
and save this inside of it
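#!/bin/sh
# Hadoop calls "du -sk <dir>", so $2 is the directory being measured.
# Report the "Used" column (1K blocks, POSIX one-line-per-filesystem format) of the filesystem holding it.
mydf=$(df -Pk "$2" | grep -vE '^Filesystem|tmpfs|cdrom' | awk '{ print $3 }')
# printf instead of "echo -e", which /bin/sh (e.g. dash) would print literally
printf '%s\t%s\n' "$mydf" "$2"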
then give it execute permission
chmod a+x /usr/bin/du
Restart your datanode, check the log for errors, and make sure it starts back up.
Voila!
Now when Hadoop calls “du -sk /yourhdfslocation” it will return its result quickly.
What’s wrong with this?
1) I assume you are not storing anything else on your disks, so df is really close to du since almost all of your data is in HDFS.
2) If you have more than one volume holding your HDFS blocks, this is not exactly accurate: you skew the size of each volume by calculating only one of them and using that result for the others. This is simple to fix: parse your df result differently and use the path passed in as the second parameter to know which volume to grep for in the df output. Your first volume is most likely going to be the largest anyway, and you should be monitoring disk space some other way, so it is not going to be very harmful if you just check and report the first volume’s size.
3) You might not have your HDFS blocks on your first volume … see #2; you can just grep for the volume you want to report.
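If you would rather not parse df output at all, here is a minimal sketch of the same hack in Python, assuming Python 3.3+ is available on the datanodes (for shutil.disk_usage). It reports the used space of whichever filesystem holds the directory Hadoop passes in, so each volume gets its own number (caveats #2 and #3). The function name du_sk_via_df is hypothetical, and like the shell version it measures the whole filesystem, not just the HDFS directory.

```python
#!/usr/bin/env python3
import shutil
import sys


def du_sk_via_df(path):
    """Return a 'du -sk' style line: used KB of the filesystem holding path, a tab, then path."""
    usage = shutil.disk_usage(path)  # statvfs-based; 'used' is (f_blocks - f_bfree) * f_frsize, like df's Used
    used_kb = usage.used // 1024     # du -sk reports 1K blocks
    return "%d\t%s" % (used_kb, path)


if __name__ == "__main__" and len(sys.argv) > 1:
    # Hadoop invokes "du -sk <dir>", so the directory is the last argument
    print(du_sk_via_df(sys.argv[-1]))
```

Installed in place of /usr/bin/du (with execute permission, as above), it answers in one statvfs call instead of walking every block file.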
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Cloudera, Yahoo and the Apache Hadoop Community Security Branch Release Update
In the wake of Yahoo!’s announcement that it would discontinue its Hadoop distribution and focus its efforts on Apache Hadoop (http://yhoo.it/i9Ww8W), the landscape has become tumultuous.
Yahoo! engineers have spent their time and effort contributing back to the Apache Hadoop security branch (branch of 0.20) and have proposed release candidates.
Currently being voted on and discussed is “Release candidate 0.20.203.0-rc1”. If you are following the VOTE and the DISCUSSION, then maybe you are like me: it just cannot be done without a bowl of popcorn before opening the emails. It is getting heated, in a good and constructive kind of way (http://mail-archives.apache.org/mod_mbox/hadoop-general/201105.mbox/thread). There are already more emails in 5 days of May than there were in all of April. Woot!
My take? Has it become Cloudera vs. Yahoo!, and will Apache Hadoop releases become fragmented because of it? Well, it is kind of like that already. 0.21 is the latest release, and can anyone who is not a committer quickly know or find out the difference between it and the other release branches? It is esoteric.
0.22, which is a release from trunk, is right around the corner too.
Let’s take HBase (a Hadoop project) as an example. Do you know which HDFS releases can support HBase in production without losing data? If you do, then maybe you don’t realize that many people still don’t even know about the branch. Now that CDH3 is out you can use that (thanks Cloudera!); otherwise it is highly recommended not to run HBase in production unless you use the append branch of 0.20 (http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/), which makes you miss out on other changes in trunk releases…
__ eyes crossing inwards and sideways with what branch does what and when the trunk release has everything __
Hadoop is becoming an à la carte exercise: deciding which features and fixes I can live without to deploy what I really need … or having to hire a committer … or a bunch of folks who do nothing but Hadoop day in and day out (sounds like Oracle, ahhhhhh) … or going with the Cloudera Distribution (which is what I do, and I don’t look back). The barrier to entry feels like it has risen over the last year. However, stepping back from that, the system overall has seen a lot of improvements! A lot of great work by a lot of dedicated folks putting in their time and effort towards making Hadoop (in whatever form the elephant stampedes through its data) a reality.
Big shops that have teams of “Hadoop Engineers” (Yahoo, Facebook, eBay, LinkedIn, etc.) with contributors and/or committers on the team should not feel much impact, because ultimately they are able to roll their own releases for whatever they need in production and support them themselves. Not all are so endowed.
Now, all of that having been said, I write this because the discussion is REALLY good and has a lot of folks (including those from Yahoo! and Cloudera) bringing up pain points and proposing some great solutions that hopefully will contribute to the continued growth and success of the Apache Hadoop Community http://hadoop.apache.org/… Still, if you want to run it in your company (and don’t have a committer on staff), then go download CDH3 (http://www.cloudera.com); it will get you going with the latest and greatest of all the releases, branches, etc. Great documentation too!
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Hadoop Streaming Made Simple using Joins and Keys with Python
There are a lot of different ways to write MapReduce jobs!!!
I find streaming scripts a good way to interrogate data sets (especially when I have not worked with them yet or am creating new ones), and I enjoy the lifecycle in which the initial exploration of the data sets leads to the construction of the finalized scripts for an entire job (or series of jobs, as is often the case).
When doing streaming with Hadoop you have a few library options. If you are a Ruby programmer, then Wukong is awesome! For Python programmers, you can use dumbo and the more recently released mrjob.
I like working under the hood myself and getting down and dirty with the data, and here is how you can too.
Let’s start by defining two simple sample data sets.
Data set 1: countries.dat
name|key
United States|US
Canada|CA
United Kingdom|UK
Italy|IT
Data set 2: customers.dat
name|type|country
Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA
The requirement: for each country, count how many customers of each type it has, and show the country’s full name from countries.dat in the final result (not the 2-letter country code).
To do this you need to:
1) Join the data sets
2) Key on country
3) Count type of customer per country
4) Output the results
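Before writing any MapReduce code, it helps to know what the answer should be. Here is a minimal in-memory sketch of the four steps in plain Python (no Hadoop), using the two sample data sets above. The function name count_types_by_country is hypothetical, and labelling unmapped codes as "<code> - Unknown Country" is an assumption chosen to mirror what the reducer further down does.

```python
from collections import Counter

COUNTRIES = ["United States|US", "Canada|CA", "United Kingdom|UK", "Italy|IT"]
CUSTOMERS = [
    "Alice Bob|not bad|US", "Sam Sneed|valued|CA", "Jon Sneed|valued|CA",
    "Arnold Wesise|not so good|UK", "Henry Bob|not bad|US", "Yo Yo Ma|not so good|CA",
    "Jon York|valued|CA", "Alex Ball|valued|UK", "Jim Davis|not so bad|JA",
]


def count_types_by_country(country_lines, customer_lines):
    # 1) join: map each 2-letter code to its full country name
    names = {}
    for line in country_lines:
        name, code = line.split("|")
        names[code] = name
    counts = Counter()
    for line in customer_lines:
        _person, ctype, code = line.split("|")
        # 2) key on country (with a fallback label for codes that have no mapping)
        country = names.get(code, "%s - Unknown Country" % code)
        # 3) count customer types per country
        counts[(country, ctype)] += 1
    return counts


# 4) output the results, one "country<TAB>type<TAB>count" line per key
for (country, ctype), n in sorted(count_types_by_country(COUNTRIES, CUSTOMERS).items()):
    print("%s\t%s\t%d" % (country, ctype, n))
```

This gives a set of counts to compare against when the streaming version runs.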
So first let’s code up a quick mapper called smplMapper.py (you can decide if smpl is short for simple or sample).
The basics of coding a mapper and reducer in Python are explained nicely here http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ but I am going to dive a bit deeper to tackle our example with some more tactics.
#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    try:  # bad data can cause errors; use this however you like to deal with lint and bad data
        personName = "-1"     # default, sorts first
        personType = "-1"     # default, sorts first
        countryName = "-1"    # default, sorts first
        country2digit = "-1"  # default, sorts first
        # remove leading and trailing whitespace
        line = line.strip()
        splits = line.split("|")
        if len(splits) == 2:  # country data
            countryName = splits[0]
            country2digit = splits[1]
        else:  # people data
            personName = splits[0]
            personType = splits[1]
            country2digit = splits[2]
        # print() with one argument works in both Python 2 and 3
        print('%s^%s^%s^%s' % (country2digit, personType, personName, countryName))
    except Exception:  # errors would make your job fail, which you may or may not want
        pass
Don’t forget:
chmod a+x smplMapper.py
Great! That takes care of #1. Now it is time to test and see what is going to the reducer.
From the command line run:
cat customers.dat countries.dat|./smplMapper.py|sort
Which will result in:
CA^-1^-1^Canada
CA^not so good^Yo Yo Ma^-1
CA^valued^Jon Sneed^-1
CA^valued^Jon York^-1
CA^valued^Sam Sneed^-1
IT^-1^-1^Italy
JA^not so bad^Jim Davis^-1
UK^-1^-1^United Kingdom
UK^not so good^Arnold Wesise^-1
UK^valued^Alex Ball^-1
US^-1^-1^United States
US^not bad^Alice Bob^-1
US^not bad^Henry Bob^-1
Notice how this is sorted so the country is first and the people in that country after it (so we can grab the correct country name as we loop) and with the type of customer also sorted (but within country) so we can properly count the types within the country. =8^)
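The trick that makes this work is that "-1" sorts ahead of any letter in byte order ('-' is 0x2D), so each country’s mapping line lands before its customers. A quick check in Python with a few of the mapper lines above. Hadoop compares streaming keys as raw bytes, so when testing locally with Unix sort it is safer to run LC_ALL=C sort, since some locales collate punctuation differently.

```python
# A few mapper output lines, deliberately out of order
lines = [
    "CA^valued^Jon Sneed^-1",
    "CA^-1^-1^Canada",
    "CA^not so good^Yo Yo Ma^-1",
]

# Python compares str by code point, which matches byte order for ASCII data
ordered = sorted(lines)
for line in ordered:
    print(line)
```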
Let us hold off on #2 for a moment (just hang in there it will all come together soon I promise) and get smplReducer.py working first.
#!/usr/bin/env python
import sys

# state carried from line to line (input arrives sorted by country, then customer type)
foundKey = ""
isFirst = 1
currentCount = 0
currentCountry2digit = "-1"
currentCountryName = "-1"
isCountryMappingLine = False

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    try:
        # parse the input we got from smplMapper.py
        country2digit, personType, personName, countryName = line.split('^')
        # the first line for a country should be a mapping line; otherwise the country name is not known
        if personName == "-1":  # a new country, which may or may not have people in it
            currentCountryName = countryName
            currentCountry2digit = country2digit
            isCountryMappingLine = True
        else:
            isCountryMappingLine = False  # this is a person we want to count
        if not isCountryMappingLine:  # only count people, but use the country line to get the right name
            # first check that the 2-digit country info matches up; it might be an unknown country
            if currentCountry2digit != country2digit:
                currentCountry2digit = country2digit
                currentCountryName = '%s - Unkown Country' % currentCountry2digit
            currentKey = '%s\t%s' % (currentCountryName, personType)
            if foundKey != currentKey:  # new combination of keys to count
                if isFirst == 0:
                    print('%s\t%s' % (foundKey, currentCount))
                    currentCount = 0  # reset the count
                else:
                    isFirst = 0
                foundKey = currentKey  # remember the key so the next line knows whether to increment or print
            currentCount += 1  # count every person line
    except Exception:
        pass

# flush the last key (skipped if this reducer received no people at all)
if foundKey:
    print('%s\t%s' % (foundKey, currentCount))
Don’t forget:
chmod a+x smplReducer.py
And then run:
cat customers.dat countries.dat|./smplMapper.py|sort|./smplReducer.py
And voila!
Canada	not so good	1
Canada	valued	3
JA - Unkown Country	not so bad	1
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
So now #3 and #4 are done but what about #2?
First put the files into Hadoop:
hadoop fs -put ~/mayo/customers.dat .
hadoop fs -put ~/mayo/countries.dat .
And now run it like this (assuming you are running as hadoop in the bin directory):
hadoop jar ../contrib/streaming/hadoop-0.20.1+169.89-streaming.jar \
  -D mapred.reduce.tasks=4 \
  -file ~/mayo/smplMapper.py -mapper smplMapper.py \
  -file ~/mayo/smplReducer.py -reducer smplReducer.py \
  -input customers.dat -input countries.dat \
  -output mayo \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -jobconf stream.map.output.field.separator=^ \
  -jobconf stream.num.map.output.key.fields=4 \
  -jobconf map.output.key.field.separator=^ \
  -jobconf num.key.fields.for.partition=1
Let us look at what we did:
hadoop fs -cat mayo/part*
Which results in:
Canada	not so good	1
Canada	valued	3
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
JA - Unkown Country	not so bad	1
So #2 is handled by the partitioner, KeyFieldBasedPartitioner (explained further on the Hadoop Wiki page on Streaming). It lets you choose, through the command-line options, which of the columns you output make up the partition key (in our case, the country). Every record with the same country is sent to the same reducer, and within it the records arrive sorted by the rest of the key.
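To see why the reducer can rely on the country line arriving first, here is a rough local simulation of what KeyFieldBasedPartitioner plus the shuffle sort do with num.key.fields.for.partition=1. It is only a sketch: the hash below (a sum of the key’s bytes) is an assumption chosen for simplicity and is not the hash Hadoop uses, so which reducer a country lands on will differ from a real run. What carries over is that every line for a country goes to the same reducer, sorted by the full key.

```python
from collections import defaultdict


def simulate_partition(lines, num_reducers, sep="^", partition_fields=1):
    """Group mapper output by reducer using only the first partition_fields fields, then sort each group."""
    buckets = defaultdict(list)
    for line in lines:
        part_key = sep.join(line.split(sep)[:partition_fields])
        # assumption: toy deterministic hash, NOT Hadoop's own hash function
        reducer = sum(part_key.encode("utf-8")) % num_reducers
        buckets[reducer].append(line)
    # within a reducer, records arrive sorted by the whole key (all 4 fields here)
    return {r: sorted(group) for r, group in buckets.items()}


mapper_output = [
    "US^not bad^Alice Bob^-1", "CA^valued^Sam Sneed^-1", "UK^-1^-1^United Kingdom",
    "CA^-1^-1^Canada", "UK^valued^Alex Ball^-1", "US^-1^-1^United States",
    "JA^not so bad^Jim Davis^-1", "IT^-1^-1^Italy",
]

for reducer, group in sorted(simulate_partition(mapper_output, 4).items()):
    print(reducer, group)
```

Because sorting happens per reducer, output files from different reducers are not globally ordered, which is why the JA line shows up last when you cat the part files.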
And there you go … Simple Python Scripting Implementing Streaming in Hadoop.
Grab the tar here and give it a spin.
/*
Joe Stein
Twitter: @allthingshadoop
Connect: On Linked In
*/
NoSQL HBase and Hadoop with Todd Lipcon from Cloudera
Episode #6 of the Podcast is a talk with Todd Lipcon from Cloudera discussing HBase.
We talked about NoSQL and how it should stand for “Not Only SQL”, the tight integration between Hadoop and HBase, and how systems like Cassandra (which is eventually consistent, not strongly consistent like HBase) are complementary, since each has applicability within the big data ecosystem depending on your use cases.
With the strong consistency of HBase you get features like incrementing counters, and the tight integration with Hadoop means faster loads into HDFS thanks to a new feature in the 0.89 development preview release called “bulk loads” (described in the doc folders).
We covered a lot more unique features, talked about more of what is coming in upcoming releases as well as some tips with HBase so subscribe to the podcast and listen to all of what Todd had to say.
/*
Joe Stein
http://www.medialets.com
*/
Pre-Release from Pentaho – HIVE JDBC Adapter
Pentaho’s Jordan Ganoff, a Software Engineer, has open sourced some Hive JDBC adapters as part of the work they are doing for their BI server:
http://forums.pentaho.com/showthread.php?77826-Hive-amp-Hadoop
Not sure what state they are in, but I will try to check them out this week.
To use from maven:
<dependency>
<groupId>org.apache.hadoop.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>0.5.0-pentaho-SNAPSHOT</version>
</dependency>
You must also add the repository information to either the pom.xml or
your local settings:
<repository>
<id>pentaho</id>
<name>Pentaho External Repository</name>
<url>http://repo.pentaho.org/artifactory/repo</url>
</repository>
/*
Joe Stein
http://medialets.com
*/
Hadoop Development Tools By Karmasphere
In Episode #5 of the Hadoop Podcast http://allthingshadoop.com/podcast/ I speak with Shevek, the CTO of Karmasphere http://karmasphere.com/. To subscribe to the Podcast click here.
We talk a bit about their existing Community Edition (supports NetBeans & Eclipse):
- For developing, debugging and deploying Hadoop Jobs
- Desktop MapReduce Prototyping
- GUI to manipulate clusters, file systems and jobs
- Easy deployment to any Hadoop version, any distribution in any cloud
- Works through firewalls
As well as the new products they have launched:
Karmasphere Client:
The Karmasphere Client is a cross platform library for ensuring MapReduce jobs can work from any desktop environment to any Hadoop cluster in any enterprise data network. By isolating the Big Data professional and version of Hadoop, Karmasphere Client simplifies the process of switching between data centers and the cloud and enables Hadoop jobs to be independent of the version of the underlying cluster.
Unlike the standard Hadoop client, Karmasphere Client works from Microsoft Windows as well as Linux and MacOS, and works through SSH-based firewalls. Karmasphere Client provides a cloud-independent environment that makes it easy and predictable to maintain a business operation reliant on Hadoop.
- Ensures Hadoop distribution and version independence
- Works from Windows (unlike Hadoop Client)
- Supports any cloud environment: public, private or public cloud service.
- Provides:
- Job portability
- Operating system portability
- Firewall hopping
- Fault tolerant API
- Synchronous and Asynchronous API
- Clean Object Oriented Design
- Making it easy and predictable to maintain a business operation reliant on Hadoop
Karmasphere Studio Professional Edition
Karmasphere Studio Professional Edition includes all the functionality of the Community Edition, plus a range of deeper functionality required to simplify the developer’s task of making a MapReduce job robust, efficient and production-ready.
For a MapReduce job to be robust, its functioning on the cluster has to be well understood in terms of time, processing, and storage requirements, as well as in terms of its behavior when implemented within well-defined “bounds.” Karmasphere Studio Professional Edition incorporates the tools and a predefined set of rules that make it easy for the developer to understand how his or her job is performing on the cluster and where there is room for improvement.
- Enhanced cluster visualization and debugging
- Execution diagnostics
- Job performance timelines
- Job charting
- Job profiling
- Job Export
- For easy production deployment
- Support
Karmasphere Studio Analyst Edition
- SQL interface for ad hoc analysis
- Karmasphere Application Framework + Hive + GUI =
- No cluster changes
- Works over proxies and firewalls
- Integrated Hadoop monitoring
- Interactive syntax checking
- Detailed diagnostics
- Enhanced schema browser
- Full JDBC4 compliance
- Multi-threaded & concurrent
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Hadoop and Pig with Alan Gates from Yahoo
Episode 4 of our Podcast is with Alan Gates, Senior Software Engineer @ Yahoo! and Pig committer. Click here to listen.
Hadoop is a really important part of Yahoo’s infrastructure because processing and analyzing big data is increasingly important for their business. Hadoop allows Yahoo to connect their consumer products with their advertisers and users for a better user experience. They have been involved with Hadoop for many years now and have their own distribution. Yahoo also sponsors/hosts a user group meeting which has grown to hundreds of attendees every month.
We talked about what Pig is now, the future of Pig and other projects like Oozie http://github.com/tucu00/oozie1 which Yahoo uses (and is open source) for workflow of MapReduce & Pig script automation. We also talked about Zebra http://wiki.apache.org/pig/zebra, Owl http://wiki.apache.org/pig/owl, and Elephant Bird http://github.com/kevinweil/elephant-bird
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Ruby Streaming for Hadoop with Wukong a talk with Flip Kromer from Infochimps
Another great discussion on our Podcast. Click here to listen. For this episode our guest was Flip Kromer from Infochimps (http://www.infochimps.org). Infochimps.org’s mission is to increase the world’s access to structured data. They have been working since the start of 2008 to build the world’s most interesting data commons, and since the start of 2009 to build the world’s first data marketplace. Their founding team consists of two physicists (Flip Kromer and Dhruv Bansal) and one entrepreneur (Joseph Kelly).
We talked about Ruby streaming with Hadoop and why to use the open source project Wukong to simplify implementation of Hadoop using Ruby. There are some great examples http://github.com/infochimps/wukong/tree/master/examples that are just awesome like the web log analysis that creates the paths (chain of pages) that users go through during their visited session.
It was interesting to learn some of the new implementations and projects that he has going on like using Cassandra to help with storing unique values for social network analysis. This new project is called Cluster Chef http://github.com/infochimps/cluster_chef. ClusterChef will help you create a scalable, efficient compute cluster in the cloud. It has recipes for Hadoop, Cassandra, NFS and more — use as many or as few as you like.
- A small 1-5 node cluster for development or just to play around with Hadoop or Cassandra
- A spot-priced, ebs-backed cluster for unattended computing at rock-bottom prices
- A large 30+ machine cluster with multiple EBS volumes per node running Hadoop and Cassandra, with optional NFS for
- With Chef, you declare a final state for each node, not a procedure to follow. Administration is more efficient, robust and maintainable.
- You get a nice central dashboard to manage clients
- You can easily roll out configuration changes across all your machines
- Chef is actively developed and has well-written recipes for webservers, databases, development tools, and a ton of different software packages.
- Poolparty makes creating amazon cloud machines concise and easy: you can specify spot instances, ebs-backed volumes, disable-api-termination, and more.
- Hadoop
- NFS
- Persistent HDFS on EBS volumes
- Zookeeper (in progress)
- Cassandra (in progress)
Flip also passed along a couple of good links for Peter Norvig’s “Unreasonable Effectiveness of Data”, which came up in our talk: http://bit.ly/effectofdata and bit.ly/norvigtalk
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Hadoop, BigData and Cassandra with Jonathan Ellis
Today I spoke with Jonathan Ellis who is the Project Chair of the Apache Cassandra project http://cassandra.apache.org/ and co-founder of Riptano, the source for professional Cassandra support http://riptano.com. It was a great discussion about Hadoop, BigData, Cassandra and Open Source.
We talked about the recent Cassandra 0.6 NoSQL integration and support for Hadoop Map/Reduce against the data stored in Cassandra and some of what is coming up in the 0.7 release.
We touched on how Pig is currently supported and why Hive integration may not be supported with Cassandra in the future.
We also got a bit into a discussion of HBase vs Cassandra and some of the benefits & drawbacks as they live in your ecosystem (e.g. HBase is to OLAP as Cassandra is to OLTP).
This was the second Podcast and you can click here to listen.
/*
Joe Stein
http://www.linkedin.com/in/charmalloc/
*/
Making Hadoop and MapReduce easier with Karmasphere
For those folks either just getting started or already in the daily trenches of M/R development, Karmasphere has come about to help developers and technical professionals make Hadoop MapReduce easier (http://www.karmasphere.com/). Karmasphere Studio is a desktop IDE for graphically prototyping MapReduce jobs and deploying, monitoring and debugging them on Hadoop clusters in private and public clouds.
* Runs on Linux, Apple Mac OS and Windows.
* Works with all major distributions and versions of Hadoop including Apache, Yahoo! and Cloudera.
* Works with Amazon Elastic MapReduce.
* Supports local, networked, HDFS and Amazon S3 file systems.
* Support for Cascading
* Enables job submission from all major platforms including Windows.
* Operates with clusters and file systems behind firewalls.
So, what can you do with it?
- Prototype on the desktop: Get going with MapReduce job development quickly. No need for a cluster since Hadoop emulation is included.
- Deploy to a private or cloud-based cluster: Whether you’re using a cluster in your own network or a cloud, deploy your job/s easily.
- Debug on the cluster: One of the most challenging areas in MapReduce programming is debugging your job on the cluster. Visual tools deliver real-time insight into your job, including support for viewing and charting Hadoop job and task counters.
- Graphically visualize and manipulate: Whether it’s clusters, file systems, job configuration, counters, log files or other debugging information, save time and get better insight by accessing it all in one place.
- Monitor and analyze your jobs in real-time: Get realtime, workflow view of inputs, outputs and intermediate results including map, partition, sort and reduce phases.
Whether you’re new to Hadoop and want to easily explore MapReduce programming or you like the sound of something that helps you prototype, deploy and manage in an integrated environment or you’re already using Hadoop but could use a lot more insight into your jobs running on a cluster, there’s something here for you.
All you need is NetBeans (version 6.7 or 6.8) and Java 1.6 and you’ll be ready to give Karmasphere Studio a whirl.
You do NOT need any kind of Hadoop cluster set up to begin prototyping. But when you are ready to deploy your job on a large data set, you’ll need a virtual or real cluster in your data center or a public cloud such as Amazon Web Services.
An Eclipse version is in progress.
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Tips, Tricks And Pointers When Setting Up Your First Hadoop Cluster To Run Map Reduce Jobs
So you are ready now for your first real cluster? Have you gone through all the standalone and pseudo-distributed setup of Hadoop? If not, you should get familiar with it first: http://hadoop.apache.org/common/docs/current/quickstart.html. The Cloudera distribution has a nice walkthrough http://archive.cloudera.com/docs/cdh.html. Cloudera also has great training videos online http://www.cloudera.com/resources/?type=Training. Also, if you have not already read the Hadoop bible, you should (that would be Tom White’s Hadoop: The Definitive Guide).
So now that you are setting up your cluster and looking to run some MR (Map Reduce) jobs on it, there are a few points of interest to note that deviate from the default settings, along with some nuances. Granted, you would eventually learn these through a bit of pain, trial & error, etc., but I figured I would share the knowledge for what is pretty much run-of-the-mill stuff.
The first episode of our Podcast is also on this topic.
SSH Authentication
In a cluster, the namenode needs to communicate with all the other nodes. I have already blogged on this previously http://wp.me/pTu1i-29 but it is important to getting you going so I wanted to mention it here again.
Namenode Configuration Changes
The image from your Namenode is the metadata for knowing where all of the blocks for all of your files in HDFS across your entire cluster are stored. If you lose the image file, your data is NOT recoverable (as the nodes will have no idea which block belongs to which file). No fear: you can also save a copy to an NFS share. After you have mounted it, update your hdfs-site.xml configuration. The property dfs.name.dir takes a comma-separated list of directories to save your image to; add the NFS mount to it.
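If you generate your configuration files from a script, here is a minimal Python sketch of the <configuration>/<property> layout that hdfs-site.xml uses, with dfs.name.dir pointing at a local directory plus an NFS mount. The helper name hadoop_site_xml and both paths are hypothetical placeholders; substitute your own.

```python
import xml.etree.ElementTree as ET


def hadoop_site_xml(props):
    """Render {name: value} pairs in the <configuration><property> layout of Hadoop *-site.xml files."""
    conf = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(conf, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(conf, encoding="unicode")


# Hypothetical paths: one local directory and one NFS-mounted copy of the image
name_dirs = ["/data/1/dfs/name", "/mnt/nfs/dfs/name"]
print(hadoop_site_xml({"dfs.name.dir": ",".join(name_dirs)}))
```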
Max tasks can be set up on the Namenode or overridden (and marked final) on data nodes that have different hardware configurations. Max tasks are set separately for mappers and reducers. The right values depend on your CPU (cores), the amount of RAM you have, and the JVM max heap you set in mapred.child.java.opts (the default is -Xmx200m). The Datanode and Tasktracker daemons each get 1GB, so on an 8GB machine with 8 cores, mapred.tasktracker.map.tasks.maximum could be set to 7 and mapred.tasktracker.reduce.tasks.maximum set to 7, with mapred.child.java.opts set to -Xmx400m. Please note these task maxes are just as much bound by your CPU: if you only have 1 CPU with 1 core, then it is time to get new hardware for your data node, or set the max tasks to 1. If you have 1 CPU with 4 cores, then setting map to 3 and reduce to 3 would be good (saving 1 core for the daemons).
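The rule of thumb above can be written down as a small calculation. This is only a sketch of the heuristic described in this post (cores minus one for each of the map and reduce slot counts, with the DataNode and TaskTracker daemons taking about 1GB each); the function name and its defaults are assumptions, not Hadoop settings.

```python
def suggest_task_slots(cores, ram_mb, child_heap_mb=400, daemon_heap_mb=1024, daemons=2):
    """Return (map_slots, reduce_slots) using the cores-minus-one heuristic, capped by RAM."""
    per_type = max(1, cores - 1)  # leave one core for the daemons
    budget_mb = ram_mb - daemons * daemon_heap_mb  # RAM left after DataNode + TaskTracker
    # shrink until every child JVM (maps + reduces) fits in the remaining RAM
    while per_type > 1 and 2 * per_type * child_heap_mb > budget_mb:
        per_type -= 1
    return per_type, per_type


print(suggest_task_slots(cores=8, ram_mb=8192))  # (7, 7): 14 x 400MB = 5600MB fits in 6144MB
print(suggest_task_slots(cores=4, ram_mb=8192))  # (3, 3)
print(suggest_task_slots(cores=1, ram_mb=2048))  # (1, 1)
```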
By default there is only one reducer, so you need to configure mapred.reduce.tasks to be more than one. This value should be somewhere between 0.95 and 1.75 times the maximum number of reduce tasks per node times the number of data nodes. So if you have 3 data nodes, each set up with a max of 7 reduce tasks, configure this between 20 and 36 (0.95 × 21 ≈ 20 and 1.75 × 21 ≈ 36).
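The same arithmetic as a tiny sketch (the function name is hypothetical). Rounding the low end up and the high end down keeps both values inside the 0.95x to 1.75x band.

```python
import math


def reduce_task_range(data_nodes, reduce_slots_per_node):
    """Return (low, high) for mapred.reduce.tasks: 0.95x to 1.75x the cluster's total reduce slots."""
    total_slots = data_nodes * reduce_slots_per_node
    low = math.ceil(0.95 * total_slots)    # round up so the low end is at least 0.95x
    high = math.floor(1.75 * total_slots)  # round down so the high end is at most 1.75x
    return low, high


print(reduce_task_range(3, 7))  # (20, 36)
```

The Hadoop MapReduce tutorial explains the two ends: at 0.95 all reduces can launch immediately and start copying map outputs as the maps finish, while at 1.75 the faster nodes finish a first round of reduces and launch a second wave, which balances load better.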
As of version 0.19, Hadoop is able to reuse the JVM for multiple tasks. This matters because it avoids wasting time starting up and tearing down the JVM itself, and it is configurable with mapred.job.reuse.jvm.num.tasks (the default of 1 means no reuse; -1 means no limit).
Another good configuration addition is mapred.compress.map.output. Setting this value to true should greatly speed up the reducers’ copy phase, especially when working with large data sets (it is a balance between the time spent compressing and the time saved transferring).
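Whether compression pays off depends on how compressible your map output is. One rough way to gauge that before flipping the switch is to compress a sample of map output with zlib, since Hadoop’s DefaultCodec is deflate/zlib based. The sample below is made up from the streaming example’s mapper lines and is far more repetitive than real data, so treat the ratio as illustrative only.

```python
import zlib


def compression_ratio(sample_bytes, level=6):
    """Compressed size divided by original size (lower means less data for the reducers to copy)."""
    compressed = zlib.compress(sample_bytes, level)
    return len(compressed) / len(sample_bytes)


# Hypothetical sample: mapper output lines repeated to mimic a large, repetitive spill
sample = ("CA^valued^Jon Sneed^-1\nUS^not bad^Alice Bob^-1\n" * 1000).encode("ascii")
print("ratio: %.3f" % compression_ratio(sample))
```

Running the same function on a slice of your real intermediate data gives a more honest number.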
Here are some more configuration values for real-world cluster setups; the docs are good too: http://hadoop.apache.org/common/docs/current/cluster_setup.html
- Some non-default configuration values used to run sort900, that is 9TB of data sorted on a cluster with 900 nodes:

Configuration File | Parameter | Value | Notes
conf/hdfs-site.xml | dfs.block.size | 134217728 | HDFS blocksize of 128MB for large file-systems.
conf/hdfs-site.xml | dfs.namenode.handler.count | 40 | More NameNode server threads to handle RPCs from large number of DataNodes.
conf/mapred-site.xml | mapred.reduce.parallel.copies | 20 | Higher number of parallel copies run by reduces to fetch outputs from very large number of maps.
conf/mapred-site.xml | mapred.child.java.opts | -Xmx512M | Larger heap-size for child jvms of maps/reduces.
conf/core-site.xml | fs.inmemory.size.mb | 200 | Larger amount of memory allocated for the in-memory file-system used to merge map-outputs at the reduces.
conf/core-site.xml | io.sort.factor | 100 | More streams merged at once while sorting files.
conf/core-site.xml | io.sort.mb | 200 | Higher memory-limit while sorting data.
conf/core-site.xml | io.file.buffer.size | 131072 | Size of read/write buffer used in SequenceFiles.

- Updates to some configuration values to run sort1400 and sort2000, that is 14TB of data sorted on 1400 nodes and 20TB of data sorted on 2000 nodes:

Configuration File | Parameter | Value | Notes
conf/mapred-site.xml | mapred.job.tracker.handler.count | 60 | More JobTracker server threads to handle RPCs from large number of TaskTrackers.
conf/mapred-site.xml | mapred.reduce.parallel.copies | 50 |
conf/mapred-site.xml | tasktracker.http.threads | 50 | More worker threads for the TaskTracker’s http server. The http server is used by reduces to fetch intermediate map-outputs.
conf/mapred-site.xml | mapred.child.java.opts | -Xmx1024M | Larger heap-size for child jvms of maps/reduces.
Secondary Namenode
You may have read or heard a dozen times already that the Secondary Namenode is NOT a backup of the Namenode. Well, let me say it again for you: the Secondary Namenode is not a backup of the Namenode. The Secondary Namenode should also not run on the same machine as the Namenode. Its job is to merge the Namenode’s edit log into the image file. All of the changes that happen in HDFS are managed in memory on the Namenode, which writes them to an edit log; the Secondary Namenode compacts that log into a new image file. This helps because otherwise the Namenode would have to do this merge itself on restart, and because the log would otherwise just keep growing.
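Conceptually, a checkpoint is “replay the edit log onto the last image, save the result, start a fresh log”. The toy sketch below shows only that idea; the dict-based image and the create/delete/rename operations are made up for illustration and have nothing to do with HDFS’s actual fsimage or edits file formats.

```python
def checkpoint(image, edit_log):
    """Toy checkpoint: apply edit-log operations to a namespace image; return (new_image, empty_log)."""
    new_image = dict(image)  # path -> list of block ids; the old image is left untouched
    for op in edit_log:
        kind, path = op[0], op[1]
        if kind == "create":
            new_image[path] = op[2]
        elif kind == "delete":
            new_image.pop(path, None)
        elif kind == "rename":
            new_image[op[2]] = new_image.pop(path)
        else:
            raise ValueError("unknown edit op: %r" % (kind,))
    return new_image, []  # the edits now live in the image, so the log starts over


image = {"/logs/a": ["blk_1"]}
edits = [("create", "/logs/b", ["blk_2"]), ("rename", "/logs/a", "/logs/c"), ("delete", "/logs/b")]
new_image, new_log = checkpoint(image, edits)
print(new_image, new_log)
```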
Now, the way the Secondary Namenode does this is with GET & POST (yes, over HTTP) to the Namenode: it GETs the logs and POSTs the image. For the Secondary Namenode to do this, you have to configure dfs.http.address in hdfs-site.xml on the server you plan to run the Secondary Namenode on.
<property>
<name>dfs.http.address</name>
<value>namenode:50070</value>
<description>
The address and the base port where the dfs namenode web ui will listen on.
If the port is 0 then the server will start on a free port.
</description>
</property>
Also, you need to put the Secondary Namenode into the masters file (which defaults to localhost).
Datanode Configuration Changes & Hardware
So here I only have to say that you should use JBOD (Just a Bunch Of Disks). There have been tests showing 10%-30% degradation when HDFS blocks run on RAID. So when setting up your data nodes, if you have a bunch of disks, list each one (comma-separated) in dfs.data.dir for that node to write to.
If you want, you can also reserve some space on each volume for non-HDFS use with dfs.datanode.du.reserved (in bytes per volume).
Map Reduce Code Changes
In your Java code there is a little trick to help the cluster know that a task is not dead, just working hard. During execution of a task there is no built-in reporting that it is running as expected if it is not writing output. This means that if your tasks spend a lot of time doing work without writing anything, the cluster may decide the task has failed (based on the mapred.task.timeout setting, 10 minutes by default).
Have no fear, there is a way to tell the cluster that your task is doing just fine. You have 2 ways to do this: you can either report the status or increment a counter. Either one lets the task tracker know the task is OK, and the jobtracker sees this in turn. Both options are explained in the JavaDoc http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reporter.html
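The Reporter API is Java-only, but if your job is a streaming job (like the Python mapper and reducer in the streaming post above), Hadoop Streaming offers an equivalent: a line written to stderr in the form reporter:status:<message> updates the task status, and reporter:counter:<group>,<counter>,<amount> increments a counter. Here is a minimal sketch of a mapper loop that does this periodically; the MyJob/RecordsProcessed names, the reporting interval and the per-record work are hypothetical placeholders.

```python
import sys


def report_status(message, err=sys.stderr):
    # Hadoop Streaming reads stderr lines of this form as a task status update
    err.write("reporter:status:%s\n" % message)
    err.flush()


def increment_counter(group, counter, amount=1, err=sys.stderr):
    # ...and lines of this form as a counter increment
    err.write("reporter:counter:%s,%s,%d\n" % (group, counter, amount))
    err.flush()


def run_mapper(lines, out=sys.stdout, err=sys.stderr, every=1000):
    for i, line in enumerate(lines, 1):
        record = line.strip()
        out.write("%s\t1\n" % record)  # placeholder for the real (slow) per-record work
        if i % every == 0:
            increment_counter("MyJob", "RecordsProcessed", every, err)
            report_status("processed %d records" % i, err)
```

In a real streaming mapper you would call run_mapper(sys.stdin); writing progress to stderr keeps it out of the key/value output on stdout.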
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Running Hadoop MapReduce With Cassandra NoSQL
So if you are looking for a good NoSQL read on HBase vs. Cassandra, you can check out http://ria101.wordpress.com/2010/02/24/hbase-vs-cassandra-why-we-moved/. In short, HBase is good for reads and Cassandra for writes. Cassandra does a great job on reads too, so please do not think I am shooting either down in any way. I am just saying that both HBase and Cassandra have great value and useful purposes in their own right, and there are even use cases for running both. HBase was recently promoted to a top-level Apache project, graduating out of Hadoop.
Having worked with Cassandra a bit, I often see/hear of folks asking about running Map/Reduce jobs against the data stored in Cassandra instances. Well, Hadoopers & Hadooperettes, the Cassandra folks provide a very nice way to do this in the 0.6 release. It is VERY straightforward and well thought through. If you want to see the evolution, check out the JIRA issue https://issues.apache.org/jira/browse/CASSANDRA-342
So how do you do it? Very simply: Cassandra provides an implementation of InputFormat. In case you are new to Hadoop, the InputFormat is (basically) what the mapper uses to load your data. Cassandra's implementation connects your mapper so that it pulls its data in from Cassandra. What is also great here is that the Cassandra folks have spent the time implementing the integration in the classic “Word Count” example.
See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for this example. Cassandra rows or row fragments (that is, pairs of key + SortedMap of columns) are input to Map tasks for processing by your job, as specified by a SlicePredicate that describes which columns to fetch from each row. Here’s how this looks in the word_count example, which selects just one configurable columnName from each row:
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
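To show what the slice predicate means for the data the mapper sees, here is an in-memory Java illustration. It is NOT the Cassandra API: rows are modeled as a key plus a SortedMap of columns (using Strings instead of byte[] names for simplicity), the "predicate" is just a set of column names, and the column names in the usage are hypothetical. Only the selected columns reach the map step, which splits values into words; the reduce step sums the counts.

```java
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

// In-memory model of the word_count flow; not Cassandra's classes or API.
class SliceWordCount {
    /** Keeps only the named columns, like a SlicePredicate built from column names. */
    static SortedMap<String, String> slice(SortedMap<String, String> columns, Set<String> wanted) {
        SortedMap<String, String> out = new TreeMap<>();
        for (Map.Entry<String, String> e : columns.entrySet()) {
            if (wanted.contains(e.getKey())) out.put(e.getKey(), e.getValue());
        }
        return out;
    }

    /** "Map": split each selected value into words; "reduce": sum the per-word counts. */
    static SortedMap<String, Integer> countWords(Map<String, SortedMap<String, String>> rows,
                                                 Set<String> wanted) {
        SortedMap<String, Integer> counts = new TreeMap<>();
        for (SortedMap<String, String> columns : rows.values()) {
            for (String value : slice(columns, wanted).values()) {
                for (String word : value.trim().split("\\s+")) {
                    if (!word.isEmpty()) counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```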
Cassandra also provides a Pig LoadFunc for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Cascading 1.1.0 Released
Cascading is an open source abstraction for MapReduce that allows applications to work with Hadoop through a straightforward Java API. Cascading provides finer-grained ways to define applications and glue them together, as well as abstractions to work with external systems.
This release features many performance and usability enhancements while remaining backwards compatible with 1.0.
http://www.concurrentinc.com/news-events/entry/cascading_1.1.0_now_available
Specifically:
- Performance optimizations with all join types
- Numerous job planner optimizations
- Dynamic optimizations when running in Amazon Elastic MapReduce and S3
- API usability improvements around large numbers of field names
- Support for TSV, CSV, and custom delimited text files
- Support for manipulating and serializing non-Comparable custom Java types
- Debug levels supported by the job planner
For a detailed list of changes see: CHANGES.txt
Along with this release are a number of extensions created by the Cascading user community.
Among these extensions are:
- Bixo – a data mining toolkit
- DBMigrate – a tool for migrating data between RDBMSs and Hadoop
- Apache HBase, Amazon SimpleDB, and JDBC integration
- JRuby and Clojure based scripting languages for Cascading
- Cascalog – a robust interactive extensible query language
Here is an excerpt from their User Guide for getting started http://www.cascading.org/1.1/userguide/html/ch02.html
Counting words in a document is the most common example presented to new Hadoop (and MapReduce) developers; it is the Hadoop equivalent of the “Hello World” application.
Word counting is where a document is parsed into individual words, and the frequency of those words are counted.
For example, if we counted the last paragraph, “is” would be counted twice, and “document” counted once.
In the code example below, we will use Cascading to read each line of text from a file (our document), parse it into words, then count the number of times each word is encountered.
Example 2.1. Word Counting
// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete
flow.complete();
There are a couple things to take away from this example.
First, the pipe assembly is not coupled to the data (the Tap instances) until the last moment before execution. That is, file paths or references are not embedded in the pipe assembly. The pipe assembly remains independent of which data it processes until execution. The only dependency is what the data looks like, its “scheme”, or the field names that make it up.
That brings up fields. Every input and output file has field names associated with it, and every processing element of the pipe assembly either expects certain fields or creates new fields. This allows developers to self-document their code, and allows the Cascading planner to “fail fast” during planning if a dependency between elements isn’t satisfied (for example, a missing or wrong field name was used).
It is also important to point out that pipe assemblies are assembled through constructor chaining. This may seem odd, but it is done for two reasons: it keeps the code more concise, and it prevents developers from creating “cycles” in the resulting pipe assembly. Pipe assemblies are Directed Acyclic Graphs (DAGs). The Cascading planner cannot handle processes that feed themselves, that is, processes that have cycles (which is not to say there aren’t ways around this that are much safer).
Notice the very first Pipe instance has a name. That instance is the “head” of this particular pipe assembly. Pipe assemblies can have any number of heads, and any number of tails. This example does not name the tail assembly, but for complex assemblies, tails must be named for reasons described below.
Heads and tails of pipe assemblies generally need names; this is how sources and sinks are “bound” to them during planning. In our example above, there is only one head and one tail, and consequently only one source and one sink, respectively. So naming in this case is optional, since it’s obvious what goes where. Naming is also useful for self-documenting pipe assemblies, especially where there are splits, joins, and merges in the assembly.
To paraphrase, our example will:
- read each line of text from a file and give it the field name “line”,
- parse each “line” into words using the RegexGenerator object, which in turn returns each word in the field named “word”,
- group on the field named “word” using the GroupBy object,
- then count the number of elements in each grouping using the Count() object and store this value in the “count” field,
- finally, write out the “word” and “count” fields.
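For comparison, here is a plain-Java, single-machine sketch of the same steps (Each with RegexGenerator, then GroupBy on "word", then Every with Count). It reuses the user guide's regular expression; the class name and everything else is illustrative and is not Cascading's API. It also lets you check the claim in the excerpt that “is” appears twice and “document” once in the word-counting paragraph.

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Local, in-memory analogue of the Cascading word count pipe assembly.
class LocalWordCount {
    // Same regex as the Cascading example: a run of non-spaces that starts and ends on a letter.
    static final Pattern WORD = Pattern.compile("(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)");

    static SortedMap<String, Long> count(List<String> lines) {
        SortedMap<String, Long> counts = new TreeMap<>();   // sorted keys, like a GroupBy
        for (String line : lines) {                          // each "line" tuple
            Matcher m = WORD.matcher(line);
            while (m.find()) {                               // one "word" per regex match
                counts.merge(m.group(), 1L, Long::sum);      // Count() per group
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList(
                "Word counting is where a document is parsed into individual words,",
                "and the frequency of those words are counted.")));
    }
}
```

The Cascading version does the same thing but distributes the grouping across the cluster, which is why the tap/pipe separation described below matters.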
Hadoop NYC Meetup With Yale University and Datameer
The NYC Hadoop meetup on April 21st was great. Many thanks as always to the Cloudera folks and The Winter Wyman Companies for the pizza. Also thanks to Hiveat55 for use of their office and the Datameer folks for a good time afterwards.
The first part of the meetup was a presentation by Azza Abouzeid and Kamil Bajda-Pawlikowski (Yale University) on HadoopDB (http://db.cs.yale.edu/hadoopdb/hadoopdb.html). Their vision is to take the best of both worlds: the MapReduce bliss for lots of data that we get from Hadoop, and the complex data analysis capabilities of a DBMS.
The basic idea behind HadoopDB is to give Hadoop access to multiple single-node DBMS servers (e.g. PostgreSQL or MySQL) deployed across the cluster. HadoopDB pushes as much data processing as possible into the database engine by issuing SQL queries (usually most of the Map/Combine phase logic is expressible in SQL). The result is a system that resembles a shared-nothing parallel database. Applying techniques taken from the database world leads to a performance boost, especially in more complex data analysis. At the same time, because HadoopDB relies on the MapReduce framework, it scores similarly to Hadoop on scalability and fault/heterogeneity tolerance.
They have spent a lot of time thinking through, finding and resolving the tradeoffs that occur, and continue to make progress on this front. They have had 2,200 downloads as of this posting and are actively looking for developers to contribute to their project. I think it is great to see a university involved at this level in open source in general, and more specifically doing work related to Hadoop. The audience was very engaged, and it made for a very lively discussion. Their paper tells all the gory details: http://db.cs.yale.edu/hadoopdb/hadoopdb.pdf.
The rest of the meetup was off the hook. Stefan Groschupf got off to a quick start, throwing down some pretty serious street cred as a long-standing committer for Nutch, Hadoop, Katta, Bixo and more. He was very engaging, with a good set of anecdotes for the question that drives the Hadoop community: “What do you want to do with your data?” The answer always comes down to processing it or querying it, and there is no single silver-bullet solution. We were then given a demo of Datameer’s product (which has one of the best user interface concepts I have seen).
In short, the Datameer Analytic Solution (DAS) is a spreadsheet user interface that lets users take a sample of data and, like any good spreadsheet (with 15 existing data connections and over 120 functions), pull it into an aggregated format. The product then pushes that definition down into Hadoop (similar to how Hive works), where it runs as a map/reduce job.
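A rough model of that division of labor: each node's local database answers an aggregate query, for example a hypothetical `SELECT word, COUNT(*) FROM words GROUP BY word`, so the Map/Combine work happens inside the DBMS, and the Reduce phase only merges the small per-node results. The Java sketch below shows only that final merge step; the table, the query, and the class name are assumptions for illustration, not HadoopDB's code or API.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Merge step for partial aggregates computed node-locally (e.g. by a SQL GROUP BY).
class PartialAggregateMerge {
    static SortedMap<String, Long> reduce(List<Map<String, Long>> perNodeResults) {
        SortedMap<String, Long> total = new TreeMap<>();
        for (Map<String, Long> nodeResult : perNodeResults) {
            // Summing partial counts is valid because COUNT/SUM are associative.
            nodeResult.forEach((key, partial) -> total.merge(key, partial, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> node1 = new HashMap<>();   // pretend result from node 1's DBMS
        node1.put("hadoop", 3L);
        node1.put("sql", 1L);
        Map<String, Long> node2 = new HashMap<>();   // pretend result from node 2's DBMS
        node2.put("hadoop", 2L);
        node2.put("db", 4L);
        System.out.println(reduce(Arrays.asList(node1, node2)));
    }
}
```

This only works as shown for aggregates that can be combined from partial results (counts, sums, min/max); something like a median needs a different plan.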
So end to end you can have worthy analytic folks (spreadsheet types) do their job against limitless data. wicked.
From their website http://datameer.com
With DAS, business users no longer have to rely on intuition or a “best guess” based on what’s happened in the past to make business decisions. DAS makes data assets available as they are needed regardless of format or location so that users have the facts they need to reach the best possible conclusions.
DAS includes a familiar interactive spreadsheet that is easy to use, but also powerful so that business users don’t need to turn to developers for analytics. The spreadsheet is specifically designed for visualization of big data and includes more than 120 built-in functions for exploring and discovering complex relationships. In addition, because DAS is extensible, business analysts can use functions from third-party tools or they can write their own commands.
Drag & drop reporting allows users to quickly create their own personalized dashboard. Users simply select the information they want to view and how to display it on the dashboard – tables, charts, or graphs.
The portfolio of analytical and reporting tools in organizations can be broad. Business users can easily share data in DAS with these tools to either extend their analysis or to give other users access.
After the quick demo, Stefan walked us through a solution for using Hadoop to pull the “signal from the noise” in social data, using Twitter as an example. He used a really interesting graph exploration tool (I’m going to give it a try myself): http://gephi.org/. Gephi is an interactive visualization and exploration platform for all kinds of networks and complex systems, dynamic and hierarchical graphs. He then talked a bit about X-RIME http://xrime.sourceforge.net/, an open source, Hadoop-based tool for large-scale social network analysis.
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Hadoop Cluster Setup, SSH Key Authentication
So you have spent your time in pseudo-distributed mode and you have finally started moving to your own cluster? Perhaps you just jumped right into the cluster setup? In any case, a distributed Hadoop cluster setup requires your “master” node [name node & job tracker] to be able to SSH (without requiring a password, so key-based authentication) to all other “slave” nodes (e.g. data nodes).
SSH key-based authentication is required so that the master node can log in to the slave nodes (and the secondary name node) to start/stop them, etc. The secondary name node (which is listed in your masters file) needs this setup too: presuming it runs on another machine (which is a VERY good idea for a production cluster), it will be started from your name node by ./start-dfs.sh. Likewise, the job tracker node uses it when ./start-mapred.sh starts the task trackers on the slaves.
Make sure you are the hadoop user for all of these commands. If you have not yet installed Hadoop and/or created the hadoop user, you should do that first. This will differ slightly depending on your distribution (please follow its setup directions); for example, Cloudera creates the hadoop user for you during the RPM install.
First from your “master” node check that you can ssh to the localhost without a passphrase:
$ ssh localhost
If you cannot ssh to localhost without a passphrase, execute the following commands:
$ ssh-keygen -t dsa -P "" -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
On your master node, try to ssh again (as the hadoop user) to localhost. If you are still getting a password prompt, run the following:
$ chmod go-w $HOME $HOME/.ssh
$ chmod 600 $HOME/.ssh/authorized_keys
$ chown `whoami` $HOME/.ssh/authorized_keys
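Why do these chmod commands help? With OpenSSH's default StrictModes setting, sshd typically ignores authorized_keys when your home directory, ~/.ssh, or the file itself is writable by group or others, and you fall back to a password prompt. The Java sketch below checks exactly that condition on the POSIX permission bits. It is a diagnostic illustration with hypothetical names; it does not check ownership (the chown step), and Files.getPosixFilePermissions throws UnsupportedOperationException on non-POSIX file systems.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFilePermission;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;

// Flags paths whose group/other write bits would likely make sshd distrust authorized_keys.
class SshPermCheck {
    static boolean groupOrOtherWritable(Set<PosixFilePermission> perms) {
        return perms.contains(PosixFilePermission.GROUP_WRITE)
                || perms.contains(PosixFilePermission.OTHERS_WRITE);
    }

    /** Returns the paths that still need "chmod go-w" (only paths that exist are checked). */
    static List<Path> problems(Path home) throws IOException {
        List<Path> bad = new ArrayList<>();
        for (Path p : Arrays.asList(home, home.resolve(".ssh"), home.resolve(".ssh/authorized_keys"))) {
            if (Files.exists(p) && groupOrOtherWritable(Files.getPosixFilePermissions(p))) {
                bad.add(p);
            }
        }
        return bad;
    }

    public static void main(String[] args) throws IOException {
        List<Path> bad = problems(Paths.get(System.getProperty("user.home")));
        System.out.println(bad.isEmpty() ? "permissions look OK" : "fix with chmod go-w: " + bad);
    }
}
```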
Now you need to copy your public key (any method you like is fine) to all of your “slave” machines (don’t forget your secondary name node). Depending on whether these are new machines, the slave’s hadoop user may not have a .ssh directory yet; if not, create it ($ mkdir ~/.ssh).
$ scp ~/.ssh/id_dsa.pub slave1:~/.ssh/master.pub
Now log in (as the hadoop user) to your slave machine. While on the slave, append the master’s hadoop user’s public key to the slave hadoop user’s authorized_keys file.
$ cat ~/.ssh/master.pub >> ~/.ssh/authorized_keys
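One small wrinkle with `cat ... >>`: running it twice leaves duplicate entries, and if authorized_keys does not end with a newline, the new key gets glued onto the last line. Here is a minimal Java sketch of an idempotent alternative, assuming the public key file holds a single key line; the class and method names are hypothetical, and if it creates authorized_keys the file gets your umask's permissions, so the chmod 600 step still applies.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

// Idempotent "cat master.pub >> authorized_keys": append the key only if it is not already present.
class AuthorizedKeys {
    static boolean appendIfMissing(Path authorizedKeys, Path publicKey) throws IOException {
        String key = new String(Files.readAllBytes(publicKey), StandardCharsets.UTF_8).trim();
        String current = Files.exists(authorizedKeys)
                ? new String(Files.readAllBytes(authorizedKeys), StandardCharsets.UTF_8) : "";
        for (String line : current.split("\n")) {
            if (line.trim().equals(key)) return false;              // already authorized
        }
        // Start on a fresh line if the existing file lacks a trailing newline.
        String prefix = (current.isEmpty() || current.endsWith("\n")) ? "" : "\n";
        Files.write(authorizedKeys, (prefix + key + "\n").getBytes(StandardCharsets.UTF_8),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path ssh = Paths.get(System.getProperty("user.home"), ".ssh");
        boolean added = appendIfMissing(ssh.resolve("authorized_keys"), ssh.resolve("master.pub"));
        System.out.println(added ? "key added" : "key was already present");
    }
}
```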
Now, from the master node, try to ssh to the slave.
$ ssh slave1
If you are still prompted for a password (which is most likely) then it is very often just a simple permission issue. Go back to your slave node again and as the hadoop user run this
$ chmod go-w $HOME $HOME/.ssh
$ chmod 600 $HOME/.ssh/authorized_keys
$ chown `whoami` $HOME/.ssh/authorized_keys
Try again from your master node.
$ ssh slave1
And you should be good to go. Repeat for all Hadoop Cluster Nodes.
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Understanding HBase and BigTable
This is a very good article on HBase http://jimbojw.com/wiki/index.php?title=Understanding_Hbase_and_BigTable. As Jim says in his article, after reading it you should be better able to make an educated decision regarding when you might want to use HBase vs when you’d be better off with a “traditional” database.
This is also a nice primer on NoSQL in general (at least for systems like Cassandra, etc.).
What I really like best about this article is how Jim breaks down the terminology and works through it to properly explain what HBase does.
/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/
Nifty Tool To Export Files From HDFS Into MySQL
This is an interesting open source project I have recently heard about http://code.google.com/p/hiho/.
What is very interesting to me about this project is the export utility which takes data from HDFS and loads it into MySQL.
It also has a nice way of querying and importing data from a JDBC database directly into HDFS. It looks much more robust than the out-of-the-box DBInputFormat that Hadoop provides. You can import the data as delimited records, with your choice of delimiter. You can also import the data and save it as Avro records. It supports queries; for example, you can join two tables. It splits on user-specified column ranges instead of using LIMIT and OFFSET. It does no code generation or ORM mapping.
There are other ETL tools out there (e.g. Sqoop http://www.cloudera.com/developers/downloads/sqoop/). In Cloudera’s Distribution for Hadoop Version 3 (CDH3), Sqoop now also supports exporting from HDFS back into MySQL.
I am definitely going to have to give this utility a try. I hear from the HIHO project folks that the next item on their to-do list is support for more databases for export.
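Splitting on column ranges rather than LIMIT/OFFSET matters because an OFFSET query generally makes the database walk past all the skipped rows before returning anything, so later splits get progressively more expensive. A range predicate on an indexed column lets each mapper go straight to its slice. The Java sketch below shows the general technique of turning a numeric min/max into non-overlapping WHERE clauses; HIHO's actual splitting code is not shown in the post, and the column name "id" in the usage is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Generic range-based input splitting: one WHERE clause per split over [min, max].
class RangeSplits {
    static List<String> whereClauses(String column, long min, long max, int numSplits) {
        if (numSplits < 1) throw new IllegalArgumentException("numSplits must be >= 1");
        List<String> clauses = new ArrayList<>();
        long span = max - min + 1;                        // inclusive count of key values
        long step = (span + numSplits - 1) / numSplits;   // ceiling division so ranges cover everything
        for (long lo = min; lo <= max; lo += step) {
            long hi = Math.min(lo + step - 1, max);
            clauses.add(column + " >= " + lo + " AND " + column + " <= " + hi);
        }
        return clauses;
    }

    public static void main(String[] args) {
        // e.g. min/max would come from a query such as SELECT MIN(id), MAX(id) FROM some_table
        for (String clause : whereClauses("id", 1L, 100L, 4)) {
            System.out.println("WHERE " + clause);
        }
    }
}
```

Equal-width ranges assume the key values are roughly evenly distributed; heavily skewed keys would give some mappers much more work than others.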
/*
Joe Stein
http://www.linkedin.com/in/charmalloc/
*/

