Implementing a real-time data pipeline with Spark Streaming

Real-time analytics has become a very popular topic in recent years. Whether it is in finance (high-frequency trading), adtech (real-time bidding), social networks (real-time activity), the Internet of Things (sensors sending real-time data) or server/traffic monitoring, real-time reporting can bring tremendous value (e.g., detecting potential attacks on a network immediately, or quickly adjusting ad campaigns). Apache Storm is one of the most popular frameworks for aggregating data in real-time, but there are many others such as Apache S4, Apache Samza, Akka Streams, SQLStream and, more recently, Spark Streaming.

According to Kyle Moses, on his page on Spark Streaming, it can process about 400,000 records per node per second for simple aggregations on small records, and it significantly outperforms other popular streaming systems such as Apache Storm (40x) and Yahoo S4 (57x). This is mainly because Apache Storm processes messages individually while Spark Streaming groups messages into small batches. Moreover, in case of failure, messages in Storm can be replayed multiple times, whereas Spark Streaming batches are processed only once, which greatly simplifies the logic (e.g., making sure some values are not counted multiple times).

At a high level, we can see Spark Streaming as a layer on top of Spark where data streams (coming from various sources such as Kafka, ZeroMQ or Twitter) are batched into a sequence of RDDs (Resilient Distributed Datasets) using a sliding window. These RDDs can then be manipulated using normal Spark operations.
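
To make the model concrete, here is a classic word-count sketch over a socket stream (not part of this post's project) using the same micro-batching API that the aggregator below relies on:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

object WordCountSketch extends App {
  // at least 2 local threads: one for the receiver, one for processing
  val sparkContext = new SparkContext("local[2]", "wordCountSketch")

  // discretize the stream into 10-second batches
  val ssc = new StreamingContext(sparkContext, Seconds(10))

  // each batch becomes an RDD of the lines received on the socket
  val lines = ssc.socketTextStream("localhost", 9999)

  // normal Spark-style transformations are applied batch by batch
  val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  counts.print()

  ssc.start()
  ssc.awaitTermination()
}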

In this post we consider an ad network where ad servers log impressions in Apache Kafka (a distributed publish-subscribe messaging system). These impressions are then aggregated by Spark Streaming into a data warehouse (here MongoDB, to keep things simple). Usually the data is further aggregated into a fast data access layer (direct lookup) and made accessible through an API, as depicted in the figure below.

Figure 1. Ad network architecture

To simplify, let’s consider that impression logs are in this format:

timestamp               publisher advertiser  website geo bid     cookie
----------------------------------------------------------------------------
2013-01-28 13:21:12     pub1      adv10       abc.com NY  0.0001  1214
2013-01-28 13:21:13     pub1      adv10       abc.com NY  0.0005  1214
2013-01-28 13:21:14     pub2      adv20       xyz.com CA  0.0003  4321
2013-01-28 13:21:15     pub2      adv20       xyz.com CA  0.0001  5675

Our goal is to aggregate these logs by publisher and geo, and to compute, for each minute, the average bid, the number of impressions and the number of uniques. The aggregation will look something like this:

timestamp               publisher geo  avg_bid imps  uniques
----------------------------------------------------------------------------
2013-01-28 13:21:00     pub1      NY   0.0003  256   104
2013-01-28 13:21:00     pub2      CA   0.0002  121   15
2013-01-28 13:22:00     pub1      NY   0.0001  190   98
2013-01-28 13:22:00     pub2      CA   0.0007  137   19

Prerequisites

In order to run our example, we need to install the following (they are all used in the “Running the example” section below):

  • Apache Kafka (its distribution also provides the Zookeeper scripts used below)
  • MongoDB
  • sbt

The source code is on GitHub:

$ git clone http://github.com/chimpler/blog-spark-streaming-log-aggregation

In our example, Spark Streaming listens to the Kafka topic “adnetwork-topic”. In the aggregation we skip the geos that haven’t been resolved (i.e., with a geo “unknown”), consider a window of 10 seconds (instead of 60 seconds, so you can see results faster) and compute the number of impressions, the number of unique cookies and the average bid. In order to count the number of unique visitors (cookies), we use the HyperLogLog algorithm developed by the late French professor Philippe Flajolet at INRIA, which trades a little precision (around 99% accuracy) for a tiny memory footprint (only about 4 KB).
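
The snippets later in this post suggest the project uses Algebird's HyperLogLogMonoid (the hyperLogLog(...) calls, the + on sketches and the estimatedSize field all match that API). Under that assumption, a minimal sketch of how it behaves:

import com.twitter.algebird.HyperLogLogMonoid

// 12 bits => 2^12 registers: a few KB per sketch for a typical error around 1-2%
val hyperLogLog = new HyperLogLogMonoid(12)

// each cookie is hashed into a small HLL sketch...
val hll1 = hyperLogLog("cookie_1214".getBytes("UTF-8"))
val hll2 = hyperLogLog("cookie_4321".getBytes("UTF-8"))

// ...and sketches merge with +, which is what makes them reducible in Spark
val uniques = (hll1 + hll2).estimatedSize  // ~2.0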

Ad server log generator

The random log generator simulates ad servers producing random ImpressionLogs and sending them to Kafka:

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.collection.JavaConversions._
import scala.util.Random

// Publishers, Advertisers, Geos, NumPublishers, NumAdvertisers and Constants
// come from the project's Constants object (its import is omitted in this snippet)
object RandomLogGenerator extends App {
  val random = new Random()

  val props = new Properties()
  props ++= Map(
    "serializer.class" -> "com.chimpler.sparkstreaminglogaggregation.ImpressionLogEncoder",
    "metadata.broker.list" -> "127.0.0.1:9093"
  )

  val config = new ProducerConfig(props)
  val producer = new Producer[String, ImpressionLog](config)

  println("Sending messages...")
  var i = 0
  // infinite loop
  while(true) {
    val timestamp = System.currentTimeMillis()
    val publisher = Publishers(random.nextInt(NumPublishers))
    val advertiser = Advertisers(random.nextInt(NumAdvertisers))
    val website = s"website_${random.nextInt(Constants.NumWebsites)}.com"
    val cookie = s"cookie_${random.nextInt(Constants.NumCookies)}"
    val geo = Geos(random.nextInt(Geos.size))
    val bid = math.abs(random.nextDouble()) % 1
    val log = ImpressionLog(timestamp, publisher, advertiser, website, geo, bid, cookie)
    producer.send(new KeyedMessage[String, ImpressionLog](Constants.KafkaTopic, log))
    i = i + 1
    if (i % 10000 == 0) {
      println(s"Sent $i messages!")
    }
  }
}

Log Aggregation

The aggregator code is also simple. First we initialize Spark and create a DStream from Kafka:

  val sparkContext = new SparkContext("local[4]", "logAggregator")

  // we discretize the stream into intervals of BatchDuration seconds
  val streamingContext = new StreamingContext(sparkContext, BatchDuration)

  // stream of (topic, ImpressionLog) pairs read from Kafka
  val messages = KafkaUtils.createStream[String, ImpressionLog, StringDecoder, ImpressionLogDecoder](
    streamingContext, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK)

We provide the decoder for the key (the Kafka topic, which is a String) and the decoder for the messages, which are of type ImpressionLog. If you send plain Strings to Kafka, you can simply use StringDecoder for both.
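
For reference, Kafka 0.8's high-level API expects an Encoder on the producer side and a Decoder on the consumer side, both constructed with a VerifiableProperties argument. A minimal sketch of what the ImpressionLogEncoder/ImpressionLogDecoder pair could look like, here using plain Java serialization (the actual project may use a more compact format):

import java.io._
import kafka.serializer.{Decoder, Encoder}
import kafka.utils.VerifiableProperties

// sketch only: relies on ImpressionLog being a (serializable) case class
class ImpressionLogEncoder(props: VerifiableProperties = null) extends Encoder[ImpressionLog] {
  override def toBytes(log: ImpressionLog): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(log)
    out.close()
    buffer.toByteArray
  }
}

class ImpressionLogDecoder(props: VerifiableProperties = null) extends Decoder[ImpressionLog] {
  override def fromBytes(bytes: Array[Byte]): ImpressionLog = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[ImpressionLog] finally in.close()
  }
}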

The way we’re going to run the aggregation is as follows:

  • Filter out the logs with an unknown geo
  • Map each ImpressionLog to (pub, geo) -> AggregationLog, a partial aggregation of a single log that will then be reduced with the other ones sharing the same (pub, geo)
  • Reduce the logs by (pub, geo) for each batch
  • Finalize the reduction (compute avgBid and uniques from the HyperLogLog) and store the result in MongoDB
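
For reference, the data types these steps manipulate look roughly like this (field names are inferred from the snippets below and from the MongoDB export at the end; the real definitions live in the project's sources):

import com.twitter.algebird.HLL
import org.joda.time.DateTime

// inferred sketches, not the project's exact definitions
case class ImpressionLog(timestamp: Long, publisher: String, advertiser: String,
                         website: String, geo: String, bid: Double, cookie: String)

case class PublisherGeoKey(publisher: String, geo: String)

// partial aggregate for one (publisher, geo) key
case class AggregationLog(timestamp: Long, sumBids: Double, imps: Int = 1, uniquesHll: HLL)

// finalized row written to MongoDB
case class AggregationResult(date: DateTime, publisher: String, geo: String,
                             imps: Int, uniques: Int, avgBids: Double)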

The stream contains (kafkaTopic, ImpressionLog) pairs; we transform it into a stream of (PublisherGeoKey, AggregationLog) pairs:

  // we filter out non-resolved geos (unknown) and map each log to a (pub, geo) -> AggregationLog pair that will be reduced
  // hyperLogLog is assumed to be an Algebird HyperLogLogMonoid instance defined elsewhere in the aggregator
  val logsByPubGeo = messages.map(_._2).filter(_.geo != Constants.UnknownGeo).map {
    log =>
      val key = PublisherGeoKey(log.publisher, log.geo)
      val agg = AggregationLog(
        timestamp = log.timestamp,
        sumBids = log.bid,
        imps = 1,
        uniquesHll = hyperLogLog(log.cookie.getBytes(Charsets.UTF_8))
      )
      (key, agg)
  }

We then reduce the AggregationLogs in batches of BatchDuration seconds, summing the bids (to compute the average later), summing the impressions (each AggregationLog initially counts 1 impression) and merging the HyperLogLogs for the uniques:

  // Reduce to generate imps, uniques, sumBid per pub and geo
  import org.apache.spark.streaming.StreamingContext._
  val aggLogs = logsByPubGeo.reduceByKeyAndWindow(reduceAggregationLogs, BatchDuration)

  private def reduceAggregationLogs(aggLog1: AggregationLog, aggLog2: AggregationLog) = {
    aggLog1.copy(
      timestamp = math.min(aggLog1.timestamp, aggLog2.timestamp),
      sumBids = aggLog1.sumBids + aggLog2.sumBids,
      imps = aggLog1.imps + aggLog2.imps,
      uniquesHll = aggLog1.uniquesHll + aggLog2.uniquesHll
    )
  }
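
Here the window is equal to the batch duration, so each batch is aggregated exactly once. If you wanted, say, a one-minute aggregate refreshed every 10 seconds, reduceByKeyAndWindow also accepts separate window and slide durations (both multiples of the batch duration); this variant is not what the project does, just an illustration:

  import org.apache.spark.streaming.Seconds

  // 60-second window recomputed every 10 seconds (overlapping windows)
  val slidingAggLogs = logsByPubGeo.reduceByKeyAndWindow(
    reduceAggregationLogs _,
    Seconds(60),   // window duration
    Seconds(10)    // slide duration
  )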

We finalize the reduction (computing the average bid and the number of uniques) into an AggregationResult and store it in MongoDB:

  // Store in MongoDB
  aggLogs.foreachRDD(saveLogs(_))

  // collection is a MongoDB collection handle initialized elsewhere in the aggregator (a possible setup is sketched below)
  private def saveLogs(logRdd: RDD[(PublisherGeoKey, AggregationLog)]) {
    val logs = logRdd.map {
      case (PublisherGeoKey(pub, geo), AggregationLog(timestamp, sumBids, imps, uniquesHll)) =>
        AggregationResult(new DateTime(timestamp), pub, geo, imps, uniquesHll.estimatedSize.toInt, sumBids / imps)
    }.collect()

    // save in MongoDB
    logs.foreach(collection.save(_))
  }
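
The saveLogs method collects the (small) set of aggregated rows to the driver and saves each one through collection, the MongoDB collection handle mentioned above. With the Casbah driver, a possible setup and an explicit conversion to a DBObject could look roughly like this (a sketch under that assumption; the project saves AggregationResult objects directly, so it presumably registers its own converter):

import com.mongodb.casbah.Imports._

// hypothetical wiring; adjust host, port and names to your setup
val mongoClient = MongoClient("127.0.0.1", 27017)
val collection = mongoClient("adlogdb")("impsPerPubGeo")

// field names match the mongoexport command shown later in this post
def toDbObject(result: AggregationResult): DBObject = MongoDBObject(
  "date"      -> result.date.toDate,
  "publisher" -> result.publisher,
  "geo"       -> result.geo,
  "imps"      -> result.imps,
  "uniques"   -> result.uniques,
  "avgBids"   -> result.avgBids
)

// e.g. logs.foreach(result => collection.save(toDbObject(result)))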

Finally, in order to start processing the stream, we just need to start the streaming context and wait for the (endless) computation to terminate:

  streamingContext.start()
  streamingContext.awaitTermination()

Running the example

Build the package and generate the launch scripts for the log generator and the aggregator:

$ sbt pack
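
The pack task comes from the sbt-pack plugin, which the project already declares. For your own builds, it would be enabled in project/plugins.sbt with something like this (the version number is only illustrative):

addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.6.5")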

You can find the property files for Zookeeper and Kafka in the config directory.

Start Zookeeper:

$ zookeeper-server-start.sh config/zookeeper.properties

Start Kafka:

$ kafka-server-start.sh config/kafka-server1.properties

Create a topic “adnetwork-topic”:

$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic adnetwork-topic
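
You can optionally verify that the topic was created by listing the topics registered in Zookeeper:

$ kafka-topics.sh --list --zookeeper localhost:2181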

Start MongoDB if it is not started already (sudo mongod).

In one terminal, run the aggregator:

$ target/pack/bin/aggregator

In another terminal, run the ad server log generator:

$ target/pack/bin/generator

After a few seconds, you should see the results in MongoDB:

$ mongoexport -d adlogdb -c impsPerPubGeo --csv -f date,publisher,geo,imps,uniques,avgBids
connected to: 127.0.0.1

date,publisher,geo,imps,uniques,avgBids
2014-07-01T03:24:39.679Z,"publisher_4","CA",3980,3248,0.50062253292876
2014-07-01T03:24:39.681Z,"publisher_4","MI",3958,3229,0.505213545705667
2014-07-01T03:24:39.681Z,"publisher_1","HI",3886,3218,0.4984981221446526
2014-07-01T03:24:39.681Z,"publisher_3","CA",3937,3226,0.5038157362872939
2014-07-01T03:24:39.679Z,"publisher_4","NY",3894,3200,0.5022389599376207
2014-07-01T03:24:39.679Z,"publisher_2","HI",3906,3240,0.4988378174961185
2014-07-01T03:24:39.679Z,"publisher_3","HI",3989,3309,0.4975347625823641
2014-07-01T03:24:39.681Z,"publisher_3","FL",3957,3167,0.4993339490605483
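
You can also query the collection directly from the mongo shell, for instance:

$ mongo adlogdb --eval 'db.impsPerPubGeo.find().limit(3).forEach(printjson)'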

If you do not see anything, check that Kafka is receiving the messages using the Kafka console consumer:

$ kafka-console-consumer.sh --topic adnetwork-topic --zookeeper localhost:2181

You can also check that the Spark jobs are executing properly using the Spark web UI at http://localhost:4040/.

In this example we store the data in MongoDB, but for analytics applications column-oriented databases such as Vertica, Impala, InfiniDB or CitusDB are more appropriate, as their storage model makes them efficient at computing complex aggregations. While these data warehouses are powerful and relatively fast (a few seconds per query), they are not very good at handling a large number of concurrent queries.

In order for the API to access the data efficiently, we usually need a fast data access layer, which can be implemented using different kinds of technologies depending on the use case.

Conclusion

We described in this post how to use Spark Streaming to aggregate data in real-time. As you can see, it is very straightforward and fits very well with the Scala paradigm. SQL-based solutions that simplify stream aggregation even further have also emerged: SQLStream, Squall and PipelineDB. Much as Hive is to Hadoop and Squall is to Storm, a new project called StreamSQL, developed by Intel and announced at the Spark Summit 2014, aims at running SQL queries on streaming data on top of Spark Streaming.

Spark’s ecosystem has grown very quickly in recent years. The Spark-based project BlinkDB got some attention at the Very Large Data Bases (VLDB) conference in 2012, claiming to be able to process 17 TB of data in less than 2 seconds. The idea is to use samples (computed offline) and to trade precision for performance. In many cases (e.g., campaign performance), numbers do not have to be very precise, especially when they are in the hundreds of thousands or millions, so this kind of approach is very promising.

Due to its versatility, simplicity and performance (up to 100x faster than Hadoop MapReduce for some workloads), Spark will probably replace Hadoop for batch aggregations and Storm for real-time analytics in the not-so-distant future.
