Archive for the ‘Hadoop’ Category

Apache Hadoop HDFS Data Node Apache Mesos Framework


This project allows running HDFS on Mesos.

You should be familiar with HDFS and Mesos basics:

Project requires:

  • Mesos 0.23.0+
  • JDK 1.7.x
  • Hadoop 1.2.x or 2.7.x

Mesos in Vagrant

Project includes vagrant environment, that allows to run Mesos cluster locally.

If you are going to use external Mesos cluster, you can skip this section.

1. Start vagrant nodes:

# cd hdfs-mesos/vagrant
# vagrant up

It creates mesos master and slave nodes.

2. Add vagrant node names to /etc/hosts

Now Mesos in vagrant should be running. You can proceed with starting scheduler.

For more details about vagrant environment please read vagrant/

Running Scheduler

1. Download hdfs-mesos\*.jar OR clone & build the project:

Download jar:

# mkdir hdfs-mesos
# cd hdfs-mesos
# wget

OR clone & build:

# git clone
# cd hdfs-mesos
# ./gradlew jar

2. Download hadoop tarball:

# wget

3. Start scheduler:

# ./ scheduler --api=http://$scheduler:7000 --master=zk://$master:2181/mesos --user=vagrant
2016-03-18 15:04:48,785 [main] INFO hdfs.Scheduler - Starting Scheduler:
api: http://$scheduler:7000
files: jar:./hdfs-mesos-, hadoop:./hadoop-1.2.1.tar.gz
mesos: master:master:5050, user:vagrant, principal:<none>, secret:<none>
framework: name:hdfs, role:*, timeout:30d
2016-03-18 15:04:48,916 [main] INFO hdfs.HttpServer - started on port 7000
I0318 15:04:49.008314 19123 sched.cpp:164] Version: 0.25.0
I0318 15:04:49.017160 19155 sched.cpp:262] New master detected at master@
I0318 15:04:49.019287 19155 sched.cpp:272] No credentials provided. Attempting to register without authentication
I0318 15:04:49.029218 19155 sched.cpp:641] Framework registered with 20160310-141004-84125888-5050-10895-0006
2016-03-18 15:04:49,044 [Thread-17] INFO hdfs.Scheduler - [registered] framework:#-0006 master:#326bb pid:master@ hostname:master
2016-03-18 15:04:49,078 [Thread-18] INFO hdfs.Scheduler - [resourceOffers]
slave0#-O761 cpus:1.00; mem:2500.00; disk:35164.00; ports:[5000..32000]
master#-O762 cpus:1.00; mem:2500.00; disk:35164.00; ports:[5000..32000]
2016-03-18 15:04:49,078 [Thread-18] INFO hdfs.Scheduler - [resourceOffers]


  • $scheduler is scheduler address accessible from slave nodes;
  • $master master address accessible from scheduler node;

Scheduler should register itself and start receiving resource offers. If scheduler is not receiving offers it could be required to specify LIBPROCESS_IP:

# export LIBPROCESS_IP=$scheduler_ip

Now scheduler should be running and you can proceed with starting HDFS nodes.

Running HDFS Cluster

Project provides CLI & REST API for managing HDFS nodes. We will focus first on CLI.

1. Add namenode & datanode:

# ./ node add nn --type=namenode
node added:
  id: nn
  type: namenode
  state: idle
  resources: cpus:0.5, mem:512

# ./ node add dn0 --type=datanode
node added:
  id: dn0
  type: datanode
  state: idle
  resources: cpus:0.5, mem:512

2. Start nodes:

# ./ node start \*
nodes started:
  id: nn
  type: namenode
  state: running
  resources: cpus:0.5, mem:512
  reservation: cpus:0.5, mem:512, ports:http=5000,ipc=5001
    task: 383aaab9-982b-400e-aa35-463e66cdcb3b
    executor: 19065e07-a006-49a4-8f2b-636d8b1f2ad6
    slave: 241be3a2-39bc-417c-a967-82b4018a0762-S0 (master)

  id: dn0
  type: datanode
  state: running
  resources: cpus:0.5, mem:512
  reservation: cpus:0.5, mem:512, ports:http=5002,ipc=5003,data=5004
    task: 37f3bcbb-10a5-4323-96d2-aef8846aa281
    executor: 088463c9-5f2e-4d1d-8195-56427168b86f
    slave: 241be3a2-39bc-417c-a967-82b4018a0762-S0 (master)

Nodes are up & running now.

Note: starting may take some time. You can view the progress via Mesos UI.

3. Do some FS operations:

# hadoop fs -mkdir hdfs://master:5001/dir
# hadoop fs -ls hdfs://master:5001/
Found 1 items
drwxr-xr-x   - vagrant supergroup          0 2016-03-17 12:46 /dir

Note: namenode host and ipc port is used in fs url.

Using CLI

Project provides CLI with following structure:

# ./ help
Usage: <cmd> ...

  help [cmd [cmd]] - print general or command-specific help
  scheduler        - start scheduler
  node             - node management

Help is provided for each command and sub-command:

# ./ help node
Node management commands
Usage: node <cmd>

  list       - list nodes
  add        - add node
  update     - update node
  start      - start node
  stop       - stop node
  remove     - remove node

Run `help node <cmd>` to see details of specific command

# ./ help node add
Add node
Usage: node add <ids> [options]

Option (* = required)  Description
---------------------  -----------
--core-site-opts       Hadoop core-site.xml options.
--cpus <Double>        CPU amount (0.5, 1, 2).
--executor-jvm-opts    Executor JVM options.
--hadoop-jvm-opts      Hadoop JVM options.
--hdfs-site-opts       Hadoop hdfs-site.xml options.
--mem <Long>           Mem amount in Mb.
* --type               node type (name_node, data_node).

