Playing with Apache Hive, MongoDB and the MTA

Apache Hive is a popular data warehouse system for Hadoop that lets you run SQL queries on top of Hadoop by translating them into Map/Reduce jobs. Because of the high latency Hadoop incurs when executing Map/Reduce jobs, Hive cannot be used in applications that require fast access to data. One common technique is to use Hive to pre-aggregate data logs stored in HDFS and then sync the aggregated results to a fast datastore that serves user queries.

In this post we’re going to describe how to install Hive. Then, as New York City straphangers, we’re going to load subway train movement data from the MTA into HDFS, run Hive queries to compute the average number of daily train movements per line, and store the result in MongoDB.

Requirements

  • Java installed
  • Hadoop running
  • MongoDB running

Running Hadoop

In this section, we are going to describe how to quickly install and configure Hadoop on a single machine.

Alternatively, you can follow the instructions in this post to deploy Hadoop for free on an Amazon EC2 cluster.

To install Hadoop on your local box, go to http://www.apache.org/dyn/closer.cgi/hadoop/common/ and download hadoop-1.1.1-bin.tar.gz.
Uncompress the archive:

tar xvfz hadoop-1.1.1-bin.tar.gz

Edit the file conf/hadoop-env.sh and add the following line:

export JAVA_HOME=<JDK DIRECTORY>

Generate an RSA key so that you can SSH to your local box without a password:

ssh-keygen -t rsa -P ''

and save it in <HOME>/.ssh/id_rsa (the default location).
Now authorize your local box to SSH to itself:

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

Check that it works:

ssh localhost ls

It should not ask for your password.
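
If it does ask for a password, a common culprit is file permissions: sshd ignores an authorized_keys file that is group or world writable. Tightening the permissions usually fixes it:

chmod 700 ~/.ssh
chmod 600 ~/.ssh/authorized_keys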

Now set the environment variables:

export HADOOP_PREFIX=<HADOOP DIRECTORY>
export HADOOP_CONF_DIR=$HADOOP_PREFIX/conf
export PATH=$HADOOP_PREFIX/bin:$PATH

To configure HDFS, edit the file conf/core-site.xml and add the following property to the configuration:

<configuration>
        <property>
                <name>fs.default.name</name>
                <value>hdfs://localhost:9000</value>
        </property>
</configuration>
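
Optionally, since everything runs on a single machine, you can also set the HDFS replication factor to 1 in conf/hdfs-site.xml (the default of 3 only makes sense on a real cluster and causes under-replication warnings on a single node):

<configuration>
        <property>
                <name>dfs.replication</name>
                <value>1</value>
        </property>
</configuration>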

Then format the HDFS filesystem:

hadoop namenode -format

We are now ready to start Hadoop:

start-all.sh
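
To verify that the daemons started correctly, list the running Java processes:

jps

You should see the NameNode, DataNode, SecondaryNameNode, JobTracker and TaskTracker. You can also check the NameNode web interface at http://localhost:50070 and the JobTracker at http://localhost:50030.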

Installing Hive

Before running Hive, you need to create a few directories in HDFS:

$ hadoop fs -mkdir /tmp
$ hadoop fs -mkdir /user/hive/warehouse
$ hadoop fs -chmod g+w /tmp
$ hadoop fs -chmod g+w /user/hive/warehouse

Download the latest version of Hive from http://www.apache.org/dyn/closer.cgi/hive/.
In this post we assume that the latest version is hive-0.10.0-bin.tar.gz.

Untar it:

$ tar xvfz hive-0.10.0-bin.tar.gz
$ cd hive-0.10.0-bin
$ export HIVE_HOME=$PWD
$ export PATH=$HIVE_HOME/bin:$PATH
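
As a quick smoke test, you can start the Hive shell and run a trivial statement; it should return an empty list (we haven’t created any tables yet) without errors:

$ hive
hive> SHOW TABLES;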

Installing MongoDB connector for Hive

Get the source from GitHub:

$ git clone https://github.com/yc-huang/Hive-mongo
$ cd Hive-mongo
$ mvn package
$ sudo cp target/hive-mongo-0.0.1-SNAPSHOT-jar-with-dependencies.jar $HIVE_HOME/lib
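
Alternatively, instead of copying the jar into $HIVE_HOME/lib, you can register it for the current session from the Hive shell (adjust the path to wherever you built the jar):

hive> ADD JAR /path/to/Hive-mongo/target/hive-mongo-0.0.1-SNAPSHOT-jar-with-dependencies.jar;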

Importing Data into Hive

The MTA offers free data sets such as train movements, bus movements, and even turnstile usage. You can find them at: http://mta.info/developers/download.html

Let’s download the subway train movement history for lines 1/2/3 and 4/5/6 during May 2011: A Division Historical Data May 2011.zip

Now let’s import the data (we first strip the header line from each file):

$ unzip "A Division Historical Data May 2011.zip"
$ mv "A Division Historical Data May 2011" test_data
$ cd test_data
$ for i in *.csv
do
  tail -n +2 "$i" >> file.csv
done
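
A quick sanity check on the merged file doesn’t hurt before loading it (the exact line count depends on the files in the archive):

$ wc -l file.csv
$ head -1 file.csv
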
$ hive
hive> CREATE TABLE IF NOT EXISTS train_movements (
     service_date STRING,
     train_id STRING,
     direction_id STRING,
     event_timestamp STRING,
     event_type STRING,
     route_id STRING,
     stop_id STRING,
     track_id STRING
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY ',';
OK
Time taken: 9.125 seconds

hive> LOAD DATA LOCAL INPATH 'file.csv' OVERWRITE INTO TABLE train_movements;
OK
Time taken: 16.679 seconds
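
You can verify the load with a simple count (this launches a Map/Reduce job, so expect some latency):

hive> SELECT COUNT(*) FROM train_movements;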

Let’s now import the table that contains the mapping between route_id and route names.
Download the file http://www.mta.info/developers/data/nyct/subway/google_transit.zip, unzip it in the same test_data directory and load the routes into Hive:

$ tail -n +2 routes.txt > routes.csv
$ hive
hive> CREATE TABLE IF NOT EXISTS routes (
    route_id INT,
    agency_id STRING,
    route_short_name STRING,
    route_long_name STRING,
    route_desc STRING,
    route_type INT,
    route_url STRING,
    route_color STRING,
    route_text_color STRING
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY ',';
OK
Time taken: 0.012 seconds

hive> LOAD DATA LOCAL INPATH 'routes.csv' OVERWRITE INTO TABLE routes;
OK
Time taken: 1.203 seconds

Aggregating data

In order to store the data in MongoDB, we can create a Hive external table mapped to a MongoDB collection. In our case, for the table that gives the number of train movements for each line, we create the following external table:

hive> CREATE EXTERNAL TABLE mongo_popular_routes (
    id INT,
    name STRING,
    daily_movements INT
) STORED BY "org.yong3.hive.mongo.MongoStorageHandler"
WITH serdeproperties ("mongo.column.mapping" = "_id,name,daily_movements")
tblproperties (
    "mongo.host" = "localhost",
    "mongo.port" = "27017",
    "mongo.db" = "test",
    "mongo.collection" = "popular_routes"
);
OK
Time taken: 0.417 seconds

Note that when we created the external Mongo table, we didn’t put any whitespace between the field names in mongo.column.mapping. If you do, the Mongo plugin will include the whitespace in the field names.

Now let’s run the aggregation that will give us the number of train movements per line.
Since we have 31 days of data, we divide the total number of movements by 31. Also, the CSV contains one row when a train arrives at a station and another when it leaves (the event_type column), so we only count one of the two (event_type = 1).

hive> insert into table mongo_popular_routes
select r.route_short_name as id,
    r.route_long_name as name,
    cast(count(*) / 31 as INT) as daily_movements
from train_movements tm
join routes r
on tm.route_id = r.route_id
where tm.event_type = 1
group by r.route_short_name, r.route_long_name;
OK
Time taken: 84.547 seconds

Now let’s look at the data in MongoDB:

$ mongo
> use test;
> db.popular_routes.find().sort({"daily_movements":-1});
{ "_id" : 6, "daily_movements" : 15407, "name" : "Lexington Avenue Local" }
{ "_id" : 1, "daily_movements" : 13464, "name" : "Broadway - 7 Avenue Local" }
{ "_id" : 4, "daily_movements" : 9916, "name" : "Lexington Avenue Express" }
{ "_id" : 2, "daily_movements" : 9279, "name" : "7 Avenue Express" }
{ "_id" : 3, "daily_movements" : 8058, "name" : "7 Avenue Express" }
{ "_id" : 5, "daily_movements" : 4535, "name" : "Lexington Avenue Express" }

So we can conclude that living near the 6 line is great!

As an exercise, you can use the file stops.txt from the Google Transit archive and see which stops have the most trains!
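
For instance, here is a rough sketch, assuming that stops.txt follows the standard GTFS column order (stop_id, stop_code, stop_name, stop_desc, stop_lat, stop_lon, zone_id, stop_url, location_type, parent_station) and that the stop_id values in the movement data line up with the GTFS ones; in practice they may need some massaging first (the GTFS ids carry N/S direction suffixes):

$ tail -n +2 stops.txt > stops.csv
$ hive
hive> CREATE TABLE IF NOT EXISTS stops (
    stop_id STRING,
    stop_code STRING,
    stop_name STRING,
    stop_desc STRING,
    stop_lat STRING,
    stop_lon STRING,
    zone_id STRING,
    stop_url STRING,
    location_type STRING,
    parent_station STRING
) ROW FORMAT
DELIMITED FIELDS TERMINATED BY ',';

hive> LOAD DATA LOCAL INPATH 'stops.csv' OVERWRITE INTO TABLE stops;

hive> select s.stop_name, count(*) as movements
from train_movements tm
join stops s
on tm.stop_id = s.stop_id
where tm.event_type = 1
group by s.stop_name
order by movements desc
limit 10;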

Conclusion

In this simple example, we showed how to use Hive to aggregate data and then store the result in MongoDB. While Hive is powerful and makes it easy to aggregate data using SQL, it cannot be used for fast analytics: the latency introduced by the Map/Reduce phases makes it more suitable for offline aggregation, whose results are then stored in a fast database to handle user requests.
