Archive for the ‘Hadoop’ Category

Apache Hadoop HDFS Data Node Apache Mesos Framework


This project allows running HDFS on Mesos.

You should be familiar with HDFS and Mesos basics.

The project requires:

  • Mesos 0.23.0+
  • JDK 1.7.x
  • Hadoop 1.2.x or 2.7.x

Mesos in Vagrant

The project includes a Vagrant environment that lets you run a Mesos cluster locally.

If you are going to use an external Mesos cluster, you can skip this section.

1. Start vagrant nodes:

# cd hdfs-mesos/vagrant
# vagrant up

This creates Mesos master and slave nodes.

2. Add vagrant node names to /etc/hosts

Now Mesos in Vagrant should be running, and you can proceed with starting the scheduler.

For more details about the Vagrant environment, please read vagrant/

Running Scheduler

1. Download hdfs-mesos*.jar OR clone & build the project:

Download jar:

# mkdir hdfs-mesos
# cd hdfs-mesos
# wget

OR clone & build:

# git clone
# cd hdfs-mesos
# ./gradlew jar

2. Download hadoop tarball:

# wget

3. Start scheduler:

# ./ scheduler --api=http://$scheduler:7000 --master=zk://$master:2181/mesos --user=vagrant
2016-03-18 15:04:48,785 [main] INFO hdfs.Scheduler - Starting Scheduler:
api: http://$scheduler:7000
files: jar:./hdfs-mesos-, hadoop:./hadoop-1.2.1.tar.gz
mesos: master:master:5050, user:vagrant, principal:<none>, secret:<none>
framework: name:hdfs, role:*, timeout:30d
2016-03-18 15:04:48,916 [main] INFO hdfs.HttpServer - started on port 7000
I0318 15:04:49.008314 19123 sched.cpp:164] Version: 0.25.0
I0318 15:04:49.017160 19155 sched.cpp:262] New master detected at master@
I0318 15:04:49.019287 19155 sched.cpp:272] No credentials provided. Attempting to register without authentication
I0318 15:04:49.029218 19155 sched.cpp:641] Framework registered with 20160310-141004-84125888-5050-10895-0006
2016-03-18 15:04:49,044 [Thread-17] INFO hdfs.Scheduler - [registered] framework:#-0006 master:#326bb pid:master@ hostname:master
2016-03-18 15:04:49,078 [Thread-18] INFO hdfs.Scheduler - [resourceOffers]
slave0#-O761 cpus:1.00; mem:2500.00; disk:35164.00; ports:[5000..32000]
master#-O762 cpus:1.00; mem:2500.00; disk:35164.00; ports:[5000..32000]
2016-03-18 15:04:49,078 [Thread-18] INFO hdfs.Scheduler - [resourceOffers]


  • $scheduler is the scheduler address, accessible from slave nodes;
  • $master is the master address, accessible from the scheduler node.

The scheduler should register itself and start receiving resource offers. If the scheduler is not receiving offers, you may need to specify LIBPROCESS_IP:

# export LIBPROCESS_IP=$scheduler_ip

Now the scheduler should be running, and you can proceed with starting HDFS nodes.

Running HDFS Cluster

The project provides a CLI and a REST API for managing HDFS nodes. We will focus on the CLI first.

1. Add namenode & datanode:

# ./ node add nn --type=namenode
node added:
  id: nn
  type: namenode
  state: idle
  resources: cpus:0.5, mem:512

# ./ node add dn0 --type=datanode
node added:
  id: dn0
  type: datanode
  state: idle
  resources: cpus:0.5, mem:512

2. Start nodes:

# ./ node start \*
nodes started:
  id: nn
  type: namenode
  state: running
  resources: cpus:0.5, mem:512
  reservation: cpus:0.5, mem:512, ports:http=5000,ipc=5001
    task: 383aaab9-982b-400e-aa35-463e66cdcb3b
    executor: 19065e07-a006-49a4-8f2b-636d8b1f2ad6
    slave: 241be3a2-39bc-417c-a967-82b4018a0762-S0 (master)

  id: dn0
  type: datanode
  state: running
  resources: cpus:0.5, mem:512
  reservation: cpus:0.5, mem:512, ports:http=5002,ipc=5003,data=5004
    task: 37f3bcbb-10a5-4323-96d2-aef8846aa281
    executor: 088463c9-5f2e-4d1d-8195-56427168b86f
    slave: 241be3a2-39bc-417c-a967-82b4018a0762-S0 (master)

Nodes are up & running now.

Note: starting may take some time. You can view the progress via the Mesos UI.

3. Do some FS operations:

# hadoop fs -mkdir hdfs://master:5001/dir
# hadoop fs -ls hdfs://master:5001/
Found 1 items
drwxr-xr-x   - vagrant supergroup          0 2016-03-17 12:46 /dir

Note: the namenode host and IPC port are used in the FS URL.

Using CLI

The project provides a CLI with the following structure:

# ./ help
Usage: <cmd> ...

  help [cmd [cmd]] - print general or command-specific help
  scheduler        - start scheduler
  node             - node management

Help is provided for each command and sub-command:

# ./ help node
Node management commands
Usage: node <cmd>

  list       - list nodes
  add        - add node
  update     - update node
  start      - start node
  stop       - stop node
  remove     - remove node

Run `help node <cmd>` to see details of specific command

# ./ help node add
Add node
Usage: node add <ids> [options]

Option (* = required)  Description
---------------------  -----------
--core-site-opts       Hadoop core-site.xml options.
--cpus <Double>        CPU amount (0.5, 1, 2).
--executor-jvm-opts    Executor JVM options.
--hadoop-jvm-opts      Hadoop JVM options.
--hdfs-site-opts       Hadoop hdfs-site.xml options.
--mem <Long>           Mem amount in Mb.
* --type               node type (name_node, data_node).

Generic Options
Option  Description
------  -----------
--api   REST api url (same as --api option for

All node-related commands support bulk operations using node-id-expressions. Examples:

# ./ node add dn0..1 --type=datanode
nodes added:
  id: dn0
  type: datanode

  id: dn1
  type: datanode

# ./ node update dn* --cpus=1
nodes updated:
  id: dn0
  resources: cpus:1.0, mem:512

  id: dn1
  resources: cpus:1.0, mem:512

# ./ node start dn0,dn1
nodes started:
  id: dn0

  id: dn1

Id expression examples:

  • nn – matches node with id nn
  • * – matches any node (should be backslash-escaped in the shell)
  • dn* – matches node with id starting with dn
  • dn0..2 – matches nodes dn0, dn1, dn2
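The id-expression rules above can be sketched as a small matcher. This is a hypothetical Go illustration, not the project's own parser: `matchExpr` and its helpers are made-up names, and it assumes that terms are comma-separated, that `*` appears only alone or as a trailing wildcard, and that a range such as `dn0..2` is a name prefix followed by inclusive numeric bounds.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// matchTerm reports whether a node id matches one id-expression term.
// Assumed forms: "*", "prefix*", "prefixA..B" (inclusive range) or an exact id.
func matchTerm(term, id string) bool {
	switch {
	case term == "*":
		return true
	case strings.HasSuffix(term, "*"):
		return strings.HasPrefix(id, strings.TrimSuffix(term, "*"))
	case strings.Contains(term, ".."):
		return inRange(term, id)
	default:
		return term == id
	}
}

// inRange handles terms like "dn0..2": the prefix is everything before the
// trailing digits of the range start.
func inRange(term, id string) bool {
	parts := strings.SplitN(term, "..", 2)
	start := parts[0]
	i := len(start)
	for i > 0 && start[i-1] >= '0' && start[i-1] <= '9' {
		i--
	}
	prefix := start[:i]
	lo, err1 := strconv.Atoi(start[i:])
	hi, err2 := strconv.Atoi(parts[1])
	if err1 != nil || err2 != nil || !strings.HasPrefix(id, prefix) {
		return false
	}
	n, err := strconv.Atoi(strings.TrimPrefix(id, prefix))
	return err == nil && n >= lo && n <= hi
}

// matchExpr reports whether id matches any comma-separated term of expr.
func matchExpr(expr, id string) bool {
	for _, term := range strings.Split(expr, ",") {
		if matchTerm(strings.TrimSpace(term), id) {
			return true
		}
	}
	return false
}

func main() {
	ids := []string{"nn", "dn0", "dn1", "dn2", "dn3"}
	for _, expr := range []string{"nn", "*", "dn*", "dn0..2", "dn0,dn1"} {
		var matched []string
		for _, id := range ids {
			if matchExpr(expr, id) {
				matched = append(matched, id)
			}
		}
		fmt.Println(expr, "->", matched)
	}
}
```

For example, `matchExpr("dn0..2", "dn3")` is false, while `matchExpr("dn*", "dn3")` is true.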

Using REST

The scheduler uses an embedded HTTP server, which serves two functions:

  • distributing the Hadoop, JRE and executor binaries;
  • serving the REST API invoked by the CLI.

Most CLI commands map to a REST API call. Examples:

CLI command                               REST call
-----------                               ---------
node add nn --type=namenode --cpus=2      /api/node/add?node=nn&type=namenode&cpus=2
node start dn* --timeout=3m               /api/node/start?node=dn*&timeout=3m
node remove dn5                           /api/node/remove?node=dn5
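A client can build these calls with Go's standard `net/url` package. This is a minimal sketch, assuming a scheduler reachable at `http://localhost:7000` (standing in for `http://$scheduler:7000`); `restURL` is a hypothetical helper, not part of the project. Note that `url.Values.Encode` sorts parameters by name and percent-encodes reserved characters, so `dn*` is sent as `dn%2A`, which a standard HTTP server decodes back to `dn*`.

```go
package main

import (
	"fmt"
	"net/url"
)

// restURL builds a scheduler REST call such as /api/node/start?node=dn*&timeout=3m.
// base is the scheduler API address (assumed here to be http://localhost:7000).
func restURL(base, cmd, nodeExpr string, params map[string]string) string {
	q := url.Values{}
	q.Set("node", nodeExpr)
	for k, v := range params {
		q.Set(k, v)
	}
	// Encode sorts keys and percent-encodes reserved characters such as '*' and ','.
	return base + "/api/node/" + cmd + "?" + q.Encode()
}

func main() {
	base := "http://localhost:7000"
	fmt.Println(restURL(base, "add", "nn", map[string]string{"type": "namenode", "cpus": "2"}))
	fmt.Println(restURL(base, "start", "dn*", map[string]string{"timeout": "3m"}))
	fmt.Println(restURL(base, "remove", "dn5", nil))
}
```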

REST calls accept plain HTTP params and return JSON responses. Examples:

# curl http://$scheduler:7000/api/node/list
        "id": "nn",
        "type": "namenode",
        "id": "dn0",
        "type": "datanode",

# curl http://$scheduler:7000/api/node/start?node=nn,dn0
    "status": "started",
    "nodes": [
            "id": "nn",
            "state": "running",
            "id": "dn0",
            "state": "running",

CLI params map one-to-one to REST params: CLI params use dashed style, while REST params use camel case. Example mappings:

CLI param                              REST param
---------                              ----------
<id> (node add|update|…)               node
timeout (node start|stop)              timeout
core-site-opts (node add|update)       coreSiteOpts
executor-jvm-opts (node add|update)    executorJvmOpts
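The dashed-to-camel-case rule can be expressed as a small conversion function. This is a sketch of the naming convention only (the positional `<id>` maps to `node` separately and is not handled here); `restParam` is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// restParam converts a dashed CLI option name to its camel-case REST name,
// e.g. "core-site-opts" -> "coreSiteOpts". Leading dashes are ignored.
func restParam(cliName string) string {
	parts := strings.Split(strings.TrimLeft(cliName, "-"), "-")
	for i := 1; i < len(parts); i++ {
		if parts[i] != "" {
			// Capitalize the first letter of every word after the first (ASCII names assumed).
			parts[i] = strings.ToUpper(parts[i][:1]) + parts[i][1:]
		}
	}
	return strings.Join(parts, "")
}

func main() {
	for _, name := range []string{"timeout", "core-site-opts", "--executor-jvm-opts"} {
		fmt.Println(name, "->", restParam(name))
	}
}
```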

A REST API call can return an error in some cases. Errors are marked with a status code other than 200, and the error response is returned in JSON format.


# curl -v
HTTP/1.1 400 node not found
{"error":"node not found","code":400}
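Putting the success and error cases together, a Go client can decode start replies and turn non-200 responses into ordinary `error` values. This is a minimal sketch under stated assumptions: the start reply is assumed to look like `{"status": "started", "nodes": [{"id": "nn", "state": "running"}]}` (inferred from the fragments above; other fields are ignored), the scheduler address is a placeholder, the fallback for non-JSON error bodies is an added assumption, and every type and function name here is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// Node holds the node fields that appear in the sample responses.
type Node struct {
	ID    string `json:"id"`
	Type  string `json:"type"`
	State string `json:"state"`
}

// StartResponse mirrors the assumed shape of the /api/node/start reply.
type StartResponse struct {
	Status string `json:"status"`
	Nodes  []Node `json:"nodes"`
}

// APIError mirrors the error body, e.g. {"error":"node not found","code":400}.
type APIError struct {
	Message string `json:"error"`
	Code    int    `json:"code"`
}

func (e *APIError) Error() string {
	return fmt.Sprintf("scheduler API error %d: %s", e.Code, e.Message)
}

// checkResponse returns nil for HTTP 200 and an *APIError for any other status.
// If the body is not valid JSON, the HTTP status and a generic message are used.
func checkResponse(status int, body []byte) error {
	if status == http.StatusOK {
		return nil
	}
	apiErr := &APIError{Code: status}
	if err := json.Unmarshal(body, apiErr); err != nil {
		apiErr.Message = "unparseable error body"
	}
	return apiErr
}

// parseStart decodes a successful start reply; undeclared fields are ignored.
func parseStart(body []byte) (StartResponse, error) {
	var r StartResponse
	err := json.Unmarshal(body, &r)
	return r, err
}

// startNodes calls /api/node/start for a node-id expression.
// base is the scheduler API address, e.g. "http://localhost:7000" (an assumption).
func startNodes(base, expr string) (StartResponse, error) {
	resp, err := http.Get(base + "/api/node/start?node=" + url.QueryEscape(expr))
	if err != nil {
		return StartResponse{}, err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return StartResponse{}, err
	}
	if err := checkResponse(resp.StatusCode, body); err != nil {
		return StartResponse{}, err
	}
	return parseStart(body)
}

func main() {
	// Offline demo on sample bodies; call startNodes against a live scheduler instead.
	ok, err := parseStart([]byte(`{"status":"started","nodes":[{"id":"nn","state":"running"},{"id":"dn0","state":"running"}]}`))
	if err != nil {
		panic(err)
	}
	for _, n := range ok.Nodes {
		fmt.Println(n.ID, n.State)
	}
	fmt.Println(checkResponse(http.StatusBadRequest, []byte(`{"error":"node not found","code":400}`)))
}
```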

For more details on the REST API, please refer to the sources.


Hadoop isn’t dead but you might be doing it wrong!

March 18, 2016

I haven’t blogged (or podcasted, for that matter) in a while. There are lots of reasons for that, and I am always happy to chat and grab tea if folks are interested, but after attending this year’s HIMSS conference I just couldn’t hold it in anymore.

I went to HIMSS so excited: it was supposed to be the year of Big Data! Everything was about transformation and interoperability and, OMGZ, the excitement.

The first keynote Monday evening was OFF THE HOOK. The rest of the time two of my colleagues and I were at the expo. It is basically CES for Healthcare (if you don’t know what CES is, then think DEFCON for Healthcare… or something). It’s big.

But where was the Big Data?

Not really anywhere… There were three recognizable “big data companies”, and one of them was in the booth as a partner for cloud services. It was weird. What happened?

One of the engineers from Cerner has a lightning talk at the Kafka Summit, go Cerner!!

Didn’t everyone get the memo? We need to help reduce costs of patient care!

Here are two ways to help reduce costs of patient care!

  1. (Paraphrasing Michael Dell from his keynote) Innovation funding for Healthcare IT will come from optimizing your data center resources.
  2. (This one is from me but inspired by Bruce Schneier) Through Open Source we can enable better systems by sharing in the R&D costs and also make them more secure.

I totally agree with #1; I have seen it first-hand, with people saving 82% on their data center bill, and that is without even using spot (or, as some call them, “preemptible”) instances yet. Amazing!

As for #2, you have to realize that different people are good at different things. One person can write anything, but sometimes 2 or 3 or 45 of them can write it better… or at least make sure the tests keep passing and evolving properly, etc., etc., stewardship, etc.

Besides all of that, the conference was great. There were a lot of companies and people I recognized and bumped into and it was great to catch up.

I was also really REALLY excited to see how far physician signatures and form signing have (finally) come in healthcare, removing all that paper. Fax is almost dead, but there are still a couple of companies kicking.

One last thing: the cyber security part of the expo was also disappointing. I know it was during the RSA Conference, but Healthcare needs good solutions too. There was a good set of solutions, not bad, in some cases legit and well known (thanks for showing up!), but the “pavilion” was downstairs in the back left corner. Maybe if HIMSS coincided with Strata it would have been different; hard to say.

There was (at least) one tweet about it; not sure if there were more.

So, Big Data, Healthcare, Security, OH MY! I am in!

On 03/29/2016 in New York City I will be talking more about problems and solutions in using the open source, interoperable, XML-based FHIR standard in Healthcare (removing the need to integrate HL7 systems and make them interoperable), and about getting into real-time stream processing on Mesos.

I will also be conducting a training on SMACK Stack 1.0 (Streaming, Mesos, Analytics, Cassandra, Kafka), using telephone systems and APIs to start streaming events and the interactions they trigger with different systems. Yes, I bought phones, and yes, you get to keep yours.

What has attracted me (for almost 2 years now) to running Hadoop systems and ecosystem components on Mesos is the ease it brings to the developers, systems engineers, data scientists, analysts and users of the software systems that run those components (often as a service). There is a lot to research and read; in that case I would:

1) scour my blog

2) read this

3) and this

4) do your own thing

Hadoop! Mesos!


~ Joestein

P.S. If you have something good to say about Hadoop and want to talk about it, and it is gripping and gets back to the history and continued efforts, let me know. Thanks!



Open Source Cloud Formation with Minotaur for Mesos, Kafka and Hadoop

December 17, 2014

Today I am happy to announce “Minotaur”, our open source, AWS-based infrastructure for managing big data open source projects, including (but not limited to) Apache Kafka, Apache Mesos and Cloudera’s Distribution of Hadoop. Minotaur is based on AWS CloudFormation.

The following labs are currently supported:


Supervisor is a Docker-based image that contains all the necessary software to manage nodes/resources in AWS.

Supervisor set-up

  • clone this repo to repo_dir
  • cd to repo_dir/supervisor folder

Before trying to build the Docker image, you must put some configuration files under the config directory:

a) aws_config

This file is just a regular aws-cli config; paste your secret and access keys, provided by Amazon, into it:

output = json
region = us-east-1
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY

Do not add or remove any whitespace (especially before and after the “=” sign in keys).
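Because this file is whitespace-sensitive, a quick pre-flight check can catch mistakes before the image is built. The Go sketch below is hypothetical and not part of Minotaur: it assumes every non-empty line must be exactly `key = value` with a single space on each side of `=`, that the four keys above are required, and that the file lives at `config/aws_config` relative to the supervisor folder.

```go
package main

import (
	"fmt"
	"os"
	"regexp"
	"strings"
)

// linePattern: lowercase key, exactly " = ", then a value with no whitespace (assumed rule).
var linePattern = regexp.MustCompile(`^[a-z_]+ = \S+$`)

// requiredKeys are the four settings shown in the example aws_config.
var requiredKeys = []string{"output", "region", "aws_access_key_id", "aws_secret_access_key"}

// validateAWSConfig returns human-readable problems; an empty result means the file looks fine.
func validateAWSConfig(text string) []string {
	var problems []string
	seen := map[string]bool{}
	for i, line := range strings.Split(text, "\n") {
		if line == "" {
			continue // allow blank lines, including the one after a trailing newline
		}
		if !linePattern.MatchString(line) {
			problems = append(problems, fmt.Sprintf("line %d: expected 'key = value', got %q", i+1, line))
			continue
		}
		seen[strings.SplitN(line, " = ", 2)[0]] = true
	}
	for _, k := range requiredKeys {
		if !seen[k] {
			problems = append(problems, "missing key: "+k)
		}
	}
	return problems
}

func main() {
	data, err := os.ReadFile("config/aws_config")
	if err != nil {
		fmt.Println("cannot read config:", err)
		os.Exit(1)
	}
	problems := validateAWSConfig(string(data))
	for _, p := range problems {
		fmt.Println(p)
	}
	if len(problems) > 0 {
		os.Exit(1)
	}
}
```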

b) private.key

This is your private SSH key, the public part of which is registered on the Bastion host.

c) environment.key

This is a shared key for all nodes in the environment Supervisor is supposed to manage.

d) ssh_config

This is a regular SSH config file; you only have to change your_username (the one registered on the Bastion host).

BASTION_IP is handled dynamically when the container is built.

# BDOSS environment
Host 10.0.2.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
Host 10.0.*.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
  • exec

If this is the first time you’re launching Supervisor, it will take some time to build.

Subsequent launches will take seconds.

Using supervisor

Now you can cd to /deploy/labs/ and deploy whatever you want.


minotaur lab deploy mesosmaster -e bdoss-dev -d test -r us-east-1 -z us-east-1a
Creating new stack 'mesos-master-test-bdoss-dev-us-east-1-us-east-1a'...
Stack deployed.

This will spin up a Mesos master node in the “test” deployment.


Supervisor has a built-in “awsinfo” command, which relies on AWS API and provides brief info about running machines. It is also capable of searching through that info.

Usage example

awsinfo – will display brief info about all nodes running in AWS

root@supervisor:/deploy# awsinfo
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running  
bastion.bdoss-dev                   i-3faa69de   m1.small       running      None           
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           
mesos-slave.test.bdoss-dev          i-e00ddc01   m1.small       terminated      None            None           

awsinfo mesos-master – will display info about all mesos-master nodes running in AWS.

root@supervisor:/deploy/labs# awsinfo mesos-master
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           

awsinfo 10.0.2 – match a private/public subnet

root@supervisor:/deploy/labs# awsinfo 10.0.2
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running  
mesos-master.test.bdoss-dev         i-e96ebd08   m1.small       running 
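The searches above (by name fragment or by subnet) can be approximated as a filter over instance records. This Go sketch is an assumption about the behaviour, not awsinfo's implementation: names are matched by substring, IP addresses by plain string prefix (so `10.0.2` would also match `10.0.20.x`), and the `Instance` fields simply mirror the table columns. The IP addresses in `main` are made up for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Instance mirrors the columns printed by awsinfo.
type Instance struct {
	Name, ID, Type, State, PrivateIP, PublicIP string
}

// filter keeps instances whose name contains query or whose private/public IP
// starts with it. This is a string prefix, not an octet-aware subnet match.
// An empty query keeps everything, like running awsinfo with no argument.
func filter(instances []Instance, query string) []Instance {
	var out []Instance
	for _, inst := range instances {
		if query == "" ||
			strings.Contains(inst.Name, query) ||
			strings.HasPrefix(inst.PrivateIP, query) ||
			strings.HasPrefix(inst.PublicIP, query) {
			out = append(out, inst)
		}
	}
	return out
}

func main() {
	// Made-up sample rows; the private IPs are invented for illustration.
	instances := []Instance{
		{Name: "nat.bdoss-dev", ID: "i-c46a0b2a", Type: "m1.small", State: "running", PrivateIP: "10.0.2.10"},
		{Name: "mesos-master.test.bdoss-dev", ID: "i-e96ebd08", Type: "m1.small", State: "running", PrivateIP: "10.0.2.20"},
		{Name: "bastion.bdoss-dev", ID: "i-3faa69de", Type: "m1.small", State: "running", PrivateIP: "10.0.0.5"},
	}
	for _, inst := range filter(instances, "10.0.2") {
		fmt.Printf("%-30s %-11s %-9s %s\n", inst.Name, inst.ID, inst.State, inst.PrivateIP)
	}
}
```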


If you can’t use Docker directly for some reason, there’s a Vagrant wrapper VM for it.

Before doing anything with Vagrant, complete the above steps for Docker, but don’t execute the script.

Just cd into vagrant directory and exec vagrant up, then vagrant ssh (nothing special here yet).

When you run vagrant ssh, the Docker container build process will start immediately, so wait a bit and let it complete.

Now you’re inside a Docker container nested in Vagrant VM and can proceed with deployment in the same manner as it’s described for docker.

All subsequent vagrant ssh sessions will spawn the Docker container almost immediately.

Once you are inside the supervisor container, the script can be used to provision an environment and labs. The rest of this readme assumes that the script is executed from within the supervisor container.

Minotaur Commands

List Infrastructure Components

root@supervisor:/deploy# ./ infrastructure list
Available deployments are: ['bastion', 'iampolicies', 'iamusertogroupadditions', 'nat', 'sns', 'subnet', 'vpc']

Print Infrastructure Component Usage

root@supervisor:/deploy# ./ infrastructure deploy bastion -h
usage: infrastructure deploy bastion [-h] -e ENVIRONMENT -r REGION
                                                 -z AVAILABILITY_ZONE
                                                 [-i INSTANCE_TYPE]

optional arguments:
  -h, --help            show this help message and exit
                        CloudFormation environment to deploy to
  -r REGION, --region REGION
                        Geographic area to deploy to
                        Isolated location to deploy to
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy

Deploy Infrastructure Component

In this example, the bdoss-dev bastion already existed, so the CloudFormation stack was updated with the current template.

root@supervisor:/deploy# ./ infrastructure deploy bastion -e bdoss-dev -r us-east-1 -z us-east-1a
Template successfully validated.
Updating existing 'bastion-bdoss-dev-us-east-1-us-east-1a' stack...
Stack updated.

List Labs

List all supported labs.

root@supervisor:/deploy# ./ lab list
Available deployments are: ['clouderahadoop', 'gokafkaconsumer', 'gokafkaproducer', 'kafka', 'mesosmaster', 'mesosslave', 'zookeeper']

Print Lab Usage

Print the kafka lab usage.

root@supervisor:/deploy# ./ lab deploy kafka -h
usage: lab deploy kafka [-h] -e ENVIRONMENT -d DEPLOYMENT -r
                                    REGION -z AVAILABILITY_ZONE [-n NUM_NODES]
                                    [-i INSTANCE_TYPE] [-v ZK_VERSION]
                                    [-k KAFKA_URL]

optional arguments:
  -h, --help            show this help message and exit
                        CloudFormation environment to deploy to
  -d DEPLOYMENT, --deployment DEPLOYMENT
                        Unique name for the deployment
  -r REGION, --region REGION
                        Geographic area to deploy to
                        Isolated location to deploy to
  -n NUM_NODES, --num-nodes NUM_NODES
                        Number of instances to deploy
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy
  -v ZK_VERSION, --zk-version ZK_VERSION
                        The Zookeeper version to deploy
  -k KAFKA_URL, --kafka-url KAFKA_URL
                        The Kafka URL

Deploy Lab

Deploy a 3-broker Kafka cluster.

root@supervisor:/deploy# ./ lab deploy kafka -e bdoss-dev -d kafka-example -r us-east-1 -z us-east-1a -n 3 -i m1.small    
Template successfully validated.
Creating new 'kafka-bdoss-dev-kafka-example-us-east-1-us-east-1a' stack...
Stack deployed.

 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop


Ideas and goals behind the Go Kafka Client

December 10, 2014

I think a bunch of folks have heard already that B.D.O.S.S. was working on a new Apache Kafka Client for Go. The Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, which is our lab environment for Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.

To get started using the consumer client, check out our example code and its property file.

Ideas and goals behind the Go Kafka Client:


1) Partition Ownership

We decided to implement multiple strategies for this, including static assignment. The concept of re-balancing is preserved, but now there are a few different re-balancing strategies, and they can run at different times depending on what is going on (for example, while a blue/green deploy is happening). For more on blue/green deployments, check out this video with Jim Plush and Sean Berry from AWS re:Invent 2014.
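Static assignment can be illustrated with the classic range strategy, where every consumer sorts the same member list and takes a contiguous block of partitions, so all members compute the same answer without extra coordination. This Go sketch shows the idea only; it is not the Go Kafka Client's API, and `assignRange` is a hypothetical name. Re-balancing in this model just means re-running the function after the member list changes.

```go
package main

import (
	"fmt"
	"sort"
)

// assignRange splits partitions 0..numPartitions-1 across the sorted consumer ids.
// The first numPartitions%len(consumers) consumers get one extra partition.
func assignRange(consumers []string, numPartitions int) map[string][]int {
	ids := append([]string(nil), consumers...)
	sort.Strings(ids) // every member must see the same order
	out := make(map[string][]int, len(ids))
	if len(ids) == 0 {
		return out
	}
	per, extra := numPartitions/len(ids), numPartitions%len(ids)
	next := 0
	for i, id := range ids {
		n := per
		if i < extra {
			n++
		}
		for j := 0; j < n; j++ {
			out[id] = append(out[id], next)
			next++
		}
	}
	return out
}

func main() {
	a := assignRange([]string{"consumer-1", "consumer-0"}, 5)
	fmt.Println(a["consumer-0"], a["consumer-1"]) // [0 1 2] [3 4]
}
```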

2) Fetch Management

This is what “fills up the reservoir”, as I like to call it, so that processing (either sequential or in batch) always has data, if there is data for it to have, without making a network hop. The fetcher has to stay ahead here, keeping the processing tap full (or, if empty, in a controlled way) by pulling the data for the Kafka partition(s) it owns.
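The “reservoir” can be pictured as a buffered channel that a fetcher goroutine keeps topped up while processing drains it. This is a generic Go sketch of that prefetch pattern, not the client's fetcher: `Message` is a minimal stand-in type, the `fetchBatch` callback stands in for a network fetch, and stopping on an empty batch is a simplification (a real fetcher would back off and retry).

```go
package main

import "fmt"

// Message is a minimal stand-in for a Kafka message.
type Message struct {
	Partition int32
	Offset    int64
	Value     []byte
}

// fetcher keeps the reservoir topped up. A send blocks while the buffer is full,
// so the fetcher runs only roughly cap(reservoir) messages ahead of processing.
// It closes the channel when fetchBatch returns an empty batch.
func fetcher(fetchBatch func(from int64) []Message, reservoir chan<- Message) {
	defer close(reservoir)
	var next int64
	for {
		batch := fetchBatch(next)
		if len(batch) == 0 {
			return // simplification: a real fetcher would back off and retry
		}
		for _, m := range batch {
			reservoir <- m
			next = m.Offset + 1
		}
	}
}

func main() {
	// Fake source: offsets 0..9 on partition 0, served three at a time.
	fake := func(from int64) []Message {
		var out []Message
		for o := from; o < 10 && o < from+3; o++ {
			out = append(out, Message{Partition: 0, Offset: o, Value: []byte(fmt.Sprintf("msg-%d", o))})
		}
		return out
	}
	reservoir := make(chan Message, 5) // capacity bounds how far ahead the fetcher may run
	go fetcher(fake, reservoir)
	for m := range reservoir {
		fmt.Println(m.Offset, string(m.Value))
	}
}
```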

3) Work Management

For the Go consumer we currently only support “fan out” using goroutines and channels. If you have ever used Go, this will be familiar to you; if not, you should drop everything and learn Go.
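A minimal fan-out sketch in plain Go, assuming a fixed pool of worker goroutines that all read from one channel. It illustrates the pattern only, not the client's internal worker management, and `fanOut` is a hypothetical name. Because `handle` runs on several goroutines at once, any shared state it touches needs synchronization, which is why `main` guards its counter with a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts numWorkers goroutines that apply handle to every message from in,
// and returns once in is closed and all messages have been handled.
func fanOut(in <-chan string, numWorkers int, handle func(worker int, msg string)) {
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func(worker int) {
			defer wg.Done()
			for msg := range in {
				handle(worker, msg)
			}
		}(w)
	}
	wg.Wait()
}

func main() {
	in := make(chan string)
	go func() {
		for i := 0; i < 6; i++ {
			in <- fmt.Sprintf("message-%d", i)
		}
		close(in)
	}()
	var mu sync.Mutex
	handled := 0
	fanOut(in, 3, func(worker int, msg string) {
		mu.Lock()
		handled++
		mu.Unlock()
		fmt.Printf("worker %d handled %s\n", worker, msg)
	})
	fmt.Println("total handled:", handled)
}
```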

4) Offset Management

Our offset management works on a per-batch basis: the highest offset from each batch is committed on a per-partition basis.
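In sketch form, reducing a processed batch to one commit per partition might look like this. It is a hypothetical Go illustration, not the client's code: `commit` stands in for whatever offset storage is used, and the sketch records the highest offset seen, as described above (Kafka's own convention is often to store the next offset to read, i.e. highest + 1, so check which one your storage expects).

```go
package main

import "fmt"

// Msg carries just the fields needed for offset tracking.
type Msg struct {
	Partition int32
	Offset    int64
}

// highestOffsets returns, for each partition present in the batch,
// the highest offset seen in that batch.
func highestOffsets(batch []Msg) map[int32]int64 {
	hi := make(map[int32]int64)
	for _, m := range batch {
		if cur, ok := hi[m.Partition]; !ok || m.Offset > cur {
			hi[m.Partition] = m.Offset
		}
	}
	return hi
}

// commitBatch commits one offset per partition once the whole batch is processed.
func commitBatch(batch []Msg, commit func(partition int32, offset int64)) {
	for p, o := range highestOffsets(batch) {
		commit(p, o)
	}
}

func main() {
	batch := []Msg{{0, 41}, {1, 7}, {0, 43}, {0, 42}, {1, 9}}
	// Map iteration order is random, so partitions may print in either order.
	commitBatch(batch, func(p int32, o int64) {
		fmt.Printf("commit partition %d offset %d\n", p, o)
	})
}
```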
// Example consumer (excerpt). resolveConfig, not shown here, reads the property file.
package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"time"

	metrics "github.com/rcrowley/go-metrics"
	kafkaClient "github.com/stealthly/go_kafka_client"
)

func main() {
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	// Trap Ctrl+C so the consumers can be closed cleanly.
	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	// Start the consumers one by one, pausing between them.
	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)
	}

	// Block until interrupted, then close every consumer and wait for each to finish.
	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

// startMetrics reports the default metrics registry to Graphite in the background.
func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
		panic(err)
	}
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
	})
}

// startNewConsumer creates a consumer with its own id, worker strategy and failure
// callbacks, then starts consuming the topic in the background.
func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

// GetStrategy returns the per-message worker function: trace the message and mark the consume rate.
func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
		consumeRate.Mark(1)

		return kafkaClient.NewSuccessfulResult(id)
	}
}

// FailedCallback decides what happens when a worker fails: here, stop without committing the offset.
func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop
}

// FailedAttemptCallback decides what happens after a failed attempt: here, commit the offset and continue.
func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
}
We set our mailing list to be the central place for client library discussions for the Apache Kafka community.

Plans moving forward with the Go Kafka Client:

1) Build up suite of integration tests.
2) Stabilize the API nomenclature with the JVM Kafka Client for consistency.
3) Run integration tests and create baseline regression points.
4) Re-work changes from feedback of issues found and from our own customer support.
5) Push to production and continue ongoing support.

Ideas and goals behind Minotaur:

Provide a controlled and isolated environment through scripts for regression of versions and benchmarks against instance sizing. This comes in 3 parts: 1) Supervisor 2) Infrastructure 3) Labs. The Labs portion is broken down into both Chef cookbooks and Python wrappers for CloudFormation templates for each of the following systems:

Plans moving forward with Minotaur:

1) Upstream our CloudFormation scripts (this is going to take some more work detangling Dexter, the name for our private repo of Minotaur).
2) Move the work we do with Apache Cassandra into the project.
3) Start support of Puppet in addition to the work we did with Chef.
4) Get the Docker supervisor console pushed upstream through Bastion hosts.
5) Cut our labs over to using Minotaur.

Apache Solr real-time live index updates at scale with Apache Hadoop

Episode #22 of the podcast was a talk with Patrick Hunt.

We talked about the new work that has gone into Apache Solr (upstream) that allows it to work on Apache Hadoop. Solr has support for writing and reading its index and transaction log files to the HDFS distributed filesystem. This does not use Hadoop Map-Reduce to process Solr data, rather it only uses the HDFS filesystem for index and transaction log file storage.

We also talked about SolrCloud and how its sharding features allow Solr to scale with a Hadoop cluster.

Apache Solr includes the ability to set up a cluster of Solr servers that combines fault tolerance and high availability. Called SolrCloud, these capabilities provide distributed indexing and search capabilities, supporting the following features:

  • Central configuration for the entire cluster
  • Automatic load balancing and fail-over for queries
  • ZooKeeper integration for cluster coordination and configuration.

SolrCloud is flexible distributed search and indexing, without a master node to allocate nodes, shards and replicas. Instead, Solr uses ZooKeeper to manage these locations, depending on configuration files and schemas. Documents can be sent to any server and ZooKeeper will figure it out.

Patrick introduced me to Morphlines (part of the Cloudera Development Kit for Hadoop).

Cloudera Morphlines is an open source framework that reduces the time and skills necessary to build and change Hadoop ETL stream processing applications that extract, transform and load data into Apache Solr, HBase, HDFS, Enterprise Data Warehouses, or Analytic Online Dashboards. Want to build or facilitate ETL jobs without programming and without substantial MapReduce skills? Get the job done with a minimum amount of fuss and support costs? Here is how to get started.

A morphline is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. It replaces Java programming with simple configuration steps, and correspondingly reduces the cost and integration effort associated with developing and maintaining custom ETL projects.

Morphlines is a library, embeddable in any Java codebase. A morphline is an in-memory container of transformation commands. Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record. A record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments. The framework is extensible and integrates existing functionality and third party systems in a straightforward manner.

The morphline commands were developed as part of Cloudera Search. Morphlines power ETL data flows from Flume and MapReduce and HBase into Apache Solr. Flume covers the real time case, whereas MapReduce covers the batch processing case. Since the launch of Cloudera Search morphline development graduated into the Cloudera Development Kit(CDK) in order to make the technology accessible to a wider range of users and products, beyond Search. The CDK is a set of libraries, tools, examples, and documentation focused on making it easier to build systems on top of the Hadoop ecosystem. The CDK is hosted on GitHub and encourages involvement by the community. For example, morphlines could be embedded into Crunch, HBase, Impala, Pig, Hive, or Sqoop. Let us know where you want to take it!

Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline is an efficient way to consume records (e.g. Flume events, HDFS files, RDBMS tables or Avro objects), turn them into a stream of records, and pipe the stream of records through a set of easily configurable transformations on the way to a target application such as Solr, for example as outlined in the following figure:

In this figure, a Flume Source receives syslog events and sends them to a Flume Morphline Sink, which converts each Flume event to a record and pipes it into a readLine command. The readLine command extracts the log line and pipes it into a grok command. The grok command uses regular expression pattern matching to extract some substrings of the line. It pipes the resulting structured record into the loadSolr command. Finally, the loadSolr command loads the record into Solr, typically a SolrCloud. In the process, raw data or semi-structured data is transformed into structured data according to application modelling requirements.

The Morphline framework ships with a set of frequently used high level transformation and I/O commands that can be combined in application specific ways. The plugin system allows the adding of new transformations and I/O commands and integrates existing functionality and third party systems in a straightforward manner.

This integration enables rapid Hadoop ETL application prototyping, complex stream and event processing in real time, flexible log file analysis, integration of multiple heterogeneous input schemas and file formats, as well as reuse of ETL logic building blocks across Hadoop ETL applications.

The CDK ships an efficient runtime that compiles a morphline on the fly. The runtime executes all commands of a given morphline in the same thread. Piping a record from one command to another implies just a cheap Java method call. In particular, there are no queues, no handoffs among threads, no context switches and no serialization between commands, which minimizes performance overheads.

Morphlines manipulate continuous or arbitrarily large streams of records. A command transforms a record into zero or more records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. Note that a field can have multiple values and any two records need not use common field names. This flexible data model corresponds exactly to the characteristics of the Solr/Lucene data model.

Not only structured data, but also binary data can be passed into and processed by a morphline. By convention, a record can contain an optional field named _attachment_body, which can be a Java or Java byte[]. Optionally, such binary input data can be characterized in more detail by setting the fields named _attachment_mimetype (such as “application/pdf”) and _attachment_charset (such as “UTF-8”) and _attachment_name (such as “cars.pdf”), which assists in detecting and parsing the data type. This is similar to the way email works.

This generic data model is useful to support a wide range of applications. For example, the Apache Flume Morphline Solr Sink embeds the morphline library and executes a morphline to convert flume events into morphline records and load them into Solr. This sink fills the body of the Flume event into the _attachment_body field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. As another example, the Mappers of the MapReduceIndexerTool fill the Java referring to the currently processed HDFS file into the _attachment_body field of the morphline record. The Mappers of the MapReduceIndexerTool also fill metadata about the HDFS file into record fields, such as the file’s name, path, size, last modified time, etc. This way a morphline can act on all data received from Flume and HDFS. As yet another example, the Morphline Lily HBase Indexer fills a HBase Result Java POJO into the _attachment_body field of the morphline record. This way morphline commands such as extractHBaseCells can extract data from HBase updates and correspondingly update a Solr index.
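The record model and the single-thread command chain described above can be mimicked in a few lines of Go. This is only an analogy (Morphlines itself is a Java library driven by morphline configuration files, and none of these names come from its API): a `Record` maps each field name to a list of values, each command passes its output to the next with a plain function call, `grokLike` uses a regular expression as a stand-in for grok, and the final sink stands in for loadSolr.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Record mimics the morphline data model: each named field holds an ordered list of values.
type Record map[string][]any

// Command transforms one record into zero or more records, handing each to next.
type Command func(rec Record, next func(Record))

// chain wires commands so each calls the next directly: no queues, no extra goroutines.
func chain(sink func(Record), cmds ...Command) func(Record) {
	next := sink
	for i := len(cmds) - 1; i >= 0; i-- {
		cmd, downstream := cmds[i], next
		next = func(r Record) { cmd(r, downstream) }
	}
	return next
}

// readLine splits the binary _attachment_body into one record per text line.
func readLine(rec Record, next func(Record)) {
	vals := rec["_attachment_body"]
	if len(vals) == 0 {
		return
	}
	body, _ := vals[0].([]byte)
	for _, line := range strings.Split(strings.TrimSpace(string(body)), "\n") {
		next(Record{"message": {line}})
	}
}

// syslogRe is a regexp stand-in for a grok pattern (hypothetical "host program: msg" format).
var syslogRe = regexp.MustCompile(`^(?P<host>\S+) (?P<program>\w+): (?P<msg>.*)$`)

// grokLike copies named regexp groups into fields; non-matching records are dropped.
func grokLike(rec Record, next func(Record)) {
	vals := rec["message"]
	if len(vals) == 0 {
		return
	}
	line, _ := vals[0].(string)
	m := syslogRe.FindStringSubmatch(line)
	if m == nil {
		return // zero output records
	}
	for i, name := range syslogRe.SubexpNames() {
		if name != "" {
			rec[name] = append(rec[name], m[i])
		}
	}
	next(rec)
}

func main() {
	var loaded []Record // the sink stands in for loadSolr
	pipeline := chain(func(r Record) { loaded = append(loaded, r) }, readLine, grokLike)
	pipeline(Record{
		"_attachment_body":     {[]byte("web1 sshd: login ok\ngarbage\nweb2 cron: job done\n")},
		"_attachment_mimetype": {"text/plain"},
	})
	for _, r := range loaded {
		fmt.Println(r["host"][0], r["program"][0], r["msg"][0])
	}
}
```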

We also talked a good deal about Apache ZooKeeper, including some of its history from when ZooKeeper was originally at Yahoo!, and Patrick's experience since then. To hear everything Patrick had to say, please subscribe to the podcast.


 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop


Beyond MapReduce and Apache Hadoop 2.X with Bikas Saha and Arun Murthy

April 2, 2014

Episode 20 of the podcast was with Bikas Saha and Arun Murthy.

When I spoke with Arun a year or so ago, YARN was NextGen Hadoop. Since then there have been a lot of updates, a lot of work done and a lot of production experience gained.

Besides Yahoo!, other multi-thousand-node clusters have been running in production with YARN. These clusters have shown 2x throughput capacity, which has reduced hardware costs (in some cases making it possible to shut down co-los) while still delivering overall performance improvements over previous Hadoop 1.X clusters.

I got to hear about some of what is in Hadoop 2.4 and what is coming in 2.5:

  • Application Timeline Server: a repository and API for application-specific metrics (Tez, Spark, whatever).
    • Web service API to put and get data, with some aggregation.
    • Pluggable NoSQL store (HBase, Accumulo) to scale it.
  • Preemption in the Capacity Scheduler.
  • Multiple resource support (CPU, RAM and disk).
  • Node labels: tag nodes with whatever labels you like (for example, some Windows and some Linux), then ask for resources only on nodes with those labels, controlled with ACLs.
  • Hypervisor support as a key part of the topology.
  • Hoya generalized for YARN (a game changer), now proposed to the Apache Incubator as Slider.
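To make the node-label idea concrete, here is a toy filter that keeps only the nodes carrying a requested label. It is a hypothetical sketch, not the YARN node-labels API or its configuration. The node names and labels are made up, and ACLs are not modeled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NodeLabelSketch {

    // Hypothetical node description: a name plus an arbitrary set of labels.
    static final class Node {
        final String name;
        final Set<String> labels;

        Node(String name, String... labels) {
            this.name = name;
            this.labels = new HashSet<>(Arrays.asList(labels));
        }
    }

    // Keep only nodes that carry the requested label; a request without a
    // label (null) may be placed on any node.
    static List<String> eligibleNodes(List<Node> cluster, String requiredLabel) {
        List<String> out = new ArrayList<>();
        for (Node n : cluster) {
            if (requiredLabel == null || n.labels.contains(requiredLabel)) {
                out.add(n.name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Node> cluster = Arrays.asList(
                new Node("node1", "linux"),
                new Node("node2", "windows"),
                new Node("node3", "linux", "gpu"));
        System.out.println(eligibleNodes(cluster, "linux"));   // [node1, node3]
        System.out.println(eligibleNodes(cluster, "windows")); // [node2]
    }
}
```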

We talked about Tez, which represents queries as complex DAGs so that what you want to do on Hadoop can run without the workarounds needed to force it into MapReduce. MapReduce was not designed to be reworked outside of the parts of the job it gives you (Map, Split, Shuffle, Combine, Reduce, etc.), whereas Tez is more expressive, exposing a DAG API.
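Here is a toy model of the DAG idea. Stages are vertices and data flows along edges. A vertex runs only once all of its inputs are done, which is computed here with Kahn's topological sort. This is a hypothetical illustration, not the Tez API. The vertex names describe an imagined join, aggregate and sort query that would otherwise need a chain of separate MapReduce jobs.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DagSketch {

    // vertex -> downstream vertices (an edge carries data from one stage to the next)
    private final Map<String, List<String>> edges = new LinkedHashMap<>();
    // vertex -> number of incoming edges
    private final Map<String, Integer> inDegree = new LinkedHashMap<>();

    DagSketch vertex(String name) {
        edges.putIfAbsent(name, new ArrayList<>());
        inDegree.putIfAbsent(name, 0);
        return this;
    }

    DagSketch edge(String from, String to) {
        vertex(from).vertex(to);
        edges.get(from).add(to);
        inDegree.merge(to, 1, Integer::sum);
        return this;
    }

    // Kahn's algorithm: a vertex becomes ready when all of its inputs have run.
    List<String> executionOrder() {
        Map<String, Integer> remaining = new LinkedHashMap<>(inDegree);
        Deque<String> ready = new ArrayDeque<>();
        remaining.forEach((v, d) -> { if (d == 0) ready.add(v); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String v = ready.poll();
            order.add(v);
            for (String next : edges.get(v)) {
                if (remaining.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        if (order.size() != inDegree.size()) throw new IllegalStateException("cycle: not a DAG");
        return order;
    }

    public static void main(String[] args) {
        // Two scans feed a join, followed by an aggregation and a sort: one DAG.
        DagSketch dag = new DagSketch()
                .edge("scanOrders", "join")
                .edge("scanCustomers", "join")
                .edge("join", "groupBy")
                .edge("groupBy", "orderBy");
        System.out.println(dag.executionOrder());
        // [scanOrders, scanCustomers, join, groupBy, orderBy]
    }
}
```

Both scans are independent vertices here, so a real engine could run them in parallel. A fixed Map/Reduce job shape has no way to express that directly.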

There were also some updates on Hive 0.13, coming out with subqueries, low-latency queries (through Tez), high-precision decimals and more!

Subscribe to the podcast and listen to all of what Bikas and Arun had to say.




Impala and SQL on Hadoop

February 22, 2014

The origins of Impala can be found in F1 – The Fault-Tolerant Distributed RDBMS Supporting Google’s Ad Business.

One of the many differences between MapReduce and Impala is that in Impala intermediate data moves directly from process to process, instead of being stored on HDFS for other processes to read. This provides a HUGE performance advantage while consuming fewer cluster resources. Less hardware to do more!
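The sketch below is a loose, in-process analogy for streaming intermediate data instead of materializing it on HDFS. It connects a producing stage to a consuming stage with a bounded in-memory queue. It is not how Impala is implemented, since Impala's daemons exchange data across the network. The `scanAndSum` name and the end-of-stream marker are assumptions.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class PipelineSketch {

    // End-of-stream marker (hypothetical convention; real rows start at 1).
    private static final long END = Long.MIN_VALUE;

    // A "scan" thread streams rows to the consuming stage through a bounded
    // in-memory queue; nothing intermediate is written to disk.
    static long scanAndSum(long rows) {
        BlockingQueue<Long> pipe = new ArrayBlockingQueue<>(1024);
        Thread scan = new Thread(() -> {
            try {
                for (long i = 1; i <= rows; i++) pipe.put(i);
                pipe.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        scan.start();
        try {
            long sum = 0;
            // The consumer starts working as soon as the first row arrives.
            for (long v = pipe.take(); v != END; v = pipe.take()) sum += v;
            scan.join();
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(scanAndSum(1_000_000)); // 500000500000
    }
}
```

The bounded queue also gives backpressure. If the consumer falls behind, `put` blocks the producer instead of letting intermediate data pile up.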


There are many advantages to this approach over alternative approaches for querying Hadoop data, including:

  • Thanks to local processing on data nodes, network bottlenecks are avoided.
  • A single, open, and unified metadata store can be utilized.
  • Costly data format conversion is unnecessary and thus no overhead is incurred.
  • All data is immediately queryable, with no delays for ETL.
  • All hardware is utilized for Impala queries as well as for MapReduce.
  • Only a single machine pool is needed to scale.

We encourage you to read the documentation for further exploration!

There are still transformation steps required to optimize queries, but Impala can help you do this with the Parquet file format. Better compression and optimized runtime performance are realized with Parquet, though many other file formats are supported.
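One reason columnar formats like Parquet compress well is that the values of a single column sit next to each other, which makes runs of repeated values easy to encode. The sketch below applies simple run-length encoding to one column. It is a simplified illustration only. Parquet's real layout (row groups, column chunks, pages and several encodings) is considerably more involved, and the sample data is made up.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ColumnarRleSketch {

    // Run-length encode a column: each run is stored as (value, count).
    static List<Map.Entry<String, Integer>> rle(String[] column) {
        List<Map.Entry<String, Integer>> runs = new ArrayList<>();
        int i = 0;
        while (i < column.length) {
            int j = i;
            while (j < column.length && column[j].equals(column[i])) j++;
            runs.add(new AbstractMap.SimpleImmutableEntry<>(column[i], j - i));
            i = j;
        }
        return runs;
    }

    public static void main(String[] args) {
        // Rows of (country, clicks): row-wise storage interleaves the two fields,
        // which breaks up the repeated country values.
        String[][] rows = {{"US", "3"}, {"US", "5"}, {"US", "2"}, {"DE", "7"}, {"DE", "1"}};

        // Columnar layout: all values of the country column stored contiguously.
        String[] country = new String[rows.length];
        for (int r = 0; r < rows.length; r++) country[r] = rows[r][0];

        System.out.println(rle(country)); // [US=3, DE=2]
    }
}
```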

This and a whole lot more was discussed with Marcel Kornacker, the Cloudera architect behind Impala, on Episode 18 of the All Things Hadoop Podcast.