Generic Options
Option  Description
------  -----------
--api   REST api url (same as --api option for

All node-related commands support bulk operations using node-id-expressions. Examples:

# ./ node add dn0..1 --type=datanode
nodes added:
  id: dn0
  type: datanode

  id: dn1
  type: datanode

# ./ node update dn* --cpus=1
nodes updated:
  id: dn0
  resources: cpus:1.0, mem:512

  id: dn1
  resources: cpus:1.0, mem:512

# ./ node start dn0,dn1
nodes started:
  id: dn0

  id: dn0

Id expression examples:

  • nn – matches node with id nn
  • * – matches any node (should be slash-escaped in shell)
  • dn* – matches node with id starting with dn
  • dn0..2 – matches nodes dn0, dn1, dn2

Using REST

Scheduler uses embedded HTTP server. Server serves two functions:

  • distributing binaries of Hadoop, JRE and executor;
  • serving REST API, invoked by CLI;

Most CLI commands map to REST API call. Examples:

CLI command REST call
node add nn --type=namenode --cpus=2 /api/node/add?node=nn&type=namenode&cpus=2
node start dn* --timeout=3m- /api/node/start?node=dn*&timeout=3m
node remove dn5 /api/node/remove?node=dn5

REST calls accepts plain HTTP params and return JSON responses. Examples:

# curl http://$scheduler:7000/api/node/list
        "id": "nn",
        "type": "namenode",
        "id": "dn0",
        "type": "datanode",

# curl http://$scheduler:7000/api/node/start?node=nn,dn0
    "status": "started",
    "nodes": [
            "id": "nn",
            "state": "running",
            "id": "dn0",
            "state": "running",

CLI params maps one-to-one to REST params. CLI params use dashed style while REST params use camel-case. Example of mappings:

CLI param REST param
<id> (node add|update|…) node
timeout (node start|stop) timeout
core-site-opts (node add|update) coreSiteOpts
executor-jvm-opts (node add|update) executorJvmOpts

REST API call could return error in some cases. Errors are marked with status code other than 200. Error response is returned in JSON format.


# curl -v
HTTP/1.1 400 node not found
{"error":"node not found","code":400}

For more detail on REST API please refer to sources.

Categories: Hadoop

Hadoop isn’t dead but you might be doing it wrong!

March 18, 2016 Leave a comment

I haven’t blogged (or podcasted for that matter) in a while. There are lots of different reasons for that and I am always happy to chat and grab tea if folks are interested but after attending this year’s HIMSS conference I just couldn’t hold it in anymore.

I went to HIMSS so excited it was supposed to be the year of Big Data! Everything was about transformation and interoperability and OMGZ the excitement.

The first keynote Monday evening was OFF THE HOOK The rest of the time myself and two of my colleagues where at the expo. It is basically CES for Healthcare (if you don’t know what CES is then think DEFCON for Healthcare… or something). Its big.

But where was the Big Data?

Not really anywhere … There were 3 recognizable “big data companies” and one of them was in the booth as a partner for cloud services. It was weird. What happened?

One of the engineers from Cerner has a lightening talk at the Kafka Summit, go Cerner!!

Didn’t everyone get the memo? We need to help reduce costs of patient care!

Here are two ways to help reduce costs of patient care!

  1. (Paraphrasing Michael Dell from his keynote) Innovation funding for Healthcare IT will come from optimizing your data center resources.
  2. (This one is from me but inspired by Bruce Schneier) Through Open Source we can enable better systems by sharing in the R&D costs and also make them more secure.

Totally agree with #1, have seen it first hand people saving 82% of their data center bill. Not even using spot (or as they say “preemptive“) instances yet. Amazing!

As for #2, you have to realize that different people are good at different things. One person can write anything but sometimes 2 or 3 or 45 of them can write it better…. at least make sure the tests always keep passing and evolving properly, etc, etc, etc, stewardship, etc.

Besides all of that, the conference was great. There were a lot of companies and people I recognized and bumped into and it was great to catch up.

I was also really REALLY excited to see how far physician signatures and form signing has (finally) come in healthcare removing all that paper. Fax is almost dead but there are still a couple of companies kicking.

One last thing, the cyber security part of the expo was also disappointing. I know it was during the RSA Conference but Healthcare needs good solutions too. For that there were a good set of solutions not bad in some cases legit and known (thanks for showing up!) but the “pavilion” was downstairs in the back left corner. Maybe if HIMSS coincided with Strata it would have been different, hard to say.

There was one tweet about it (at least) not sure if there were more.

So, Big Data, Healthcare, Security, OH MY! I am in!

I will be talking more about problems and solutions with using the Open Source Interoperable XML based FHIR standard in Healthcare removing the need to integrate and make interoperable HL7 systems in New York City on 03/29/2016 and getting into realtime stream processing on Mesos.

I will also be conducting a training on SMACK Stack 1.0 (Streaming Mesos Analytics Cassandra Kafka) using telephone systems and API to start stream events and interactions with different systems because of them. Yes, I bought phones and yes you get to keep yours.

What has attracted me (for almost 2 years now) to running on Mesos Hadoop systems and eco-system components is the ease it brings for the developers, systems engineers, data scientists, analysts and the users of the software systems that run (as a service often) those components. There are lots of things to research and read in those cases I would

1) scour my blog

2) read this

3) and this

4) your own thing

Hadoop! Mesos!


~ Joestein

p.s. if you have something good to say about Hadoop and want to talk about it and it is gripping and good and gets back to the history and continued efforts. Let me know. Thanks!



Open Source Cloud Formation with Minotaur for Mesos, Kafka and Hadoop

December 17, 2014 Leave a comment

Today I am happy to announce “Minotaur” which is our Open Source AWS based infrastructure for managing big data open source projects including (but not limited too): Apache Kafka, Apache Mesos and Cloudera’s Distribution of Hadoop. Minotaur is based on AWS Cloud Formation.

The following labs are currently supported:


Supervisor is a Docker-based image that contains all the necessary software to manage nodes/resources in AWS.

Supervisor set-up

  • clone this repo to repo_dir
  • cd to repo_dir/supervisor folder

Before trying to build docker image, you must put some configuration files under config directory:

a) aws_config

This file is just a regular aws-cli config, you must paste your secret and access keys, provided by Amazon in it:

output = json
region = us-east-1
aws_access_key_id = SECRET_KEY
aws_secret_access_key = ACCESS_KEY

Do not add or remove any extra whitespaces (especially before and after “=” sign in keys)

b) private.key

This is your private SSH key, public part of which is registered on Bastion host

c) environment.key

This is a shared key for all nodes in environment Supervisor is supposed to manage.

d) ssh_config

This is a regular SSH config file, you have to change your_username only (this is the one registered on Bastion).

BASTION_IP is handled dynamically when container is built.

# BDOSS environment
Host 10.0.2.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
Host 10.0.*.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
  • exec

If this is the first time you’re launching supervisor – it will take some time to build.

Subsequent up’s will take seconds.

Using supervisor

Now you can cd to /deploy/labs/ and deploy whatever you want


minotaur lab deploy mesosmaster -e bdoss-dev -d test -r us-east-1 -z us-east-1a
Creating new stack 'mesos-master-test-bdoss-dev-us-east-1-us-east-1a'...
Stack deployed.

this will spin up a mesos master node in “testing” deployment.


Supervisor has a built-in “awsinfo” command, which relies on AWS API and provides brief info about running machines. It is also capable of searching through that info.

Usage example

awsinfo – will display brief info about all nodes running in AWS

root@supervisor:/deploy# awsinfo
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running  
bastion.bdoss-dev                   i-3faa69de   m1.small       running      None           
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           
mesos-slave.test.bdoss-dev          i-e00ddc01   m1.small       terminated      None            None           

awsinfo mesos-master – will display info about all mesos-master nodes running in AWS.

root@supervisor:/deploy/labs# awsinfo mesos-master
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           

awsinfo 10.0.2 – match a private/public subnet

root@supervisor:/deploy/labs# awsinfo 10.0.2
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running  
mesos-master.test.bdoss-dev         i-e96ebd08   m1.small       running 


If you can’t use Docker directly for some reason, there’s a Vagrant wrapper VM for it.

Before doing anything with Vagrant, complete the above steps for Docker, but don’t execute script

Just cd into vagrant directory and exec vagrant up, then vagrant ssh (nothing special here yet).

When you will exec vagrant ssh, docker container build process will spawn up immediately, so wait a bit and let it complete.

Now you’re inside a Docker container nested in Vagrant VM and can proceed with deployment in the same manner as it’s described for docker.

All the following vagrant ssh‘s will spawn Docker container almost immediately.

Once you are inside of the supervisor image, the script may be used to provision an environment and labs. The rest of this readme assumes that the script is executed from within the supervisor container.

Minotaur Commands

List Infrastructure Components

root@supervisor:/deploy# ./ infrastructure list
Available deployments are: ['bastion', 'iampolicies', 'iamusertogroupadditions', 'nat', 'sns', 'subnet', 'vpc']

Print Infrastructure Component Usage

root@supervisor:/deploy# ./ infrastructure deploy bastion -h
usage: infrastructure deploy bastion [-h] -e ENVIRONMENT -r REGION
                                                 -z AVAILABILITY_ZONE
                                                 [-i INSTANCE_TYPE]

optional arguments:
  -h, --help            show this help message and exit
                        CloudFormation environment to deploy to
  -r REGION, --region REGION
                        Geographic area to deploy to
                        Isolated location to deploy to
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy

Deploy Infrastructure Component

In this example, the bdoss-dev bastion already existed, so the CloudFormation stack was updated with the current template.

root@supervisor:/deploy# ./ infrastructure deploy bastion -e bdoss-dev -r us-east-1 -z -us-east-1a
Template successfully validated.
Updating existing 'bastion-bdoss-dev-us-east-1-us-east-1a' stack...
Stack updated.

List Labs

List all supported labs.

