We talked about the new work that has gone into Apache Solr (upstream) that allows it to work on Apache Hadoop. Solr has support for writing and reading its index and transaction log files to the HDFS distributed filesystem. This does not use Hadoop MapReduce to process Solr data; rather, it only uses the HDFS filesystem for index and transaction log file storage. https://cwiki.apache.org/confluence/display/solr/Running+Solr+on+HDFS
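To give an idea of what this looks like in practice, here is roughly how the wiki shows starting a standalone Solr with HDFS-backed directories (host, port and paths are placeholders):

java -Dsolr.directoryFactory=HdfsDirectoryFactory \
     -Dsolr.lock.type=hdfs \
     -Dsolr.data.dir=hdfs://namenode:8020/solr \
     -Dsolr.updatelog=hdfs://namenode:8020/solr \
     -jar start.jar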
We also talked about SolrCloud and how its sharding features allow Solr to scale with a Hadoop cluster: https://cwiki.apache.org/confluence/display/solr/SolrCloud.
Apache Solr includes the ability to set up a cluster of Solr servers that combines fault tolerance and high availability. Called SolrCloud, these capabilities provide distributed indexing and search capabilities, supporting the following features:
- Central configuration for the entire cluster
- Automatic load balancing and fail-over for queries
- ZooKeeper integration for cluster coordination and configuration.
SolrCloud is flexible distributed search and indexing, without a master node to allocate nodes, shards and replicas. Instead, Solr uses ZooKeeper to manage these locations, depending on configuration files and schemas. Documents can be sent to any server and ZooKeeper will figure it out.
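As a rough illustration (a sketch based on the old SolrCloud wiki example; shard counts and paths are placeholders), you could bring up the first node of a two-shard cluster with an embedded ZooKeeper like this:

java -DzkRun -DnumShards=2 \
     -Dbootstrap_confdir=./solr/collection1/conf \
     -Dcollection.configName=myconf \
     -jar start.jar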
Patrick introduced me to Morphlines (part of the Cloudera Development Kit for Hadoop): http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html
Cloudera Morphlines is an open source framework that reduces the time and skills necessary to build and change Hadoop ETL stream processing applications that extract, transform and load data into Apache Solr, HBase, HDFS, Enterprise Data Warehouses, or Analytic Online Dashboards. Want to build or facilitate ETL jobs without programming and without substantial MapReduce skills? Get the job done with a minimum amount of fuss and support costs? Here is how to get started.
A morphline is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. It replaces Java programming with simple configuration steps, and correspondingly reduces the cost and integration effort associated with developing and maintaining custom ETL projects.
Morphlines is a library, embeddable in any Java codebase. A morphline is an in-memory container of transformation commands. Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record. A record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments. The framework is extensible and integrates existing functionality and third party systems in a straightforward manner.
The morphline commands were developed as part of Cloudera Search. Morphlines power ETL data flows from Flume, MapReduce and HBase into Apache Solr. Flume covers the real-time case, whereas MapReduce covers the batch processing case. Since the launch of Cloudera Search, morphline development has graduated into the Cloudera Development Kit (CDK) in order to make the technology accessible to a wider range of users and products beyond Search. The CDK is a set of libraries, tools, examples, and documentation focused on making it easier to build systems on top of the Hadoop ecosystem. The CDK is hosted on GitHub and encourages involvement by the community. For example, morphlines could be embedded into Crunch, HBase, Impala, Pig, Hive, or Sqoop. Let us know where you want to take it!
Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline is an efficient way to consume records (e.g. Flume events, HDFS files, RDBMS tables or Avro objects), turn them into a stream of records, and pipe the stream of records through a set of easily configurable transformations on the way to a target application such as Solr, for example as outlined in the following figure:
In this figure, a Flume Source receives syslog events and sends them to a Flume Morphline Sink, which converts each Flume event to a record and pipes it into a readLine command. The readLine command extracts the log line and pipes it into a grok command. The grok command uses regular expression pattern matching to extract some substrings of the line. It pipes the resulting structured record into the loadSolr command. Finally, the loadSolr command loads the record into Solr, typically a SolrCloud. In the process, raw data or semi-structured data is transformed into structured data according to application modelling requirements.
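A morphline implementing that chain might look roughly like the following sketch (HOCON syntax; the grok expression and the SOLR_LOCATOR variable are illustrative placeholders, and attribute names can vary by version):

morphlines : [
  {
    id : morphline1
    importCommands : ["com.cloudera.**", "org.apache.solr.**"]
    commands : [
      # extract a log line from each record
      { readLine { charset : UTF-8 } }
      # pull structured fields out of the raw syslog line
      {
        grok {
          dictionaryFiles : [/path/to/grok-dictionaries]
          expressions : {
            message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}: %{GREEDYDATA:msg}"""
          }
        }
      }
      # load the structured record into Solr
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]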
The Morphline framework ships with a set of frequently used high-level transformation and I/O commands that can be combined in application-specific ways. The plugin system allows adding new transformations and I/O commands and integrates existing functionality and third-party systems in a straightforward manner.
This integration enables rapid Hadoop ETL application prototyping, complex stream and event processing in real time, flexible log file analysis, integration of multiple heterogeneous input schemas and file formats, as well as reuse of ETL logic building blocks across Hadoop ETL applications.
The CDK ships an efficient runtime that compiles a morphline on the fly. The runtime executes all commands of a given morphline in the same thread. Piping a record from one command to another implies just a cheap Java method call. In particular, there are no queues, no handoffs among threads, no context switches and no serialization between commands, which minimizes performance overheads.
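To give a feel for that, here is a minimal embedding sketch in Scala (package, class and method names follow the CDK morphlines API as I understand it and may well differ between versions):

import java.io.File
import com.cloudera.cdk.morphline.api.{MorphlineContext, Record}
import com.cloudera.cdk.morphline.base.{Compiler, Fields, Notifications}

// compile the morphline config file on the fly into a chain of commands
val context = new MorphlineContext.Builder().build()
val morphline = new Compiler().compile(
  new File("morphline.conf"), "morphline1", context, null) // no downstream command in this sketch

// wrap some input as a record and push it through the chain
val record = new Record()
record.put(Fields.ATTACHMENT_BODY, new java.io.FileInputStream("cars.pdf"))
Notifications.notifyStartSession(morphline)
morphline.process(record)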
Morphlines manipulate continuous or arbitrarily large streams of records. A command transforms a record into zero or more records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. Note that a field can have multiple values and any two records need not use common field names. This flexible data model corresponds exactly to the characteristics of the Solr/Lucene data model.
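A tiny Scala sketch of that data model (again using the CDK Record class; method names are from memory and may vary by version):

import com.cloudera.cdk.morphline.api.Record

val record = new Record()
record.put("tags", "hadoop")
record.put("tags", "solr")               // fields are multi-valued; both values are kept
val first = record.getFirstValue("tags") // "hadoop"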
Not only structured data, but also binary data can be passed into and processed by a morphline. By convention, a record can contain an optional field named _attachment_body, which can be a Java java.io.InputStream or a byte array. Optionally, such binary input data can be characterized in more detail by setting the fields named _attachment_mimetype (such as “application/pdf”), _attachment_charset (such as “UTF-8”) and _attachment_name (such as “cars.pdf”), which assists in detecting and parsing the data type. This is similar to the way email works.
This generic data model is useful to support a wide range of applications. For example, the Apache Flume Morphline Solr Sink embeds the morphline library and executes a morphline to convert Flume events into morphline records and load them into Solr. This sink fills the body of the Flume event into the _attachment_body field of the morphline record, and copies the headers of the Flume event into record fields of the same name. As another example, the Mappers of the MapReduceIndexerTool fill the Java java.io.InputStream referring to the currently processed HDFS file into the _attachment_body field of the morphline record. The Mappers of the MapReduceIndexerTool also fill metadata about the HDFS file into record fields, such as the file’s name, path, size, last modified time, etc. This way a morphline can act on all data received from Flume and HDFS. As yet another example, the Morphline Lily HBase Indexer fills an HBase Result Java POJO into the _attachment_body field of the morphline record. This way morphline commands such as extractHBaseCells can extract data from HBase updates and correspondingly update a Solr index.
We also talked a good deal about Apache ZooKeeper, some of the history back from when ZooKeeper was originally at Yahoo!, and Patrick’s experience since then. To hear everything that Patrick had to say, please subscribe to the podcast.
Apache Kafka has been used for some time now by organizations to consume not only all of the data within their infrastructure from an application perspective but also the server statistics of the running applications and infrastructure. Apache Kafka is great for this.
Coda Hale’s Metrics has become a leading way to instrument your JVM applications, capturing what the application is doing and reporting it to different servers like Graphite, Ganglia, Riemann and other systems. The main problem with this is the tight coupling between your application (and infrastructure) metrics and how you are charting, trending and alerting on them. Now let’s insert Apache Kafka to do the decoupling, which it does best.
The systems sending data and the systems reading the data become decoupled through the Kafka brokers.
Now, once this decoupling happens it allows us to plug in new systems (producers) to send metrics and then have multiple different systems consuming them. This means that not only can you monitor and alert on the metrics but also (and at the same time) do real-time analysis, or analysis over the larger dataset consumed somewhere else.
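For example, a downstream consumer of the metrics topic could be as small as this sketch (using the Kafka 0.8 high-level Scala consumer; the topic and group names are made up):

import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")
props.put("group.id", "metrics-dashboard")

// each message carries one reported metrics snapshot
val connector = Consumer.create(new ConsumerConfig(props))
val stream = connector.createMessageStreams(Map("metrics" -> 1))("metrics").head
for (msgAndMeta <- stream)
  println(new String(msgAndMeta.message()))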
So, how does this all work? Well, we created a Metrics reporter https://github.com/stealthly/metrics-kafka that you can use within your own applications, and in any application supporting Coda Hale’s Metrics, to report to a Kafka topic. We will be creating more producers (such as scraping CPU, memory, disk, etc.) and adding them to this project. To use it, set up a Metrics reporter like you normally would, but do so like this:
import ly.stealth.kafka.metrics.KafkaReporter

// registry is your Coda Hale MetricRegistry; kafkaConnection is the Kafka
// connection string and topic is the topic the metrics get reported to
val producer = KafkaReporter.builder(registry, kafkaConnection, topic).build()
producer.report()
# try it out locally with the included Vagrant setup
git clone https://github.com/stealthly/metrics-kafka.git
cd metrics-kafka
vagrant up
The highlights from 0.96 were around stability and longer-term scale (moving all internal data exchange and persistence to protobufs).
0.98 introduced some exciting new security features and a new HFile format, with both encryption at rest and cell-level security labels.
HBaseCon has all new speakers and new use cases, with new and familiar faces in the audience. A must-attend if you can make it.
1.0 is focusing on SLAs, more in-memory database features and general cleanup.
Listen to the podcast to hear everything they talked about.
When I spoke with Arun a year or so ago, YARN was NextGen Hadoop, and there have been a lot of updates, work done and production experience gained since.
Besides Yahoo!, other multi-thousand-node clusters have been and are running in production with YARN. These clusters have shown 2x capacity throughput, which has resulted in reduced hardware cost (and in some cases being able to shut down co-los) while still gaining overall performance improvements compared to previous Hadoop 1.X clusters.
I got to hear about some of what is in 2.4 and coming in 2.5 of Hadoop:
- Application Timeline Server: a repository and API for application-specific metrics (Tez, Spark, whatever).
- A web service API to put and get them, with some aggregation.
- A pluggable NoSQL store (HBase, Accumulo) to scale it.
- Preemption in the capacity scheduler.
- Multiple resource support (CPU, RAM and disk).
- Node labels: nodes can be tagged however you like (say, some Windows and some Linux), and applications can request resources only on nodes carrying those labels, governed by ACLs.
- Hypervisor support as a key part of the topology.
- Hoya generalized for YARN (a game changer), now proposed to the Apache Incubator as Slider.
We talked about Tez, which lets you express complex DAGs of queries for what you want to do on Hadoop, without the workarounds needed to force the work into MapReduce. MapReduce was not designed to be reworked outside of the parts of the job it gave you (Map, Split, Shuffle, Combine, Reduce, etc.), whereas Tez is more expressive, exposing a DAG API.
With Tez, the same workload collapses into a single DAG.
There were also some updates on Hive 0.13 coming out with subqueries, low-latency queries (through Tez), high-precision decimals and more!
Subscribe to the podcast and listen to all of what Bikas and Arun had to say.
Adam talked about Apache Accumulo, which is a system built for doing random I/O over petabytes of data.
Distributing the computation to the data with cell level security is where Accumulo really shines.
Accumulo provides a richer data model than simple key-value stores, but is not a fully relational database. Data is represented as key-value pairs, where the key and value are composed of the following elements:
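Per the Accumulo documentation, those elements are:
- Key: Row ID; Column (Family, Qualifier, Visibility); Timestamp
- Value: the data itself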
All elements of the Key and the Value are represented as byte arrays except for Timestamp, which is a Long. Accumulo sorts keys by element and lexicographically in ascending order. Timestamps are sorted in descending order so that later versions of the same Key appear first in a sequential scan. Tables consist of a set of sorted key-value pairs.
Accumulo stores data in tables, which are partitioned into tablets. Tablets are partitioned on row boundaries so that all of the columns and values for a particular row are found together within the same tablet. The Master assigns Tablets to one TabletServer at a time. This enables row-level transactions to take place without using distributed locking or some other complicated synchronization mechanism. As clients insert and query data, and as machines are added and removed from the cluster, the Master migrates tablets to ensure they remain available and that the ingest and query load is balanced across the cluster.
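To make the cell-level security point concrete, here is a hedged Scala sketch of writing one mutation with a column visibility through the Accumulo client API (the instance, table and label names are made up):

import org.apache.accumulo.core.client.{BatchWriterConfig, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.accumulo.core.data.{Mutation, Value}
import org.apache.accumulo.core.security.{Authorizations, ColumnVisibility}

val conn = new ZooKeeperInstance("instance", "zk1:2181")
  .getConnector("user", new PasswordToken("secret"))

// write one cell that only principals holding "admin" or "fleet" can read
val writer = conn.createBatchWriter("vehicles", new BatchWriterConfig())
val m = new Mutation("car-123")
m.put("specs", "color", new ColumnVisibility("admin|fleet"), new Value("red".getBytes("UTF-8")))
writer.addMutation(m)
writer.close()

// scans only return the cells the supplied authorizations allow
val scanner = conn.createScanner("vehicles", new Authorizations("admin"))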
We all know what XML is, right? Just in case not, no problem, here is what it is all about:
<root>
  <node>5</node>
</root>
Now, what the computer really needs is the number five and some context around it. In the XML you (human and computer) can see how it represents context for five. Now let’s say instead you have a business XML document like FpML:
<FpML xmlns="http://www.fpml.org/2007/FpML-4-4"
      xmlns:fpml="http://www.fpml.org/2007/FpML-4-4"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      version="4-4"
      xsi:schemaLocation="http://www.fpml.org/2007/FpML-4-4 ../fpml-main-4-4.xsd http://www.w3.org/2000/09/xmldsig# ../xmldsig-core-schema.xsd"
      xsi:type="RequestTradeConfirmation">
  <!-- start of distinct -->
  <strike>
    <strikePrice>32.00</strikePrice>
  </strike>
  <numberOfOptions>150000</numberOfOptions>
  <optionEntitlement>1.00</optionEntitlement>
  <equityPremium>
    <payerPartyReference href="party2"/>
    <receiverPartyReference href="party1"/>
    <paymentAmount>
      <currency>EUR</currency>
      <amount>405000</amount>
    </paymentAmount>
    <paymentDate>
      <unadjustedDate>2001-07-17Z</unadjustedDate>
      <dateAdjustments>
        <businessDayConvention>NONE</businessDayConvention>
      </dateAdjustments>
    </paymentDate>
    <pricePerOption>
      <currency>EUR</currency>
      <amount>2.70</amount>
    </pricePerOption>
  </equityPremium>
  </equityOption>
  <calculationAgent>
    <calculationAgentPartyReference href="party1"/>
  </calculationAgent>
  <documentation>
    <masterAgreement>
      <masterAgreementType>ISDA2002</masterAgreementType>
    </masterAgreement>
    <contractualDefinitions>ISDA2002Equity</contractualDefinitions>
    <!-- populate credit support document with correct value -->
    <creditSupportDocument>TODO</creditSupportDocument>
  </documentation>
  <governingLaw>GBEN</governingLaw>
  </trade>
  <party id="party1">
    <partyId>Party A</partyId>
  </party>
  <party id="party2">
    <partyId>Party B</partyId>
  </party>
</FpML>
That is a lot of extra, unnecessary data points. Now let’s look at this using Apache Avro.
With Avro, the context and the values are separated. This means the schema/structure of what the information is does not get stored or streamed over and over and over and over (and over) again.
The Avro schema is hashed. So the data structure only holds the value and the computer understands the fingerprint (the hash) of the schema and can retrieve the schema using the fingerprint.
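As a minimal Scala sketch of that separation (the schema below is a stand-in for the <root><node>5</node></root> example above):

import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}

// the context lives once, in the schema...
val schema = new Schema.Parser().parse(
  """{"type": "record", "name": "Root", "fields": [{"name": "node", "type": "int"}]}""")

// ...while each record carries only the value
val record: GenericRecord = new GenericData.Record(schema)
record.put("node", 5)

// a 64-bit fingerprint can stand in for the full schema on the wire
val fingerprint = SchemaNormalization.parsingFingerprint64(schema)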
This type of implementation is pretty typical in the data space.
When you do this you can reduce your data size by between 20% and 80%. When I tell folks this they immediately ask, “Why such a large range?” The answer is that not every XML document is created the same. But that is exactly the problem: you are duplicating the information the computer needs to understand the data. XML is nice for humans to read, sure… but it is not optimized for the computer.
Here is a converter we are working on https://github.com/stealthly/xml-avro to help get folks off of XML and onto lower-cost, open source systems. It lets you keep parts of your systems (specifically the domain business code) using XML without changes (risk mitigation) while storing and streaming the data with less overhead (optimized budget).
The origins of Impala can be found in F1 – The Fault-Tolerant Distributed RDBMS Supporting Google’s Ad Business.
One of many differences between MapReduce and Impala is that in Impala intermediate data moves from process to process directly instead of being stored on HDFS for other processes to pick up. This provides a HUGE performance advantage while consuming fewer cluster resources. Less hardware to do more!
There are many advantages to this approach over alternative approaches for querying Hadoop data, including:
- Thanks to local processing on data nodes, network bottlenecks are avoided.
- A single, open, and unified metadata store can be utilized.
- Costly data format conversion is unnecessary and thus no overhead is incurred.
- All data is immediately query-able, with no delays for ETL.
- All hardware is utilized for Impala queries as well as for MapReduce.
- Only a single machine pool is needed to scale.
We encourage you to read the documentation for further exploration!
There are still transformation steps required to optimize the queries, but Impala can help do this for you with the Parquet file format. Better compression and optimized runtime performance are realized using Parquet, though many other file formats are supported.
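For instance (a hedged sketch with made-up table names; the exact STORED AS keyword has varied across Impala versions), converting an existing text table to Parquet can be a single CTAS statement:

-- rewrite an existing text-format table into Parquet for better
-- compression and scan performance
CREATE TABLE logs_parquet STORED AS PARQUET AS SELECT * FROM logs_text;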