
Archive for the ‘Hadoop’ Category

Ruby Streaming for Hadoop with Wukong: a talk with Flip Kromer from Infochimps

Another great discussion on our Podcast. Click here to listen.  For this episode our guest was Flip Kromer from Infochimps http://www.infochimps.org.  Infochimps.org’s mission is to increase the world’s access to structured data.  They have been working since the start of 2008 to build the world’s most interesting data commons, and since the start of 2009 to build the world’s first data marketplace. Their founding team consists of two physicists (Flip Kromer and Dhruv Bansal) and one entrepreneur (Joseph Kelly).

We talked about Ruby streaming with Hadoop and why to use the open source project Wukong to simplify implementing Hadoop jobs in Ruby.  There are some great examples http://github.com/infochimps/wukong/tree/master/examples that are just awesome, like the web log analysis that reconstructs the paths (chains of pages) that users follow during their visit.

It was interesting to learn about some of the new implementations and projects that he has going on, like using Cassandra to help with storing unique values for social network analysis.  This new project is called ClusterChef http://github.com/infochimps/cluster_chef.  ClusterChef will help you create a scalable, efficient compute cluster in the cloud. It has recipes for Hadoop, Cassandra, NFS and more; use as many or as few as you like. For example, with ClusterChef you can stand up:

  • A small 1-5 node cluster for development, or just to play around with Hadoop or Cassandra
  • A spot-priced, EBS-backed cluster for unattended computing at rock-bottom prices
  • A large 30+ machine cluster with multiple EBS volumes per node running Hadoop and Cassandra, with optional NFS

Why Chef and Poolparty?

  • With Chef, you declare a final state for each node, not a procedure to follow. Administration is more efficient, robust and maintainable.
  • You get a nice central dashboard to manage clients.
  • You can easily roll out configuration changes across all your machines.
  • Chef is actively developed and has well-written recipes for webservers, databases, development tools, and a ton of different software packages.
  • Poolparty makes creating Amazon cloud machines concise and easy: you can specify spot instances, EBS-backed volumes, disable-api-termination, and more.

Recipes so far:

  • Hadoop
  • NFS
  • Persistent HDFS on EBS volumes
  • Zookeeper (in progress)
  • Cassandra (in progress)

Another couple of good links we got from Flip were for Peter Norvig’s “Unreasonable Effectiveness of Data” talk I mentioned: http://bit.ly/effectofdata and http://bit.ly/norvigtalk


/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/


Hadoop, BigData and Cassandra with Jonathan Ellis

Today I spoke with Jonathan Ellis who is the Project Chair of the Apache Cassandra project http://cassandra.apache.org/ and co-founder of Riptano, the source for professional Cassandra support http://riptano.com.  It was a great discussion about Hadoop, BigData, Cassandra and Open Source.

We talked about the recent Cassandra 0.6 release, its support for running Hadoop Map/Reduce against the data stored in Cassandra, and some of what is coming up in the 0.7 release.

We touched on how Pig is currently supported, and why Hive integration with Cassandra may not happen in the future.

We also got a bit into a discussion of HBase vs Cassandra and some of the benefits & drawbacks as they live in your ecosystem (e.g. HBase is to OLAP as Cassandra is to OLTP).

This was the second Podcast and you can click here to listen.


/*
Joe Stein
http://www.linkedin.com/in/charmalloc/
*/

Making Hadoop and MapReduce easier with Karmasphere

For those folks either just getting started or already in the daily trenches of M/R development, Karmasphere has come about to help developers and technical professionals make Hadoop MapReduce easier http://www.karmasphere.com/. Karmasphere Studio is a desktop IDE for graphically prototyping MapReduce jobs and deploying, monitoring and debugging them on Hadoop clusters in private and public clouds.

* Runs on Linux, Apple Mac OS and Windows.
* Works with all major distributions and versions of Hadoop including Apache, Yahoo! and Cloudera.
* Works with Amazon Elastic MapReduce.
* Supports local, networked, HDFS and Amazon S3 file systems.
* Supports Cascading.
* Enables job submission from all major platforms including Windows.
* Operates with clusters and file systems behind firewalls.

So, what can you do with it?

  • Prototype on the desktop: Get going with MapReduce job development quickly. No need for a cluster since Hadoop emulation is included.
  • Deploy to a private or cloud-based cluster: Whether you’re using a cluster in your own network or a cloud, deploy your job/s easily.
  • Debug on the cluster: One of the most challenging areas in MapReduce programming is debugging your job on the cluster. Visual tools deliver real-time insight into your job, including support for viewing and charting Hadoop job and task counters.
  • Graphically visualize and manipulate: Whether it’s clusters, file systems, job configuration, counters, log files or other debugging information, save time and get better insight by accessing it all in one place.
  • Monitor and analyze your jobs in real-time: Get a real-time, workflow-level view of inputs, outputs and intermediate results, including the map, partition, sort and reduce phases.

Whether you’re new to Hadoop and want to easily explore MapReduce programming, or you like the sound of something that helps you prototype, deploy and manage in an integrated environment, or you’re already using Hadoop but could use a lot more insight into your jobs running on a cluster, there’s something here for you.

All you need is NetBeans (version 6.7 or 6.8) and Java 1.6 and you’ll be ready to give Karmasphere Studio a whirl.

You do NOT need any kind of Hadoop cluster set up to begin prototyping. But when you are ready to deploy your job on a large data set, you’ll need a virtual or real cluster in your data center or a public cloud such as Amazon Web Services.

An Eclipse version is in progress.


/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Categories: Hadoop, MapReduce

Tips, Tricks And Pointers When Setting Up Your First Hadoop Cluster To Run Map Reduce Jobs

April 28, 2010

So you are ready now for your first real cluster?  Have you gone through the standalone and pseudo-distributed setup of Hadoop?  If not, you should get familiar with those first http://hadoop.apache.org/common/docs/current/quickstart.html.  The Cloudera distribution has a nice walkthrough http://archive.cloudera.com/docs/cdh.html.  Cloudera also has great training videos online http://www.cloudera.com/resources/?type=Training.  Also, if you have not already read the Hadoop bible, you should (that would be Tom White’s Hadoop: The Definitive Guide).

So now that you are setting up your cluster and looking to run some MR (Map Reduce) jobs on it, there are a few points of interest to note that deviate from the default settings, plus a few nuances.  Granted, you would eventually end up learning these through a little bit of pain, trial & error, etc., but I figured I would share the knowledge since it is pretty much run-of-the-mill stuff.

The first episode of our Podcast is also on this topic.

SSH Authentication

In a cluster, the namenode needs to communicate with all the other nodes via passwordless SSH.  I have already blogged about this previously http://wp.me/pTu1i-29, but it is important for getting up and running so I wanted to mention it here again.

Namenode Configuration Changes

The image from your namenode is the metadata for knowing where all of the blocks for all of your files in HDFS across your entire cluster are stored.  If you lose the image file, your data is NOT recoverable (the nodes will have no idea which block belongs to which file).  Have no fear though, you can just save a copy to an NFS share.  After you have mounted it, update your hdfs-site.xml configuration: the property dfs.name.dir takes a comma-separated list of directories to save your image to, so add the NFS mount to that list.
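
Here is a minimal hdfs-site.xml sketch of that idea; the local path and the NFS mount point are made-up examples, so substitute your own:

<property>
<name>dfs.name.dir</name>
<value>/home/hadoop/dfs/name,/mnt/namenode-nfs/dfs/name</value>
<description>
Each directory in this comma separated list (a local disk plus an NFS mount here)
gets a full copy of the namenode image.
</description>
</property>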

Max tasks can be set up in the master node’s configuration or overridden (and set final) on data nodes that may have different hardware configurations.  Max tasks are set for both mappers and reducers.  To calculate the values, base them on the number of CPU cores and the amount of RAM you have, along with the JVM max heap size you set in mapred.child.java.opts (the default is -Xmx200m).  The Datanode and Tasktracker daemons each take 1GB, so for an 8GB machine with 8 cores, mapred.tasktracker.map.tasks.maximum could be set to 7 and mapred.tasktracker.reduce.tasks.maximum set to 7, with mapred.child.java.opts set to -Xmx400m.  Please note these task maximums are driven just as much by your CPU: if you only have 1 CPU with 1 core, then it is time to get new hardware for your data node, or set the max tasks to 1.  If you have 1 CPU with 4 cores, then setting map to 3 and reduce to 3 would be good (saving 1 core for the daemons).
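
For the hypothetical 8-core, 8GB data node above, the relevant mapred-site.xml entries might look something like this (the values are just the example numbers from this paragraph, not universal recommendations):

<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>7</value>
</property>
<property>
<name>mapred.tasktracker.reduce.tasks.maximum</name>
<value>7</value>
</property>
<property>
<name>mapred.child.java.opts</name>
<value>-Xmx400m</value>
</property>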

By default there is only one reducer, so you need to configure mapred.reduce.tasks to be more than one.  This value should be somewhere between 0.95 and 1.75 times the maximum tasks per node times the number of data nodes.  So if you have 3 data nodes and they are set up with a max of 7 reduce tasks, then configure this somewhere between roughly 20 and 36 (0.95 x 21 and 1.75 x 21).
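
For that 3-node example, a minimal mapred-site.xml entry might be the following (the value 21 is just 3 nodes x 7 reduce slots; anything in the ~20-36 range fits the rule of thumb):

<property>
<name>mapred.reduce.tasks</name>
<value>21</value>
</property>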

As of version 0.19, Hadoop is able to reuse the JVM for multiple tasks.  This is important so you do not waste time starting up and tearing down the JVM itself, and it is configurable via mapred.job.reuse.jvm.num.tasks.

Another good configuration addition is mapred.compress.map.output.  Setting this value to true should speed up the reducers’ copy phase greatly, especially when working with large data sets (it is a balance between the time to compress vs. the time to transfer).
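
Minimal mapred-site.xml entries for those two settings; a JVM-reuse value of -1 means reuse the JVM for an unlimited number of tasks per job, which is just one common choice:

<property>
<name>mapred.job.reuse.jvm.num.tasks</name>
<value>-1</value>
</property>
<property>
<name>mapred.compress.map.output</name>
<value>true</value>
</property>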

Here are some more configuration values for a real-world cluster setup; the docs are good too http://hadoop.apache.org/common/docs/current/cluster_setup.html

  • Some non-default configuration values used to run sort900, that is 9TB of data sorted on a cluster with 900 nodes:
    conf/hdfs-site.xml: dfs.block.size = 134217728 (HDFS block size of 128MB for large file systems)
    conf/hdfs-site.xml: dfs.namenode.handler.count = 40 (more NameNode server threads to handle RPCs from a large number of DataNodes)
    conf/mapred-site.xml: mapred.reduce.parallel.copies = 20 (higher number of parallel copies run by reduces to fetch outputs from a very large number of maps)
    conf/mapred-site.xml: mapred.child.java.opts = -Xmx512M (larger heap size for child JVMs of maps/reduces)
    conf/core-site.xml: fs.inmemory.size.mb = 200 (larger amount of memory allocated for the in-memory file system used to merge map outputs at the reduces)
    conf/core-site.xml: io.sort.factor = 100 (more streams merged at once while sorting files)
    conf/core-site.xml: io.sort.mb = 200 (higher memory limit while sorting data)
    conf/core-site.xml: io.file.buffer.size = 131072 (size of the read/write buffer used in SequenceFiles)
  • Updates to some configuration values to run sort1400 and sort2000, that is 14TB of data sorted on 1400 nodes and 20TB of data sorted on 2000 nodes:
    conf/mapred-site.xml: mapred.job.tracker.handler.count = 60 (more JobTracker server threads to handle RPCs from a large number of TaskTrackers)
    conf/mapred-site.xml: mapred.reduce.parallel.copies = 50
    conf/mapred-site.xml: tasktracker.http.threads = 50 (more worker threads for the TaskTracker’s HTTP server, which is used by reduces to fetch intermediate map outputs)
    conf/mapred-site.xml: mapred.child.java.opts = -Xmx1024M (larger heap size for child JVMs of maps/reduces)

Secondary Namenode

You may have read or heard a dozen times already that the Secondary Namenode is NOT a backup of the Namenode.  Well, let me say it again for you: the Secondary Namenode is not a backup of the Namenode.  The Secondary Namenode should also not run on the same machine as the Namenode.  The Secondary Namenode helps to keep the edit log of the Namenode folded into the image file.  All of the changes that happen in HDFS are managed in memory on the Namenode, which writes the changes to an edit log that the Secondary Namenode periodically compacts into a new image file.  This is helpful because otherwise the Namenode would have to do this work itself on restart, and because the edit log would otherwise just keep growing.

Now, the way that the Secondary Namenode does this is with GET & POST (yes, over HTTP) to the Namenode.  It will GET the edit logs and POST the new image back.  In order for the Secondary Namenode to do this, you have to configure dfs.http.address in the hdfs-site.xml on the server that you plan to run the Secondary Namenode on.

<property>
<name>dfs.http.address</name>
<value>namenode:50070</value>
<description>
The address and the base port where the dfs namenode web ui will listen on.
If the port is 0 then the server will start on a free port.
</description>
</property>

Also, you need to put the Secondary Namenode into the masters file (which defaults to localhost).

Datanode Configuration Changes & Hardware

So here I only have to say that you should use JBOD (Just a Bunch Of Disks).  There have been tests showing 10%-30% degradation when HDFS blocks run on RAID.  So, when setting up your data nodes, if you have a bunch of disks you can configure each one for that node in dfs.data.dir (a comma-separated list, one directory per disk).

If you want, you can also set dfs.datanode.du.reserved to always leave some space free on each disk (the value is in bytes).
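
A minimal hdfs-site.xml sketch of both settings; the three mount points and the 10GB reserve are made-up example values:

<property>
<name>dfs.data.dir</name>
<value>/disk1/hdfs/data,/disk2/hdfs/data,/disk3/hdfs/data</value>
</property>
<property>
<name>dfs.datanode.du.reserved</name>
<value>10737418240</value>
</property>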

Map Reduce Code Changes

In your Java code there is a little trick to help the job be “aware” within the cluster of tasks that are not dead but just working hard.  During the execution of a task there is no built-in reporting that the job is running as expected if it is not writing output.  So this means that if your tasks take a long time doing work, it is possible the cluster will see those tasks as failed (based on the mapred.task.timeout setting).

Have no fear, there is a way to tell the cluster that your task is doing just fine.  You have two ways to do this: you can either report status or increment a counter.  Either one lets the task tracker know the task is OK, and that in turn gets seen by the jobtracker.  Both of these options are explained in the Reporter JavaDoc http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/Reporter.html
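
Here is a minimal sketch of a mapper that does this, using the old org.apache.hadoop.mapred API that the JavaDoc above covers; the class name, the counter enum and the key/value types are made up for illustration:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LongRunningMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, LongWritable> {

  // Hypothetical counter just for this example
  enum Heartbeat { RECORDS_PROCESSED }

  public void map(LongWritable key, Text value,
                  OutputCollector<Text, LongWritable> output, Reporter reporter)
      throws IOException {
    // ... imagine a long stretch of work here that emits no output for a while ...

    // Either call tells the tasktracker this task is still alive:
    reporter.setStatus("still working on record " + key.get());  // report status
    reporter.incrCounter(Heartbeat.RECORDS_PROCESSED, 1L);       // or bump a counter
  }
}

Calling reporter.progress() alone also works if you do not need a status message or a counter.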


/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Categories: Cluster, Hadoop

Running Hadoop MapReduce With Cassandra NoSQL

April 24, 2010

So if you are looking for a good NoSQL read on HBase vs. Cassandra, you can check out http://ria101.wordpress.com/2010/02/24/hbase-vs-cassandra-why-we-moved/.  In short, HBase is good for reads and Cassandra for writes.  Cassandra does a great job on reads too, so please do not think I am shooting either down in any way.  I am just saying that both HBase and Cassandra have great value and a useful purpose in their own right, and use cases even exist to run both.  HBase was also recently promoted to a top-level Apache project, coming up and out of Hadoop.

Having worked with Cassandra a bit, I often see/hear folks asking about running Map/Reduce jobs against the data stored in Cassandra instances.  Well, Hadoopers & Hadooperettes, the Cassandra folks in the 0.6 release provide a way to do this very nicely.   It is VERY straightforward and well thought through.  If you want to see the evolution, check out the JIRA issue https://issues.apache.org/jira/browse/CASSANDRA-342

So how do you do it?  Very simple: Cassandra provides an implementation of InputFormat.  In case you are new to Hadoop, the InputFormat is what the mapper uses to load your data (basically).  Cassandra’s subclass connects your mapper to pull the data in from Cassandra.  What is also great here is that the Cassandra folks have spent the time implementing the integration in the classic “Word Count” example.

See https://svn.apache.org/repos/asf/cassandra/trunk/contrib/word_count/ for this example.  Cassandra rows or row fragments (that is, pairs of key + SortedMap of columns) are input to Map tasks for processing by your job, as specified by a SlicePredicate that describes which columns to fetch from each row. Here’s how this looks in the word_count example, which selects just one configurable columnName from each row:

// Tell the job which keyspace and column family to read from
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
// Ask for just the one configured column from each row
SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setSlicePredicate(job.getConfiguration(), predicate);
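
The word_count job also registers Cassandra’s InputFormat implementation so the map tasks read from Cassandra rather than HDFS; that wiring is roughly one line (ColumnFamilyInputFormat is the implementation mentioned above):

job.setInputFormatClass(ColumnFamilyInputFormat.class);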

Cassandra also provides a Pig LoadFunc for running jobs in the Pig DSL instead of writing Java code by hand. This is in https://svn.apache.org/repos/asf/cassandra/trunk/contrib/pig/.


/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/