root@supervisor:/deploy# ./ lab list
Available deployments are: ['clouderahadoop', 'gokafkaconsumer', 'gokafkaproducer', 'kafka', 'mesosmaster', 'mesosslave', 'zookeeper']

Print Lab Usage

Print the kafka lab usage.

root@supervisor:/deploy# ./ lab deploy kafka -h
usage: lab deploy kafka [-h] -e ENVIRONMENT -d DEPLOYMENT -r
                                    REGION -z AVAILABILITY_ZONE [-n NUM_NODES]
                                    [-i INSTANCE_TYPE] [-v ZK_VERSION]
                                    [-k KAFKA_URL]

optional arguments:
  -h, --help            show this help message and exit
                        CloudFormation environment to deploy to
  -d DEPLOYMENT, --deployment DEPLOYMENT
                        Unique name for the deployment
  -r REGION, --region REGION
                        Geographic area to deploy to
                        Isolated location to deploy to
  -n NUM_NODES, --num-nodes NUM_NODES
                        Number of instances to deploy
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy
  -v ZK_VERSION, --zk-version ZK_VERSION
                        The Zookeeper version to deploy
  -k KAFKA_URL, --kafka-url KAFKA_URL
                        The Kafka URL

Deploy Lab

Deploy a 3-broker Kafka cluster.

root@supervisor:/deploy# ./ lab deploy kafka -e bdoss-dev -d kafka-example -r us-east-1 -z us-east-1a -n 3 -i m1.small    
Template successfully validated.
Creating new 'kafka-bdoss-dev-kafka-example-us-east-1-us-east-1a' stack...
Stack deployed.

 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop


Ideas and goals behind the Go Kafka Client

December 10, 2014 Leave a comment

I think a bunch of folks have heard already that B.D.O.S.S. was working on a new Apache Kafka Client For Go. Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur which is our lab environment for Apache Zookeeper, Apache Mesos, Apache CassandraApache Kafka, Apache Hadoop and our new Go Kafka Client.

To get started using the consumer client check out our example code and its property file.

Ideas and goals behind the Go Kafka Client:


1) Partition Ownership

We decided on implementing multiple strategies for this including static assignment. The concept of re-balancing is preserved but now there are a few different strategies to re-balancing and they can run at different times depending on what is going on (like a blue/green deploy is happening). For more on blue/green deployments check out this video with Jim Plush and Sean Berry from 2014 AWS re:Invent.

2) Fetch Management

This is what “fills up the reservoir” as I like to call it so the processing (either sequential or in batch) will always have data if there is data for it to have without making a network hop. The fetcher has to stay ahead here keeping the processing tap full (or if empty that is controlled) pulling the data for the Kafka partition(s) it is owning.

3) Work Management

For the Go consumer we currently only support “fan out” using go routines and channels. If you have ever used go this will be familiar to you if not you should drop everything and learn Go.

4) Offset Management

Our offset management is based on a per batch basis with each highest offset from the batch committed on a per partition basis.
func main() {
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)

	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
	fmt.Println("Successfully shut down all consumers")

func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},

func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int {topic : config.NumConsumerFetchers}
	go func() {
	return consumer

func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))

		return kafkaClient.NewSuccessfulResult(id)

func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop

func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
We set our mailing list to be this is the central place for client library discussions for the Apache Kafka community.

Plans moving forward with the Go Kafka Client:

1) Build up suite of integration tests.
2) Stabilize the API nomenclature with the JVM Kafka Client for consistency.
3) Run integration tests and create baseline regression points.
4) Re-work changes from feedback of issues found and from our own customer support.
5) Push to production and continue on going support.

Ideas and goals behind Minotaur:

Provide a controlled and isolated environment through scripts for regression of versions and benchmarks against instance sizing. This comes in 3 parts: 1) Supervisor 2) Infrastructure 3) Labs. The Labs portion is broker down with both chef cook books and python wrappers for cloud formation templates for each of the following systems:

Plans moving forward with Minotaur:

1) Upstream our Cloud Formation Scripts (this is going to take some more work detangling Dexter (the name for our private repo of Minotaur).
2) Move the work we do with Apache Cassandra into the project.
3) Start support of Puppet in addition to the work we did with Chef.
4) Get the Docker supervisor console pushed upstream through Bastion hosts.
5) Cut our labs over to using Minotaur.
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop

Apache Solr real-time live index updates at scale with Apache Hadoop

Episode # 22 of the podcast was a talk with Patrick Hunt

We talked about the new work that has gone into Apache Solr (upstream) that allows it to work on Apache Hadoop. Solr has support for writing and reading its index and transaction log files to the HDFS distributed filesystem. This does not use Hadoop Map-Reduce to process Solr data, rather it only uses the HDFS filesystem for index and transaction log file storage.

We also talked about Solr Cloud and how the sharding features allow Solr to scale with a Hadoop cluster

Apache Solr includes the ability to set up a cluster of Solr servers that combines fault tolerance and high availability. Called SolrCloud, these capabilities provide distributed indexing and search capabilities, supporting the following features:

  • Central configuration for the entire cluster
  • Automatic load balancing and fail-over for queries
  • ZooKeeper integration for cluster coordination and configuration.

SolrCloud is flexible distributed search and indexing, without a master node to allocate nodes, shards and replicas. Instead, Solr uses ZooKeeper to manage these locations, depending on configuration files and schemas. Documents can be sent to any server and ZooKeeper will figure it out.

Patrick introduced me to Morphlines (part of the Cloudera Development Kit for Hadoop)

Cloudera Morphlines is an open source framework that reduces the time and skills necessary to build and change Hadoop ETL stream processing applications that extract, transform and load data into Apache Solr, HBase, HDFS, Enterprise Data Warehouses, or Analytic Online Dashboards. Want to build or facilitate ETL jobs without programming and without substantial MapReduce skills? Get the job done with a minimum amount of fuss and support costs? Here is how to get started.

A morphline is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. It replaces Java programming with simple configuration steps, and correspondingly reduces the cost and integration effort associated with developing and maintaining custom ETL projects.

Morphlines is a library, embeddable in any Java codebase. A morphline is an in-memory container of transformation commands. Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record. A record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments. The framework is extensible and integrates existing functionality and third party systems in a straightforward manner.

The morphline commands were developed as part of Cloudera Search. Morphlines power ETL data flows from Flume and MapReduce and HBase into Apache Solr. Flume covers the real time case, whereas MapReduce covers the batch processing case. Since the launch of Cloudera Search morphline development graduated into the Cloudera Development Kit(CDK) in order to make the technology accessible to a wider range of users and products, beyond Search. The CDK is a set of libraries, tools, examples, and documentation focused on making it easier to build systems on top of the Hadoop ecosystem. The CDK is hosted on GitHub and encourages involvement by the community. For example, morphlines could be embedded into Crunch, HBase, Impala, Pig, Hive, or Sqoop. Let us know where you want to take it!

Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline is an efficient way to consume records (e.g. Flume events, HDFS files, RDBMS tables or Avro objects), turn them into a stream of records, and pipe the stream of records through a set of easily configurable transformations on the way to a target application such as Solr, for example as outlined in the following figure:

In this figure, a Flume Source receives syslog events and sends them to a Flume Morphline Sink, which converts each Flume event to a record and pipes it into a readLine command. The readLine command extracts the log line and pipes it into a grok command. The grok command uses regular expression pattern matching to extract some substrings of the line. It pipes the resulting structured record into the loadSolr command. Finally, the loadSolr command loads the record into Solr, typically a SolrCloud. In the process, raw data or semi-structured data is transformed into structured data according to application modelling requirements.

The Morphline framework ships with a set of frequently used high level transformation and I/O commands that can be combined in application specific ways. The plugin system allows the adding of new transformations and I/O commands and integrates existing functionality and third party systems in a straightforward manner.

This integration enables rapid Hadoop ETL application prototyping, complex stream and event processing in real time, flexible log file analysis, integration of multiple heterogeneous input schemas and file formats, as well as reuse of ETL logic building blocks across Hadoop ETL applications.

