Apache Mesos at Bloomberg

November 6, 2015


In the video recording of the talk, Mr. Gupta discusses BVault, a massive, scalable archiving and e-discovery solution for communications that has been adopted by more than 800 enterprises, processes more than 220 million messages daily, and maintains an archive of more than 90 billion communication objects. He describes how the service is optimized for fast deployment of data-centric and processing-intensive applications using elastic cloud computing strategies. He further explains how his team uses Apache Mesos to abstract heterogeneous data center assets into a homogeneous set of resources, uses prefabricated hardware in secure and geographically distributed data centers to provide on-demand capacity management, and practices continuous development and integration using containers as an emerging standard in cloud infrastructure.

Mr. Gupta encourages anyone who is interested in working on Mesos @ Bloomberg to view Bloomberg’s relevant job listings and contact him on Twitter @skandsg.

Tweet @elodinadotnet if you are interested in learning more about the Apache Mesos NYC Meetup. We are always looking for speakers and sponsors for upcoming events.

~ Joe Stein

Categories: Uncategorized

How to choose the number of topics/partitions in a Kafka cluster?

March 12, 2015


This is a common question asked by many Kafka users. The goal of this post is to explain a few important determining factors and provide a few simple formulas.

More partitions lead to higher throughput

The first thing to understand is that a topic partition is the unit of parallelism in Kafka. On both the producer and the broker side, writes to different partitions can be done fully in parallel. So expensive operations such as compression can utilize more hardware resources. On the consumer side, Kafka always gives a single partition’s data to one consumer thread. Thus, the degree of parallelism in the consumer (within a consumer group) is bounded by the number of partitions being consumed. Therefore, in general, the more partitions there are in a Kafka cluster, the higher the throughput one can achieve.

A rough formula for picking the number of partitions is based on throughput. You measure the throughput…

View original post 1,439 more words
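For readers who don't follow the link: as we recall it, the sizing rule the excerpt starts to describe is roughly max(t/p, t/c) partitions, where t is the target throughput and p and c are the measured per-partition producer and consumer throughputs. A hedged sketch (names and rounding are ours, not from the original post):

```go
package main

import "math"

// partitionsNeeded applies the rough sizing rule max(t/p, t/c):
// t = target throughput, p = measured per-partition producer throughput,
// c = measured per-partition consumer throughput (all in the same units).
func partitionsNeeded(t, p, c float64) int {
	n := math.Max(t/p, t/c)
	return int(math.Ceil(n)) // round up to a whole partition
}
```

For example, a 100 MB/s target with 10 MB/s per-partition producer throughput and 20 MB/s per-partition consumer throughput needs at least 10 partitions.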

Categories: Uncategorized

Syslog Producer For Apache Kafka

January 16, 2015

The Go Kafka Client (sponsored by CrowdStrike and developed by Big Data Open Source Security LLC) now includes a Syslog producer.

There are a few ways to get started with the Syslog Producer:

  • Docker Image
  • From Source
  • Vagrant

Check out the README for more detail: https://github.com/stealthly/go_kafka_client/tree/master/syslog

The syslog producer operates in two different modes

Both modes produce to Apache Kafka.

Raw data

In this case you are basically just funneling the data to a Kafka topic without any alterations. The bytes into the server are the bytes written to the Kafka topic.

As a ProtoBuf with metadata

This metadata is set when the server starts and applied by the server before sending to Kafka. This ProtoBuf ends up being really useful downstream.

The fields for the LogLine ProtoBuf:

  • string line
  • string source
  • repeated tag
  • int64 logtypeid
  • repeated int64 timings

line – This is the log data that is coming into the syslog server. This is unaltered from what was received. It is a required field and set by the syslog server.

source – This has whatever meaning you want to give it. It is meant to be a specific representation of the instance the data is coming from (e.g. i-59a059a8). It is an optional field set by passing in the value on the command line.

tag – This is a structure of key/value pairs. You can add however many key/value pairs you want when starting the server, e.g. (dc=dc1,floor=3,aisle=4,rack=2,u=3). It is an optional field set by passing in the value on the command line.

logtypeid – This field (which defaults to 0) can be used to conditionalize the parsing of the line field in your Kafka consumer (e.g. 1=rfc5424, 2=rfc5424, 3=gosyslog, 4=etc). It is an optional field set by passing the value on the command line.

timings – This field will be set twice, once when we receive the message and once when we produce to Kafka. The purpose of this field is so that as it flows through the data pipeline (other producers and consumers) the timings can continuously be added to. If your data pipelines are dynamic you can use the tag field when reading/writing to know which timing in the list was for which component. This allows for an end to end latency analysis of the message and the processing and transmission steps at each point in the data pipeline.
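The latency analysis the timings field enables can be sketched as follows. This is our illustration, not code from the producer itself, and it assumes the appended timings are comparable timestamps in milliseconds:

```go
package main

// endToEndLatency returns total pipeline latency given the timestamps
// appended by each stage (receive, produce, downstream hops, ...).
func endToEndLatency(timings []int64) int64 {
	if len(timings) < 2 {
		return 0
	}
	return timings[len(timings)-1] - timings[0]
}

// perHopLatency returns the latency of each processing/transmission step,
// in the order the stages appended their timestamps.
func perHopLatency(timings []int64) []int64 {
	hops := make([]int64, 0, len(timings)-1)
	for i := 1; i < len(timings); i++ {
		hops = append(hops, timings[i]-timings[i-1])
	}
	return hops
}
```

Paired with the tag field, the per-hop values can then be attributed to specific pipeline components.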

You can check out more details about the command line options in the README

  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  Twitter: @allthingshadoop

Multi Datacenter Replication with Apache Kafka

December 29, 2014

When requiring multi datacenter replication with Apache Kafka, folks most often rely on the project’s MirrorMaker tool. This tool works great, but it was designed for a specific set of use cases and requires extra work to cover other needs. We found a common need in the community for additional use cases, so we built a new MirrorMaker tool using our Go Kafka Client to support these needs and more!

The Go Kafka Mirror Maker supports:

  • No JVM required when consuming from source cluster and producing to destination cluster.
  • Guarantees of at least once mirroring of data from source to destination.
  • Preservation of ordering from source partition to destination partition.
  • Ability to prefix destination topic to avoid collisions of topic names between clusters.
  • Everything else the existing MirrorMaker tool supports.


go run mirror_maker.go --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

Configuration parameters:

--whitelist, --blacklist – whitelist or blacklist of topics to mirror. Exactly one of whitelist or blacklist is allowed; passing both will cause a panic. One of these parameters is required.

--consumer.config – consumer property files to consume from a source cluster. You can pass multiple of those like this: --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config. At least one consumer config is required.

--producer.config – property file to configure embedded producers. This parameter is required.

--num.producers – number of producer instances. This can be used to increase throughput, because each producer’s requests are effectively handled by a single thread on the receiving Kafka broker; even with multiple consumption streams (see --num.streams below), throughput can be bottlenecked at the handling stage of the mirror maker’s producer requests. Defaults to 1.

--num.streams – used to specify the number of mirror consumer goroutines to create. If the number of consumption streams is higher than number of available partitions then some of the mirroring routines will be idle by virtue of the consumer rebalancing algorithm (if they do not end up owning any partitions for consumption). Defaults to 1.

--preserve.partitions – flag to preserve partition number. E.g. if message was read from partition 5 it’ll be written to partition 5. Note that this can affect performance. Defaults to false.

--preserve.order – flag to preserve message order. E.g. message sequence 1, 2, 3, 4, 5 will remain 1, 2, 3, 4, 5 in destination topic. Note that this can affect performance. Defaults to false.

--prefix – destination topic prefix. E.g. if message was read from topic “test” and prefix is “dc1_” it’ll be written to topic “dc1_test”. Defaults to empty string.

--queue.size – number of messages that are buffered between the consumer and producer. Defaults to 10000.
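The idle-stream behavior described under --num.streams is simple arithmetic: any stream beyond the partition count ends up owning nothing after rebalance. A hedged sketch (the function name is ours, not from the tool):

```go
package main

// idleStreams estimates how many mirror consumer goroutines will sit idle
// when num.streams exceeds the number of available partitions.
func idleStreams(numStreams, numPartitions int) int {
	if numStreams > numPartitions {
		return numStreams - numPartitions
	}
	return 0
}
```

So with 8 streams mirroring a 5-partition topic set, 3 streams would be idle; keeping --num.streams at or below the partition count avoids wasted goroutines.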

The MirrorMaker code and tests are also a great starting point for learning how to produce and consume data with Kafka using the Go programming language.

Big Data Open Source Security LLC provides professional services and product solutions for the collection, storage, transfer, real-time analytics, batch processing and reporting for complex data streams, data sets and distributed systems. BDOSS is all about the “glue” and helping companies to not only figure out what Big Data Infrastructure Components to use but also how to change their existing (or build new) systems to work with them. The focus of our services and solutions are end to end including architecture, development, implementation, documentation, training and support for complex data streams, data sets and distributed systems using Open Source Software.

 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop

Open Source Cloud Formation with Minotaur for Mesos, Kafka and Hadoop

December 17, 2014

Today I am happy to announce “Minotaur” which is our Open Source AWS based infrastructure for managing big data open source projects including (but not limited to): Apache Kafka, Apache Mesos and Cloudera’s Distribution of Hadoop. Minotaur is based on AWS CloudFormation.

The following labs are currently supported:


Supervisor is a Docker-based image that contains all the necessary software to manage nodes/resources in AWS.

Supervisor set-up

  • clone this repo to repo_dir
  • cd to repo_dir/supervisor folder

Before trying to build docker image, you must put some configuration files under config directory:

a) aws_config

This file is just a regular aws-cli config; you must paste your access and secret keys, provided by Amazon, into it:

output = json
region = us-east-1
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY

Do not add or remove any extra whitespace (especially before and after the “=” sign in keys).

b) private.key

This is your private SSH key, public part of which is registered on Bastion host

c) environment.key

This is a shared key for all nodes in the environment Supervisor is supposed to manage.

d) ssh_config

This is a regular SSH config file; you only have to change your_username (this is the one registered on Bastion).

BASTION_IP is handled dynamically when the container is built.

# BDOSS environment
Host 10.0.2.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
Host 10.0.*.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
  • exec up.sh:

If this is the first time you’re launching supervisor – it will take some time to build.

Subsequent up’s will take seconds.

Using supervisor

Now you can cd to /deploy/labs/ and deploy whatever you want


minotaur lab deploy mesosmaster -e bdoss-dev -d test -r us-east-1 -z us-east-1a
Creating new stack 'mesos-master-test-bdoss-dev-us-east-1-us-east-1a'...
Stack deployed.

This will spin up a mesos master node in the “test” deployment.
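Judging from the deploy output above, lab stack names appear to be built by joining the lab, deployment, environment, region, and availability zone with dashes. A hedged sketch of that convention, inferred from the output rather than from the Minotaur source:

```go
package main

import "strings"

// labStackName reproduces the naming pattern visible in the deploy output,
// e.g. "mesos-master-test-bdoss-dev-us-east-1-us-east-1a".
func labStackName(lab, deployment, environment, region, zone string) string {
	return strings.Join([]string{lab, deployment, environment, region, zone}, "-")
}
```

This is handy when scripting against the CloudFormation API to find or tear down a lab stack by name.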


Supervisor has a built-in “awsinfo” command, which relies on AWS API and provides brief info about running machines. It is also capable of searching through that info.

Usage example

awsinfo – will display brief info about all nodes running in AWS

root@supervisor:/deploy# awsinfo
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running  
bastion.bdoss-dev                   i-3faa69de   m1.small       running      None           
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           
mesos-slave.test.bdoss-dev          i-e00ddc01   m1.small       terminated      None            None           

awsinfo mesos-master – will display info about all mesos-master nodes running in AWS.

root@supervisor:/deploy/labs# awsinfo mesos-master
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           

awsinfo 10.0.2 – will match all nodes whose private/public IP falls in that subnet

root@supervisor:/deploy/labs# awsinfo 10.0.2
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running  
mesos-master.test.bdoss-dev         i-e96ebd08   m1.small       running 


If you can’t use Docker directly for some reason, there’s a Vagrant wrapper VM for it.

Before doing anything with Vagrant, complete the above steps for Docker, but don’t execute the up.sh script.

Just cd into the vagrant directory and exec vagrant up, then vagrant ssh (nothing special here yet).

When you exec vagrant ssh, the docker container build process will start immediately, so wait a bit and let it complete.

Now you’re inside a Docker container nested in the Vagrant VM and can proceed with deployment in the same manner as described for Docker.

All the following vagrant ssh‘s will spawn the Docker container almost immediately.

Once you are inside of the supervisor image, the minotaur.py script may be used to provision an environment and labs. The rest of this readme assumes that the script is executed from within the supervisor container.

Minotaur Commands

List Infrastructure Components

root@supervisor:/deploy# ./minotaur.py infrastructure list
Available deployments are: ['bastion', 'iampolicies', 'iamusertogroupadditions', 'nat', 'sns', 'subnet', 'vpc']

Print Infrastructure Component Usage

root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -h
usage: minotaur.py infrastructure deploy bastion [-h] -e ENVIRONMENT -r REGION
                                                 -z AVAILABILITY_ZONE
                                                 [-i INSTANCE_TYPE]

optional arguments:
  -h, --help            show this help message and exit
  -e ENVIRONMENT, --environment ENVIRONMENT
                        CloudFormation environment to deploy to
  -r REGION, --region REGION
                        Geographic area to deploy to
  -z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
                        Isolated location to deploy to
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy

Deploy Infrastructure Component

In this example, the bdoss-dev bastion already existed, so the CloudFormation stack was updated with the current template.

root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -e bdoss-dev -r us-east-1 -z us-east-1a
Template successfully validated.
Updating existing 'bastion-bdoss-dev-us-east-1-us-east-1a' stack...
Stack updated.

List Labs

List all supported labs.

root@supervisor:/deploy# ./minotaur.py lab list
Available deployments are: ['clouderahadoop', 'gokafkaconsumer', 'gokafkaproducer', 'kafka', 'mesosmaster', 'mesosslave', 'zookeeper']

Print Lab Usage

Print the kafka lab usage.

root@supervisor:/deploy# ./minotaur.py lab deploy kafka -h
usage: minotaur.py lab deploy kafka [-h] -e ENVIRONMENT -d DEPLOYMENT -r
                                    REGION -z AVAILABILITY_ZONE [-n NUM_NODES]
                                    [-i INSTANCE_TYPE] [-v ZK_VERSION]
                                    [-k KAFKA_URL]

optional arguments:
  -h, --help            show this help message and exit
  -e ENVIRONMENT, --environment ENVIRONMENT
                        CloudFormation environment to deploy to
  -d DEPLOYMENT, --deployment DEPLOYMENT
                        Unique name for the deployment
  -r REGION, --region REGION
                        Geographic area to deploy to
  -z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
                        Isolated location to deploy to
  -n NUM_NODES, --num-nodes NUM_NODES
                        Number of instances to deploy
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy
  -v ZK_VERSION, --zk-version ZK_VERSION
                        The Zookeeper version to deploy
  -k KAFKA_URL, --kafka-url KAFKA_URL
                        The Kafka URL

Deploy Lab

Deploy a 3-broker Kafka cluster.

root@supervisor:/deploy# ./minotaur.py lab deploy kafka -e bdoss-dev -d kafka-example -r us-east-1 -z us-east-1a -n 3 -i m1.small    
Template successfully validated.
Creating new 'kafka-bdoss-dev-kafka-example-us-east-1-us-east-1a' stack...
Stack deployed.

 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop


Ideas and goals behind the Go Kafka Client

December 10, 2014

I think a bunch of folks have heard already that B.D.O.S.S. was working on a new Apache Kafka client for Go. The Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, which is our lab environment for Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.

To get started using the consumer client check out our example code and its property file.

Ideas and goals behind the Go Kafka Client:


1) Partition Ownership

We decided on implementing multiple strategies for this, including static assignment. The concept of re-balancing is preserved, but now there are a few different strategies for re-balancing and they can run at different times depending on what is going on (like when a blue/green deploy is happening). For more on blue/green deployments check out this video with Jim Plush and Sean Berry from 2014 AWS re:Invent.

2) Fetch Management

This is what “fills up the reservoir,” as I like to call it, so the processing (either sequential or in batch) will always have data, if there is data for it to have, without making a network hop. The fetcher has to stay ahead here, keeping the processing tap full (or, if empty, that is controlled), pulling the data for the Kafka partition(s) it owns.
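In Go, the “reservoir” idea maps naturally onto a buffered channel: a fetcher goroutine fills it ahead of demand, and processing blocks only when it is empty. A minimal sketch of the concept (our illustration, not the client’s actual internals):

```go
package main

// Message stands in for a fetched Kafka message in this sketch.
type Message struct {
	Value []byte
}

// newReservoir starts a fetcher goroutine that keeps a buffered channel
// ("the reservoir") topped up, so processors read without a network hop.
func newReservoir(fetch func() Message, size int) <-chan Message {
	out := make(chan Message, size)
	go func() {
		for {
			out <- fetch() // blocks only when the reservoir is full
		}
	}()
	return out
}
```

The channel's capacity bounds how far the fetcher can run ahead, which is the "controlled when empty" behavior described above.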

3) Work Management

For the Go consumer we currently only support “fan out” using goroutines and channels. If you have ever used Go this will be familiar to you; if not, you should drop everything and learn Go.

4) Offset Management

Our offset management is based on a per batch basis with each highest offset from the batch committed on a per partition basis.
package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"time"

	metrics "github.com/rcrowley/go-metrics"
	kafkaClient "github.com/stealthly/go_kafka_client"
)

func main() {
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)
	}

	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
		panic(err)
	}
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
	})
}

func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
		consumeRate.Mark(1)

		return kafkaClient.NewSuccessfulResult(id)
	}
}

func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop
}

func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
}
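The per-batch rule described under (4) Offset Management — commit the highest offset seen for each partition — can be sketched on its own. The types here are simplified stand-ins, not the client’s:

```go
package main

// batchEntry is a simplified stand-in for a consumed message's coordinates.
type batchEntry struct {
	Partition int32
	Offset    int64
}

// highestOffsets returns, per partition, the largest offset in the batch —
// the value that would be committed for that partition.
func highestOffsets(batch []batchEntry) map[int32]int64 {
	highest := make(map[int32]int64)
	for _, e := range batch {
		if cur, ok := highest[e.Partition]; !ok || e.Offset > cur {
			highest[e.Partition] = e.Offset
		}
	}
	return highest
}
```

Committing only the batch maximum per partition keeps commit traffic low while still bounding how much is reprocessed after a failure.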
We set our mailing list to be kafka-clients@googlegroups.com; this is the central place for client library discussions for the Apache Kafka community.

Plans moving forward with the Go Kafka Client:

1) Build up suite of integration tests.
2) Stabilize the API nomenclature with the JVM Kafka Client for consistency.
3) Run integration tests and create baseline regression points.
4) Re-work changes from feedback of issues found and from our own customer support.
5) Push to production and continue on going support.

Ideas and goals behind Minotaur:

Provide a controlled and isolated environment through scripts for regression of versions and benchmarks against instance sizing. This comes in 3 parts: 1) Supervisor 2) Infrastructure 3) Labs. The Labs portion is broken down with both Chef cookbooks and Python wrappers for CloudFormation templates for each of the following systems:

Plans moving forward with Minotaur:

1) Upstream our CloudFormation scripts (this is going to take some more work detangling Dexter, the name for our private repo of Minotaur).
2) Move the work we do with Apache Cassandra into the project.
3) Start support of Puppet in addition to the work we did with Chef.
4) Get the Docker supervisor console pushed upstream through Bastion hosts.
5) Cut our labs over to using Minotaur.
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop

Mesosphere’s new data center mother brain will blow your mind

December 8, 2014
Categories: Apache Mesos
