Using SOLR Cloud as a NoSQL database for low latency analytics

SOLR is a popular full text search engine based on Lucene. Not only is it very efficient at searching documents, it is also very fast at running simple queries on relational and unstructured data, and it can be used as a NoSQL database. SOLR4 was released recently and now includes SOLR Cloud, which allows one to scale SOLR using sharding and replication.

Although SOLR is very fast for operations such as filtering, sorting and pagination, it does not support aggregations (similar to SELECT item_id, SUM(purchases) FROM t GROUP BY item_id in SQL), so the data has to be aggregated in advance. In this post we will show how to install SOLR Cloud and then illustrate it with a simple e-commerce example where we describe how one can use SOLR Cloud to provide very responsive analytics. Finally we will give some hints on how to design a schema to deal with pre-aggregations.

Use Case

Let’s consider an e-commerce site that stores the volume of monthly purchases for each item.

For that we will consider the following table items:

CREATE TABLE items (
  id INTEGER,
  item_id INTEGER,
  name VARCHAR(256),
  year INTEGER,
  month INTEGER,
  shipping_method INTEGER,
  us_sold INTEGER,
  ca_sold INTEGER,
  fr_sold INTEGER,
  uk_sold INTEGER
);

For each item and each month, the items table contains the number of purchases in each country (we only use 4 countries here but you can use many more). The id column is a unique row key while item_id identifies the item.

They are also broken up by different shipping methods:

  • 1: Fedex Overnight
  • 2: UPS ground 2-3 days
  • 3: UPS ground 3-5 days saver (free shipping)
  • 0: any shipping method (sum of all of the above)
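For example, the January 2013 purchases of a given item would be stored as 4 pre-aggregated rows, one per shipping method plus the total (the numbers below are made up):

item_id, year, month, shipping_method, us_sold, ca_sold, fr_sold, uk_sold
42     , 2013, 1    , 1              , 120    , 30     , 10     , 25
42     , 2013, 1    , 2              , 300    , 80     , 45     , 60
42     , 2013, 1    , 3              , 580    , 190    , 95     , 115
42     , 2013, 1    , 0              , 1000   , 300    , 150    , 200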

We suppose that we have a responsive UI (using sorting and pagination) where analysts get very fast response times for queries like:

  • What are my best selling products in America (US and CA)?
  • What are my worst sales for Christmas 2012 for the product X in Europe that used Fedex overnight?
  • What are my sales for item X over the last 10 years, broken up by shipping method?

SOLR Cloud Architecture

SOLR runs inside a servlet container (e.g., Jetty, Tomcat, …). Because SOLR Cloud shards and replicates the data across the different nodes of the cluster, every node has to know who is in charge of which shard. This information is stored in ZooKeeper, which can be seen as a shared filesystem used to store shared data but also to do more complex things such as synchronization and leader election. ZooKeeper itself can be distributed over multiple nodes, forming a ZooKeeper ensemble, to ensure fault resiliency.

In our architecture we will consider 2 nodes, each running SOLR and ZooKeeper, as depicted in the figure below.

[Figure: SOLR Cloud architecture with 2 nodes, each running SOLR and ZooKeeper]

Replication and Partitioning

In order to scale data there are two different things one can do:

  • partitioning / sharding: split the data
  • replication: copy the data to multiple servers

Replication speeds up read operations (one can read the data from any server holding a copy) but slows down write operations (the system has to write the data to multiple servers). Replication also protects against failures: if a collection is replicated 3 times, the cluster can survive 2 node failures.

Partitioning is mostly used to increase the overall capacity. By splitting a collection across 2 servers of 16GB, we can hold a collection of 32GB. Write operations will be faster as they can be run in parallel on different servers. Read operations can be faster or slower depending on the operation (e.g., sorting can be slower across 2 servers, but basic lookups will be faster as they can be run on either of the servers).

Partitioning and replication can both be used at the same time in SOLR Cloud. If we consider a collection split into 2 shards (numShards=2) and we have 2 servers, each of them will be in charge of one shard (each being its shard’s leader). With 2 shards and 4 servers, 2 servers will act as leader nodes for the 2 shards and 2 servers as replica nodes. In this case, if the leader node of a shard fails, one of the shard replicas becomes the leader.

One can hit any of the nodes in the cluster and queries will be rerouted to the proper shard or leader (for updates).

Note that in our simple example we only consider 2 SOLR nodes and a ZooKeeper ensemble of 2. In a production environment you would want 4 SOLR nodes (2 acting as replica nodes of the 2 shards in case one of the shard leaders fails) and a ZooKeeper ensemble of 3 so that it can survive the failure of one node and still have a quorum.

Installation

We ran the installation on 2 boxes:

  • padthai. We’ll suppose that this box will be started first and will be in charge of putting the SOLR config files into ZooKeeper (config bootstrap) so that couscous can pick up the config from there.
  • couscous

You can find some of the config files and scripts on our github:

$ git clone http://github.com/chimpler/blog-solr-cloud-example

Java

SOLR is written in Java, so we first have to get a JVM:

$ sudo apt-get install openjdk-7-jre

Jetty

SOLR runs inside a java servlet container (Tomcat, Jetty, …) so we need to get one first. Let’s install Jetty:

$ sudo apt-get install jetty

Edit the file /etc/default/jetty and update the following lines:

NO_START=0
JETTY_HOST=0.0.0.0
JETTY_PORT=8983
JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/

Note: 0.0.0.0 means that Jetty will listen on all network interfaces (loopback, ethernet, …).

ZooKeeper

In order to coordinate the different shards, SOLR relies on ZooKeeper:

$ sudo apt-get install zookeeper
$ sudo zookeeper-server-initialize

Apache ZooKeeper is a distributed coordination service used to share configuration between servers and to help with synchronization, leader election, etc., by exposing a virtual file system that different processes can write files to and read files from. A ZooKeeper ensemble is a set of ZooKeeper instances running together to ensure high availability and fault tolerance. In our example we will configure the ZooKeeper ensemble to run on our 2 SOLR boxes.

In the case of SOLR Cloud, ZooKeeper is primarily used to share configuration (including schemas) between nodes and to keep track of the state of the cluster (who is in charge of which shard). It is mostly consulted when nodes start and when the state of the cluster changes (e.g., a node crashed), so it won’t be used much and there is no need to tune ZooKeeper for speed in our case.

Each ZooKeeper node in the ensemble must have a unique id stored in /var/lib/zookeeper/myid which is a number between 1 and 255.

on padthai:  $ sudo sh -c "echo 1 > /var/lib/zookeeper/myid"
on couscous: $ sudo sh -c "echo 2 > /var/lib/zookeeper/myid"
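
Both machines also need the list of ensemble members in the ZooKeeper configuration. A minimal sketch, assuming the Ubuntu package puts its configuration in /etc/zookeeper/conf/zoo.cfg (adjust the path to your distribution):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# one entry per ensemble member, in the form
# server.<myid>=<host>:<quorum port>:<leader election port>
server.1=padthai:2888:3888
server.2=couscous:2888:3888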

Start ZooKeeper on both machines:

$ sudo zookeeper-server start

If things went well, you should see the following:

// for the leader
$ tail /var/log/zookeeper/zookeeper.log
2013-02-12 22:00:34,285 [myid:1] - INFO  [LearnerHandler-/192.168.1.67:58523:Leader@549] - Have quorum of supporters; starting up and setting last processed zxid: 0x100000000

// for the other node
$ tail /var/log/zookeeper/zookeeper.log
2013-02-12 22:00:34,369 [myid:2] - INFO  [QuorumPeer[myid=2]/0:0:0:0:0:0:0:0:2181:Learner@325] - Getting a snapshot from leader
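
You can also probe each instance with ZooKeeper’s four-letter commands to check that the ensemble formed correctly (one node should report itself as leader and the other as follower):

$ echo ruok | nc localhost 2181
imok
$ echo stat | nc localhost 2181 | grep Mode
Mode: leader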

SOLR4

Before we go further let’s see the directory structure we’re going to use:

  • /etc/solr/core0: contains configuration file for core0
  • /var/lib/solr/core0: contains data for core0
  • /usr/share/solr: contains the SOLR binaries that will be run by Jetty

Let’s download SOLR4 from http://lucene.apache.org/solr/ and store the archive in /tmp
Let’s assume the latest version is 4.1.0.

$ cd /tmp
$ tar xvfz solr-4.1.0.tgz
$ sudo mkdir /usr/share/solr
$ cd /usr/share/solr
$ sudo jar xvf /tmp/solr-4.1.0/example/webapps/solr.war
$ sudo ln -s /usr/share/solr /usr/share/jetty/webapps/solr
$ sudo mkdir -p /var/lib/solr/core0
$ sudo chown -R jetty /var/lib/solr

As you can see we un-jarred the file solr.war because we are going to add a symbolic link to /etc/solr/conf/jetty-web.xml in /usr/share/solr/WEB-INF. Let’s create the file /etc/solr/conf/jetty-web.xml:

<?xml version="1.0"  encoding="ISO-8859-1"?>
<!DOCTYPE Configure PUBLIC "-//Mort Bay Consulting//DTD Configure//EN"
"http://jetty.mortbay.org/configure.dtd">

<!-- Set the solr.solr.home system property -->
<Configure class="org.mortbay.jetty.webapp.WebAppContext">
  <Call name="setProperty" class="java.lang.System">
    <Arg type="String">solr.solr.home</Arg>
    <Arg type="String">/etc/solr</Arg>
  </Call>
  <Call name="setProperty" class="java.lang.System">
    <Arg type="String">zkHost</Arg>
    <Arg type="String">padthai:2181,couscous:2181</Arg>
  </Call>
  <!-- only the first node has to use bootstrap_conf and specify
       the number of shards to populate ZooKeeper
       in our case, we have the following elements only for padthai
   -->
  <Call name="setProperty" class="java.lang.System">
    <Arg type="String">bootstrap_conf</Arg>
    <Arg type="String">true</Arg>
  </Call>
  <Call name="setProperty" class="java.lang.System">
    <Arg type="String">numShards</Arg>
    <Arg type="String">2</Arg>
  </Call>
</Configure>

Note that the bootstrap_conf has to be defined only by the first node that starts the cluster
so it can populate ZooKeeper with the configs.

Now let’s create the symbolic link:

$ sudo ln -s /etc/solr/conf/jetty-web.xml /usr/share/solr/WEB-INF

Let’s create the SOLR configuration file in /etc/solr/solr.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<solr persistent="true">
  <cores defaultCoreName="core0" adminPath="/admin/cores"
         zkClientTimeout="${zkClientTimeout:15000}" hostPort="${jetty.port:}"
         hostContext="solr">
    <core loadOnStartup="true" instanceDir="core0" name="core0"
          collection="collection0" dataDir="/var/lib/solr/core0"/>
  </cores>
</solr>

The remainder of this section only needs to be done on the node that will be in charge of putting the config into ZooKeeper, in our case padthai. The config will then be read from ZooKeeper by the other nodes.

Let’s create the configuration for the collection collection0 that will be stored in the core core0.

Create a file solrconfig.xml in /etc/solr/core0/conf:

<?xml version="1.0" encoding="UTF-8" ?>
<config>
  <luceneMatchVersion>LUCENE_41</luceneMatchVersion>
  <dataDir>${solr.data.dir:}</dataDir>

  <directoryFactory name="DirectoryFactory"
                    class="${solr.directoryFactory:solr.NRTCachingDirectoryFactory}"/>

  <!-- The default high-performance update handler -->
  <updateHandler>
    <updateLog />
  </updateHandler>
  <requestHandler name="/update" class="solr.UpdateRequestHandler"/>
  <requestHandler name="/update/json" class="solr.JsonUpdateRequestHandler"/>
  <requestHandler name="/update/csv" class="solr.CSVRequestHandler"/>
  <requestHandler name="/select" class="solr.SearchHandler"/>
  <requestHandler name="/admin/"
                  class="solr.admin.AdminHandlers" />
  <requestHandler name="/replication" class="solr.ReplicationHandler" />
  <!-- for streaming file -->
  <requestDispatcher handleSelect="true" >
    <requestParsers enableRemoteStreaming="true"
                    multipartUploadLimitInKB="2048000" />
  </requestDispatcher>
</config>

Note that this config is very simplified; if you need one that provides all the functionality, consider using the one provided in the SOLR archive examples.

Let’s now create the schema file /etc/solr/core0/conf/schema.xml:

<?xml version="1.0" ?>
<schema name="items" version="1.1">
  <types>
   <fieldtype name="string"  class="solr.StrField" sortMissingLast="true" omitNorms="true"/>
   <fieldType name="int" class="solr.TrieIntField" precisionStep="0" positionIncrementGap="0"/>
   <fieldType name="float" class="solr.TrieFloatField" precisionStep="0" positionIncrementGap="0"/>
   <fieldType name="long" class="solr.TrieLongField" precisionStep="0" positionIncrementGap="0"/>
   <fieldType name="double" class="solr.TrieDoubleField" precisionStep="0" positionIncrementGap="0"/>
  </types>

 <fields>
   <field name="id" type="int" indexed="true" stored="true" required="true" />
   <field name="_version_" type="long" indexed="true" stored="true" required="true" />
   <field name="item_id" type="int" indexed="true" stored="true" required="true" />
   <field name="name" type="string" indexed="true" stored="true" required="true" />
   <field name="year" type="int" indexed="true" stored="true" required="true" />
   <field name="month" type="int" indexed="true" stored="true" required="true" />
   <field name="shipping_method" type="int" indexed="true" stored="true" required="true" />
   <field name="us_sold" type="int" indexed="true" stored="true" required="true" />
   <field name="ca_sold" type="int" indexed="true" stored="true" required="true" />
   <field name="fr_sold" type="int" indexed="true" stored="true" required="true" />
   <field name="uk_sold" type="int" indexed="true" stored="true" required="true" />
 </fields>

 <!-- field to use to determine and enforce document uniqueness. -->
 <uniqueKey>id</uniqueKey>

 <!-- field for the QueryParser to use when an explicit fieldname is absent -->
 <defaultSearchField>name</defaultSearchField>

 <!-- SolrQueryParser configuration: defaultOperator="AND|OR" -->
 <solrQueryParser defaultOperator="AND"/>
</schema>

We should be all set; we only have to start Jetty, first on padthai then on couscous:

$ sudo /etc/init.d/jetty start

Open your browser at: http://localhost:8983/solr. You should see an admin panel.
You can also see the partitioning at: http://localhost:8983/solr/#/~cloud.

[Figure: SOLR admin UI showing the cloud view with the two shards]

You can also check that ZooKeeper has loaded all the configs and that the cluster formed properly:

$ zookeeper-client
[zk: localhost:2181(CONNECTED) 0] ls /
[configs, zookeeper, clusterstate.json, live_nodes, overseer, collections,
 overseer_elect]
[zk: localhost:2181(CONNECTED) 0] get /clusterstate.json
{"core0":{
    "shards":{
      "shard1":{
        "replicas":{"192.168.1.67:8983_solr_core0":{
          ...
      "shard2":{
        "replicas":{"192.168.1.68:8983_solr_core0":{
          ...

As you can see, each server is in charge of a shard.

Importing data into SOLR

There are different ways to import data into SOLR, using various formats (CSV, JSON and XML).

Let’s generate some fake data with a simple Python script gen-core0-data.py:

#!/usr/bin/python

import random

NUM_ITEMS = 1000

# CSV header matching the fields declared in schema.xml
print "id,item_id,name,year,month,shipping_method,us_sold,ca_sold,fr_sold,uk_sold"
for item_id in range(0, NUM_ITEMS):
    for year in range(1990, 2014):
        for month in range(1, 13):
            # for simplicity all counts are random; in real data the
            # shipping_method 0 row would be the sum of methods 1 to 3
            for shipping_method in range(0, 4):
                # derive a unique document id from the item, date and shipping method
                id = (item_id - NUM_ITEMS / 2) * 1000000 + year * 1000 + month * 10 + shipping_method
                name = "item %d" % item_id
                us_sold = random.randint(0, 10000)
                ca_sold = random.randint(0, 10000)
                fr_sold = random.randint(0, 10000)
                uk_sold = random.randint(0, 10000)
                print "%d,%d,%s,%d,%d,%d,%d,%d,%d,%d" % (
                    id, item_id, name, year, month, shipping_method,
                    us_sold, ca_sold, fr_sold, uk_sold)

Let’s now run it and import the data into SOLR:

$ python gen-core0-data.py > data.csv
$ curl "http://localhost:8983/solr/core0/update/csv?stream.file=$PWD/data.csv&commit=true"

Now let’s check if the data is there:

$ curl "http://localhost:8983/solr/core0/select?wt=csv" -d "q=*:*"
id,name,year,month,shipping_method,us_sold,ca_sold,fr_sold,uk_sold,_version_
884194973,item 987263437,1990,0,3,9476,5056,4248,9798,-268435455
918525711,item 1230817374,1990,0,2,6823,7643,7975,6577,-270532608
1170021386,item 1224294727,1990,0,1,9391,3455,9397,7190,-175112192
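
To verify that everything was indexed, you can ask for zero rows and read numFound in the response; with the generator above it should report 1,152,000 documents (1,000 items × 24 years × 12 months × 4 shipping methods):

$ curl "http://localhost:8983/solr/core0/select?wt=json&rows=0" -d "q=*:*"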

Playing with SOLR

In the following we explain how to run select, insert/update and delete queries. Note that the commands apply to both SOLR and SOLR Cloud.

Select

In order to select data, one can use the request handler /select defined in solrconfig.xml, so:

http://localhost:8983/solr/core0/select

In the request, one can also specify the output format that can be xml, json or csv using the wt parameter (e.g., http://localhost:8983/solr/core0/select?wt=json).

The query is passed by a POST command with a query string in the following format:

q=[filter]&[options]

Filters:

  • WHERE 1=1: q=*:*
  • WHERE id=2: q=id:2
  • WHERE name LIKE ‘%3%’: q=*3* (searches the defaultSearchField, here name)
  • WHERE shipping_method IN (1, 2, 3): q=shipping_method:(1 OR 2 OR 3)
  • WHERE us_sold >= 100: q=us_sold:[100 TO *]
  • WHERE us_sold BETWEEN 100 AND 200: q=us_sold:[100 TO 200]
  • WHERE us_sold >= 100 AND id <= 1000: q=us_sold:[100 TO *] AND id:[* TO 1000]
  • WHERE (us_sold >= 100 OR ca_sold >= 100) AND fr_sold >= 100: q=(us_sold:[100 TO *] OR ca_sold:[100 TO *]) AND fr_sold:[100 TO *]

Options:

  • SELECT id, us_sold, ca_sold: &fl=id,us_sold,ca_sold (all columns are shown by default)
  • LIMIT 10: &rows=10
  • LIMIT 100, 10: &start=100&rows=10
  • ORDER BY id ASC: &sort=id asc
  • ORDER BY us_sold DESC, id asc: &sort=us_sold desc, id asc
  • ORDER BY us_sold+ca_sold DESC: &sort=add(us_sold, ca_sold) desc. For more complex operations see http://wiki.apache.org/solr/FunctionQuery

So for example:

$ curl http://localhost:8983/solr/core0/select?wt=json -d 'q=*:*&sort=id asc&rows=10'

will return the first 10 rows sorted by id in JSON format.

Update

In SOLR, adding a document will either create it or update the existing document with the same id.
You can add documents in XML, JSON or CSV format (use /update/xml, /update/json or /update/csv depending on the format of your data).

Let’s add 2 new docs in JSON format:

$ curl "http://localhost:8983/solr/core0/update/json?commit=true&wt=json" -H 'Content-type:application/json' -d '
[{"id":15255657, "item_id": 10000, "name":"item X", "year":1970, "month":1, "shipping_method":0, "us_sold": 23111, "ca_sold":3433, "fr_sold":103, "uk_sold": 32},
{"id":15255602, "item_id": 10001, "name":"item Y", "year":1970, "month":2, "shipping_method":1, "us_sold": 43219, "ca_sold":9034, "fr_sold":47, "uk_sold": 12}]'

We specify wt=json to get the response in JSON. Note the use of commit=true in the request. By default, data added to SOLR is not visible to searches until it is committed. One can commit explicitly with commit=true on each request, or run several updates and commit afterwards (it is faster to run a single commit after many updates than one commit per update):

$ curl http://localhost:8983/solr/core0/update?commit=true

Conversely, one can rollback uncommitted docs:

$ curl http://localhost:8983/solr/core0/update?rollback=true

One can also specify an autoCommit in solrconfig.xml to commit automatically after a certain number of documents has been inserted or after a certain time has elapsed since a document was inserted:

<updateHandler class="solr.DirectUpdateHandler2">
  ...
  <autoCommit>
    <maxDocs>10000</maxDocs>
    <maxTime>15000</maxTime>
  </autoCommit>
</updateHandler>

More details about it can be seen at: http://wiki.apache.org/solr/SolrConfigXml.

Since SOLR4, one can do atomic updates (e.g., increment a field without having to retrieve the document, update the field and put it back into SOLR).
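
For example, here is a sketch that bumps us_sold by 100 on one of the documents we inserted earlier; atomic updates rely on the updateLog we enabled in solrconfig.xml and on all fields being stored, which is the case in our schema:

$ curl "http://localhost:8983/solr/core0/update?commit=true&wt=json" \
 -H 'Content-type:application/json' -d '[{"id":15255657, "us_sold":{"inc":100}}]'

One can use "set" instead of "inc" to overwrite the value of a field.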

Delete

To delete a particular document, one can issue an update request as follows:

$ curl "http://localhost:8983/solr/core0/update?commit=true&wt=json" \
 -H 'Content-type:application/json' -d '{"delete":{"id":2}}'

One can also delete documents matching a filter:

$ curl "http://localhost:8983/solr/core0/update?commit=true&wt=json" \
  -H 'Content-type:application/json' -d '{"delete":{"query":"us_sold:[0 TO 100]"}}'

To delete all the documents of a core:

$ curl "http://localhost:8983/solr/core0/update?commit=true&wt=json" \
 -H 'Content-type:application/json' -d '{"delete":{"query":"*:*"}}'

Use case solutions

Now let’s answer the initial questions:

  • What are my best selling products in America (US and CA)?
  • What are my worst sales for Christmas 2012 for the product X in Europe that used Fedex overnight?
  • What are my sales for item X over the last 10 years, broken up by shipping method?

What are my best selling products in America (US and CA)?

Restricting to the current month (January 2013 in our data set), the query will be something like:

$ curl http://localhost:8983/solr/core0/select?wt=csv -d 'q=year:2013 AND month:1
AND shipping_method:0&sort=add(us_sold, ca_sold) desc&start=0&rows=20'

and then one can go to the next pages by incrementing the start parameter (start=20, start=40 and so on).

As you can see, the result is returned very quickly. The UI will then sum up us_sold and ca_sold to get the total number of purchases in America and present it to the user.
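
If you would rather have SOLR compute the sum server-side, SOLR4 can evaluate a function as a pseudo-field in the fl parameter; a sketch, where america_sold is just an alias we made up:

$ curl http://localhost:8983/solr/core0/select?wt=csv -d 'q=year:2013 AND month:1
AND shipping_method:0&fl=item_id,name,america_sold:add(us_sold,ca_sold)
&sort=add(us_sold, ca_sold) desc&start=0&rows=20'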

What are my worst sales for Christmas 2012 in Europe that used Fedex overnight?

$ curl http://localhost:8983/solr/core0/select?wt=csv -d 'q=year:2012 AND month:12 AND
shipping_method:1&sort=add(uk_sold, fr_sold) asc&start=0&rows=20'

and then one can go to the next pages by incrementing the start parameter (start=20, start=40 and so on).
As you can see, the result is returned very quickly. The UI will then sum up uk_sold and fr_sold to get the total number of purchases in Europe and present it to the user.

What are my sales for item X over the last 10 years, broken up by shipping method?

$ curl http://localhost:8983/solr/core0/select?wt=csv -d 'q=item_id:500
AND year:[2002 TO 2012]&sort=year asc, month asc,
shipping_method asc&start=0&rows=80'

Note that here we are fetching 80 rows at a time. The reason is that we want to show 20 dates (year/month pairs) per page and for each of them we have 4 different shipping methods. In the UI that will present these results to the user, we would need to sum up the number of purchases (us_sold + ca_sold + uk_sold + fr_sold) for each shipping method, year and month and present them as follows:

item id, item name, year, month, ship_fedex, ship_ups_3, ship_ups_5, ship_all
1      , Flax Seed, 2010, 1    , 1232      , 21233      , 11123     , 33588
1      , Flax Seed, 2010, 2    , 23        , 1322       , 12312     , 13657
...

And then one can go to the next pages by incrementing the start parameter by 80.

Schema considerations

Since SOLR cannot do aggregations, the data has to be pre-aggregated before being loaded into SOLR. How to pre-aggregate really depends on the context:

  • exclusive filters (e.g., radio buttons): in our case an item can be shipped by 3 distinct methods, so we store one row per method plus one row for all of them combined.
  • multiple choice filters (e.g., checkboxes): in our case we want the sales for individual countries (US, CA, FR, UK) or any combination of countries (e.g., US and CA), so we add one column per country for each metric (e.g., us_sold, ca_sold, fr_sold and uk_sold) and let the UI sum the relevant columns.

Note: if the number of choices for a multiple choice filter is very small (e.g., 3), one can still store one row per combination (2^3 = 8 rows, one per subset of countries). Since the number of combinations grows exponentially, this approach is only viable for a very limited number of choices.

Conclusion

Although SOLR was originally designed as a full-text search engine, it can also handle relational data and provide a low latency solution for simple analytics where the aggregations can be precomputed. You can easily store a few billion records and run a high number of concurrent queries with a very short response time compared to other relational databases. SOLR Cloud offers the flexibility to scale SOLR over multiple nodes and to make it fault tolerant. At the moment, once the sharding scheme is set it cannot be changed, but this is something that is currently being addressed by the SOLR team. It is worth mentioning that ElasticSearch, which is also based on Lucene, also offers cloud features and has a growing community (see a very thorough comparison of the two on the Sematext blog).

We will show in a future post how to deploy SOLR Cloud on Amazon EC2.
