Archive for December, 2014

Multi Datacenter Replication with Apache Kafka

December 29, 2014

When multi-datacenter replication is required with Apache Kafka, folks most often rely on the project’s MirrorMaker tool. This tool works great, but it was designed for a specific set of use cases, and getting it to work for other needs requires more effort. We found a common need in the community for additional use cases, so we built a new MirrorMaker tool using our Go Kafka Client to support these needs and more!

The Go Kafka Mirror Maker supports:

  • No JVM required when consuming from source cluster and producing to destination cluster.
  • Guarantees of at least once mirroring of data from source to destination.
  • Preservation of ordering from source partition to destination partition.
  • Ability to prefix destination topic to avoid collisions of topic names between clusters.
  • Everything else the existing MirrorMaker tool supports.

Usage:

go run mirror_maker.go --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

Configuration parameters:

--whitelist, --blacklist – whitelist or blacklist of topics to mirror. Exactly one of the two must be given; i.e., passing both a whitelist and a blacklist will cause a panic. This parameter is required.
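The "exactly one of whitelist or blacklist" rule can be sketched as follows. This is a minimal illustration, not the Mirror Maker's own code: `topicFilter` and `newTopicFilter` are hypothetical names, the sketch returns an error where the tool itself panics, and anchoring the expression to the whole topic name is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// topicFilter is a hypothetical helper that decides whether a topic is mirrored.
type topicFilter struct {
	pattern   *regexp.Regexp
	blacklist bool // true: topics matching the pattern are excluded
}

// newTopicFilter enforces that exactly one of whitelist or blacklist is set.
func newTopicFilter(whitelist, blacklist string) (*topicFilter, error) {
	if (whitelist == "") == (blacklist == "") {
		return nil, errors.New("exactly one of whitelist or blacklist is required")
	}
	expr, isBlacklist := whitelist, false
	if blacklist != "" {
		expr, isBlacklist = blacklist, true
	}
	// Assumption: the expression must match the whole topic name.
	re, err := regexp.Compile("^(?:" + expr + ")$")
	if err != nil {
		return nil, err
	}
	return &topicFilter{pattern: re, blacklist: isBlacklist}, nil
}

// allows reports whether the given topic should be mirrored.
func (f *topicFilter) allows(topic string) bool {
	return f.pattern.MatchString(topic) != f.blacklist
}

func main() {
	f, err := newTopicFilter(".*", "")
	if err != nil {
		panic(err)
	}
	fmt.Println(f.allows("test"))
}
```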

--consumer.config – consumer property files to consume from a source cluster. You can pass multiple of those like this: --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config. At least one consumer config is required.

--producer.config – property file to configure embedded producers. This parameter is required.

--num.producers – number of producer instances. This can be used to increase throughput, because each producer’s requests are effectively handled by a single thread on the receiving Kafka broker; i.e., even if you have multiple consumption streams (see --num.streams below), throughput can be bottlenecked at the handling stage of the mirror maker’s producer requests. Defaults to 1.

--num.streams – the number of mirror consumer goroutines to create. If the number of consumption streams is higher than the number of available partitions, some of the mirroring goroutines will be idle by virtue of the consumer rebalancing algorithm (if they do not end up owning any partitions for consumption). Defaults to 1.
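To see why extra streams end up idle, here is a small sketch. It uses a plain round-robin assignment as a simplified stand-in for the real consumer rebalancing algorithm; `assignPartitions` is a hypothetical function written only for this illustration.

```go
package main

import "fmt"

// assignPartitions spreads partitions 0..numPartitions-1 over numStreams
// streams round-robin. This is a simplified stand-in for the consumer
// rebalancing algorithm, not the algorithm itself.
func assignPartitions(numPartitions, numStreams int) [][]int {
	if numStreams <= 0 {
		return nil
	}
	owned := make([][]int, numStreams)
	for p := 0; p < numPartitions; p++ {
		s := p % numStreams
		owned[s] = append(owned[s], p)
	}
	return owned
}

func main() {
	// Two partitions, four streams: two streams own nothing and stay idle.
	for stream, partitions := range assignPartitions(2, 4) {
		if len(partitions) == 0 {
			fmt.Printf("stream %d: idle\n", stream)
			continue
		}
		fmt.Printf("stream %d: partitions %v\n", stream, partitions)
	}
}
```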

--preserve.partitions – flag to preserve the partition number. E.g. if a message was read from partition 5, it’ll be written to partition 5. Note that this can affect performance. Defaults to false.

--preserve.order – flag to preserve message order. E.g. the message sequence 1, 2, 3, 4, 5 will remain 1, 2, 3, 4, 5 in the destination topic. Note that this can affect performance. Defaults to false.

--prefix – destination topic prefix. E.g. if a message was read from topic “test” and the prefix is “dc1_”, it’ll be written to topic “dc1_test”. Defaults to an empty string.
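The effect of --prefix and --preserve.partitions on where a message lands can be sketched like this. The `destination` type and `route` function are hypothetical, and using -1 to mean "let the producer choose the partition" is an assumption made for the example.

```go
package main

import "fmt"

// destination is a hypothetical description of where a mirrored message goes.
type destination struct {
	topic     string
	partition int // -1 means "let the producer's partitioner decide" (assumption)
}

// route applies the topic prefix and, if requested, keeps the source partition.
func route(prefix, srcTopic string, srcPartition int, preservePartitions bool) destination {
	d := destination{topic: prefix + srcTopic, partition: -1}
	if preservePartitions {
		d.partition = srcPartition
	}
	return d
}

func main() {
	fmt.Printf("%+v\n", route("dc1_", "test", 5, true))
	fmt.Printf("%+v\n", route("", "test", 5, false))
}
```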

--queue.size – number of messages that are buffered between the consumer and producer. Defaults to 10000.
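In Go, a bounded buffer between a consumer and a producer maps naturally onto a buffered channel. The sketch below is an illustration under that assumption, not the Mirror Maker's implementation; `mirror` is a hypothetical function and the "producer" simply records what it receives.

```go
package main

import "fmt"

// mirror passes every message from source to a stand-in producer through a
// queue that holds at most queueSize buffered messages.
func mirror(source []string, queueSize int) []string {
	queue := make(chan string, queueSize)
	done := make(chan []string)

	// Producer side: drain the queue in arrival order.
	go func() {
		var sent []string
		for msg := range queue {
			sent = append(sent, msg)
		}
		done <- sent
	}()

	// Consumer side: this send blocks whenever the queue is full, which is
	// how the buffer size limits how far the consumer can run ahead.
	for _, msg := range source {
		queue <- msg
	}
	close(queue)
	return <-done
}

func main() {
	fmt.Println(mirror([]string{"m1", "m2", "m3"}, 2))
}
```

A larger queue lets the consumer run further ahead of a slow producer at the cost of more messages held in memory.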

The MirrorMaker code and tests are also a great starting point for learning how to produce and consume data with Kafka using the Go programming language.

Big Data Open Source Security LLC provides professional services and product solutions for the collection, storage, transfer, real-time analytics, batch processing and reporting for complex data streams, data sets and distributed systems. BDOSS is all about the “glue” and helping companies to not only figure out what Big Data Infrastructure Components to use but also how to change their existing (or build new) systems to work with them. The focus of our services and solutions are end to end including architecture, development, implementation, documentation, training and support for complex data streams, data sets and distributed systems using Open Source Software.

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop
********************************************/

Open Source Cloud Formation with Minotaur for Mesos, Kafka and Hadoop

December 17, 2014

Today I am happy to announce “Minotaur”, which is our open source, AWS-based infrastructure for managing big data open source projects including (but not limited to): Apache Kafka, Apache Mesos and Cloudera’s Distribution of Hadoop. Minotaur is based on AWS CloudFormation.

The following labs are currently supported:

Supervisor

Supervisor is a Docker-based image that contains all the necessary software to manage nodes/resources in AWS.

Supervisor set-up

  • clone this repo to repo_dir
  • cd to repo_dir/supervisor folder

Before trying to build the Docker image, you must put some configuration files under the config directory:

a) aws_config

This file is just a regular aws-cli config. You must paste into it the access key and secret key provided by Amazon:

[default]
output = json
region = us-east-1
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY

Do not add or remove any whitespace (especially before and after the “=” sign in the key lines).

b) private.key

This is your private SSH key, the public part of which is registered on the Bastion host.

c) environment.key

This is a shared key for all nodes in the environment that Supervisor is supposed to manage.

d) ssh_config

This is a regular SSH config file; you only have to change your_username (this is the one registered on Bastion).

BASTION_IP is handled dynamically when the container is built.

# BDOSS environment
Host 10.0.2.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
Host 10.0.*.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
  • exec up.sh:

If this is the first time you’re launching supervisor – it will take some time to build.

Subsequent up’s will take seconds.

Using supervisor

Now you can cd to /deploy/labs/ and deploy whatever you want

Example:


minotaur lab deploy mesosmaster -e bdoss-dev -d test -r us-east-1 -z us-east-1a
Creating new stack 'mesos-master-test-bdoss-dev-us-east-1-us-east-1a'...
Stack deployed.

This will spin up a Mesos master node in the “test” deployment.

awsinfo

Supervisor has a built-in “awsinfo” command, which relies on the AWS API and provides brief info about running machines. It is also capable of searching through that info.

Usage example

awsinfo – will display brief info about all nodes running in AWS

root@supervisor:/deploy# awsinfo
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running         10.0.2.94       54.86.153.142  
bastion.bdoss-dev                   i-3faa69de   m1.small       running         10.0.0.207      None           
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           
mesos-slave.test.bdoss-dev          i-e00ddc01   m1.small       terminated      None            None           

awsinfo mesos-master – will display info about all mesos-master nodes running in AWS.

root@supervisor:/deploy/labs# awsinfo mesos-master
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           

awsinfo 10.0.2 – match a private/public subnet

root@supervisor:/deploy/labs# awsinfo 10.0.2
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running         10.0.2.94       54.86.153.142  
mesos-master.test.bdoss-dev         i-e96ebd08   m1.small       running         10.0.2.170      54.172.160.254 
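The search behaviour shown in these examples looks like a match against the name and IP columns. The Go sketch below illustrates that idea only: awsinfo itself is part of Supervisor and is not written this way, `instance` and `search` are hypothetical names, and plain substring matching is an assumption based on the sample output.

```go
package main

import (
	"fmt"
	"strings"
)

// instance holds the columns of the awsinfo listing that are searched here.
type instance struct {
	name, privateIP, publicIP string
}

// search keeps the instances whose name or IP contains the query string.
// Substring matching is an assumption, not awsinfo's documented behaviour.
func search(instances []instance, query string) []instance {
	var matched []instance
	for _, in := range instances {
		if strings.Contains(in.name, query) ||
			strings.Contains(in.privateIP, query) ||
			strings.Contains(in.publicIP, query) {
			matched = append(matched, in)
		}
	}
	return matched
}

func main() {
	nodes := []instance{
		{"nat.bdoss-dev", "10.0.2.94", "54.86.153.142"},
		{"bastion.bdoss-dev", "10.0.0.207", ""},
		{"mesos-master.test.bdoss-dev", "10.0.2.170", "54.172.160.254"},
	}
	for _, in := range search(nodes, "10.0.2") {
		fmt.Println(in.name, in.privateIP)
	}
}
```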

Vagrant

If you can’t use Docker directly for some reason, there’s a Vagrant wrapper VM for it.

Before doing anything with Vagrant, complete the above steps for Docker, but don’t execute the up.sh script.

Just cd into the vagrant directory and exec vagrant up, then vagrant ssh (nothing special here yet).

When you exec vagrant ssh, the Docker container build process will start immediately, so wait a bit and let it complete.

Now you’re inside a Docker container nested in a Vagrant VM and can proceed with deployment in the same manner as described for Docker.

All subsequent vagrant ssh’s will spawn the Docker container almost immediately.

Once you are inside of the supervisor image, the minotaur.py script may be used to provision an environment and labs. The rest of this readme assumes that the script is executed from within the supervisor container.

Minotaur Commands

List Infrastructure Components

root@supervisor:/deploy# ./minotaur.py infrastructure list
Available deployments are: ['bastion', 'iampolicies', 'iamusertogroupadditions', 'nat', 'sns', 'subnet', 'vpc']

Print Infrastructure Component Usage

root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -h
usage: minotaur.py infrastructure deploy bastion [-h] -e ENVIRONMENT -r REGION
                                                 -z AVAILABILITY_ZONE
                                                 [-i INSTANCE_TYPE]

optional arguments:
  -h, --help            show this help message and exit
  -e ENVIRONMENT, --environment ENVIRONMENT
                        CloudFormation environment to deploy to
  -r REGION, --region REGION
                        Geographic area to deploy to
  -z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
                        Isolated location to deploy to
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy

Deploy Infrastructure Component

In this example, the bdoss-dev bastion already existed, so the CloudFormation stack was updated with the current template.

root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -e bdoss-dev -r us-east-1 -z us-east-1a
Template successfully validated.
Updating existing 'bastion-bdoss-dev-us-east-1-us-east-1a' stack...
Stack updated.

List Labs

List all supported labs.

root@supervisor:/deploy# ./minotaur.py lab list
Available deployments are: ['clouderahadoop', 'gokafkaconsumer', 'gokafkaproducer', 'kafka', 'mesosmaster', 'mesosslave', 'zookeeper']

Print Lab Usage

Print the kafka lab usage.

root@supervisor:/deploy# ./minotaur.py lab deploy kafka -h
usage: minotaur.py lab deploy kafka [-h] -e ENVIRONMENT -d DEPLOYMENT -r
                                    REGION -z AVAILABILITY_ZONE [-n NUM_NODES]
                                    [-i INSTANCE_TYPE] [-v ZK_VERSION]
                                    [-k KAFKA_URL]

optional arguments:
  -h, --help            show this help message and exit
  -e ENVIRONMENT, --environment ENVIRONMENT
                        CloudFormation environment to deploy to
  -d DEPLOYMENT, --deployment DEPLOYMENT
                        Unique name for the deployment
  -r REGION, --region REGION
                        Geographic area to deploy to
  -z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
                        Isolated location to deploy to
  -n NUM_NODES, --num-nodes NUM_NODES
                        Number of instances to deploy
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy
  -v ZK_VERSION, --zk-version ZK_VERSION
                        The Zookeeper version to deploy
  -k KAFKA_URL, --kafka-url KAFKA_URL
                        The Kafka URL

Deploy Lab

Deploy a 3-broker Kafka cluster.

root@supervisor:/deploy# ./minotaur.py lab deploy kafka -e bdoss-dev -d kafka-example -r us-east-1 -z us-east-1a -n 3 -i m1.small    
Template successfully validated.
Creating new 'kafka-bdoss-dev-kafka-example-us-east-1-us-east-1a' stack...
Stack deployed.

Ideas and goals behind the Go Kafka Client

December 10, 2014

I think a bunch of folks have already heard that B.D.O.S.S. was working on a new Apache Kafka client for Go. The Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, which is our lab environment for Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.

To get started using the consumer client check out our example code and its property file.

Ideas and goals behind the Go Kafka Client:

 

1) Partition Ownership

We decided to implement multiple strategies for this, including static assignment. The concept of rebalancing is preserved, but there are now a few different rebalancing strategies, and they can run at different times depending on what is going on (for example, while a blue/green deploy is happening). For more on blue/green deployments, check out this video with Jim Plush and Sean Berry from AWS re:Invent 2014.
 

2) Fetch Management

This is what “fills up the reservoir”, as I like to call it, so that the processing (either sequential or in batch) always has data, if there is data for it to have, without making a network hop. The fetcher has to stay ahead here, keeping the processing tap full (or, if it is empty, making sure that is controlled) by pulling the data for the Kafka partition(s) it owns.
 

3) Work Management

For the Go consumer we currently only support “fan out” using goroutines and channels. If you have ever used Go, this will be familiar to you; if not, you should drop everything and learn Go.
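For readers new to Go, the general fan-out pattern with goroutines and channels looks roughly like this. It is a generic sketch of the pattern, not the Go Kafka Client's worker manager; `fanOut` is a hypothetical function and the order of results across workers is not guaranteed.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// fanOut hands each message to one of numWorkers goroutines and collects the
// processed results. Results may arrive in any order.
func fanOut(messages []string, numWorkers int, process func(string) string) []string {
	if numWorkers < 1 {
		numWorkers = 1
	}
	in := make(chan string)
	out := make(chan string)

	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range in {
				out <- process(msg)
			}
		}()
	}

	// Feed the workers, then close out once every worker has finished.
	go func() {
		for _, msg := range messages {
			in <- msg
		}
		close(in)
		wg.Wait()
		close(out)
	}()

	var results []string
	for r := range out {
		results = append(results, r)
	}
	return results
}

func main() {
	results := fanOut([]string{"a", "b", "c"}, 2, strings.ToUpper)
	fmt.Println(len(results), "messages processed")
}
```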
 

4) Offset Management

Our offset management works on a per-batch basis: the highest offset from each batch is committed on a per-partition basis.
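As a rough illustration of that rule, the sketch below reduces a batch to the offset that would be committed for each partition. The `message` type and `highestOffsets` function are hypothetical and are not part of the Go Kafka Client API.

```go
package main

import "fmt"

// message carries only the fields needed to decide what to commit.
type message struct {
	partition int32
	offset    int64
}

// highestOffsets returns, for each partition seen in the batch, the highest
// offset in that batch, which is the value committed for the partition.
func highestOffsets(batch []message) map[int32]int64 {
	commits := make(map[int32]int64)
	for _, m := range batch {
		if cur, ok := commits[m.partition]; !ok || m.offset > cur {
			commits[m.partition] = m.offset
		}
	}
	return commits
}

func main() {
	batch := []message{{0, 41}, {1, 7}, {0, 42}, {1, 9}}
	for partition, offset := range highestOffsets(batch) {
		fmt.Printf("partition %d: commit offset %d\n", partition, offset)
	}
}
```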
 
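// Excerpt of an example consumer. The package clause, the imports (fmt, net,
// os, os/signal, time, the Go Kafka Client as kafkaClient and a metrics
// package) and resolveConfig() are not shown here.
func main() {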
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)
	}

	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
		panic(err)
	}
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
	})
}

func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
		consumeRate.Mark(1)

		return kafkaClient.NewSuccessfulResult(id)
	}
}

func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop
}

func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
}
Our mailing list is kafka-clients@googlegroups.com; this is the central place for client library discussions for the Apache Kafka community.
 

Plans moving forward with the Go Kafka Client:

1) Build up suite of integration tests.
2) Stabilize the API nomenclature with the JVM Kafka Client for consistency.
3) Run integration tests and create baseline regression points.
4) Re-work changes from feedback of issues found and from our own customer support.
5) Push to production and continue ongoing support.
 
 

Ideas and goals behind Minotaur:

Provide a controlled and isolated environment through scripts for regression testing of versions and benchmarks against instance sizing. This comes in 3 parts: 1) Supervisor 2) Infrastructure 3) Labs. The Labs portion is broken down into both Chef cookbooks and Python wrappers for CloudFormation templates for each of the following systems:
 

Plans moving forward with Minotaur:

1) Upstream our Cloud Formation scripts (this is going to take some more work detangling Dexter, the name for our private repo of Minotaur).
2) Move the work we do with Apache Cassandra into the project.
3) Start support of Puppet in addition to the work we did with Chef.
4) Get the Docker supervisor console pushed upstream through Bastion hosts.
5) Cut our labs over to using Minotaur.
 

Mesosphere’s new data center mother brain will blow your mind

December 8, 2014

Gigaom

Mesosphere has been making a name for itself in the world of data centers and cloud computing since 2013 with its distributed-system smarts and various introductions of open-source technologies, each designed to tackle the challenges of running tons of workloads across multiple machines. On Monday, the startup plans to announce that its much-anticipated data center operating system — the culmination of its many technologies — has been released as a private beta and will be available to the public in early 2015.

As part of the new operating system’s launch, Mesosphere also plans to announce that it has raised a $36 million Series B investment round, which brings its total funding to $50 million. Khosla Ventures, a new investor, drove the financing along with Andreessen Horowitz, Fuel Capital, SV Angel and other unnamed entities.

Mesosphere’s new data center operating system, dubbed DCOS, tackles the complexity behind trying to…

Categories: Apache Mesos

What’s coming in Apache Kafka 0.8.2

December 7, 2014

Confluent

I am very excited to tell you about the forthcoming 0.8.2 release of Apache Kafka. Kafka is a fault-tolerant, low-latency, high-throughput distributed messaging system used in data pipelines at several companies. Kafka became a top-level Apache project in 2012 and was originally created at LinkedIn, where it forms a critical part of LinkedIn’s infrastructure and transmits data to all systems and applications. The project is currently under active development from a diverse group of contributors.

Since there are many new features in 0.8.2, we released 0.8.2-beta. The final release will be done when 0.8.2 is stable.

Here is a quick overview of the notable work in this release.

New features

New producer

The JVM clients that Kafka ships haven’t changed much since Kafka was originally built. Over time, we have realized some of the limitations and problems that came both from the design of these clients and…

Categories: Kafka