Organizations have been using Apache Kafka for some time now to consume not only all of the application data within their infrastructure but also the server statistics of the running applications and infrastructure. Apache Kafka is great for this.
Coda Hale’s Metrics library has become a leading way to instrument JVM applications, capturing what the application is doing and reporting it to servers like Graphite, Ganglia, Riemann and other systems. The main problem with this is the tight coupling between your application (and infrastructure) metrics and how you chart, trend and alert on them. Now let’s insert Apache Kafka to do the decoupling, which it does best.
The systems sending data and the systems reading the data become decoupled through the Kafka brokers.
Once this decoupling happens, we can plug in new systems (producers) that send metrics and have multiple different systems consume them. This means you can not only monitor and alert on the metrics but also, at the same time, do real-time analysis, or analysis over the larger dataset consumed off to somewhere else.
So, how does this all work? Well, we created a Metrics reporter, https://github.com/stealthly/metrics-kafka, that you can use in your own applications, or in any application that supports Coda Hale’s metrics, to report to a Kafka topic instead. We will be creating more producers (such as scraping CPU, memory, disk, etc.) and adding them to this project. To use it, set up a Metrics reporter as you normally would, but like this:
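To make the decoupling concrete, here is a toy, in-memory sketch of the idea. It is our own illustration (ToyTopic and ToyConsumer are hypothetical names, not the Kafka client API and not part of metrics-kafka): producers only append to a topic's log, and each consumer tracks its own offset, so an alerting system and an analytics system can read the same metrics independently.

```scala
import scala.collection.mutable

// Toy, in-memory stand-in for a Kafka topic (NOT the Kafka client API):
// producers only append to a log, and every consumer keeps its own offset.
final class ToyTopic[A] {
  private val log = mutable.ArrayBuffer.empty[A]

  // Append a message and return its offset; the producer never learns who reads it.
  def append(msg: A): Long = {
    log += msg
    (log.size - 1).toLong
  }

  // All messages from `offset` onward.
  def readFrom(offset: Long): Seq[A] = log.drop(offset.toInt).toSeq
}

final class ToyConsumer[A](topic: ToyTopic[A]) {
  private var offset = 0L

  // Return what arrived since the last poll and advance only this consumer's offset.
  def poll(): Seq[A] = {
    val batch = topic.readFrom(offset)
    offset += batch.size
    batch
  }
}
```

Appending two metric lines and then polling two separate ToyConsumer instances returns both lines to each consumer, and a consumer created later starts at offset 0 and still sees them. The producer code never changes when consumers are added, which is the decoupling described above.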
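import ly.stealth.kafka.metrics.KafkaReporter
// registry: the Coda Hale MetricRegistry your application already uses
// kafkaConnection, topic: the Kafka connection and the topic to report to
val producer = KafkaReporter.builder(registry, kafkaConnection, topic).build()
producer.report()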
To try it out locally:
git clone https://github.com/stealthly/metrics-kafka.git
cd metrics-kafka
vagrant up
The highlights of HBase 0.96 were around stability and longer-term scale (moving all internal data exchange and persistence to protobufs).
0.98 introduced some exciting new security features and a new HFile format with both encryption at rest and cell-level security labels.
HBaseCon has all-new speakers and new use cases, with new and familiar faces in the audience. It is a must-attend if you can make it.
1.0 is focusing on SLAs, more in-memory database features and general cleanup.
Listen to the podcast to hear everything they talked about.
When I spoke with Arun a year or so ago, YARN was "NextGen Hadoop", and there has been a lot of work, many updates and much production experience since.
Besides Yahoo!, other multi-thousand-node clusters have been running YARN in production. These clusters have shown 2x the throughput capacity, which has reduced hardware costs (in some cases allowing co-los to be shut down) while still improving overall performance compared to the previous Hadoop 1.x clusters.
I got to hear about some of what is in Hadoop 2.4 and coming in 2.5:
- Application Timeline Server: a repository and API for application-specific metrics (Tez, Spark, whatever).
  - A web service API to put and get data, with some aggregation.
  - A pluggable NoSQL store (HBase, Accumulo) to scale it.
- Preemption in the Capacity Scheduler.
- Multiple resource support (CPU, RAM and Disk).
- Node labels: nodes can be tagged with arbitrary labels (for example, some as Windows and some as Linux), and applications can ask for resources on only the nodes carrying those labels, controlled by ACLs.
- Hypervisor support as a key part of the topology.
- Hoya, generalized for YARN (a game changer), and now proposed to the Apache Incubator as Slider.
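Two of the items above, multiple resource types and node labels, boil down to a matching rule: a node is a candidate only if it carries every requested label and has enough of every requested resource. A toy sketch of that rule (Resources, Node, Request and ToyScheduler are hypothetical types, not the YARN ResourceManager API; ACLs and scheduler queues are not modeled):

```scala
// Toy model of multi-resource requests plus node labels (NOT the YARN API).
final case class Resources(vcores: Int, memoryMb: Int, diskGb: Int) {
  // A request fits only if EVERY resource dimension fits, not just memory.
  def fits(req: Resources): Boolean =
    req.vcores <= vcores && req.memoryMb <= memoryMb && req.diskGb <= diskGb
}

final case class Node(host: String, labels: Set[String], free: Resources)

final case class Request(needs: Resources, requiredLabels: Set[String])

object ToyScheduler {
  // Candidate nodes carry all required labels AND have enough free resources.
  def candidates(nodes: Seq[Node], req: Request): Seq[Node] =
    nodes.filter(n => req.requiredLabels.subsetOf(n.labels) && n.free.fits(req.needs))
}
```

A request for 4 vcores, 4 GB and 20 GB of disk labeled "linux" skips a large Windows node and a Linux node that is too small, leaving only the Linux nodes with enough of all three resources.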
We talked about Tez, which expresses queries as complex DAGs so that what you want to do on Hadoop no longer needs workarounds to make it run as MapReduce. MapReduce was not designed to be reworked outside of the parts of the job it gives you (Map, Split, Shuffle, Combine, Reduce, etc.); Tez is more expressive, exposing a DAG API.
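The difference can be sketched with a toy DAG: a query with two scans feeding a join, then a group-by and a sort, is described once as vertices and edges and run in dependency order, instead of being split into a chain of MapReduce jobs. This is a hypothetical illustration (ToyDag and the vertex names are made up), not the Tez DAG API; it only computes a valid execution order using Kahn's topological sort.

```scala
import scala.collection.mutable

// Toy DAG scheduler (NOT the Tez API): vertices are processing steps,
// edges point from a step to the steps that consume its output.
object ToyDag {
  // Returns a valid execution order, or None if the graph has a cycle.
  def executionOrder(vertices: Seq[String], edges: Map[String, Seq[String]]): Option[Seq[String]] = {
    // Count incoming edges for every vertex.
    val inDegree = mutable.Map(vertices.map(_ -> 0): _*)
    for (targets <- edges.values; t <- targets) inDegree(t) += 1

    // Start with steps that depend on nothing (e.g. table scans).
    val ready = mutable.Queue(vertices.filter(inDegree(_) == 0): _*)
    val order = mutable.ArrayBuffer.empty[String]
    while (ready.nonEmpty) {
      val v = ready.dequeue()
      order += v
      // A downstream step becomes runnable once all its inputs are done.
      for (t <- edges.getOrElse(v, Seq.empty)) {
        inDegree(t) -= 1
        if (inDegree(t) == 0) ready.enqueue(t)
      }
    }
    if (order.size == vertices.size) Some(order.toSeq) else None
  }
}
```

In the two-scan example, both scans come before the join and the sort comes last, all within one DAG rather than separate jobs writing intermediate results between them.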
There were also some updates on Hive 0.13, coming out with subqueries, low-latency queries (through Tez), high-precision decimals and more!
Subscribe to the podcast and listen to all of what Bikas and Arun had to say.
Adam talked about Apache Accumulo, a system built for random I/O over petabytes of data.
Distributing the computation to the data with cell-level security is where Accumulo really shines.
Accumulo provides a richer data model than simple key-value stores, but is not a fully relational database. Data is represented as key-value pairs, where the key and value are made up of the following elements:
- Key: Row ID, Column (Family, Qualifier, Visibility), Timestamp
- Value
All elements of the Key and the Value are represented as byte arrays except for Timestamp, which is a Long. Accumulo sorts keys by element and lexicographically in ascending order. Timestamps are sorted in descending order so that later versions of the same Key appear first in a sequential scan. Tables consist of a set of sorted key-value pairs.
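The sort order just described can be sketched as a comparator: row, column family, column qualifier and visibility are compared as unsigned bytes in ascending order, and ties are broken by timestamp in descending order. ToyKey is a hypothetical stand-in for illustration, not Accumulo's own Key class.

```scala
// Hypothetical stand-in for an Accumulo key (NOT org.apache.accumulo's Key).
final case class ToyKey(row: Array[Byte], family: Array[Byte], qualifier: Array[Byte],
                        visibility: Array[Byte], timestamp: Long)

object ToyKey {
  // Lexicographic comparison, treating each byte as unsigned (0x00..0xFF).
  def compareBytes(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n) {
      val c = (a(i) & 0xff) - (b(i) & 0xff)
      if (c != 0) return c
      i += 1
    }
    a.length - b.length
  }

  implicit val ordering: Ordering[ToyKey] = new Ordering[ToyKey] {
    def compare(x: ToyKey, y: ToyKey): Int = {
      val byElements = Seq(
        compareBytes(x.row, y.row),
        compareBytes(x.family, y.family),
        compareBytes(x.qualifier, y.qualifier),
        compareBytes(x.visibility, y.visibility)
      ).find(_ != 0)
      // Timestamps sort DESCENDING, so the newest version of a key comes first.
      byElements.getOrElse(java.lang.Long.compare(y.timestamp, x.timestamp))
    }
  }

  // Convenience constructor from strings (UTF-8 bytes).
  def of(row: String, family: String, qualifier: String, visibility: String, ts: Long): ToyKey =
    ToyKey(row.getBytes("UTF-8"), family.getBytes("UTF-8"), qualifier.getBytes("UTF-8"),
      visibility.getBytes("UTF-8"), ts)
}
```

Sorting a mix of keys puts all of "row1" before "row2", and within the same row and column the version with timestamp 9 comes before the one with timestamp 1, which is why a sequential scan sees the latest version first.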
Accumulo stores data in tables, which are partitioned into tablets. Tablets are partitioned on row boundaries so that all of the columns and values for a particular row are found together within the same tablet. The Master assigns each tablet to exactly one TabletServer at a time. This enables row-level transactions to take place without using distributed locking or some other complicated synchronization mechanism. As clients insert and query data, and as machines are added to and removed from the cluster, the Master migrates tablets to ensure they remain available and that the ingest and query load is balanced across the cluster.
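Partitioning on row boundaries can be sketched as a lookup over a table's sorted split points: each tablet covers a row range ending at (and including) its split point, so every column of a row resolves to the same tablet. ToyTablets is a hypothetical illustration, not Accumulo's API, and it uses Java string order in place of Accumulo's byte order (the two agree for ASCII row IDs).

```scala
// Toy row-range partitioning (NOT Accumulo's API).
object ToyTablets {
  // `splits` must be sorted ascending. Tablet i holds rows in
  // (splits(i - 1), splits(i)]; the last tablet, index splits.length,
  // holds every row after the final split.
  def tabletFor(row: String, splits: IndexedSeq[String]): Int = {
    var lo = 0
    var hi = splits.length
    // Binary search for the first split point >= row.
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (splits(mid).compareTo(row) < 0) lo = mid + 1 else hi = mid
    }
    lo
  }
}
```

With split points "g", "n" and "t", the table has four tablets; because the lookup depends only on the row ID, inserting another column for an existing row can never send it to a different tablet.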
We all know what XML is, right? Just in case, here is what it is all about:
<root>
  <node>5</node>
</root>
What the computer really needs is the number five and some context around it. In XML, you (human and computer) can see how the markup gives context to the five. Now let’s say instead you have a business XML document like FpML:
<FpML xmlns="http://www.fpml.org/2007/FpML-4-4"
      xmlns:fpml="http://www.fpml.org/2007/FpML-4-4"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      version="4-4"
      xsi:schemaLocation="http://www.fpml.org/2007/FpML-4-4 ../fpml-main-4-4.xsd http://www.w3.org/2000/09/xmldsig# ../xmldsig-core-schema.xsd"
      xsi:type="RequestTradeConfirmation">
      <!-- start of distinct -->
      <strike>
        <strikePrice>32.00</strikePrice>
      </strike>
      <numberOfOptions>150000</numberOfOptions>
      <optionEntitlement>1.00</optionEntitlement>
      <equityPremium>
        <payerPartyReference href="party2"/>
        <receiverPartyReference href="party1"/>
        <paymentAmount>
          <currency>EUR</currency>
          <amount>405000</amount>
        </paymentAmount>
        <paymentDate>
          <unadjustedDate>2001-07-17Z</unadjustedDate>
          <dateAdjustments>
            <businessDayConvention>NONE</businessDayConvention>
          </dateAdjustments>
        </paymentDate>
        <pricePerOption>
          <currency>EUR</currency>
          <amount>2.70</amount>
        </pricePerOption>
      </equityPremium>
    </equityOption>
    <calculationAgent>
      <calculationAgentPartyReference href="party1"/>
    </calculationAgent>
    <documentation>
      <masterAgreement>
        <masterAgreementType>ISDA2002</masterAgreementType>
      </masterAgreement>
      <contractualDefinitions>ISDA2002Equity</contractualDefinitions>
      <!-- populate credit support document with correct value -->
      <creditSupportDocument>TODO</creditSupportDocument>
    </documentation>
    <governingLaw>GBEN</governingLaw>
  </trade>
  <party id="party1">
    <partyId>Party A</partyId>
  </party>
  <party id="party2">
    <partyId>Party B</partyId>
  </party>
</FpML>
That is a lot of extra, unnecessary data. Now let’s look at this using Apache Avro.
With Avro, the context and the values are separated. This means the schema/structure of what the information is does not get stored or streamed over and over and over and over (and over) again.
The Avro schema is hashed, so the data structure only holds the values; the computer understands the fingerprint (the hash) of the schema and can retrieve the schema using that fingerprint.
This type of implementation is pretty typical in the data space.
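A minimal sketch of the fingerprint idea: the Avro specification defines a 64-bit Rabin fingerprint ("CRC-64-AVRO"), computed over a schema's Parsing Canonical Form, and the code below implements that fingerprint and uses it as the key of a tiny in-memory registry. SchemaRegistry is a hypothetical name for illustration, and the sketch assumes the schema string it receives is already in canonical form (in the Avro Java library, SchemaNormalization.parsingFingerprint64 handles the normalization for a parsed Schema).

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable

// CRC-64-AVRO (64-bit Rabin fingerprint) as given in the Avro specification.
object AvroFingerprint {
  private val Empty = 0xc15d213aa4d7a795L

  // Precomputed table for byte-at-a-time processing.
  private val table: Array[Long] = Array.tabulate(256) { i =>
    var fp = i.toLong
    for (_ <- 0 until 8) fp = (fp >>> 1) ^ (Empty & -(fp & 1L))
    fp
  }

  def fingerprint64(bytes: Array[Byte]): Long = {
    var fp = Empty
    for (b <- bytes) fp = (fp >>> 8) ^ table(((fp ^ b) & 0xff).toInt)
    fp
  }
}

// Hypothetical registry: data carries only the 8-byte fingerprint,
// and readers look the full schema up when they need it.
final class SchemaRegistry {
  private val schemas = mutable.Map.empty[Long, String]

  // ASSUMPTION: `schema` is already in Avro Parsing Canonical Form.
  def register(schema: String): Long = {
    val fp = AvroFingerprint.fingerprint64(schema.getBytes(StandardCharsets.UTF_8))
    schemas(fp) = schema
    fp
  }

  def lookup(fp: Long): Option[String] = schemas.get(fp)
}
```

Each record then needs to carry only the fingerprint plus its encoded values; the schema text is stored once in the registry instead of being repeated with every document.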
When you do this, you can reduce the size of your data by 20%-80%. When I tell folks this, they immediately ask, “Why such a large gap?” The answer is that not every XML document is created the same. But that is exactly the problem: you are duplicating the information the computer needs to understand the data. XML is nice for humans to read, sure … but it is not optimized for the computer.
Here is a converter we are working on, https://github.com/stealthly/xml-avro, to help get folks off XML and onto lower-cost, open source systems. It lets you keep parts of your systems (specifically the domain business code) using XML without having to change them (risk mitigation), while storing and streaming the data with less overhead (budget optimization).
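A worked example with the tiny <root><node>5</node></root> document from earlier: the sketch below compares its XML bytes with the Avro binary encoding of the same int, which per the Avro specification is a zig-zag, variable-length encoding. SizeComparison is a hypothetical name, and the schema itself is deliberately left out of the payload because it is identified separately (for example by fingerprint).

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

object SizeComparison {
  // Avro int encoding: zig-zag, then 7 bits per byte (high bit = more bytes follow).
  def avroInt(n: Int): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    var z = (n << 1) ^ (n >> 31) // zig-zag maps small magnitudes to small unsigned values
    while ((z & ~0x7f) != 0) {
      out.write((z & 0x7f) | 0x80)
      z >>>= 7
    }
    out.write(z)
    out.toByteArray
  }

  val xml = "<root><node>5</node></root>"
  val xmlBytes: Int = xml.getBytes(StandardCharsets.UTF_8).length // 27
  val avroBytes: Int = avroInt(5).length                           // 1 (0x0A)
}
```

A one-value document is an extreme case (27 bytes versus 1). Real documents carry more values relative to their markup, so the savings depend heavily on the document, which is why the range above is so wide.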
Founder, Principal Consultant
Big Data Open Source Security LLC
The origins of Impala can be found in F1 – The Fault-Tolerant Distributed RDBMS Supporting Google’s Ad Business.
One of the many differences between MapReduce and Impala is that in Impala, intermediate data moves directly from process to process instead of being stored on HDFS for the next processes to read. This provides a HUGE performance advantage while consuming fewer cluster resources. Less hardware to do more!
There are many advantages to this approach over alternative approaches for querying Hadoop data, including:
- Thanks to local processing on data nodes, network bottlenecks are avoided.
- A single, open, and unified metadata store can be utilized.
- Costly data format conversion is unnecessary and thus no overhead is incurred.
- All data is immediately query-able, with no delays for ETL.
- All hardware is utilized for Impala queries as well as for MapReduce.
- Only a single machine pool is needed to scale.
We encourage you to read the documentation for further exploration!
There are still transformation steps required to optimize queries, but Impala can help do this for you with the Parquet file format. Parquet delivers better compression and optimized runtime performance, though many other file formats are supported.
Big Data Open Source Security LLC has released an Apache Kafka HTTP endpoint for producing and consuming data with Apache Kafka brokers: https://github.com/stealthly/dropwizard-kafka-http.
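Part of why a columnar format like Parquet compresses well can be shown with a toy: storing one column's values together places repeated values next to each other, so even a simple run-length encoding shrinks them. ToyColumnar and its Row type are hypothetical, and Parquet's real encodings (dictionary, RLE/bit-packing and others) are considerably more sophisticated than this sketch.

```scala
// Toy illustration of columnar layout + run-length encoding (NOT Parquet's format).
object ToyColumnar {
  final case class Row(country: String, amount: Int)

  // Transpose row-oriented records into a single column.
  def countryColumn(rows: Seq[Row]): Seq[String] = rows.map(_.country)

  // Run-length encode: consecutive equal values become (value, count).
  def runLengthEncode[A](values: Seq[A]): List[(A, Int)] =
    values.foldRight(List.empty[(A, Int)]) {
      case (v, (w, n) :: rest) if v == w => (w, n + 1) :: rest
      case (v, acc)                     => (v, 1) :: acc
    }
}
```

Four rows whose country column reads US, US, US, DE collapse to two runs, (US, 3) and (DE, 1); in a row-oriented layout those values would be interleaved with the amounts and would not form runs.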
This project creates an Apache Kafka HTTP endpoint for producing and consuming messages.
1) Install Vagrant (http://www.vagrantup.com/)
2) Install VirtualBox (https://www.virtualbox.org/)
3) git clone https://github.com/stealthly/dropwizard-kafka-http
4) cd dropwizard-kafka-http
5) vagrant up
Then produce and consume messages over HTTP, e.g.:
curl -d "topic=http&message=hello&key=0" "http://192.168.22.10:8080/message"
Consuming from the same topic then returns your original message.
* Zookeeper will be running on 192.168.22.5
* dropwizard-kafka-http is built cleanly before Zookeeper installs in `vagrant/zk.sh`
* Broker One is running on 192.168.22.10
* dropwizard-kafka-http launches on 192.168.22.10 after the Kafka Broker starts in `vagrant/broker.sh`
If you want, you can log in to the machines using `vagrant ssh zookeeper` and `vagrant ssh brokerOne`.
Produces messages on a topic.
topic – required topic name
async – optional true|false value indicating whether the producer should be async
key – required key text. Multiple values are possible
message – required message text. Multiple values are possible
Note: the number of keys passed should match the number of messages.
400 – wrong parameters passed
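A client-side sketch of the produce call, using the parameters listed above: keys and messages are sent as repeated form fields, and mismatched counts are rejected before sending, mirroring the 400 response. ProduceRequest is a hypothetical helper, not part of dropwizard-kafka-http; the send method posts to the /message path used in the curl example and needs Java 11+ for java.net.http.

```scala
import java.net.{URI, URLEncoder}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object ProduceRequest {
  private def enc(s: String): String = URLEncoder.encode(s, "UTF-8")

  // Build the form body, or explain why the server would answer 400.
  def formBody(topic: String, keys: Seq[String], messages: Seq[String],
               async: Option[Boolean] = None): Either[String, String] =
    if (topic.isEmpty) Left("topic is required")
    else if (messages.isEmpty) Left("at least one message is required")
    else if (keys.size != messages.size) Left("keys count must match messages count")
    else {
      val params =
        Seq("topic" -> topic) ++
          async.map(a => "async" -> a.toString) ++
          keys.map("key" -> _) ++        // repeated "key" fields
          messages.map("message" -> _)   // repeated "message" fields
      Right(params.map { case (k, v) => s"${enc(k)}=${enc(v)}" }.mkString("&"))
    }

  // POST the body, as `curl -d` does, and return the HTTP status code.
  def send(baseUrl: String, body: String): Int = {
    val request = HttpRequest.newBuilder(URI.create(s"$baseUrl/message"))
      .header("Content-Type", "application/x-www-form-urlencoded")
      .POST(HttpRequest.BodyPublishers.ofString(body))
      .build()
    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()).statusCode()
  }
}
```

For example, formBody("http", Seq("0"), Seq("hello")) produces topic=http&key=0&message=hello, the same parameters as the curl example, which could then be posted with send("http://192.168.22.10:8080", body) against the Vagrant setup.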
Returns the consumed messages available on a topic.
topic – required topic name
timeout – optional timeout in ms
400 – wrong parameters passed
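A matching sketch for the consume call. The method and path are an assumption on our part (a GET on the same /message path, with topic and the optional timeout in milliseconds as query parameters); check the project's README before relying on them. ConsumeRequest is a hypothetical helper, and fetch needs Java 11+ for java.net.http.

```scala
import java.net.{URI, URLEncoder}
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object ConsumeRequest {
  // ASSUMPTION: consuming is a GET on /message with a query string.
  def uri(baseUrl: String, topic: String, timeoutMs: Option[Long] = None): URI = {
    require(topic.nonEmpty, "topic is required")
    val params = Seq("topic" -> topic) ++ timeoutMs.map(t => "timeout" -> t.toString)
    val query = params.map { case (k, v) => s"$k=${URLEncoder.encode(v, "UTF-8")}" }.mkString("&")
    URI.create(s"$baseUrl/message?$query")
  }

  // Returns the raw response body containing the consumed messages.
  def fetch(target: URI): String =
    HttpClient.newHttpClient()
      .send(HttpRequest.newBuilder(target).GET().build(), HttpResponse.BodyHandlers.ofString())
      .body()
}
```

Against the Vagrant cluster, fetch(uri("http://192.168.22.10:8080", "http", Some(1000L))) would read back what the produce example wrote, if the assumed path is right.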