Impala and SQL on Hadoop

February 22, 2014

The origins of Impala can be found in F1 – The Fault-Tolerant Distributed RDBMS Supporting Google’s Ad Business.

One of the many differences between MapReduce and Impala is that in Impala the intermediate data moves directly from process to process instead of being written to HDFS for other processes to read.  This provides a HUGE performance advantage while consuming fewer cluster resources.  Less hardware to do more!


There are many advantages to this approach over alternative approaches for querying Hadoop data, including:

  • Thanks to local processing on data nodes, network bottlenecks are avoided.
  • A single, open, and unified metadata store can be utilized.
  • Costly data format conversion is unnecessary and thus no overhead is incurred.
  • All data is immediately query-able, with no delays for ETL.
  • All hardware is utilized for Impala queries as well as for MapReduce.
  • Only a single machine pool is needed to scale.

We encourage you to read the documentation for further exploration!

There are still transformation steps required to optimize queries, but Impala can help do this for you with the Parquet file format.  Better compression and optimized runtime performance are realized using the Parquet format, though many other file types are supported.
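As a rough illustration (the table names here are hypothetical, and older Impala releases use the keyword PARQUETFILE instead of PARQUET), converting an existing text-format table is a single CREATE TABLE … AS SELECT from impala-shell, followed by computing statistics so the planner can better optimize queries against the new table:

impala-shell -q "CREATE TABLE events_parquet STORED AS PARQUET AS SELECT * FROM events_text"
impala-shell -q "COMPUTE STATS events_parquet"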

This and a whole lot more was discussed with Marcel Kornacker, the Cloudera architect behind Impala, on Episode 18 of the All Things Hadoop Podcast.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Apache Kafka HTTP Producer and Consumer

January 16, 2014

Big Data Open Source Security LLC has released an Apache Kafka HTTP endpoint for producing and consuming data with Apache Kafka brokers: https://github.com/stealthly/dropwizard-kafka-http.

This project creates an Apache Kafka HTTP endpoint for producing and consuming messages.

Running Project

1) Install Vagrant (http://www.vagrantup.com/)
2) Install Virtual Box (https://www.virtualbox.org/)
3) git clone https://github.com/stealthly/dropwizard-kafka-http
4) cd dropwizard-kafka-http
5) vagrant up

Then produce and consume messages over HTTP, e.g.

curl -d "topic=http&message=hello&key=0" "http://192.168.22.10:8080/message"

and

curl "http://192.168.22.10:8080/message?topic=http"

The consumer returns the message you originally produced:

[{"topic":"http","key":"0","message":"hello","partition":0,"offset":0}]

* Zookeeper will be running on 192.168.22.5
* dropwizard-kafka-http is built cleanly before Zookeeper installs in `vagrant/zk.sh`
* Broker One is running on 192.168.22.10
* dropwizard-kafka-http launches on 192.168.22.10 after the Kafka Broker starts in `vagrant/broker.sh`

If you want you can login to the machines using `vagrant ssh zookeeper` and `vagrant ssh brokerOne`.

API Methods

Produce messages

POST /message?topic=$topic&async={true|false}&message=$message0&key=$key0,
message=$message1,key=$key1, …

Produces messages on topic.

Parameters:
topic – required topic name
async – optional true|false value indicating whether the producer should be async
key – required key text. Multiple values are possible
message – required message text. Multiple values are possible

Note: the number of keys passed should match the number of messages.

Errors:
400 – wrong parameters passed
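As a sketch of the multi-message form noted above (the topic, keys and messages here are made up), repeating the key and message parameters produces several messages in one request, and async=true returns without waiting on the broker acknowledgement:

curl -d "topic=http&async=true&message=hello&key=0&message=world&key=1" \
  "http://192.168.22.10:8080/message"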

Consume messages

GET /message?topic=$topic&timeout=$timeout
Returns consumed messages available on topic.

Parameters:
topic – required topic name
timeout – optional timeout in ms

Errors:
400 – wrong parameters passed
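For example, using the broker VM address from the Vagrant setup above, this waits up to two seconds for messages on the http topic before returning:

curl "http://192.168.22.10:8080/message?topic=http&timeout=2000"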

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/
Categories: Open Source Projects

Developing Apache Kafka Producers and Consumers

December 21, 2013

I gave a presentation recently on Real-time streaming and data pipelines with Apache Kafka.

A correction in the talk (~22 minutes in): I said that you have to have all of your topic data fit on one server.  That is not true; you can't span a log across servers, so you have to have all of the data for a partition fit on one server.  Kafka will spread the partitions of a topic across servers for you.
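For example, with the topic tooling that ships with Kafka 0.8.0 (the ZooKeeper address here is just a placeholder), a topic created with several partitions and a replication factor will have its partition logs spread and replicated across brokers, while each individual partition still lives entirely on its replicas:

bin/kafka-create-topic.sh --zookeeper 192.168.86.5:2181 \
  --partition 3 --replica 2 --topic mytopic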

For that presentation I put together sample code for producing and consuming with an Apache Kafka broker using Scala.

To get up and running, use vagrant.

1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/

git clone https://github.com/stealthly/scala-kafka
cd scala-kafka
vagrant up
./sbt test

Your entry point is the test file:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ly.stealth.testing

import org.specs2.mutable._
import java.util.UUID
import kafka.consumer._
import kafka.producer._
import kafka.utils._
import kafka.akka._
import akka.actor.{Actor, Props, ActorSystem}
import akka.routing.RoundRobinRouter

class KafkaSpec extends Specification with Logging {

  "Simple Producer and Consumer" should {
    "send string to broker and consume that string back" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString

      var testStatus = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec(binaryObject: Array[Byte]) = {
        val message = new String(binaryObject)
        info("testMessage = " + testMessage + " and consumed message = " + message)
        testMessage must_== message
        consumer.close()
        testStatus = true
      }

      info("KafkaSpec is waiting some seconds")
      consumer.read(exec)
      info("KafkaSpec consumed")

      testStatus must beTrue // we need to get to this point but a failure in exec will fail the test
    }

    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>
        producer ! (testTopic, "192.168.86.10:9092")
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
}

On the producer side I have started to look more into using Akka. The prototype for this implementation is in the test case above, “Akka Producer and Consumer”, and broken out here:

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>
        producer ! (testTopic, "192.168.86.10:9092")
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

What is really nice is that you can increase that actorCount and really start generating test data to analyze.

/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
********************************************/

Getting started with Apache Mesos and Apache Aurora using Vagrant

December 16, 2013

Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. Think of it as the “kernel” for your data center. Paco Nathan talked about this on one of the All Things Hadoop Podcasts.

Features:

  • Fault-tolerant replicated master using ZooKeeper
  • Scalability to 10,000s of nodes
  • Isolation between tasks with Linux Containers
  • Multi-resource scheduling (memory and CPU aware)
  • Java, Python and C++ APIs for developing new parallel applications
  • Web UI for viewing cluster state

Apache Aurora is a service scheduler that runs on top of Mesos, enabling you to run long-running services that take advantage of Mesos’ scalability, fault-tolerance, and resource isolation. Apache Aurora is currently a part of the Apache Incubator.  The main benefit of a Mesos scheduler like Aurora (or Marathon) is not having to use the Mesos API directly to take advantage of the grid.  Your application can work the way it does today while Mesos figures out which server(s) to run it on and the scheduler handles when and how to scale it.

Features:

  • Deployment and scheduling of jobs
  • The abstraction of a “job” to bundle and manage Mesos tasks
  • A rich DSL to define services
  • Health checking
  • Failure domain diversity
  • Instant provisioning

First you need to make sure that you have Vagrant and VirtualBox installed; if you don’t already have them, please install them now.

1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/

That is all you need (assuming you also have git installed). Everything else from here is going to be done from within the virtual machine.


git clone https://github.com/apache/incubator-aurora
cd incubator-aurora
vagrant up

The virtual machines will take some time to spin up so hang tight.

Once the virtual machines are launched you will have your command prompt back and ready to go.

There are 5 VMs that are launched: devtools, zookeeper, mesos-master, mesos-slave and aurora-scheduler. They are all configured and networked together (for more info on this check out the Vagrantfile).

The next step is to create an app on the scheduler so it gets provisioned to the Mesos cluster that is running.


vagrant ssh aurora-scheduler
vagrant@precise64:~$ cd /vagrant/examples/jobs/
vagrant@precise64:~$ aurora create example/www-data/prod/hello hello_world.aurora
 INFO] Creating job hello
 INFO] Response from scheduler: OK (message: 1 new tasks pending for job www-data/prod/hello)
 INFO] Job url: http://precise64:8081/scheduler/www-data/prod/hello

Now go to your browser and pull up http://192.168.33.5:8081/scheduler/www-data/prod/hello and you’ll see your job running


Basically, all of what is happening is defined in the configuration:

hello = Process(
  name = 'hello',
  cmdline = """
    while true; do
      echo hello world
      sleep 10
    done
  """)

task = SequentialTask(
  processes = [hello],
  resources = Resources(cpu = 1.0, ram = 128*MB, disk = 128*MB))

jobs = [Service(
  task = task, cluster = 'example', role = 'www-data', environment = 'prod', name = 'hello')]

It is an exciting time for virtualization and resource scheduling and process provisioning within infrastructures. It is all open source so go dig in and see how it all works for yourself.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Using Vagrant to get up and running with Apache Kafka

December 7, 2013

If you haven’t used Vagrant before then hopefully this will not only help you get up and running with Apache Kafka but also get you introduced to Vagrant.  Vagrant helps to create and configure lightweight, reproducible, and portable development environments. With Vagrant, run a single command, “vagrant up”, and sit back as Vagrant puts together your complete development environment. Say goodbye to the “works on my machine” excuse as Vagrant creates identical development environments for everyone on your team. To learn more about Apache Kafka, check out our website.

To get up and running with Apache Kafka using Vagrant:

1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/

For the latest release of Apache Kafka 0.8.0 I have added the ability to use Vagrant.

1) git clone https://github.com/stealthly/kafka
2) cd kafka
3) ./sbt update
4) ./sbt package
5) ./sbt assembly-package-dependency
6) vagrant up

Once this is done (it takes a little while for Vagrant to launch the four virtual machines):

  • Zookeeper will be running on 192.168.50.5
  • Broker 1 on 192.168.50.10
  • Broker 2 on 192.168.50.20
  • Broker 3 on 192.168.50.30

When you are all up and running you will be back at a command prompt.

If you want you can login to the machines using vagrant ssh <machineName> but you don’t need to.

You can access the brokers and Zookeeper by their IP addresses

e.g.


bin/kafka-create-topic.sh --zookeeper 192.168.50.5:2181 --replica 3 --partition 1 --topic sandbox

bin/kafka-console-producer.sh --broker-list 192.168.50.10:9092,192.168.50.20:9092,192.168.50.30:9092 --topic sandbox

bin/kafka-console-consumer.sh --zookeeper 192.168.50.5:2181 --topic sandbox --from-beginning

A patch is pending for the 0.8.1 release to have this go back upstream.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/
Categories: Open Source Projects

Technology Decisions Are About Trade Offs and Solving Problems

December 6, 2013

At some point in the last decade we hit the inflection point where distributed systems, and all their complexities, became the common reality.

Maybe it was the need to change how we scale since CPU clocks are not getting any faster… Maybe it was the Google MapReduce and/or Amazon Dynamo papers… Or maybe it was just the Red Sox winning the World Series. It doesn’t really matter because we now live in a world where distributed computing is accessible to everyone.

Often the technical arguments around distributed computing are not wrong; rather, they do not have proper context. Solutions presented should explain how the technologies/languages solve problems based on the context of their implementation and the trade-offs they make.

So much has become solution, solution, solution, without separating out and getting down to the problems and then linking problem to solution. Solutions (or technical warnings) come across best when you explain them in context and explain the downside of the solution (i.e., what trade-off you accepted for your context).

A few quick examples of this before we continue:

 

  • Defending locks as necessary, even superior to a lockless environment. Locks have many well-known and well-documented downsides. They are also used in many systems and have well-documented and well-known benefits. So, context is key here. There are plenty of times when it makes sense to use locks to solve a particular problem. Many people probably have the same problem you have and it is often a complex implementation so they can learn from what you did with and/or without locks that was wrong and what you did that was right.
  • Ripping a language by pointing out everything that is bad about it and saying not to use it. Don’t just talk about problems without providing another solution. Sure, that language might not make sense (for you) when building some type of specific application, but that doesn’t mean it has zero benefits.
  • Promoting how you had a problem with using a technology so instead of realizing that maybe you were doing something wrong you went in and started to question that technology… like sending an email to a mailing list saying that you have “doubts” about the technology when maybe you just don’t understand it.

Just because it doesn’t make sense for you in the context of your domain, and you’re not willing to accept the trade-offs, doesn’t make a technology or language bad or wrong… nor does it make it right.

I love a good informed discussion and debate about technologies that contrast and overlap.  I especially like it when it is not clear which is better for a specific use case. It’s even better when two (or more) technologies each only solve 80% of the problem and do so differently.  This leaves you having to write your own software. And THAT is where the magic happens!

If you can’t talk about the downside and the trade-offs then you’re just a fan person waving a coffee mug around.  That type of behavior really doesn’t benefit anyone.

Context Matters

 

Amarillo Slim said it best: “If you can’t spot the sucker within the first half hour at the table, then you are the sucker.”

So, if you’re building a website for your baseball card collection then it doesn’t matter if you use FileMaker Pro, MySQL, Oracle, MongoDB, Cassandra or HBase for your backend. Only your friends and family are going to care if it goes down, loses data or can’t handle the load when your entire Facebook and Twitter circles decide to log in all at the same time…

Now, let’s say you’re building a website (off the top of my head) for signing individuals up for healthcare (hypothetically). Now, if I suggested writing that system in COBOL on a bank of AS400s then I think it would be fair to say that everyone would think I am crazy and be like “Joe, the sky is blue, stop trying to argue it is not”. A COBOL system running on an AS400 is fine if you can find resources willing to work on it and support it, with enough capital investment to work around the trade-offs.

At the end of the day, computer problems are solved by people and how they apply the technologies as much as by the technologies and languages themselves. If you expect tens of thousands or even millions of people all trying to sign up at once (because you have a big opening day launch), then it really doesn’t matter what your system does after that, what it was written in, what backend it has, or whether it is cloud based or bare metal, because if they can’t even log in or sign up then it’s a bust. Neither the problems nor the trade-offs were understood in the first place, so arguing about the technologies/languages being wrong or right is superfluous. The root cause of the failure was a lack of understanding of the problem.

Every technology can be argued to have a use… so let’s stop trashing technologies and be a bit more understanding and focused with our approach. Boil it down to placing technologies (and languages) into their proper category for what they could be good for and what they are not good for. This applies to both cheering a technology/language along and warning people away from it. And don’t forget academic and scholarly (both in teaching and research/advancement of the field) solutions. You might do something in school that makes sense because it is the best way to learn but is not actually the solution you would use in other domains.

Focus on the problems and manage the acceptable trade offs to solve those problems

 

So after you get past “is this technically possible” you then have to decide on the trade-offs and, objectively, how they affect your context.

If you are not looking at the trade offs then you are doing something wrong. There are always trade offs.

  • What are your real goals? Define them! Be clear with what you are trying to achieve and focus on the problems. Don’t create problems to solve…. Please Please Please, don’t create problems to solve that are not domain driven. And if you do, then don’t promote them as solutions.
  • How much downtime can you afford? Don’t say zero, because that is not reality for anyone. Quantify and explain this. Don’t just say zero downtime; rather, figure out per second how much money you lose by being down… or what else you might lose, and for how long, and quantify and communicate that and why. Apply this not just to the system but to each sub-system and possibly every component within each sub-system.
  • How much data is acceptable to lose? If the answer is none then how much availability of the system are you willing to sacrifice? You can have zero loss of data but not without sacrificing availability; however, if availability is more important than how much data you are willing to lose then you have to accept the trade-off. For more on this check out Henry Robinson’s CAP Confusion: Problems with ‘partition tolerance’ blog post: “Partition tolerance means simply developing a coping strategy by choosing which of the other system properties to drop. This is the real lesson of the CAP theorem – if you have a network that may drop messages, then you cannot have both availability and consistency, you must choose one”
  • What are the expectations of the users? Explain this because users provide the domain and the context for what your system does and doesn’t do. What are real requirements vs nice/fancy/flashy/things to have?
  • What is the makeup of your existing team? What is the market for potential new hires? What if you try to pick all the shiny “new awesome” technologies and languages for a solution? What happens when you have to scale the team and you are not able to hire anyone who knows that skill set and no one is interested in adding it to their resume? Is it still shiny and awesome? On the flip side, are you using tech that could be antiquated, so that even though your existing resources are comfortable using it you will have trouble hiring talent to support it moving forward?
  • Do you have existing code and an existing business? Do you have to maintain the code and business with new projects? What can you throw away?  How long do you have to wait before you can throw something away?
  • Are you solving problems you really have or problems you perceive to have?

“Programmers waste enormous amounts of time thinking about, or worrying about, the speed of noncritical parts of their programs, and these attempts at efficiency actually have a strong negative impact when debugging and maintenance are considered. We should forget about small efficiencies, say about 97% of the time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%.” – Donald Knuth

Technology Decisions Are About Trade Offs and Solving Problems

 

For me, I have my own opinions and my own abilities for solving problems. I don’t always get to work on the use cases I like and sometimes, sure, I get use cases that really just completely and totally suck and that require me to use technologies and languages that make me vomit in my mouth a little. But, when it’s all done, if I can still look at myself in the mirror and be proud and feel accomplished with the end result, and the expectations were exceeded without compromising too much on the trade-offs made, then solution == awesome.

So with all my rants and thoughts and opinions I leave you to focus on your problems and explain them in the context of what you are doing and the trade-offs you accepted. Thanks to Camille Fournier, Jay Kreps, Alex Popescu and Steven Gravitz for reviewing this blog post to keep me honest, pull me off the ledge and help with some garbage collection in the process.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Using Cassandra from Scala

November 16, 2013

I have started a new open source project, https://github.com/stealthly/scala-cassandra, which is a Scala wrapper for CQL; specifically, it is a wrapper of the DataStax java-driver.

Test cases are a good entry point, so let’s start here: https://github.com/stealthly/scala-cassandra/blob/master/src/test/scala/ScalaCassandraSpec.scala

import org.specs2.mutable._
import java.util.UUID
import scala.util.Random
// plus the project's own CQL, Meta and Thrift-generated TMeta imports

class ScalaCassandraSpec extends Specification {

    CQL.init()
    CQL.startup("MetaStore")
    Meta.createTable()

    "Meta objects" should {
      "be able to store & retrieve their binary state" in {

      //we use a Thrift object here for portability of the data stored
      val tMeta = new TMeta()

      //setting up some randomness so we can confirm what we are writing is what we get back
      val metaUUID = UUID.randomUUID()
      val dataum = Random.alphanumeric.take(1000000).mkString

      tMeta.setId(metaUUID.toString)
      tMeta.setDatum(dataum)

      Meta.save(tMeta) //Saved to C*

      val someNewTMeta = Meta(metaUUID)
      someNewTMeta.getId() must_== metaUUID.toString

      someNewTMeta.getDatum() must_== dataum
      }
    }
}

The Meta class (contained in the https://github.com/stealthly/scala-cassandra/blob/master/src/main/scala/MetaDAO.scala file) is like a Widget… What’s a Widget? For us, let’s consider it to be “sample code”.

When you want to add something different from a widget, create a new Thrift file in the `thrift/interface` directory.

Then in that directory (let’s say you created an IDL called Music.thrift) run `thrift -gen java Music.thrift`, which we did for you already, and copy the jar outputted from a `mvn package` to the `lib` folder so you don’t have to worry about it. You can even just keep using the Meta implementation and shove JSON or XML or whatever you want into it; however, it makes more sense to partition the objects some so you can create wider rows with a collection key in your table along with your partition key. You can also flatten things by storing a HashMap on the table and retrieving it.
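Roughly, the flow described above looks like this (the directory layout and build step are approximations of the repository, so adjust to what `thrift -gen java` and the project’s build actually produce for you):

cd thrift/interface
thrift -gen java Music.thrift   # generate Java classes for the new IDL
mvn package                     # build the generated code into a jar
cp target/*.jar ../../lib/      # drop the jar into the project's lib folder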

http://stealth.ly

Using Apache Drill for Large Scale, Interactive, Real-Time Analytic Queries

October 29, 2013

Episode #17 of the podcast is a talk with Jacques Nadeau, available also on iTunes.

Apache Drill (http://incubator.apache.org/drill/) is a modern interactive query engine that runs on top of Hadoop.

Jacques talked about how Apache Drill is a modern query engine that is meant to be a query layer on top of all big data open source systems. Apache Drill is being designed to make the storage engine pluggable so it could be the interface for any big data storage engine. The first release came out recently to allow developers to understand the data pipeline.

Leveraging an efficient columnar storage format, an optimistic execution engine and a cache-conscious memory layout, Apache Drill is blazing fast. Coordination, query planning, optimization, scheduling, and execution are all distributed throughout nodes in a system to maximize parallelization.


Perform interactive analysis on all of your data, including nested and schema-less data. Drill supports querying against many different schema-less data sources including HBase, Cassandra and MongoDB. Naturally flat records are included as a special case of nested data.


Drill has strongly defined tiers and APIs for straightforward integration with a wide array of technologies.


Subscribe to the podcast and listen to what Jacques had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Real-Time Data Pipelines and Analytics with Apache Kafka and Apache Samza

September 17, 2013

Episode #16 of the podcast is a talk with Jay Kreps, available also on iTunes.

Jay talked about the open source work he has done while at LinkedIn.


Most of the conversation was about the Apache Kafka pipeline and the use of Apache Samza for processing it.

Apache Kafka

http://kafka.apache.org/documentation.html#introduction

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

What does all that mean?

First let’s review some basic messaging terminology:

  • Kafka maintains feeds of messages in categories called topics.
  • We’ll call processes that publish messages to a Kafka topic producers.
  • We’ll call processes that subscribe to topics and process the feed of published messages consumers.
  • Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

So, at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers.

Communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. We provide a Java client for Kafka, but clients are available in many languages.

Topics and Logs

Let’s first dive into the high-level abstraction Kafka provides—the topic.

A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log.

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition.

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size so retaining lots of data is not a problem.

In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.

This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to “tail” the contents of any topic without changing what is consumed by any existing consumers.
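That “tail” is just the console consumer that ships with Kafka; a minimal example (the ZooKeeper address and topic name are placeholders) that follows new messages as they arrive:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic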

Apache Samza

http://samza.incubator.apache.org/

Apache Samza is a distributed stream processing framework. It uses Apache Kafka for messaging, and Apache Hadoop YARN to provide fault tolerance, processor isolation, security, and resource management.

  • Simple API: Unlike most low-level messaging system APIs, Samza provides a very simple call-back based “process message” API that should be familiar to anyone that’s used Map/Reduce.
  • Managed state: Samza manages snapshotting and restoration of a stream processor’s state. Samza will restore a stream processor’s state to a snapshot consistent with the processor’s last read messages when the processor is restarted.
  • Fault tolerance: Samza will work with YARN to restart your stream processor if there is a machine or processor failure.
  • Durability: Samza uses Kafka to guarantee that messages will be processed in the order they were written to a partition, and that no messages will ever be lost.
  • Scalability: Samza is partitioned and distributed at every level. Kafka provides ordered, partitioned, re-playable, fault-tolerant streams. YARN provides a distributed environment for Samza containers to run in.
  • Pluggable: Though Samza works out of the box with Kafka and YARN, Samza provides a pluggable API that lets you run Samza with other messaging systems and execution environments.
  • Processor isolation: Samza works with Apache YARN, which supports processor security through Hadoop’s security model, and resource isolation through Linux CGroups.

Check out Hello Samza to try Samza. Read the Background page to learn more about Samza.

Subscribe to the podcast and listen to what Jay had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Big Data, Open Source and Analytics

August 26, 2013

Episode #15 of the podcast is a talk with Stefan Groschupf, available also on iTunes.

Stefan is the CEO of Datameer and talked about how the company started and where it is now. Founded in 2009 by some of the original contributors to Apache Hadoop, Datameer has grown to a global team, advancing big data analytics. After several implementations of Hadoop analytics solutions at Global 500 companies, the founders were determined to build the next generation analytics application to solve the new use cases created by the explosion of structured and unstructured data. Datameer is the single application for big data analytics by combining data integration, data transformation and data visualization. Customers love us and we work to make Datameer even better each day.

Datameer provides the most complete solution to analyze structured and unstructured data. Not limited by a pre-built schema, the point and click functions mean your analytics are only limited by your imagination. Even the most complex nested joins of a large number of datasets can be performed using an interactive dialog. Mix and match analytics and data transformations in an unlimited number of data processing pipelines. Leave the raw data untouched.

Datameer turbocharges time series analytics by correlating multiple sets of complex, disparate data. Resulting analytics are endless including correlation of credit card transactions with card holder authorizations, network traffic data, marketing interaction data and many more. The end game is a clear window into the operations of your business, giving you the actionable insights you need to make business decisions.


Data is the raw materials of insight and the more data you have, the deeper and broader the possible insights. Not just traditional, transaction data but all types of data so that you can get a complete view of your customers, better understand business processes and improve business performance.

Datameer ignores the limitations of ETL and static schemas to empower business users to integrate data from any source into Hadoop. Pre-built data connector wizards for all common structured and unstructured data sources mean that data integration is an easy, three step process of where, what and when.


Now you never have to waste precious time by starting from scratch. Anyone can simply browse the Analytics App Market, download an app, connect to data, and get instant results. But why stop there? Every application is completely open so you can customize it, extend it, or even mash it up with other applications to get the insights you need.

Built by data scientists, analysts, or subject matter experts, analytic apps range from horizontal use cases like email and social sentiment analysis to vertical or even product-specific applications like advanced Salesforce.com sales-cycle analysis.

Check out the Datameer app market.

Subscribe to the podcast and listen to what Stefan had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Categories: Hadoop, Podcast

SQL Compatibility in Hadoop with Hive

August 15, 2013

Episode #14 of the podcast is a talk with Alan Gates, available also on iTunes.

The Stinger initiative is a collection of development threads in the Hive community that will deliver 100X performance improvements as well as SQL compatibility.

  • Fast Interactive Query – An immediate aim of 100x performance increase for Hive is more ambitious than any other effort.
  • SQL Compatibility – Based on industry standard SQL, the Stinger Initiative improves HiveQL to deliver SQL compatibility.

Apache Hive is the de facto standard for SQL-in-Hadoop today with more enterprises relying on this open source project than any alternative. As Hadoop gains in popularity, enterprise requirements for Hive to become more real time or interactive have evolved… and the Hive community has responded.

He spoke in detail about the Stinger initiative, who is contributing to it, why they decided to improve upon Hive and not create a new system and more.

He talked about how Microsoft is contributing in the open source community to improve upon Hive.

Hadoop is so much more than just SQL. One of the wonderful things about Big Data is the power it brings for users to apply different processing models, such as real-time streaming with Storm, graph processing with Giraph and ETL with Pig, and to do all sorts of things beyond just SQL compatibility.

Alan also talked about YARN and Tez and the benefits of the Stinger initiative to other Hadoop ecosystem tools too.

Subscribe to the podcast and listen to what Alan had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Categories: Hadoop, Hive, Podcast

Apache Zookeeper, Distributed Systems, Open Source and more with Camille Fournier

August 13, 2013

Episode #13 of the podcast is a talk with Camille Fournier, available also on iTunes.

Apache Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which makes them brittle in the presence of change and difficult to manage.  Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.

Camille talked about discovery services and distributed locking, as well as some tips for developing against and operating Zookeeper in production, including how to build a Global, Highly Available Service Discovery Infrastructure with ZooKeeper, which she also wrote about on her blog: http://whilefalse.blogspot.com/2012/12/building-global-highly-available.html.
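As a toy sketch of the discovery pattern (the paths and addresses are invented), each service instance registers an ephemeral znode under a well-known path and clients list that path to find live instances; when an instance’s session dies its znode disappears automatically. Using the ZooKeeper CLI (the lines below are typed at the zkCli.sh prompt; the comments are annotations):

create /services ""
create /services/search ""
create -e /services/search/instance-1 "10.0.0.5:8080"   # ephemeral: gone when the session ends
ls /services/search                                      # clients discover live instances here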

Camille gave some great insights about how to apply Open Source community practices to an organization’s SDLC to foster a better culture for better products and services, where all developers need to own more parts of their software (as it is in Open Source projects). #devops #qaops #userops

Subscribe to the podcast and listen to what Camille had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Categories: Podcast, Zookeeper

Apache BigTop and how packaging infrastructure binds the Hadoop ecosystem together

August 12, 2013

Episode #12 of the podcast is a talk with Mark Grover and Roman Shaposhnik, available also on iTunes.

Apache Bigtop is a project for the development of packaging and tests of the Apache Hadoop ecosystem.

The primary goal of Bigtop is to build a community around the packaging and interoperability testing of Hadoop-related projects. This includes testing at various levels (packaging, platform, runtime, upgrade, etc…) developed by a community with a focus on the system as a whole, rather than individual projects.

BigTop makes it easier to deploy Hadoop Ecosystem projects including:

  • Apache Zookeeper

  • Apache Flume

  • Apache HBase

  • Apache Pig

  • Apache Hive

  • Apache Sqoop

  • Apache Oozie

  • Apache Whirr

  • Apache Mahout

  • Apache Solr (SolrCloud)

  • Apache Crunch (incubating)

  • Apache HCatalog

  • Apache Giraph

  • LinkedIn DataFu

  • Cloudera Hue

The list of supported Linux platforms has expanded to include:

  • CentOS/RHEL 5 and 6

  • Fedora 17 and 18

  • SuSE Linux Enterprise 11

  • OpenSUSE 12.2

  • Ubuntu LTS Lucid (10.04) and Precise (12.04)

  • Ubuntu Quantal (12.10)
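The concrete payoff of all this packaging work is that the ecosystem installs like any other OS software. A rough sketch on a CentOS/RHEL box, assuming you have already dropped the Bigtop yum repo file for your release into /etc/yum.repos.d (repo URLs vary by Bigtop version):

sudo yum install hadoop\* hbase\* hive\* zookeeper\*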

Subscribe to the podcast and listen to what Mark and Roman had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Hadoop as a Service cloud platform with the Mortar Framework and Pig

August 9, 2013 Leave a comment

Episode #11 of the podcast is a talk with K Young.  Available also on iTunes

Mortar is the fastest and easiest way to work with Pig and Python on Hadoop in the Cloud.

Mortar’s platform is for everything from joining and cleansing large data sets to machine learning and building recommender systems.

Mortar makes it easy for developers and data scientists to do powerful work with Hadoop. The main advantages of Mortar are:

  • Zero Setup Time: Mortar takes only minutes to set up (or no time at all on the web), and you can start running Pig jobs immediately. No need for painful installation or configuration.
  • Powerful Tooling: Mortar provides a rich suite of tools to aid in Pig development, including the ability to Illustrate a script before running it, and an extremely fast and free local development mode.
  • Elastic Clusters: We spin up Hadoop clusters as you need them, so you don’t have to predict your needs in advance, and you don’t pay for machines you don’t use.
  • Solid Support: Whether the issue is in your script or in Hadoop, we’ll help you figure out a solution.

We talked about the Open Source Mortar Framework and their new Open Source tool for visualizing data while writing Pig scripts, called Watchtower.

The Mortar Blog has a great video demo on Watchtower.

There are no two ways around it: Hadoop development iterations are slow. Traditional programmers have always had the benefit of re-compiling their app, running it, and seeing the results within seconds. They have near instant validation that what they’re building is actually working. When you’re working with Hadoop, dealing with gigabytes of data, your development iteration time is more like hours.

Subscribe to the podcast and listen to what K Young had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Categories: Hadoop, Pig, Podcast

Hadoop, The Cloudera Development Kit, Parquet, Apache BigTop and more with Tom White

August 2, 2013

Episode #10 of the podcast is a talk with Tom White.  Available also on iTunes

We talked a lot about The Cloudera Development Kit http://github.com/cloudera/cdk, or CDK for short, which is a set of libraries, tools, examples, and documentation focused on making it easier to build systems on top of the Hadoop ecosystem.

The goals of the CDK are:

  • Codify expert patterns and practices for building data-oriented systems and applications.
  • Let developers focus on business logic, not plumbing or infrastructure.
  • Provide smart defaults for platform choices.
  • Support piecemeal adoption via loosely-coupled modules.

Eric Sammer recorded a webinar in which he talks about the goals of the CDK.

This project is organized into modules. Modules may be independent or have dependencies on other modules within the CDK. When possible, dependencies on external projects are minimized.

We also talked about Parquet http://parquet.io/ which was created  to make the advantages of compressed, efficient columnar data representation available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model, or programming language.  Parquet is built from the ground up with complex nested data structures in mind, and uses the repetition/definition level approach to encoding such data structures, as popularized by Google Dremel. We believe this approach is superior to simple flattening of nested name spaces.

Parquet is built to support very efficient compression and encoding schemes. Parquet allows compression schemes to be specified on a per-column level, and is future-proofed to allow adding more encodings as they are invented and implemented. We separate the concepts of encoding and compression, allowing parquet consumers to implement operators that work directly on encoded data without paying decompression and decoding penalty when possible.

Tom talked about Apache BigTop too http://bigtop.apache.org/ Bigtop is a project for the development of packaging and tests of the Apache Hadoop ecosystem.  The primary goal of Bigtop is to build a community around the packaging and interoperability testing of Hadoop-related projects. This includes testing at various levels (packaging, platform, runtime, upgrade, etc…) developed by a community with a focus on the system as a whole, rather than individual projects.

Subscribe to the podcast and listen to what Tom had to say.  Available also on iTunes

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Hadoop, Mesos, Cascading, Scalding, Cascalog and Data Science with Paco Nathan

July 30, 2013

Episode #9 of the podcast is a talk with Paco Nathan.  Available also on iTunes

We talked about how he got started with Hadoop with Natural Language Processing back in 2007 with text analytics.

And then we started talking about Mesos http://mesos.apache.org/

Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. It can run Hadoop, MPI, Hypertable, Spark, and other applications on a dynamically shared pool of nodes.

We talked a little about the difference between YARN and Mesos.  Paco talked about how Mesos is lower in the stack and part of the operating system where YARN is higher up in the stack and built to support the Hadoop ecosystem in the JVM.  He talked about the future of Mesos and touched on its contrast to Google Borg … for some more information on Google Borg and Mesos here is a great article http://www.wired.com/wiredenterprise/2013/03/google-borg-twitter-mesos/all/

Then we got into Cascading, which was started by Chris Wensel – http://www.cascading.org/ – and talked about the enterprise use cases for Cascading.  He talked about how Cascading has always been geared to satisfy enterprise use cases, not to slice and dice, but to build an application on top of it and be able to debug it and see where it is running because it is deterministic. He talked about how this contrasts with Hive and Pig. He brought up Steve Yegge’s post “Notes from the Mystery Machine Bus” https://plus.google.com/110981030061712822816/posts/KaSKeg4vQtz and talked a bit about how Cascading applies.

We got into design patterns for the enterprise with big batch workflow breaking it up into five parts:

1) Different data sources (structured and unstructured data)
2) ETL
3) Custom data preparation and business logic to clean up the data
4) Analytics or predictive modeling to enrich the data
5) Integration with end use cases that consume the data products

Cascading addresses all of these points and Paco talked in more detail about them.

We finished up the podcast with him talking about the future of these technologies and also data science.

Subscribe to the podcast and listen to what Paco had to say.  Available also on iTunes

/*
Joe Stein
Big Data Open Source Security LLC
http://www.stealth.ly
*/

Distributed System Development Considerations

June 16, 2013

There are a number of factors to take into account while developing distributed software systems. If you don’t even know what I am talking about in the first sentence then let me give you some insight, examples and for instances of what distributed systems are.

Overview

A distributed system is when multiple physical hardware devices interact with separate and discrete users and collaborate together through these hardware devices to accomplish different and also similar goals for these discrete and separate users.  Sometimes these devices work in a peer-to-peer mode, using servers as hubs for knowledge of connectivity to each other, while others collaborate and coordinate through a single centralized server or a set of them, e.g.

An extranet-like system where supply-side users review documents through a web browser and click through a workflow in their browsers that ultimately faxes documents to another set of users who provide services, with the documents sufficiently labeled… i.e. with a bar code.  This loop makes for a closed system to design and develop distributed software for.  The recipients of the faxes fill out and do whatever they need as part of their paper-based world (including receiving many faxes over time and sending them all back in at once, which is another set of cases for another blog post on e-signing).  The receiving fax servers, on receipt of the inbound original documents, “hand” the faxed images over for collation and processing for yet another set of users (or even the same ones that originated the outbound fax) to review and work with under an intake workflow, again in their browser.

Here you have many systems working together in the data center to accomplish different goals. You have some web servers for the user interface. Some data persistence layer for the workflow systems, analytics and reporting.  You have fax servers for management of the incoming/outgoing fax transmissions (I like Dialogic Brooktrout Fax Boards myself http://www.dialogic.com/Products/fax-boards-and-software/fax-boards.aspx ).  You have the inbound and outbound labeling, stamping, collation, scanning and optical character recognition components (usually a few different servers together depending on design; likely not each a separate instance, but likely a half dozen or so depending on scale).

The fabric underneath all of those components is what is consistent across all of them.  This starts at the OSI layers because, first and foremost, distributed systems are connected through physical layers of constraints, so let’s start there.

The Physical Layer Constraints

The OSI model has either 7 or 9 layers depending on whom you speak with.  In some teams layers 8 and 9 are politics and religion (respectively), because the social structures, methodologies and human behavior around all the interactions to design, develop, deploy and maintain these systems have to be taken into account.  If you don’t appreciate this then your application-layer user experience is likely going to be wrong also.  Layers 1-7 (the more commonly known and often better understood layers) classify how computers move data through an interface for a user or computer and an interface to another computer for another computer or another user.  You can never do anything outside of the underlying layers without refactoring or creating new software in those layers.  So, it’s good to know how they work. You can also account for them in your own software (please, please, pretty please) to better balance out processing constraints.

Figure 30-1 of http://fab.cba.mit.edu/classes/MIT/961.04/people/neil/ip.pdf maps many of the protocols of the Internet protocol suite and their corresponding OSI layers.

Where software continues within the hardware device

The operating system and its implementation on the local machine hardware come next.  I am hard pressed to name many (or any) other layers or systems that are truly part of the underlying fabric; it depends on your hosting provider, which is ultimately a vendor, so that is a different story for another time… unless they run software you wrote through open source project work you have done, but I digress… The Linux Programming Interface: A Linux and UNIX System Programming Handbook is the resource I like for Linux.

Now What?

Over the last nine years a lot of classic computer science concepts have become more readily achievable because of the inexpensive (sometimes cheap) commodity hardware that is now our reality.  Parallel and distributed computing platforms, distributed document and key/value stores, distributed pub/sub broker, publisher and consumer systems, and even actor patterns with immutable structures have started to become part of this fabric.  The rub here is the competition within, and infancy of, these market segments, which arguably have yet to “cross the chasm“.

Now, having said all of that… there is a standout. “ZooKeeper provides key points of correctness and coordination that must have higher transactional guarantees than all of these types of system that want to build intrinsically into their own key logic”.  Camille Fournier made me see this in one of her blog posts, http://whilefalse.blogspot.com/2013/05/zookeeper-and-distributed-operating.html?spref=tw, because of how ZooKeeper is tied into so many of these existing types of systems.  Since these systems have yet to fully mature in the marketplace (while having matured sufficiently with early adopters), there are some consistencies within and across them that we have to start to look at, support and commit to (literally).

What is also nice about ZooKeeper is that there is an open source library, Apache Curator, that has been developed and (all things being equal) continuously maintained to make the repeatable patterns of ZooKeeper development easier: http://curator.incubator.apache.org/.  If you have never worked with ZooKeeper I recommend starting here: http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html.  A small example of what Curator takes care of for you is sketched below.
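
Here is a minimal sketch (not from the original post) of the connect-and-create-a-znode pattern with Curator, assuming the org.apache.curator package names from a recent release; the connection string and the /app/config path are placeholders for illustration.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorQuickStart {
    public static void main(String[] args) throws Exception {
        // Placeholder connection string for a local ZooKeeper ensemble
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Curator wraps the retry and "ensure parent paths exist" boilerplate
        // you would otherwise hand-roll against the raw ZooKeeper API
        client.create().creatingParentsIfNeeded().forPath("/app/config", "v1".getBytes());

        byte[] data = client.getData().forPath("/app/config");
        System.out.println(new String(data));

        client.close();
    }
}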

/*

Joe Stein
http://twitter.com/allthingshadoop

*/

Big Data Open Source Security

May 25, 2013 1 comment

In security there have never (IMHO) been enough open source solutions. Bruce Schneier has written about this several times in the past, and there’s no need to rehash the arguments here.

Now, with the “NoSQL” and “Big Data” open source trends in the marketplace, security finally has an intersection… a union, if I may, where new solutions to problems that have plagued our society can finally begin to arise (and already have in many cases). Fraud, malware, phishing, spam and more can all be tackled with new security solutions because of Big Data and open source.

At the front lines of this is Apache Accumulo, a Big Data, open source and secure NoSQL database that runs on top of Apache Hadoop. It was originally developed by the United States National Security Agency and submitted to the Apache Software Foundation as open source in 2011, after three years of development and production operation.

Accumulo extends the BigTable data model to implement a security mechanism known as cell-level security. Every key-value pair has its own security label, stored under the column visibility element of the key, which is used to determine whether a given user meets the security requirements to read the value. This enables data of various security levels to be stored within the same row, and users of varying degrees of access to query the same table, while preserving data confidentiality.

SECURITY LABEL EXPRESSIONS

When mutations are applied, users can specify a security label for each value. This is done as the Mutation is created by passing a ColumnVisibility object to the put() method:

import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.ColumnVisibility;
import org.apache.hadoop.io.Text;

Text rowID = new Text("row1");
Text colFam = new Text("myColFam");
Text colQual = new Text("myColQual");
ColumnVisibility colVis = new ColumnVisibility("public");
long timestamp = System.currentTimeMillis();

// Value wraps raw bytes, so convert the string explicitly
Value value = new Value("myValue".getBytes());

Mutation mutation = new Mutation(rowID);
mutation.put(colFam, colQual, colVis, timestamp, value);

SECURITY LABEL EXPRESSION SYNTAX

Security labels consist of a set of user-defined tokens that are required to read the value the label is associated with. The set of tokens required can be specified using syntax that supports logical AND and OR combinations of tokens, as well as nesting groups of tokens together.

For example, suppose within our organization we want to label our data values with security labels defined in terms of user roles. We might have tokens such as:

admin
audit
system
These can be specified alone or combined using logical operators:

// Users must have admin privileges:
admin

// Users must have admin and audit privileges
admin&audit

// Users with either admin or audit privileges
admin|audit

// Users must have audit and one or both of admin or system
(admin|system)&audit

When both | and & operators are used, parentheses must be used to specify precedence of the operators.
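
As a quick, hedged illustration, the nested expression above can be used directly as a cell's visibility in the mutation example from earlier (the variables are the ones defined there):

// Only users holding audit plus at least one of admin or system can read this cell
ColumnVisibility nestedVis = new ColumnVisibility("(admin|system)&audit");
mutation.put(colFam, colQual, nestedVis, timestamp, value);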

AUTHORIZATION

When clients attempt to read data from Accumulo, any security labels present are examined against the set of authorizations passed by the client code when the Scanner or BatchScanner is created. If the authorizations are determined to be insufficient to satisfy the security label, the value is suppressed from the set of results sent back to the client.

Authorizations are specified as a comma-separated list of tokens the user possesses:

// user possesses both admin and system level access
Authorizations auths = new Authorizations("admin","system");

Scanner s = connector.createScanner("table", auths);
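
To actually consume the results, the Scanner can simply be iterated. This is a hedged fragment continuing the snippet above; Key and Value are the Accumulo data classes and Map.Entry is java.util.Map.Entry:

for (Map.Entry<Key, Value> entry : s) {
    // Entries the user is not authorized to read were already filtered server-side
    System.out.println(entry.getKey().getRow() + " -> " + entry.getValue());
}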

USER AUTHORIZATIONS

Each Accumulo user has a set of associated security labels. To manipulate these in the shell, use the setauths and getauths commands. They may also be modified through the Java security operations API, as sketched below.
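
Here is a minimal, hedged sketch of doing this through the Java API; the connector variable and the user name "appuser" are assumptions for illustration:

// Grant the hypothetical user "appuser" the admin and audit tokens, then read them back
connector.securityOperations().changeUserAuthorizations(
        "appuser", new Authorizations("admin", "audit"));
Authorizations current = connector.securityOperations().getUserAuthorizations("appuser");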

When a user creates a scanner, a set of Authorizations is passed. If the authorizations passed to the scanner are not a subset of the user's authorizations, an exception is thrown.

To prevent users from writing data they cannot read, add the visibility constraint to a table. Use the -evc option of the createtable shell command to enable this constraint at table creation, or, for existing tables, use the following shell command. Ensure the constraint number does not conflict with any existing constraints.

config -t table -s table.constraint.1=org.apache.accumulo.core.security.VisibilityConstraint

Any user with the alter table permission can add or remove this constraint. The constraint is not applied to bulk-imported data; if this is a concern, disable the bulk import permission.

SECURE AUTHORIZATIONS HANDLING

For applications serving many users, it is not expected that an Accumulo user will be created for each application user. Instead, a single Accumulo user is created with all the authorizations needed by any of the application's users. To service queries, the application should create a scanner scoped to the individual application user's authorizations. These authorizations could be obtained from a trusted third party.

Often production systems will integrate with Public-Key Infrastructure (PKI) and designate client code within the query layer to negotiate with PKI servers in order to authenticate users and retrieve their authorization tokens (credentials). This requires users to specify only the information necessary to authenticate themselves to the system. Once user identity is established, their credentials can be accessed by the client code and passed to Accumulo outside of the reach of the user.

QUERY SERVICES LAYER

Since the primary method of interaction with Accumulo is through the Java API, production environments often call for the implementation of a query services layer. This can be done using web services in containers such as Apache Tomcat, but that is not a requirement. The query services layer provides a platform on which user-facing applications can be built. It allows application designers to isolate potentially complex query logic, and it is a convenient point at which to perform essential security functions.

Several production environments choose to implement authentication at this layer, where user identifiers are used to retrieve access credentials that are then cached within the query layer and presented to Accumulo through the Authorizations mechanism, as in the sketch below.
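
Here is a hedged sketch of what such a query-layer method might look like; fetchCachedTokens() is a hypothetical helper returning the tokens cached for the authenticated application user, and connector is the application's Accumulo connection:

// Assumed imports: org.apache.accumulo.core.client.Connector, Scanner,
// TableNotFoundException and org.apache.accumulo.core.security.Authorizations
public Scanner scanForUser(Connector connector, String userId, String table)
        throws TableNotFoundException {
    // Look up the tokens cached for this application user (e.g. after PKI authentication)
    String[] tokens = fetchCachedTokens(userId);
    // Never broader than what the single Accumulo application user holds
    Authorizations auths = new Authorizations(tokens);
    return connector.createScanner(table, auths);
}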

Typically, the query services layer sits between Accumulo and user workstations.

Apache Accumulo version 1.5 just came out, with downloads and docs available.

New software-as-a-service solutions will start to spring up in the market, as will new out-of-the-box open source solutions, whether we are trying to prevent health care fraud, protect individuals from identity theft or shield corporations from intrusion, all without compromising the (C)onfidentiality, (I)ntegrity and (A)vailability of the data and the distributed systems behind it.

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/

Using Scala To Work With Hadoop

Cloudera has a great toolkit for working with Hadoop.  Specifically, it is focused on building distributed systems and services on top of the Hadoop ecosystem.

http://cloudera.github.io/cdk/docs/0.2.0/cdk-data/guide.html

And the examples are in Scala!!!!

Here is how you work with generic records on the file system, including reading and writing Avro files.

https://github.com/cloudera/cdk/blob/master/cdk-examples/src/main/scala/creategeneric.scala

/**
* Copyright 2013 Cloudera Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.cloudera.data.{DatasetDescriptor, DatasetWriter}
import com.cloudera.data.filesystem.FileSystemDatasetRepository
import java.io.FileInputStream
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.compat.Platform
import scala.util.Random

// Construct a local filesystem dataset repository rooted at /tmp/data
val repo = new FileSystemDatasetRepository(
FileSystem.get(new Configuration()),
new Path("/tmp/data")
)

// Read an Avro schema from the user.avsc file on the classpath
val schema = new Parser().parse(new FileInputStream("src/main/resources/user.avsc"))

// Create a dataset of users with the Avro schema in the repository
val descriptor = new DatasetDescriptor.Builder().schema(schema).get()
val users = repo.create("users", descriptor)

// Get a writer for the dataset and write some users to it
val writer = users.getWriter().asInstanceOf[DatasetWriter[GenericRecord]]
writer.open()
val colors = Array("green", "blue", "pink", "brown", "yellow")
val rand = new Random()
// Write a batch of sample user records (the loop range here is assumed;
// the original post's formatting dropped it)
for (i <- 1 to 100) {
  val builder = new GenericRecordBuilder(schema)
  val record = builder.set("username", "user-" + i)
    .set("creationDate", Platform.currentTime)
    .set("favoriteColor", colors(rand.nextInt(colors.length))).build()
  writer.write(record)
}
writer.close()

Big ups to the Cloudera team!

/*
Joe Stein
https://twitter.com/allthingshadoop
*/


Hortonworks HDP1, Apache Hadoop 2.0, NextGen MapReduce (YARN), HDFS Federation and the future of Hadoop with Arun C. Murthy

July 23, 2012 2 comments

Episode #8 of the Podcast is a talk with Arun C. Murthy.

We talked about Hortonworks HDP1, the first release from Hortonworks, Apache Hadoop 2.0, NextGen MapReduce (YARN) and HDFS Federation.

Subscribe to the podcast and listen to all of what Arun had to share.

Some background to what we discussed:

Hortonworks Data Platform (HDP)

from their website: http://hortonworks.com/products/hortonworksdataplatform/

Hortonworks Data Platform (HDP) is a 100% open source data management platform based on Apache Hadoop. It allows you to load, store, process and manage data in virtually any format and at any scale. As the foundation for the next generation enterprise data architecture, HDP includes all of the necessary components to begin uncovering business insights from the quickly growing streams of data flowing into and throughout your business.

Hortonworks Data Platform is ideal for organizations that want to combine the power and cost-effectiveness of Apache Hadoop with the advanced services required for enterprise deployments. It is also ideal for solution providers that wish to integrate or extend their solutions with an open and extensible Apache Hadoop-based platform.

Key Features
  • Integrated and Tested Package – HDP includes stable versions of all the critical Apache Hadoop components in an integrated and tested package.
  • Easy Installation – HDP includes an installation and provisioning tool with a modern, intuitive user interface.
  • Management and Monitoring Services – HDP includes intuitive dashboards for monitoring your clusters and creating alerts.
  • Data Integration Services – HDP includes Talend Open Studio for Big Data, the leading open source integration tool for easily connecting Hadoop to hundreds of data systems without having to write code.
  • Metadata Services – HDP includes Apache HCatalog, which simplifies data sharing between Hadoop applications and between Hadoop and other data systems.
  • High Availability – HDP has been extended to seamlessly integrate with proven high availability solutions.

Apache Hadoop 2.0

from their website: http://hadoop.apache.org/common/docs/current/

Apache Hadoop 2.x consists of significant improvements over the previous stable release (hadoop-1.x).

Here is a short overview of the improvements to both HDFS and MapReduce.

  • HDFS Federation – In order to scale the name service horizontally, federation uses multiple independent Namenodes/namespaces. The Namenodes are federated, that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handle commands from the Namenodes. More details are available in the HDFS Federation document.
  • MapReduce NextGen aka YARN aka MRv2 – The new architecture, introduced in hadoop-0.23, divides the two major functions of the JobTracker (resource management and job life-cycle management) into separate components. The new ResourceManager manages the global assignment of compute resources to applications, and the per-application ApplicationMaster manages the application’s scheduling and coordination. An application is either a single job in the sense of classic MapReduce jobs or a DAG of such jobs. The ResourceManager and the per-machine NodeManager daemon, which manages the user processes on that machine, form the computation fabric. The per-application ApplicationMaster is, in effect, a framework-specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks. More details are available in the YARN document.
Getting Started

The Hadoop documentation includes the information you need to get started using Hadoop. Begin with the Single Node Setup which shows you how to set up a single-node Hadoop installation. Then move on to the Cluster Setup to learn how to set up a multi-node Hadoop installation.

Apache Hadoop NextGen MapReduce (YARN)

from their website: http://hadoop.apache.org/common/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

MapReduce has undergone a complete overhaul in hadoop-0.23 and we now have, what we call, MapReduce 2.0 (MRv2) or YARN.

The fundamental idea of MRv2 is to split up the two major functionalities of the JobTracker, resource management and job scheduling/monitoring, into separate daemons. The idea is to have a global ResourceManager (RM) and per-application ApplicationMaster (AM). An application is either a single job in the classical sense of Map-Reduce jobs or a DAG of jobs.

The ResourceManager and per-node slave, the NodeManager (NM), form the data-computation framework. The ResourceManager is the ultimate authority that arbitrates resources among all the applications in the system.

The per-application ApplicationMaster is, in effect, a framework specific library and is tasked with negotiating resources from the ResourceManager and working with the NodeManager(s) to execute and monitor the tasks.

MapReduce NextGen Architecture

The ResourceManager has two main components: Scheduler and ApplicationsManager.

The Scheduler is responsible for allocating resources to the various running applications, subject to familiar constraints of capacities, queues etc. The Scheduler is a pure scheduler in the sense that it performs no monitoring or tracking of status for the application. Also, it offers no guarantees about restarting failed tasks, whether due to application failure or hardware failures. The Scheduler performs its scheduling function based on the resource requirements of the applications; it does so based on the abstract notion of a resource Container which incorporates elements such as memory, cpu, disk, network etc. In the first version, only memory is supported.

The Scheduler has a pluggable policy plug-in, which is responsible for partitioning the cluster resources among the various queues, applications etc. The current Map-Reduce schedulers such as the CapacityScheduler and the FairScheduler would be some examples of the plug-in.

The CapacityScheduler supports hierarchical queues to allow for more predictable sharing of cluster resources.

The ApplicationsManager is responsible for accepting job submissions, negotiating the first container for executing the application-specific ApplicationMaster, and providing the service for restarting the ApplicationMaster container on failure.

The NodeManager is the per-machine framework agent who is responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler.

The per-application ApplicationMaster has the responsibility of negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress.

MRv2 maintains API compatibility with the previous stable release (hadoop-0.20.205). This means that all Map-Reduce jobs should still run unchanged on top of MRv2 with just a recompile, as in the sketch below.
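
To illustrate that compatibility point, here is a minimal, hedged sketch of a classic org.apache.hadoop.mapreduce driver (not from the podcast or the Hadoop docs). It simply runs the identity Mapper and Reducer over text input, and the very same driver code submits to either the JobTracker under MRv1 or the ResourceManager/ApplicationMaster under MRv2:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IdentityJobDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "identity pass-through");
        job.setJarByClass(IdentityJobDriver.class);
        job.setMapperClass(Mapper.class);          // base Mapper is an identity mapper
        job.setReducerClass(Reducer.class);        // base Reducer is an identity reducer
        job.setOutputKeyClass(LongWritable.class); // TextInputFormat keys are byte offsets
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));   // input dir from args
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // output dir from args
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}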

HDFS Federation

from their website: http://hadoop.apache.org/common/docs/current/hadoop-yarn/hadoop-yarn-site/Federation.html

Background

HDFS Layers

HDFS has two main layers:

  • Namespace
    • Consists of directories, files and blocks
    • It supports all the namespace related file system operations such as create, delete, modify and list files and directories.
  • Block Storage Service has two parts
    • Block Management (which is done in Namenode)
      • Provides datanode cluster membership by handling registrations, and periodic heart beats.
      • Processes block reports and maintains location of blocks.
      • Supports block related operations such as create, delete, modify and get block location.
      • Manages replica placement and replication of a block for under replicated blocks and deletes blocks that are over replicated.
    • Storage – is provided by datanodes by storing blocks on the local file system and allows read/write access.

The prior HDFS architecture allows only a single namespace for the entire cluster, managed by a single Namenode. HDFS Federation addresses this limitation by adding support for multiple Namenodes/namespaces to the HDFS file system.

Multiple Namenodes/Namespaces

In order to scale the name service horizontally, federation uses multiple independent Namenodes/namespaces. The Namenodes are federated, that is, the Namenodes are independent and don’t require coordination with each other. The datanodes are used as common storage for blocks by all the Namenodes. Each datanode registers with all the Namenodes in the cluster. Datanodes send periodic heartbeats and block reports and handles commands from the Namenodes.

HDFS Federation Architecture

Block Pool

A Block Pool is a set of blocks that belong to a single namespace. Datanodes store blocks for all the block pools in the cluster. It is managed independently of other block pools. This allows a namespace to generate Block IDs for new blocks without the need for coordination with the other namespaces. The failure of a Namenode does not prevent the datanode from serving other Namenodes in the cluster.

A Namespace and its block pool together are called Namespace Volume. It is a self-contained unit of management. When a Namenode/namespace is deleted, the corresponding block pool at the datanodes is deleted. Each namespace volume is upgraded as a unit, during cluster upgrade.

ClusterID

A new identifier ClusterID is added to identify all the nodes in the cluster. When a Namenode is formatted, this identifier is provided or auto generated. This ID should be used for formatting the other Namenodes into the cluster.

Key Benefits

  • Namespace Scalability – HDFS cluster storage scales horizontally but the namespace does not. Large deployments, or deployments using lots of small files, benefit from scaling the namespace by adding more Namenodes to the cluster.
  • Performance – File system operation throughput is limited by a single Namenode in the prior architecture. Adding more Namenodes to the cluster scales the file system read/write operations throughput.
  • Isolation – A single Namenode offers no isolation in a multi-user environment. An experimental application can overload the Namenode and slow down production-critical applications. With multiple Namenodes, different categories of applications and users can be isolated to different namespaces.

Subscribe to the podcast and listen to all of what Arun had to share.


/*
Joe Stein
http://www.linkedin.com/in/charmalloc
*/