Archive for the ‘Security’ Category

Ideas and goals behind the Go Kafka Client

December 10, 2014

I think a bunch of folks have heard already that B.D.O.S.S. has been working on a new Apache Kafka client for Go. The Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, which is our lab environment for Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.

To get started using the consumer client check out our example code and its property file.

Ideas and goals behind the Go Kafka Client:


1) Partition Ownership

We decided to implement multiple strategies for this, including static assignment. The concept of re-balancing is preserved, but now there are a few different strategies for re-balancing, and they can run at different times depending on what is going on (for example, while a blue/green deploy is happening). For more on blue/green deployments, check out this video with Jim Plush and Sean Berry from 2014 AWS re:Invent.
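To make this concrete, here is a minimal sketch of a static round-robin assignment in plain Go; the function name and signature are illustrative only, not the client's actual API:

// assignStatic spreads partitions evenly across the known consumer
// instances (assumes at least one consumer ID). Illustrative only.
func assignStatic(partitions []int32, consumerIDs []string) map[string][]int32 {
	assignment := make(map[string][]int32)
	for i, partition := range partitions {
		owner := consumerIDs[i%len(consumerIDs)]
		assignment[owner] = append(assignment[owner], partition)
	}
	return assignment
}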

2) Fetch Management

This is what “fills up the reservoir”, as I like to call it, so that the processing (either sequential or in batch) will always have data, if there is data to be had, without making a network hop. The fetcher has to stay ahead here, keeping the processing tap full (or, if it runs empty, that is controlled), pulling the data for the Kafka partition(s) it owns.
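As a rough sketch of the idea in plain Go (the type and function names here are illustrative, not the client's actual code), think of a fetch loop keeping a buffered channel topped up:

type message struct{ value []byte } // stand-in for the client's message type

// fetchLoop keeps the reservoir full so processors never wait on a
// network hop while the owned partition(s) still have data.
func fetchLoop(fetch func() ([]message, error), reservoir chan<- message) {
	for {
		batch, err := fetch() // one network hop per batch
		if err != nil {
			continue // real code would back off and surface the error
		}
		for _, msg := range batch {
			reservoir <- msg // blocks only when the reservoir is full
		}
	}
}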

3) Work Management

For the Go consumer we currently only support “fan out” using goroutines and channels. If you have ever used Go, this will be familiar to you; if not, you should drop everything and learn Go.
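Here is a plain-Go sketch of the fan-out pattern, reusing the message type from the sketch above; nothing here is specific to the client, and it needs only the standard library's sync package:

// fanOut drains one channel of work with a fixed pool of goroutines.
func fanOut(messages <-chan message, workers int, handle func(message)) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range messages {
				handle(msg)
			}
		}()
	}
	wg.Wait() // returns once the channel is closed and fully drained
}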

4) Offset Management

Our offset management works on a per-batch basis: the highest offset seen in each batch is committed on a per-partition basis. A small sketch of that bookkeeping follows, and after it the full consumer example.
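The types and names below are illustrative only, not the client's actual code:

type consumed struct {
	partition int32
	offset    int64
}

// highestOffsets returns the highest offset seen per partition in a
// processed batch; those are the offsets that get committed.
func highestOffsets(batch []consumed) map[int32]int64 {
	highest := make(map[int32]int64)
	for _, m := range batch {
		if current, ok := highest[m.partition]; !ok || m.offset > current {
			highest[m.partition] = m.offset
		}
	}
	return highest
}

And here is the full consumer example: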
func main() {
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()
	startMetrics(graphiteConnect, graphiteFlushInterval)

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)
	}

	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close() // wait for each consumer to finish shutting down
	}
	fmt.Println("Successfully shut down all consumers")
}

func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
		panic(err)
	}
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
	})
}

func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback
	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))

		consumeRate.Mark(1)
		return kafkaClient.NewSuccessfulResult(id)
	}
}

func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")

	return kafkaClient.DoNotCommitOffsetAndStop
}

func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")

	return kafkaClient.CommitOffsetAndContinue
}

We set up our mailing list to be the central place for client library discussions in the Apache Kafka community.

Plans moving forward with the Go Kafka Client:

1) Build up a suite of integration tests.
2) Stabilize the API nomenclature with the JVM Kafka Client for consistency.
3) Run integration tests and create baseline regression points.
4) Re-work changes based on feedback from issues found and from our own customer support.
5) Push to production and continue ongoing support.

Ideas and goals behind Minotaur:

Provide a controlled and isolated environment, through scripts, for regression testing of versions and benchmarking against instance sizing. This comes in 3 parts: 1) Supervisor, 2) Infrastructure, 3) Labs. The Labs portion is broken down, with both Chef cookbooks and Python wrappers for CloudFormation templates, for each of the following systems: Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka and Apache Hadoop.

Plans moving forward with Minotaur:

1) Upstream our CloudFormation scripts (this is going to take some more work detangling Dexter, the name for our private repo of Minotaur).
2) Move the work we do with Apache Cassandra into the project.
3) Start support of Puppet in addition to the work we did with Chef.
4) Get the Docker supervisor console pushed upstream through Bastion hosts.
5) Cut our labs over to using Minotaur.
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 Twitter: @allthingshadoop

XML to Avro Conversion

February 26, 2014

We all know what XML is, right? Just in case not, no problem; here is what it is all about.
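For example (a made-up document), here is how XML might represent the number five with context wrapped around it:

<reading>
	<temperature unit="celsius">5</temperature>
</reading>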


Now, what the computer really needs is the number five and some context around it. In XML, you (human and computer) can see how the markup gives context to the five. Now let's say that instead you have a business XML document, like this FpML fragment:

<FpML xmlns="" xmlns:fpml="" xmlns:xsi="" version="4-4" xsi:schemaLocation=" ../fpml-main-4-4.xsd ../xmldsig-core-schema.xsd" xsi:type="RequestTradeConfirmation">
<!--  start of distinct  -->
<payerPartyReference href="party2"/>
<receiverPartyReference href="party1"/>
<calculationAgentPartyReference href="party1"/>
<!-- populate credit support document with correct value -->
<party id="party1">
<partyId>Party A</partyId>
</party>
<party id="party2">
<partyId>Party B</partyId>
</party>

That is a lot of extra, unnecessary data. Now let's look at this using Apache Avro.

With Avro, the context and the values are separated. This means the schema/structure of the information does not get stored or streamed over and over and over and over (and over) again.

The Avro schema is hashed. So the data structure only holds the value and the computer understands the fingerprint (the hash) of the schema and can retrieve the schema using the fingerprint.

0x d7a8fbb307d7809469ca9abcb0082e4f8d5651e46d3cdb762d02d0bf37c9e592
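Here is a minimal sketch of the idea in Go, using only the standard library. Real Avro implementations fingerprint the schema's canonical "parsing form", and the schema text below is a toy example:

package main

import (
	"crypto/sha256"
	"fmt"
)

func main() {
	// A toy Avro schema; in practice you would normalize it to its
	// canonical form first so equivalent schemas produce the same hash.
	schema := `{"type":"record","name":"Trade","fields":[{"name":"qty","type":"int"}]}`

	fingerprint := sha256.Sum256([]byte(schema))
	fmt.Printf("0x %x\n", fingerprint) // the data travels with this fingerprint, not the schema
}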

This type of implementation is pretty typical in the data space.

When you do this you can reduce your data size by between 20% and 80%. When I tell folks this they immediately ask, “why such a large range?” The answer is that not every XML document is created the same. But that is exactly the problem: you are duplicating the information the computer needs to understand the data. XML is nice for humans to read, sure … but it is not optimized for the computer.

Here is a converter we are working on to help get folks off of XML and onto lower-cost, open source systems. It allows you to keep parts of your systems (specifically the domain business code) using the XML without having to change them (risk mitigation), while storing and streaming the data with less overhead (optimizing budget).

Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC

Twitter: @allthingshadoop

Big Data Open Source Security

May 25, 2013

In security there have never (IMHO) been enough open source solutions. Bruce Schneier has written about this several times in the past, and there’s no need to rewrite the arguments again.

Now, with the “NoSQL” and “Big Data” open source trends in the marketplace, security finally has an intersection… a union, if I may, where new solutions to problems that have plagued our society can finally begin to arise (and already have, in many cases). Fraud, malware, phishing, spam, etc. can all be tackled now with new security solutions because of Big Data and Open Source.

At the front lines of this is Apache Accumulo, a Big Data, open source, secure NoSQL database that runs on top of Apache Hadoop. It was originally developed by the United States National Security Agency and submitted to the Apache Foundation as open source in 2011, with 3 years of development and production operation already behind it.

Accumulo extends the BigTable data model to implement a security mechanism known as cell-level security. Every key-value pair has its own security label, stored under the column visibility element of the key, which is used to determine whether a given user meets the security requirements to read the value. This enables data of various security levels to be stored within the same row, and users of varying degrees of access to query the same table, while preserving data confidentiality.


When mutations are applied, users can specify a security label for each value. This is done as the Mutation is created by passing a ColumnVisibility object to the put() method:

Text rowID = new Text("row1");
Text colFam = new Text("myColFam");
Text colQual = new Text("myColQual");
ColumnVisibility colVis = new ColumnVisibility("public");
long timestamp = System.currentTimeMillis();

Value value = new Value("myValue".getBytes());

Mutation mutation = new Mutation(rowID);
mutation.put(colFam, colQual, colVis, timestamp, value);


Security labels consist of a set of user-defined tokens that are required to read the value the label is associated with. The set of tokens required can be specified using syntax that supports logical AND and OR combinations of tokens, as well as nesting groups of tokens together.

For example, suppose within our organization we want to label our data values with security labels defined in terms of user roles. We might have tokens such as:

admin
audit
system

These can be specified alone or combined using logical operators:

// Users must have admin privileges:
admin

// Users must have admin and audit privileges:
admin&audit

// Users with either admin or audit privileges:
admin|audit

// Users must have audit and one or both of admin or system:
(admin|system)&audit

When both | and & operators are used, parentheses must be used to specify precedence of the operators.


When clients attempt to read data from Accumulo, any security labels present are examined against the set of authorizations passed by the client code when the Scanner or BatchScanner is created. If the authorizations are determined to be insufficient to satisfy the security label, the value is suppressed from the set of results sent back to the client.

Authorizations are specified as a comma-separated list of tokens the user possesses:

// user possesses both admin and system level access
Authorizations auths = new Authorizations("admin", "system");

Scanner s = connector.createScanner("table", auths);


Each Accumulo user has a set of associated security labels. To manipulate these in the shell, use the setauths and getauths commands. These may also be modified using the Java security operations API.

When a user creates a scanner, a set of Authorizations is passed. If the authorizations passed to the scanner are not a subset of the user's authorizations, then an exception will be thrown.

To prevent users from writing data they cannot read, add the visibility constraint to a table. Use the -evc option in the createtable shell command to enable this constraint. For existing tables, use the following shell command to enable the visibility constraint. Ensure the constraint number does not conflict with any existing constraints.

config -t table -s

Any user with the alter table permission can add or remove this constraint. This constraint is not applied to bulk-imported data; if this is a concern, then disable the bulk import permission.


For applications serving many users, it is not expected that an Accumulo user will be created for each application user. In this case, an Accumulo user with all of the authorizations needed by any of the application's users must be created. To service queries, the application should create a scanner with the application user's authorizations. These authorizations could be obtained from a trusted 3rd party.

Often production systems will integrate with Public-Key Infrastructure (PKI) and designate client code within the query layer to negotiate with PKI servers in order to authenticate users and retrieve their authorization tokens (credentials). This requires users to specify only the information necessary to authenticate themselves to the system. Once user identity is established, their credentials can be accessed by the client code and passed to Accumulo outside of the reach of the user.


Since the primary method of interaction with Accumulo is through the Java API, production environments often call for the implementation of a Query layer. This can be done using web services in containers such as Apache Tomcat, but is not a requirement. The Query Services Layer provides a mechanism for providing a platform on which user facing applications can be built. This allows the application designers to isolate potentially complex query logic, and enables a convenient point at which to perform essential security functions.

Several production environments choose to implement authentication at this layer, where user identifiers are used to retrieve access credentials, which are then cached within the query layer and presented to Accumulo through the Authorizations mechanism.

Typically, the query services layer sits between Accumulo and user workstations.

Apache Accumulo version 1.5 just came out for download, with docs.

New software-as-a-service solutions will start to spring up in the market, as will new out-of-the-box open source solutions, whether we are trying to prevent health care fraud, protect individuals from identity theft, or protect corporations from intrusion, all without compromising the (C)onfidentiality, (I)ntegrity and (A)vailability of the data and distributed systems.

Joe Stein

Cloudera, Yahoo and the Apache Hadoop Community Security Branch Release Update

May 5, 2011

In the wake of Yahoo! having announced that they would discontinue their Hadoop distribution and focus their efforts on Apache Hadoop, the landscape has become tumultuous.

Yahoo! engineers have spent their time and effort contributing back to the Apache Hadoop security branch (branch of 0.20) and have proposed release candidates.

Currently being voted on and discussed is the release candidate. If you are following the VOTE and the DISCUSSION then maybe you are like me: it just cannot be done without a bowl of popcorn before opening the emails. It is getting heated, in a good and constructive kind of way. There are already more emails in the first 5 days of May than there were in all of April. Woot!

My take? Has it become Cloudera vs. Yahoo!, and will Apache Hadoop releases become fragmented because of it? Well, it is kind of like that already. 0.21 is the latest release, and can anyone who is not a committer quickly know or find out the difference between it and the other release branches? It is esoteric :( 0.22, which is a release from trunk, is right around the corner too.

Let's take HBase as an example (a Hadoop project). Do you know which HDFS releases can support HBase in production without losing data? If you do, then maybe you don't realize that many people still don't even know about the append branch. And now that CDH3 is out you can use that (thanks Cloudera!); otherwise it is highly recommended not to run HBase in production unless you use the append branch of 0.20, which makes you miss out on other changes in trunk releases…

__ eyes crossing inwards and sideways with what branch does what and when the trunk release has everything __

Hadoop is becoming à la carte: which features and fixes can I live without for all of what I really need to deploy … or it requires companies to hire a committer … or a bunch of folks who do nothing but Hadoop day in and day out (sounds like Oracle, ahhhhhh) … or going with the Cloudera Distribution (which is what I do, and I don't look back). The barrier to entry feels like it has increased over the last year. However, stepping back from that, the system overall has had a lot of improvements! A lot of great work by a lot of dedicated folks putting in their time and effort towards making Hadoop (in whatever form the elephant stampedes through its data) a reality.

Big shops that have teams of “Hadoop Engineers” (Yahoo, Facebook, eBay, LinkedIn, etc.) with contributors and/or committers on that team should not see much impact, because ultimately they are able to roll their own releases for whatever they need/want themselves in production and just support it. Not all are so endowed.

Now, all of that having been said, I write this because the discussion is REALLY good and has a lot of folks (including those from Yahoo! and Cloudera) bringing up pain points and proposing some great solutions that hopefully will contribute to the continued growth and success of the Apache Hadoop community…. Still, if you want to run it in your company (and don't have a committer on staff), then go download CDH3; it will get you going with the latest and greatest of all the releases, branches, etc., etc., etc. Great documentation too!

Joe Stein

