Archive for December, 2013

Developing Apache Kafka Producers and Consumers

December 21, 2013

I gave a presentation recently on Real-time streaming and data pipelines with Apache Kafka.

A correction to the talk (~22 minutes in): I said that you have to have all of your topic data fit on one server. That is not true; a log cannot span servers, so all of the data for a given partition has to fit on one server. Kafka will spread a topic's partitions across the brokers for you.
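To make that concrete, here is a quick, hedged example using the kafka-create-topic.sh tool that ships with Kafka 0.8.0 (the same tool shown later in this archive; substitute your own ZooKeeper address). Each partition's log has to fit on a single broker, but the topic as a whole does not, because Kafka assigns the partitions across the brokers in the cluster.

# 3 partitions, each replicated to 2 brokers: no single broker has to hold the whole topic
bin/kafka-create-topic.sh --zookeeper 192.168.50.5:2181 --partition 3 --replica 2 --topic sandbox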

For that presentation I put together sample code for producing and consuming with an Apache Kafka broker using Scala.

To get up and running, use Vagrant.

1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/

git clone https://github.com/stealthly/scala-kafka
cd scala-kafka
vagrant up
./sbt test

Your entry point is the test file:

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package ly.stealth.testing

import org.specs2.mutable._
import java.util.UUID
import kafka.consumer._
import kafka.producer._
import kafka.utils._
import kafka.akka._
import akka.actor.{Actor, Props, ActorSystem}
import akka.routing.RoundRobinRouter

class KafkaSpec extends Specification with Logging {

  "Simple Producer and Consumer" should {
    "send string to broker and consume that string back" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString

      var testStatus = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec(binaryObject: Array[Byte]) = {
        val message = new String(binaryObject)
        info("testMessage = " + testMessage + " and consumed message = " + message)
        testMessage must_== message
        consumer.close()
        testStatus = true
      }

      info("KafkaSpec is waiting some seconds")
      consumer.read(exec)
      info("KafkaSpec consumed")

      testStatus must beTrue // we need to get to this point but a failure in exec will fail the test
    }

    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting sample broker testing")
      val producer = new KafkaProducer(testTopic,"192.168.86.10:9092")
      producer.sendString(testMessage)

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>
        producer ! (testTopic, "192.168.86.10:9092")
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }
}

On the producer side I have started to look more into using Akka. The prototype for this implementation is in the test case above (“Akka Producer and Consumer”) and is broken out here:

  "Akka Producer and Consumer" should {
    "send string to broker and consume that string back in different consumer groups" in {
      val testMessage = UUID.randomUUID().toString
      val testTopic = UUID.randomUUID().toString
      val groupId_1 = UUID.randomUUID().toString
      val groupId_2 = UUID.randomUUID().toString

      var testStatus1 = false
      var testStatus2 = false

      info("starting akka producertesting")
      val system = ActorSystem("testing")

      val actorCount = 1

      val producer = system.actorOf(Props[KafkaAkkaProducer].withRouter(RoundRobinRouter(actorCount)), "router")

      1 to actorCount foreach { i =>
        producer ! (testTopic, "192.168.86.10:9092")
      }

      producer ! testMessage

      val consumer1 = new KafkaConsumer(testTopic,groupId_1,"192.168.86.5:2181")

      def exec1(binaryObject: Array[Byte]) = {
        val message1 = new String(binaryObject)
        info("testMessage 1 = " + testMessage + " and consumed message 1 = " + message1)
        testMessage must_== message1
        consumer1.close()
        testStatus1 = true
      }

      info("KafkaSpec : consumer 1 - is waiting some seconds")
      consumer1.read(exec1)
      info("KafkaSpec : consumer 1 - consumed")

      val consumer2 = new KafkaConsumer(testTopic,groupId_2,"192.168.86.5:2181")

      def exec2(binaryObject: Array[Byte]) = {
        val message2 = new String(binaryObject)
        info("testMessage 2 = " + testMessage + " and consumed message 2 = " + message2)
        testMessage must_== message2
        consumer2.close()
        testStatus2 = true
      }

      info("KafkaSpec : consumer 2 - is waiting some seconds")
      consumer2.read(exec2)
      info("KafkaSpec : consumer 2 - consumed")

      testStatus2 must beTrue // we need to get to this point but a failure in exec will fail the test
    }
  }

What is really nice is that you can increase actorCount and start pushing through enough test data to analyze.
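For reference, here is a minimal sketch of what an actor along the lines of KafkaAkkaProducer could look like. This is an illustration under assumptions, not the project's actual implementation: it reuses the KafkaProducer wrapper from the test above (import path assumed) and mirrors the message protocol the test uses, where a (topic, brokerList) tuple configures the producer and every subsequent String gets sent to that topic.

import akka.actor.Actor
import kafka.producer.KafkaProducer // the wrapper used in the test above; import path assumed

// Illustrative sketch only, not the actual KafkaAkkaProducer from the project.
class KafkaAkkaProducerSketch extends Actor {
  private var producer: Option[KafkaProducer] = None

  def receive = {
    case (topic: String, brokerList: String) =>
      // first message: configure which topic and broker this actor produces to
      producer = Some(new KafkaProducer(topic, brokerList))
    case message: String =>
      // subsequent messages: send the string to the configured topic
      producer.foreach(_.sendString(message))
  }
}

With a router in front of a pool of these actors, upping actorCount simply round-robins the configuration messages and the sends across more producers.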

/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
********************************************/

Getting started with Apache Mesos and Apache Aurora using Vagrant

December 16, 2013

Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. Think of it as the “kernel” for your data center. Paco Nathan talked about this on one of the All Things Hadoop Podcasts.

Features:

  • Fault-tolerant replicated master using ZooKeeper
  • Scalability to 10,000s of nodes
  • Isolation between tasks with Linux Containers
  • Multi-resource scheduling (memory and CPU aware)
  • Java, Python and C++ APIs for developing new parallel applications
  • Web UI for viewing cluster state

Apache Aurora is a service scheduler that runs on top of Mesos, enabling you to run long-running services that take advantage of Mesos’ scalability, fault tolerance, and resource isolation. Apache Aurora is currently a part of the Apache Incubator. The main benefit of a Mesos scheduler like Aurora (or Marathon) is not having to worry about using the Mesos API to take advantage of the grid. Your application can keep working the way it does today while Mesos figures out which server(s) to run it on, and the scheduler handles when and how to scale it.

Features:

  • Deployment and scheduling of jobs
  • The abstraction of a “job” to bundle and manage Mesos tasks
  • A rich DSL to define services
  • Health checking
  • Failure domain diversity
  • Instant provisioning

First, make sure that you have Vagrant and VirtualBox installed; if you don't already have them, install them now.

1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/

That is all you need (assuming you also have git installed). Everything else from here is going to be done from within the virtual machine.


git clone https://github.com/apache/incubator-aurora
cd incubator-aurora
vagrant up

The virtual machines will take some time to spin up so hang tight.

Once the virtual machines are launched you will have your command prompt back and ready to go.

Five VMs are launched: devtools, zookeeper, mesos-master, mesos-slave, and aurora-scheduler. They are all configured and networked together (for more details on this, check out the Vagrantfile).
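If you want to confirm what came up, run vagrant status from the incubator-aurora directory; the output should look roughly like this (exact formatting varies by Vagrant version):

vagrant status

Current machine states:

devtools                  running (virtualbox)
zookeeper                 running (virtualbox)
mesos-master              running (virtualbox)
mesos-slave               running (virtualbox)
aurora-scheduler          running (virtualbox)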

The next step is to create a job on the scheduler, which provisions it onto the running Mesos cluster.


vagrant ssh aurora-scheduler
vagrant@precise64:~$ cd /vagrant/examples/jobs/
vagrant@precise64:~$ aurora create example/www-data/prod/hello hello_world.aurora
 INFO] Creating job hello
 INFO] Response from scheduler: OK (message: 1 new tasks pending for job www-data/prod/hello)
 INFO] Job url: http://precise64:8081/scheduler/www-data/prod/hello

Now go to your browser and pull up http://192.168.33.5:8081/scheduler/www-data/prod/hello and you’ll see your job running.

[Screenshot: the Aurora scheduler web UI showing the hello job running]

Basically, everything that is happening is defined in the configuration:

hello = Process(
  name = 'hello',
  cmdline = """
    while true; do
      echo hello world
      sleep 10
    done
  """)

task = SequentialTask(
  processes = [hello],
  resources = Resources(cpu = 1.0, ram = 128*MB, disk = 128*MB))

jobs = [Service(
  task = task, cluster = 'example', role = 'www-data', environment = 'prod', name = 'hello')]
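As a hedged aside (an assumption based on the Aurora DSL, where a Job takes an instances parameter; verify against your Aurora version): since Service is essentially a templated Job, scaling the service out should just be a matter of adding instances to the same definition.

jobs = [Service(
  task = task, cluster = 'example', role = 'www-data', environment = 'prod',
  name = 'hello', instances = 2)]  # assumption: instances behaves as it does for Job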

It is an exciting time for virtualization and resource scheduling and process provisioning within infrastructures. It is all open source so go dig in and see how it all works for yourself.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Using Vagrant to get up and running with Apache Kafka

December 7, 2013

If you haven’t used Vagrant before, then hopefully this will not only help you get up and running with Apache Kafka but also introduce you to Vagrant. Vagrant helps to create and configure lightweight, reproducible, and portable development environments. With Vagrant, you run a single command, “vagrant up”, and sit back as Vagrant puts together your complete development environment. Say goodbye to the “works on my machine” excuse, as Vagrant creates identical development environments for everyone on your team. To learn more about Apache Kafka, check out our website.

To get up and running with Apache Kafka using Vagrant

1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/

For the latest release of Apache Kafka, 0.8.0, I have added the ability to use Vagrant:

1) git clone https://github.com/stealthly/kafka
2) cd kafka
3) ./sbt update
4) ./sbt package
5) ./sbt assembly-package-dependency
6) vagrant up

Once this is done (it takes a little while for Vagrant to launch the four virtual machines):

  • ZooKeeper will be running on 192.168.50.5
  • Broker 1 on 192.168.50.10
  • Broker 2 on 192.168.50.20
  • Broker 3 on 192.168.50.30

When you are all up and running you will be back at a command prompt.

If you want, you can log in to the machines using vagrant ssh <machineName>, but you don’t need to.

You can access the brokers and ZooKeeper by their IP addresses, e.g.:


bin/kafka-create-topic.sh --zookeeper 192.168.50.5:2181 --replica 3 --partition 1 --topic sandbox

bin/kafka-console-producer.sh --broker-list 192.168.50.10:9092,192.168.50.20:9092,192.168.50.30:9092 --topic sandbox

bin/kafka-console-consumer.sh --zookeeper 192.168.50.5:2181 --topic sandbox --from-beginning

A patch is pending for the 0.8.1 release to have this go back upstream.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Technology Decisions Are About Trade Offs and Solving Problems

December 6, 2013

At some point in the last decade we hit the inflection point where distributed systems, and all their complexities, became the common reality.

Maybe it was the need to change how we scale since CPU clocks are not getting any faster… Maybe it was the Google MapReduce and/or Amazon Dynamo papers… Or maybe it was just the Red Sox winning the World Series. It doesn’t really matter, because we now live in a world where distributed computing is accessible to everyone.

Often the technical arguments around distributed computing are not wrong; rather, they lack proper context. Solutions presented should explain how the technologies/languages solve problems in the context of their implementation and the trade offs they make.

So much has become solution, solution, solution, without separating out the problems and then linking problem to solution. Solutions (or technical warnings) come across best when you explain them in context and explain the downside of the solution (that is, what trade off you accepted for your context).

A few quick examples of this before we continue:


  • Defending locks as necessary, even superior to a lockless environment. Locks have many well-known and well-documented downsides. They are also used in many systems and have well-documented and well-known benefits. So, context is key here. There are plenty of times when it makes sense to use locks to solve a particular problem. Many people probably have the same problem you have and it is often a complex implementation so they can learn from what you did with and/or without locks that was wrong and what you did that was right.
  • Ripping a language by pointing out everything that is bad about it and saying not to use it. Don’t just talk about problems without providing another solution. Sure, that language might not make sense (for you) when building some type of specific application, but that doesn’t mean it has zero benefits.
  • Promoting how you had a problem using a technology, so instead of realizing that maybe you were doing something wrong, you went and started to question the technology… like sending an email to a mailing list saying that you have “doubts” about the technology when maybe you just don’t understand it.

Just because something doesn’t make sense for you and the context of your domain, or because you’re not willing to accept the trade offs, doesn’t make a technology or language bad or wrong… nor does it make it right.

I love a good informed discussion and debate about technologies that contrast and overlap.  I especially like it when it is not clear which is better for a specific use case. It’s even better when two (or more) technologies each only solve 80% of the problem and do so differently.  This leaves you having to write your own software. And THAT is where the magic happens!

If you can’t talk about the downside and the trade offs then you’re just a fan person waving a coffee mug around. That type of behavior really doesn’t benefit anyone.

Context Matters


Amarillo Slim said it best: “If you can’t spot the sucker within the first half hour at the table, then you are the sucker.”

So, if you’re building a website for your baseball card collection, then it doesn’t matter whether you use FileMaker Pro, MySQL, Oracle, MongoDB, Cassandra or HBase for your backend. Only your friends and family are going to care if it goes down, loses data, or can’t handle load when your entire Facebook and Twitter circles decide to log in all at the same time…

Now, let’s say you’re building a website (off the top of my head) for signing individuals up for healthcare (hypothetically). If I suggested writing that system in COBOL on a bank of AS400s, then I think it would be fair to say that everyone would think I am crazy and be like “Joe, the sky is blue, stop trying to argue it is not”. A COBOL system running on an AS400 is fine if you can find resources willing to work on it and support it, with enough capital investment to work around the trade offs.

At the end of the day, computer problems are solved by people and how they apply the technologies as much as by the technologies and languages themselves. If you expect tens of thousands or even millions of people all trying to sign up at once (because you have a big opening day launch), then it really doesn’t matter what your system does after that, nor what it was written in, what backend it has, or whether it is cloud based or bare metal, because if they can’t even log in or sign up then it’s a bust. Neither the problems nor the trade offs were understood up front, so arguing about the technologies/languages being wrong or right is superfluous. The root cause of the failure was a lack of understanding of the problem.

Every technology can be argued to have a use… so let’s stop trashing technologies and be a bit more understanding and focused with our approach. Boil it down to placing technologies (and languages) into their proper category for what they could be good for and what they are not good for. This applies both to cheering a technology/language along and to warning people away from it. And don’t forget academic and scholarly solutions (both in teaching and in research/advancement of the field). You might do something in school that makes sense because it is the best way to learn, but that is not actually the solution you would use in other domains.

Focus on the problems and manage the acceptable trade offs to solve those problems


So after you get past “is this technically possible,” you then have to decide on the trade offs and, objectively, how they affect your context.

If you are not looking at the trade offs then you are doing something wrong. There are always trade offs.

  • What are your real goals? Define them! Be clear about what you are trying to achieve and focus on the problems. Don’t create problems to solve…. Please, please, please, don’t create problems to solve that are not domain driven. And if you do, then don’t promote them as solutions.
  • How much downtime can you afford? Don’t say zero, because that is not reality for anyone. Quantify and explain this. Don’t just say zero downtime; rather, figure out how much money you lose for every second you are down… or what else you might lose while the system is down, and for how long, and quantify and communicate that and why. Apply this not just to the system but to each sub-system, and possibly every component within each sub-system.
  • How much data is acceptable to lose? If the answer is none, then how much availability of the system are you willing to sacrifice? You can have zero loss of data, but not without sacrificing availability; if availability is more important than how much data you are willing to lose, then you have to accept the trade off. For more on this, check out Henry Robinson’s CAP Confusion: Problems with ‘partition tolerance’ blog post: “Partition tolerance means simply developing a coping strategy by choosing which of the other system properties to drop. This is the real lesson of the CAP theorem – if you have a network that may drop messages, then you cannot have both availability and consistency, you must choose one”
  • What are the expectations of the users? Explain this, because users provide the domain and the context for what your system does and doesn’t do. What are real requirements vs. nice/fancy/flashy things to have?
  • What is the makeup of your existing team? What is the market for potential new hires? What if you pick all the shiny “new awesome” technologies and languages for a solution? What happens when you have to scale the team and you can’t hire anyone who knows that skill set, and no one is interested in adding it to their resume? Is it still shiny and awesome? On the flip side, are you using tech that could be antiquated? Even though your existing resources are comfortable with the antiquated tech, will you have trouble hiring talent to support it moving forward?
  • Do you have existing code and an existing business? Do you have to maintain the code and business with new projects? What can you throw away?  How long do you have to wait before you can throw something away?
  • Are you solving problems you really have or problems you perceive to have?

“Programmers waste enormous amounts of time thinking about, or worrying about, the speed of noncritical parts of their programs, and these attempts at efficiency actually have a strong negative impact when debugging and maintenance are considered. We should forget about small efficiencies, say about 97% of the time: premature optimization is the root of all evil. Yet we should not pass up our opportunities in that critical 3%.” – Donald Knuth

Technology Decisions Are About Trade Offs and Solving Problems


For me, I have my own opinions and my own abilities for solving problems. I don’t always get to work on the use cases I like, and sometimes, sure, I get use cases that really just completely and totally suck and that require me to use technologies and languages that make me vomit in my mouth a little. But when it’s all done, if I can still look at myself in the mirror and be proud and feel accomplished with the end result, and the expectations were exceeded without compromising too much on the trade offs made, then solution == awesome.

So with all my rants and thoughts and opinions, I leave you to focus on your problems and explain them in the context of what you are doing and the trade offs you accepted. Thanks to Camille Fournier, Jay Kreps, Alex Popescu and Steven Gravitz for reviewing this blog post to keep me honest, pull me off the ledge, and help with some garbage collection in the process.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/