The CDK ships an efficient runtime that compiles a morphline on the fly. The runtime executes all commands of a given morphline in the same thread. Piping a record from one command to another implies just a cheap Java method call. In particular, there are no queues, no handoffs among threads, no context switches and no serialization between commands, which minimizes performance overheads.

Morphlines manipulate continuous or arbitrarily large streams of records. A command transforms a record into zero or more records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. Note that a field can have multiple values and any two records need not use common field names. This flexible data model corresponds exactly to the characteristics of the Solr/Lucene data model.

Not only structured data, but also binary data can be passed into and processed by a morphline. By convention, a record can contain an optional field named _attachment_body, which can be a Java or Java byte[]. Optionally, such binary input data can be characterized in more detail by setting the fields named _attachment_mimetype (such as “application/pdf”) and _attachment_charset (such as “UTF-8”) and _attachment_name (such as “cars.pdf”), which assists in detecting and parsing the data type. This is similar to the way email works.

This generic data model is useful to support a wide range of applications. For example, the Apache Flume Morphline Solr Sink embeds the morphline library and executes a morphline to convert flume events into morphline records and load them into Solr. This sink fills the body of the Flume event into the _attachment_body field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. As another example, the Mappers of the MapReduceIndexerTool fill the Java referring to the currently processed HDFS file into the _attachment_body field of the morphline record. The Mappers of the MapReduceIndexerTool also fill metadata about the HDFS file into record fields, such as the file’s name, path, size, last modified time, etc. This way a morphline can act on all data received from Flume and HDFS. As yet another example, the Morphline Lily HBase Indexer fills a HBase Result Java POJO into the _attachment_body field of the morphline record. This way morphline commands such as extractHBaseCells can extract data from HBase updates and correspondingly update a Solr index.

We also talked a good deal about Apache Zookeeper and some of the history back from when Zookeeper was originally at Yahoo! and Patrick’s experience since then. To hear everything that Patrick had to say please subscribe to the podcast.


 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop


Beyond MapReduce and Apache Hadoop 2.X with Bikas Saha and Arun Murthy

April 2, 2014 Leave a comment

Episode 20 of the podcast was with Bikas Saha and Arun Murthy.

When I spoke with Arun a year or so a go YARN was NextGen Hadoop and there have been a lot of updates, work done and production experience since.

Besides Yahoo! other multi thousand node clusters have been and are running in production with YARN. These clusters have shown 2x capacity throughput which resulted in reduced cost for hardware (and in some cases being able to shut down co-los) while still gaining performance improvements overall to previous clusters of Hadoop 1.X.

I got to hear about some of what is in 2.4 and coming in 2.5 of Hadoop:

  • Application timeline server repository and api for application specific metrics (Tez, Spark, Whatever).
    • web service API to put and get with some aggregation.
    • plugable nosql store (hbase, accumulo) to scale it.
  • Preemption capacity scheduler.
  • Multiple resource support (CPU, RAM and Disk).
  • Labels tag nodes with labels can be labeled however so some windows and some linux and ask for resources with only those labels with ACLS.
  • Hypervisor support as a key part of the topology.
  • Hoya generalize for YARN (game changer) and now proposed as Slider to the Apache incubator.

We talked about Tez which provides complex DAGs of queries to translate what you want to-do on Hadoop without the work arounds for making it have to run in MapReduce.  MapReduce was not designed to be re-workable out side of the parts of the Job it gave you for Map, Split, Shuffle, Combine, Reduce, Etc and Tez is more expressible exposing a DAG API.


Now becomes with Tez:



There were also some updates on Hive v13 coming out with sub queries, low latency queries (through Tez), high precision decimal points and more!

Subscribe to the podcast and listen to all of what Bikas and Arun had to say.

 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop



Impala and SQL on Hadoop

February 22, 2014 Leave a comment

The origins of Impala can be found in F1 – The Fault-Tolerant Distributed RDBMS Supporting Google’s Ad Business.

One of many differences between MapReduce and Impala is in Impala the intermediate data moves from process to process directly instead of storing it on HDFS for processes to get at the data needed for processing.  This provides a HUGE performance advantage and doing so while consuming few cluster resources.   Less hardware to-do more!


There are many advantages to this approach over alternative approaches for querying Hadoop data, including::

  • Thanks to local processing on data nodes, network bottlenecks are avoided.
  • A single, open, and unified metadata store can be utilized.
  • Costly data format conversion is unnecessary and thus no overhead is incurred.
  • All data is immediately query-able, with no delays for ETL.
  • All hardware is utilized for Impala queries as well as for MapReduce.
  • Only a single machine pool is needed to scale.

We encourage you to read the documentation for further exploration!

There are still transformation steps required to optimize the queries but Impala can help to-do this for you with Parquet file format.  Better compression and optimized runtime performance is realized using the ParquetFormat though many other file types are supported.

This and a whole lot more was discussed with Marcel Kornacker the Cloudera Architect behind Impala on Episode 18 of the All Things Hadoop Podcast.

 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop

Using Apache Drill for Large Scale, Interactive, Real-Time Analytic Queries

October 29, 2013 Leave a comment

Episode #17 of the podcast is a talk with Jacques Nadeau  available also on iTunes

Apache Drill, a modern interactive query engine that runs on top of Hadoop.

Jacques talked about how Apache Drill is a modern query engine that is meant to be a query layer on top of all big data open source systems. Apache Drill is being designed to make the storage engine as plug-able so it could be the interface for any big data storage engine. The first release came out recently to allow developers to understand the data pipeline.

Leveraging an efficient columnar storage format, an optimistic execution engine and a cache-conscious memory layout, Apache Drill is blazing fast. Coordination, query planning, optimization, scheduling, and execution are all distributed throughout nodes in a system to maximize parallelization.


Perform interactive analysis on all of your data, including nested and schema-less. Drill supports querying against many different schema-less data sources including HBase, Cassandra and MongoDB. Naturally flat records are included as a special case of nested data.


Strongly defined tiers and APIs for straightforward integration with a wide array of technologies.


Subscribe to the podcast and listen to what Jacques had to say.  Available also on iTunes

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
Twitter: @allthingshadoop

Big Data, Open Source and Analytics

August 26, 2013 Leave a comment

Episode #15 of the podcast is a talk with Stefan Groschupf  available also on iTunes

Stefan is the CEO of Datameer and talked about how the company started and where it is now. Founded in 2009 by some of the original contributors to Apache Hadoop, Datameer has grown to a global team, advancing big data analytics. After several implementations of Hadoop analytics solutions at Global 500 companies, the founders were determined to build the next generation analytics application to solve the new use cases created by the explosion of structured and unstructured data. Datameer is the single application for big data analytics by combining data integration, data transformation and data visualization. Customers love us and we work to make Datameer even better each day.

Datameer provides the most complete solution to analyze structured and unstructured data. Not limited by a pre-built schema, the point and click functions means your analytics are only limited by your imagination. Even the most complex nested joins of a large number of datasets can be performed using an interactive dialog. Mix and match analytics and data transformations in unlimited number of data processing pipelines. Leave the raw data untouched.

Datameer turbocharges time series analytics by correlating multiple sets of complex, disparate data. Resulting analytics are endless including correlation of credit card transactions with card holder authorizations, network traffic data, marketing interaction data and many more. The end game is a clear window into the operations of your business, giving you the actionable insights you need to make business decisions.

some alt text

Data is the raw materials of insight and the more data you have, the deeper and broader the possible insights. Not just traditional, transaction data but all types of data so that you can get a complete view of your customers, better understand business processes and improve business performance.

Datameer ignores the limitations of ETL and static schemas to empower business users to integrate data from any source into Hadoop. Pre-built data connector wizards for all common structured and unstructured data sources means that data integration is an easy, three step process of where, what and when.

App Market Infographics

