Ideas and goals behind the Go Kafka Client
I think a bunch of folks have already heard that B.D.O.S.S. was working on a new Apache Kafka client for Go. The Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, our lab environment for Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.
To get started using the consumer client, check out our example code and its property file.
Ideas and goals behind the Go Kafka Client:
1) Partition Ownership
We decided to implement multiple strategies for this, including static assignment. The concept of rebalancing is preserved, but there are now a few different rebalancing strategies, and they can run at different times depending on what is going on (for example, while a blue/green deploy is happening). For more on blue/green deployments, check out this video with Jim Plush and Sean Berry from AWS re:Invent 2014.
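To make the idea of static assignment concrete, here is a minimal sketch. It is not the client's actual strategy code: `assignStatic` and the consumer ids are hypothetical names used only for illustration. The assumption is that every consumer sees the same list of consumer ids and partitions, so each one can compute the same ownership map without coordinating.

```go
package main

import (
	"fmt"
	"sort"
)

// assignStatic is a hypothetical helper, not part of go_kafka_client.
// It spreads partitions over consumers round-robin. Both inputs are
// copied and sorted first so that every consumer derives the same map.
func assignStatic(consumerIDs []string, partitions []int32) map[string][]int32 {
	ids := append([]string(nil), consumerIDs...)
	parts := append([]int32(nil), partitions...)
	sort.Strings(ids)
	sort.Slice(parts, func(i, j int) bool { return parts[i] < parts[j] })

	owned := make(map[string][]int32, len(ids))
	if len(ids) == 0 {
		return owned // nobody to assign to
	}
	for i, p := range parts {
		id := ids[i%len(ids)]
		owned[id] = append(owned[id], p)
	}
	return owned
}

func main() {
	owned := assignStatic(
		[]string{"consumer-1", "consumer-0"},
		[]int32{3, 0, 2, 1, 4},
	)
	for _, id := range []string{"consumer-0", "consumer-1"} {
		fmt.Println(id, owned[id])
	}
}
```

Because the result depends only on the sorted inputs, a different strategy could be swapped in behind the same function signature when a rebalance needs to behave differently.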
2) Fetch Management
This is what “fills up the reservoir”, as I like to call it, so that the processing (either sequential or in batch) always has data, if there is data for it to have, without making a network hop. The fetcher has to stay ahead here, keeping the processing tap full (or, if it runs empty, doing so in a controlled way) by pulling the data for the Kafka partition(s) it owns.
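One way to picture the reservoir is a buffered channel that a fetcher goroutine keeps topped up. The sketch below only illustrates that idea under simplifying assumptions: `fetchFunc`, `startFetcher` and `sliceFetch` are hypothetical names, and the "network fetch" is simulated with an in-memory slice rather than a Kafka broker.

```go
package main

import "fmt"

// fetchFunc is a hypothetical stand-in for a network fetch. It returns up
// to max messages starting at offset, or an empty slice when none are left.
type fetchFunc func(offset int64, max int) []string

// startFetcher runs a goroutine that keeps a buffered channel (the
// "reservoir") filled. Sends block when the buffer is full, so the
// fetcher stays ahead of processing without running away from it.
func startFetcher(fetch fetchFunc, batchSize, capacity int) <-chan string {
	reservoir := make(chan string, capacity)
	go func() {
		defer close(reservoir)
		var offset int64
		for {
			batch := fetch(offset, batchSize)
			if len(batch) == 0 {
				return // a long-running fetcher would back off and retry here
			}
			for _, msg := range batch {
				reservoir <- msg
			}
			offset += int64(len(batch))
		}
	}()
	return reservoir
}

// sliceFetch simulates a partition log held in memory.
func sliceFetch(log []string) fetchFunc {
	return func(offset int64, max int) []string {
		if offset >= int64(len(log)) {
			return nil
		}
		end := offset + int64(max)
		if end > int64(len(log)) {
			end = int64(len(log))
		}
		return log[offset:end]
	}
}

func main() {
	log := []string{"m0", "m1", "m2", "m3", "m4"}
	// Processing reads from the reservoir and never calls fetch itself.
	for msg := range startFetcher(sliceFetch(log), 2, 4) {
		fmt.Println(msg)
	}
}
```

The channel capacity is the knob that decides how far ahead the fetcher may run.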
3) Work Management
For the Go consumer we currently only support “fan out” using goroutines and channels. If you have ever used Go this will be familiar to you; if not, you should drop everything and learn Go.
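For readers who have not seen the pattern, a generic fan-out with goroutines and channels looks roughly like the sketch below. It is plain Go, not the client's worker manager; `fanOut` and the handler are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut starts `workers` goroutines that all receive from the same tasks
// channel, so each task is handled by exactly one worker. Results are sent
// on the returned channel, which is closed once every worker has finished.
func fanOut(tasks <-chan string, workers int, handle func(string) string) <-chan string {
	results := make(chan string)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range tasks {
				results <- handle(task)
			}
		}()
	}
	// Close results only after all workers are done sending.
	go func() {
		wg.Wait()
		close(results)
	}()
	return results
}

func main() {
	tasks := make(chan string, 3)
	for _, msg := range []string{"a", "b", "c"} {
		tasks <- msg
	}
	close(tasks)

	handled := 0
	for result := range fanOut(tasks, 2, func(s string) string { return "processed " + s }) {
		fmt.Println(result) // order depends on goroutine scheduling
		handled++
	}
	fmt.Println("handled", handled, "messages")
}
```

Note that fan-out gives up ordering across workers, which is one reason offsets need to be tracked per batch rather than per message.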
4) Offset Management
Our offset management works on a per-batch basis: the highest offset from each batch is committed on a per-partition basis.
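As a rough illustration of that rule, the sketch below reduces a processed batch to the highest offset seen for each partition, which is the value that would then be committed. The `message` type and `highestOffsets` function are hypothetical and stand in for the client's own types; the commit itself is left out.

```go
package main

import "fmt"

// message is a hypothetical, minimal stand-in for a consumed Kafka message.
type message struct {
	Partition int32
	Offset    int64
}

// highestOffsets returns, for each partition present in the batch, the
// highest offset seen. Messages may arrive in any order within the batch.
func highestOffsets(batch []message) map[int32]int64 {
	highest := make(map[int32]int64)
	for _, m := range batch {
		if current, ok := highest[m.Partition]; !ok || m.Offset > current {
			highest[m.Partition] = m.Offset
		}
	}
	return highest
}

func main() {
	batch := []message{{0, 10}, {1, 7}, {0, 12}, {1, 5}}
	// One commit per partition per batch, not one per message.
	for partition, offset := range highestOffsets(batch) {
		fmt.Printf("commit partition=%d offset=%d\n", partition, offset)
	}
}
```

Committing once per batch keeps the number of offset writes low, at the cost of reprocessing part of a batch if a consumer fails before the commit.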
```go
func main() {
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)
	}

	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
		panic(err)
	}
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
	})
}

func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
		consumeRate.Mark(1)

		return kafkaClient.NewSuccessfulResult(id)
	}
}

func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop
}

func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
}
```
We set our mailing list to be kafka-clients@googlegroups.com. This is the central place for client library discussions for the Apache Kafka community.
Plans moving forward with the Go Kafka Client:
1) Build up a suite of integration tests.
2) Stabilize the API nomenclature with the JVM Kafka Client for consistency.
3) Run integration tests and create baseline regression points.
4) Rework changes based on feedback from issues found and from our own customer support.
5) Push to production and continue ongoing support.
Ideas and goals behind Minotaur:
Provide a controlled and isolated environment through scripts for regression of versions and benchmarks against instance sizing. This comes in 3 parts: 1) Supervisor 2) Infrastructure 3) Labs. The Labs portion is broken down with both Chef cookbooks and Python wrappers for CloudFormation templates for each of the systems in the lab.
Plans moving forward with Minotaur:
1) Upstream our CloudFormation scripts (this is going to take some more work detangling Dexter, the name for our private repo of Minotaur).
2) Move the work we do with Apache Cassandra into the project.
3) Start support of Puppet in addition to the work we did with Chef.
4) Get the Docker supervisor console pushed upstream through Bastion hosts.
5) Cut our labs over to using Minotaur.
/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
Twitter: @allthingshadoop
********************************************/
Categories: Apache Mesos, Hadoop, Kafka, Open Source Projects, Python, Security, Tools, Zookeeper