Inside Santander’s Near Real-Time Data Ingest Architecture (Part 2)

Thanks to Pedro Boado and Abel Fernandez Alfonso from Santander’s engineering team for their collaboration on this post about how Santander UK is using Apache HBase as a near real-time serving engine to power its innovative Spendlytics app.

The Spendlytics iOS app is designed to help Santander’s personal debit and credit-card customers keep on top of their spending, including payments made via Apple Pay. It uses real-time transaction data to enable customers to analyze their card spend across time periods (weekly, monthly, yearly), by category (travel, supermarkets, cash, etc), and by retailer.

In our previous post, we described how Apache Flume and Apache Kafka are used to transform, enrich, and stream transactions into Apache HBase. This post continues by describing how transactions are arranged in Apache HBase to optimize performance, and how we make use of coprocessors to provide per-customer aggregations of purchasing trends. Santander and Cloudera went on (and are still on) an HBase journey with Spendlytics, one that has seen many iterations and optimizations of schema design and coprocessor implementations. We hope these lessons learned are the key takeaways from this post.

Schema 1.0

Good HBase schema design is about understanding the intended access patterns. Get it right and HBase will fly; get it wrong and you could end up with suboptimal performance due to design tradeoffs like region hotspots or having to perform large scans across multiple regions. (A hotspot in an HBase table is where an uneven rowkey distribution causes the majority of requests to be routed to a single region, overwhelming the RegionServer and resulting in slow response times.)

What we knew about Spendlytics’ intended access patterns and how they influenced the initial schema design:

  • Customers analyze only transactions on their own accounts:
    • For fast linear scan performance, all customer transactions should be stored sequentially.
  • Customer IDs are monotonically increasing:
    • Sequential customer IDs increase the probability that newer customers will be co-located within the same region, potentially creating a region hotspot. To avoid this issue, customer IDs should be salted (prefixed) or reversed to spread them evenly across regions when used at the beginning of the rowkey.
  • Customers have multiple cards
    • To optimize scans, a customer’s transactions should be further grouped and sorted by card contract, i.e. the contract ID should form part of the rowkey.
  • Transactions will be accessed in their entirety, i.e. attributes such as retailer, merchant, location, currency, and amount do not need to be read separately.
    • Storing transaction attributes in separate cells would result in a wider, sparser table, which would increase seek times. Because the attributes are accessed together, it made sense to serialize them together in an Apache Avro record. Avro is compact and provides an efficient representation with schema evolvability.
  • Transactions are accessed individually, in batches (by time, category, and retailer), and by aggregate (by time, category, and retailer).
    • Adding a unique transaction ID as a column qualifier will allow retrieval of individual transactions without adding more complexity to the rowkey.
    • To enable fast scanning of transactions over variable time periods, the transaction timestamp should form part of the rowkey.
    • Adding category and retailer to the rowkey would be too granular and would result in a very tall, narrow table with a complex rowkey. Tall and narrow is fine given that atomicity is not an issue, but having category and retailer as column qualifiers instead widens the table while still supporting secondary aggregations.
  • Trend data should be precomputed as much as possible to optimize read performance.
    • More on this later, but for now know that we added a second column family to store the trends.

Based on the above, the initial schema design can be summarized as follows: the rowkey takes the form <reverse_customer_id>::<contract_id>::<timestamp>, each transaction is stored under its unique transaction ID as a column qualifier with its attributes serialized in a single Avro cell, and a second column family holds the precomputed trends.
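To make this concrete, here is a minimal sketch of how such a rowkey could be constructed in Java. The class, parameter names, and zero-padding choices are our own illustrative assumptions, not Santander’s actual implementation.

import org.apache.hadoop.hbase.util.Bytes;

// Illustrative sketch only: names and formatting are assumptions, not Santander's code.
public final class TransactionRowKey {

  private TransactionRowKey() {}

  /** Builds a rowkey of the form <reverse_customer_id>::<contract_id>::<timestamp>. */
  public static byte[] build(String customerId, String contractId, long epochMillis) {
    // Reverse the monotonically increasing customer ID so that new customers are
    // spread across regions instead of hotspotting the newest region.
    String reversedCustomerId = new StringBuilder(customerId).reverse().toString();

    // Zero-pad the timestamp so rows sort chronologically within a card contract.
    String timestamp = String.format("%013d", epochMillis);

    return Bytes.toBytes(reversedCustomerId + "::" + contractId + "::" + timestamp);
  }
}

Each transaction is then written under this rowkey with its unique transaction ID as the column qualifier and the Avro-serialized attributes as the cell value.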

    Computing Trends

    The aspect of the initial design we learned the most from was computing trends. The requirement was to enable customers to analyze their spend by category and retailer down to the hour. Data points included the smallest and largest transaction values, total transaction value, and number of transactions. Response times had to be 200ms or less.

    Precomputing trends would give us the fastest response times so this was our first approach. Trends could not lag the transactions so they had to be computed on the write path. This would be great for read performance, but presented us with a couple of challenges: how best to organize trends in HBase, and how to compute them quickly and reliably without severely impacting write performance.

We experimented with different schema designs and tried to leverage some well-known designs where possible (such as OpenTSDB’s schema). After several iterations we settled on the schema design described above. Stored in the transactions table, in a separate column family, trend values are organized together in a single row, with one trend row per customer. Giving the trend rowkey the same prefix as a customer’s transactions (for example, <reverse_customer_id>::<contract_id>) ensures that the trend row is sorted alongside the corresponding customer’s transaction records. With defined region boundaries and a custom region split policy in place, we can also guarantee that the trend row will always be colocated with a customer’s transaction records, enabling trend aggregation to stay entirely server-side in the coprocessor.

To precompute trends, we implemented a custom observer coprocessor to hook into the write path. (Observer coprocessors are similar to triggers in an RDBMS in that they execute user code before or after a specific event occurs, for example before or after a Put or Get.)

On postPut the coprocessor performs the following actions (a simplified sketch follows the list):

1. Checks the Put for a trend attribute (flag). The attribute is set on new transaction records only, to avoid recursive calls when updating the trend record. It also allows the coprocessor to be skipped for Puts that don’t require trends to be updated (e.g. settlements).
2. Gets the trend record for the customer. A customer’s trend record is colocated with their transactions (based on rowkey prefix), so the coprocessor can retrieve it directly from the current region. The trend row has to be locked to prevent multiple RegionServer handler threads from trying to update the trends in parallel.
3. Updates the data points: the smallest and largest transaction values, the total transaction value, and the number of transactions for the relevant time, category, and retailer aggregations.
4. Updates and unlocks the trend row.
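The following is a simplified sketch of what such an observer can look like against the HBase 1.x coprocessor API. The attribute name and helper methods are hypothetical, and the row locking and aggregation logic are deliberately elided.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

// Simplified sketch of the observer approach; names and structure are illustrative.
public class TrendObserver extends BaseRegionObserver {

  // Hypothetical attribute set by the ingest pipeline on new transaction Puts only.
  private static final String TREND_FLAG = "update-trends";

  @Override
  public void postPut(ObserverContext<RegionCoprocessorEnvironment> ctx,
                      Put put, WALEdit edit, Durability durability) throws IOException {
    // 1. Only act on Puts flagged as new transactions; this also prevents
    //    recursion when the trend row itself is written below.
    if (put.getAttribute(TREND_FLAG) == null) {
      return;
    }

    Region region = ctx.getEnvironment().getRegion();

    // 2. The trend row shares the customer's rowkey prefix, so it lives in this
    //    region and can be read directly. (The real implementation locks the trend
    //    row here so that concurrent handler threads don't clobber each other.)
    byte[] trendRow = trendRowKeyFor(put.getRow());
    Result currentTrends = region.get(new Get(trendRow));

    // 3. Update the data points and 4. write the trend row back (and unlock it).
    region.put(updatedTrendRow(currentTrends, put, trendRow));
  }

  // Hypothetical helper: derive the <reverse_customer_id>::<contract_id> trend
  // rowkey from a transaction rowkey (stubbed here).
  private byte[] trendRowKeyFor(byte[] transactionRow) {
    return transactionRow;
  }

  // Hypothetical helper: fold the new transaction into the existing data points
  // (min, max, total value, count). Stubbed here.
  private Put updatedTrendRow(Result currentTrends, Put transaction, byte[] trendRow) {
    return new Put(trendRow);
  }
}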

The solution proved to be accurate during testing, and as expected read performance exceeded requirements. However, there were some concerns with this approach. The first was how to handle failure: trends are stored in a separate row, so atomicity cannot be guaranteed. The second was how to validate the accuracy of trends over time; that is, we would need to implement a mechanism to identify and remediate any trend inaccuracies. These concerns grew when we also considered the HA requirements and the fact that we would need to run two active-active instances of HBase in different data centers. Not only could trend accuracy decrease over time, but the two clusters could also drift and have to be reconciled, depending on the method we used to synchronize them. Finally, fixing bugs or adding new data points would be difficult because we would possibly have to backtrack and recompute all trends.

    Then there was write performance. For every new transaction the observer had to fetch a trend record, update 32 data points, and put the trend record back. Despite all this happening within the bounds of a single region, we found that throughput was reduced from over 20,000 writes per second to 1,000 writes per second (per RegionServer). This performance was acceptable in the short term, but would not scale to support the predicted long-term load.

We knew that write performance was a risk, so we had a backup plan: an endpoint coprocessor. Endpoint coprocessors are similar to stored procedures in an RDBMS in that they allow you to perform computation server-side, at the RegionServer where the data is located, rather than at the client. Endpoints effectively extend the HBase API.

    Instead of precomputing trends, the endpoint computes them on the fly, server-side. As a result we could drop the trends column family from the schema and the risk of inaccuracies and divergence went with it. Moving away from the observer resulted in good write performance, but would reads be fast enough? In short, yes. With a customer’s transactions confined to a single region and sorted by card and timestamp, the endpoint can scan and aggregate quickly, well within Spendlytics’ 200ms objective. This also means that a client request (from the Spendlytics API in this case) is only ever routed to a single Endpoint instance (single RegionServer) and the client will get a single response back with a complete result—that is, no client-side processing is required to aggregate partial results from multiple endpoints, which would be the case if a customer’s transactions spanned multiple regions.
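As a rough illustration of this read path, a client call to such an endpoint could look like the sketch below. The TrendProtos request, response, and service types stand in for a protobuf-generated service and are hypothetical; only the HBase 1.x coprocessorService API itself is real.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;

// Sketch only: TrendProtos.* are hypothetical protobuf-generated classes.
public class TrendClient {

  public Map<byte[], TrendProtos.TrendResponse> fetchTrends(byte[] customerPrefix) throws Throwable {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = connection.getTable(TableName.valueOf("transactions"))) {

      // Time range, category, and retailer parameters would be set on the request here.
      final TrendProtos.TrendRequest request = TrendProtos.TrendRequest.newBuilder().build();

      // A customer's rows are confined to a single region, so this call is routed to
      // exactly one RegionServer and returns one complete, aggregated result.
      return table.coprocessorService(
          TrendProtos.TrendService.class,
          customerPrefix,   // start key: the customer's rowkey prefix
          customerPrefix,   // end key: same prefix, so only one region is involved
          new Batch.Call<TrendProtos.TrendService, TrendProtos.TrendResponse>() {
            @Override
            public TrendProtos.TrendResponse call(TrendProtos.TrendService service) throws IOException {
              BlockingRpcCallback<TrendProtos.TrendResponse> callback = new BlockingRpcCallback<>();
              service.getTrends(null, request, callback);
              return callback.get();
            }
          });
    }
  }
}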

    Lessons Learned

    Spendlytics has been live since July 2015. Since then we have monitored access patterns closely and looked at ways to optimize performance. We want to continually improve the user experience and provide customers with more and more insight into their card spending. The remainder of this post describes the lessons we have learned from running Spendlytics in production and some of the optimizations that have been put in place.

    After the initial release we identified a number of pain points that we wanted to focus on improving. The first was how to filter results by transaction attribute. As mentioned previously, transaction attributes are encoded in Avro records, but we found that an increasing number of access patterns wanted to filter by attribute and users were forced into doing this client-side. The initial solution was to implement a custom HBase ValueFilter that accepted our own complex filter expressions, for example:

    category='SUPERMARKETS' AND amount > 100 AND 
    (brand LIKE 'foo%' OR brand = 'bar')

    The expression is evaluated for each Avro record, allowing us to filter the results server-side and reduce the amount of data that is returned to the client (saving network bandwidth and client-side processing). The filter does affect scan performance, but response times remained well within the 200ms objective.
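For illustration, the general shape of such a filter against the HBase 1.x filter API is sketched below. The class name and the expression evaluation are hypothetical, and a real custom filter would also need toByteArray()/parseFrom() implementations so it can be serialized out to the RegionServers.

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.filter.FilterBase;

// Sketch only: expression parsing and Avro handling are elided.
public class AvroTransactionFilter extends FilterBase {

  // e.g. "category='SUPERMARKETS' AND amount > 100 AND (brand LIKE 'foo%' OR brand = 'bar')"
  private final String expression;

  public AvroTransactionFilter(String expression) {
    this.expression = expression;
  }

  @Override
  public ReturnCode filterKeyValue(Cell cell) {
    // Deserialize the Avro-encoded transaction held in the cell value and evaluate
    // the expression against its attributes, keeping only matching cells.
    byte[] avroRecord = CellUtil.cloneValue(cell);
    return matches(avroRecord, expression) ? ReturnCode.INCLUDE : ReturnCode.SKIP;
  }

  // Hypothetical evaluator: Avro decoding and expression evaluation omitted.
  private boolean matches(byte[] avroRecord, String expr) {
    return true;
  }
}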

    This ended up being a temporary solution because of further changes that were required to optimize writes. Due to the way the credit-card settlement process works, we first receive an authorized transaction from the time of sale (in near real-time) and then some time later a settled transaction from the credit-card network (in batch). These transactions need to be reconciled, essentially by merging the settled transactions with the authorized transactions already in HBase, joining on transaction ID. As part of this process transaction attributes can change and new attributes can be added. This proved to be painful due to the overhead of having to rewrite entire Avro records—even when updating single attributes. So to make the attributes more accessible for updates we organized them into columns, replacing the Avro serialization.

    We also only care about transaction-level atomicity, so bucketing the transactions by hour didn’t give us any advantage. Moreover, the settled transactions that now arrive in batch have only day-level granularity, which made it difficult (costly) to reconcile them with existing authorized transactions stored by hour. To solve this problem, we moved the transaction ID into the rowkey and reduced the timestamp grain to days, rather than hours. The reconciliation process is now much easier because we can simply bulk load the changes into HBase and let the settlement values take precedence.
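One plausible shape for the revised rowkey is sketched below; the exact field order and formatting are assumptions on our part rather than the actual layout.

import org.apache.hadoop.hbase.util.Bytes;

// Illustrative sketch of a revised rowkey with a day-grain date and the
// transaction ID in the key. Field order and formatting are assumptions.
public final class TransactionRowKeyV2 {

  private TransactionRowKeyV2() {}

  public static byte[] build(String customerId, String contractId,
                             String yyyyMmDd, String transactionId) {
    String reversedCustomerId = new StringBuilder(customerId).reverse().toString();
    // With attributes stored in individual columns and a day-grain date, a bulk-loaded
    // settled transaction lands on the same rowkey and its values take precedence.
    return Bytes.toBytes(reversedCustomerId + "::" + contractId + "::" + yyyyMmDd + "::" + transactionId);
  }
}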

    In summary:

    • Observer coprocessors can be a valuable tool, but use them wisely.
    • For some use cases, extending the HBase API using endpoints is a good alternative.
    • Use custom filters to improve performance by trimming results server-side.
• Serialized values make sense for the right use case, but play to HBase’s strengths by favoring its native handling of fields and columns.
    • Managing precomputed results is difficult; the additional latency from computing on-the-fly can be worthwhile.
    • Access patterns will change, so be agile and open to making changes to HBase schema to adapt and stay ahead of the game.

    Roadmap

    An optimization that we are currently evaluating is hybrid coprocessors. What we mean by this is the combination of both observer and endpoint coprocessors to precompute trends. However, unlike before, we would not do this on the write path but in the background by hooking into HBase’s flush and compaction operations. An observer will compute trends during flush and compaction events based on the settled transactions available at that point in time. We would then use an endpoint to combine the precomputed trends with on-the-fly aggregations of the delta of transactions. By precomputing trends in this way we hope to give reads a boost in performance, without affecting write performance.
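A rough sketch of the idea, using the flush and compaction hooks available on the HBase 1.x RegionObserver API, is shown below; the trend recomputation itself is elided and the class is purely illustrative.

import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;

// Rough sketch of the hybrid approach; trend recomputation logic is elided.
public class BackgroundTrendObserver extends BaseRegionObserver {

  @Override
  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> ctx,
                        Store store, StoreFile resultFile) {
    recomputeTrends(ctx.getEnvironment());
  }

  @Override
  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> ctx,
                          Store store, StoreFile resultFile, CompactionRequest request) {
    recomputeTrends(ctx.getEnvironment());
  }

  private void recomputeTrends(RegionCoprocessorEnvironment env) {
    // Recompute the precomputed trend rows from the settled transactions visible at
    // this point; the read-time endpoint then only aggregates the delta of
    // transactions written since the last flush or compaction.
  }
}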

Another approach we’re evaluating for trend aggregation, and for HBase access in general, is Apache Phoenix. Phoenix is a SQL skin for HBase that enables access using standard JDBC APIs. We hope that using SQL and JDBC will simplify HBase access and reduce the amount of code we have to write. We can also leverage Phoenix’s intelligent execution patterns and built-in coprocessors and filters for fast aggregations. Phoenix was considered too immature for production use at Spendlytics’ inception, but with similar use cases being reported by the likes of eBay and Salesforce, now is the time to re-evaluate. (A Phoenix package for CDH is available for installation and evaluation, but with no support, via Cloudera Labs.)
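As a flavor of what this could look like, the sketch below runs an aggregation over a hypothetical Phoenix view of the transactions table via standard JDBC; the table, columns, and ZooKeeper quorum are placeholders.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

// Sketch only: TRANSACTIONS and its columns are a hypothetical Phoenix view.
public class PhoenixTrendQuery {

  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:phoenix:zk-host:2181");
         PreparedStatement stmt = conn.prepareStatement(
             "SELECT CATEGORY, COUNT(*) AS TXNS, SUM(AMOUNT) AS TOTAL, "
                 + "MIN(AMOUNT) AS SMALLEST, MAX(AMOUNT) AS LARGEST "
                 + "FROM TRANSACTIONS "
                 + "WHERE CUSTOMER_ID = ? AND TXN_DATE >= ? "
                 + "GROUP BY CATEGORY")) {

      stmt.setString(1, "1234567890");
      stmt.setDate(2, java.sql.Date.valueOf("2016-01-01"));

      // Phoenix pushes the aggregation down to the RegionServers via its own
      // coprocessors and filters, returning one row per category.
      try (ResultSet rs = stmt.executeQuery()) {
        while (rs.next()) {
          System.out.printf("%s: txns=%d total=%s min=%s max=%s%n",
              rs.getString("CATEGORY"), rs.getLong("TXNS"), rs.getBigDecimal("TOTAL"),
              rs.getBigDecimal("SMALLEST"), rs.getBigDecimal("LARGEST"));
        }
      }
    }
  }
}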

    Santander recently announced that it is the first bank to launch voice banking technology that enables customers to talk to its SmartBank app and ask about their card spending. The platform behind this technology is Cloudera, and the architecture for Spendlytics—as described in this set of posts—served as the blueprint design.

    James Kinley is a Principal Solutions Architect at Cloudera.

    Ian Buss is a Senior Solutions Architect at Cloudera.

    Pedro Boado is a Hadoop engineer at Santander (Isban) UK.

Abel Fernandez Alfonso is a Hadoop engineer at Santander (Isban) UK.