So you are ready now for your first real cluster? Have you gone through the standalone and pseudo-distributed setups of Hadoop? If not, you should get familiar with them first: http://hadoop.apache.org/common/docs/current/quickstart.html. The Cloudera distribution has a nice walkthrough http://archive.cloudera.com/docs/cdh.html, and Cloudera also has great training videos online http://www.cloudera.com/resources/?type=Training. Also, if you have not already read the Hadoop bible, you should (that would be Tom White's Hadoop: The Definitive Guide).
So now that you are setting up your cluster and looking to run some MR (Map Reduce) jobs on it, there are a few points of interest and nuances that deviate from the default settings. Granted, you would eventually learn these through a bit of pain, trial & error, etc., but I figured I would share the knowledge up front since this is pretty much run-of-the-mill stuff.
The first episode of our Podcast is also on this topic.
In a cluster, the namenode needs to communicate with all the other nodes. I have already blogged on this previously http://wp.me/pTu1i-29, but it is important to getting going so I wanted to mention it here again.
Namenode Configuration Changes
The image on your Namenode is the metadata that records where all of the blocks for all of your files in HDFS across your entire cluster are stored. If you lose the image file your data is NOT recoverable (the nodes will have no idea which block belongs to which file). Have no fear: you can simply save a copy to an NFS share. After you have mounted it, update your hdfs-site.xml configuration: the property dfs.name.dir takes a comma-separated list of directories to save your image to, so add the NFS directory there.
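As a sketch, the hdfs-site.xml entry might look like this (the local and NFS paths here are hypothetical; substitute your own mount points):

```xml
<!-- hdfs-site.xml: write the namenode image to a local disk AND an NFS mount -->
<property>
  <name>dfs.name.dir</name>
  <value>/hadoop/dfs/name,/mnt/nfs/hadoop/dfs/name</value>
</property>
```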
Max tasks can be set up on the master node or overridden (and marked final) on data nodes that have different hardware configurations. Max tasks are set for both mappers and reducers. The calculation is based on your CPU (cores), the amount of RAM you have, and the JVM max heap you set in mapred.child.java.opts (the default is -Xmx200m). The Datanode and Tasktracker daemons each take 1GB, so for an 8GB machine mapred.tasktracker.map.tasks.maximum could be set to 7 and mapred.tasktracker.reduce.tasks.maximum set to 7, with mapred.child.java.opts set to -Xmx400m (assuming 8 cores). Please note these task maximums are driven as much by your CPU: if you only have 1 CPU with 1 core, then it is time to get new hardware for your data node, or set the max tasks to 1. If you have 1 CPU with 4 cores, then setting map to 3 and reduce to 3 would be good (saving 1 core for the daemons).
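For example, a sketch of the mapred-site.xml overrides for the hypothetical 8-core, 8GB data node described above (values are illustrative, not universal defaults):

```xml
<!-- mapred-site.xml on an 8-core, 8GB data node (example values only) -->
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>7</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>7</value>
</property>
<property>
  <name>mapred.child.java.opts</name>
  <value>-Xmx400m</value>
</property>
```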
By default there is only one reducer; you need to configure mapred.reduce.tasks to be more than one. This value should be somewhere between 0.95 and 1.75 times the maximum tasks per node times the number of data nodes. So if you have 3 data nodes with max tasks set to 7, configure this somewhere between roughly 20 and 36.
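The arithmetic above can be sketched in a few lines of Java (the 0.95 and 1.75 factors come from the Hadoop docs; the node counts are just the example numbers from this paragraph):

```java
public class ReducerEstimate {
    // Heuristic: mapred.reduce.tasks should fall between 0.95x and 1.75x
    // of (max tasks per node * number of data nodes).
    static int lowEstimate(int maxTasksPerNode, int dataNodes) {
        return (int) (0.95 * maxTasksPerNode * dataNodes);
    }

    static int highEstimate(int maxTasksPerNode, int dataNodes) {
        return (int) (1.75 * maxTasksPerNode * dataNodes);
    }

    public static void main(String[] args) {
        // 3 data nodes, 7 max tasks each: roughly 19 to 36 reducers
        System.out.println(lowEstimate(7, 3) + " to " + highEstimate(7, 3));
    }
}
```

At 0.95x the slower nodes can finish their first wave of reduces together; at 1.75x the faster nodes pick up a second wave, improving load balancing.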
As of version 0.19, Hadoop is able to reuse the JVM across multiple tasks. This matters because it avoids wasting time starting up and tearing down a JVM for every task; it is configurable via mapred.job.reuse.jvm.num.tasks.
Another good configuration addition is mapred.compress.map.output. Setting this value to true should speed up the reducers' copy phase greatly (it is a balance between the time to compress vs. the time to transfer), especially when working with large data sets.
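Both of those settings live in mapred-site.xml; a sketch (a reuse value of -1 means reuse the JVM for an unlimited number of tasks in the same job):

```xml
<property>
  <name>mapred.job.reuse.jvm.num.tasks</name>
  <value>-1</value>
</property>
<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
```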
Here are some more configuration values for a real-world cluster setup; the docs are good too: http://hadoop.apache.org/common/docs/current/cluster_setup.html
- Some non-default configuration values used to run sort900, that is 9TB of data sorted on a cluster with 900 nodes:
- conf/hdfs-site.xml: dfs.block.size = 134217728 (HDFS blocksize of 128MB for large file-systems)
- conf/hdfs-site.xml: dfs.namenode.handler.count = 40 (more NameNode server threads to handle RPCs from the large number of DataNodes)
- conf/mapred-site.xml: mapred.reduce.parallel.copies = 20 (higher number of parallel copies run by reduces to fetch outputs from a very large number of maps)
- conf/mapred-site.xml: mapred.child.java.opts = -Xmx512M (larger heap-size for child JVMs of maps/reduces)
- conf/core-site.xml: fs.inmemory.size.mb = 200 (larger amount of memory allocated for the in-memory file-system used to merge map-outputs at the reduces)
- conf/core-site.xml: io.sort.factor = 100 (more streams merged at once while sorting files)
- conf/core-site.xml: io.sort.mb = 200 (higher memory-limit while sorting data)
- conf/core-site.xml: io.file.buffer.size = 131072 (size of read/write buffer used in SequenceFiles)
- Updates to some configuration values to run sort1400 and sort2000, that is 14TB of data sorted on 1400 nodes and 20TB of data sorted on 2000 nodes:
- conf/mapred-site.xml: mapred.job.tracker.handler.count = 60 (more JobTracker server threads to handle RPCs from the large number of TaskTrackers)
- conf/mapred-site.xml: mapred.reduce.parallel.copies = 50
- conf/mapred-site.xml: tasktracker.http.threads = 50 (more worker threads for the TaskTracker's HTTP server; the HTTP server is used by reduces to fetch intermediate map-outputs)
- conf/mapred-site.xml: mapred.child.java.opts = -Xmx1024M (larger heap-size for child JVMs of maps/reduces)
You may have read or heard a dozen times already that the Secondary Namenode is NOT a backup of the Namenode. Well, let me say it again for you: the Secondary Namenode is not a backup of the Namenode. The Secondary Namenode should also not run on the same machine as the Namenode. What the Secondary Namenode does is help fold the Namenode's edit log back into the image file. All of the changes that happen in HDFS are managed in memory on the Namenode, which writes them to an edit log; the Secondary Namenode periodically compacts that log into a new image file. This is helpful because otherwise the Namenode would have to do this work on restart, and because the edit logs would otherwise just keep growing.
Now, the way the Secondary Namenode does this is with GET & POST requests (yes, over HTTP) to the Namenode. It will GET the logs and POST the image. In order for the Secondary Namenode to do this, you have to configure dfs.http.address in the hdfs-site.xml on the server that you plan to run the Secondary Namenode on.
The address and the base port where the dfs namenode web ui will listen on.
If the port is 0 then the server will start on a free port.
Also, you need to put the Secondary Namenode into the masters file (which is defaulted to localhost).
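A sketch of that hdfs-site.xml entry on the Secondary Namenode machine (the hostname here is hypothetical; use your Namenode's address, 50070 being the default web UI port):

```xml
<property>
  <name>dfs.http.address</name>
  <value>namenode.example.com:50070</value>
</property>
```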
Datanode Configuration Changes & Hardware
So here I only have to say that you should use JBOD (Just a Bunch Of Disks). There have been tests showing 10%-30% degradation when HDFS blocks are stored on RAID. So, when setting up your data nodes, if a node has a bunch of disks you can list a directory on each of them (comma-separated) in dfs.data.dir and the Datanode will write across all of them.
If you want, you can also set dfs.datanode.du.reserved to a value (in bytes per volume) that will always leave that much space free on each disk.
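A sketch for a data node with three disks (the paths are hypothetical), reserving 10GB per disk for non-HDFS use:

```xml
<property>
  <name>dfs.data.dir</name>
  <value>/disk1/hdfs/data,/disk2/hdfs/data,/disk3/hdfs/data</value>
</property>
<property>
  <name>dfs.datanode.du.reserved</name>
  <value>10737418240</value>
</property>
```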
Map Reduce Code Changes
In your Java code there is a little trick to help the job be "aware" within the cluster of tasks that are not dead but just working hard. During execution of a task there is no built-in reporting that the job is running as expected unless it is writing output. So if your tasks spend a long time doing work without writing anything out, it is possible the cluster will see those tasks as failed (based on the mapred.tasktracker.expiry.interval setting).
Have no fear, there is a way to tell the cluster that your task is doing just fine. You have two ways to do this: you can either report status or increment a counter. Either one will let the tasktracker know the task is OK, and that in turn gets seen by the jobtracker. Both of these options are explained in the JavaDoc http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reporter.html
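A minimal sketch of both approaches inside an old-style (org.apache.hadoop.mapred) map method; the mapper class and counter enum names are made up for illustration:

```java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LongRunningMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {

    // hypothetical counter for illustration
    enum Progress { RECORDS_PROCESSED }

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, LongWritable> output,
                    Reporter reporter) throws IOException {
        // ... some long-running work per record that emits no output ...

        // Option 1: report a status string, which also marks the task alive
        reporter.setStatus("still crunching record at offset " + key);

        // Option 2: increment a counter, which also signals liveness
        reporter.incrCounter(Progress.RECORDS_PROCESSED, 1);
    }
}
```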
So if you are looking for a good NoSQL read on HBase vs. Cassandra, you can check out http://ria101.wordpress.com/2010/02/24/hbase-vs-cassandra-why-we-moved/. In short, HBase is good for reads and Cassandra for writes. Cassandra does a great job on reads too, so please do not think I am shooting either down in any way. I am just saying that both HBase and Cassandra have great value and useful purpose in their own right, and use cases even exist to run both. HBase was recently promoted to a top-level Apache project, coming up and out of Hadoop.
Having worked with Cassandra a bit, I often see/hear folks asking about running Map/Reduce jobs against the data stored in Cassandra instances. Well, Hadoopers & Hadooperettes, the Cassandra folks provide a way to do this very nicely in the 0.6 release. It is VERY straightforward and well thought through. If you want to see the evolution, check out the JIRA issue https://issues.apache.org/jira/browse/CASSANDRA-342
So how do you do it? Very simple: Cassandra provides an implementation of InputFormat. In case you are new to Hadoop, the InputFormat is what the mapper uses to load your data (basically). Cassandra's subclass connects your mapper to pull the data in from Cassandra. What is also great here is that the Cassandra folks have spent the time implementing the integration in the classic "Word Count" example.
See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for this example. Cassandra rows or row fragments (that is, pairs of key + SortedMap of columns) are input to Map tasks for processing by your job, as specified by a SlicePredicate that describes which columns to fetch from each row. Here’s how this looks in the word_count example, which selects just one configurable columnName from each row:
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
Cassandra also provides a Pig LoadFunc for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.
Cascading is an open source abstraction for MapReduce that allows applications to work with Hadoop through a straightforward Java API. Cascading provides finer-grained ways to define applications and glue them together, as well as abstractions to work with external systems.
This release (Cascading 1.1) features many performance and usability enhancements while remaining backwards compatible with 1.0.
- Performance optimizations with all join types
- Numerous job planner optimizations
- Dynamic optimizations when running in Amazon Elastic MapReduce and S3
- API usability improvements around large number of field names
- Support for TSV, CSV, and custom delimited text files
- Support for manipulating and serializing non-Comparable custom Java types
- Debug levels supported by the job planner
For a detailed list of changes see: CHANGES.txt
Along with this release are a number of extensions created by the Cascading user community.
Among these extensions are:
- Bixo – a data mining toolkit
- DBMigrate – a tool for migrating data to/from RDBMSs into Hadoop
- Apache HBase, Amazon SimpleDB, and JDBC integration
- JRuby and Clojure based scripting languages for Cascading
- Cascalog – a robust interactive extensible query language
Here is an excerpt from their User Guide for getting started http://www.cascading.org/1.1/userguide/html/ch02.html
Counting words in a document is the most common example presented to new Hadoop (and MapReduce) developers; it is the Hadoop equivalent of the "Hello World" application.
Word counting is where a document is parsed into individual words and the frequency of those words is counted. For example, if we counted the last paragraph, "is" would be counted twice and "document" counted once.
In the code example below, we will use Cascading to read each line of text from a file (our document), parse it into words, then count the number of times each word is encountered.
Example 2.1. Word Counting
// define source and sink Taps.
Scheme sourceScheme = new TextLine( new Fields( "line" ) );
Tap source = new Hfs( sourceScheme, inputPath );

Scheme sinkScheme = new TextLine( new Fields( "word", "count" ) );
Tap sink = new Hfs( sinkScheme, outputPath, SinkMode.REPLACE );

// the 'head' of the pipe assembly
Pipe assembly = new Pipe( "wordcount" );

// For each input Tuple
// parse out each word into a new Tuple with the field name "word"
// regular expressions are optional in Cascading
String regex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
Function function = new RegexGenerator( new Fields( "word" ), regex );
assembly = new Each( assembly, new Fields( "line" ), function );

// group the Tuple stream by the "word" value
assembly = new GroupBy( assembly, new Fields( "word" ) );

// For every Tuple group
// count the number of occurrences of "word" and store result in
// a field named "count"
Aggregator count = new Count( new Fields( "count" ) );
assembly = new Every( assembly, count );

// initialize app properties, tell Hadoop which jar file to use
Properties properties = new Properties();
FlowConnector.setApplicationJarClass( properties, Main.class );

// plan a new Flow from the assembly using the source and sink Taps
// with the above properties
FlowConnector flowConnector = new FlowConnector( properties );
Flow flow = flowConnector.connect( "word-count", source, sink, assembly );

// execute the flow, block until complete
flow.complete();
There are a couple things to take away from this example.
First, the pipe assembly is not coupled to the data (the Tap instances) until the last moment before execution. That is, file paths or references are not embedded in the pipe assembly. The pipe assembly remains independent of which data it processes until execution. The only dependency is what the data looks like: its "scheme", or the field names that make it up.
That brings up fields. Every input and output file has field names associated with it, and every processing element of the pipe assembly either expects certain fields or creates new fields. This allows developers to self-document their code, and allows the Cascading planner to "fail fast" during planning if a dependency between elements isn't satisfied (e.g., a missing or misnamed field).
It is also important to point out that pipe assemblies are assembled through constructor chaining. This may seem odd, but it is done for two reasons: it keeps the code more concise, and it prevents developers from creating "cycles" in the resulting pipe assembly. Pipe assemblies are Directed Acyclic Graphs (or DAGs); the Cascading planner cannot handle processes that feed themselves, that is, assemblies with cycles (which is not to say there are no ways around this, but avoiding cycles is much safer).
Notice the very first Pipe instance has a name. That instance is the "head" of this particular pipe assembly. Pipe assemblies can have any number of heads, and any number of tails. This example does not name the tail assembly, but for complex assemblies, tails must be named for reasons described below.
Heads and tails of pipe assemblies generally need names; this is how sources and sinks are "bound" to them during planning. In our example above, there is only one head and one tail, and consequently only one source and one sink, so naming in this case is optional: it's obvious what goes where. Naming is also useful for self-documenting pipe assemblies, especially where there are splits, joins, and merges in the assembly.
To paraphrase, our example will:
- read each line of text from a file and give it the field name "line",
- parse each "line" into words with the RegexGenerator object, which in turn returns each word in the field named "word",
- group on the field named "word" using the GroupBy object,
- then count the number of elements in each grouping using the Count() object and store this value in the "count" field,
- finally, write the "word" and "count" fields out.
The NYC Hadoop meetup on April 21st was great. Many thanks as always to the Cloudera folks and The Winter Wyman Companies for the pizza. Also thanks to Hiveat55 for use of their office and the Datameer folks for a good time afterwards.
The first part of the meetup was a presentation by Azza Abouzeid and Kamil Bajda-Pawlikowski (Yale University) on HadoopDB (http://db.cs.yale.edu/hadoopdb/hadoopdb.html). Their vision is to take the best of both worlds: the Map Reduce bliss for lots of data that we get from Hadoop, as well as the complex data analysis capabilities of a DBMS.
The basic idea behind HadoopDB is to give Hadoop access to multiple single-node DBMS servers (e.g. PostgreSQL or MySQL) deployed across the cluster. HadoopDB pushes as much data processing as possible into the database engine by issuing SQL queries (usually most of the Map/Combine phase logic is expressible in SQL). This in turn results in a system that resembles a shared-nothing parallel database. Applying techniques taken from the database world leads to a performance boost, especially in more complex data analysis. At the same time, the fact that HadoopDB relies on the MapReduce framework ensures scalability and fault/heterogeneity tolerance similar to Hadoop's.
They have spent a lot of time thinking through, finding, and resolving the tradeoffs that occur, and they continue to make progress on this end. They have had 2,200 downloads as of this posting and are actively looking for developers to contribute to their project. I think it is great to see a university involved at this level in open source in general, and more specifically doing work related to Hadoop. The audience was very engaging and it made for a very lively discussion. Their paper tells all the gory details http://db.cs.yale.edu/hadoopdb/hadoopdb.pdf.
The rest of the meetup was off the hook. Stefan Groschupf got off to a quick start, throwing down some pretty serious street cred as a long-standing committer on Nutch, Hadoop, Katta, Bixo and more. He was very engaging, with a good stock of anecdotes for the question that drives the Hadoop community: "What do you want to do with your data?". It is always either processing it or querying it, and there is no one silver-bullet solution. We were then given a demo of Datameer's product (which is one of the best user interface concept solutions I have seen).
In short, the Datameer Analytic Solution (DAS) provides a spreadsheet user interface allowing users to take a sample of data and (with 15 existing data connections and over 120 functions), like any good spreadsheet, pull the data into an aggregated format. Their product then takes that format and pushes it down into Hadoop (such as through Hive), where it runs as a map/reduce job.
So end to end, you can have worthy analytics folks (spreadsheet types) do their job against limitless data. Wicked.
From their website http://datameer.com
With DAS, business users no longer have to rely on intuition or a “best guess” based on what’s happened in the past to make business decisions. DAS makes data assets available as they are needed regardless of format or location so that users have the facts they need to reach the best possible conclusions.
DAS includes a familiar interactive spreadsheet that is easy to use, but also powerful so that business users don’t need to turn to developers for analytics. The spreadsheet is specifically designed for visualization of big data and includes more than 120 built-in functions for exploring and discovering complex relationships. In addition, because DAS is extensible, business analysts can use functions from third-party tools or they can write their own commands.
Drag & drop reporting allows users to quickly create their own personalized dashboards. Users simply select the information they want to view and how to display it on the dashboard: tables, charts, or graphs.
The portfolio of analytical and reporting tools in organizations can be broad. Business users can easily share data in DAS with these tools to either extend their analysis or to give other users access.
After the quick demo Stefan walked us through a solution for using Hadoop to pull the “signal from the noise” in social data and used twitter as an example. He used a really interesting graph exploration tool (going to give it a try myself) http://gephi.org/. Gephi is an interactive visualization and exploration platform for all kinds of networks and complex systems, dynamic and hierarchical graphs. He then talked a bit about X-RIME http://xrime.sourceforge.net/ which is Hadoop based large-scale social network analysis (Open Source).
So you have spent your time in pseudo-distributed mode and have finally started moving to your own cluster? Perhaps you just jumped right into the cluster setup? In any case, a distributed Hadoop cluster setup requires your "master" node (the name node & job tracker) to be able to SSH (without requiring a password, i.e. using key-based authentication) to all other "slave" nodes (e.g. data nodes).
SSH key-based authentication is required so that the master node can log in to the slave nodes (and the secondary node) to start/stop them, etc. This also needs to be set up for the secondary name node (which is listed in your masters file), presuming it is running on another machine (a VERY good idea for a production cluster), so that it can be started from your name node with ./start-dfs.sh, and likewise for the job tracker node with ./start-mapred.sh.
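For reference, the conf/masters and conf/slaves files on the master are just plain lists of hostnames, one per line (the hostnames below are hypothetical):

```
# conf/masters -- where start-dfs.sh launches the Secondary Namenode
secondarynn.example.com

# conf/slaves -- where start-dfs.sh / start-mapred.sh launch the
# datanode / tasktracker daemons
slave1.example.com
slave2.example.com
slave3.example.com
```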
Make sure you are the hadoop user for all of these commands. If you have not yet installed Hadoop and/or created the hadoop user, you should do that first. Depending on your distribution (please follow its directions for setup) this will be slightly different (e.g. Cloudera creates the hadoop user for you when going through the rpm install).
First from your “master” node check that you can ssh to the localhost without a passphrase:
$ ssh localhost
If you cannot ssh to localhost without a passphrase, execute the following commands:
$ ssh-keygen -t dsa -P "" -f ~/.ssh/id_dsa
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
On your master node, try to ssh again (as the hadoop user) to your localhost. If you are still getting a password prompt, then run:
$ chmod go-w $HOME $HOME/.ssh
$ chmod 600 $HOME/.ssh/authorized_keys
$ chown `whoami` $HOME/.ssh/authorized_keys
Now you need to copy (however you want to do this, please go ahead) your public key to all of your "slave" machines (don't forget your secondary name node). It is possible (depending on whether these are new machines) that the slave's hadoop user does not have a .ssh directory yet; if not, you should create it ($ mkdir ~/.ssh).
$ scp ~/.ssh/id_dsa.pub slave1:~/.ssh/master.pub
Now login (as the hadoop user) to your slave machine. While on your slave machine add your master machine’s hadoop user’s public key to the slave machine’s hadoop authorized key store.
$ cat ~/.ssh/master.pub >> ~/.ssh/authorized_keys
Now, from the master node try to ssh to slave.
If you are still prompted for a password (which is most likely) then it is very often just a simple permission issue. Go back to your slave node again and as the hadoop user run this
$ chmod go-w $HOME $HOME/.ssh
$ chmod 600 $HOME/.ssh/authorized_keys
$ chown `whoami` $HOME/.ssh/authorized_keys
Try again from your master node.
And you should be good to go. Repeat for all Hadoop Cluster Nodes.
This is a very good article on HBase http://jimbojw.com/wiki/index.php?title=Understanding_Hbase_and_BigTable. As Jim says in his article, after reading it you should be better able to make an educated decision regarding when you might want to use HBase vs when you’d be better off with a “traditional” database.
This is also a nice primer on NoSQL in general (at least for systems like Cassandra, etc).
What I like best about this article is how Jim breaks down the terminology and works through it to explain what HBase actually does.
This is an interesting open source project I have recently heard about http://code.google.com/p/hiho/.
What is very interesting to me about this project is the export utility which takes data from HDFS and loads it into MySQL.
It also has a nice way of querying and importing data from a JDBC database directly into HDFS. It looks much more robust than the out-of-the-box DBInputFormat that Hadoop provides. You can import the data as delimited records, with your choice of delimiter. You can also import the data and save it as Avro records. It supports queries, so you can, for example, join two tables. It splits on user-specified column ranges instead of using LIMIT and OFFSET. It does no code generation or ORM mapping.
There are other ETL tools out there (e.g. Sqoop http://www.cloudera.com/developers/downloads/sqoop/). In Cloudera's Distribution for Hadoop Version 3 (CDH3), Sqoop now also supports exporting from HDFS back into MySQL.
I am definitely going to have to give this utility a try. I hear from the HIHO project folks that the next to-do is support for more databases for export.