Archive
Open Source Cloud Formation with Minotaur for Mesos, Kafka and Hadoop
Today I am happy to announce “Minotaur,” our open source, AWS-based infrastructure for managing big data open source projects including (but not limited to) Apache Kafka, Apache Mesos and Cloudera’s Distribution of Hadoop. Minotaur is based on AWS CloudFormation.
The following labs are currently supported:
- Apache Mesos
- Apache Kafka
- Apache Zookeeper
- Cloudera Hadoop
- Golang Kafka Consumer
- Golang Kafka Producer
Supervisor
Supervisor is a Docker-based image that contains all the necessary software to manage nodes/resources in AWS.
Supervisor set-up
- clone this repo to repo_dir
- cd to repo_dir/supervisor folder
Before trying to build the Docker image, you must put some configuration files under the config directory:
a) aws_config
This file is just a regular aws-cli config; paste in the access and secret keys provided by Amazon:
[default]
output = json
region = us-east-1
aws_access_key_id = ACCESS_KEY
aws_secret_access_key = SECRET_KEY
Do not add or remove any extra whitespace (especially before and after the “=” sign in keys).
b) private.key
This is your private SSH key, the public part of which is registered on the Bastion host.
c) environment.key
This is a shared key for all nodes in the environment Supervisor is supposed to manage.
d) ssh_config
This is a regular SSH config file; you only have to change your_username (this is the username registered on the Bastion).
BASTION_IP is handled dynamically when the container is built.
# BDOSS environment
Host 10.0.2.*
IdentityFile ~/.ssh/environment.key
User ubuntu
ProxyCommand ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
Host 10.0.*.*
IdentityFile ~/.ssh/environment.key
User ubuntu
ProxyCommand ssh -i ~/.ssh/private.key your_username@BASTION_IP nc %h %p
- exec up.sh, for example as shown below:
If this is the first time you’re launching Supervisor, it will take some time to build.
Subsequent runs of up.sh will take seconds.
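A minimal sketch of that step, assuming the repository was cloned to repo_dir as above and up.sh is executable:

cd repo_dir/supervisor
./up.sh    # first run builds the Docker image; later runs take seconds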
Using supervisor
Now you can cd to /deploy/labs/ and deploy whatever you want
Example:
minotaur lab deploy mesosmaster -e bdoss-dev -d test -r us-east-1 -z us-east-1a
Creating new stack 'mesos-master-test-bdoss-dev-us-east-1-us-east-1a'...
Stack deployed.
This will spin up a Mesos master node in the “test” deployment.
awsinfo
Supervisor has a built-in “awsinfo” command, which relies on AWS API and provides brief info about running machines. It is also capable of searching through that info.
Usage example
awsinfo
– will display brief info about all nodes running in AWS
root@supervisor:/deploy# awsinfo
Cloud: bdoss/us-east-1
Name Instance ID Instance Type Instance State Private IP Public IP
---- ----------- ------------- -------------- ---------- ---------
nat.bdoss-dev i-c46a0b2a m1.small running 10.0.2.94 54.86.153.142
bastion.bdoss-dev i-3faa69de m1.small running 10.0.0.207 None
mesos-master.test.bdoss-dev i-e80ddc09 m1.small terminated None None
mesos-slave.test.bdoss-dev i-e00ddc01 m1.small terminated None None
awsinfo mesos-master
– will display info about all mesos-master nodes running in AWS.
root@supervisor:/deploy/labs# awsinfo mesos-master
Cloud: bdoss/us-east-1
Name Instance ID Instance Type Instance State Private IP Public IP
---- ----------- ------------- -------------- ---------- ---------
mesos-master.test.bdoss-dev i-e80ddc09 m1.small terminated None None
awsinfo 10.0.2
– will match nodes whose private or public IP falls within that subnet.
root@supervisor:/deploy/labs# awsinfo 10.0.2
Cloud: bdoss/us-east-1
Name Instance ID Instance Type Instance State Private IP Public IP
---- ----------- ------------- -------------- ---------- ---------
nat.bdoss-dev i-c46a0b2a m1.small running 10.0.2.94 54.86.153.142
mesos-master.test.bdoss-dev i-e96ebd08 m1.small running 10.0.2.170 54.172.160.254
Vagrant
If you can’t use Docker directly for some reason, there’s a Vagrant wrapper VM for it.
Before doing anything with Vagrant, complete the above steps for Docker, but don’t execute the up.sh script.
Just cd into the vagrant directory and exec vagrant up, then vagrant ssh (nothing special here yet).
When you exec vagrant ssh, the Docker container build process will start immediately, so wait a bit and let it complete.
Now you’re inside a Docker container nested in the Vagrant VM and can proceed with deployment in the same manner as described above for Docker.
All subsequent vagrant ssh invocations will spawn the Docker container almost immediately.
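A minimal sketch of that workflow, run from the root of the cloned repository:

cd vagrant
vagrant up     # brings up the wrapper VM (first run takes a while)
vagrant ssh    # the first ssh triggers the Docker container build; let it finish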
Once you are inside the supervisor image, the minotaur.py script may be used to provision an environment and labs. The rest of this readme assumes that the script is executed from within the supervisor container.
Minotaur Commands
List Infrastructure Components
root@supervisor:/deploy# ./minotaur.py infrastructure list
Available deployments are: ['bastion', 'iampolicies', 'iamusertogroupadditions', 'nat', 'sns', 'subnet', 'vpc']
Print Infrastructure Component Usage
root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -h
usage: minotaur.py infrastructure deploy bastion [-h] -e ENVIRONMENT -r REGION
-z AVAILABILITY_ZONE
[-i INSTANCE_TYPE]
optional arguments:
-h, --help show this help message and exit
-e ENVIRONMENT, --environment ENVIRONMENT
CloudFormation environment to deploy to
-r REGION, --region REGION
Geographic area to deploy to
-z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
Isolated location to deploy to
-i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
AWS EC2 instance type to deploy
Deploy Infrastructure Component
In this example, the bdoss-dev bastion already existed, so the CloudFormation stack was updated with the current template.
root@supervisor:/deploy# ./minotaur.py infrastructure deploy bastion -e bdoss-dev -r us-east-1 -z us-east-1a
Template successfully validated.
Updating existing 'bastion-bdoss-dev-us-east-1-us-east-1a' stack...
Stack updated.
List Labs
List all supported labs.
root@supervisor:/deploy# ./minotaur.py lab list
Available deployments are: ['clouderahadoop', 'gokafkaconsumer', 'gokafkaproducer', 'kafka', 'mesosmaster', 'mesosslave', 'zookeeper']
Print Lab Usage
Print the kafka lab usage.
root@supervisor:/deploy# ./minotaur.py lab deploy kafka -h
usage: minotaur.py lab deploy kafka [-h] -e ENVIRONMENT -d DEPLOYMENT -r
REGION -z AVAILABILITY_ZONE [-n NUM_NODES]
[-i INSTANCE_TYPE] [-v ZK_VERSION]
[-k KAFKA_URL]
optional arguments:
-h, --help show this help message and exit
-e ENVIRONMENT, --environment ENVIRONMENT
CloudFormation environment to deploy to
-d DEPLOYMENT, --deployment DEPLOYMENT
Unique name for the deployment
-r REGION, --region REGION
Geographic area to deploy to
-z AVAILABILITY_ZONE, --availability-zone AVAILABILITY_ZONE
Isolated location to deploy to
-n NUM_NODES, --num-nodes NUM_NODES
Number of instances to deploy
-i INSTANCE_TYPE, --instance-type INSTANCE_TYPE
AWS EC2 instance type to deploy
-v ZK_VERSION, --zk-version ZK_VERSION
The Zookeeper version to deploy
-k KAFKA_URL, --kafka-url KAFKA_URL
The Kafka URL
Deploy Lab
Deploy a 3-broker Kafka cluster.
root@supervisor:/deploy# ./minotaur.py lab deploy kafka -e bdoss-dev -d kafka-example -r us-east-1 -z us-east-1a -n 3 -i m1.small
Template successfully validated.
Creating new 'kafka-bdoss-dev-kafka-example-us-east-1-us-east-1a' stack...
Stack deployed.
Ideas and goals behind the Go Kafka Client
I think a bunch of folks have heard already that B.D.O.S.S. was working on a new Apache Kafka client for Go. The Go Kafka Client was open sourced last Friday. Today we are starting the release of Minotaur, which is our lab environment for Apache Zookeeper, Apache Mesos, Apache Cassandra, Apache Kafka, Apache Hadoop and our new Go Kafka Client.
To get started with the consumer client, check out our example code and its property file.
Ideas and goals behind the Go Kafka Client:
1) Partition Ownership
2) Fetch Management
3) Work Management
4) Offset Management
// excerpt from the consumer example; package declaration and imports omitted for brevity
func main() {
	// resolveConfig and the consumer configuration come from the example's property file (not shown here)
	config, consumerIdPattern, topic, numConsumers, graphiteConnect, graphiteFlushInterval := resolveConfig()

	startMetrics(graphiteConnect, graphiteFlushInterval)

	ctrlc := make(chan os.Signal, 1)
	signal.Notify(ctrlc, os.Interrupt)

	// start the requested number of consumers, 10 seconds apart
	consumers := make([]*kafkaClient.Consumer, numConsumers)
	for i := 0; i < numConsumers; i++ {
		consumers[i] = startNewConsumer(*config, topic, consumerIdPattern, i)
		time.Sleep(10 * time.Second)
	}

	// wait for Ctrl+C, then close each consumer and wait for it to finish
	<-ctrlc
	fmt.Println("Shutdown triggered, closing all alive consumers")
	for _, consumer := range consumers {
		<-consumer.Close()
	}
	fmt.Println("Successfully shut down all consumers")
}

// periodically flush the metrics registry to Graphite
func startMetrics(graphiteConnect string, graphiteFlushInterval time.Duration) {
	addr, err := net.ResolveTCPAddr("tcp", graphiteConnect)
	if err != nil {
		panic(err)
	}
	go metrics.GraphiteWithConfig(metrics.GraphiteConfig{
		Addr:          addr,
		Registry:      metrics.DefaultRegistry,
		FlushInterval: graphiteFlushInterval,
		DurationUnit:  time.Second,
		Prefix:        "metrics",
		Percentiles:   []float64{0.5, 0.75, 0.95, 0.99, 0.999},
	})
}

// build a consumer with the work strategy and failure callbacks, then start consuming the topic
func startNewConsumer(config kafkaClient.ConsumerConfig, topic string, consumerIdPattern string, consumerIndex int) *kafkaClient.Consumer {
	config.Consumerid = fmt.Sprintf(consumerIdPattern, consumerIndex)
	config.Strategy = GetStrategy(config.Consumerid)
	config.WorkerFailureCallback = FailedCallback
	config.WorkerFailedAttemptCallback = FailedAttemptCallback

	consumer := kafkaClient.NewConsumer(&config)
	topics := map[string]int{topic: config.NumConsumerFetchers}
	go func() {
		consumer.StartStatic(topics)
	}()
	return consumer
}

// the per-message work strategy: trace the message, mark the consume-rate meter and report success
func GetStrategy(consumerId string) func(*kafkaClient.Worker, *kafkaClient.Message, kafkaClient.TaskId) kafkaClient.WorkerResult {
	consumeRate := metrics.NewRegisteredMeter(fmt.Sprintf("%s-ConsumeRate", consumerId), metrics.DefaultRegistry)
	return func(_ *kafkaClient.Worker, msg *kafkaClient.Message, id kafkaClient.TaskId) kafkaClient.WorkerResult {
		kafkaClient.Tracef("main", "Got a message: %s", string(msg.Value))
		consumeRate.Mark(1)
		return kafkaClient.NewSuccessfulResult(id)
	}
}

// called when a worker manager fails: stop without committing the offset
func FailedCallback(wm *kafkaClient.WorkerManager) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed callback")
	return kafkaClient.DoNotCommitOffsetAndStop
}

// called when a single work attempt fails: commit the offset and continue
func FailedAttemptCallback(task *kafkaClient.Task, result kafkaClient.WorkerResult) kafkaClient.FailedDecision {
	kafkaClient.Info("main", "Failed attempt")
	return kafkaClient.CommitOffsetAndContinue
}
Plans moving forward with the Go Kafka Client:
Ideas and goals behind Minotaur:
Plans moving forward with Minotaur:
Apache Solr real-time live index updates at scale with Apache Hadoop
Episode #22 of the podcast was a talk with Patrick Hunt.
We talked about the new work that has gone into Apache Solr (upstream) that allows it to work on Apache Hadoop. Solr has support for writing and reading its index and transaction log files to the HDFS distributed filesystem. This does not use Hadoop MapReduce to process Solr data; rather, it only uses the HDFS filesystem for index and transaction log file storage. https://cwiki.apache.org/confluence/display/solr/Running+Solr+on+HDFS
We also talked about Solr Cloud and how the sharding features allow Solr to scale with a Hadoop cluster https://cwiki.apache.org/confluence/display/solr/SolrCloud.
Apache Solr includes the ability to set up a cluster of Solr servers that combines fault tolerance and high availability. Called SolrCloud, these capabilities provide distributed indexing and search capabilities, supporting the following features:
- Central configuration for the entire cluster
- Automatic load balancing and fail-over for queries
- ZooKeeper integration for cluster coordination and configuration.
SolrCloud provides flexible distributed search and indexing, without a master node to allocate nodes, shards and replicas. Instead, Solr uses ZooKeeper to manage these locations, depending on configuration files and schemas. Documents can be sent to any server and ZooKeeper will figure it out.
Patrick introduced me to Morphlines (part of the Cloudera Development Kit for Hadoop) http://cloudera.github.io/cdk/docs/current/cdk-morphlines/index.html
Cloudera Morphlines is an open source framework that reduces the time and skills necessary to build and change Hadoop ETL stream processing applications that extract, transform and load data into Apache Solr, HBase, HDFS, Enterprise Data Warehouses, or Analytic Online Dashboards. Want to build or facilitate ETL jobs without programming and without substantial MapReduce skills? Get the job done with a minimum amount of fuss and support costs? Here is how to get started.
A morphline is a rich configuration file that makes it easy to define a transformation chain that consumes any kind of data from any kind of data source, processes the data and loads the results into a Hadoop component. It replaces Java programming with simple configuration steps, and correspondingly reduces the cost and integration effort associated with developing and maintaining custom ETL projects.
Morphlines is a library, embeddable in any Java codebase. A morphline is an in-memory container of transformation commands. Commands are plugins to a morphline that perform tasks such as loading, parsing, transforming, or otherwise processing a single record. A record is an in-memory data structure of name-value pairs with optional blob attachments or POJO attachments. The framework is extensible and integrates existing functionality and third party systems in a straightforward manner.
The morphline commands were developed as part of Cloudera Search. Morphlines power ETL data flows from Flume and MapReduce and HBase into Apache Solr. Flume covers the real time case, whereas MapReduce covers the batch processing case. Since the launch of Cloudera Search morphline development graduated into the Cloudera Development Kit(CDK) in order to make the technology accessible to a wider range of users and products, beyond Search. The CDK is a set of libraries, tools, examples, and documentation focused on making it easier to build systems on top of the Hadoop ecosystem. The CDK is hosted on GitHub and encourages involvement by the community. For example, morphlines could be embedded into Crunch, HBase, Impala, Pig, Hive, or Sqoop. Let us know where you want to take it!
Morphlines can be seen as an evolution of Unix pipelines where the data model is generalized to work with streams of generic records, including arbitrary binary payloads. A morphline is an efficient way to consume records (e.g. Flume events, HDFS files, RDBMS tables or Avro objects), turn them into a stream of records, and pipe the stream of records through a set of easily configurable transformations on the way to a target application such as Solr, for example as outlined in the following figure:
In this figure, a Flume Source receives syslog events and sends them to a Flume Morphline Sink, which converts each Flume event to a record and pipes it into a readLine command. The readLine command extracts the log line and pipes it into a grok command. The grok command uses regular expression pattern matching to extract some substrings of the line. It pipes the resulting structured record into the loadSolr command. Finally, the loadSolr command loads the record into Solr, typically a SolrCloud. In the process, raw data or semi-structured data is transformed into structured data according to application modelling requirements.
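As a rough, hypothetical sketch, a morphline configuration for that pipeline might look something like the following (the importCommands value, grok dictionary path and grok pattern here are illustrative assumptions, not taken from the podcast or any shipped example):

morphlines : [
  {
    id : syslogToSolr
    importCommands : ["com.cloudera.**"]
    commands : [
      # read the raw Flume event body as a single log line
      { readLine { charset : UTF-8 } }

      # use regular expression pattern matching to extract substrings of the line
      { grok {
          dictionaryFiles : [grok-dictionaries]
          expressions : {
            message : """%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname} %{DATA:program}: %{GREEDYDATA:msg}"""
          }
        }
      }

      # load the resulting structured record into Solr (typically a SolrCloud)
      { loadSolr { solrLocator : ${SOLR_LOCATOR} } }
    ]
  }
]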
The Morphline framework ships with a set of frequently used high level transformation and I/O commands that can be combined in application specific ways. The plugin system allows the adding of new transformations and I/O commands and integrates existing functionality and third party systems in a straightforward manner.
This integration enables rapid Hadoop ETL application prototyping, complex stream and event processing in real time, flexible log file analysis, integration of multiple heterogeneous input schemas and file formats, as well as reuse of ETL logic building blocks across Hadoop ETL applications.
The CDK ships an efficient runtime that compiles a morphline on the fly. The runtime executes all commands of a given morphline in the same thread. Piping a record from one command to another implies just a cheap Java method call. In particular, there are no queues, no handoffs among threads, no context switches and no serialization between commands, which minimizes performance overheads.
Morphlines manipulate continuous or arbitrarily large streams of records. A command transforms a record into zero or more records. The data model can be described as follows: A record is a set of named fields where each field has an ordered list of one or more values. A value can be any Java Object. That is, a record is essentially a hash table where each hash table entry contains a String key and a list of Java Objects as values. Note that a field can have multiple values and any two records need not use common field names. This flexible data model corresponds exactly to the characteristics of the Solr/Lucene data model.
Not only structured data, but also binary data can be passed into and processed by a morphline. By convention, a record can contain an optional field named _attachment_body, which can be a Java java.io.InputStream or Java byte[]. Optionally, such binary input data can be characterized in more detail by setting the fields named _attachment_mimetype (such as “application/pdf”) and _attachment_charset (such as “UTF-8”) and _attachment_name (such as “cars.pdf”), which assists in detecting and parsing the data type. This is similar to the way email works.
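As a purely hypothetical illustration of this data model, a record built from a parsed log line might carry fields like these (all field names other than the _attachment_* conventions above are made up):

hostname             : [web01]
program              : [sshd]
tags                 : [security, ssh]        # a field may hold an ordered list of multiple values
_attachment_body     : [java.io.InputStream over the raw event bytes]
_attachment_mimetype : [text/plain]
_attachment_charset  : [UTF-8]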
This generic data model is useful to support a wide range of applications. For example, the Apache Flume Morphline Solr Sink embeds the morphline library and executes a morphline to convert flume events into morphline records and load them into Solr. This sink fills the body of the Flume event into the _attachment_body field of the morphline record, as well as copies the headers of the Flume event into record fields of the same name. As another example, the Mappers of the MapReduceIndexerTool fill the Java java.io.InputStream referring to the currently processed HDFS file into the _attachment_body field of the morphline record. The Mappers of the MapReduceIndexerTool also fill metadata about the HDFS file into record fields, such as the file’s name, path, size, last modified time, etc. This way a morphline can act on all data received from Flume and HDFS. As yet another example, the Morphline Lily HBase Indexer fills a HBase Result Java POJO into the _attachment_body field of the morphline record. This way morphline commands such as extractHBaseCells can extract data from HBase updates and correspondingly update a Solr index.
We also talked a good deal about Apache Zookeeper and some of the history back from when Zookeeper was originally at Yahoo! and Patrick’s experience since then. To hear everything that Patrick had to say please subscribe to the podcast.
Getting started with Apache Mesos and Apache Aurora using Vagrant
Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks. Think of it as the “kernel” for your data center. Paco Nathan talked about this on one of the All Things Hadoop Podcasts.
Features:
- Fault-tolerant replicated master using ZooKeeper
- Scalability to 10,000s of nodes
- Isolation between tasks with Linux Containers
- Multi-resource scheduling (memory and CPU aware)
- Java, Python and C++ APIs for developing new parallel applications
- Web UI for viewing cluster state
Apache Aurora is a service scheduler that runs on top of Mesos, enabling you to run long-running services that take advantage of Mesos’ scalability, fault-tolerance, and resource isolation. Apache Aurora is currently a part of the Apache Incubator. The main benefit of a Mesos scheduler like Aurora (or Marathon) is not having to worry about using the Mesos API to take advantage of the grid. Your application can work the way it does today while Mesos figures out what server(s) to run it on and the scheduler handles when and how to scale it.
Features:
- Deployment and scheduling of jobs
- The abstraction of a “job” to bundle and manage Mesos tasks
- A rich DSL to define services
- Health checking
- Failure domain diversity
- Instant provisioning
First, make sure that you have Vagrant and VirtualBox installed; if you don’t already have them, install them:
1) Install Vagrant http://www.vagrantup.com/
2) Install Virtual Box https://www.virtualbox.org/
That is all you need (assuming you also have git installed). Everything else from here is going to be done from within the virtual machine.
git clone https://github.com/apache/incubator-aurora
cd incubator-aurora
vagrant up
The virtual machines will take some time to spin up so hang tight.
Once the virtual machines are launched you will have your command prompt back and ready to go.
There are 5 VMs that are launched: devtools, zookeeper, mesos-master, mesos-slave and aurora-scheduler. They are all configured and networked together (for more info, check out the Vagrantfile).
The next step is to create an app on the scheduler so it gets provisioned onto the running Mesos cluster.
vagrant ssh aurora-scheduler
vagrant@precise64:~$ cd /vagrant/examples/jobs/
vagrant@precise64:~$ aurora create example/www-data/prod/hello hello_world.aurora
 INFO] Creating job hello
 INFO] Response from scheduler: OK (message: 1 new tasks pending for job www-data/prod/hello)
 INFO] Job url: http://precise64:8081/scheduler/www-data/prod/hello
Now go to your browser and pull up http://192.168.33.5:8081/scheduler/www-data/prod/hello and you’ll see your job running.
Basically, all of what is happening is defined in the configuration:
hello = Process(
  name = 'hello',
  cmdline = """
    while true; do
      echo hello world
      sleep 10
    done
  """)

task = SequentialTask(
  processes = [hello],
  resources = Resources(cpu = 1.0, ram = 128*MB, disk = 128*MB))

jobs = [Service(
  task = task,
  cluster = 'example',
  role = 'www-data',
  environment = 'prod',
  name = 'hello')]
It is an exciting time for virtualization and resource scheduling and process provisioning within infrastructures. It is all open source so go dig in and see how it all works for yourself.
Apache Zookeeper, Distributed Systems, Open Source and more with Camille Fournier
Episode #13 of the podcast is a talk with Camille Fournier. It is also available on iTunes.
Apache Zookeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. All of these kinds of services are used in some form or another by distributed applications. Each time they are implemented there is a lot of work that goes into fixing the bugs and race conditions that are inevitable. Because of the difficulty of implementing these kinds of services, applications initially usually skimp on them, which makes them brittle in the presence of change and difficult to manage. Even when done correctly, different implementations of these services lead to management complexity when the applications are deployed.
Camille talked about discovery services and distributed locking, as well as some tips for developing against and operating Zookeeper in production, including how to build a Global, Highly Available Service Discovery Infrastructure with ZooKeeper, which she also wrote about on her blog http://whilefalse.blogspot.com/2012/12/building-global-highly-available.html.
Camille gave some great insights about how to apply open source community practices to an organization’s SDLC to foster a better culture for better products and services, where all developers need to own more parts of their software (as they do in open source projects). #devops #qaops #userops
Subscribe to the podcast and listen to what Camille had to say. It is also available on iTunes.
/*********************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop
**********************************/