Hortonworks HDP1, Apache Hadoop 2.0, NextGen MapReduce (YARN), HDFS Federation and the future of Hadoop with Arun C. Murthy
Subscribe to the podcast and listen to all of what Arun had to share.
Some background to what we discussed:
Hortonworks Data Platform (HDP)
from their website:
Hortonworks Data Platform (HDP) is a 100% open source data management platform based on Apache Hadoop. It allows you to load, store, process and manage data in virtually any format and at any scale. As the foundation for the next generation enterprise data architecture, HDP includes all of the necessary components to begin uncovering business insights from the quickly growing streams of data flowing into and throughout your business.
Hortonworks Data Platform is ideal for organizations that want to combine the power and cost-effectiveness of Apache Hadoop with the advanced services required for enterprise deployments. It is also ideal for solution providers that wish to integrate or extend their solutions with an open and extensible Apache Hadoop-based platform.
- Integrated and Tested Package – HDP includes stable versions of all the critical Apache Hadoop components in an integrated and tested package.
- Easy Installation – HDP includes an installation and provisioning tool with a modern, intuitive user interface.
- Management and Monitoring Services – HDP includes intuitive dashboards for monitoring your clusters and creating alerts.
- Data Integration Services – HDP includes Talend Open Studio for Big Data, the leading open source integration tool for easily connecting Hadoop to hundreds of data systems without having to write code.
- Metadata Services – HDP includes Apache HCatalog, which simplifies data sharing between Hadoop applications and between Hadoop and other data systems.
- High Availability – HDP has been extended to seamlessly integrate with proven high availability solutions.
Apache Hadoop 2.0
from their website:
Apache Hadoop 2.x consists of significant improvements over the previous stable release (hadoop-1.x).
Here is a short overview of the improvements to both HDFS and MapReduce.
- HDFS Federation – In order to scale the name service horizontally, federation uses multiple independent Namenodes/namespaces. The Namenodes are federated; that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handle commands from the Namenodes. More details are available in the HDFS Federation document.
- MapReduce NextGen aka YARN aka MRv2 – The new architecture, introduced in hadoop-0.23, divides the two major functions of the JobTracker, resource management and job life-cycle management, into separate components. The new ResourceManager manages the global assignment of compute resources to applications, and the per-application ApplicationMaster manages the application’s scheduling and coordination. An application is either a single job in the sense of classic MapReduce jobs or a DAG of such jobs. The ResourceManager and the per-machine NodeManager daemon, which manages the user processes on that machine, form the computation fabric. The per-application ApplicationMaster is, in effect, a framework-specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.
More details are available in the YARN document.
The Hadoop documentation includes the information you need to get started using Hadoop. Begin with the Single Node Setup which shows you how to set up a single-node Hadoop installation. Then move on to the Cluster Setup to learn how to set up a multi-node Hadoop installation.
MapReduce has undergone a complete overhaul in hadoop-0.23 and we now have, what we call, MapReduce 2.0 (MRv2) or YARN.
The fundamental idea of MRv2 is to split up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job in the classical sense of Map-Reduce jobs or a DAG of jobs.
The ResourceManager and per-node slave, the NodeManager (NM), form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system.
The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.
The ResourceManager has two main components: Scheduler and ApplicationsManager.
The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks, whether due to application failure or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a resource Container, which incorporates elements such as memory, cpu, disk, network etc. In the first version, only memory is supported.
The Scheduler has a pluggable policy plug-in, which is responsible for partitioning the cluster resources among the various queues, applications etc. The current Map-Reduce schedulers such as the CapacityScheduler and the FairScheduler would be some examples of the plug-in.
The CapacityScheduler supports hierarchical queues to allow for more predictable sharing of cluster resources.
The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for executing the application-specific ApplicationMaster, and providing the service for restarting the ApplicationMaster container on failure.
The NodeManager is the per-machine framework agent that is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.
The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.
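To make the division of labor concrete, here is a toy Python sketch of the idea. The class and method names are invented for illustration; this is not the real YARN API, just a model of an ApplicationMaster negotiating containers from a global ResourceManager:

```python
class ResourceManager:
    """Global arbiter: grants containers, performs no per-task tracking."""
    def __init__(self, free_containers):
        self.free = free_containers

    def allocate(self, wanted):
        # grant as many containers as are available, up to the request
        granted = min(wanted, self.free)
        self.free -= granted
        return granted


class ApplicationMaster:
    """Per-application: negotiates with the RM and tracks its own tasks."""
    def __init__(self, rm, tasks):
        self.rm = rm
        self.pending = tasks

    def run(self):
        completed = 0
        while completed < self.pending:
            granted = self.rm.allocate(self.pending - completed)
            if granted == 0:
                break  # no capacity left; a real AM would wait and retry
            completed += granted  # pretend each container runs one task to completion
        return completed


rm = ResourceManager(free_containers=10)
am = ApplicationMaster(rm, tasks=4)
completed = am.run()
print(completed)  # 4 tasks ran; the RM has 6 containers left
```

The point of the split is visible even in this toy: the ResourceManager only hands out capacity, while all per-application bookkeeping lives in the ApplicationMaster.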
MRv2 maintains API compatibility with the previous stable release (hadoop-0.20.205). This means that all Map-Reduce jobs should still run unchanged on top of MRv2 with just a recompile.
HDFS has two main layers:
- Namespace
  - Consists of directories, files and blocks.
  - Supports all the namespace-related file system operations such as create, delete, modify and list files and directories.
- Block Storage Service, which has two parts:
  - Block Management (performed in the Namenode)
    - Provides datanode cluster membership by handling registrations and periodic heartbeats.
    - Processes block reports and maintains the location of blocks.
    - Supports block-related operations such as create, delete, modify and get block location.
    - Manages replica placement, replicates blocks that are under-replicated, and deletes blocks that are over-replicated.
  - Storage – provided by datanodes, which store blocks on the local file system and allow read/write access.
The prior HDFS architecture allows only a single namespace for the entire cluster, managed by a single Namenode. HDFS Federation addresses this limitation by adding support for multiple Namenodes/namespaces to the HDFS file system.
In order to scale the name service horizontally, federation uses multiple independent Namenodes/namespaces. The Namenodes are federated; that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handle commands from the Namenodes.
A Block Pool is a set of blocks that belong to a single namespace. Datanodes store blocks for all the block pools in the cluster. Each block pool is managed independently of the others, which allows a namespace to generate Block IDs for new blocks without coordinating with the other namespaces. The failure of one Namenode does not prevent the datanodes from serving the other Namenodes in the cluster.
A namespace and its block pool together are called a Namespace Volume. It is a self-contained unit of management: when a Namenode/namespace is deleted, the corresponding block pool at the datanodes is deleted, and each namespace volume is upgraded as a unit during a cluster upgrade.
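A toy Python model (illustration only, with invented names) makes the block-pool idea concrete: every datanode stores blocks for every namespace's pool, and because each pool is independent, two namespaces can even hand out the same Block ID without coordinating:

```python
class DataNode:
    """Stores blocks for all block pools in the cluster."""
    def __init__(self):
        self.block_pools = {}  # pool id -> set of block ids

    def register(self, pool_id):
        # a datanode registers with every Namenode: one pool per namespace
        self.block_pools.setdefault(pool_id, set())

    def store(self, pool_id, block_id):
        self.block_pools[pool_id].add(block_id)


dn = DataNode()
for pool in ("ns1-pool", "ns2-pool"):  # one block pool per federated Namenode
    dn.register(pool)

# each namespace allocates Block IDs independently, so the same ID can
# legitimately appear in two different pools on the same datanode
dn.store("ns1-pool", 101)
dn.store("ns2-pool", 101)
print(sorted(dn.block_pools))  # ['ns1-pool', 'ns2-pool']
```

Deleting a namespace in this model would simply drop its entry from `block_pools`, mirroring how deleting a Namenode/namespace deletes the corresponding block pool at the datanodes.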
A new identifier, the ClusterID, identifies all the nodes in the cluster. When a Namenode is formatted, this identifier is either provided or auto-generated; the same ID should then be used when formatting the other Namenodes in the cluster.
The benefits of HDFS Federation:
- Namespace Scalability – HDFS cluster storage scales horizontally, but the namespace does not. Large deployments, or deployments with a lot of small files, benefit from scaling the namespace by adding more Namenodes to the cluster.
- Performance – File system operation throughput is limited by the single Namenode in the prior architecture. Adding more Namenodes to the cluster scales file system read/write throughput.
- Isolation – A single Namenode offers no isolation in a multi-user environment. An experimental application can overload the Namenode and slow down production-critical applications. With multiple Namenodes, different categories of applications and users can be isolated to different namespaces.
We talked about unified analytics, machine learning, data science, some great history of Hadoop, the future of Hadoop and a lot more!
Subscribe to the podcast and listen to all of what Milind had to share.
There are a lot of different ways to write MapReduce jobs!!!
Sample code for this post
I find streaming scripts a good way to interrogate data sets (especially ones I have not worked with yet, or ones I am creating), and I enjoy the lifecycle in which the initial exploration of the data sets leads to the construction of the finalized scripts for an entire job (or a series of jobs, as is often the case).
I like working under the hood myself and getting down and dirty with the data and here is how you can too.
Let's start by defining two simple sample data sets.
Data set 1: countries.dat
```
United States|US
Canada|CA
United Kingdom|UK
Italy|IT
```
Data set 2: customers.dat
```
Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA
```
The requirements: grouped by type of customer, find out how many of each type are in each country, with the country name from countries.dat (and not the 2-digit country code) listed in the final result.
To do this you need to:
1) Join the data sets
2) Key on country
3) Count type of customer per country
4) Output the results
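Before reaching for Hadoop at all, the four steps above can be sketched in plain Python as a local sanity check (this is not part of the streaming job, just a way to know what answer to expect from the sample data):

```python
from collections import Counter

countries = {"US": "United States", "CA": "Canada",
             "UK": "United Kingdom", "IT": "Italy"}
customers = [
    ("Alice Bob", "not bad", "US"), ("Sam Sneed", "valued", "CA"),
    ("Jon Sneed", "valued", "CA"), ("Arnold Wesise", "not so good", "UK"),
    ("Henry Bob", "not bad", "US"), ("Yo Yo Ma", "not so good", "CA"),
    ("Jon York", "valued", "CA"), ("Alex Ball", "valued", "UK"),
    ("Jim Davis", "not so bad", "JA"),
]

counts = Counter()
for name, personType, cc in customers:
    # steps 1 and 2: join on the 2-digit code and key on the country name
    country = countries.get(cc, "%s - Unknown Country" % cc)
    counts[(country, personType)] += 1  # step 3: count type per country

for (country, personType), n in sorted(counts.items()):  # step 4: output
    print("%s\t%s\t%d" % (country, personType, n))
```

Note how the JA customer has no match in countries.dat, so the join has to tolerate an unknown country; the streaming scripts below deal with the same case.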
So first let's code up a quick mapper called smplMapper.py (you can decide whether smpl is short for simple or sample).
Now, the basics of coding the mapper and reducer in Python are explained nicely here, but I am going to dive a bit deeper to tackle our example with some more tactics.
```python
#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    try:  # sometimes bad data can cause errors; use this how you like to deal with lint and bad data
        personName = "-1"     # default, sorted as first
        personType = "-1"     # default, sorted as first
        countryName = "-1"    # default, sorted as first
        country2digit = "-1"  # default, sorted as first
        # remove leading and trailing whitespace
        line = line.strip()
        splits = line.split("|")
        if len(splits) == 2:  # country data
            countryName = splits[0]
            country2digit = splits[1]
        else:  # people data
            personName = splits[0]
            personType = splits[1]
            country2digit = splits[2]
        print '%s^%s^%s^%s' % (country2digit, personType, personName, countryName)
    except:
        # errors are going to make your job fail, which you may or may not want
        pass
```
chmod a+x smplMapper.py
Great! We just took care of #1, but it's time to test and see what is going to the reducer.
From the command line run:
cat customers.dat countries.dat|./smplMapper.py|sort
Which will result in:
```
CA^-1^-1^Canada
CA^not so good^Yo Yo Ma^-1
CA^valued^Jon Sneed^-1
CA^valued^Jon York^-1
CA^valued^Sam Sneed^-1
IT^-1^-1^Italy
JA^not so bad^Jim Davis^-1
UK^-1^-1^United Kingdom
UK^not so good^Arnold Wesise^-1
UK^valued^Alex Ball^-1
US^-1^-1^United States
US^not bad^Alice Bob^-1
US^not bad^Henry Bob^-1
```
Notice how this is sorted so the country line comes first and the people in that country come after it (so we can grab the correct country name as we loop), and the type of customer is also sorted (within country) so we can properly count the types within the country. =8^)
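If it is not obvious why the country lines land ahead of the people lines, a tiny standalone Python experiment (roughly what the `sort` command does to the mapper output) shows that plain lexicographic ordering does the work, because the literal "-1" sorts before any person type or name:

```python
rows = [
    "CA^valued^Sam Sneed^-1",
    "CA^-1^-1^Canada",
    "US^not bad^Alice Bob^-1",
    "US^-1^-1^United States",
]
for row in sorted(rows):
    print(row)
# within each country the ^-1^ mapping line comes first, so the reducer
# always sees the country name before any of that country's people
```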
Let us hold off on #2 for a moment (just hang in there, it will all come together soon, I promise) and get smplReducer.py working first.
```python
#!/usr/bin/env python
import sys

# maps keys to their counts
foundKey = ""
foundValue = ""
isFirst = 1
currentCount = 0
currentCountry2digit = "-1"
currentCountryName = "-1"
isCountryMappingLine = False

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    try:
        # parse the input we got from smplMapper.py
        country2digit, personType, personName, countryName = line.split('^')
        # the first line should be a mapping line, otherwise we need to set currentCountryName to not known
        if personName == "-1":
            # this is a new country which may or may not have people in it
            currentCountryName = countryName
            currentCountry2digit = country2digit
            isCountryMappingLine = True
        else:
            isCountryMappingLine = False  # this is a person we want to count
        if not isCountryMappingLine:
            # we only want to count people, but use the country line to get the right name
            # first check that the 2-digit country info matches up; it might be an unknown country
            if currentCountry2digit != country2digit:
                currentCountry2digit = country2digit
                currentCountryName = '%s - Unknown Country' % currentCountry2digit
            currentKey = '%s\t%s' % (currentCountryName, personType)
            if foundKey != currentKey:
                # new combo of keys to count
                if isFirst == 0:
                    print '%s\t%s' % (foundKey, currentCount)
                    currentCount = 0  # reset the count
                else:
                    isFirst = 0
                # remember the found key so the next loop can decide to increment or print
                foundKey = currentKey
            currentCount += 1  # we increment anything not in the map list
    except:
        pass
try:
    print '%s\t%s' % (foundKey, currentCount)
except:
    pass
```
chmod a+x smplReducer.py
And then run:
cat customers.dat countries.dat|./smplMapper.py|sort|./smplReducer.py
```
Canada	not so good	1
Canada	valued	3
JA - Unknown Country	not so bad	1
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
```
So now #3 and #4 are done but what about #2?
First put the files into Hadoop:
```shell
hadoop fs -put ~/mayo/customers.dat .
hadoop fs -put ~/mayo/countries.dat .
```
And now run it like this (assuming you are running as hadoop in the bin directory):
```shell
hadoop jar ../contrib/streaming/hadoop-0.20.1+169.89-streaming.jar \
  -D mapred.reduce.tasks=4 \
  -file ~/mayo/smplMapper.py -mapper smplMapper.py \
  -file ~/mayo/smplReducer.py -reducer smplReducer.py \
  -input customers.dat -input countries.dat \
  -output mayo \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -jobconf stream.map.output.field.separator=^ \
  -jobconf stream.num.map.output.key.fields=4 \
  -jobconf map.output.key.field.separator=^ \
  -jobconf num.key.fields.for.partition=1
```
Let us look at what we did:
hadoop fs -cat mayo/part*
Which results in:
```
Canada	not so good	1
Canada	valued	3
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
JA - Unknown Country	not so bad	1
```
So #2 is handled by the partitioner, KeyFieldBasedPartitioner (explained further in the Hadoop Wiki on Streaming), which allows the key to be whatever set of columns you output (in our case, the country), configurable by the command-line options; the rest of the values are sorted within that key and sent to the reducer together, grouped by key.
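The partitioner's job can be sketched in a few lines of Python (a simplified model, not Hadoop's actual implementation or hash function): with num.key.fields.for.partition=1, only the first ^-separated field decides which reducer a record goes to, so every record for a given country lands on the same reducer:

```python
def partition(record, num_reducers):
    # only the first key field (the 2-digit country) picks the reducer;
    # the remaining fields still participate in sorting within the reducer
    country = record.split("^")[0]
    return hash(country) % num_reducers

# both the country mapping line and the people lines for CA go to one reducer
a = partition("CA^-1^-1^Canada", 4)
b = partition("CA^valued^Jon York^-1", 4)
print(a == b)  # True
```

This is exactly why the reducer can trust that it sees a country's mapping line and all of that country's people together.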
And there you go … Simple Python Scripting Implementing Streaming in Hadoop.
Grab the tar here and give it a spin.
For those folks either just getting started or already in the daily trenches of M/R development, Karmasphere has come about to help developers and technical professionals make Hadoop MapReduce easier. Karmasphere Studio is a desktop IDE for graphically prototyping MapReduce jobs and deploying, monitoring and debugging them on Hadoop clusters in private and public clouds.
* Runs on Linux, Apple Mac OS and Windows.
* Works with all major distributions and versions of Hadoop including Apache, Yahoo! and Cloudera.
* Works with Amazon Elastic MapReduce.
* Supports local, networked, HDFS and Amazon S3 file systems.
* Support for Cascading
* Enables job submission from all major platforms including Windows.
* Operates with clusters and file systems behind firewalls.
So, what can you do with it?
- Prototype on the desktop: Get going with MapReduce job development quickly. No need for a cluster since Hadoop emulation is included.
- Deploy to a private or cloud-based cluster: Whether you’re using a cluster in your own network or a cloud, deploy your jobs easily.
- Debug on the cluster: One of the most challenging areas in MapReduce programming is debugging your job on the cluster. Visual tools deliver real-time insight into your job, including support for viewing and charting Hadoop job and task counters.
- Graphically visualize and manipulate: Whether it’s clusters, file systems, job configuration, counters, log files or other debugging information, save time and get better insight by accessing it all in one place.
- Monitor and analyze your jobs in real-time: Get a real-time, workflow view of inputs, outputs and intermediate results, including the map, partition, sort and reduce phases.
Whether you’re new to Hadoop and want to easily explore MapReduce programming or you like the sound of something that helps you prototype, deploy and manage in an integrated environment or you’re already using Hadoop but could use a lot more insight into your jobs running on a cluster, there’s something here for you.
All you need is NetBeans (version 6.7 or 6.8) and Java 1.6 and you’ll be ready to give Karmasphere Studio a whirl.
You do NOT need any kind of Hadoop cluster set up to begin prototyping. But when you are ready to deploy your job on a large data set, you’ll need a virtual or real cluster in your data center or a public cloud such as Amazon Web Services.
An Eclipse version is in progress.
So if you are looking for a good NoSQL read comparing HBase vs. Cassandra, you can check out this comparison. In short, HBase is good for reads and Cassandra for writes. Cassandra does a great job on reads too, so please do not think I am shooting either down in any way. I am just saying that both HBase and Cassandra have great value and useful purpose in their own right, and use cases even exist to run both. HBase was recently promoted to a top-level Apache project, coming up and out of Hadoop.
Having worked with Cassandra a bit, I often see/hear folks asking about running Map/Reduce jobs against the data stored in Cassandra instances. Well, Hadoopers & Hadooperettes, the Cassandra folks provide a way to do this very nicely in the 0.6 release. It is VERY straightforward and well thought through. If you want to see the evolution, check out the JIRA issue.
So how do you do it? Very simple: Cassandra provides an implementation of InputFormat. In case you are new to Hadoop, the InputFormat is (basically) what the mapper uses to load your data. Their subclass connects your mapper to pull the data in from Cassandra. What is also great here is that the Cassandra folks have spent the time implementing the integration in the classic “Word Count” example.
Cassandra rows or row fragments (that is, pairs of key + SortedMap of columns) are input to Map tasks for processing by your job, as specified by a SlicePredicate that describes which columns to fetch from each row. Here’s how this looks in the word_count example, which selects just one configurable columnName from each row:
```java
ConfigHelper.setColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
SlicePredicate predicate = new SlicePredicate()
        .setColumn_names(Arrays.asList(columnName.getBytes()));
```
Cassandra also provides a Pig LoadFunc for running jobs in the Pig DSL instead of writing Java code by hand. This is in