
Syslog Producer For Apache Kafka

January 16, 2015

The Go Kafka Client (sponsored by CrowdStrike and developed by Big Data Open Source Security LLC) now includes a Syslog producer.

There are a few ways to get started with the Syslog Producer: the Docker image, building from source, or Vagrant.

Check out the README for more detail: https://github.com/stealthly/go_kafka_client/tree/master/syslog

The syslog producer operates in two different modes, both of which produce to Apache Kafka:

Raw data

In this mode you are simply funneling the data to a Kafka topic without any alteration. The bytes received by the server are the bytes written to the Kafka topic.

As a protobuf with metadata

This metadata is set partly when the server starts and partly by the server just before sending to Kafka. This protobuf ends up being really useful downstream.

The fields of the LogLine protobuf:

  • string line
  • string source
  • repeated tag
  • int64 logtypeid
  • repeated int64 timings
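To make the field list concrete, here is a rough Go struct with the same shape. This is only an illustration: the real type is generated from the project's .proto file, and the `Tag` struct with `Key`/`Value` fields is an assumption, since the list above does not spell out tag's type.

```go
package main

import "fmt"

// Tag is a hypothetical key/value pair; the field list only says "repeated tag".
type Tag struct {
	Key   string
	Value string
}

// LogLine mirrors the protobuf fields listed above. The real type is
// generated from the project's .proto definition and will differ in detail.
type LogLine struct {
	Line      string  // raw log data, unaltered (required)
	Source    string  // optional instance identifier, e.g. "i-59a059a8"
	Tag       []Tag   // optional key/value pairs from the command line
	LogTypeId int64   // optional, defaults to 0
	Timings   []int64 // appended to at each pipeline stage
}

func main() {
	msg := LogLine{
		Line:   "<34>Oct 11 22:14:15 mymachine su: 'su root' failed",
		Source: "i-59a059a8",
		Tag:    []Tag{{Key: "dc", Value: "dc1"}},
	}
	fmt.Println(msg.Source, len(msg.Tag), msg.LogTypeId)
}
```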

line - This is the log data coming into the syslog server, unaltered from what was received. It is a required field and is set by the syslog server.

source - This has whatever meaning you want to give it. It is meant to identify the specific instance the data is coming from (e.g. i-59a059a8). It is an optional field, set by passing the value on the command line.

tag - This is a structure of key/value pairs. You can add as many key/value pairs as you want when starting the server, e.g. dc=dc1,floor=3,aisle=4,rack=2,u=3. It is an optional field, set by passing the value on the command line.
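The comma-separated tag string could be turned into key/value pairs along these lines. `parseTags` is a hypothetical helper, not the syslog producer's actual code; it rejects entries without an `=` rather than guessing what they mean.

```go
package main

import (
	"fmt"
	"strings"
)

// Tag is a hypothetical key/value pair type.
type Tag struct {
	Key, Value string
}

// parseTags splits a "k1=v1,k2=v2" string into ordered key/value pairs.
// Entries without an "=" (or with an empty key) are reported as errors.
func parseTags(s string) ([]Tag, error) {
	var tags []Tag
	if s == "" {
		return tags, nil
	}
	for _, pair := range strings.Split(s, ",") {
		kv := strings.SplitN(pair, "=", 2)
		if len(kv) != 2 || kv[0] == "" {
			return nil, fmt.Errorf("malformed tag %q", pair)
		}
		tags = append(tags, Tag{Key: kv[0], Value: kv[1]})
	}
	return tags, nil
}

func main() {
	tags, err := parseTags("dc=dc1,floor=3,aisle=4,rack=2,u=3")
	fmt.Println(tags, err) // [{dc dc1} {floor 3} {aisle 4} {rack 2} {u 3}] <nil>
}
```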

logtypeid - This field (which defaults to 0) can be used to conditionalize the parsing of the line field in your Kafka consumer (e.g. 1=rfc5424, 2=rfc5424, 3=gosyslog, 4=etc). It is an optional field, set by passing the value on the command line.
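A downstream consumer might use logtypeid as a dispatch key, roughly like this. The ID-to-parser mapping and the naive `parseRFC5424` stand-in are assumptions for illustration only; a real RFC 5424 parser has to handle the full header (timestamp, hostname, app name and so on).

```go
package main

import (
	"fmt"
	"strings"
)

// parser turns a raw syslog line into named fields (illustrative only).
type parser func(line string) map[string]string

// parsers maps hypothetical logtypeid values to parsing functions; the IDs
// are whatever your deployment assigns on the command line.
var parsers = map[int64]parser{
	1: parseRFC5424,
}

// parseRFC5424 is a deliberately naive stand-in: it only splits off the
// leading "<PRI>VERSION" token and keeps the remainder as the message.
func parseRFC5424(line string) map[string]string {
	head, rest, _ := strings.Cut(line, " ")
	return map[string]string{"header": head, "msg": rest}
}

// parseLine dispatches on logTypeId, falling back to the raw line for 0
// (the default) or any unknown ID.
func parseLine(logTypeId int64, line string) map[string]string {
	if p, ok := parsers[logTypeId]; ok {
		return p(line)
	}
	return map[string]string{"msg": line}
}

func main() {
	fmt.Println(parseLine(1, "<165>1 2003-10-11T22:14:15.003Z host app - - hello")["header"])
}
```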

timings - This field is set twice: once when the message is received and once when it is produced to Kafka. The purpose of this field is that, as the message flows through the data pipeline (other producers and consumers), each component can keep appending its own timings. If your data pipelines are dynamic, you can use the tag field when reading/writing to know which timing in the list belongs to which component. This allows end-to-end latency analysis of the message, as well as of the processing and transmission steps at each point in the data pipeline.
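Assuming each timing is a Unix timestamp in nanoseconds (the source does not give a unit), per-hop and end-to-end latency fall out of the differences between consecutive entries. `stamp` and `hopLatencies` are hypothetical helpers sketching that idea.

```go
package main

import (
	"fmt"
	"time"
)

// stamp appends the current time, assumed here to be Unix nanoseconds,
// as each pipeline stage would before passing the message on.
func stamp(timings []int64) []int64 {
	return append(timings, time.Now().UnixNano())
}

// hopLatencies returns the gap between consecutive timings plus the total
// end-to-end latency from the first stamp to the last.
func hopLatencies(timings []int64) (hops []time.Duration, total time.Duration) {
	for i := 1; i < len(timings); i++ {
		hops = append(hops, time.Duration(timings[i]-timings[i-1]))
	}
	if len(timings) > 1 {
		total = time.Duration(timings[len(timings)-1] - timings[0])
	}
	return hops, total
}

func main() {
	var timings []int64
	timings = stamp(timings) // received by the syslog server
	timings = stamp(timings) // produced to Kafka
	hops, total := hopLatencies(timings)
	fmt.Println(len(hops), total)
}
```

Comparing stamps written by different hosts is only meaningful if their clocks are reasonably synchronized.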

You can check out more details about the command-line options in the README.

/*******************************************
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  Twitter: @allthingshadoop
********************************************/

Multi Datacenter Replication with Apache Kafka

December 29, 2014

When requiring multi-datacenter replication with Apache Kafka, folks most often rely on the project’s MirrorMaker tool. This tool works great, but it was designed for a specific set of use cases and requires more work to get it working for all needs. We found a common need in the community for additional use cases, so we built a new MirrorMaker tool using our Go Kafka Client to support these needs and more!

The Go Kafka Mirror Maker supports:

  • No JVM required when consuming from the source cluster and producing to the destination cluster.
  • Guarantees of at-least-once mirroring of data from source to destination.
  • Preservation of ordering from source partition to destination partition.
  • Ability to prefix destination topic names to avoid topic name collisions between clusters.
  • Everything else the existing MirrorMaker tool supports.

Usage:

go run mirror_maker.go --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config --num.streams 2 --producer.config targetClusterProducer.config --whitelist=".*"

Configuration parameters:

--whitelist, --blacklist – whitelist or blacklist of topics to mirror. Exactly one of whitelist or blacklist is allowed; passing both will cause a panic. This parameter is required.
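The mutual-exclusion rule can be sketched as a small validation function. This is not the Mirror Maker's actual code: it returns an error instead of panicking, and it assumes patterns are Go regular expressions that must match the whole topic name.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
)

// topicFilter builds a topic predicate from the two flags, enforcing that
// exactly one of them is set. Patterns are anchored to the full topic name.
func topicFilter(whitelist, blacklist string) (func(string) bool, error) {
	switch {
	case whitelist != "" && blacklist != "":
		return nil, errors.New("exactly one of --whitelist or --blacklist is allowed")
	case whitelist != "":
		re, err := regexp.Compile("^(?:" + whitelist + ")$")
		if err != nil {
			return nil, err
		}
		return re.MatchString, nil
	case blacklist != "":
		re, err := regexp.Compile("^(?:" + blacklist + ")$")
		if err != nil {
			return nil, err
		}
		return func(topic string) bool { return !re.MatchString(topic) }, nil
	default:
		return nil, errors.New("one of --whitelist or --blacklist is required")
	}
}

func main() {
	keep, err := topicFilter(".*", "")
	if err != nil {
		panic(err)
	}
	fmt.Println(keep("test"))
}
```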

--consumer.config – consumer property files to consume from a source cluster. You can pass multiple of those like this: --consumer.config sourceCluster1Consumer.config --consumer.config sourceCluster2Consumer.config. At least one consumer config is required.

--producer.config – property file to configure embedded producers. This parameter is required.

--num.producers – number of producer instances. This can be used to increase throughput, because each producer’s requests are effectively handled by a single thread on the receiving Kafka broker. That is, even if you have multiple consumption streams (see --num.streams below), throughput can be bottlenecked at the stage where the broker handles the mirror maker’s producer requests. Defaults to 1.

--num.streams – number of mirror consumer goroutines to create. If the number of consumption streams is higher than the number of available partitions, some of the mirroring routines will be idle by virtue of the consumer rebalancing algorithm (they will not end up owning any partitions for consumption). Defaults to 1.
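To see why extra streams sit idle, consider a simplified round-robin assignment of partitions to streams. The Go Kafka Client's real rebalancing strategies are more involved; `assignPartitions` only illustrates the counting argument.

```go
package main

import "fmt"

// assignPartitions distributes partition IDs over streams round-robin.
// Streams beyond the partition count receive nothing and stay idle.
func assignPartitions(numPartitions, numStreams int) [][]int {
	out := make([][]int, numStreams)
	for p := 0; p < numPartitions; p++ {
		s := p % numStreams
		out[s] = append(out[s], p)
	}
	return out
}

func main() {
	fmt.Println(assignPartitions(3, 5)) // [[0] [1] [2] [] []]
}
```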

--preserve.partitions – flag to preserve partition numbers, e.g. if a message was read from partition 5, it will be written to partition 5. Note that this can affect performance. Defaults to false.

--preserve.order – flag to preserve message order, e.g. message sequence 1, 2, 3, 4, 5 will remain 1, 2, 3, 4, 5 in the destination topic. Note that this can affect performance. Defaults to false.

--prefix – destination topic prefix, e.g. if a message was read from topic “test” and the prefix is “dc1_”, it will be written to topic “dc1_test”. Defaults to empty string.
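One way to picture the partition choice is below. `partitionChooser` is hypothetical, and the round-robin fallback merely stands in for whatever partitioner the real producer uses. It also shows a practical constraint: preserving partition numbers presumably requires the destination topic to have at least as many partitions as the source.

```go
package main

import "fmt"

// partitionChooser picks a destination partition for each mirrored message.
type partitionChooser struct {
	preserve bool // mirrors the --preserve.partitions flag
	next     int  // round-robin cursor used when not preserving
}

// choose returns the destination partition for a message that came from
// srcPartition, given how many partitions the destination topic has.
func (c *partitionChooser) choose(srcPartition, destPartitions int) (int, error) {
	if c.preserve {
		if srcPartition >= destPartitions {
			return 0, fmt.Errorf("destination has %d partitions, cannot preserve partition %d",
				destPartitions, srcPartition)
		}
		return srcPartition, nil
	}
	p := c.next % destPartitions
	c.next++
	return p, nil
}

func main() {
	keep := &partitionChooser{preserve: true}
	fmt.Println(keep.choose(5, 8)) // 5 <nil>
}
```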
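A common way to keep per-partition order with several producers is to route every message from a given source partition through the same producer. The sketch below assumes that approach with an FNV hash; it is not taken from the Mirror Maker source. Routing alone is not the whole story: the producer must also avoid reordering messages on retries.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// producerFor maps a source (topic, partition) to one of numProducers
// producer indexes. Because the mapping is deterministic, all messages from
// one source partition share a producer and keep their relative order, at
// the cost of less parallelism when partitions are unevenly loaded.
func producerFor(topic string, partition int32, numProducers int) int {
	h := fnv.New32a()
	h.Write([]byte(topic))
	return int((h.Sum32() + uint32(partition)) % uint32(numProducers))
}

func main() {
	fmt.Println(producerFor("test", 3, 4))
}
```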

--queue.size – number of messages that are buffered between the consumer and producer. Defaults to 10000.
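The buffer between consumer and producer behaves like a bounded Go channel: once `queueSize` messages are waiting, the consumer side blocks until the producer catches up. The `message` type and `mirror` function below are hypothetical simplifications.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Topic     string
	Partition int32
	Value     []byte
}

// mirror feeds messages into a channel buffered to queueSize and drains it
// with a single producer goroutine. Sends block once the buffer is full,
// which applies back-pressure to consumption.
func mirror(in []message, queueSize int, produce func(message)) {
	queue := make(chan message, queueSize)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for m := range queue {
			produce(m)
		}
	}()
	for _, m := range in {
		queue <- m // blocks while queueSize messages are already buffered
	}
	close(queue)
	wg.Wait()
}

func main() {
	var out []string
	in := []message{{Topic: "test", Value: []byte("a")}, {Topic: "test", Value: []byte("b")}}
	mirror(in, 10000, func(m message) { out = append(out, string(m.Value)) })
	fmt.Println(out) // [a b]
}
```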

The MirrorMaker code and tests are also a great starting point for learning how to produce and consume data with Kafka using the Go programming language.

Big Data Open Source Security LLC provides professional services and product solutions for the collection, storage, transfer, real-time analytics, batch processing and reporting for complex data streams, data sets and distributed systems. BDOSS is all about the “glue” and helping companies to not only figure out what Big Data Infrastructure Components to use but also how to change their existing (or build new) systems to work with them. The focus of our services and solutions is end-to-end, including architecture, development, implementation, documentation, training and support for complex data streams, data sets and distributed systems using Open Source Software.

Open Source Cloud Formation with Minotaur for Mesos, Kafka and Hadoop

December 17, 2014

Today I am happy to announce “Minotaur”, our open source, AWS-based infrastructure for managing big data open source projects including (but not limited to) Apache Kafka, Apache Mesos and Cloudera’s Distribution of Hadoop. Minotaur is based on AWS CloudFormation.

The following labs are currently supported:

Supervisor

Supervisor is a Docker-based image that contains all the necessary software to manage nodes/resources in AWS.

Supervisor set-up

  • clone this repo to repo_dir
  • cd to repo_dir/supervisor folder

Before trying to build the Docker image, you must put some configuration files in the config directory:

a) aws_config

This file is just a regular aws-cli config; paste in the access and secret keys provided by Amazon:

[default]
output = json
region = us-east-1
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY

Do not add or remove any whitespace (especially before and after the “=” sign in keys).
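If you want to catch spacing mistakes before building the image, a small pre-flight check could flag key lines that don't follow the `key = value` layout of the sample file. This helper is an illustration only; it is not part of Supervisor, and the strict spacing rule is inferred from the warning above.

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// keyLine matches "key = value" with exactly one space on each side of "=".
var keyLine = regexp.MustCompile(`^[a-z_]+ = \S+$`)

// checkAWSConfig returns every line that is neither blank, a [section]
// header, nor in strict "key = value" form.
func checkAWSConfig(text string) []string {
	var bad []string
	for _, line := range strings.Split(text, "\n") {
		line = strings.TrimRight(line, "\r")
		if line == "" || strings.HasPrefix(line, "[") {
			continue
		}
		if !keyLine.MatchString(line) {
			bad = append(bad, line)
		}
	}
	return bad
}

func main() {
	cfg := "[default]\noutput = json\nregion=us-east-1\n"
	fmt.Println(checkAWSConfig(cfg)) // [region=us-east-1]
}
```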

b) private.key

This is your private SSH key, the public part of which is registered on the Bastion host.

c) environment.key

This is a key shared by all nodes in the environment that Supervisor is supposed to manage.

d) ssh_config

This is a regular SSH config file; you only have to change your_username (the user registered on the Bastion host).

BASTION_IP is handled dynamically when the container is built.

# BDOSS environment
Host 10.0.2.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
Host 10.0.*.*
    IdentityFile ~/.ssh/environment.key 
    User ubuntu
    ProxyCommand  ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
  • exec up.sh

If this is the first time you’re launching supervisor, it will take some time to build.

Subsequent up’s will take seconds.

Using supervisor

Now you can cd to /deploy/labs/ and deploy whatever you want.

Example:


minotaur lab deploy mesosmaster -e bdoss-dev -d test -r us-east-1 -z us-east-1a
Creating new stack 'mesos-master-test-bdoss-dev-us-east-1-us-east-1a'...
Stack deployed.

This will spin up a Mesos master node in the “test” deployment.

awsinfo

Supervisor has a built-in “awsinfo” command, which relies on the AWS API and provides brief info about running machines. It is also capable of searching through that info.

Usage example

awsinfo – will display brief info about all nodes running in AWS

root@supervisor:/deploy# awsinfo
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running         10.0.2.94       54.86.153.142  
bastion.bdoss-dev                   i-3faa69de   m1.small       running         10.0.0.207      None           
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           
mesos-slave.test.bdoss-dev          i-e00ddc01   m1.small       terminated      None            None           

awsinfo mesos-master – will display info about all mesos-master nodes running in AWS.

root@supervisor:/deploy/labs# awsinfo mesos-master
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
mesos-master.test.bdoss-dev         i-e80ddc09   m1.small       terminated      None            None           

awsinfo 10.0.2 – match a private/public subnet

root@supervisor:/deploy/labs# awsinfo 10.0.2
Cloud:  bdoss/us-east-1
Name                                Instance ID  Instance Type  Instance State  Private IP      Public IP      
----                                -----------  -------------  --------------  ----------      ---------      
nat.bdoss-dev                       i-c46a0b2a   m1.small       running         10.0.2.94       54.86.153.142  
mesos-master.test.bdoss-dev         i-e96ebd08   m1.small       running         10.0.2.170      54.172.160.254 
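Judging from the examples above, the search appears to match the query as a substring of the instance name or its IP addresses. The sketch below assumes exactly that; awsinfo's real matching rules may differ. Note that a plain substring match for `10.0.2` would also match an address such as `10.0.20.5`.

```go
package main

import (
	"fmt"
	"strings"
)

// instance holds the columns shown in the awsinfo table.
type instance struct {
	Name, ID, Type, State, PrivateIP, PublicIP string
}

// filterInstances keeps instances whose name or IP contains query. An empty
// query keeps everything, like running awsinfo with no argument.
func filterInstances(all []instance, query string) []instance {
	var out []instance
	for _, in := range all {
		if strings.Contains(in.Name, query) ||
			strings.Contains(in.PrivateIP, query) ||
			strings.Contains(in.PublicIP, query) {
			out = append(out, in)
		}
	}
	return out
}

func main() {
	all := []instance{
		{Name: "nat.bdoss-dev", PrivateIP: "10.0.2.94", PublicIP: "54.86.153.142"},
		{Name: "bastion.bdoss-dev", PrivateIP: "10.0.0.207", PublicIP: "None"},
		{Name: "mesos-master.test.bdoss-dev", PrivateIP: "10.0.2.170", PublicIP: "54.172.160.254"},
	}
	for _, in := range filterInstances(all, "10.0.2") {
		fmt.Println(in.Name)
	}
}
```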

Vagrant

If you can’t use Docker directly for some reason, there’s a Vagrant wrapper VM for it.

Before doing anything with Vagrant, complete the above steps for Docker, but don’t execute the up.sh script.

Just cd into the vagrant directory and exec vagrant up, then vagrant ssh (nothing special here yet).

When you exec vagrant ssh, the Docker container build process will start immediately, so wait a bit and let it complete.

Now you’re inside a Docker container nested in the Vagrant VM and can proceed with deployment in the same manner as described for Docker.

All subsequent vagrant ssh sessions will spawn the Docker container almost immediately.

Once you are inside of the supervisor image, the minotaur.py script may be used to provision an environment and labs. The rest of this readme assumes that the script is executed from within the supervisor container.

Minotaur Commands

List Infrastructure Components

root@supervisor:/deploy# ./minotaur.py infrastructure list
Available deployments are: ['bastion', 'iampolicies', 'iamusertogroupadditions', 'nat', 'sns', 'subnet', 'vpc']

Print Infrastructure Component Usage

root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -h
usage: minotaur.py infrastructure deploy bastion [-h] -e ENVIRONMENT -r REGION
                                                 -z AVAILABILITY_ZONE
                                                 [-i INSTANCE_TYPE]

optional arguments:
  -h, --help            show this help message and exit
  -e ENVIRONMENT, --environment ENVIRONMENT
                        CloudFormation environment to deploy to
  -r REGION, --region REGION
                        Geographic area to deploy to
  -z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
                        Isolated location to deploy to
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy

Deploy Infrastructure Component

In this example, the bdoss-dev bastion already existed, so the CloudFormation stack was updated with the current template.

root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -e bdoss-dev -r us-east-1 -z us-east-1a
Template successfully validated.
Updating existing 'bastion-bdoss-dev-us-east-1-us-east-1a' stack...
Stack updated.
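The create-or-update behaviour can be sketched as follows. The stack name pattern is copied from the bastion output above (lab stacks in this post use other orderings), and the `existing` set stands in for a CloudFormation lookup of current stacks; both helpers are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// stackName reproduces the infrastructure naming seen in the output above,
// e.g. "bastion-bdoss-dev-us-east-1-us-east-1a".
func stackName(component, env, region, az string) string {
	return strings.Join([]string{component, env, region, az}, "-")
}

// deployAction decides between creating and updating, given the set of
// stack names that already exist.
func deployAction(existing map[string]bool, name string) string {
	if existing[name] {
		return fmt.Sprintf("Updating existing '%s' stack...", name)
	}
	return fmt.Sprintf("Creating new '%s' stack...", name)
}

func main() {
	name := stackName("bastion", "bdoss-dev", "us-east-1", "us-east-1a")
	existing := map[string]bool{name: true}
	fmt.Println(deployAction(existing, name))
}
```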

List Labs

List all supported labs.

root@supervisor:/deploy# ./minotaur.py lab list
Available deployments are: ['clouderahadoop', 'gokafkaconsumer', 'gokafkaproducer', 'kafka', 'mesosmaster', 'mesosslave', 'zookeeper']

Print Lab Usage

Print the kafka lab usage.

root@supervisor:/deploy# ./minotaur.py lab deploy kafka -h
usage: minotaur.py lab deploy kafka [-h] -e ENVIRONMENT -d DEPLOYMENT -r
                                    REGION -z AVAILABILITY_ZONE [-n NUM_NODES]
                                    [-i INSTANCE_TYPE] [-v ZK_VERSION]
                                    [-k KAFKA_URL]

optional arguments:
  -h, --help            show this help message and exit
  -e ENVIRONMENT, --environment ENVIRONMENT
                        CloudFormation environment to deploy to
  -d DEPLOYMENT, --deployment DEPLOYMENT
                        Unique name for the deployment
  -r REGION, --region REGION
                        Geographic area to deploy to
  -z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
                        Isolated location to deploy to
  -n NUM_NODES, --num-nodes NUM_NODES
                        Number of instances to deploy
  -i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
                        AWS EC2 instance type to deploy
  -v ZK_VERSION, --zk-version ZK_VERSION
                        The Zookeeper version to deploy
  -k KAFKA_URL, --kafka-url KAFKA_URL
                        The Kafka URL

Deploy Lab

Deploy a 3-broker Kafka cluster.

root@supervisor:/deploy# ./minotaur.py lab deploy kafka -e bdoss-dev -d kafka-example -r us-east-1 -z us-east-1a -n 3 -i m1.small    
Template successfully validated.
Creating new 'kafka-bdoss-dev-kafka-example-us-east-1-us-east-1a' stack...
Stack deployed.


Ideas and goals behind the Go Kafka Client

December 10, 2014

I think a bunch of folks have heard already that B.D.O.S.S. was working on a new Apache Kafka Client For Go. Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, our lab environment for Apache ZooKeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.

To get started using the consumer client check out our example code and its property file.

Ideas and goals behind the Go Kafka Client:

 

1) Partition Ownership

We decided to implement multiple strategies for this, including static assignment. The concept of rebalancing is preserved, but now there are a few different rebalancing strategies, and they can run at different times depending on what is going on (for example, while a blue/green deploy is happening). For more on blue/green deployments, check out this video with Jim Plush and Sean Berry from AWS re:Invent 2014.
 

2) Fetch Management

This is what “fills up the reservoir,” as I like to call it, so that processing (either sequential or in batch) will always have data, if there is data for it to have, without making a network hop. The fetcher has to stay ahead here, keeping the processing tap full (or, when empty, keeping that controlled) by pulling the data for the Kafka partition(s) it owns.
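A minimal Go picture of the “reservoir” is a fetcher goroutine that keeps a buffered channel topped up while processing drains it. `fetch` is a hypothetical stand-in for a network fetch of the next batch for an owned partition; this is not the client's actual fetcher.

```go
package main

import "fmt"

// fetcher starts a goroutine that repeatedly calls fetch (nil means no more
// data) and pushes each message into a buffered channel of reservoirSize.
// When the reservoir is full the send blocks, pacing the fetcher.
func fetcher(fetch func() []string, reservoirSize int) <-chan string {
	reservoir := make(chan string, reservoirSize)
	go func() {
		defer close(reservoir)
		for batch := fetch(); batch != nil; batch = fetch() {
			for _, msg := range batch {
				reservoir <- msg
			}
		}
	}()
	return reservoir
}

func main() {
	batches := [][]string{{"m1", "m2"}, {"m3"}}
	i := 0
	fetch := func() []string { // only ever called from the fetcher goroutine
		if i >= len(batches) {
			return nil
		}
		b := batches[i]
		i++
		return b
	}
	for msg := range fetcher(fetch, 100) {
		fmt.Println(msg)
	}
}
```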
 

3) Work Management

For the Go consumer we currently only support “fan out” using goroutines and channels. If you have ever used Go this will be familiar to you; if not, you should drop everything and learn Go.
 

4) Offset Management

Our offset management works on a per-batch basis: the highest offset from each batch is committed on a per-partition basis.
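Computing what to commit for one processed batch amounts to tracking the maximum offset per partition. The `msg` type and `commitOffsets` helper are illustrative; the source does not say whether the client stores this offset or the next offset to read (offset + 1).

```go
package main

import "fmt"

// msg is a hypothetical minimal message carrying only what offset
// management needs.
type msg struct {
	Partition int32
	Offset    int64
}

// commitOffsets returns, for one processed batch, the highest offset seen
// in each partition; these are the values that would be committed.
func commitOffsets(batch []msg) map[int32]int64 {
	highest := make(map[int32]int64)
	for _, m := range batch {
		if cur, ok := highest[m.Partition]; !ok || m.Offset > cur {
			highest[m.Partition] = m.Offset
		}
	}
	return highest
}

func main() {
	batch := []msg{{0, 5}, {1, 3}, {0, 7}, {1, 2}}
	fmt.Println(commitOffsets(batch)) // map[0:7 1:3]
}
```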
 
package main

import (
	"fmt"
	"net"
	"os"
	"os/signal"
	"time"

	metrics "github.com/rcrowley/go-metrics"
	kafkaClient "github.com/stealthly/go_kafka_client"
)

// main starts numConsumers consumers 10 seconds apart, then blocks until
// Ctrl-C and closes every consumer before exiting.
func main() {
	// resolveConfig is part of the full example and not shown here; it builds
	// the consumer config and settings from the property file and flags.
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)
	}

	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		// Wait until the consumer signals on the channel returned by Close.
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

// startMetrics periodically flushes the default metrics registry to Graphite.
func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
		panic(err)
	}
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
	})
}

// startNewConsumer gives a copy of the config its own consumer ID, worker
// strategy and failure callbacks, then starts it with static assignment.
func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

// GetStrategy returns the per-message worker function: it traces the message,
// marks a per-consumer meter and reports the task as successful.
func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
		consumeRate.Mark(1)

		return kafkaClient.NewSuccessfulResult(id)
	}
}

// FailedCallback is invoked when the worker manager gives up; this example
// stops without committing the offset.
func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop
}

// FailedAttemptCallback is invoked on each failed attempt; this example
// commits the offset and continues.
func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
}

Our mailing list is kafka-clients@googlegroups.com; this is the central place for client library discussions for the Apache Kafka community.
 

Plans moving forward with the Go Kafka Client:

1) Build up suite of integration tests.
2) Stabilize the API nomenclature with the JVM Kafka Client for consistency.
3) Run integration tests and create baseline regression points.
4) Re-work changes from feedback of issues found and from our own customer support.
5) Push to production and continue ongoing support.
 
 

Ideas and goals behind Minotaur:

Provide a controlled and isolated environment, through scripts, for regression testing of versions and benchmarks against instance sizing. This comes in three parts: 1) Supervisor, 2) Infrastructure and 3) Labs. The Labs portion is broken down into both Chef cookbooks and Python wrappers for CloudFormation templates for each of the following systems:
 

Plans moving forward with Minotaur:

1) Upstream our CloudFormation scripts (this is going to take some more work detangling Dexter, the name of our private repo of Minotaur).
2) Move the work we do with Apache Cassandra into the project.
3) Start support of Puppet in addition to the work we did with Chef.
4) Get the Docker supervisor console pushed upstream through Bastion hosts.
5) Cut our labs over to using Minotaur.
 

Mesosphere’s new data center mother brain will blow your mind

December 8, 2014

Originally posted on Gigaom:

Mesosphere has been making a name for itself in the world of data centers and cloud computing since 2013 with its distributed-system smarts and various introductions of open-source technologies, each designed to tackle the challenges of running tons of workloads across multiple machines. On Monday, the startup plans to announce that its much-anticipated data center operating system — the culmination of its many technologies — has been released as a private beta and will be available to the public in early 2015.

As part of the new operating system’s launch, Mesosphere also plans to announce that it has raised a $36 million Series B investment round, which brings its total funding to $50 million. Khosla Ventures, a new investor, drove the financing along with Andreessen Horowitz, Fuel Capital, SV Angel and other unnamed entities.

Mesosphere’s new data center operating system, dubbed DCOS, tackles the complexity behind trying to…


Categories: Apache Mesos

What’s coming in Apache Kafka 0.8.2

December 7, 2014

Originally posted on Confluent:

I am very excited to tell you about the forthcoming 0.8.2 release of Apache Kafka. Kafka is a fault-tolerant, low-latency, high-throughput distributed messaging system used in data pipelines at several companies. Kafka became a top-level Apache project in 2012 and was originally created at LinkedIn, where it forms a critical part of LinkedIn’s infrastructure and transmits data to all systems and applications. The project is currently under active development from a diverse group of contributors.

Since there are many new features in 0.8.2, we released 0.8.2-beta. The final release will be done when 0.8.2 is stable.

Here is a quick overview of the notable work in this release.

New features

New producer

The JVM clients that Kafka ships haven’t changed much since Kafka was originally built. Over time, we have realized some of the limitations and problems that came both from the design of these clients and…


Categories: Kafka

Resource scheduling and task launching with Apache Mesos and Apache Aurora at Twitter

October 26, 2014

Episode #23 of the podcast was a talk with Bill Farner.

Bill explained how Twitter, using Apache Mesos and Apache Aurora, gets more out of its hardware spend and saves engineering time (both development and operations) by using fine-grained resource scheduling across its infrastructure. Bill talked a bit about how what he saw and experienced with Borg at Google shaped how they wanted to run things at Twitter, and that is what they built Aurora for. Now, after years of running in production at Twitter, Aurora is open source, part of the Apache Software Foundation and available for use. Lots of new use cases that they didn’t see coming have become very powerful for their teams, and Bill went into more detail about that too.

Bill also talked about the type of instrumentation that was built into Aurora’s features to get to a place where all new systems and almost all legacy systems at Twitter now run on top of Aurora. Bill went into detail about how that works in regard to Twitter’s cache and how Aurora’s SLA features make this a reality. Aurora is amazing at giving end users (everyone from engineers to analysts) full access to the potential resources of their hardware clusters. Aurora provides features like quotas and preemption so that any user can be given access to the compute resources of the entire hardware infrastructure without worrying that someone will hog resources, while production always stays the priority.

Apache Mesos abstracts CPU, memory, storage, and other compute resources away from machines (physical or virtual), enabling fault-tolerant and elastic distributed systems to easily be built and run effectively. Mesos is built using the same principles as the Linux kernel, only at a different level of abstraction. The Mesos kernel runs on every machine and provides applications (e.g., Hadoop, Spark, Kafka, Elasticsearch) with APIs for resource management and scheduling across entire datacenter and cloud environments.

Apache Aurora is a Mesos framework. A Mesos framework is a scheduler of resources and launcher of tasks. Aurora provides a Job abstraction consisting of a Task template and instructions for creating near-identical replicas of that Task. Typically a Task is a single Process corresponding to a single command line, such as python2.6 my_script.py. However, sometimes you must colocate separate Processes together within a single Task, which runs within a single container and chroot, often referred to as a “sandbox”; for example, when you run multiple cooperating agents together, such as logrotate, installer, and master or slave processes. Thermos provides a Process abstraction underneath the Mesos Task.

To use and get up to speed on Aurora, you should read the Aurora docs in this order:

  1. How to deploy Aurora or, how to install Aurora on virtual machines on your private machine (the Tutorial uses the virtual machine approach).
  2. As a user, get started quickly with a Tutorial.
  3. For an overview of Aurora’s process flow under the hood, see the User Guide.
  4. To learn how to write a configuration file, look at our Configuration Tutorial. From there, look at the Aurora + Thermos Reference.
  5. Then read up on the Aurora Command Line Client.
  6. Find out general information and useful tips about how Aurora does Resource Isolation.

For some more great background on Mesos and Aurora please check out these three videos.

Datacenter Management with Apache Mesos

An intro video to Apache Aurora

Past, Present, Future of Apache Aurora

 

To hear everything that Bill had to say, please subscribe to the podcast.

 
