Bill explained how Twitter, using Apache Mesos and Apache Aurora, gets more for their money for the hardware and saves engineering time (both development and operations) by utilizing fine grained resources scheduling across their infrastructure. Bill talked a bit how the power of what he saw and experienced at Google with Borg is how they wanted to run things at Twitter and what they built Aurora for. Now after years of running in production at Twitter, Aurora is open source, part of the Apache foundation and available for use. Lots of new use cases that they didn’t see coming have become very powerful for their teams and Bill went into more detail about that too.
Bill also talked about the type of instrumentation that was done with features in Aurora to get to a place where now all new systems and almost all legacy systems at Twitter are run on top of Aurora. Bill went into detail about how that works in regards to Twitter’s cache and how the SLA features of Aurora make this a reality. Aurora is amazing providing end users (everyone from engineers to analysts) the ability to have full access to the potential resources of their hardware clusters. Aurora provides features like quotas and preemption so that any user can be provided the access to the compute resources of the entire hardware infrastructure without worry of abuse to hog resources and keep production always as the priority.
Apache Mesos abstracts CPU, memory, storage, and other compute resources away from machines (physical or virtual), enabling fault-tolerant and elastic distributed systems to easily be built and run effectively. Mesos is built using the same principles as the Linux kernel, only at a different level of abstraction. The Mesos kernel runs on every machine and provides applications (e.g., Hadoop, Spark, Kafka, Elastic Search) with API’s for resource management and scheduling across entire datacenter and cloud environments.
Apache Aurora is a Mesos framework. A Mesos frameworks is a scheduler of resources and launcher of tasks. Aurora provides a Job abstraction consisting of a Task template and instructions for creating near-identical replicas of that Task. Typically a Task is a single Process corresponding to a single command line, such as
python2.6 my_script.py. However, sometimes you must colocate separate Processes together within a single Task, which runs within a single container and
chroot, often referred to as a “sandbox”. For example, if you run multiple cooperating agents together such as
installer, and master or slave processes. Thermos provides a Process abstraction under the Mesos Tasks.
To use and get up to speed on Aurora, you should look the docs in this directory in this order:
- How to deploy Aurora or, how to install Aurora on virtual machines on your private machine (the Tutorial uses the virtual machine approach).
- As a user, get started quickly with a Tutorial.
- For an overview of Aurora’s process flow under the hood, see the User Guide.
- To learn how to write a configuration file, look at our Configuration Tutorial. From there, look at the Aurora + Thermos Reference.
- Then read up on the Aurora Command Line Client.
- Find out general information and useful tips about how Aurora does Resource Isolation.
For some more great background on Mesos and Aurora please check out these three videos.
Datacenter Management with Apache Mesos
An intro video to Apache Aurora
Past, Present, Future of Apache Aurora
To hear everything that Bill had to say please subscribe to the podcast.
We talked about the new work that has gone into Apache Solr (upstream) that allows it to work on Apache Hadoop. Solr has support for writing and reading its index and transaction log files to the HDFS distributed filesystem. This does not use Hadoop Map-Reduce to process Solr data, rather it only uses the HDFS filesystem for index and transaction log file storage. https://cwiki.apache.org/confluence/display/solr/Running+Solr+on+HDFS
We also talked about Solr Cloud and how the sharding features allow Solr to scale with a Hadoop cluster https://cwiki.apache.org/confluence/display/solr/SolrCloud.
Apache Solr includes the ability to set up a cluster of Solr servers that combines fault tolerance and high availability. Called SolrCloud, these capabilities provide distributed indexing and search capabilities, supporting the following features:
- Central configuration for the entire cluster
- Automatic load balancing and fail-over for queries
- ZooKeeper integration for cluster coordination and configuration.
SolrCloud is flexible distributed search and indexing, without a master node to allocate nodes, shards and replicas. Instead, Solr uses ZooKeeper to manage these locations, depending on configuration files and schemas. Documents can be sent to any server and ZooKeeper will figure it out.
Patrick introduced me to Morphlines (part of the Cloudera Development Kit for Hadoop) http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html
Cloudera Morphlines is an open source framework that reduces the time and skills necessary to build and change Hadoop ETL stream processing applications that extract, transform and load data into Apache Solr, HBase, HDFS, Enterprise Data Warehouses, or Analytic Online Dashboards. Want to build or facilitate ETL jobs without programming and without substantial MapReduce skills? Get the job done with a minimum amount of fuss and support costs? Here is how to get started.
A morphline is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. It replaces Java programming with simple configuration steps, and correspondingly reduces the cost and integration effort associated with developing and maintaining custom ETL projects.
Morphlines is a library, embeddable in any Java codebase. A morphline is an in-memory container of transformation commands. Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record. A record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments. The framework is extensible and integrates existing functionality and third party systems in a straightforward manner.
The morphline commands were developed as part of Cloudera Search. Morphlines power ETL data flows from Flume and MapReduce and HBase into Apache Solr. Flume covers the real time case, whereas MapReduce covers the batch processing case. Since the launch of Cloudera Search morphline development graduated into the Cloudera Development Kit(CDK) in order to make the technology accessible to a wider range of users and products, beyond Search. The CDK is a set of libraries, tools, examples, and documentation focused on making it easier to build systems on top of the Hadoop ecosystem. The CDK is hosted on GitHub and encourages involvement by the community. For example, morphlines could be embedded into Crunch, HBase, Impala, Pig, Hive, or Sqoop. Let us know where you want to take it!
Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline is an efficient way to consume records (e.g. Flume events, HDFS files, RDBMS tables or Avro objects), turn them into a stream of records, and pipe the stream of records through a set of easily configurable transformations on the way to a target application such as Solr, for example as outlined in the following figure:
In this figure, a Flume Source receives syslog events and sends them to a Flume Morphline Sink, which converts each Flume event to a record and pipes it into a readLine command. The readLine command extracts the log line and pipes it into a grok command. The grok command uses regular expression pattern matching to extract some substrings of the line. It pipes the resulting structured record into the loadSolr command. Finally, the loadSolr command loads the record into Solr, typically a SolrCloud. In the process, raw data or semi-structured data is transformed into structured data according to application modelling requirements.
The Morphline framework ships with a set of frequently used high level transformation and I/O commands that can be combined in application specific ways. The plugin system allows the adding of new transformations and I/O commands and integrates existing functionality and third party systems in a straightforward manner.
This integration enables rapid Hadoop ETL application prototyping, complex stream and event processing in real time, flexible log file analysis, integration of multiple heterogeneous input schemas and file formats, as well as reuse of ETL logic building blocks across Hadoop ETL applications.
The CDK ships an efficient runtime that compiles a morphline on the fly. The runtime executes all commands of a given morphline in the same thread. Piping a record from one command to another implies just a cheap Java method call. In particular, there are no queues, no handoffs among threads, no context switches and no serialization between commands, which minimizes performance overheads.
Morphlines manipulate continuous or arbitrarily large streams of records. A command transforms a record into zero or more records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. Note that a field can have multiple values and any two records need not use common field names. This flexible data model corresponds exactly to the characteristics of the Solr/Lucene data model.
Not only structured data, but also binary data can be passed into and processed by a morphline. By convention, a record can contain an optional field named _attachment_body, which can be a Java java.io.InputStream or Java byte. Optionally, such binary input data can be characterized in more detail by setting the fields named _attachment_mimetype (such as “application/pdf”) and _attachment_charset (such as “UTF-8″) and _attachment_name (such as “cars.pdf”), which assists in detecting and parsing the data type. This is similar to the way email works.
This generic data model is useful to support a wide range of applications. For example, the Apache Flume Morphline Solr Sink embeds the morphline library and executes a morphline to convert flume events into morphline records and load them into Solr. This sink fills the body of the Flume event into the _attachment_body field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. As another example, the Mappers of the MapReduceIndexerTool fill the Java java.io.InputStream referring to the currently processed HDFS file into the _attachment_body field of the morphline record. The Mappers of the MapReduceIndexerTool also fill metadata about the HDFS file into record fields, such as the file’s name, path, size, last modified time, etc. This way a morphline can act on all data received from Flume and HDFS. As yet another example, the Morphline Lily HBase Indexer fills a HBase Result Java POJO into the _attachment_body field of the morphline record. This way morphline commands such as extractHBaseCells can extract data from HBase updates and correspondingly update a Solr index.
We also talked a good deal about Apache Zookeeper and some of the history back from when Zookeeper was originally at Yahoo! and Patrick’s experience since then. To hear everything that Patrick had to say please subscribe to the podcast.
Apache Kafka has been used for some time now by organizations to consume not only all of the data within its infrastructure from an application perspective but also the server statistics of the running applications and infrastructure. Apache Kafka is great for this.
Coda Hale’s metrics’s has become a leading way to instrument your JVM applications capturing what the application is doing and reporting it to different servers like Graphite, Ganglia, Riemann and other systems. The main problem with this is the tight coupling between your application (and infrastructure) metrics with how you are charting, trending and alerting on them. Now lets insert Apache Kafka to-do the decoupling which it does best.
The systems sending data and the systems reading the data become decoupled through the Kafka brokers.
Now, once this decoupling happens it allows us to plug in new systems (producers) to send metrics and then have multiple different systems consuming them. This means that not only can you monitor and alert on the metrics but also (and at the same time) do real time analysis or analysis over the larger dataset consumed off somewhere else.
So, how does this all work? Well, we created a Metrics reporter https://github.com/stealthly/metrics-kafka that you can use within your own applications and any application supporting Coda Hale’s metrics but now reporting to a Kafka topic. We will be creating more producers (such as scrapping cpu, mem, disk, etc) and adding them to this project. To use this setup a Metrics reporter like you would normally but do so like this:
import ly.stealth.kafka.metrics.KafkaReporter val producer = KafkaReporter.builder(registry, kafkaConnection, topic).build() producer.report()
git clone https://github.com/stealthly/metrics-kafka.git cd metrics-kafka vagrant up
The highlights from 0.96 where around stability and longer term scale (moving all internal data exchange and persistence to protobufs).
0.98 introduced some exciting new security features and a new HFile format with both encryption at rest and cell level security labels.
HBaseCon has all new speakers and new use cases with new and familiar faces listening onward. A must attend if you can make it.
1.0 is focusing on SLA and more inmemorry database features and general cleanup.
Listen into the podcast and all of what they talked about together.
When I spoke with Arun a year or so a go YARN was NextGen Hadoop and there have been a lot of updates, work done and production experience since.
Besides Yahoo! other multi thousand node clusters have been and are running in production with YARN. These clusters have shown 2x capacity throughput which resulted in reduced cost for hardware (and in some cases being able to shut down co-los) while still gaining performance improvements overall to previous clusters of Hadoop 1.X.
I got to hear about some of what is in 2.4 and coming in 2.5 of Hadoop:
- Application timeline server repository and api for application specific metrics (Tez, Spark, Whatever).
- web service API to put and get with some aggregation.
- plugable nosql store (hbase, accumulo) to scale it.
- Preemption capacity scheduler.
- Multiple resource support (CPU, RAM and Disk).
- Labels tag nodes with labels can be labeled however so some windows and some linux and ask for resources with only those labels with ACLS.
- Hypervisor support as a key part of the topology.
- Hoya generalize for YARN (game changer) and now proposed as Slider to the Apache incubator.
We talked about Tez which provides complex DAGs of queries to translate what you want to-do on Hadoop without the work arounds for making it have to run in MapReduce. MapReduce was not designed to be re-workable out side of the parts of the Job it gave you for Map, Split, Shuffle, Combine, Reduce, Etc and Tez is more expressible exposing a DAG API.
Now becomes with Tez:
There were also some updates on Hive v13 coming out with sub queries, low latency queries (through Tez), high precision decimal points and more!
Subscribe to the podcast and listen to all of what Bikas and Arun had to say.
Adam talked about Apache Accumulo which is a system built for doing random i/o with peta bytes of data.
Distributing the computation to the data with cell level security is where Accumulo really shines.
Accumulo provides a richer data model than simple key-value stores, but is not a fully relational database. Data is represented as key-value pairs, where the key and value are comprised of the following elements:
All elements of the Key and the Value are represented as byte arrays except for Timestamp, which is a Long. Accumulo sorts keys by element and lexicographically in ascending order. Timestamps are sorted in descending order so that later versions of the same Key appear first in a sequential scan. Tables consist of a set of sorted key-value pairs.
Accumulo stores data in tables, which are partitioned into tablets. Tablets are partitioned on row boundaries so that all of the columns and values for a particular row are found together within the same tablet. The Master assigns Tablets to one TabletServer at a time. This enables row-level transactions to take place without using distributed locking or some other complicated synchronization mechanism. As clients insert and query data, and as machines are added and removed from the cluster, the Master migrates tablets to ensure they remain available and that the ingest and query load is balanced across the cluster.
We all know what XML is right? Just in case not, no problem here is what it is all about.
<root> <node>5</node> </root>
Now, what the computer really needs is the number five and some context around it. In XML you (human and computer) can see how it represents context to five. Now lets say instead you have a business XML document like FPML
<FpML xmlns="http://www.fpml.org/2007/FpML-4-4" xmlns:fpml="http://www.fpml.org/2007/FpML-4-4" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" version="4-4" xsi:schemaLocation="http://www.fpml.org/2007/FpML-4-4 ../fpml-main-4-4.xsd http://www.w3.org/2000/09/xmldsig# ../xmldsig-core-schema.xsd" xsi:type="RequestTradeConfirmation"> <!-- start of distinct --> <strike> <strikePrice>32.00</strikePrice> </strike> <numberOfOptions>150000</numberOfOptions> <optionEntitlement>1.00</optionEntitlement> <equityPremium> <payerPartyReference href="party2"/> <receiverPartyReference href="party1"/> <paymentAmount> <currency>EUR</currency> <amount>405000</amount> </paymentAmount> <paymentDate> <unadjustedDate>2001-07-17Z</unadjustedDate> <dateAdjustments> <businessDayConvention>NONE</businessDayConvention> </dateAdjustments> </paymentDate> <pricePerOption> <currency>EUR</currency> <amount>2.70</amount> </pricePerOption> </equityPremium> </equityOption> <calculationAgent> <calculationAgentPartyReference href="party1"/> </calculationAgent> <documentation> <masterAgreement> <masterAgreementType>ISDA2002</masterAgreementType> </masterAgreement> <contractualDefinitions>ISDA2002Equity</contractualDefinitions> <!-- populate credit support document with correct value --> <creditSupportDocument>TODO</creditSupportDocument> </documentation> <governingLaw>GBEN</governingLaw> </trade> <party id="party1"> <partyId>Party A</partyId> </party> <party id="party2"> <partyId>Party B</partyId> </party> </FpML>
That is a lot of extra unnecessary data points. Now lets look at this using Apache Avro.
With Avro, the context and the values are separated. This means the schema/structure of what the information is does not get stored or streamed over and over and over and over (and over) again.
The Avro schema is hashed. So the data structure only holds the value and the computer understands the fingerprint (the hash) of the schema and can retrieve the schema using the fingerprint.
This type of implementation is pretty typical in the data space.
When you do this you can reduce your data between 20%-80%. When I tell folks this they immediately ask, “why such a large gap of unknowns”. The answer is because not every XML is created the same. But that is the problem because you are duplicating the information the computer needs to understand the data. XML is nice for humans to read, sure … but that is not optimized for the computer.
Here is a converter we are working on https://github.com/stealthly/xml-avro to help get folks off of XML and onto lower cost, open source systems. This allows you to keep parts of your systems (specifically the domain business code) using the XML and not having to be changed (risk mitigation) but store and stream the data with less overhead (optimize budget).
Founder, Principal Consultant
Big Data Open Source Security LLC