Scaling an application usually involves adding processing nodes. This means you end up with valuable data (e.g. server logs) existing on multiple different machines. Very often, we want to mine those logs for useful information.
One way to do this would be to put all the logs in one place and run some kind of computation over all the data. This is relatively simple, but it really doesn't scale. Pretty soon, you reach the point where the machine which is analysing a day's worth of log data is taking more than a day to do it.
Another way is to let each processing node analyse its own logs, and then have some way to collate the analysis from each node to find the overall answer you're looking for. This is a more practical solution for the long term, but it still has a couple of problems;
1. A processing node should be processing - i.e. doing the task for which it was created. Ideally, you wouldn't want to bog it down with other responsibilities. If you do, you probably need to make the node more powerful than it needs to be in order to carry out its primary task, so you're paying for extra capacity just in case you want to run a log analysis task.
2. In the case of log data in particular, keeping the logs on the node which created them generally means you have to keep the node around too. This makes it awkward to remove nodes if you want to scale down, or replace nodes with better nodes, because you have to worry about copying the logs off the node and keeping them somewhere.
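As a rough illustration of the "analyse locally, then collate" approach, here is a minimal Python sketch in which each node summarises its own log lines and a collator merges the summaries. The log format (a status code as the last field of each line) and the function names are assumptions made up for this example, not taken from any real system.

```python
from collections import Counter


def analyse_node_logs(lines):
    """Run on each node: count log lines per status code.

    Assumes (hypothetically) that the status code is the last
    whitespace-separated field of every log line.
    """
    counts = Counter()
    for line in lines:
        fields = line.split()
        if fields:  # skip blank lines
            counts[fields[-1]] += 1
    return counts


def collate(per_node_counts):
    """Run once, centrally: merge the per-node summaries."""
    total = Counter()
    for counts in per_node_counts:
        total.update(counts)
    return total


# Made-up log lines standing in for two nodes' logs.
node_a = analyse_node_logs(["GET /index 200", "GET /missing 404"])
node_b = analyse_node_logs(["GET /index 200", "POST /login 500"])
overall = collate([node_a, node_b])
```

Both problems listed above are visible even here: analyse_node_logs competes with the node's real work, and the logs have to stay on the node until it has run.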
It would be nice if we could have each node push its logs into something like Amazon S3 for storage, and spin up a distributed computing task whenever we want to run some analysis. Amazon Elastic Map Reduce (EMR) is designed to work in exactly this way, but the learning curve for writing map/reduce job flows is pretty steep - particularly if you're used to writing simple scripts to get useful information out of log data.
As of October 1st 2009, Amazon EMR supports Apache Hive, which makes things a lot easier.
What is Hive?
The proper answer is here.
The way I think of Hive is that it lets you pretend that a whole mess of semi-structured log files are actually big database tables, and then helps you run SQL-like queries over those tables. All this without having to actually insert the data into any kind of table, and without having to know how to write distributed map/reduce tasks.
Using Hive with Amazon EMR
This is a very basic introduction to working with Hive on Amazon EMR. Very basic because I've only just started looking into this myself.
You will need to be signed up for Amazon Web Services, including S3 and Elastic Map Reduce.
I'm going to go through part of an exercise from the Cloudera Introduction to Hive, which I strongly recommend working through. That training exercise uses a Cloudera VMware virtual appliance running Hive. Here is how I did a similar task using Hive on Amazon EMR.
For this exercise, we're going to take a data file consisting of words and the frequency of occurrence of those words within the complete works of William Shakespeare. The file consists of a number of lines like this;
The first value is an integer saying how many times the word occurs, then a tab character, then the word. This file is generated by an earlier exercise in one of the Cloudera Hadoop tutorials. If you don't feel like running through those exercises, just generate a file containing a bunch of numbers and words, separated by a tab character, and use that.
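If you go the do-it-yourself route, something like the following Python sketch would produce a file of the right shape. The input text, the function name and the output filename are placeholders made up for this example; any text will do.

```python
import os
import re
import tempfile
from collections import Counter


def write_wordfreqs(text, path):
    """Write one 'count<TAB>word' line per distinct word in text."""
    words = re.findall(r"[a-z']+", text.lower())
    counts = Counter(words)
    with open(path, "w", encoding="utf-8") as out:
        for word, freq in sorted(counts.items()):
            out.write(f"{freq}\t{word}\n")
    return len(counts)


# Placeholder input; swap in any text you like.
sample_text = "To be, or not to be: that is the question"
sample_path = os.path.join(tempfile.mkdtemp(), "wordfreqs.txt")
distinct_words = write_wordfreqs(sample_text, sample_path)

# Read the file back to check its shape.
with open(sample_path, encoding="utf-8") as f:
    written_lines = f.read().splitlines()
```

Each line of the resulting file is an integer, a tab and a word, which is all the rest of this exercise relies on.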
Upload the data to S3
Before we can analyse the data, we need it to be available in S3. Create a bucket called "hive-emr", and upload your data file into it using the key "tables/wordfreqs/whatever". In my case, I have the tab-delimited text file in;
NB: The S3 path "hive-emr/tables/wordfreqs" is going to be our Hive table. If you're unfamiliar with S3, "hive-emr" is the name of our bucket, and 'tables/wordfreqs/shakespeare.txt' is the key whose value is the contents of our "shakespeare.txt" file.
Everything in the 'directory' "tables/wordfreqs/" (which isn't really a directory, but we can pretend it is) must be parseable as data for our table, so don't put any other types of file in there. You could, if you wanted, have more than one tab-delimited text file though, and all of the data in all of those files would become records in the same Hive table.
It's also important not to have any underscores in the S3 bucket or key. S3 will happily let you create and upload files to buckets/keys with underscores, but you'll get an S3 URI error when you try to create the table in Hive.
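To make the bucket/key split and the underscore restriction concrete, here is a small Python sketch that takes an S3 path written the way it is in this post, splits it into bucket and key, and rejects anything containing an underscore. The function name is hypothetical, and the underscore rule is just the behaviour described above, not a check taken from Hive or S3 themselves.

```python
def split_s3_path(path):
    """Split 'bucket/some/key' into (bucket, key), rejecting underscores."""
    if "_" in path:
        raise ValueError(f"underscore in S3 path: {path!r}")
    bucket, sep, key = path.partition("/")
    return bucket, key


bucket, key = split_s3_path("hive-emr/tables/wordfreqs/shakespeare.txt")

# The table location is the key's "directory", not the file itself.
table_location = "s3://" + bucket + "/" + key.rsplit("/", 1)[0]
```

Running a check like this before uploading is cheaper than finding out about a bad name when the table is created.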
I'm using s3sync to upload the data files, but you can use anything you want provided you get the data into S3 with the correct bucket and key name.
Generating an EC2 Key Pair
We need a key pair to enable us to SSH onto our Hive cluster once we've started it. If you don't have a suitable key pair already, sign in to the Amazon Web Services console and go to the Amazon EC2 tab. Near the bottom of the left-hand column, use the "Key Pairs" function to generate a key pair and save the private key to your local machine.
Be aware of the "Region" you're using - key pairs will only work for servers of the same region. I'm using "EU-West", but it doesn't matter which you use, as long as you're consistent.
Sign in to the Amazon Web Services console and go to the Amazon Elastic MapReduce tab (you won't see the tab if you haven't signed up to the service, so make sure you do that first).
Click "Create New Job Flow". Make sure you're using the same region you used when you generated your key pair.
Give the job flow a name, and select "Hive Program" for the job type.
On the next screen, choose "Start an Interactive Hive Session".
On the next screen, we choose the number and size of the machines we want to comprise our cluster. In real life use, using a lot of big machines will make things go faster. For the purpose of this exercise, one small instance will do. We're not doing anything heavyweight here, and we only have one data file, so there isn't much point spending the extra money to run lots of large machines.
Select the key pair you generated earlier, and start the job flow. Don't forget to terminate the job flow when you've finished, otherwise you'll be paying to keep an idle cluster going.
Now we have to wait for the cluster to start up and reach the point where it's ready to do some work. This usually takes a few minutes.
When the job flow status is "WAITING", click on the job flow and scroll down in the lower pane to get the "Master Public DNS Name" assigned to your cluster so that we can SSH to it.
From a terminal window, ssh onto your cluster like this;
ssh -i key/hive.pem hadoop@ec2-79-125-30-42.eu-west-1.compute.amazonaws.com
Replace key/hive.pem with the location and filename of the private key file you saved when you generated your key pair.
Replace "ec2-79-125-30-42.eu-west-1.compute.amazonaws.com" with the Master Public DNS Name of your cluster. The username 'hadoop' is required.
You should now have a terminal prompt like this;
Type "hive" to get to the hive console. This is an interactive shell that works in a similar way to the "mysql" command-line client.
Creating a table
We're almost ready to start querying our data. First, we have to tell Hive where it is, and what kind of data is contained in our file.
Type these lines into the hive shell;
hive> create external table wordfreqs (freq int, word string)
> row format delimited fields terminated by '\t'
> stored as textfile
> location 's3://hive-emr/tables/wordfreqs';
Time taken: 1.29 seconds
Note that we didn't need to put "shakespeare.txt" as part of the location. Hive will look at the location we gave it and, provided all the "files" in that "directory" have the right kind of contents (lines consisting of an integer, a tab character and a string), all of their contents will be accessible in the 'wordfreqs' table.
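Conceptually, what Hive does with that table definition is something like the Python sketch below: read every file under the location, split each line on the tab character, and convert the first field to an integer. This is only an illustration of the idea, not how Hive is implemented, and the function names and sample rows are made up. Returning None for a value that will not convert mirrors Hive's habit of giving NULL for a field that does not match the declared column type.

```python
def parse_row(line):
    """Turn one 'freq<TAB>word' line into a (freq, word) tuple."""
    fields = line.rstrip("\n").split("\t")
    word = fields[1] if len(fields) > 1 else None
    try:
        freq = int(fields[0])
    except ValueError:
        freq = None  # stand-in for a NULL column value
    return freq, word


def read_table(files):
    """Treat the lines of several files as rows of one table."""
    rows = []
    for lines in files:
        rows.extend(parse_row(line) for line in lines)
    return rows


# Two in-memory "files" standing in for objects under tables/wordfreqs/.
file_one = ["3\tapple\n", "7\tpear\n"]
file_two = ["oops\tbroken\n"]
rows = read_table([file_one, file_two])
```

The row from file_two comes back with no frequency, which is the reason for keeping other kinds of file out of the table's "directory".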
Now that we've told Hive how to find and parse our data, we can start asking questions in almost the same way as we would if it were in a MySQL table.
hive> select * from wordfreqs limit 5;
Time taken: 4.868 seconds
So far, so good - even though that's a long time to take for a very simple query. Let's try something a little more interesting;
hive> select count(word) from wordfreqs;
Here is the output I got from this;
Total MapReduce jobs = 1
Number of reduce tasks determined at compile time: 1
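In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapred.reduce.tasks=<number>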
Starting Job = job_200911121319_0001, Tracking URL = http://ip-10-227-111-150.eu-west-1.compute.internal:9100/jobdetails.jsp?jobid=job_200911121319_0001
Kill Command = /home/hadoop/bin/../bin/hadoop job -Dmapred.job.tracker=ip-10-227-111-150.eu-west-1.compute.internal:9001 -kill job_200911121319_0001
2009-11-12 01:37:33,004 map = 0%, reduce =0%
2009-11-12 01:37:46,653 map = 50%, reduce =0%
2009-11-12 01:37:47,665 map = 100%, reduce =0%
2009-11-12 01:37:55,709 map = 100%, reduce =100%
Ended Job = job_200911121319_0001
Time taken: 28.455 seconds
All of that is Hive translating our SQL-like query into MapReduce jobs that are then farmed out to our cluster. Since we're using a single, small instance, and since we only have one data file, there isn't any parallelisation happening, and the whole thing runs quite slowly. But, in principle, we could have terabytes of data files in our S3 bucket, and be using many more, and much larger, machines in our cluster. Under those circumstances, we should see major gains from using Hive.
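To give a feel for what is being farmed out, here is a toy Python version of the count query split into a map step and a reduce step. It runs in a single process, and the rows and chunking are invented for the example; real Hive plans and Hadoop jobs are far more involved.

```python
from functools import reduce


def map_count(chunk):
    """Map step: count the non-null words in one chunk of rows."""
    return sum(1 for freq, word in chunk if word is not None)


def reduce_counts(partial_counts):
    """Reduce step: add the partial counts together."""
    return reduce(lambda a, b: a + b, partial_counts, 0)


# Made-up rows, split into chunks as if each chunk were a separate input file.
chunks = [
    [(3, "apple"), (7, "pear")],
    [(1, "plum"), (4, None)],
]
total = reduce_counts(map_count(chunk) for chunk in chunks)
```

Because each map_count call only needs its own chunk, the map step is the part that can be spread across many machines; only the small partial counts need to be brought together.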
FYI, the reason the first query didn't have this kind of output is that Hive is smart enough to figure out that no MapReduce trickery is necessary for this request - it can just read a few lines from the file to satisfy the query.
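That shortcut can be pictured like this: with a limit and no aggregation, it is enough to read the first few lines and stop. Below is a minimal Python sketch, using a made-up endless row generator to show that nothing beyond the limit is consumed. It says nothing about how Hive actually implements this.

```python
from itertools import islice


def first_rows(lines, limit):
    """Return the first `limit` rows without reading the rest."""
    rows = []
    for line in islice(lines, limit):
        freq, word = line.rstrip("\n").split("\t")
        rows.append((int(freq), word))
    return rows


def numbered_lines():
    """An endless stream of made-up rows; reading it all would never finish."""
    n = 0
    while True:
        n += 1
        yield f"{n}\tword{n}\n"


sample = first_rows(numbered_lines(), 5)
```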
This has been a very quick and simple introduction to Hive on Amazon EMR. I hope you found it useful. I plan to go into more advanced, and hopefully more useful, territory in future posts.
PS: Don't forget to terminate your Hive cluster!