How-to: Do Data Quality Checks using Apache Spark DataFrames

Categories: How-to Spark

Apache Spark’s ability to support data quality checks via DataFrames is progressing rapidly. This post explains the state of the art and future possibilities.

Apache Hadoop and Apache Spark make Big Data accessible and usable so we can easily find value, but that data has to be correct first. This post focuses on that problem and how to solve it with Apache Spark 1.3 and Apache Spark 1.4 using DataFrames. (Note: although relatively new to Spark and thus not yet supported by Cloudera at the time of this writing, DataFrames are highly worthy of exploration and experimentation. Learn more about Cloudera’s support for Apache Spark here.)

Why Spark 1.3 and 1.4? Well, I’ve been using 1.3 a lot in the field, and initially this post was going to describe how to do data quality checks with 1.3. However, as I was writing this post, my friend (Cloudera Software Engineer/Apache Spark committer) Imran Rashid pointed out SPARK-7242 and the great work of Xiangrui Meng that applies to this use case in Apache Spark 1.4.

The great thing about showing both implementations is that it highlights how, within only a couple of months, Spark has made data processing even more accessible and easier to explore—which makes me super excited about the future of Apache Spark.

Data Quality Use Cases

In many use cases, large amounts of data come from third-party providers, and that data isn’t always 100% reliable. The first line of defense is a schema that allows us to validate whether the data provider is sending us the data in the right format—that is, the right number of columns and the data types that go into those columns.

However, schemas only get you halfway to that goal, because although the providers may be sending you everything in the right format, you may still see odd things like:

  • More than half of all the values are empty or null
  • The data load is 10x smaller or larger than expected
  • Distribution of activity normally seen in the data is off by a factor that warrants review
  • In a field that must have all unique values, there is a duplicate

In this post, you will learn about a solution that helps you evaluate data quality by retrieving the following information about every column in your data:

  • # of empty values
  • # of nulls
  • Total # of records
  • # of unique values
  • If the field is a numeric field, the Min, Max, Sum, and Avg for the column
  • The top-N most commonly appearing values along with their # of appearances (to derive their cardinality)

Spark 1.3 Data Quality Checker

In the following example (follow along/see source here), you will validate data that is stored in the Apache Parquet columnar format on HDFS. The data for this test can be generated with the generator here.

To generate the data, simply use the following command:

With this command, the data generator will create a dataset in the ./gen1/output folder with 10 columns and 10 million records, using the processing power of eight cores.

Now execute this main object:

with the following spark-submit command:

This command will read over the data, do some magic, and print out all the data quality statistics you need for this use case.

The following code is from the TableStatsSinglePathMain object; let’s dig into it and see how the answers were calculated.
 

Part A:

Here you can see that we are using a Spark DataFrame to read the Parquet files (a minimal sketch follows this list). If you are new to DataFrames, think of them as an RDD with a couple of extra things:

  • A schema that helps you see what is inside the RDD.
  • The ability to execute different types of methods on your RDD—such as Spark SQL—or, as we will see later in this post, to apply additional statistical functions to columns within an RDD.
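For orientation, here is a minimal sketch of what that read looks like in Spark 1.3. The path and variable names are placeholders, not the exact code from the repository:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Minimal Part A sketch, assuming the generated Parquet data sits in ./gen1/output
    val sc = new SparkContext(new SparkConf().setAppName("TableStatsSinglePathMain"))
    val sqlContext = new SQLContext(sc)

    // Spark 1.3: load the Parquet files as a DataFrame (an RDD of Rows plus a schema)
    val df = sqlContext.parquetFile("./gen1/output")
    df.printSchema()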

Part B:

Here you will call a method that will do all the work and return the needed stats. (A method is useful here because one can unit-test the code.)

The importance of unit tests cannot be overstated; they are what separates a good coding shop from a great one, and with Spark there is no reason why you shouldn’t have unit tests covering your ETL pipelines and streaming processes.
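Purely as an illustration, a unit test around such a method could look roughly like this. The method name getFirstPassStat and the accessors on the result are assumptions based on this post, not the exact API in the repository:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.scalatest.FunSuite

    class TableStatsSuite extends FunSuite {
      test("column stats over a tiny in-memory DataFrame") {
        // Local SparkContext so the test needs no cluster
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Three rows, two columns; one empty eye_color value
        val df = sc.parallelize(Seq((1L, "brown"), (2L, "brown"), (3L, "")))
          .toDF("user_id", "eye_color")

        val stats = TableStatsSinglePathMain.getFirstPassStat(df)
        // Assert against whatever accessors the stats model exposes, for example:
        // assert(stats.columnStatsMap(1).empties == 1)

        sc.stop()
      }
    }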

Part B.1:

This section contains WordCount-like operations, but in this case a “word” is replaced with a tuple of column index and cell value (columnIdx, cellValue)—thereby delivering counts of every value within every column. This operation requires a shuffle with a reduce, but hopefully many of the columns will have repeated values, and those repeats will be compacted by the map-side combiner.
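A simplified sketch of that step (an approximation, not the exact code from the repository):

    // Part B.1 sketch: count every (column index, cell value) pair, WordCount-style
    val columnValueCounts = df.rdd.flatMap { row =>
      (0 until row.length).map(colIdx => ((colIdx, row.get(colIdx)), 1L))
    }.reduceByKey(_ + _) // map-side combine runs before the shuffle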

Also note that the code uses the default partitioner, which means different values from a single column may (and hopefully will) end up spread across all the partitions, which reduces data skew.

To explain this a little more, let’s use an example of a table with the columns USER_ID and EYE_COLOR. The number of different eye colors is small: black, brown, green, and so on. Therefore, the EYE_COLOR values are going to be compacted by the map-side combiner of the shuffle, which will hopefully result in only a small amount of data being sent over the wire to the reducer. The USER_ID column, by contrast, has all unique values, so the map-side combiner will have no impact on reducing what is sent over the wire through the shuffle.

Had we partitioned on column rather than on both column and value, the USER_ID partition would have many more records to process in its reducer partition than the EYE_COLOR reduce-side partition—skew that would slow the job down. Instead, we use the default partitioner, which partitions on the hash mod of the (column, value) tuple, an approach that also lets you make the number of reduce-side partitions totally independent of the number of columns in the dataset.

Part B.2:

This is where the magic happens. Here, the method does a single pass over the reduceByKey results with a mapPartitions function and then pushes all the data through the FirstPassStatsModel class (see below), which computes all the answers.

When you look at the FirstPassStatsModel, you will see that it is nothing more than a HashMap with a key for the column and a value for an object called ColumnStats. The ColumnStats object (most of which is shown below) tracks all the totals and top-N counts.
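A rough sketch of that pass; the class and method names here are approximations of the ones in the repository:

    // Part B.2 sketch: one pass per partition over the reduceByKey output,
    // folding every ((colIdx, value), count) record into a FirstPassStatsModel
    val partitionStats = columnValueCounts.mapPartitions { it =>
      val stats = new FirstPassStatsModel() // a HashMap of column index -> ColumnStats
      it.foreach { case ((colIdx, value), count) =>
        stats.addValue(colIdx, value, count) // delegates to ColumnStats (Part B.1.1)
      }
      Iterator(stats)
    }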
 

Part B.1.1 – This section folds in a new value for the given column along with its count. Note that no matter how many values are added, the amount of memory needed to hold the answers is fixed. This function is called for every value within mapPartitions after the reduceByKey.

Part B.1.2 – This method is used to consolidate the ColumnStats from the different partitions. It will be called in the reduce step that follows the mapPartitions in the main program flow.
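In rough outline, ColumnStats looks something like the following. The field and method names are approximations, and the real class also tracks the top-N list per column:

    // Sketch of ColumnStats; the real class also maintains a top-N list of values
    class ColumnStats extends Serializable {
      var totalCount, uniqueValues, nulls, empties, sumLong = 0L
      var maxLong = Long.MinValue
      var minLong = Long.MaxValue

      // Part B.1.1: fold in one distinct value and its count; memory use stays fixed
      def addValue(value: Any, count: Long): Unit = {
        totalCount += count
        uniqueValues += 1
        if (value == null) nulls += count
        else if (value.toString.isEmpty) empties += count
        else value match {
          case l: Long =>
            if (l > maxLong) maxLong = l
            if (l < minLong) minLong = l
            sumLong += l * count // multiply by count (see the fix noted in the comments below)
          case _ => // non-numeric values: nothing extra to track here
        }
      }

      // Part B.1.2: merge partial results computed on another partition
      def merge(other: ColumnStats): ColumnStats = {
        totalCount += other.totalCount
        uniqueValues += other.uniqueValues
        nulls += other.nulls
        empties += other.empties
        maxLong = math.max(maxLong, other.maxLong)
        minLong = math.min(minLong, other.minLong)
        sumLong += other.sumLong
        this
      }
    }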

Part D:

This reduce function pulls back a FirstPassStatsModel from every partition to the driver JVM and then combines them to return one final result.
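Sketched out, assuming FirstPassStatsModel exposes a merge-style method that folds in another model's per-column ColumnStats (Part B.1.2):

    // Part D sketch: combine the per-partition models into one result on the driver
    val firstPassStats = partitionStats.reduce { (left, right) =>
      left.merge(right) // folds right's per-column ColumnStats into left and returns it
    }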

Part E:

Finally, the result is printed. The reduce has combined all the results into a single FirstPassStatsModel object that now lives on the driver JVM, which gives you the freedom to access it like any old Java or Scala object. (Note: the object is printed here for simplicity but you can do anything with this information because it is just a routine local object now.)

In the end, the Spark code’s DAG will look like the image below: read from HDFS, map, combine, shuffle, reduceByKey, map, and then finally reduce.

Spark 1.4 Data Quality Checker

With respect to this use case, DataFrame functionality is greatly expanded in Spark 1.4 through the introduction of the following functions:

  • describe(Cols) (1.3.1): Computes statistics for numeric columns, including count, mean, stddev, min, and max.
  • DataFrameStatFunctions (1.4.0)
    • cov: Calculates the sample covariance of two numerical columns of a DataFrame.
    • corr: Calculates the correlation of two columns of a DataFrame.
    • crosstab: Computes a pair-wise frequency table of the given columns.
    • freqItems: Finds frequent items for columns, possibly with false positives.

The added functionality doesn’t give us everything we were looking for, but it does provide a good deal. For instance, freqItems is not the same as top-N values and counts, and we don’t get our unique, null, and empty counts. But parity with the 1.3 example is not the point of calling out these features; rather, the point is that Spark DataFrames are becoming much more than just an RDD with a schema. While these new functions don’t yet cover this whole use case, in a future release perhaps everything in the 1.3 example above will be solvable via a single command on a DataFrame object.
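For a flavor of the 1.4 API, the calls look roughly like this (the column names are placeholders):

    // Spark 1.4 sketch; "col_0" and "col_1" are placeholder column names
    df.describe().show()                                  // count, mean, stddev, min, max per numeric column
    df.stat.freqItems(Seq("col_0", "col_1"), 0.01).show() // frequent items, possibly with false positives
    df.stat.crosstab("col_0", "col_1").show()             // pair-wise frequency table
    println(df.stat.corr("col_0", "col_1"))               // Pearson correlation
    println(df.stat.cov("col_0", "col_1"))                // sample covariance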

Conclusion

Apache Spark is a great tool for data quality checks, in 1.3 or in 1.4. There are great tools in today’s builds to get us to a very scalable, complete solution without a huge amount of code or complexity. (Just imagine what the above code would have looked like in MapReduce or SQL!) Furthermore, the amount of code required will only decrease as Spark adds more statistics functionality to DataFrames.

Also note that everything shown above could have been done, in near-real time, via Spark Streaming. With very few code changes, you could also build long-running counts or multiple rolling windows to populate dashboards and get a better understanding of data across time. But that’s another post, for another time.

Acknowledgements

Thanks to Imran Rashid, Michael Ridley, and Matthew R. Carnali for their technical review of this post.

Ted Malaska is a Solutions Architect at Cloudera, a contributor to Spark, Apache Flume, and Apache HBase, and a co-author of the O’Reilly book, Hadoop Application Architectures.


8 responses on “How-to: Do Data Quality Checks using Apache Spark DataFrames”

  1. Sergey

    Why do you disable compression while shuffling and spilling?
    sparkConfig.set("spark.broadcast.compress", "false")
    sparkConfig.set("spark.shuffle.compress", "false")
    sparkConfig.set("spark.shuffle.spill.compress", "false")

  2. Brent Dorsey

    Thanks for sharing; this performs significantly better than what I was using! While validating the getFirstPassStat statistics on our data, I discovered a bug in Part B.1.1. Because the sumLong calculation happens after the reduce, the bug returns the sum of the unique values in the column instead of the sum of all the values in the column. The fix is simply multiplying the unique column value by the number of times the value shows up in the partition.

    Bug: sumLong += colLongValue
    Fix: sumLong += (colLongValue * colCount)

    The following else if adds support for Double:

    else if (colValue.isInstanceOf[Double]) {
      val colDoubleValue = colValue.asInstanceOf[Double]
      if (maxDouble < colDoubleValue) maxDouble = colDoubleValue
      if (minDouble > colDoubleValue) minDouble = colDoubleValue
      sumDouble += (colDoubleValue * colCount)
    }

  3. AG

    I don’t think this works well: since each ‘mapper’ keeps a top-N list, and then all of these lists are merged in the reduce, the top N is not counted accurately.
    A simple example with N=1:
    One mapper might have a column with two values, ‘a’ and ‘b’, with the following counts: (a,100), (b,50), and another mapper will have (b,1000), (a,999).
    Keeping the top N=1, we will merge (a,100) and (b,1000), which results in (b,1000) and not the expected (a,1099).

  4. Avi Chalbani

    Thanks for sharing,

    What was important for me was the mapPartitions and reduceByKey methods; they help prevent OOM errors in Spark.
    reduceByKey returns a distributed dataset.

    I also noticed an additional bug, in counting the null and empty values. Rather than adding just one to the counter, the number of values should be added.
    bug: nulls += 1
    fix: nulls += colCount
