Quick exploration of data using spark shell

Data analytics has never been more accessible than in the last decade, thanks to open source projects like Hadoop, Spark and many others.

In this post, we are going to use spark-shell to read a CSV file and analyze it by running SQL queries against it. The CSV file contains the list of restaurant inspections in NYC. It is freely available on the NYC Open Data website; you can download it at: https://data.cityofnewyork.us/Health/DOHMH-New-York-City-Restaurant-Inspection-Results/xx67-kt59/data

Click on Export, then CSV.

Installing spark-shell

To explore the dataset, we use spark-shell.

Download Spark 2.0.2 at http://spark.apache.org/downloads.html

Uncompress it:

tar xvfz Downloads/spark-2.0.2-bin-hadoop2.7.tgz

Run spark-shell:

cd spark-2.0.2-bin-hadoop2.7/
bin/spark-shell
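
Once the shell is up, you can check which version you are running:

scala> spark.version
res0: String = 2.0.2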

To load the inspection file into a DataFrame:

import org.apache.spark.sql.SparkSession

// in spark-shell a SparkSession is already available as `spark`,
// so getOrCreate() simply returns the existing session
val spark = SparkSession.builder().getOrCreate()
val csvFile = "/Users/chimpler/Downloads/DOHMH_New_York_City_Restaurant_Inspection_Results.csv"
val inspections = spark.read.option("header",true).csv(csvFile)
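
By default every column is read as a string. If you want Spark to infer the column types instead (for example a numeric SCORE), you can add the inferSchema option, at the cost of an extra pass over the file:

val typedInspections = spark.read
  .option("header", true)
  .option("inferSchema", true)
  .csv(csvFile)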

Now let's have a quick look at the DataFrame's content:

// columns
scala> inspections.printSchema
root
 |-- CAMIS: string (nullable = true)
 |-- DBA: string (nullable = true)
 |-- BORO: string (nullable = true)
 |-- BUILDING: string (nullable = true)
 |-- STREET: string (nullable = true)
 |-- ZIPCODE: string (nullable = true)
 |-- PHONE: string (nullable = true)
 |-- CUISINE DESCRIPTION: string (nullable = true)
 |-- INSPECTION DATE: string (nullable = true)
 |-- ACTION: string (nullable = true)
 |-- VIOLATION CODE: string (nullable = true)
 |-- VIOLATION DESCRIPTION: string (nullable = true)
 |-- CRITICAL FLAG: string (nullable = true)
 |-- SCORE: string (nullable = true)
 |-- GRADE: string (nullable = true)
 |-- GRADE DATE: string (nullable = true)
 |-- RECORD DATE: string (nullable = true)
 |-- INSPECTION TYPE: string (nullable = true)

// size of the dataset
scala> inspections.count
res42: Long = 436999
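
We can also peek at a few rows; show() truncates long values by default:

scala> inspections.select("DBA", "BORO", "GRADE").show(5)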

Let’s run some SQL queries on the dataset:

// register the DataFrame as a temporary view so we can query it from SQL
scala> inspections.createOrReplaceTempView("inspections")
scala> val df = sql("""
SELECT
    `CUISINE DESCRIPTION`,
    COUNT(1) AS Num
FROM inspections
GROUP BY `CUISINE DESCRIPTION`
ORDER BY Num DESC
LIMIT 10
""");
scala> df.show
+---------------------+------+
| CUISINE DESCRIPTION | Num  |
+---------------------+------+
| American            |101265|
| Chinese             | 47391|
| Latin (Cuban, Dom...| 20778|
| Pizza               | 20763|
| Italian             | 20084|
| Mexican             | 16424|
| Café/Coffee/Tea     | 16016|
| Japanese            | 15624|
| Caribbean           | 13615|
| Bakery              | 13200|
+---------------------+------+

As expected, since the inspections take place in New York, the top cuisines served in restaurants are American and Chinese, followed by Latin, Pizza and Italian.

Now let’s look at the distribution of the grades that the inspectors are giving to the restaurants:

scala> val df2 = sql("""
SELECT
    GRADE, COUNT(1) AS Num FROM inspections
GROUP BY GRADE
ORDER BY Num DESC
""");

scala> df2.show()
+--------------+------+
| GRADE        | Num  |
+--------------+------+
| null         |228417|
| A            |160200|
| B            | 33794|
| C            |  8551|
| Z            |  2689|
|Not Yet Graded|  1943|
| P            |  1405|
+--------------+------+

It shows that most of the inspection records carry no grade at all, and that among the graded ones, the vast majority received an A.

Now let’s look at the number of A, B, C grades for each cuisine:

scala> val df3 = sql("""
SELECT
    `CUISINE DESCRIPTION`,
    SUM(IF(GRADE='A', 1, 0)) AS A,
    SUM(IF(GRADE='B', 1, 0)) AS B,
    SUM(IF(GRADE='C', 1, 0)) AS C
FROM inspections
GROUP BY `CUISINE DESCRIPTION`
""");

scala> df3.show()
+-------------------+-----+----+----+
|CUISINE DESCRIPTION|  A  |  B |  C |
+-------------------+-----+----+----+
| Pancakes/Waffles  |  128|  10|   0|
| Chinese/Japanese  |  342| 106|  24|
| Mexican           | 5210|1471| 412|
| Jewish/Kosher     | 2281| 445| 134|
| Bakery            | 4937|1027| 250|
| Turkish           |  413|  86|  18|
| Scandinavian      |   49|   5|   0|
...
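
The same breakdown can also be expressed with the DataFrame API using pivot. Note that count() yields null instead of 0 for missing grade/cuisine combinations, a small difference from the SUM(IF(...)) version:

// equivalent pivot using the DataFrame API
val pivoted = inspections
  .groupBy("CUISINE DESCRIPTION")
  .pivot("GRADE", Seq("A", "B", "C"))
  .count()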

We can create a view out of the result of this query:

scala> df3.createOrReplaceTempView("grades")

Note that this view is not materialized, so queries on this “grades” table will be run against the inspections table each time.
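
You can check this by printing the query plan, which still goes down to the CSV scan:

scala> sql("SELECT * FROM grades").explain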

From this, let's find the cuisines with the worst ratio of grade C. We filter out the cuisines with fewer than 1,000 graded inspections to keep the results meaningful:

scala> val query = """
  SELECT
     `CUISINE DESCRIPTION`,
     C / (A + B + C) AS Ratio,
     A + B + C AS Num
  FROM grades
  WHERE A + B + C > 1000
  ORDER BY Ratio DESC
  LIMIT 5
"""
scala> sql(query).show()
+---------------------+-------------------+----+
| CUISINE DESCRIPTION | Ratio             | Num|
+---------------------+-------------------+----+
| Asian               |0.08726249120337791|2842|
| Korean              |0.07679395719681074|2383|
| Spanish             |0.06973732889382168|5406|
| Latin (Cuban, Dom...| 0.0690481625199726|8762|
| Indian              |0.06473293243721259|2827|
+---------------------+-------------------+----+

Speeding up the queries

Running a SQL query takes some time; you can make it faster by caching the intermediate result:


// Without caching:

scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 1454

// With caching
scala> df3.cache()
// Because the caching is lazy, the first time we run the query is slow
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 1848

scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 446
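
Rather than repeating the timing one-liner, you can define a small helper in the shell (a convenience function of our own, not a Spark API):

// simple wall-clock timer around a block of code
def time[A](block: => A): A = {
  val start = System.currentTimeMillis
  val result = block
  println(s"Took ${System.currentTimeMillis - start} ms")
  result
}

// usage
time { sql(query).collect() }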

If the data cannot be cached because it is too large to fit in memory, you can use a different storage format than CSV. Parquet and ORC store data in a columnar format, which allows the engine to scan only the columns required by the query rather than all the data.


// unfortunately, Parquet does not play well with spaces in column names, so
// we have to define aliases for those columns

sql("""
CREATE TABLE inspections_parquet USING parquet AS
SELECT
    CAMIS, DBA, BORO, BUILDING, STREET, ZIPCODE, PHONE,
    `CUISINE DESCRIPTION` AS CUISINE_DESCRIPTION,
    `INSPECTION DATE` AS INSPECTION_DATE, ACTION,
    `VIOLATION CODE` AS VIOLATION_CODE,
    `VIOLATION DESCRIPTION` AS VIOLATION_DESCRIPTION,
    `CRITICAL FLAG` AS CRITICAL_FLAG,
    SCORE, GRADE,
    `GRADE DATE` AS GRADE_DATE,
    `RECORD DATE` AS RECORD_DATE,
    `INSPECTION TYPE` AS INSPECTION_TYPE
FROM inspections""")
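
Alternatively, if you don't need a table registered in the catalog, here is a sketch of the same conversion using the DataFrame writer API (the output path is just an example):

// replace spaces in column names with underscores, then write Parquet files
val renamed = inspections.columns.foldLeft(inspections) { (df, name) =>
  df.withColumnRenamed(name, name.replace(' ', '_'))
}
renamed.write.parquet("/tmp/inspections_parquet")

// read the Parquet files back into a DataFrame
val fromParquet = spark.read.parquet("/tmp/inspections_parquet")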

Now let's run the same query as before, but on this “inspections_parquet” table:

val df4 = sql("""
SELECT
    CUISINE_DESCRIPTION,
    SUM(IF(GRADE='A', 1, 0)) AS A,
    SUM(IF(GRADE='B', 1, 0)) AS B,
    SUM(IF(GRADE='C', 1, 0)) AS C
FROM inspections_parquet
GROUP BY CUISINE_DESCRIPTION
""");

scala> df4.createOrReplaceTempView("grades_parquet")
scala> val query = """
  SELECT
     CUISINE_DESCRIPTION,
     C / (A + B + C) AS Ratio,
     A + B + C AS Num
  FROM grades_parquet
  WHERE A + B + C > 1000
  ORDER BY Ratio DESC
  LIMIT 5
"""
scala> sql(query).show()
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 549
// with caching
scala> df4.cache()
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 626
scala> val startTime = System.currentTimeMillis ; sql(query).collect() ; println(s"Took ${System.currentTimeMillis - startTime}")
Took 263

Without caching, the query now runs in 549 ms (Parquet) vs 1,454 ms (CSV).
With caching, it runs in 263 ms (Parquet) vs 446 ms (CSV). Not too shabby for just a storage format change.
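
When you no longer need the cached data, you can release the memory explicitly:

scala> df3.unpersist()
scala> df4.unpersist()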

Conclusion

In this post, we read a CSV file and analyzed it using spark-shell. We also saw some techniques to make queries faster, by caching the data in memory or by using a different storage format.

Another way to speed up the queries is to run them on a Spark cluster; that could be the subject of another post.

As an alternative to spark-shell, you can use spark-notebook or Apache Zeppelin, web-based notebooks that let you mix Scala and SQL code and display grids and charts in the browser.

Installing and comparing MySQL/MariaDB, MongoDB, Vertica, Hive and Impala (Part 1)

A common task for a data analyst in their day-to-day job is to aggregate data, generally by summing and averaging columns using different filters. When tables grow to hundreds of millions or billions of rows, these operations become extremely expensive and the choice of a database engine is crucial. Indeed, the more queries an analyst can run during the day, the better they can understand the data.

In this post, we’re going to install 5 popular databases on Linux Ubuntu (12.04):

  • MySQL / MariaDB 10.0: Row-based database
  • MongoDB 2.4: NoSQL database
  • Vertica Community Edition 6: Columnar database (similar to Infobright, InfiniDB, …)
  • Hive 0.10: Data warehouse built on top of HDFS using MapReduce
  • Impala 1.0: Database implemented on top of HDFS (compatible with Hive), inspired by Dremel, that can use different data formats (raw CSV format, Parquet columnar format, …)

Then we'll provide some scripts to populate them with test data, run some simple aggregation queries and measure the response time. The tests will be run on a single box without any tuning, using a relatively small dataset (160 million rows), but we're planning to run more thorough tests in the cloud later with much bigger datasets (billions of rows). This is just to give a general idea of the performance of each database.