Archive
Multi Datacenter Replication with Apache Kafka
When folks need multi-datacenter replication with Apache Kafka, they most often rely on the project’s MirrorMaker tool. It works well, but it was designed for a specific set of use cases and requires extra work to cover every need. We found a common need in the community for additional use cases, so we built a new MirrorMaker tool on top of our Go Kafka Client to support those needs and more!
The Go Kafka Mirror Maker supports:
- No JVM required when consuming from the source cluster and producing to the destination cluster.
- At-least-once guarantees when mirroring data from source to destination.
- Preservation of ordering from source partition to destination partition.
- Ability to prefix destination topic to avoid collisions of topic names between clusters.
- Everything else the existing MirrorMaker tool supports.
Usage:
go run mirror_maker.go --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"
Configuration parameters:
--whitelist, --blacklist – whitelist or blacklist of topics to mirror. Exactly one whitelist or blacklist is allowed, i.e. passing both will cause a panic. This parameter is required.
--consumer.config – consumer property files to consume from a source cluster. You can pass multiple of these, e.g. --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config. At least one consumer config is required.
--producer.config – property file to configure the embedded producers. This parameter is required.
--num.producers – number of producer instances. This can be used to increase throughput: each producer’s requests are effectively handled by a single thread on the receiving Kafka broker, so even with multiple consumption streams (see the next parameter) throughput can be bottlenecked at the stage where the mirror maker’s producer requests are handled. Defaults to 1.
--num.streams – number of mirror consumer goroutines to create. If the number of consumption streams is higher than the number of available partitions, some of the mirroring routines will be idle by virtue of the consumer rebalancing algorithm (if they do not end up owning any partitions for consumption). Defaults to 1.
--preserve.partitions – flag to preserve the partition number, e.g. if a message was read from partition 5 it will be written to partition 5. Note that this can affect performance. Defaults to false.
--preserve.order – flag to preserve message order, e.g. the message sequence 1, 2, 3, 4, 5 will remain 1, 2, 3, 4, 5 in the destination topic. Note that this can affect performance. Defaults to false.
--prefix – destination topic prefix, e.g. if a message was read from topic “test” and the prefix is “dc1_” it will be written to topic “dc1_test”. Defaults to an empty string.
--queue.size – number of messages buffered between the consumer and producer. Defaults to 10000.
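To make these options concrete, below is a minimal Go sketch of the consume-buffer-produce pipeline the parameters describe. It is not the actual mirror maker code or the Go Kafka Client API; the Message type and the consume/produce plumbing are hypothetical stand-ins. It only illustrates how a bounded buffer of queue.size messages sits between the consumer and the producer, how the topic prefix is applied, and how the source partition can be preserved.

package main

import "fmt"

// Message is a stand-in for a consumed Kafka record (hypothetical type,
// not the Go Kafka Client's own message struct).
type Message struct {
    Topic     string
    Partition int32
    Key       []byte
    Value     []byte
}

// mirror wires a source of messages to a destination through a bounded
// buffer, mimicking --queue.size, --prefix and --preserve.partitions.
func mirror(source <-chan Message, produce func(Message), prefix string, preservePartitions bool, queueSize int) {
    buffer := make(chan Message, queueSize) // --queue.size bounds the in-flight messages

    go func() {
        for msg := range source {
            buffer <- msg // blocks when the buffer is full, applying backpressure to the consumer
        }
        close(buffer)
    }()

    for msg := range buffer {
        out := msg
        out.Topic = prefix + msg.Topic // --prefix avoids topic-name collisions between clusters
        if !preservePartitions {
            out.Partition = -1 // let the producer pick a partition; otherwise keep the source partition
        }
        produce(out) // offsets would only be committed after a successful produce (at-least-once)
    }
}

func main() {
    source := make(chan Message)
    go func() {
        for i := 0; i < 3; i++ {
            source <- Message{Topic: "test", Partition: 5, Value: []byte(fmt.Sprintf("message-%d", i))}
        }
        close(source)
    }()

    mirror(source, func(m Message) {
        fmt.Printf("produce topic=%s partition=%d value=%s\n", m.Topic, m.Partition, m.Value)
    }, "dc1_", true, 10000)
}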
The MirrorMaker code and tests are also a great starting point for learning how to produce and consume data with Kafka using the Go programming language.
Big Data Open Source Security LLC provides professional services and product solutions for the collection, storage, transfer, real-time analytics, batch processing and reporting of complex data streams, data sets and distributed systems. BDOSS is all about the “glue”: helping companies not only figure out which Big Data infrastructure components to use but also how to change their existing systems (or build new ones) to work with them. Our services and solutions are end to end, including architecture, development, implementation, documentation, training and support for complex data streams, data sets and distributed systems using Open Source Software.
Open Source Cloud Formation with Minotaur for Mesos, Kafka and Hadoop
Today I am happy to announce “Minotaur”, our open source, AWS-based infrastructure for managing big data open source projects including (but not limited to) Apache Kafka, Apache Mesos and Cloudera’s Distribution of Hadoop. Minotaur is based on AWS CloudFormation.
The following labs are currently supported:
- Apache Mesos
- Apache Kafka
- Apache Zookeeper
- Cloudera Hadoop
- Golang Kafka Consumer
- Golang Kafka Producer
Supervisor
Supervisor is a Docker-based image that contains all the necessary software to manage nodes/resources in AWS.
Supervisor set-up
- clone this repo to repo_dir
- cd to repo_dir/supervisor folder
Before trying to build the Docker image, you must put some configuration files under the config directory:
a) aws_config
This file is just a regular aws-cli config; paste in the access and secret keys provided by Amazon:
[default]
output = json
region = us-east-1
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY
Do not add or remove any extra whitespace (especially before and after the “=” sign in keys).
b) private.key
This is your private SSH key, the public part of which is registered on the Bastion host.
c) environment.key
This is a shared key for all nodes in the environment Supervisor is supposed to manage.
d) ssh_config
This is a regular SSH config file; the only thing you need to change is your_username (the username registered on the Bastion host).
BASTION_IP is handled dynamically when the container is built.
# BDOSS environment
Host 10.0.2.*
IdentityFile ~/.ssh/environment.key
User ubuntu
ProxyCommand ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
Host 10.0.*.*
IdentityFile ~/.ssh/environment.key
User ubuntu
ProxyCommand ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
- exec up.sh:
If this is the first time you’re launching Supervisor, it will take some time to build.
Subsequent runs of up.sh will take seconds.
Using supervisor
Now you can cd to /deploy/labs/ and deploy whatever you want
Example:
minotaur lab deploy mesosmaster -e bdoss-dev -d test -r us-east-1 -z us-east-1a
Creating new stack 'mesos-master-test-bdoss-dev-us-east-1-us-east-1a'...
Stack deployed.
This will spin up a Mesos master node in the “test” deployment.
awsinfo
Supervisor has a built-in “awsinfo” command, which relies on the AWS API and provides brief info about running machines. It can also search through that info.
Usage example
awsinfo – will display brief info about all nodes running in AWS
root@supervisor:/deploy# awsinfo
Cloud: bdoss/us-east-1
Name Instance ID Instance Type Instance State Private IP Public IP
---- ----------- ------------- -------------- ---------- ---------
nat.bdoss-dev i-c46a0b2a m1.small running 10.0.2.94 54.86.153.142
bastion.bdoss-dev i-3faa69de m1.small running 10.0.0.207 None
mesos-master.test.bdoss-dev i-e80ddc09 m1.small terminated None None
mesos-slave.test.bdoss-dev i-e00ddc01 m1.small terminated None None
awsinfo mesos-master – will display info about all mesos-master nodes running in AWS
root@supervisor:/deploy/labs# awsinfo mesos-master
Cloud: bdoss/us-east-1
Name Instance ID Instance Type Instance State Private IP Public IP
---- ----------- ------------- -------------- ---------- ---------
mesos-master.test.bdoss-dev i-e80ddc09 m1.small terminated None None
awsinfo 10.0.2 – will match a private/public subnet
root@supervisor:/deploy/labs# awsinfo 10.0.2
Cloud: bdoss/us-east-1
Name Instance ID Instance Type Instance State Private IP Public IP
---- ----------- ------------- -------------- ---------- ---------
nat.bdoss-dev i-c46a0b2a m1.small running 10.0.2.94 54.86.153.142
mesos-master.test.bdoss-dev i-e96ebd08 m1.small running 10.0.2.170 54.172.160.254
Vagrant
If you can’t use Docker directly for some reason, there’s a Vagrant wrapper VM for it.
Before doing anything with Vagrant, complete the above steps for Docker, but don’t execute the up.sh script.
Just cd into the vagrant directory and exec vagrant up, then vagrant ssh (nothing special here yet).
When you exec vagrant ssh, the Docker container build process will start immediately, so wait a bit and let it complete.
Now you’re inside a Docker container nested in the Vagrant VM and can proceed with deployment in the same manner as described for Docker.
All subsequent vagrant ssh’s will spawn the Docker container almost immediately.
Once you are inside the supervisor image, the minotaur.py script may be used to provision an environment and labs. The rest of this readme assumes that the script is executed from within the supervisor container.
Minotaur Commands
List Infrastructure Components
root@supervisor:/deploy# ./minotaur.py infrastructure list
Available deployments are: ['bastion', 'iampolicies', 'iamusertogroupadditions', 'nat', 'sns', 'subnet', 'vpc']
Print Infrastructure Component Usage
root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -h
usage: minotaur.py infrastructure deploy bastion [-h] -e ENVIRONMENT -r REGION
-z AVAILABILITY_ZONE
[-i INSTANCE_TYPE]
optional arguments:
-h, --help show this help message and exit
-e ENVIRONMENT, --environment ENVIRONMENT
CloudFormation environment to deploy to
-r REGION, --region REGION
Geographic area to deploy to
-z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
Isolated location to deploy to
-i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
AWS EC2 instance type to deploy
Deploy Infrastructure Component
In this example, the bdoss-dev bastion already existed, so the CloudFormation stack was updated with the current template.
root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -e bdoss-dev -r us-east-1 -z us-east-1a
Template successfully validated.
Updating existing 'bastion-bdoss-dev-us-east-1-us-east-1a' stack...
Stack updated.
List Labs
List all supported labs.
root@supervisor:/deploy# ./minotaur.py lab list
Available deployments are: ['clouderahadoop', 'gokafkaconsumer', 'gokafkaproducer', 'kafka', 'mesosmaster', 'mesosslave', 'zookeeper']
Print Lab Usage
Print the kafka lab usage.
root@supervisor:/deploy# ./minotaur.py lab deploy kafka -h
usage: minotaur.py lab deploy kafka [-h] -e ENVIRONMENT -d DEPLOYMENT -r
REGION -z AVAILABILITY_ZONE [-n NUM_NODES]
[-i INSTANCE_TYPE] [-v ZK_VERSION]
[-k KAFKA_URL]
optional arguments:
-h, --help show this help message and exit
-e ENVIRONMENT, --environment ENVIRONMENT
CloudFormation environment to deploy to
-d DEPLOYMENT, --deployment DEPLOYMENT
Unique name for the deployment
-r REGION, --region REGION
Geographic area to deploy to
-z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
Isolated location to deploy to
-n NUM_NODES, --num-nodes NUM_NODES
Number of instances to deploy
-i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
AWS EC2 instance type to deploy
-v ZK_VERSION, --zk-version ZK_VERSION
The Zookeeper version to deploy
-k KAFKA_URL, --kafka-url KAFKA_URL
The Kafka URL
Deploy Lab
Deploy a 3-broker Kafka cluster.
root@supervisor:/deploy# ./minotaur.py lab deploy kafka -e bdoss-dev -d kafka-example -r us-east-1 -z us-east-1a -n 3 -i m1.small
Template successfully validated.
Creating new 'kafka-bdoss-dev-kafka-example-us-east-1-us-east-1a' stack...
Stack deployed.
Ideas and goals behind the Go Kafka Client
I think a bunch of folks have heard already that B.D.O.S.S. has been working on a new Apache Kafka client for Go. The Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, which is our lab environment for Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.
To get started with the consumer client, check out our example code and its property file.
Ideas and goals behind the Go Kafka Client:
1) Partition Ownership
2) Fetch Management
3) Work Management
4) Offset Management
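The consumer example below shows how these pieces fit together: it starts a configurable number of consumers, gives each one a worker strategy that logs and counts every message, reports that consume rate to Graphite, and shuts the consumers down cleanly on interrupt.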
package main

// Package declaration and imports added for completeness; the import paths for the
// Go Kafka Client and the go-metrics library are assumed.
import (
    "fmt"
    "net"
    "os"
    "os/signal"
    "time"

    metrics "github.com/rcrowley/go-metrics"
    kafkaClient "github.com/stealthly/go_kafka_client"
)

func main() {
    // resolveConfig is part of the full example; it loads these settings from the example's property file.
    config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
    startMetrics(graphiteConnect, graphiteFlushInterval)

    // Shut down cleanly on Ctrl-C.
    ctrlc := make(chan os.Signal, 1)
    signal.Notify(ctrlc, os.Interrupt)

    // Start the requested number of consumers, staggered 10 seconds apart.
    consumers := make([]*kafkaClient.Consumer, numConsumers)
    for i := 0; i < numConsumers; i++ {
        consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
        time.Sleep(10 * time.Second)
    }

    <-ctrlc
    fmt.Println("Shutdown triggered, closing all alive consumers")
    for _, consumer := range consumers {
        <-consumer.Close()
    }
    fmt.Println("Successfully shut down all consumers")
}

// startMetrics reports the default metrics registry to Graphite.
func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
    addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
    if err != nil {
        panic(err)
    }
    go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
        Addr:          addr,
        Registry:      metrics.DefaultRegistry,
        FlushInterval: graphiteFlushInterval,
        DurationUnit:  time.Second,
        Prefix:        "metrics",
        Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
    })
}

// startNewConsumer configures and starts a single consumer for the given topic.
func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
    config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
    config.Strategy = GetStrategy(config.Consumerid)
    config.WorkerFailureCallback = FailedCallback
    config.WorkerFailedAttemptCallback = FailedAttemptCallback

    consumer := kafkaClient.NewConsumer(&config)
    topics := map[string]int{topic: config.NumConsumerFetchers}
    go func() {
        consumer.StartStatic(topics)
    }()
    return consumer
}

// GetStrategy returns a worker strategy that logs each message and marks a consume-rate meter.
func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
    consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
    return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
        kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
        consumeRate.Mark(1)
        return kafkaClient.NewSuccessfulResult(id)
    }
}

// FailedCallback is invoked when a worker manager fails: stop without committing the offset.
func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
    kafkaClient.Info("main", "Failed callback")
    return kafkaClient.DoNotCommitOffsetAndStop
}

// FailedAttemptCallback is invoked when a single processing attempt fails: commit and continue.
func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
    kafkaClient.Info("main", "Failed attempt")
    return kafkaClient.CommitOffsetAndContinue
}
Plans moving forward with the Go Kafka Client:
Ideas and goals behind Minotaur:
Plans moving forward with Minotaur:
Mesosphere’s new data center mother brain will blow your mind
Mesosphere has been making a name for itself in the world of data centers and cloud computing since 2013 with its distributed-system smarts and various introductions of open-source technologies, each designed to tackle the challenges of running tons of workloads across multiple machines. On Monday, the startup plans to announce that its much-anticipated data center operating system — the culmination of its many technologies — has been released as a private beta and will be available to the public in early 2015.
As part of the new operating system’s launch, Mesosphere also plans to announce that it has raised a $36 million Series B investment round, which brings its total funding to $50 million. Khosla Ventures, a new investor, drove the financing along with Andreessen Horowitz, Fuel Capital, SV Angel and other unnamed entities.
Mesosphere’s new data center operating system, dubbed DCOS, tackles the complexity behind trying to…
View original post 1,120 more words
What’s coming in Apache Kafka 0.8.2
I am very excited to tell you about the forthcoming 0.8.2 release of Apache Kafka. Kafka is a fault-tolerant, low-latency, high-throughput distributed messaging system used in data pipelines at several companies. Kafka became a top-level Apache project in 2012 and was originally created at LinkedIn, where it forms a critical part of LinkedIn’s infrastructure and transmits data to all systems and applications. The project is currently under active development from a diverse group of contributors.
Since there are many new features in 0.8.2, we released 0.8.2-beta. The final release will be done when 0.8.2 is stable.
Here is a quick overview of the notable work in this release.
New features
New producer
The JVM clients that Kafka ships haven’t changed much since Kafka was originally built. Over time, we have realized some of the limitations and problems that came both from the design of these clients and…
View original post 1,444 more words