Extending Hive Replication: Transactional Tables, External Tables, and Statistics

With every release, Hive’s built-in replication expands its reach by improving support for different table types. In this blog post, we discuss the recent additions: replication of transactional tables (a.k.a. ACID tables), external tables, and the statistics associated with all kinds of tables.

Transactional table replication

Transactional tables in Hive support ACID properties. Unlike non-transactional tables, data read from transactional tables is transactionally consistent, irrespective of the state of the database. This imposes specific demands on the replication of such tables, so Hive replication was designed with the following assumptions:

  1. A replicated database may contain more than one transactional table with cross-table integrity constraints.
  2. A target may host multiple databases, some replicated and some native to the target. The databases replicated from the same source may have transactional tables with cross-database integrity constraints.
  3. A user should be able to run read-only workloads on the target and should be able to read transactionally consistent data.
  4. Since a read-only transaction in Hive requires a new transaction-id, the transaction-ids on the source and the target of replication may differ. Thus transaction-ids cannot be used for reading transactionally consistent data across the source and the replicated target.

The following sections explain the main design decisions.

Introducing Write Identifiers

Hive uses MVCC (multi-version concurrency control) for transactional consistency and performance. Every transaction that modifies data retains the old version and creates a new one. Usually, these versions are tracked by an identifier (id for short) associated with the transaction. A reader, when it begins, takes a transaction snapshot to know which versions of the data are visible to it. This snapshot gives readers a transactionally consistent view of the data. In Hive, a read-only transaction also gets a transaction-id. Hence read-only transactions on the target cause the transaction-ids on the target to go out of sync with those on the source, and a reader on the target cannot rely on transaction-ids to get a consistent view of the data.

This problem is solved by introducing the concept of a write-id. Every transaction associates a write-id with the version of data that it creates. For a given table and a given transaction snapshot, the reader knows which write-ids are visible to it and hence which data is visible. Since write transactions cannot run on the target, the target cannot produce any write-ids and hence, unlike transaction-ids, the write-ids on the source and the target cannot go out of sync. Thus we do not need to modify the data files to restamp them with new transaction-ids on the target. Instead, we replicate the write-id information to the target and build an association between the transaction-ids on the target and the write-ids obtained from the source.
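To make the visibility rule concrete, here is a minimal Python sketch of a reader filtering data versions by write-id. The class and field names (DataVersion, WriteIdSnapshot, high_watermark, invisible) are hypothetical and chosen only for illustration; they are not Hive's internal API.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class DataVersion:
    """A version of table data stamped with the write-id that created it."""
    write_id: int
    rows: tuple


@dataclass
class WriteIdSnapshot:
    """Hypothetical per-table snapshot: which write-ids a reader may see."""
    high_watermark: int                            # highest write-id known when the snapshot was taken
    invisible: set = field(default_factory=set)    # write-ids of open or aborted transactions

    def is_visible(self, write_id: int) -> bool:
        return write_id <= self.high_watermark and write_id not in self.invisible


def read_table(versions, snapshot):
    """Return the rows of every version visible under the snapshot, in write-id order."""
    rows = []
    for version in sorted(versions, key=lambda v: v.write_id):
        if snapshot.is_visible(version.write_id):
            rows.extend(version.rows)
    return rows


# Versions as written on the source and copied unchanged to the target.
versions = [
    DataVersion(1, ("a",)),
    DataVersion(2, ("b",)),   # written by a transaction that later aborted
    DataVersion(3, ("c",)),
    DataVersion(4, ("d",)),   # committed after the snapshot was taken
]
snapshot = WriteIdSnapshot(high_watermark=3, invisible={2})
visible_rows = read_table(versions, snapshot)
print(visible_rows)
```

Because the files carry write-ids rather than transaction-ids, the same filter gives the same answer on the source and on the target, whatever transaction-ids the target has handed out to its own readers.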

Avoid replicating aborted data versions

Hive replication is event driven. Any change to the database is captured as an event, and replaying that event on the target reproduces the change there. A data modification is also captured as an event, together with the list of files created or deleted as part of that change. For non-transactional tables, a data change becomes visible as soon as it is made. Hence for non-transactional tables, we replicate the data along with the event. But for transactional tables, a data change becomes visible only when the transaction commits. This means that data modifications made by an aborted transaction are never read by any reader, including readers on the target cluster. Hence there is no point in spending resources to replicate the versions of transactional data created by an aborted transaction. A commit transaction event in Hive also lists the files (i.e. data versions) that the transaction created. When the commit event is replayed on the target, these files are copied to the target, where the corresponding versions of data become visible. An abort transaction event does not list any files, so the versions of data created by an aborted transaction are never copied to the target.
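The replay logic described above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Hive's implementation: the event dictionaries, the "open"/"commit"/"abort" type strings, and the file paths are all hypothetical, and plain dicts stand in for the source and target file systems.

```python
def replay_events(events, source_files, target_files):
    """Replay transaction events on the target, copying files only on commit.

    events: list of dicts with a "type" key and, for commits, a "files" list.
    source_files / target_files: dicts mapping file path -> contents,
    standing in for the source and target file systems.
    """
    for event in events:
        if event["type"] == "commit":
            # A commit event lists the files the transaction created.
            for path in event["files"]:
                target_files[path] = source_files[path]
        # "open" and "abort" events carry no file list, so nothing is copied.
    return target_files


source_files = {
    "t1/version_1/file_0": "rows written by txn A",
    "t1/version_2/file_0": "rows written by txn B (aborted)",
}
events = [
    {"type": "open", "txn": "A"},
    {"type": "open", "txn": "B"},
    {"type": "commit", "txn": "A", "files": ["t1/version_1/file_0"]},
    {"type": "abort", "txn": "B"},
]
target_files = replay_events(events, source_files, {})
print(sorted(target_files))
```

Only the file written by the committed transaction reaches the target; the aborted transaction's file is never transferred.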

Bootstrap dump and concurrent transactions

Replication works in two phases: first the bootstrap, in which the entire tables are dumped and loaded, and then the incremental phase, in which events are applied incrementally. In both cases the REPL command outputs the last event that was replicated, so that the next incremental cycle knows which event to start from. When a bootstrap dump is taken, only the data visible at that time is dumped. This means that versions of data written by concurrent transactions are not dumped. Since the bootstrap dump does not capture events, the open transaction and write-id allocation events of those concurrent transactions are not captured either. If the bootstrap dump finishes before these transactions commit, the next incremental cycle captures the corresponding commit or abort transaction events. But since the target does not know about these transactions, replaying these events on the target is not possible. That, in turn, means that the data versions created by concurrent transactions that commit after the bootstrap dump finishes could not be replicated. To avoid this, before starting the bootstrap dump, Hive waits for a certain period for any ongoing transactions to finish. The waiting period is controlled by the value of “hive.repl.bootstrap.dump.open.txn.timeout”. Any ongoing transactions that do not finish within this period are forced to abort, after which the bootstrap dump is taken. Transactions started after the bootstrap REPL command is issued are not touched, since their open transaction events are captured and replayed during the next incremental cycle.
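The wait-then-abort step can be pictured with the following Python sketch. It is only a model of the behaviour described above: the function name drain_open_txns, its parameters, and the polling loop are assumptions made for illustration, and the clock and sleep functions are injectable so the example runs instantly with a simulated clock.

```python
import time


def drain_open_txns(txns_open_at_start, is_open, abort, timeout_s,
                    poll_s=1.0, clock=time.monotonic, sleep=time.sleep):
    """Wait up to timeout_s for the given transactions to finish, then abort the rest.

    Only transactions that were already open when the dump was requested are
    considered; transactions started later are never passed in, so they are
    left untouched. Returns the ids that had to be aborted.
    """
    deadline = clock() + timeout_s
    pending = set(txns_open_at_start)
    while True:
        pending = {t for t in pending if is_open(t)}
        if not pending or clock() >= deadline:
            break
        sleep(poll_s)
    for txn in sorted(pending):
        abort(txn)
    return sorted(pending)


# Simulated run: txn 10 finishes by itself after 2 "seconds", txn 11 never does.
fake_now = [0.0]
open_txns = {10, 11}
aborted = []


def fake_sleep(seconds):
    fake_now[0] += seconds
    if fake_now[0] >= 2:
        open_txns.discard(10)


def fake_abort(txn):
    aborted.append(txn)
    open_txns.discard(txn)


forced = drain_open_txns(
    txns_open_at_start=[10, 11],
    is_open=lambda t: t in open_txns,
    abort=fake_abort,
    timeout_s=5, poll_s=1,
    clock=lambda: fake_now[0], sleep=fake_sleep,
)
print(forced)
```

In this simulated run, the transaction that finishes within the timeout is left alone and only the one still open at the deadline is aborted, after which the bootstrap dump could safely begin.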

Compaction

The versions created by various transactions need to be cleaned up regularly to improve reader performance, to reclaim the storage occupied by multiple versions of the data, and to reduce the number of files stored in HDFS. This is achieved using compaction. Compaction rewrites a table’s data directories to reduce both the number of files and the space they occupy. Since data files are copied to the target when commit events are replayed, the data directories on the target need to be compacted as well. There are two ways this can be achieved:

  1. Log compaction as an event and replicate the result of compaction to the target.
  2. Run compaction on the target separately and independent of the source cluster.

Compaction, though it reduces the space occupied by a table, may create files that are larger than any of the files it compacted. Replicating the result of compaction would therefore mean copying these larger files, even though the target already holds the same data in uncompacted form, which would waste bandwidth without providing any value. Hive therefore uses the second option. Compaction runs frequently on the target, and transactional consistency is provided by annotating the compacted files with the transaction-id on the target, thus allowing a reader to choose its base directory based on its transaction snapshot. The visibility of the data rows is still decided by the write-ids associated with those rows.

External tables

Data replication and performance

Hive tracks changes to the metadata of an external table, e.g. its location and schema. But the data in an external table is modified by actors external to Hive, so Hive cannot track changes to that data. As a result, point-in-time replication is not supported for external tables. This means that the data of all external tables has to be copied in its entirety during every incremental cycle. Thankfully, this copy is optimized using distcp, which can copy only the data that has changed since the last cycle. When a bootstrap or an incremental replication cycle is performed, the external table metadata is replicated in the same way as for managed tables, but the external table data is always copied from source to target using distcp.

Both a bootstrap dump and an incremental dump create a file named _external_tables_info under the dump root directory. This file contains one entry for every external table directory (or partition directory, as explained below) to be copied to the target. Each entry contains the name of the table, optionally the name of the partition, and a Base64-encoded, fully qualified HDFS path. During load, the system creates as many distcp jobs as there are entries.

The partitions of a partitioned external table may all have their root directories under the table directory, or some partitions may have their directories outside it. An entry with just the table name covers all the partitions whose data lies within the table directory; for every partition whose directory lies outside the table directory, one further entry is added for that partition. This reduces the number of entries and thereby the number of distcp jobs that need to be created. Based on the number of parallel threads available (hive.exec.parallel.thread.number) and the maximum number of tasks that can be created (hive.repl.approx.max.load.tasks), the system tries to run as many distcp jobs in parallel as possible, thus improving the overall throughput.
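The rule for deciding which directories get an entry can be sketched as follows. This is a hedged illustration: the function names, the sample paths, and in particular the comma used to separate fields are assumptions, since the exact on-disk layout of _external_tables_info is not spelled out here.

```python
import base64


def build_entries(table, table_dir, partition_dirs):
    """Return (table, partition or None, path) tuples for one external table.

    One entry covers the table directory and every partition inside it;
    each partition located outside the table directory gets its own entry.
    """
    entries = [(table, None, table_dir)]
    prefix = table_dir.rstrip("/") + "/"
    for partition, part_dir in partition_dirs.items():
        if not part_dir.startswith(prefix):
            entries.append((table, partition, part_dir))
    return entries


def format_entry(table, partition, path):
    """Render an entry as text; the comma separator is an assumption."""
    encoded = base64.b64encode(path.encode("utf-8")).decode("ascii")
    fields = [table] + ([partition] if partition else []) + [encoded]
    return ",".join(fields)


# Hypothetical table with two partitions inside its directory and one outside.
entries = build_entries(
    "ext_tab1",
    "hdfs://source-nn:8020/ext_loc/ext_tab1",
    {
        "dt=2020-01-01": "hdfs://source-nn:8020/ext_loc/ext_tab1/dt=2020-01-01",
        "dt=2020-01-02": "hdfs://source-nn:8020/ext_loc/ext_tab1/dt=2020-01-02",
        "dt=2020-01-03": "hdfs://source-nn:8020/archive/ext_tab1/dt=2020-01-03",
    },
)
lines = [format_entry(*entry) for entry in entries]
for line in lines:
    print(line)
```

Three partitions produce only two entries here, so in this model only two distcp jobs would be needed for the table.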

Base directory on target for external table

Since the location of an external table can be anywhere in HDFS, in use cases where multiple source clusters replicate data to the same target cluster there is a high possibility that data from one source overwrites data from another. To prevent this, we mandate that a base directory configuration (hive.repl.replica.external.table.base.dir) be provided in the WITH clause of REPL LOAD. If an external table ext_tab1 is located at /ext_loc/ext_tab1/ on the source HDFS and the base directory is configured to be /ext_base1 on the target, the location of ext_tab1 on the target will be /ext_base1/ext_loc/ext_tab1. A dump from another source, when loaded on the same target, should use a different base directory, say /ext_base2. That way an external table on that source located at /ext_loc/ext_tab1 will be loaded at /ext_base2/ext_loc/ext_tab1 on the target, thus avoiding a collision.
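The path mapping in this example amounts to prefixing the source location with the base directory. A minimal Python sketch, assuming plain HDFS paths without a scheme or authority (the helper name target_location is hypothetical):

```python
def target_location(base_dir, source_path):
    """Place a source table location under the target's base directory."""
    return base_dir.rstrip("/") + "/" + source_path.strip("/")


# The same source location from two different source clusters lands in
# two different places on the target, one per base directory.
from_source_1 = target_location("/ext_base1", "/ext_loc/ext_tab1/")
from_source_2 = target_location("/ext_base2", "/ext_loc/ext_tab1")
print(from_source_1)
print(from_source_2)
```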

Statistics

As mentioned above, a target cluster can be used as a backup and can also be used to run read-only workloads. Statistics are vital for query planning and optimization. The query planner uses statistics to choose the fastest possible execution plan for a given query. Certain types of queries, like ‘count(*)’, can be answered entirely from statistics without scanning the data. Statistics are small compared to the data and do not change when replicated to the target, so replicating them is more cost-effective than using additional compute resources on the target to re-derive them by scanning all the data. Statistics are maintained as part of the table/partition metadata. An update to the statistics is captured as an event on the source cluster. During the bootstrap phase, we dump and load the statistics along with the other metadata. In the subsequent incremental phase, statistics update events are dumped and loaded, thereby replicating the statistics.

Summary

Hive’s replication story has come a long way. With the recent features and improvements in this area, it has closed the gap between the kinds of data that Hive manages and the kinds of data that it can replicate. With this level of maturity, it delivers enterprise-grade strength for backup, disaster recovery, load balancing and many other purposes. The new capabilities discussed in this blog post are available as part of HDP’s Data Lifecycle Manager (DLM) 1.5 and will eventually form part of the Cloudera Data Platform.

Ashutosh Bapat