Playing with Hadoop Pig

Hadoop Pig is a tool to manipulate data from various sources (CSV files, MySQL, MongoDB, …) using a procedural language called Pig Latin. It can run standalone or distributed with Hadoop. Unlike Hive, it can manipulate non-relational data and perform operations such as aggregations, joins (including left joins), regular expression matching, sorting, …

In this post, to keep things simple, we will only consider running Pig standalone (local mode).

To install Pig in Ubuntu:

$ sudo apt-get install hadoop-pig

Let’s take a simple example using the geolocation database from MaxMind. Download the latest GeoLite City database in CSV format from the MaxMind website.

The file in the zip we’re interested in is GeoLiteCity-Location.csv.

Let’s remove the two header lines (copyright notice and column names) from the CSV file:

$ tail -n +3 GeoLiteCity-Location.csv > location.csv

Let’s start by loading the CSV file and displaying it:

data = LOAD 'location.csv' USING PigStorage(',')
       AS (locId, country, region, city, postalCode,
           latitude, longitude, metroCode, areaCode);
dump data;

To run it:

$ pig -x local script.pig

The first line maps the different columns in the CSV file to fields. When no type is given, fields default to bytearray; a type can be declared explicitly (e.g., city:chararray).
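As a sketch, the same LOAD with explicit types would look like this (the numeric casts on latitude and longitude assume those columns are not surrounded by quotes in the file):

```
data = LOAD 'location.csv' USING PigStorage(',')
       AS (locId:int, country:chararray, region:chararray, city:chararray,
           postalCode:chararray, latitude:double, longitude:double,
           metroCode:chararray, areaCode:chararray);
```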
Let’s get rid of information we don’t need and only keep city, region and country:

cities = FOREACH data GENERATE city, region, country;

Now let’s only keep US cities where the city name is set (note that the values in the CSV are surrounded by double quotes, hence the '"US"' literal):

usCities = FILTER cities BY country == '"US"' AND city != '""';
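At any point we can check the schema of a relation with DESCRIBE; since we did not declare any types, the fields show up as bytearray:

```
DESCRIBE usCities;
-- prints something like:
-- usCities: {city: bytearray, region: bytearray, country: bytearray}
```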

Now let’s try to see which city names are popular, i.e., count how many states use each city name.
In SQL, the query would be something like:

SELECT city, COUNT(DISTINCT region) AS numStates
FROM cities
WHERE country = 'US'
GROUP BY city
ORDER BY numStates DESC
LIMIT 10;

In Pig, we have to use different steps:

/* Group by city */
cityGroups = GROUP usCities BY city;

/*
 * In the CSV file, the same city can appear many times in a state, so we first
 * need to make sure to remove duplicate states using DISTINCT.
 * Note that we can access the cityGroups fields: 'group' is the key used for
 * grouping and 'usCities' is the bag of grouped rows.
 */
numStatesByCities = FOREACH cityGroups {
    states = DISTINCT usCities.region;
    GENERATE group AS city,
             COUNT(states) AS numStates;
};

/* Order cities by number of occurrences in states */
sortedCitiesByNumStates = ORDER numStatesByCities BY numStates DESC;

/* Only keep the first 10 cities */
result = LIMIT sortedCitiesByNumStates 10;

dump result;
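Instead of dumping the result to the console, we could also write it to disk with STORE (here 'top_cities' is just a hypothetical output directory name):

```
/* Writes part files under the 'top_cities' directory, tab-separated */
STORE result INTO 'top_cities' USING PigStorage('\t');
```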

Let’s run it again:

$ pig -x local script.pig

This will print the 10 city names shared by the most states, along with the number of states each one appears in.

So in summary, the usual SQL operations can be translated as follows. The order in which the operations have to be run is given in parentheses.

  • SELECT id, name: resultData = FOREACH limitData GENERATE id, name; (5)
  • FROM Table: data = LOAD 'person.csv' USING PigStorage(',') AS (id:int, name:chararray, age:int); (1)
  • WHERE a=1: filteredData = FILTER data BY a == 1; (2)
  • ORDER BY age DESC: orderedData = ORDER filteredData BY age DESC; (3)
  • LIMIT 10: limitData = LIMIT orderedData 10; (4)
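Chaining the five statements in execution order gives a complete script (a sketch, assuming a hypothetical person.csv and using age > 18 as an example WHERE predicate):

```
/* (1) FROM */
data = LOAD 'person.csv' USING PigStorage(',')
       AS (id:int, name:chararray, age:int);
/* (2) WHERE */
filteredData = FILTER data BY age > 18;
/* (3) ORDER BY */
orderedData = ORDER filteredData BY age DESC;
/* (4) LIMIT */
limitData = LIMIT orderedData 10;
/* (5) SELECT */
resultData = FOREACH limitData GENERATE id, name;
dump resultData;
```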

One can also use left join and join as follows:

  • JOIN: join_data = JOIN data1 BY id1, data2 BY id2;
  • LEFT JOIN: left_join_data = JOIN data1 BY id1 LEFT OUTER, data2 BY id2;
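For instance, joining two hypothetical relations on a shared id (a sketch; the file names and fields are made up for illustration):

```
people = LOAD 'people.csv' USING PigStorage(',')
         AS (personId:int, name:chararray);
orders = LOAD 'orders.csv' USING PigStorage(',')
         AS (orderId:int, personId:int, total:double);

/* Inner join: only people having at least one order */
people_orders = JOIN people BY personId, orders BY personId;

/* Left outer join: all people, with nulls when there is no matching order */
all_people_orders = JOIN people BY personId LEFT OUTER, orders BY personId;
```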

You can also apply string manipulation functions to the join keys. For instance:

left_join = JOIN website1
        BY LOWER(REGEX_EXTRACT(website1_url, 'http.*://(?:www\\.)?([^/]+)', 1))
    LEFT OUTER, website2
        BY website2_url;

This extracts the domain name out of the URL, lowercases it, and uses it as the join key.

In the resulting relation, every field is prefixed with the name of the relation it came from:

  • website1::website1_url
  • website1::other_fields
  • website2::website2_url
  • website2::other_fields
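So to reference fields after the join, you use the :: prefix. For example, to keep only the two URLs from the joined relation above:

```
urls = FOREACH left_join GENERATE website1::website1_url, website2::website2_url;
```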

GROUP BY is a little trickier. When you do something like:

groupedData = GROUP cities BY (country, region);

you actually create rows like:

(("US","NY"), {("New York", "NY", "US"), ("Albany", "NY", "US")})
(("US","CA"), {("San Francisco", "CA", "US"), ("San Jose", "CA", "US")})

where (“US”, “NY”) is a composite key and {(“New York”, “NY”, “US”), (“Albany”, “NY”, “US”)} is a bag.
The key can be accessed through the keyword group, and the bag through the name of the relation as it was before the GROUP occurred (here, cities).
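DESCRIBE shows this structure directly (the output shape below is approximate):

```
DESCRIBE groupedData;
-- prints something like:
-- groupedData: {group: (country: bytearray, region: bytearray),
--               cities: {(city: bytearray, region: bytearray, country: bytearray)}}
```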

Suppose you want to do a count. You would first need to flatten the key so that it becomes simply “US”, “NY” in the tuple, and then apply an aggregation function to the bag. In order to do so, you can do:

numCitiesInStates = FOREACH groupedData
    GENERATE FLATTEN(group) AS (country, state),
             COUNT(cities) AS numCities;

In our next post we’ll show how to load data and store data to MongoDB.