Now you never have to waste precious time by starting from scratch. Anyone can simply browse the Analytics App Market, download an app, connect to data, and get instant results. But why stop there? Every application is completely open so you can customize it, extend it, or even mash it up with other applications to get the insights you need.

Built by data scientists, analysts, or subject matter experts, analytic apps range from horizontal use cases like email and social sentiment analysis to vertical or even product-specific applications like advanced sales-cycle analysis.

Check out the Datameer app market.

Subscribe to the podcast and listen to what Stefan had to say.  Available also on iTunes

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
Twitter: @allthingshadoop

Categories: Hadoop, Podcast

SQL Compatibility in Hadoop with Hive

August 15, 2013 Leave a comment

Episode #14 of the podcast is a talk with Alan Gates available also on iTunes

The Stinger initiative is a collection of development threads in the Hive community that will deliver 100X performance improvements as well as SQL compatibility.

Fast Interactive Query
An immediate aim of 100x performance increase for Hive is more ambitious than any other effort.
SQL Compatibility
Based on industry standard SQL, the Stinger Initiative improves HiveQL to deliver SQL compatibility.

Apache Hive is the de facto standard for SQL-in-Hadoop today with more enterprises relying on this open source project than any alternative. As Hadoop gains in popularity, enterprise requirements for Hive to become more real time or interactive have evolved… and the Hive community has responded.

He spoke in detail about the Stinger initiative, who is contributing to it, why they decided to improve upon Hive and not create a new system and more.

He talked about how Microsoft is contributing in the open source community to improve upon Hive.

Hadoop is so much more than just SQL, one of the wonderful things about Big Data is the power it brings for users to bring different processing models such as realtime streaming with Storm, Graph processing with Giraph and ETL with Pig and all different things to-do beyond just this SQL compatibility.

Alan also talked about YARN and Tez and the benefits of the Stinger initiative to other Hadoop ecosystem tools too.

Subscribe to the podcast and listen to what Alan had to say.  Available also on iTunes

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
Twitter: @allthingshadoop

Categories: Hadoop, Hive, Podcast

Apache BigTop and how packaging infrastructure binds the Hadoop ecosystem together

August 12, 2013 Leave a comment

Episode #12 of the podcast is a talk with Mark Grover and Roman Shaposhnik  Available also on iTunes

Apache Bigtop is a project for the development of packaging and tests of the Apache Hadoop ecosystem.

The primary goal of Bigtop is to build a community around the packaging and interoperability testing of Hadoop-related projects. This includes testing at various levels (packaging, platform, runtime, upgrade, etc…) developed by a community with a focus on the system as a whole, rather than individual projects.

BigTop makes it easier to deploy Hadoop Ecosystem projects including:

  • Apache Zookeeper

  • Apache Flume

  • Apache HBase

  • Apache Pig

  • Apache Hive

  • Apache Sqoop

  • Apache Oozie

  • Apache Whirr

  • Apache Mahout

  • Apache Solr (SolrCloud)

  • Apache Crunch (incubating)

  • Apache HCatalog

  • Apache Giraph

  • LinkedIn DataFu

  • Cloudera Hue

The list of supported Linux platforms has expanded to include:

  • CentOS/RHEL 5 and 6

  • Fedora 17 and 18

  • SuSE Linux Enterprise 11

  • OpenSUSE 12.2

  • Ubuntu LTS Lucid (10.04) and Precise (12.04)

  • Ubuntu Quantal (12.10)

Subscribe to the podcast and listen to what Mark and Roman had to say.  Available also on iTunes

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
Twitter: @allthingshadoop

Hadoop as a Service cloud platform with the Mortar Framework and Pig

August 9, 2013 Leave a comment

Episode #11 of the podcast is a talk with K Young.  Available also on iTunes

Mortar is the fastest and easiest way to work with Pig and Python on Hadoop in the Cloud.

Mortar’s platform is for everything from joining and cleansing large data sets to machine learning and building recommender systems.

Mortar makes it easy for developers and data scientists to do powerful work with Hadoop. The main advantages of Mortar are:

  • Zero Setup Time: Mortar takes only minutes to set up (or no time at all on the web), and you can start running Pig jobs immediately. No need for painful installation or configuration.
  • Powerful Tooling: Mortar provides a rich suite of tools to aid in Pig development, including the ability to Illustrate a script before running it, and an extremely fast and free local development mode.
  • Elastic Clusters: We spin up Hadoop clusters as you need them, so you don’t have to predict your needs in advance, and you don’t pay for machines you don’t use.
  • Solid Support: Whether the issue is in your script or in Hadoop, we’ll help you figure out a solution.

We talked about the Open Source Mortar Framework and their new Open Source tool for visualizing data while writing Pig scripts called Watchtower

The Mortar Blog has a great video demo on Watchtower.

There are no two ways around it, Hadoop development iterations are slow. Traditional programmers have always had the benefit of re-compiling their app, running it, and seeing the results within seconds. They have near instant validation that what they’re building is actually working. When you’re working with Hadoop, dealing with gigabytes of data, your development iteration time is more like hours.

Subscribe to the podcast and listen to what K Young had to say.  Available also on iTunes

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
Twitter: @allthingshadoop

Categories: Hadoop, Pig, Podcast

Hortonworks HDP1, Apache Hadoop 2.0, NextGen MapReduce (YARN), HDFS Federation and the future of Hadoop with Arun C. Murthy

July 23, 2012 2 comments

Episode #8 of the Podcast is a talk with Arun C. Murthy.

We talked about Hortonworks HDP1, the first release from Hortonworks, Apache Hadoop 2.0, NextGen MapReduce (YARN) and HDFS Federations

subscribe to the podcast and listen to all of what Arun had to share.

Some background to what we discussed:

Hortonworks Data Platform (HDP)

from their website:

Hortonworks Data Platform (HDP) is a 100% open source data management platform based on Apache Hadoop. It allows you to load, store, process and manage data in virtually any format and at any scale. As the foundation for the next generation enterprise data architecture, HDP includes all of the necessary components to begin uncovering business insights from the quickly growing streams of data flowing into and throughout your business.

Hortonworks Data Platform is ideal for organizations that want to combine the power and cost-effectiveness of Apache Hadoop with the advanced services required for enterprise deployments. It is also ideal for solution providers that wish to integrate or extend their solutions with an open and extensible Apache Hadoop-based platform.

Key Features
  • Integrated and Tested Package – HDP includes stable versions of all the critical Apache Hadoop components in an integrated and tested package.
  • Easy Installation – HDP includes an installation and provisioning tool with a modern, intuitive user interface.
  • Management and Monitoring Services – HDP includes intuitive dashboards for monitoring your clusters and creating alerts.
  • Data Integration Services – HDP includes Talend Open Studio for Big Data, the leading open source integration tool for easily connecting Hadoop to hundreds of data systems without having to write code.
  • Metadata Services – HDP includes Apache HCatalog, which simplifies data sharing between Hadoop applications and between Hadoop and other data systems.
  • High Availability – HDP has been extended to seamlessly integrate with proven high availability solutions.

Apache Hadoop 2.0

from their website:

Apache Hadoop 2.x consists of significant improvements over the previous stable release (hadoop-1.x).

Here is a short overview of the improvments to both HDFS and MapReduce.

  • HDFS FederationIn order to scale the name service horizontally, federation uses multiple independent Namenodes/Namespaces. The Namenodes are federated, that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handles commands from the Namenodes.More details are available in the HDFS Federation document.
  • MapReduce NextGen aka YARN aka MRv2The new architecture introduced in hadoop-0.23, divides the two major functions of the JobTracker: resource management and job life-cycle management into separate components.The new ResourceManager manages the global assignment of compute resources to applications and the per-application ApplicationMaster manages the application‚Äôs scheduling and coordination.An application is either a single job in the sense of classic MapReduce jobs or a DAG of such jobs.The ResourceManager and per-machine NodeManager daemon, which manages the user processes on that machine, form the computation fabric.The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.More details are available in the YARN document.
