Big Data Open Source Security

In security there have never (IMHO) been enough open source solutions. Bruce Schneier has written about this several times in the past, so there’s no need to rewrite the arguments again.

Now, with “NoSQL” and “Big Data” open source trends in the marketplace, security finally has an intersection… a union, if I may, where new solutions to problems that have plagued our society can finally begin to arise (and already have in many cases). Fraud, malware, phishing, spam, etc. can all be tackled now with new security solutions because of Big Data and open source.

At the front lines of this is Apache Accumulo, a Big Data, open source and secure NoSQL database that runs on top of Apache Hadoop. It was originally developed by the United States National Security Agency and submitted to the Apache Software Foundation as open source in 2011, with 3 years of development and production operation already behind it.

Accumulo extends the BigTable data model to implement a security mechanism known as cell-level security. Every key-value pair has its own security label, stored under the column visibility element of the key, which is used to determine whether a given user meets the security requirements to read the value. This enables data of various security levels to be stored within the same row, and users of varying degrees of access to query the same table, while preserving data confidentiality.

SECURITY LABEL EXPRESSIONS

When mutations are applied, users can specify a security label for each value. This is done as the Mutation is created by passing a ColumnVisibility object to the put() method:

Text rowID = new Text("row1");
Text colFam = new Text("myColFam");
Text colQual = new Text("myColQual");
ColumnVisibility colVis = new ColumnVisibility("public");
long timestamp = System.currentTimeMillis();

Value value = new Value("myValue");

Mutation mutation = new Mutation(rowID);
mutation.put(colFam, colQual, colVis, timestamp, value);

SECURITY LABEL EXPRESSION SYNTAX

Security labels consist of a set of user-defined tokens that are required to read the value the label is associated with. The set of tokens required can be specified using syntax that supports logical AND and OR combinations of tokens, as well as nesting groups of tokens together.

For example, suppose within our organization we want to label our data values with security labels defined in terms of user roles. We might have tokens such as:

admin
audit
system
These can be specified alone or combined using logical operators:

// Users must have admin privileges:
admin

// Users must have admin and audit privileges
admin&audit

// Users with either admin or audit privileges
admin|audit

// Users must have audit and one or both of admin or system
(admin|system)&audit

When both | and & operators are used, parentheses must be used to specify precedence of the operators.
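To make the label syntax concrete, here is a minimal sketch in Python of how such an expression could be checked against a set of authorization tokens. This is a toy evaluator written for this post, not Accumulo's implementation; the function names and the set of characters allowed in a token are assumptions.

```python
import re

def tokenize(expr):
    # Split a label into tokens, operators and parentheses.
    # The allowed token characters here are an assumption for this sketch.
    tokens = re.findall(r"[A-Za-z0-9_.:/-]+|[&|()]", expr)
    if "".join(tokens) != expr:
        raise ValueError("unexpected character in %r" % expr)
    return tokens

def evaluate(expr, auths):
    """Return True if the set of tokens in auths satisfies the label expr."""
    tokens = tokenize(expr)
    pos = 0

    def parse_term():
        nonlocal pos
        if pos >= len(tokens):
            raise ValueError("unexpected end of expression")
        tok = tokens[pos]
        pos += 1
        if tok == "(":
            value = parse_expr()
            if pos >= len(tokens) or tokens[pos] != ")":
                raise ValueError("missing closing parenthesis")
            pos += 1
            return value
        if tok in ("&", "|", ")"):
            raise ValueError("unexpected %r" % tok)
        return tok in auths

    def parse_expr():
        nonlocal pos
        value = parse_term()
        operator = None
        while pos < len(tokens) and tokens[pos] in ("&", "|"):
            if operator is None:
                operator = tokens[pos]
            elif tokens[pos] != operator:
                # Mirrors the rule above: no implicit precedence.
                raise ValueError("mixing & and | requires parentheses")
            pos += 1
            rhs = parse_term()
            value = (value and rhs) if operator == "&" else (value or rhs)
        return value

    result = parse_expr()
    if pos != len(tokens):
        raise ValueError("unexpected %r" % tokens[pos])
    return result

print(evaluate("(admin|system)&audit", {"system", "audit"}))
print(evaluate("(admin|system)&audit", {"admin", "system"}))
```

The first call is satisfied (the user holds audit plus one of admin or system); the second is not, because audit is missing. An expression such as admin|system&audit is rejected because it mixes operators without parentheses.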

AUTHORIZATION

When clients attempt to read data from Accumulo, any security labels present are examined against the set of authorizations passed by the client code when the Scanner or BatchScanner are created. If the authorizations are determined to be insufficient to satisfy the security label, the value is suppressed from the set of results sent back to the client.

Authorizations are specified as a comma-separated list of tokens the user possesses:

// user possesses both admin and system level access
Authorizations auths = new Authorizations("admin","system");

Scanner s = connector.createScanner("table", auths);

USER AUTHORIZATIONS

Each Accumulo user has a set of associated security labels. To manipulate these in the shell use the setauths and getauths commands. These may also be modified using the Java security operations API.

When a user creates a scanner, a set of Authorizations is passed. If the authorizations passed to the scanner are not a subset of the user's authorizations, then an exception will be thrown.
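The subset rule is easy to picture as a small check. The sketch below is plain Python, not the Accumulo API; the exception class and function name are made up for illustration.

```python
class AuthorizationError(Exception):
    """Raised when a scan asks for tokens the user does not hold."""

def check_scan_authorizations(user_auths, requested_auths):
    # The requested tokens must be a subset of what the user was granted.
    missing = set(requested_auths) - set(user_auths)
    if missing:
        raise AuthorizationError("user lacks: %s" % ", ".join(sorted(missing)))
    return frozenset(requested_auths)

granted = {"admin", "system"}
print(sorted(check_scan_authorizations(granted, ["admin"])))

try:
    check_scan_authorizations(granted, ["admin", "audit"])
except AuthorizationError as err:
    print("rejected: %s" % err)
```

Asking for fewer tokens than the user holds is fine; asking for even one extra token fails the whole request rather than silently dropping it.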

To prevent users from writing data they can not read, add the visibility constraint to a table. Use the -evc option in the createtable shell command to enable this constraint. For existing tables use the following shell command to enable the visibility constraint. Ensure the constraint number does not conflict with any existing constraints.

config -t table -s table.constraint.1=org.apache.accumulo.core.security.VisibilityConstraint

Any user with the alter table permission can add or remove this constraint. This constraint is not applied to bulk imported data; if this is a concern, then disable the bulk import permission.

SECURE AUTHORIZATIONS HANDLING

For applications serving many users, it is not expected that an Accumulo user will be created for each application user. In this case an Accumulo user with all authorizations needed by any of the application's users must be created. To service queries, the application should create a scanner with the application user's authorizations. These authorizations could be obtained from a trusted 3rd party.

Often production systems will integrate with Public-Key Infrastructure (PKI) and designate client code within the query layer to negotiate with PKI servers in order to authenticate users and retrieve their authorization tokens (credentials). This requires users to specify only the information necessary to authenticate themselves to the system. Once user identity is established, their credentials can be accessed by the client code and passed to Accumulo outside of the reach of the user.

QUERY SERVICES LAYER

Since the primary method of interaction with Accumulo is through the Java API, production environments often call for the implementation of a query layer. This can be done using web services in containers such as Apache Tomcat, but that is not a requirement. The Query Services Layer provides a platform on which user-facing applications can be built. This allows application designers to isolate potentially complex query logic, and provides a convenient point at which to perform essential security functions.

Several production environments choose to implement authentication at this layer, where user identifiers are used to retrieve their access credentials, which are then cached within the query layer and presented to Accumulo through the Authorizations mechanism.
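Here is a rough sketch, in Python, of what that caching step could look like. Everything in it is hypothetical: the class, the lookup function standing in for a trusted source (for example a PKI-backed service) and the scan function standing in for creating a Scanner are all invented so the example runs on its own.

```python
class QueryLayer(object):
    """Toy query services layer sitting between end users and the data store."""

    def __init__(self, lookup_authorizations, run_scan):
        # lookup_authorizations: callable(user_id) -> iterable of tokens,
        # standing in for a trusted source of credentials.
        # run_scan: callable(table, authorizations) -> results, standing in
        # for creating a scanner with those authorizations.
        self._lookup = lookup_authorizations
        self._run_scan = run_scan
        self._cache = {}

    def authorizations_for(self, user_id):
        # Look the user up once, then serve later queries from the cache.
        if user_id not in self._cache:
            self._cache[user_id] = frozenset(self._lookup(user_id))
        return self._cache[user_id]

    def query(self, user_id, table):
        # The end user only supplies an identity; the tokens never pass
        # through the user's hands.
        return self._run_scan(table, self.authorizations_for(user_id))


# Hypothetical stand-ins so the sketch is self-contained.
TRUSTED_DIRECTORY = {"alice": ["admin", "audit"], "bob": ["system"]}
ROWS = [("row1", "admin", "secret"), ("row2", "system", "metrics")]

def fake_lookup(user_id):
    return TRUSTED_DIRECTORY.get(user_id, [])

def fake_scan(table, auths):
    # Single-token labels only, to keep the example short.
    return [(row, value) for row, label, value in ROWS if label in auths]

layer = QueryLayer(fake_lookup, fake_scan)
print(layer.query("alice", "table"))
print(layer.query("bob", "table"))
```

A real query layer would also need to expire cached credentials so that revoked access does not linger; that is left out here.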

Typically, the query services layer sits between Accumulo and user workstations.

Apache Accumulo version 1.5 just came out and is available for download, with docs.

New software-as-a-service solutions will start to spring up in the market, as will new out-of-the-box open source solutions, whether we are trying to prevent health care fraud, protect individuals from identity theft or protect corporations from intrusion, all without compromising the (C)onfidentiality, (I)ntegrity and (A)vailability of the data and distributed systems.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Using Scala To Work With Hadoop

Cloudera has a great toolkit to work with Hadoop.  Specifically it is focused on building distributed systems and services on top of the Hadoop Ecosystem.

http://cloudera.github.io/cdk/docs/0.2.0/cdk-data/guide.html

And the examples are in Scala!!!!

Here is how you work with generic stuff on the file system, including reading and writing Avro files.

https://github.com/cloudera/cdk/blob/master/cdk-examples/src/main/scala/creategeneric.scala

/**
* Copyright 2013 Cloudera Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.cloudera.data.{DatasetDescriptor, DatasetWriter}
import com.cloudera.data.filesystem.FileSystemDatasetRepository
import java.io.FileInputStream
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.compat.Platform
import scala.util.Random

// Construct a local filesystem dataset repository rooted at /tmp/data
val repo = new FileSystemDatasetRepository(
FileSystem.get(new Configuration()),
new Path("/tmp/data")
)

// Read an Avro schema from the user.avsc file on the classpath
val schema = new Parser().parse(new FileInputStream("src/main/resources/user.avsc"))

// Create a dataset of users with the Avro schema in the repository
val descriptor = new DatasetDescriptor.Builder().schema(schema).get()
val users = repo.create("users", descriptor)

// Get a writer for the dataset and write some users to it
val writer = users.getWriter().asInstanceOf[DatasetWriter[GenericRecord]]
writer.open()
val colors = Array("green", "blue", "pink", "brown", "yellow")
val rand = new Random()
// The loop range was lost from this listing; "0 to 100" is an assumed range
for (i <- 0 to 100) {
val builder = new GenericRecordBuilder(schema)
val record = builder.set("username", "user-" + i)
.set("creationDate", Platform.currentTime)
.set("favoriteColor", colors(rand.nextInt(colors.length))).build()
writer.write(record)
}
writer.close()

Big ups to the Cloudera team!

/*
Joe Stein
https://twitter.com/allthingshadoop
*/

Hortonworks HDP1, Apache Hadoop 2.0, NextGen MapReduce (YARN), HDFS Federation and the future of Hadoop with Arun C. Murthy

July 23, 2012

Episode #8 of the Podcast is a talk with Arun C. Murthy.

We talked about Hortonworks HDP1 (the first release from Hortonworks), Apache Hadoop 2.0, NextGen MapReduce (YARN) and HDFS Federation.

Subscribe to the podcast and listen to all of what Arun had to share.

Some background to what we discussed:

Hortonworks Data Platform (HDP)

from their website: http://hortonworks.com/products/hortonworksdataplatform/

Hortonworks Data Platform (HDP) is a 100% open source data management platform based on Apache Hadoop. It allows you to load, store, process and manage data in virtually any format and at any scale. As the foundation for the next generation enterprise data architecture, HDP includes all of the necessary components to begin uncovering business insights from the quickly growing streams of data flowing into and throughout your business.

Hortonworks Data Platform is ideal for organizations that want to combine the power and cost-effectiveness of Apache Hadoop with the advanced services required for enterprise deployments. It is also ideal for solution providers that wish to integrate or extend their solutions with an open and extensible Apache Hadoop-based platform.

Key Features
  • Integrated and Tested Package – HDP includes stable versions of all the critical Apache Hadoop components in an integrated and tested package.
  • Easy Installation – HDP includes an installation and provisioning tool with a modern, intuitive user interface.
  • Management and Monitoring Services – HDP includes intuitive dashboards for monitoring your clusters and creating alerts.
  • Data Integration Services – HDP includes Talend Open Studio for Big Data, the leading open source integration tool for easily connecting Hadoop to hundreds of data systems without having to write code.
  • Metadata Services – HDP includes Apache HCatalog, which simplifies data sharing between Hadoop applications and between Hadoop and other data systems.
  • High Availability – HDP has been extended to seamlessly integrate with proven high availability solutions.

Apache Hadoop 2.0

from their website: http://hadoop.apache.org/common/docs/current/

Apache Hadoop 2.x consists of significant improvements over the previous stable release (hadoop-1.x).

Here is a short overview of the improvements to both HDFS and MapReduce.

  • HDFS Federation: In order to scale the name service horizontally, federation uses multiple independent Namenodes/Namespaces. The Namenodes are federated, that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handle commands from the Namenodes. More details are available in the HDFS Federation document.
  • MapReduce NextGen aka YARN aka MRv2: The new architecture, introduced in hadoop-0.23, divides the two major functions of the JobTracker, resource management and job life-cycle management, into separate components. The new ResourceManager manages the global assignment of compute resources to applications and the per-application ApplicationMaster manages the application’s scheduling and coordination. An application is either a single job in the sense of classic MapReduce jobs or a DAG of such jobs. The ResourceManager and per-machine NodeManager daemon, which manages the user processes on that machine, form the computation fabric. The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

    More details are available in the YARN document.

Getting Started

The Hadoop documentation includes the information you need to get started using Hadoop. Begin with the Single Node Setup which shows you how to set up a single-node Hadoop installation. Then move on to the Cluster Setup to learn how to set up a multi-node Hadoop installation.

Apache Hadoop NextGen MapReduce (YARN)

from their website: http://hadoop.apache.org/common/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

MapReduce has undergone a complete overhaul in hadoop-0.23 and we now have, what we call, MapReduce 2.0 (MRv2) or YARN.

The fundamental idea of MRv2 is to split up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job in the classical sense of Map-Reduce jobs or a DAG of jobs.

The ResourceManager and per-node slave, the NodeManager (NM), form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system.

The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

MapReduce NextGen Architecture

The ResourceManager has two main components: Scheduler and ApplicationsManager.

The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks, whether due to application failure or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc. In the first version, only memory is supported.

The Scheduler has a pluggable policy plug-in, which is responsible for partitioning the cluster resources among the various queues, applications etc. The current Map-Reduce schedulers such as the CapacityScheduler and the FairScheduler would be some examples of the plug-in.

The CapacityScheduler supports hierarchical queues to allow for more predictable sharing of cluster resources.

The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and providing the service for restarting the ApplicationMaster container on failure.

The NodeManager is the per-machine framework agent that is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.

The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.

MRv2 maintains API compatibility with the previous stable release (hadoop-0.20.205). This means that all Map-Reduce jobs should still run unchanged on top of MRv2 with just a recompile.

HDFS Federation

from their website: http://hadoop.apache.org/common/docs/current/hadoop-yarn/hadoop-yarn-site/Federation.html

Background

[Figure: HDFS Layers]

HDFS has two main layers:

  • Namespace
    • Consists of directories, files and blocks
    • It supports all the namespace related file system operations such as create, delete, modify and list files and directories.
  • Block Storage Service has two parts
    • Block Management (which is done in Namenode)
      • Provides datanode cluster membership by handling registrations, and periodic heart beats.
      • Processes block reports and maintains location of blocks.
      • Supports block related operations such as create, delete, modify and get block location.
      • Manages replica placement and replication of a block for under replicated blocks and deletes blocks that are over replicated.
    • Storage – is provided by datanodes by storing blocks on the local file system and allows read/write access.

The prior HDFS architecture allows only a single namespace for the entire cluster. A single Namenode manages this namespace. HDFS Federation addresses this limitation of the prior architecture by adding support for multiple Namenodes/namespaces to the HDFS file system.

Multiple Namenodes/Namespaces

In order to scale the name service horizontally, federation uses multiple independent Namenodes/namespaces. The Namenodes are federated, that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handle commands from the Namenodes.

[Figure: HDFS Federation Architecture]

Block Pool

A Block Pool is a set of blocks that belong to a single namespace. Datanodes store blocks for all the block pools in the cluster. Each block pool is managed independently of the other block pools. This allows a namespace to generate Block IDs for new blocks without the need for coordination with the other namespaces. The failure of a Namenode does not prevent the datanode from serving other Namenodes in the cluster.

A Namespace and its block pool together are called a Namespace Volume. It is a self-contained unit of management. When a Namenode/namespace is deleted, the corresponding block pool at the datanodes is deleted. Each namespace volume is upgraded as a unit during cluster upgrade.
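A toy model helps show why independent block pools need no coordination. The Python below is only an illustration of the idea described above, not how HDFS is implemented; the class names, pool IDs and block ID scheme are all invented.

```python
import itertools

class Namespace(object):
    """One namespace with its own block pool and its own block ID counter."""

    def __init__(self, pool_id):
        self.pool_id = pool_id
        self._next_block_id = itertools.count(1)

    def allocate_block(self):
        # IDs only need to be unique within this pool, so no other
        # namespace has to be consulted.
        return (self.pool_id, next(self._next_block_id))

class Datanode(object):
    """Common storage shared by every block pool in the cluster."""

    def __init__(self):
        self.blocks = {}

    def store(self, block, data):
        self.blocks[block] = data

    def delete_pool(self, pool_id):
        # Deleting a namespace removes only that namespace's block pool.
        self.blocks = dict(
            (block, data) for block, data in self.blocks.items()
            if block[0] != pool_id
        )

ns_a, ns_b = Namespace("pool-a"), Namespace("pool-b")
datanode = Datanode()
datanode.store(ns_a.allocate_block(), b"data for a")
datanode.store(ns_b.allocate_block(), b"data for b")
print(sorted(datanode.blocks))

datanode.delete_pool("pool-a")
print(sorted(datanode.blocks))
```

Both namespaces hand out block number 1, yet nothing collides on the datanode because every block is keyed by its pool as well.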

ClusterID

A new identifier ClusterID is added to identify all the nodes in the cluster. When a Namenode is formatted, this identifier is provided or auto generated. This ID should be used for formatting the other Namenodes into the cluster.

Key Benefits

  • Namespace Scalability – HDFS cluster storage scales horizontally but the namespace does not. Large deployments or deployments using a lot of small files benefit from scaling the namespace by adding more Namenodes to the cluster.
  • Performance – File system operation throughput is limited by a single Namenode in the prior architecture. Adding more Namenodes to the cluster scales the file system read/write operations throughput.
  • Isolation – A single Namenode offers no isolation in a multi-user environment. An experimental application can overload the Namenode and slow down production critical applications. With multiple Namenodes, different categories of applications and users can be isolated to different namespaces.

Subscribe to the podcast and listen to all of what Arun had to share.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Hadoop distribution bake-off and my experience with Cloudera and MapR

July 10, 2012

A few months back we embarked on building a new Hadoop cluster @ medialets.

We have been live with Hadoop in production since April 2010 and we are still running CDH2. Our current hosting provider does not have an ideal implementation for us: our 36 nodes are spread out across an entire data center and 5 networks, each with a 1 GB link. While there are issues with this type of setup, we have been able to organically grow our cluster (it started at 4 nodes), which powers 100% of our batch analytics for what is now hundreds of millions of mobile devices.

One of our mapreduce jobs processes 30+ billion objects (about 3 TB of uncompressed data) and takes about 90 minutes to run. This job runs continuously, all day long. Each run ingests the data that was received while the previous run was in progress. One of the primary goals of our new cluster was to reduce the time these types of jobs take without having to make any code changes or increase our investment in hardware. We figured that, besides the infrastructure changes we needed/wanted to make, running an old version of Hadoop meant that we were not taking advantage of all the awesome work that folks have been putting in over the last 2 years to do things like increase performance.

So we embarked on what seems to have been coined “The Hadoop Distribution Bake-off”. We wanted not only to see how new versions of the Cloudera distribution would run our jobs but also to evaluate other distributions that have emerged since we first started with Hadoop. When we did this, Hortonworks’ distribution had not been released yet; otherwise we would have added their distro to the possible choices.

First we found a new vendor, http://www.logicworks.com, to set up a test cluster for us. It was a four-node cluster, each node with a 2GB (1G dual bonded) NIC, 12GB of RAM, 4 x 1TB drives (using 3 of the drives for data and one for the OS) and 2x Westmere 5645 2.4GHz Hex-Core CPU. While this was not going to be the exact configuration we would end up with, it was what they had in inventory, and the purpose of this test was to keep the same hardware running the same job with the same data while changing only the distro and configurations. Performance was the primary goal of the bake-off and testing, but as part of our due diligence it was not the only point we were interested in. We also reviewed other aspects of the distributions and companies, which ultimately led to our final decision to go with CDH4 for our new cluster.

First, we wanted to create a baseline to see how our data and job did with the existing distribution (CDH2) we run in production, using our existing production configuration. Next we wanted to give MapR a shot. We engaged with their team, and they spent time helping to configure and optimize the cluster for the job’s test run. Once that was done we wanted to give CDH3 and CDH4 (which was still in beta at the time) a try, and the Cloudera folks also lent their time and helped configure and optimize the cluster.

CDH2 = 12 hours 12 min (our production configuration)
MapR = 4 hours 31 min (configuration done by MapR team)
CDH3 = 6 hours 8 min (our production configuration)
CDH4 = 4 hours 20 min (configuration done by Cloudera team)

This told us that the decision between running CDH4 or MapR was not going to be made based on performance of the distribution with our data and mapreduce jobs.
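To put numbers on that, here is a small Python calculation using only the four run times listed above. The variable names are mine; the figures are the ones from the bake-off.

```python
def to_minutes(hours, minutes):
    return hours * 60 + minutes

# Run times from the bake-off, converted to minutes.
runs = {
    "CDH2": to_minutes(12, 12),
    "MapR": to_minutes(4, 31),
    "CDH3": to_minutes(6, 8),
    "CDH4": to_minutes(4, 20),
}

baseline = runs["CDH2"]
speedups = dict((name, baseline / float(mins)) for name, mins in runs.items())

# Fastest first.
for name in sorted(runs, key=runs.get):
    print("%s: %d min, %.2fx vs CDH2" % (name, runs[name], speedups[name]))

gap = runs["MapR"] - runs["CDH4"]
print("MapR vs CDH4 gap: %d min" % gap)
```

Both CDH4 and MapR come out at roughly 2.7x to 2.8x faster than our CDH2 baseline, and they are only 11 minutes apart (about 4%), which is why performance alone could not settle the choice.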

So, we had to look at the other things that were important to us.

MapR has a couple of really nice features that are unique to their platform: NFS and snapshots. Both are cool, so let’s go through them quickly. MapR’s underlying proprietary file system is what allows for these features, which are unique in the Hadoop ecosystem. The NFS feature basically allows you to copy to an NFS share that is distributed across the entire cluster (with a VIP, so it is highly available). This means that you can use the cluster for saving data from your applications and then, without any additional copies, map-reduce over it. Data is compressible under the hood, though this did not mean much to us since we already compress all of our data in sequence files using block compression on the sequence file. Snapshots (and mirroring of those snapshots to other clusters) are nifty. Being able to take a point-in-time cut of things makes the cluster feel and operate like our SAN. While snapshots are nifty, the same end result is achievable with a distcp, which sure takes longer but is still technically feasible, so there were not a lot of other benefits for us or our business; nifty nonetheless. The main issue we had with all of this was that all of the features that were attractive required us to license their product. Their product is also not open source, so we would not be able to build the code or make changes ourselves, and would always have to rely on them for support and maintenance. We met a lot of great folks from MapR but only 2 of them were Apache committers (they may have more on staff, I only met two though), and this is important to us from a support & maintenance perspective… for them it probably is not a huge deal since their platform is proprietary and not open source (I think I just repeated myself here, but did so on purpose).

Cloudera… tried, true and trusted (I have been running CDH2 for 2 years in production without ever having to upgrade), and I know lots of folks who can say the same thing. Everything is open source with a very healthy and active community. A handful of times this has been very helpful in development cycles: I could see what the container I was running in was doing to help me resolve the problems I was finding in my own code… or simply shoot a question over to the mailing list and get a response. As far as the distribution goes, it costs nothing to get it running and have it run in production with all of the features we wanted. If we ever decided to pay for support, there is a boatload (a large boat) of Apache committers, not just to the Hadoop project but to lots of projects within the Hadoop ecosystem, all of whom are available to help answer questions, make code changes, etc. The philosophy of their distribution (besides just being open source) is to cherry-pick changes from Apache Hadoop as soon as they can (or should, or want to) be introduced, to make their distribution the best.

I can think of a lot of industries and companies where MapR would be a good choice over Cloudera.

We decided what was best for us was to go with CDH4 for our new cluster. And, if we ever decide to purchase support we would get it from Cloudera.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Unified analytics and large scale machine learning with Milind Bhandarkar

Episode #7 of the Podcast is a talk with Milind Bhandarkar.

We talked about unified analytics, machine learning, data science, some great history of Hadoop, the future of Hadoop and a lot more!

Subscribe to the podcast and listen to all of what Milind had to share.

/*
Joe Stein
http://www.medialets.com
*/

Hadoop Streaming Made Simple using Joins and Keys with Python

December 16, 2011

There are a lot of different ways to write MapReduce jobs!!!

Sample code for this post https://github.com/joestein/amaunet

I find streaming scripts a good way to interrogate data sets (especially when I have not worked with them yet or am creating new ones) and enjoy the lifecycle when the initial elaboration of the data sets leads to the construction of the finalized scripts for an entire job (or series of jobs, as is often the case).

When doing streaming with Hadoop you do have a few library options. If you are a Ruby programmer then wukong is awesome! For Python programmers you can use dumbo and the more recently released mrjob.

I like working under the hood myself and getting down and dirty with the data and here is how you can too.

Let's start by defining two simple sample data sets.

Data set 1:  countries.dat

name|key

United States|US
Canada|CA
United Kingdom|UK
Italy|IT

Data set 2: customers.dat

name|type|country

Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA

The requirements: grouped by type of customer, you need to find out how many of each type are in each country, with the name of the country as listed in countries.dat in the final result (and not the two-letter country code).

To do this you need to:

1) Join the data sets
2) Key on country
3) Count type of customer per country
4) Output the results
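Before going the streaming route, the four steps above can be sketched in plain, single-machine Python as a reference for what the job should produce. This is not the streaming solution, just an in-memory cross-check; the function name and the inlined copies of the two sample data sets are mine.

```python
from collections import Counter

countries_dat = """United States|US
Canada|CA
United Kingdom|UK
Italy|IT"""

customers_dat = """Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA"""

def count_types_by_country(countries_text, customers_text):
    # 1) Join: build a lookup from country code to country name.
    names = {}
    for line in countries_text.splitlines():
        name, code = line.split("|")
        names[code] = name

    # 2) Key on country and 3) count each customer type per country.
    counts = Counter()
    for line in customers_text.splitlines():
        person, customer_type, code = line.split("|")
        country = names.get(code, "%s - Unknown Country" % code)
        counts[(country, customer_type)] += 1
    return counts

# 4) Output the results.
counts = count_types_by_country(countries_dat, customers_dat)
for (country, customer_type), total in sorted(counts.items()):
    print("%s\t%s\t%d" % (country, customer_type, total))
```

This only works because both files fit in memory. The streaming version has to get the same counts without ever holding the whole lookup table, which is what the sorting trick in the mapper and reducer is for.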

So first let's code up a quick mapper called smplMapper.py (you can decide if smpl is short for simple or sample).

Now in coding the mapper and reducer in Python the basics are explained nicely here http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/ but I am going to dive a bit deeper to tackle our example with some more tactics.

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
	try: #sometimes bad data can cause errors use this how you like to deal with lint and bad data
        
		personName = "-1" #default sorted as first
		personType = "-1" #default sorted as first
		countryName = "-1" #default sorted as first
		country2digit = "-1" #default sorted as first
		
		# remove leading and trailing whitespace
		line = line.strip()
	 	
		splits = line.split("|")
		
		if len(splits) == 2: #country data
			countryName = splits[0]
			country2digit = splits[1]
		else: #people data
			personName = splits[0]
			personType = splits[1]
			country2digit = splits[2]			
		
		print('%s^%s^%s^%s' % (country2digit,personType,personName,countryName))
	except Exception: #errors would otherwise make your job fail, which you may or may not want
		pass

Don’t forget:

chmod a+x smplMapper.py

Great! We just took care of #1, but now it is time to test and see what is going to the reducer.

From the command line run:

cat customers.dat countries.dat|./smplMapper.py|sort

Which will result in:

CA^-1^-1^Canada
CA^not so good^Yo Yo Ma^-1
CA^valued^Jon Sneed^-1
CA^valued^Jon York^-1
CA^valued^Sam Sneed^-1
IT^-1^-1^Italy
JA^not so bad^Jim Davis^-1
UK^-1^-1^United Kingdom
UK^not so good^Arnold Wesise^-1
UK^valued^Alex Ball^-1
US^-1^-1^United States
US^not bad^Alice Bob^-1
US^not bad^Henry Bob^-1

Notice how this is sorted so the country is first and the people in that country after it (so we can grab the correct country name as we loop) and with the type of customer also sorted (but within country) so we can properly count the types within the country. =8^)
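The reason the country line lands first is the "-1" default: the "-" character sorts before letters and digits when strings are compared character by character. A quick Python check on a few of the mapper's output lines (shuffled on purpose) shows it; this is just a demonstration of the ordering, not part of the job.

```python
mapper_output = [
    "CA^valued^Sam Sneed^-1",
    "US^not bad^Alice Bob^-1",
    "CA^-1^-1^Canada",
    "US^-1^-1^United States",
    "CA^not so good^Yo Yo Ma^-1",
]

# Plain string sort: within each country the "-1" mapping line comes first,
# followed by the customer lines grouped by type.
ordered = sorted(mapper_output)
for line in ordered:
    print(line)
```

Two caveats: a customer type that starts with a character sorting before "-" (a space, for example) would jump ahead of the country line, and the command line sort can order things differently depending on your locale, so running it with LC_ALL=C is the safe way to get plain byte ordering.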

Let us hold off on #2 for a moment (just hang in there it will all come together soon I promise) and get smplReducer.py working first.

#!/usr/bin/env python
 
import sys
 
# state carried from one sorted input line to the next
foundKey = ""
foundValue = ""
isFirst = 1
currentCount = 0
currentCountry2digit = "-1"
currentCountryName = "-1"
isCountryMappingLine = False

# input comes from STDIN
for line in sys.stdin:
	# remove leading and trailing whitespace
	line = line.strip()
	
	try:
		# parse the input we got from smplMapper.py
		country2digit,personType,personName,countryName = line.split('^')
		
		#the first line should be a mapping line, otherwise we need to set the currentCountryName to not known
		if personName == "-1": #this is a new country which may or may not have people in it
			currentCountryName = countryName
			currentCountry2digit = country2digit
			isCountryMappingLine = True
		else:
			isCountryMappingLine = False # this is a person we want to count
		
		if not isCountryMappingLine: #we only want to count people but use the country line to get the right name 

			#first check to see if the 2digit country info matches up, might be an unknown country
			if currentCountry2digit != country2digit:
				currentCountry2digit = country2digit
				currentCountryName = '%s - Unkown Country' % currentCountry2digit
			
			currentKey = '%s\t%s' % (currentCountryName,personType) 
			
			if foundKey != currentKey: #new combo of keys to count
				if isFirst == 0:
					print '%s\t%s' % (foundKey,currentCount)
					currentCount = 0 #reset the count
				else:
					isFirst = 0
			
				foundKey = currentKey #make the found key what we see so when we loop again can see if we increment or print out
			
			currentCount += 1 # we increment anything not in the map list
	except:
		pass

try:
	print '%s\t%s' % (foundKey,currentCount)
except:
	pass

Don’t forget:

chmod a+x smplReducer.py

And then run:

cat customers.dat countries.dat|./smplMapper.py|sort|./smplReducer.py

And voila!

Canada	not so good	1
Canada	valued	3
JA - Unkown Country	not so bad	1
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
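For comparison, the same counting can be sketched with itertools.groupby instead of hand-tracking the previous key. This is not the reducer used in this post, just an alternative under the same assumptions: input already sorted, four fields separated by ^, and "-1" in the person name field marking a country line. The function name and the sample list are made up for illustration.

```python
from itertools import groupby

def count_by_country_and_type(sorted_lines):
    """Yield (country name, customer type, count) from sorted mapper lines."""
    rows = (line.strip().split("^") for line in sorted_lines if line.strip())
    # Group by the two-letter code; the country line, if present, sorts first.
    for code, group in groupby(rows, key=lambda r: r[0]):
        group = list(group)
        if group[0][2] == "-1":   # person name is "-1": this is the country line
            name, people = group[0][3], group[1:]
        else:                     # no country line was seen for this code
            # spelling kept as in smplReducer.py so the output matches
            name, people = "%s - Unkown Country" % code, group
        for ptype, same_type in groupby(people, key=lambda r: r[1]):
            yield name, ptype, len(list(same_type))

sample = [
    "CA^-1^-1^Canada",
    "CA^not so good^Yo Yo Ma^-1",
    "CA^valued^Jon Sneed^-1",
    "CA^valued^Jon York^-1",
    "IT^-1^-1^Italy",
    "JA^not so bad^Jim Davis^-1",
]
results = list(count_by_country_and_type(sample))
for name, ptype, count in results:
    print("%s\t%s\t%s" % (name, ptype, count))
```

Like the reducer above, it prints nothing for a country that has no people (Italy in the sample).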

So now #3 and #4 are done but what about #2? 

First put the files into Hadoop:

hadoop fs -put ~/mayo/customers.dat .
hadoop fs -put ~/mayo/countries.dat .

And now run it like this (assuming you are running as hadoop in the bin directory):

hadoop jar ../contrib/streaming/hadoop-0.20.1+169.89-streaming.jar \
    -D mapred.reduce.tasks=4 \
    -file ~/mayo/smplMapper.py -mapper smplMapper.py \
    -file ~/mayo/smplReducer.py -reducer smplReducer.py \
    -input customers.dat -input countries.dat \
    -output mayo \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -jobconf stream.map.output.field.separator=^ \
    -jobconf stream.num.map.output.key.fields=4 \
    -jobconf map.output.key.field.separator=^ \
    -jobconf num.key.fields.for.partition=1

Let us look at what we did:

hadoop fs -cat mayo/part*

Which results in:

Canada	not so good	1
Canada	valued	3
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
JA - Unkown Country	not so bad	1

So #2 is the partitioner, KeyFieldBasedPartitioner, explained further in the Hadoop Wiki On Streaming. The command line options make the whole four-field line the key (stream.num.map.output.key.fields=4) but partition on only the first field (num.key.fields.for.partition=1), which in our case is the country. Every line for a country is therefore sent to the same reducer, sorted within that country.
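The idea behind partitioning on the first field can be sketched in a few lines of Python. This is only an illustration: the function name is made up, and crc32 is a stand-in hash, not the one Hadoop uses, so the reducer numbers it prints will not match a real job.

```python
import zlib

def partition_for(line, num_reducers, key_fields=1, sep="^"):
    """Pick a reducer from the first `key_fields` fields of a mapper line."""
    partition_key = sep.join(line.split(sep)[:key_fields])
    # crc32 is a stand-in hash for illustration; Hadoop has its own hash function.
    return (zlib.crc32(partition_key.encode("utf-8")) & 0xffffffff) % num_reducers

lines = [
    "CA^-1^-1^Canada",
    "CA^valued^Jon Sneed^-1",
    "UK^-1^-1^United Kingdom",
    "UK^valued^Alex Ball^-1",
]
# 4 reducers, as in mapred.reduce.tasks=4 above
partitions = dict((line, partition_for(line, 4)) for line in lines)
for line in lines:
    print("%d\t%s" % (partitions[line], line))
```

Whatever hash is used, lines that share a country code always get the same reducer number, which is what lets the reducer see the country line and its people together.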

And there you go … Simple Python Scripting Implementing Streaming in Hadoop.  

Grab the tar here and give it a spin.

/*
Joe Stein
Twitter: @allthingshadoop
Connect: On Linked In
*/

Categories: Hadoop, MapReduce, Python

Faster Datanodes with less wait io using df instead of du

May 20, 2011

I have often noticed that the check Hadoop uses to calculate usage for the data nodes causes a fair amount of I/O wait on them, driving up load.

Every cycle we can get from every spindle we want!

So I came up with a nice little hack to use df instead of du.

Here is basically what I did so you can do it too.


mv /usr/bin/du /usr/bin/bak_du
vi /usr/bin/du

and save this inside of it

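#!/bin/sh
# Stand-in for "du -sk <path>": $1 is the flags (ignored), $2 is the path.
# -P keeps each filesystem on one line, -k reports 1K blocks like du -sk.
mydf=$(df -Pk "$2" | grep -vE '^Filesystem|tmpfs|cdrom' | awk '{ print $3 }')
# printf instead of echo -e, which not every /bin/sh supports
printf '%s\t%s\n' "$mydf" "$2"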

then give it execute permission

chmod a+x /usr/bin/du

Restart your data node, check the log for errors, and make sure it starts back up.

Voila!

Now when Hadoop calls “du -sk /yourhdfslocation” it will be expedient with its results.

What's wrong with this?

1) I assume you have nothing else stored on your disks, so df is really close to du since almost all of your data is in HDFS.

2) If you have more than one volume holding your HDFS blocks, this is not exactly accurate: you skew the size of each volume by calculating only one of them and using that result for the others. This is simple to fix: parse your df result differently and use the path passed in as the second parameter to know which volume to grep for in your df result. Your first volume is most likely going to be the larger one anyway, and you should be monitoring disk space another way, so it is not going to be very harmful if you just check and report the first volume’s size.

3) You might not have your HDFS blocks on your first volume. See #2: you can just grep for the volume you want to report.
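As a rough sketch of reporting per volume from the path that was passed in, the same number df shows as "Used" can be read with os.statvfs for whichever filesystem holds a given path. The function names are made up for illustration, and "/" is only a placeholder for one of your HDFS data directories.

```python
import os

def used_kb(path):
    """Return used space, in 1K blocks, of the filesystem that holds `path`."""
    st = os.statvfs(path)
    # (total blocks - free blocks) * fragment size is the used space in bytes
    return (st.f_blocks - st.f_bfree) * st.f_frsize // 1024

def du_line(path):
    """Format the result the way "du -sk <path>" prints it: size, tab, path."""
    return "%d\t%s" % (used_kb(path), path)

# "/" is a placeholder; pass the data directory Hadoop asks about instead.
print(du_line("/"))
```

The caveat from #1 still applies: this measures the whole filesystem, not just the HDFS blocks on it.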

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Categories: Hadoop