Implementing a real-time data pipeline with Spark Streaming

Real-time analytics has become a very popular topic in recent years. Whether it is in finance (high-frequency trading), adtech (real-time bidding), social networks (real-time activity), the Internet of Things (sensors sending real-time data) or server/traffic monitoring, providing real-time reporting can bring tremendous value (e.g., detecting potential attacks on a network immediately, or quickly adjusting ad campaigns). Apache Storm is one of the most popular frameworks for aggregating data in real time, but there are many others such as Apache S4, Apache Samza, Akka Streams, SQLStream and, more recently, Spark Streaming.

According to Kyle Moses' page on Spark Streaming, it can process about 400,000 records per node per second for simple aggregations on small records, significantly outperforming other popular streaming systems such as Apache Storm (40x) and Yahoo S4 (57x). This is mainly explained by the fact that Apache Storm processes messages individually while Spark Streaming groups messages into small batches. Moreover, in case of failure, Storm messages can be replayed multiple times, whereas Spark Streaming batches are processed only once, which greatly simplifies the logic (e.g., making sure some values are not counted multiple times).

At a high level, we can see Spark Streaming as a layer on top of Spark where data streams (coming from various sources such as Kafka, ZeroMQ, Twitter, …) are transformed and batched into a sequence of RDDs (Resilient Distributed Datasets) using a sliding window. These RDDs can then be manipulated using the usual Spark operations.
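
To make this more concrete, here is a minimal sketch of the Scala API (not the actual code from this post): a text stream is batched into RDDs every 10 seconds and aggregated over a 60-second sliding window. The socket source, port and durations are arbitrary choices for illustration.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object WindowedCounts {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WindowedCounts")
    // Incoming data is grouped into one RDD every 10 seconds
    val ssc = new StreamingContext(conf, Seconds(10))

    // A DStream can come from Kafka, ZeroMQ, Twitter, ...; a TCP socket keeps the sketch small
    val lines = ssc.socketTextStream("localhost", 9999)

    // Normal Spark operations, applied over a 60s window sliding every 10s
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKeyAndWindow(_ + _, Seconds(60), Seconds(10))
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}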

In this post we consider an ad network where ad servers log impressions in Apache Kafka (a distributed publish-subscribe messaging system). These impressions are then aggregated by Spark Streaming into a data warehouse (here MongoDB, to keep things simple). Usually the data is further aggregated into a fast data-access layer (direct lookups) and made accessible through an API, as depicted in the figure below.

Figure 1. Ad network architecture
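
As a rough sketch of the middle of this pipeline (the topic name, consumer group, log format and MongoDB database/collection below are hypothetical placeholders, not the actual project's), impressions could be read from Kafka with the spark-streaming-kafka connector, counted per batch and written to MongoDB along these lines:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils
import com.mongodb.{BasicDBObject, MongoClient}

object ImpressionAggregator {
  def main(args: Array[String]) {
    val ssc = new StreamingContext(new SparkConf().setAppName("ImpressionAggregator"), Seconds(10))

    // Read impression log lines from Kafka through ZooKeeper (hypothetical topic "impressions")
    val impressions = KafkaUtils
      .createStream(ssc, "localhost:2181", "impression-aggregator", Map("impressions" -> 1))
      .map(_._2)

    // Count impressions per ad id in each 10s batch (assumes CSV lines starting with the ad id)
    val counts = impressions.map(line => (line.split(",")(0), 1)).reduceByKey(_ + _)

    // Persist each batch of aggregates into MongoDB, one connection per partition
    counts.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        val mongo = new MongoClient("localhost", 27017)
        val coll = mongo.getDB("adnetwork").getCollection("impression_counts")
        partition.foreach { case (adId, count) =>
          coll.insert(new BasicDBObject("adId", adId).append("count", count))
        }
        mongo.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}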

Using SOLR Cloud as a NoSQL database for low latency analytics

SOLR is a popular full-text search engine based on Lucene. Not only is it very efficient at searching documents, it is also very fast at running simple queries on relational and unstructured data, and it can be used as a NoSQL database. SOLR 4 was released recently and now includes SOLR Cloud, which allows SOLR to scale through sharding and replication.

Although SOLR is very fast for operations such as filtering, sorting and pagination, it does not support aggregations (similar to SELECT item_id, SUM(purchases) FROM t GROUP BY item_id in SQL), so the data has to be aggregated in advance. In this post we will show how to install SOLR Cloud, then illustrate it with a simple e-commerce example describing how SOLR Cloud can be used to provide very responsive analytics. Finally, we will provide some hints on how to design a schema that deals with pre-aggregations.

Use Case

Let’s consider an e-commerce site that stores the volume of monthly purchases for each item.

For that we will consider the following items table:

CREATE TABLE items (
  id INTEGER,
  name VARCHAR(256),
  year INTEGER,
  month INTEGER,
  shipping_method INTEGER,
  us_sold INTEGER,
  ca_sold INTEGER,
  fr_sold INTEGER,
  uk_sold INTEGER
);

For each item and each month, the items table contains the number of purchases in each country (we only use 4 countries here, but you can use many more).

The purchase counts are also broken down by shipping method (an indexing sketch follows this list):

  • 1: Fedex Overnight
  • 2: UPS ground 2-3 days
  • 3: UPS ground 3-5 days saver (free shipping)
  • 0: any shipping method (sum of all of the above)
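
To give an idea of how this maps to SOLR, here is a minimal indexing sketch using SolrJ from Scala; the collection name ("items"), the ZooKeeper address and the sample values are assumptions for illustration, and the field names simply mirror the columns above.

import org.apache.solr.client.solrj.impl.CloudSolrServer
import org.apache.solr.common.SolrInputDocument

object IndexItem {
  def main(args: Array[String]) {
    // Connecting through ZooKeeper lets SOLR Cloud route the document to the right shard
    val server = new CloudSolrServer("localhost:9983")
    server.setDefaultCollection("items")

    // One document per (item, year, month, shipping_method) combination
    val doc = new SolrInputDocument()
    doc.addField("id", "1-2012-12-0")      // composite key: item-year-month-shipping
    doc.addField("name", "item 1")
    doc.addField("year", 2012)
    doc.addField("month", 12)
    doc.addField("shipping_method", 0)     // 0 = all shipping methods combined
    doc.addField("us_sold", 412)
    doc.addField("ca_sold", 97)
    doc.addField("fr_sold", 53)
    doc.addField("uk_sold", 81)

    server.add(doc)
    server.commit()
  }
}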

We suppose that we have a responsive UI (using sorting and pagination) where analysts get very fast response times for queries like the following (a query sketch follows this list):

  • What are my best selling products in America (US and CA)?
  • What are my worst sales for Christmas 2012 for the product X in Europe that used Fedex overnight?
  • What are my sales for item X over the last 10 years, broken down by shipping method?
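
For instance, the first question, restricted here to December 2012, could translate into a filter-and-sort query along the lines of this SolrJ sketch (the collection name and ZooKeeper address are again assumptions; the sort uses a function query over the two American columns):

import org.apache.solr.client.solrj.SolrQuery
import org.apache.solr.client.solrj.impl.CloudSolrServer

object BestSellersAmerica {
  def main(args: Array[String]) {
    val server = new CloudSolrServer("localhost:9983")
    server.setDefaultCollection("items")

    // Best selling products in America (US + CA) for December 2012,
    // across all shipping methods (the pre-aggregated shipping_method = 0 documents)
    val query = new SolrQuery("*:*")
    query.addFilterQuery("year:2012", "month:12", "shipping_method:0")
    query.setSort("sum(us_sold,ca_sold)", SolrQuery.ORDER.desc)  // function-query sort
    query.setStart(0)   // pagination: first page...
    query.setRows(20)   // ...of 20 items

    val docs = server.query(query).getResults
    println("Found " + docs.getNumFound + " matching documents")
  }
}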

SOLR Cloud Architecture

SOLR runs inside a servlet container (e.g., Jetty, Tomcat, …). Because SOLR Cloud shards and replicates the data across the different nodes of the cluster, every node has to know which node is in charge of which shard. This information is stored in ZooKeeper, which can be seen as a shared filesystem used to store shared data, but also to perform more complex tasks such as synchronization and leader election. ZooKeeper itself can be distributed over multiple nodes to form a ZooKeeper ensemble and ensure fault resiliency.

In our architecture we will consider 2 nodes, each running both SOLR and ZooKeeper, as depicted in the figure below.

Figure 2. SOLR Cloud architecture
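
As a minimal startup sketch (assuming two copies of the stock Solr 4 example directory in node1/ and node2/ and default ports, so the embedded ZooKeeper of the first node listens on 9983), the two nodes could be launched as follows:

# Node 1: start SOLR with an embedded ZooKeeper (-DzkRun), 2 shards and the shared config
cd node1
java -DzkRun -DnumShards=2 -Dbootstrap_confdir=./solr/collection1/conf \
     -Dcollection.configName=myconf -jar start.jar

# Node 2: start SOLR on another port and register with the same ZooKeeper
cd node2
java -Djetty.port=7574 -DzkHost=localhost:9983 -jar start.jar

In the architecture of the figure, both nodes would run ZooKeeper as an ensemble (passing both addresses in -DzkHost); the single embedded instance above just keeps the sketch short.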

Replication and Partitioning

In order to scale data there are two different things one can do: replicate it (keep several copies of the same data to spread reads and tolerate node failures) or partition it (split it into shards so that writes and storage are spread across nodes).