Getting Started

The Hadoop documentation includes the information you need to get started using Hadoop. Begin with the Single Node Setup which shows you how to set up a single-node Hadoop installation. Then move on to the Cluster Setup to learn how to set up a multi-node Hadoop installation.

Apache Hadoop NextGen MapReduce (YARN)

from their website:

MapReduce has undergone a complete overhaul in hadoop-0.23 and we now have, what we call, MapReduce 2.0 (MRv2) or YARN.

The fundamental idea of MRv2 is to split up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job in the classical sense of Map-Reduce jobs or a DAG of jobs.

The ResourceManager and per-node slave, the NodeManager (NM), form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system.

The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

MapReduce NextGen Architecture

The ResourceManager has two main components: Scheduler and ApplicationsManager.

The Scheduler is responsible for allocating resources to the various running applications subject to familiar constraints of capacities, queues etc. The Scheduler is pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks either due to application failure or hardware failures. The Scheduler performs its scheduling function based the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc. In the first version, only memory is supported.

The Scheduler has a pluggable policy plug-in, which is responsible for partitioning the cluster resources among the various queues, applications etc. The current Map-Reduce schedulers such as the CapacityScheduler and the FairScheduler would be some examples of the plug-in.

The CapacityScheduler supports hierarchical queues to allow for more predictable sharing of cluster resources

The ApplicationsManager is responsible for accepting job-submissions, negotiating the first container for executing the application specific ApplicationMaster and provides the service for restarting the ApplicationMaster container on failure.

The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.

The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.

MRV2 maintains API compatibility with previous stable release (hadoop-0.20.205). This means that all Map-Reduce jobs should still run unchanged on top of MRv2 with just a recompile.

HDFS Federation

from their website:


HDFS LayersHDFS has two main layers:

  • Namespace
    • Consists of directories, files and blocks
    • It supports all the namespace related file system operations such as create, delete, modify and list files and directories.
  • Block Storage Service has two parts
    • Block Management (which is done in Namenode)
      • Provides datanode cluster membership by handling registrations, and periodic heart beats.
      • Processes block reports and maintains location of blocks.
      • Supports block related operations such as create, delete, modify and get block location.
      • Manages replica placement and replication of a block for under replicated blocks and deletes blocks that are over replicated.
    • Storage – is provided by datanodes by storing blocks on the local file system and allows read/write access.

    The prior HDFS architecture allows only a single namespace for the entire cluster. A single Namenode manages this namespace. HDFS Federation addresses limitation of the prior architecture by adding support multiple Namenodes/namespaces to HDFS file system.

Multiple Namenodes/Namespaces

In order to scale the name service horizontally, federation uses multiple independent Namenodes/namespaces. The Namenodes are federated, that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handles commands from the Namenodes.

HDFS Federation ArchitectureBlock Pool

A Block Pool is a set of blocks that belong to a single namespace. Datanodes store blocks for all the block pools in the cluster. It is managed independently of other block pools. This allows a namespace to generate Block IDs for new blocks without the need for coordination with the other namespaces. The failure of a Namenode does not prevent the datanode from serving other Namenodes in the cluster.

A Namespace and its block pool together are called Namespace Volume. It is a self-contained unit of management. When a Namenode/namespace is deleted, the corresponding block pool at the datanodes is deleted. Each namespace volume is upgraded as a unit, during cluster upgrade.


A new identifier ClusterID is added to identify all the nodes in the cluster. When a Namenode is formatted, this identifier is provided or auto generated. This ID should be used for formatting the other Namenodes into the cluster.

Key Benefits

  • Namespace Scalability – HDFS cluster storage scales horizontally but the namespace does not. Large deployments or deployments using lot of small files benefit from scaling the namespace by adding more Namenodes to the cluster
  • Performance – File system operation throughput is limited by a single Namenode in the prior architecture. Adding more Namenodes to the cluster scales the file system read/write operations throughput.
  • Isolation – A single Namenode offers no isolation in multi user environment. An experimental application can overload the Namenode and slow down production critical applications. With multiple Namenodes, different categories of applications and users can be isolated to different namespaces.

subscribe to the podcast and listen to all of what Arun had to share.


Joe Stein

Hadoop distribution bake-off and my experience with Cloudera and MapR

July 10, 2012 5 comments

A few months back we started to endeavor on a new Hadoop cluster @ medialets

We have been live with Hadoop in production since April 2010 and we are still running CDH2. Our current hosting provider does not have a very ideal implementation for us where our 36 nodes are spread out across an entire data center and 5 networks each with 1 GB link. While there are issues with this type of setup we have been able to organically grow our cluster (started at 4 nodes) which powers 100% of our batch analytics for what is now hundreds of millions of mobile devices.

One of our mapreduce jobs processes 30+ billion objects (about 3 TB of uncompressed data) and takes about 90 minutes to run. This jobs runs all day long contiguously. Each run ingests the data that was received while the previous job was running. One of the primary goals of our new cluster was to reduce the time these type of jobs take without having to make any code changes or increase our investment in hardware. We figured besides the infrastructure changes we needed/wanted to make that running an old version of Hadoop meant that we were not taking advantage of all the awesome work that folks have been putting in over the last 2 years to do things like increasing performance.

So we endeavored to what seems to have been coined as “The Hadoop Distribution Bake-off”. We wanted to not only see how new versions of the Cloudera distribution would be running our jobs but also evaluate other distributions that have emerged since we first started with Hadoop. When we did this Hortonwork’s distribution was not released yet otherwise we would have added them and their distro to the possible choices.

First we found a new vendor to setup a test cluster for us It was a four node cluster each with 2GB (1G dual bonded) NIC, 12GB of RAM, 4 x 1TB drives (using 3 of the drives for data and one for the OS) and 2x Westmere 5645 2.4GHz Hex-Core CPU. While this was not going to be the exact configuration we were going to end up with it was what they had in inventory and for the purposes of this test it was all about keeping the same hardware running with the same job with the same data and only changing the distro and configurations. As part of our due diligence, performance was not the only point we were interested in but was the primary goal of the bake-off and testing. We also reviewed other aspects of the distributions and companies which ultimately led to our final decision to go with CDH4 for our new cluster.

First, we wanted to create a baseline to see how our data and job did with the existing distribution (CDH2) we run in production with our existing production configuration. Next we wanted to give MapR a shot. We engaged with their team and they spent their time and assistance to help configure and optimize for the job’s test run. Once that was done we wanted to give CDH3 and CDH4 (which was still beta at the time) and the Cloudera folks also lent their time and helped configure and optimize the cluster.

CDH2 = 12 hours 12 min (our production configuration)
MapR = 4 hours 31 min (configuration done by MapR team)
CDH3 = 6 hours 8 min (our production configuration)
CDH4 = 4 hours 20 min (configuration done by Cloudera team)

This told us that the decision between running CDH4 or MapR was not going to be made based on performance of the distribution with our data and mapreduce jobs.

So, we had to look at the other things that were important to us.

MapR has a couple of a really nice features that are unique to their platform. Their file system features with NFS and Snapshots, both are cool so lets go through them quickly. MapR’s underlying proprietary file system allows for these unique features in the Hadoop ecosystem. The NFS feature basically allows you to copy to an NFS share that is distributed across the entire cluster (with a VIP so highly available). This means that you can use the cluster for saving data from your applications and then without any additional copies map-reduce over it. Data is compressible under the hood though this did not mean much to us since we compress all of our data in sequence files using compress by block size on the sequence file. Snapshots (and mirroring to other clusters of those snapshots) is nifty. Being able to take a point in time instance cut of things makes the cluster feel and operate like our SAN. While snapshots are nifty the same end result is capable with a distcp which sure takes longer but is still technically feasible not a lot of other benefits for us or our business, nifty none the less. The main issue we had with all of this was that all of the features that were attractive required us to license their product. Their product also is not open source so we would not be able to build the code, make changes or anything else always having to rely on them for support and maintenance. We met a lot of great folks from MapR but only 2 of them were Apache committers (they may have more on staff, I only met two though) and this is important to us from a support & maintenance perspective… for them it probably is not a huge deal since their platform is not open source and proprietary ( I think I just repeated myself here but did so on purpose ).

