
Archive for the ‘Open Source Projects’ Category

Developing Apache Kafka Producers and Consumers

December 21, 2013

I gave a presentation recently on Real-time streaming and data pipelines with Apache Kafka.

A correction to the talk (~22 minutes in): I said that you have to have all of your topic data fit on one server. That is not true. What is true is that a partition's log cannot span servers, so all of the data for a single partition has to fit on one server. Kafka will spread a topic's partitions across the brokers for you.

For that presentation I put together sample code for producing and consuming with an Apache Kafka broker using Scala.

To get up and running, use Vagrant.

1) Install Vagrant http://www.vagrantup.com/
2) Install VirtualBox https://www.virtualbox.org/

git clone https://github.com/stealthly/scala-kafka
cd scala-kafka
vagrant up
./sbt test

Your entry point is the test file:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ly.stealth.testing

import org.specs2.mutable._
import java.util.UUID
import kafka.consumer._
import kafka.producer._
import kafka.utils._
import kafka.akka._
import akka.actor.{Actor, Props, ActorSystem}
import akka.routing.RoundRobinRouter

class KafkaSpec extends Specification with Logging {

  "Simple Producer and Consumer" should {
    "send string to broker and consume that string back" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString

      var testStatus = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec(binaryObject: Array[Byte]) = {
        val message = new String(binaryObject)
        info("testMessage = " + testMessage + " and consumed message = " + message)
        testMessage must_== message
        consumer.close()
        testStatus = true
      }

      info("KafkaSpec is waiting some seconds")
      consumer.read(exec)
      info("KafkaSpec consumed")

      testStatus must beTrue // we need to get to this point but a failure in exec will fail the test
    }

    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>
        producer ! (testTopic, "192.168.86.10:9092")
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
}

On the producer side I have started to look more into using Akka. The prototype for this implementation is in the test case above, “Akka Producer and Consumer”, and is broken out here:

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>
        producer ! (testTopic, "192.168.86.10:9092")
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

What is really nice is that you can up that actorCount and really start to generate test data to analyze.
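For example, here is a minimal sketch of what a load run might look like, assuming the same KafkaAkkaProducer actor, initialization message, and broker address used in the spec above; the actor count, topic name, and message loop are only illustrative:

import java.util.UUID

import akka.actor.{ActorSystem, Props}
import akka.routing.RoundRobinRouter
import kafka.akka._

object ProducerLoadSketch extends App {
  val system = ActorSystem("loadTesting")
  val testTopic = UUID.randomUUID().toString

  // more routees means more producer actors sending in parallel
  val actorCount = 8

  val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

  // initialize every routee with the topic and broker, just like the spec above does
  1 to actorCount foreach { _ =>
    producer ! ((testTopic, "192.168.86.10:9092"))
  }

  // push a batch of messages through the round-robin router
  1 to 10000 foreach { i =>
    producer ! ("test message " + i)
  }
}

Because the router round-robins over the routees, raising actorCount spreads the sends across more producer actors.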

/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
********************************************/


Getting started with Apache Mesos and Apache Aurora using Vagrant

December 16, 2013

Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. Think of it as the “kernel” for your data center. Paco Nathan talked about this on one of the All Things Hadoop Podcasts.

Features:

  • Fault-tolerant replicated master using ZooKeeper
  • Scalability to 10,000s of nodes
  • Isolation between tasks with Linux Containers
  • Multi-resource scheduling (memory and CPU aware)
  • Java, Python and C++ APIs for developing new parallel applications
  • Web UI for viewing cluster state

Apache Aurora is a service scheduler that runs on top of Mesos, enabling you to run long-running services that take advantage of Mesos’ scalability, fault-tolerance, and resource isolation. Apache Aurora is currently part of the Apache Incubator. The main benefit of a Mesos scheduler like Aurora (or Marathon) is not having to worry about using the Mesos API to take advantage of the grid. Your application can work the way it does today while Mesos figures out which server(s) to run it on and the scheduler handles scaling it.

Features:

  • Deployment and scheduling of jobs
  • The abstraction of a “job” to bundle and manage Mesos tasks
  • A rich DSL to define services
  • Health checking
  • Failure domain diversity
  • Instant provisioning

First you need to make sure that you have Vagrant and VirtualBox installed; if you don’t already have them installed, then please install them.

1) Install Vagrant http://www.vagrantup.com/
2) Install VirtualBox https://www.virtualbox.org/

That is all you need (assuming you also have git installed). Everything else from here is going to be done from within the virtual machine.


git clone https://github.com/apache/incubator-aurora
cd incubator-aurora
vagrant up

The virtual machines will take some time to spin up so hang tight.

Once the virtual machines are launched you will have your command prompt back and ready to go.

There are five VMs that are launched: devtools, zookeeper, mesos-master, mesos-slave and aurora-scheduler. They are all configured and networked together (for more info on this check out the Vagrantfile).

The next step is to create a job on the scheduler so that it is provisioned onto the Mesos cluster that is running.


vagrant ssh aurora-scheduler
vagrant@precise64:~$ cd /vagrant/examples/jobs/
vagrant@precise64:~$ aurora create example/www-data/prod/hello hello_world.aurora
 INFO] Creating job hello
 INFO] Response from scheduler: OK (message: 1 new tasks pending for job www-data/prod/hello)
 INFO] Job url: http://precise64:8081/scheduler/www-data/prod/hello

Now go to your browser and pull up http://192.168.33.5:8081/scheduler/www-data/prod/hello and you’ll see your job running

[Screenshot: the Aurora scheduler UI showing the www-data/prod/hello job running]

Basically, all of what is happening is defined in the configuration:

hello = Process(
  name = 'hello',
  cmdline = """
    while true; do
      echo hello world
      sleep 10
    done
  """)

task = SequentialTask(
  processes = [hello],
  resources = Resources(cpu = 1.0, ram = 128*MB, disk = 128*MB))

jobs = [Service(
  task = task, cluster = 'example', role = 'www-data', environment = 'prod', name = 'hello')]

It is an exciting time for virtualization, resource scheduling, and process provisioning within infrastructures. It is all open source, so go dig in and see how it all works for yourself.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Using Vagrant to get up and running with Apache Kafka

December 7, 2013

If you haven’t used Vagrant before then hopefully this will not only help you get up and running with Apache Kafka but also introduce you to Vagrant. Vagrant helps to create and configure lightweight, reproducible, and portable development environments. With Vagrant you run a single command, “vagrant up”, and sit back as Vagrant puts together your complete development environment. Say goodbye to the “works on my machine” excuse, as Vagrant creates identical development environments for everyone on your team. To learn more about Apache Kafka check out our website.

To get up and running with Apache Kafka using Vagrant:

1) Install Vagrant http://www.vagrantup.com/
2) Install VirtualBox https://www.virtualbox.org/

For the latest release of Apache Kafka, 0.8.0, I have added the ability to use Vagrant.

1) git clone https://github.com/stealthly/kafka
2) cd kafka
3) ./sbt update
4) ./sbt package
5) ./sbt assembly-package-dependency
6) vagrant up

Once this is done (it takes a little while for Vagrant to launch the four virtual machines):

  • ZooKeeper will be running on 192.168.50.5
  • Broker 1 on 192.168.50.10
  • Broker 2 on 192.168.50.20
  • Broker 3 on 192.168.50.30

When you are all up and running you will be back at a command prompt.

If you want, you can log in to the machines using vagrant ssh <machineName>, but you don’t need to.

You can access the brokers and ZooKeeper by their IP addresses

e.g.


bin/kafka-create-topic.sh --zookeeper 192.168.50.5:2181 --replica 3 --partition 1 --topic sandbox

bin/kafka-console-producer.sh --broker-list 192.168.50.10:9092,192.168.50.20:9092,192.168.50.30:9092 --topic sandbox

bin/kafka-console-consumer.sh --zookeeper 192.168.50.5:2181 --topic sandbox --from-beginning
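If you would rather exercise the cluster from code than from the console scripts, here is a minimal sketch that points the KafkaProducer and KafkaConsumer classes from the scala-kafka project (shown in the first post above) at this Vagrant setup; it assumes the sandbox topic created by the command above and a made-up consumer group name:

import kafka.consumer._
import kafka.producer._

object VagrantClusterSketch extends App {
  // the "sandbox" topic was created with the kafka-create-topic.sh command above
  val topic = "sandbox"

  // one of the three Vagrant brokers; ZooKeeper is on 192.168.50.5
  val producer = new KafkaProducer(topic, "192.168.50.10:9092")
  producer.sendString("hello from the vagrant cluster")

  val consumer = new KafkaConsumer(topic, "sandbox-group", "192.168.50.5:2181")

  def exec(binaryObject: Array[Byte]) = {
    println("consumed: " + new String(binaryObject))
    consumer.close()
  }

  consumer.read(exec)
}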

A patch is pending for the 0.8.1 release to have this go back upstream.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Using Cassandra from Scala

November 16, 2013

I have started a new open source project, https://github.com/stealthly/scala-cassandra, which is a Scala wrapper for CQL; specifically, it is a wrapper of the DataStax java-driver.

Test cases are a good entry point, so let’s start here: https://github.com/stealthly/scala-cassandra/blob/master/src/test/scala/ScalaCassandraSpec.scala

import java.util.UUID

import scala.util.Random

import org.specs2.mutable._

// CQL, Meta and TMeta come from scala-cassandra's main sources and its generated Thrift code
class ScalaCassandraSpec extends Specification {

  CQL.init()
  CQL.startup("MetaStore")
  Meta.createTable()

  "Meta objects" should {
    "be able to store & retrieve their binary state" in {

      //we use a Thrift object here for portability of the data stored
      val tMeta = new TMeta()

      //setting up some randomness so we can confirm what we are writing is what we get back
      val metaUUID = UUID.randomUUID()
      val dataum = Random.alphanumeric.take(1000000).mkString

      tMeta.setId(metaUUID.toString)
      tMeta.setDatum(dataum)

      Meta.save(tMeta) //Saved to C*

      val someNewTMeta = Meta(metaUUID)
      someNewTMeta.getId() must_== metaUUID.toString

      someNewTMeta.getDatum() must_== dataum
    }
  }
}

The Meta class (contained in the https://github.com/stealthly/scala-cassandra/blob/master/src/main/scala/MetaDAO.scala file) is like a Widget… What’s a Widget? For our purposes, let’s consider it “sample code”.

When you want to add something different from a widget, create a new Thrift file in the `thrift/interface` directory.

Then in that directory (let’s say you created an IDL called Music.thrift), run `thrift -gen java Music.thrift`, which we did for you already, and copy the jar output from `mvn package` to the `lib` folder so you don’t have to worry about it. You can even just keep using the Meta implementation and shove JSON or XML or whatever you want into it; however, it makes more sense to partition the objects some, so you can create wider rows with a collection key in your table along with your partition key. You can also flatten things by storing a HashMap in the table and retrieving it.
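For instance, sticking with the Meta implementation and just shoving a JSON string into the datum field could look roughly like the sketch below; the JSON payload and object name are made up, and CQL, Meta and TMeta come from scala-cassandra as in the spec above:

import java.util.UUID

object JsonIntoMetaSketch extends App {
  // CQL, Meta and TMeta are on the classpath from scala-cassandra and its generated Thrift code
  CQL.init()
  CQL.startup("MetaStore")
  Meta.createTable()

  val id = UUID.randomUUID()

  // any opaque payload works for the datum; here it is a small JSON document
  val json = """{"artist":"Some Band","album":"Some Album","year":2013}"""

  val tMeta = new TMeta()
  tMeta.setId(id.toString)
  tMeta.setDatum(json)
  Meta.save(tMeta) // saved to C*

  // read it back by id and pull the JSON string out again
  val stored = Meta(id)
  println(stored.getDatum())
}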

http://stealth.ly

Using Apache Drill for Large Scale, Interactive, Real-Time Analytic Queries

October 29, 2013

Episode #17 of the podcast is a talk with Jacques Nadeau, also available on iTunes.

Apache Drill (http://incubator.apache.org/drill/) is a modern interactive query engine that runs on top of Hadoop.

Jacques talked about how Apache Drill is a modern query engine that is meant to be a query layer on top of all big data open source systems. Apache Drill is being designed with a pluggable storage engine layer so it can be the query interface for any big data storage engine. The first release came out recently to allow developers to understand the data pipeline.

Leveraging an efficient columnar storage format, an optimistic execution engine and a cache-conscious memory layout, Apache Drill is blazing fast. Coordination, query planning, optimization, scheduling, and execution are all distributed throughout nodes in a system to maximize parallelization.

[Figure: Drill runtime]

Perform interactive analysis on all of your data, including nested and schema-less data. Drill supports querying against many different schema-less data sources including HBase, Cassandra and MongoDB. Naturally flat records are included as a special case of nested data.

[Figure: nested JSON data]

Drill has strongly defined tiers and APIs for straightforward integration with a wide array of technologies.

[Figure: Drill architecture]

Subscribe to the podcast and listen to what Jacques had to say. It is also available on iTunes.

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Real-Time Data Pipelines and Analytics with Apache Kafka and Apache Samza

September 17, 2013

Episode #16 of the podcast is a talk with Jay Kreps, also available on iTunes.

Jay talked about the open source work he has done while at LinkedIn. Most of the conversation was about the Apache Kafka pipeline and the use of Apache Samza for processing it.

Apache Kafka

http://kafka.apache.org/documentation.html#introduction

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

What does all that mean?

First let’s review some basic messaging terminology:

  • Kafka maintains feeds of messages in categories called topics.
  • We’ll call processes that publish messages to a Kafka topic producers.
  • We’ll call processes that subscribe to topics and process the feed of published messages consumers.
  • Kafka is run as a cluster comprised of one or more servers each of which is called a broker.

So, at a high level, producers send messages over the network to the Kafka cluster, which in turn serves them up to consumers.

Communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. We provide a Java client for Kafka, but clients are available in many languages.

Topics and Logs

Let’s first dive into the high-level abstraction Kafka provides—the topic.

A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log.

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition.

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka’s performance is effectively constant with respect to data size so retaining lots of data is not a problem.

In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the “offset”. This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example, a consumer can reset to an older offset to reprocess.

This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to “tail” the contents of any topic without changing what is consumed by any existing consumers.

Apache Samza

http://samza.incubator.apache.org/

Apache Samza is a distributed stream processing framework. It uses Apache Kafka for messaging, and Apache Hadoop YARN to provide fault tolerance, processor isolation, security, and resource management.

  • Simple API: Unlike most low-level messaging system APIs, Samza provides a very simple call-back based “process message” API that should be familiar to anyone who has used Map/Reduce (see the sketch after this list).
  • Managed state: Samza manages snapshotting and restoration of a stream processor’s state. Samza will restore a stream processor’s state to a snapshot consistent with the processor’s last read messages when the processor is restarted.
  • Fault tolerance: Samza will work with YARN to restart your stream processor if there is a machine or processor failure.
  • Durability: Samza uses Kafka to guarantee that messages will be processed in the order they were written to a partition, and that no messages will ever be lost.
  • Scalability: Samza is partitioned and distributed at every level. Kafka provides ordered, partitioned, re-playable, fault-tolerant streams. YARN provides a distributed environment for Samza containers to run in.
  • Pluggable: Though Samza works out of the box with Kafka and YARN, Samza provides a pluggable API that lets you run Samza with other messaging systems and execution environments.
  • Processor isolation: Samza works with Apache YARN, which supports processor security through Hadoop’s security model, and resource isolation through Linux CGroups.

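As a rough illustration of that call-back based “process message” API, a Samza stream task written in Scala looks something like the sketch below; the output topic name and the string cast (which assumes a string serde is configured) are just for the example:

import org.apache.samza.system.{IncomingMessageEnvelope, OutgoingMessageEnvelope, SystemStream}
import org.apache.samza.task.{MessageCollector, StreamTask, TaskCoordinator}

class UpperCaseTask extends StreamTask {
  // Samza calls process once for each incoming message, in partition order
  def process(envelope: IncomingMessageEnvelope,
              collector: MessageCollector,
              coordinator: TaskCoordinator) {
    val message = envelope.getMessage.asInstanceOf[String]

    // send the transformed message on to an output Kafka topic
    collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "sandbox-upper"), message.toUpperCase))
  }
}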
Check out Hello Samza to try Samza. Read the Background page to learn more about Samza.

Subscribe to the podcast and listen to what Jay had to say. It is also available on iTunes.

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/

Apache BigTop and how packaging infrastructure binds the Hadoop ecosystem together

August 12, 2013

Episode #12 of the podcast is a talk with Mark Grover and Roman Shaposhnik, also available on iTunes.

Apache Bigtop is a project for the development of packaging and tests of the Apache Hadoop ecosystem.

The primary goal of Bigtop is to build a community around the packaging and interoperability testing of Hadoop-related projects. This includes testing at various levels (packaging, platform, runtime, upgrade, etc…) developed by a community with a focus on the system as a whole, rather than individual projects.

BigTop makes it easier to deploy Hadoop Ecosystem projects including:

  • Apache Zookeeper
  • Apache Flume
  • Apache HBase
  • Apache Pig
  • Apache Hive
  • Apache Sqoop
  • Apache Oozie
  • Apache Whirr
  • Apache Mahout
  • Apache Solr (SolrCloud)
  • Apache Crunch (incubating)
  • Apache HCatalog
  • Apache Giraph
  • LinkedIn DataFu
  • Cloudera Hue

The list of supported Linux platforms has expanded to include:

  • CentOS/RHEL 5 and 6
  • Fedora 17 and 18
  • SuSE Linux Enterprise 11
  • OpenSUSE 12.2
  • Ubuntu LTS Lucid (10.04) and Precise (12.04)
  • Ubuntu Quantal (12.10)

Subscribe to the podcast and listen to what Mark and Roman had to say. It is also available on iTunes.

/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/