Using SOLR Cloud as a NoSQL database for low-latency analytics

SOLR is a popular full-text search engine based on Lucene. Not only is it very efficient at searching documents, it is also very fast at running simple queries on relational and unstructured data, and it can be used as a NoSQL database. SOLR 4 was released recently and now includes SOLR Cloud, which allows SOLR to scale using sharding and replication.

Although SOLR is very fast for operations such as filtering, sorting and pagination, it does not support aggregations (similar to SELECT item_id, SUM(purchases) FROM t GROUP BY item_id in SQL), so the data has to be aggregated in advance. In this post we will show how to install SOLR Cloud and then illustrate it with a simple e-commerce example, describing how SOLR Cloud can be used to provide very responsive analytics. Finally we will give some hints on how to design a schema to deal with pre-aggregations.

Use Case

Let’s consider an e-commerce site that stores the volume of monthly purchases for each item.

For that we will use the following items table:

CREATE TABLE items (
  id INTEGER,
  name VARCHAR(256),
  year INTEGER,
  month INTEGER,
  shipping_method INTEGER,
  us_sold INTEGER,
  ca_sold INTEGER,
  fr_sold INTEGER,
  uk_sold INTEGER
);

For each item and for each month, the items table stores the number of purchases per country (we only use 4 countries here but you can use many more).

The purchases are also broken down by shipping method:

  • 1: Fedex Overnight
  • 2: UPS ground 2-3 days
  • 3: UPS ground 3-5 days saver (free shipping)
  • 0: any shipping method (sum of all of the above)

We assume a responsive UI (using sorting and pagination) where analysts get very fast response times for queries like the ones below (a sketch of how such a query translates into a SOLR query follows the list):

  • What are my best selling products in America (US and CA)?
  • What are my worst sales for Christmas 2012 for the product X in Europe that used Fedex overnight?
  • What are my sales for item X over the last 10 years, broken down by shipping method?
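To make this concrete, here is a minimal sketch, using SolrJ (SOLR's Java client library), of how the first question could be expressed as a SOLR query: filter on the month and shipping method, sort on the pre-aggregated counters, and paginate. The field names follow the items table above; the query values and class name are illustrative.

import org.apache.solr.client.solrj.SolrQuery;

public class ItemQueries {
    // "Best selling products in America (US and CA)" for December 2012,
    // any shipping method (shipping_method = 0), first page of 20 results.
    public static SolrQuery bestSellersInAmerica() {
        SolrQuery query = new SolrQuery("*:*");
        query.addFilterQuery("year:2012", "month:12", "shipping_method:0");
        // Sort on the sum of the pre-aggregated US and CA counters.
        query.set("sort", "sum(us_sold,ca_sold) desc");
        query.setStart(0);  // pagination: offset of the first result
        query.setRows(20);  // pagination: page size
        return query;
    }
}

Since SOLR handles filtering, sorting and pagination natively, the only thing the schema has to provide is the pre-aggregated counters to sort and filter on.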

SOLR Cloud Architecture

SOLR runs inside a servlet container (e.g., Jetty, Tomcat, …). Because SOLR Cloud shards and replicates the data across the different nodes in the cluster, every node has to know which node is in charge of which shard. This information is stored in ZooKeeper, which can be seen as a shared filesystem used to store shared data but also to do more complex things such as synchronization and leader election. ZooKeeper itself can be distributed across multiple nodes, forming a ZooKeeper ensemble, to ensure fault tolerance.

In our architecture we will consider 2 nodes, each running SOLR and ZooKeeper, as depicted in the figure below (a sketch of how a client connects to this cluster follows the figure).

[Figure: SOLR Cloud architecture with 2 nodes, each running SOLR and ZooKeeper]
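As a hedged sketch of how an application talks to this 2-node cluster with SolrJ, the client below is only given the ZooKeeper ensemble and lets ZooKeeper tell it which node hosts which shard. The host names, ports and collection name are assumptions for the example.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class ItemsClient {
    public static void main(String[] args) throws Exception {
        // Connect through the ZooKeeper ensemble, not through a specific SOLR node.
        CloudSolrServer server = new CloudSolrServer("node1:2181,node2:2181");
        server.setDefaultCollection("items");
        server.connect();

        // Run a query (for instance the one built in the previous sketch).
        QueryResponse response = server.query(new SolrQuery("*:*").setRows(10));
        for (SolrDocument doc : response.getResults()) {
            System.out.println(doc.getFieldValue("name"));
        }
        server.shutdown();
    }
}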

Replication and Partitioning

In order to scale data there are two different things one can do:
Read more of this post

Playing with Hazelcast, a distributed datagrid on Amazon EC2 with jclouds-cli

Hazelcast is an open-source in-memory datagrid that lets you store data in memory, distributed across a cluster of servers, and execute distributed tasks. It can be used as an in-memory database that can be queried using SQL-like queries or any filter you can implement in Java. To prevent data loss, the data in memory can be backed by persistent storage (file, relational database, NoSQL database, …). Data can be persisted synchronously when it is written to the in-memory database (write-through) or asynchronously to batch the writes (write-behind).

In applications that are read- and write-intensive, relying on a relational database server (or a master/slaves configuration) can be very inefficient, as it often becomes a bottleneck and a single point of failure. With data in memory, reads and writes are very fast, and since the data is distributed and replicated there is no single point of failure. Indeed, if we consider a replication factor of 3, we have a primary and 2 backup nodes, so if one node of the cluster were to go down, the other nodes can take over and be reassigned its data. In the catastrophic event where the database goes down, writes are queued in a log file so that they can be persisted to the database once it is back up.
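As a hedged illustration of these ideas, here is a minimal sketch using Hazelcast's Java API: it stores objects in a distributed map configured with 2 backups per entry (the "replication factor of 3" above) and runs a SQL-like query on it. The Item class, map name and values are made up for the example; the MapStore-based persistence (write-through/write-behind) is not shown.

import java.io.Serializable;
import java.util.Collection;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IMap;
import com.hazelcast.query.SqlPredicate;

public class HazelcastSketch {

    // Illustrative value class; the field queried below must have a getter.
    public static class Item implements Serializable {
        private final String name;
        private final int sold;
        public Item(String name, int sold) { this.name = name; this.sold = sold; }
        public String getName() { return name; }
        public int getSold() { return sold; }
    }

    public static void main(String[] args) {
        Config config = new Config();
        // "Replication factor of 3": one primary copy plus 2 backups.
        MapConfig mapConfig = new MapConfig("items");
        mapConfig.setBackupCount(2);
        config.addMapConfig(mapConfig);

        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
        IMap<Integer, Item> items = hz.getMap("items");

        items.put(1, new Item("phone", 1200));
        items.put(2, new Item("laptop", 300));

        // SQL-like query executed across the cluster.
        Collection<Item> bestSellers = items.values(new SqlPredicate("sold > 500"));
        System.out.println("Best sellers: " + bestSellers.size());

        hz.getLifecycleService().shutdown();
    }
}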

There are other products offering features similar to Hazelcast's:

  • Oracle Coherence: a very robust and popular data grid solution used by many financial companies and systems that have to deal with a lot of transactions. It also has an active community.
  • VMware GemFire: it is used by some financial companies and provides most of the features Coherence has, but the community is much smaller, so it is harder to find documentation.
  • GigaSpaces XAP: the system provides a lot of features. Among other things, it can dynamically instantiate services on multiple servers and handle service failover.

In this tutorial we are going to deploy Hazelcast on an EC2 cluster. Then we will run some operations on the datagrid, and finally we will stop the cluster.

Read more of this post

Playing with the Mahout recommendation engine on a Hadoop cluster

Apache Mahout is an open-source library which implements several scalable machine learning algorithms. Among other things, they can be used to categorize data, cluster items, and implement a recommendation engine.

In this tutorial we will run the Mahout recommendation engine on a data set of movie ratings and show the movie recommendations for each user.

For more details on the recommendation algorithm, you can look at the tutorial from Jee Vang.
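The tutorial itself runs the distributed, Hadoop-based recommender, but as a hedged, minimal illustration of the same idea, here is the non-distributed Taste API that ships with Mahout. The ratings.csv file (one userID,itemID,rating line per movie rating), the neighborhood size and the user id are assumptions.

import java.io.File;
import java.util.List;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class MovieRecommenderSketch {
    public static void main(String[] args) throws Exception {
        // ratings.csv: one "userID,itemID,rating" line per movie rating.
        DataModel model = new FileDataModel(new File("ratings.csv"));
        UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(10, similarity, model);
        Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);

        // Top 5 movie recommendations for user 1.
        List<RecommendedItem> recommendations = recommender.recommend(1, 5);
        for (RecommendedItem item : recommendations) {
            System.out.println(item.getItemID() + " -> " + item.getValue());
        }
    }
}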

Requirements

  • Java (to run Hadoop)
  • Hadoop (used by Mahout)
  • Mahout
  • Python (used to show the results)

Running Hadoop

In this section, we are going to describe how to quickly install and configure Hadoop on a single machine.
Read more of this post

A Hadoop Alternative: Building a real-time data pipeline with Storm

With the tremendous growth of the online advertising industry, ad networks have to deal with a humongous amount of data. For years, Hadoop has been the de facto technology used to aggregate data logs, but although it is efficient at processing big batches, it was not designed to deal with real-time data. With the popularity of streaming aggregation engines (Storm, S4, …), ad networks started to integrate them into their data pipelines to offer real-time aggregation.

When you go to a website and see an ad, it is usually the result of a real-time bidding process that an ad network runs between different advertisers. Whoever puts in the highest bid has their ad displayed in your browser. Advertisers determine their bids using the user's segment, the page the user is on and a lot of other factors. This process is usually run by an ad server.

Each of these (ad) impressions is usually logged by the ad server and sent to a data pipeline that is in charge of aggregating the data (e.g., the number of impressions for a particular advertiser in the last hour). The data pipeline usually processes billions and billions of logs daily, and having the resources to cope with such a huge input of data is a challenge. The aggregates must also be made available to advertisers and publishers in a reasonable amount of time so that they can adapt the strategy of their campaigns (an advertiser might see that a particular publisher generates sales and thus want to display more ads on that publisher's website). The faster they get these data, the better, so companies try to aggregate data and make them available up to the last hour, and for some up to the last minute.

[Figure: ad network data pipeline based on Hadoop]

Data pipeline based on Hadoop

Most ad networks use Hadoop to aggregate data. Hadoop is really efficient at processing a large amount of data, but it is not suited for real-time aggregation where data needs to be available to the minute. Usually, each ad server sends its logs to the data pipeline continuously through a queuing mechanism. Hadoop is then scheduled to run an aggregation every hour and store the results in a data warehouse.

More recently, some ad networks started to use Storm, a real-time aggregation engine developed by Nathan Marz in late 2011. In this new architecture, Storm reads the stream of logs from the ad servers, aggregates them on the fly and stores the aggregated data directly in the data warehouse as soon as they are available. We explore here, through a simple example, how to develop and deploy a simple real-time aggregator that aggregates data by minute, by hour and by day.
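As a hedged sketch of what such an aggregator can look like with Storm's Java API (backtype.storm packages from the 0.8-era releases), here is a bolt that counts impressions per advertiser and per minute; the input field names and the downstream handling of the counters are assumptions.

import java.util.HashMap;
import java.util.Map;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Counts ad impressions per (advertiser, minute). A real topology would also
// flush these counters to the data warehouse and roll them up by hour and by day.
public class MinuteAggregatorBolt extends BaseBasicBolt {

    private final Map<String, Long> counts = new HashMap<String, Long>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String advertiser = tuple.getStringByField("advertiser");
        long timestampMs = tuple.getLongByField("timestamp");
        long minute = timestampMs / 60000L;  // truncate the timestamp to the minute

        String key = advertiser + ":" + minute;
        Long current = counts.get(key);
        long updated = (current == null ? 0L : current) + 1;
        counts.put(key, updated);

        collector.emit(new Values(advertiser, minute, updated));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("advertiser", "minute", "count"));
    }
}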
Read more of this post

Pushing real-time data to the browser using cometD and Spring

Comet is a set of techniques that allow web applications to push data to the browser. It is also known as Ajax Push, Reverse Ajax and HTTP server push, among other names.

It is used in web applications to display fluctuating stock prices in real time, in chat applications, in collaborative document editing and in a lot of other applications.

Without Comet, the only way the browser can get updates from the server is by periodically issuing AJAX calls to check for updates. This technique is not efficient in terms of bandwidth and server load, but it has the advantage of being easy to implement.

Comet can be implemented by using the following techniques:

  • long-polling: the browser issues a request to the server. The server waits until a message is available to send to the client, or until a suitable timeout expires, before responding. The browser then immediately sends a new request to the server to repeat the process.
  • callback-polling: the web page uses a script tag to load JavaScript code. The server keeps the connection open and sends calls to a callback function with the message content as a parameter (e.g., handleMessage({"msg":"this is the message 1"})). The server can send multiple messages over the same connection until a timeout occurs. The browser handles those calls as they come in (it does not need to wait for the end of the connection). This method is quite effective as we don't need to constantly open and close the connection.
  • websocket: it allows a browser to open a full-duplex communication channel to the server over a single TCP connection. All the main browsers implement it in their latest versions.
  • flash socket/java applet socket: it uses a Flash applet or a Java applet to open a durable connection to the server. When the applet receives data from the server, it forwards them to the web page by calling a JavaScript callback method.

For a good description of those techniques, you can look at the push technology page on wikipedia.

Depending on the browser, its version and the network settings (proxy, firewall), some of those techniques might work and some might not. So to overcome this issue, comet frameworks usually implement several techniques.

There are a lot of Comet frameworks on the market today. We have tried the following ones over the past 4 years:

  • LightStreamer. It has an interesting push model which is a mix of a cache model and a publisher/subscriber model. The application writes data to a cache. The client (browser, Java application, Flash application, …) can get the current value of an element in the cache and listen to its updates. If the client asks for an element that is not in the cache yet, LightStreamer can try to populate it (from the database or another source) and send it to the client.
  • CometD. It's an open-source project developed by the Dojo Foundation. It uses the publisher/subscriber model to push data to the clients.
  • DWR. This open-source AJAX framework was very popular 4 years ago, but the project does not seem to have been updated for several years. It lets you define proxy classes in JavaScript that directly call Java methods, and the latest version (3) provides a reverse Ajax implementation.

[Figure: CometD demo screenshot]

In our tutorial, we are going to get a live stream of Twitter statuses and publish them to a web page using CometD. CometD supports multiple techniques to push data to the browser: websocket, long-polling and callback-polling.
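As a hedged sketch of the server side of this tutorial, using CometD's Java API, the class below publishes each incoming status to every browser subscribed to a channel. The /tweets channel name and the message fields are assumptions for the example.

import java.util.HashMap;
import java.util.Map;

import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.LocalSession;

public class TweetPublisher {

    private final LocalSession session;

    public TweetPublisher(BayeuxServer bayeux) {
        // A LocalSession is a server-side client used to publish messages.
        this.session = bayeux.newLocalSession("tweet-publisher");
        this.session.handshake();
    }

    // Called for every status received from the Twitter stream.
    public void publish(String user, String text) {
        Map<String, Object> data = new HashMap<String, Object>();
        data.put("user", user);
        data.put("text", text);
        // Every browser subscribed to /tweets receives this message.
        session.getChannel("/tweets").publish(data);
    }
}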
Read more of this post

Using Hadoop Pig with MongoDB

In this post, we’ll see how to install MongoDB support for Pig and we’ll illustrate it with an example where we join 2 MongoDB collections with Pig and store the result in a new collection.

Requirements

Building Mongo Hadoop

We’re going to use the Git project developed by 10gen, with a slight modification that we made. Because the Pig language doesn’t support variables that start with an underscore (e.g., _id, which is used in MongoDB), we added the ability to use them by replacing the _ prefix with u__, so _id becomes u__id.

First get the source:

$ git clone https://github.com/darthbear/mongo-hadoop

Compile the Hadoop pig part of it:

$ ./sbt package
$ ./sbt mongo-hadoop-core/package
$ ./sbt mongo-hadoop-pig/package
$ mkdir ~/pig_libraries
$ cp ./pig/target/mongo-hadoop-pig-1.1.0-SNAPSHOT.jar \
./target/mongo-hadoop-core-1.1.0-SNAPSHOT.jar ~/pig_libraries

Running a join query with Pig on MongoDB collections

One of the things you can’t do in MongoDB is a join between 2 collections. So let’s see how we can do it simply with a Pig script.
Read more of this post

Playing with Hadoop Pig

Hadoop Pig is a tool to manipulate data from various sources (CSV files, MySQL, MongoDB, …) using a procedural language (Pig Latin). It can run standalone or distributed with Hadoop. Unlike Hive, it can manipulate non-relational data and do things like aggregations, joins (including left joins), regular expressions, sorting, …

In this post, to keep things simple, we will only consider standalone execution of Pig.

To install Pig on Ubuntu:

$ sudo apt-get install hadoop-pig

Let’s take a simple example by considering the geo location database from Maxmind. Download the latest GeoLite City file in CSV format (e.g., GeoLiteCity_20130101.zip) from: http://geolite.maxmind.com/download/geoip/database/GeoLiteCity_CSV/.

The file in the zip we’re interested in is GeoLiteCity-Location.csv.

Let’s remove the copyright line and the header from the CSV file:

$ tail -n +3  GeoLiteCity-Location.csv > location.csv

Let’s first start by loading the CSV file and display it:

data = LOAD 'location.csv' USING PigStorage(',')
       AS (locId, country, region, city, postalCode,
           latitude, longitude, metroCode, areaCode);
dump data;

To run it:

$ pig -x local script.pig

The LOAD statement maps the different columns of the CSV to fields, which are by default of type chararray (e.g., city:chararray).
Let’s get rid of information we don’t need and only keep city, region and country:

cities = FOREACH data GENERATE city, region, country;

Now let’s only keep US cities where the city name is set:

usCities = FILTER cities BY country == '"US"' AND city != '""';

Now let’s see which city names are popular, i.e., how many states use each city name.
In SQL, the query would be something like:
Read more of this post