Cloudera… tried, true and trusted (I have been running CDH2 for 2 years in production without ever having to upgrade) and know lots of folks that can say the same thing. Everything is Open Source with a very healthy and active community. A handful of times this has been very helpful in development cycles for me to see what the container I was running in was doing to help me resolve the problems I was finding in my own code… or even to simply shoot a question over the mailing list to get a response to a question. As far as the distribution goes, it costs nothing to get it running and have it run in production with all of the features we wanted. If we ever decided to pay for support there are a boat load (a large boat) of Apache Committers not just to the Hadoop project but to lots of projects within the Hadoop eco system all of which are available and part and parcel to help answer questions and make code changes, etc. The philosophy of their distribution (besides just being open source) is to cherry pick changes from Apache Hadoop as soon as they can (or should or want) to be introduced to making their distribution best.

I can think of a lot of industries and companies were MapR would be a good choice over Cloudera.

We decided what was best for us was to go with CDH4 for our new cluster. And, if we ever decide to purchase support we would get it from Cloudera.

Joe Stein

Categories: Hadoop

Unified analytics and large scale machine learning with Milind Bhandarkar

June 1, 2012 1 comment

Episode #7 of the Podcast is a talk with Milind Bhandarkar.

We talked about unified analytics, machine learning, data science, some great history of Hadoop, the future of Hadoop and a lot more!

subscribe to the podcast and listen to all of what Milind had to share.


Joe Stein

Hadoop Streaming Made Simple using Joins and Keys with Python

December 16, 2011 10 comments

There are a lot of different ways to write MapReduce jobs!!!

Sample code for this post

I find streaming scripts a good way to interrogate data sets (especially when I have not worked with them yet or are creating new ones) and enjoy the lifecycle when the initial elaboration of the data sets lead to the construction of the finalized scripts for an entire job (or series of jobs as is often the case).

When doing streaming with Hadoop you do have a few library options.  If you are a Ruby programmer then wukong is awesome! For Python programmers you can use dumbo and more recently released mrjob.

I like working under the hood myself and getting down and dirty with the data and here is how you can too.

Lets start first with defining two simple sample data sets.

Data set 1:  countries.dat


United States|US
United Kingdom|UK

Data set 2: customers.dat


Alice Bob|not bad|US
Sam Sneed|valued|CA
Jon Sneed|valued|CA
Arnold Wesise|not so good|UK
Henry Bob|not bad|US
Yo Yo Ma|not so good|CA
Jon York|valued|CA
Alex Ball|valued|UK
Jim Davis|not so bad|JA

The requirements: you need to find out grouped by type of customer how many of each type are in each country with the name of the country listed in the countries.dat in the final result (and not the 2 digit country name).

To-do this you need to:

1) Join the data sets
2) Key on country
3) Count type of customer per country
4) Output the results

So first lets code up a quick mapper called (you can decide if smpl is short for simple or sample).

Now in coding the mapper and reducer in Python the basics are explained nicely here but I am going to dive a bit deeper to tackle our example with some more tactics.

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
	try: #sometimes bad data can cause errors use this how you like to deal with lint and bad data
		personName = "-1" #default sorted as first
		personType = "-1" #default sorted as first
		countryName = "-1" #default sorted as first
		country2digit = "-1" #default sorted as first
		# remove leading and trailing whitespace
		line = line.strip()
		splits = line.split("|")
		if len(splits) == 2: #country data
			countryName = splits[0]
			country2digit = splits[1]
		else: #people data
			personName = splits[0]
			personType = splits[1]
			country2digit = splits[2]			
		print '%s^%s^%s^%s' % (country2digit,personType,personName,countryName)
	except: #errors are going to make your job fail which you may or may not want

Don’t forget:

chmod a+x

Great! We just took care of #1 but time to test and see what is going to the reducer.

From the command line run:

cat customers.dat countries.dat|./|sort

Which will result in:

CA^not so good^Yo Yo Ma^-1
CA^valued^Jon Sneed^-1
CA^valued^Jon York^-1
CA^valued^Sam Sneed^-1
JA^not so bad^Jim Davis^-1
UK^-1^-1^United Kingdom
UK^not so good^Arnold Wesise^-1
UK^valued^Alex Ball^-1
US^-1^-1^United States
US^not bad^Alice Bob^-1
US^not bad^Henry Bob^-1

Notice how this is sorted so the country is first and the people in that country after it (so we can grab the correct country name as we loop) and with the type of customer also sorted (but within country) so we can properly count the types within the country. =8^)

Let us hold off on #2 for a moment (just hang in there it will all come together soon I promise) and get working first.

#!/usr/bin/env python
import sys
# maps words to their counts
foundKey = ""
foundValue = ""
isFirst = 1
currentCount = 0
currentCountry2digit = "-1"
currentCountryName = "-1"
isCountryMappingLine = False

# input comes from STDIN
for line in sys.stdin:
	# remove leading and trailing whitespace
	line = line.strip()
		# parse the input we got from
		country2digit,personType,personName,countryName = line.split('^')
		#the first line should be a mapping line, otherwise we need to set the currentCountryName to not known
		if personName == "-1": #this is a new country which may or may not have people in it
			currentCountryName = countryName
			currentCountry2digit = country2digit
			isCountryMappingLine = True
			isCountryMappingLine = False # this is a person we want to count
		if not isCountryMappingLine: #we only want to count people but use the country line to get the right name 

			#first check to see if the 2digit country info matches up, might be unkown country
			if currentCountry2digit != country2digit:
				currentCountry2digit = country2digit
				currentCountryName = '%s - Unkown Country' % currentCountry2digit
			currentKey = '%s\t%s' % (currentCountryName,personType) 
			if foundKey != currentKey: #new combo of keys to count
				if isFirst == 0:
					print '%s\t%s' % (foundKey,currentCount)
					currentCount = 0 #reset the count
					isFirst = 0
				foundKey = currentKey #make the found key what we see so when we loop again can see if we increment or print out
			currentCount += 1 # we increment anything not in the map list

	print '%s\t%s' % (foundKey,currentCount)

Don’t forget:

chmod a+x

And then run:

cat customers.dat countries.dat|./|sort|./

And voila!

Canada	not so good	1
Canada	valued	3
JA - Unkown Country	not so bad	1
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2

So now #3 and #4 are done but what about #2? 

First put the files into Hadoop:

hadoop fs -put ~/mayo/customers.dat .
hadoop fs -put ~/mayo/countries.dat .

And now run it like this (assuming you are running as hadoop in the bin directory):

hadoop jar ../contrib/streaming/hadoop-0.20.1+169.89-streaming.jar -D mapred.reduce.tasks=4 -file ~/mayo/ -mapper -file ~/mayo/ -reducer -input customers.dat -input countries.dat -output mayo -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -jobconf^ -jobconf -jobconf map.output.key.field.separator=^ -jobconf num.key.fields.for.partition=1

Let us look at what we did:

hadoop fs -cat mayo/part*

Which results in:

Canada	not so good	1
Canada	valued	3
United Kingdom	not so good	1
United Kingdom	valued	1
United States	not bad	2
JA - Unkown Country	not so bad	1

So #2 is the partioner KeyFieldBasedPartitioner explained here further Hadoop Wiki On Streaming which allows the key to be whatever set of columns you output (in our case by country) configurable by the command line options and the rest of the values are sorted within that key and sent to the reducer together by key.

And there you go … Simple Python Scripting Implementing Streaming in Hadoop.  

