One of the most fundamental aspects a data model can convey is how something changes over time. This makes sense when considering that we build data models to capture what is happening in the real world, and the real world is constantly changing. The challenge is not just that new things are occurring; existing things are changing too, and if in our data models we overwrite the old state of an entity with the new state then we lose information about the change. To counter that we can instead retain all states of an entity so that we can query for any point in time. A key question then is: which time do we mean? When the state changed in the real world, or when the data model ultimately captured the change? Both can be important, so to efficiently capture these two different times we can use bi-temporal data modeling. This article first describes bi-temporal models in more detail, and then shows how to implement them on a Cloudera enterprise data hub (EDH) using Envelope along with Apache Spark, Apache Kudu, and Apache Impala (incubating).
Bi-temporal data models, and why they are important
Let’s look at a concrete example. A bank generates a daily report of customer credit scores, and to do this it maintains a customer table with customer identifier, name, score, as-of date, and last updated date. The as-of date tracks when the credit score was true in the real world, which we will call “event time”. The last updated date tracks when the credit score was true in the data model, which we will call “system time”. We will soon see that this simplistic model is difficult and slow for users to query.
On October 30th the credit score is derived for the brand new customer Jane.
customer_id | name | score | as_of_date | last_updated |
C-1001 | Jane | 768 | 20161030 | 2016-10-30 04:16:09 |
On October 31st, Jane’s credit score is still 768, so no new record is added. While the 768 score could have been inserted again for October 31st, it would create a huge amount of redundancy across the tens of millions of customers, especially considering that most credit scores don’t change from one day to the next.
On November 1st, Jane’s credit score is determined to be 771. We need the data model to continue to track that Jane’s credit score was 768 as of October 30th, so we insert a new record instead of overwriting the existing one. We do the same on November 16th when Jane’s credit score goes up again to 774.
customer_id | name | score | as_of_date | last_updated |
C-1001 | Jane | 768 | 20161030 | 2016-10-30 04:16:09 |
C-1001 | Jane | 771 | 20161101 | 2016-11-01 03:58:11 |
C-1001 | Jane | 774 | 20161116 | 2016-11-16 07:14:27 |
We now have all the states, or versions, of the customer credit scores ready to query. While it was easy to maintain this table, it is not easy to query. If we want the latest credit scores, we have to find the maximum as-of date per customer, which leads to an aggregation that makes the query more complicated, and which can be slow to run across many customers. Even worse, if we want to find the credit scores for a point in the past, we have to find the maximum as-of date per customer that is less than or equal to the date we want the scores for. Ultimately this leads to users who are unhappy with the query complexity and the query performance.
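To make the pain concrete, a query for the latest scores against this model might look something like the sketch below (assuming the table is named customers, as it is later in the article); the point-in-time variant would additionally need to filter the as-of dates inside the aggregation.

-- Latest score per customer: first find each customer's maximum as-of date,
-- then join back to pick up the matching row.
SELECT c.customer_id, c.name, c.score
FROM customers c
JOIN (
  SELECT customer_id, MAX(as_of_date) AS max_as_of_date
  FROM customers
  GROUP BY customer_id
) latest
  ON c.customer_id = latest.customer_id
 AND c.as_of_date = latest.max_as_of_date;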
We can solve this by trading additional ETL complexity and development effort for simpler and faster user queries. This is done by changing the as-of date from one field to two so that we can specify the range of real-world time during which the credit score was true. Then a simple filter query can find the credit score of a customer for any point in time. This gets us part of the way there, but we will see that even this is not enough for certain important scenarios.
If the same report was generated now with a range of as-of dates, then we would have the below representation. The as-of start date is the first date that the customer’s credit score was true, and the as-of end date is the last date that it was true. When the last date that a credit score will be true isn’t known, typically because it hasn’t happened yet, then we can use a date far into the future as a placeholder. This is better than using NULL because users won’t have to deal with NULL comparisons in their queries.
customer_id | name | score | as_of_start_date | as_of_end_date | last_updated |
C-1001 | Jane | 768 | 20161030 | 20161031 | 2016-10-30 04:16:09 |
C-1001 | Jane | 771 | 20161101 | 20161115 | 2016-11-01 03:58:11 |
C-1001 | Jane | 774 | 20161116 | 99991231 | 2016-11-16 07:14:27 |
To find customer credit scores for any point in the past we can now just run a simple filter query that performs much faster than the equivalent aggregation query for the previous data model.
SELECT customer_id, name, score FROM customers WHERE 20161109 BETWEEN as_of_start_date AND as_of_end_date;
customer_id | name | score |
C-1001 | Jane | 771 |
This will work fine if we only ever add new credit scores, but what happens when we realize there has been a mistake in the past? If on November 19th we are told that in fact Jane’s credit score really should have been 775 on November 16th, then we need to apply that correction. Since we now know it wasn’t actually 774 on November 16th, we may be tempted to overwrite that record.
customer_id | name | score | as_of_start_date | as_of_end_date | last_updated |
C-1001 | Jane | 768 | 20161030 | 20161031 | 2016-10-30 04:16:09 |
C-1001 | Jane | 771 | 20161101 | 20161115 | 2016-11-01 03:58:11 |
C-1001 | Jane | 775 | 20161116 | 99991231 | 2016-11-19 15:32:04 |
While this data model now captures the best known truth for Jane’s credit scores over time, we have lost the information about the correction. This might not be important, and in that case the data model could stop here. But it is easy to imagine a scenario where this is important:
- On November 16th Jane’s credit score is reported as 774.
- On November 18th a manager with access to the report declines a loan application from Jane because her credit score wasn’t 775 or above.
- On November 19th Jane’s credit score for November 16th onwards is corrected to 775.
- In December an auditor at a financial regulator sees that the manager denied Jane the loan despite meeting the criteria…
To track how the credit scores changed in the table, not just in the real world, we can also implement the same two field pattern to represent the range of system time that the record was considered true. This allows us to roll back the clock and see the full state of the table at any point in the past, and with only a straightforward filter query. This tracking of both the event and system time ranges is what is known as bi-temporal data modeling. In data warehousing terminology this is a Type 2 slowly changing dimension, where new records are added not just for changes in the real world, but also for changes to the table itself.
With bi-temporal data modeling our table would now look like the below representation. We can find the customer credit scores across event time and across system time. For the regulatory scenario the auditor would be able to filter the system start and end dates by the date of the loan denial to see the report as it would have looked to the manager when they made the decision, thereby avoiding unnecessary consequences for the bank.
customer_id | name | score | as_of_start_date | as_of_end_date | system_start_ts | system_end_ts |
C-1001 | Jane | 768 | 20161030 | 99991231 | 2016-10-30 04:16:09 | 2016-11-01 03:58:10 |
C-1001 | Jane | 768 | 20161030 | 20161031 | 2016-11-01 03:58:11 | 9999-12-31 23:59:59 |
C-1001 | Jane | 771 | 20161101 | 99991231 | 2016-11-01 03:58:11 | 2016-11-16 07:14:26 |
C-1001 | Jane | 771 | 20161101 | 20161115 | 2016-11-16 07:14:27 | 9999-12-31 23:59:59 |
C-1001 | Jane | 774 | 20161116 | 99991231 | 2016-11-16 07:14:27 | 2016-11-19 15:32:03 |
C-1001 | Jane | 775 | 20161116 | 99991231 | 2016-11-19 15:32:04 | 9999-12-31 23:59:59 |
For the auditor to find Jane’s credit score as it would have looked to the loan manager at the time of the application decision (say 2:44 p.m.), it is again just an inexpensive filter query:
SELECT customer_id, name, score FROM customers WHERE 20161118 BETWEEN as_of_start_date AND as_of_end_date AND '2016-11-18 14:44:00' BETWEEN system_start_ts AND system_end_ts;
customer_id | name | score |
C-1001 | Jane | 774 |
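For comparison, the corrected scores that are currently considered true in both event and system time can be retrieved just as cheaply, because the far-future placeholders mark the open-ended versions. The sketch below assumes the placeholder values shown in the table above.

-- Only the versions that are still open-ended in both time dimensions are current
SELECT customer_id, name, score
FROM customers
WHERE as_of_end_date = 99991231
  AND system_end_ts = '9999-12-31 23:59:59';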
Implementing bi-temporal data models on a Cloudera EDH
Now let’s see how we can make this happen. We can use three components to implement a bi-temporal data model on a Cloudera EDH:
- Envelope to automatically maintain the changes to the table as new data arrives. Envelope is a pre-developed Spark application that developers can use to create their own Spark pipelines primarily by configuration, and includes bi-temporality logic out of the box. This is a good choice because the developer is not required to implement the complex logic of bi-temporality maintenance themselves.
- Kudu to store the records in a table. This is a good choice because Kudu allows us to efficiently insert and update individual records, and allows end users to query the same table with very fast scan speeds.
- Impala for users to query the table with SQL, either directly or via BI tools. This is a good choice because Impala is very fast at interactive end user queries.
Impala can be used to create the table in Kudu, such as with the SQL below:
CREATE TABLE customers (
    customer_id STRING
  , as_of_start_date BIGINT
  , system_start_ts BIGINT
  , name STRING
  , score INT
  , score_date BIGINT
  , as_of_end_date BIGINT
  , system_end_ts BIGINT
  , current_flag STRING
  , PRIMARY KEY(customer_id, as_of_start_date, system_start_ts)
)
PARTITION BY HASH(customer_id) PARTITIONS 16
STORED AS KUDU;
In this table the primary key is composed of the unique set of fields of a bi-temporal version – the natural key and the event time start date and the system time start date. An additional field that can be helpful for users is a “current flag” column that has the value “Y” when the record is the current version in both event and system time, and “N” otherwise.
Note: This partitioning scheme is only given as an example. Ensure that you carefully design the partitioning scheme for your Kudu tables according to best practices.
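As an illustration of how the current flag pays off, once it is maintained the most common query of all, the latest scores in both event and system time, reduces to a trivial filter:

-- Current version in both event time and system time
SELECT customer_id, name, score
FROM customers
WHERE current_flag = 'Y';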
With the table created we need a way to load data that tracks bi-temporality, including handling scenarios such as data corrections and records arriving out of order. Envelope includes a bi-temporal history planner, which allows developers to implement data pipelines (batch or streaming) that correctly merge arriving data into bi-temporal tables without any code or SQL required.
Note: Envelope is hosted on Cloudera Labs, and so is not officially supported by Cloudera. For assistance with Envelope please visit the Cloudera Community forum.
For example, let’s say the source customer data is ingested into a Cloudera EDH as a CSV file with this representation:
C-1001,Jane,775,20161119
C-1002,John,688,20161119
...
And a Hive table is pointed at that HDFS location:
CREATE TABLE customers_raw (
    customer_id STRING,
    name STRING,
    score INT,
    score_date BIGINT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/your/path/here';
We could then configure an Envelope pipeline to apply bi-temporality to the destination Kudu table like below:
$ cat load_customers.conf
application {
  name = Load customers
}
steps {
  load_customers {
    input {
      type = hive
      table = customers_raw
    }
    planner {
      type = bitemporal
      fields.key = [customer_id]
      fields.values = [score]
      field.timestamp = score_date
      field.event.time.effective.from = as_of_start_date
      field.event.time.effective.to = as_of_end_date
      field.system.time.effective.from = system_start_ts
      field.system.time.effective.to = system_end_ts
      field.current.flag = current_flag
    }
    output {
      type = kudu
      connection = "yourkudumaster1:7051,..."
      table.name = "impala::default.customers"
    }
  }
}
The Envelope pipeline can then be run as:
$ spark-submit envelope-*.jar load_customers.conf
Spark will run the Envelope application, which will use the provided configuration file to read the Hive table, plan the required mutations for bi-temporality, and apply them to the Kudu table, which can then immediately be queried by Impala. That’s it!
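For example, a quick sanity check after the load is to inspect all of the maintained versions for a single customer from Impala, such as the customer used earlier in this article:

-- All bi-temporal versions for one customer, in the order they were written
SELECT customer_id, name, score,
       as_of_start_date, as_of_end_date,
       system_start_ts, system_end_ts,
       current_flag
FROM customers
WHERE customer_id = 'C-1001'
ORDER BY system_start_ts;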
See the Envelope documentation for more information on Envelope and using the bi-temporal history planner. For a look at a more detailed Envelope pipeline, see the provided FIX example.
Summary
Bi-temporality enables a data model to track the history of how an entity has changed in both the real world (event time) and in the table itself (system time). Implementing this has become much easier with the advent of Envelope and Kudu, while still harnessing the benefits of executing transformation pipelines in Spark, and providing fast SQL access through Impala. If you have these requirements, or expect them down the road, these components now make it much more practical for you to implement them at big data scales.
Jeremy Beard is a Principal Solutions Architect at Cloudera.