Grab the tar here and give it a spin.

Joe Stein
Twitter: @allthingshadoop
Connect: On Linked In

Categories: Hadoop, MapReduce, Python

Faster Datanodes with less wait io using df instead of du

May 20, 2011 1 comment

I have noticed often that the check Hadoop uses to calculate usage for the data nodes causes a fair amount of wait io on them driving up load.

Every cycle we can get from every spindle we want!

So I came up with a nice little hack to use df instead of du.

Here is basically what I did so you can do it too.

mv /usr/bin/du /usr/bin/bak_du
vi /usr/bin/du

and save this inside of it


mydf=$(df $2 | grep -vE '^Filesystem|tmpfs|cdrom' | awk '{ print $3 }')
echo -e "$mydf\t$2"

then give it execute permission

chmod a+x /usr/bin/du

restart you data node check the log for no errors and make sure it starts back up


Now when Hadoop calls “du -sk /yourhdfslocation” it will be expedient with its results

whats wrong with this?

1) I assume you have nothing else on your disks that you are storing so df is really close to du since almost all of your data is in HDFS

2) If you have more than 1 volume holding your hdfs blocks this is not exactly accurate so you are skewing the size of each vol by only calculating one of them and using that result for the others…. this is simple to fix just parse your df result differently and use the path passed into the second paramater to know which vol to grep in your df result… your first volume is going to be larger anyways most likely and you should be monitoring disk space another way so it is not going to be very harmefull if you just check and report the first volume’s size

3) you might not have your HDFS blocks on your first volume …. see #2 you can just grep the volume you want to report

Joe Stein

Categories: Hadoop

Cloudera, Yahoo and the Apache Hadoop Community Security Branch Release Update

May 5, 2011 1 comment

In the wake of Yahoo! having announced that they would discontinue their Hadoop distribution and focus their efforts into Apache Hadoop the landscape has become tumultuous.

Yahoo! engineers have spent their time and effort contributing back to the Apache Hadoop security branch (branch of 0.20) and have proposed release candidates.

Currently being voted and discussed is “Release candidate”. If you are following the VOTE and the DISCUSSION then maybe you are like me it just cannot be done without a bowl of popcorn before opening the emails. It is getting heated in a good and constructive kind of way. there are already more emails in 5 days of May than there were in all of April. woot!

My take? Has it become Cloudera vs Yahoo! and Apache Hadoop releases will become fragmented because of it? Well, it is kind of like that already. 0.21 is the latest and can anyone that is not a committer quickly know or find out the difference between that and the other release branches? It is esoteric 😦 0.22 is right around the corner too which is a release from trunk.

Lets take HBase as an example (a Hadoop project). Do you know what version of HDFS releases can support HBase in production without losing data? If you do then maybe you don’t realize that many people still don’t even know about the branch. And, now that CDH3 is out you can use that (thanks Cloudera!) otherwise it is highly recommended to not be in production with HBase unless you use the append branch of 0.20 which makes you miss out on other changes in trunk releases…

__ eyes crossing inwards and sideways with what branch does what and when the trunk release has everything __

Hadoop is becoming an a la cart which features and fixes can I live without for all of what I really need to deploy … or requiring companies to hire a committer … or a bunch of folks that do nothing but Hadoop day in and day out (sounds like Oracle, ahhhhhh)… or going with the Cloudera Distribution (which is what I do and don’t look back). The barrier to entry feels like it has increased over the last year. However, stepping back from that the system overall has had a lot of improvements! A lot of great work by a lot of dedicated folks putting in their time and effort towards making Hadoop (in whatever form the elephant stampedes through its data) a reality.

Big shops that have teams of “Hadoop Engineers” (Yahoo, Facebook, eBay, LinkedIn, etc) with contributors and/or committers on that team should not have lots of impact because ultimately they are able to role their own releases for whatever they need/want themselves in production and just support it. Not all are so endowed.

Now, all of that having been said I write this because the discussion is REALLY good and has a lot of folks (including those from Yahoo! and Cloudera) bringing up pain points and proposing some great solutions that hopefully will contribute to the continued growth and success of the Apache Hadoop Community…. still if you want to run it in your company (and don’t have a committer on staff) then go download CDH3 it will get you going with the latest and greatest of all the releases, branches, etc, etc, etc. Great documentation too!


Joe Stein

NoSQL HBase and Hadoop with Todd Lipcon from Cloudera

September 6, 2010 3 comments

Episode #6 of the Podcast is a talk with Todd Lipcon from Cloudera discussing HBase.

We talked about NoSQL and how it should stand for “Not Only SQL” and the tight integration between Hadoop and HBase and how systems like Cassandra (which is eventually consistent and not strongly consistent like HBase) is complementary as these systems have applicability within big data eco system depending on your use cases.

With the strong consistency of HBase you get features like incrementing counters and the tight integration with Hadoop means faster loads with HDFS thanks to a new feature in the 0.89 development preview release in the doc folders called “bulk loads”.

We covered a lot more unique features, talked about more of what is coming in upcoming releases as well as some tips with HBase so subscribe to the podcast and listen to all of what Todd had to say.


Joe Stein

Hadoop Development Tools By Karmasphere

June 29, 2010 1 comment

In Episode #5 of the Hadoop Podcast I speak with Shevek, the CTO of Karmasphere  To subscribe to the Podcast click here.

We talk a bit about their existing Community Edition (support Netbeans & Eclipse)

  • For developing, debugging and deploying Hadoop Jobs
  • Desktop MapReduce Prototyping
  • GUI to manipulate clusters, file systems and jobs
  • Easy deployment to any Hadoop version, any distribution in any cloud
  • Works through firewalls

As well as the new products they have launched:

Karmasphere Client:

The Karmasphere Client is a cross platform library for ensuring MapReduce jobs can work from any desktop environment to any Hadoop cluster in any enterprise data network. By isolating the Big Data professional and version of Hadoop, Karmasphere Client simplifies the process of switching between data centers and the cloud and enables Hadoop jobs to be independent of the version of the underlying cluster.

Unlike the standard Hadoop client , Karmasphere Client works from Microsoft Windows as well as Linux and MacOS, and works through SSH-based firewalls. Karmasphere Client provides a cloud-independent environment that makes it easy and predictable to maintain a business operation reliant on Hadoop.

  • Ensures Hadoop distribution and version independence
  • Works from Windows (unlike Hadoop Client)
  • Supports any cloud environment: public, private or public cloud service.
  • Provides:
    • Job portability
    • Operating system portability
    • Firewall hopping
    • Fault tolerant API
    • Synchronous and Asynchronous API
    • Clean Object Oriented Design
  • Making it easy and predictable to maintain a business operation reliant on Hadoop

Karmasphere Studio Professional Edition

Karmasphere Studio Professional Edition includes all the functionality of the Community Edition, plus a range of deeper functionality required to simplify the developer’s task of making a MapReduce job robust, efficient and production-ready.

For a MapReduce job to be robust, its functioning on the cluster has to be well understood in terms of time, processing, and storage requirements, as well as in terms of its behavior when implemented within well-defined “bounds.” Karmasphere Studio Professional Edition incorporates the tools and a predefined set of rules that make it easy for the developer to understand how his or her job is performing on the cluster and where there is room for improvement.

  • Enhanced cluster visualization and debugging
    • Execution diagnostics
    • Job performance timelines
    • Job charting
    • Job profiling
  • Job Export
    • For easy production deployment
  • Support

Karmasphere Studio Analyst Edition

  • SQL interface for ad hoc analysis
  • Karmasphere Application Framework + Hive + GUI =
    • No cluster changes
    • Works over proxies and firewalls
    • Integrated Hadoop monitoring Interactive syntax checking
    • Detailed diagnostics
    • Enhanced schema browser
    • Full JDBC4 compliance
    • Multi-threaded & concurrent


Joe Stein

Categories: Hadoop, Podcast, Tools