Optimization Strategies for Iceberg Tables


Introduction

Apache Iceberg has recently grown in popularity because it adds data warehouse-like capabilities to your data lake, making it easier to analyze all your data, both structured and unstructured. It offers several benefits, such as schema evolution, hidden partitioning, time travel, and more, that improve the productivity of data engineers and data analysts. However, you need to maintain Iceberg tables regularly to keep them in a healthy state so that read queries perform faster. This blog discusses a few problems that you might encounter with Iceberg tables and offers strategies for optimizing them in each of those scenarios. You can take advantage of a combination of the strategies provided and adapt them to your particular use cases.

Problem with too many snapshots

Every time a write operation occurs on an Iceberg table, a new snapshot is created. Over time this can cause the table's metadata.json file to become bloated, and the number of old and potentially unnecessary data/delete files in the data store to grow, increasing storage costs. A bloated metadata.json file increases both read and write times, because the large metadata file needs to be read or written on every operation. Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Expiring snapshots is a relatively cheap operation and uses metadata to determine newly unreachable files.

Solution: expire snapshots

We can expire old snapshots using the expire_snapshots procedure.
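
Here is a minimal sketch of calling the procedure from Spark SQL, assuming a catalog named my_catalog and a table db.sample (both placeholder names); the timestamp and retention count are illustrative only:

    -- Expire snapshots older than the given timestamp, but always keep the last 10
    CALL my_catalog.system.expire_snapshots(
      table       => 'db.sample',
      older_than  => TIMESTAMP '2023-06-30 00:00:00.000',
      retain_last => 10
    );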

Problem with suboptimal manifests

Over time the snapshots might reference many manifest files. This could cause a slowdown in query planning and increase the runtime of metadata queries. Furthermore, when first created, the manifests may not lend themselves well to partition pruning, which increases the overall runtime of the query. On the other hand, if the manifests are well organized into discrete bounds of partitions, then partition pruning can prune away entire subtrees of data files.

Solution: rewrite manifests

We can solve the problem of too many manifest files with rewrite_manifests and potentially get a well-balanced hierarchical tree of data files.
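
As a sketch, the same placeholder catalog and table from above could have their manifests rewritten from Spark SQL like this:

    -- Rewrite the table's manifests so they cluster data files by partition
    CALL my_catalog.system.rewrite_manifests('db.sample');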

Problem with delete files

Background

merge-on-read vs copy-on-write

Since Iceberg V2, whenever existing data needs to be updated (via delete, update, or merge statements), there are two options available: copy-on-write and merge-on-read. With the copy-on-write option, the data files affected by a delete, update, or merge operation are read and entirely new data files are written with the necessary modifications. Iceberg doesn't delete the old data files, so if you want to query the table as it was before the modifications were applied, you can use Iceberg's time travel feature. In a later blog, we will go into detail about how to take advantage of the time travel feature. If you decide that the old data files are no longer needed, you can get rid of them by expiring the older snapshots as discussed above.

With the merge-on-read option, instead of rewriting entire data files at write time, only a delete file is written. This can be an equality delete file or a positional delete file. As of this writing, Spark doesn't write equality deletes, but it is capable of reading them. The advantage of this option is that your writes can be much quicker because you are not rewriting entire data files. Suppose you want to delete a specific user's data in a table because of GDPR requirements: Iceberg will simply write a delete file specifying the locations of that user's records in the corresponding data files. Whenever you read the table, Iceberg dynamically applies those deletes and presents a logical table in which the user's data is deleted, even though the corresponding records are still present in the physical data files.

We enable the merge-on-read option for our customers by default. You can enable or disable it per operation by setting the corresponding write-mode table properties based on your requirements. See Write properties.
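
As a sketch, the standard Iceberg write-mode properties can be set on a table like this (my_catalog and db.sample remain placeholder names):

    -- Switch delete, update, and merge operations to merge-on-read
    ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
      'write.delete.mode' = 'merge-on-read',
      'write.update.mode' = 'merge-on-read',
      'write.merge.mode'  = 'merge-on-read'
    );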

Serializable vs snapshot isolation

The default isolation guarantee provided for the delete, update, and merge operations is serializable isolation. You can also change the isolation level to snapshot isolation. Both serializable and snapshot isolation provide a read-consistent view of your data, but serializable isolation is the stronger guarantee. For instance, suppose you have an employee table that maintains employee salaries, and you want to delete all records corresponding to employees with a salary greater than $100,000. Let's say this table has five data files and three of them contain records of employees with salaries greater than $100,000. When you initiate the delete operation, those three files are selected. If your delete mode is merge-on-read, a delete file is written that points to the positions to delete in those three data files. If your delete mode is copy-on-write, then all three data files are simply rewritten.

Irrespective of the delete mode, assume that while the delete operation is happening, another user writes a new data file containing a salary greater than $100,000. If the isolation guarantee you chose is snapshot, then the delete operation will succeed and only the salary records in the original three data files are removed from your table; the records in the data file written while your delete operation was in progress remain intact. On the other hand, if your isolation guarantee is serializable, then your delete operation will fail and you will have to retry the delete from scratch. Depending on your use case, you might want to reduce your isolation level to snapshot.
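
A minimal sketch of relaxing the isolation level via table properties, again using the placeholder catalog and table names; the per-operation isolation-level properties shown here are based on Iceberg's documented write properties:

    -- Relax delete/update/merge operations from serializable to snapshot isolation
    ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
      'write.delete.isolation-level' = 'snapshot',
      'write.update.isolation-level' = 'snapshot',
      'write.merge.isolation-level'  = 'snapshot'
    );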

The problem

The presence of too many delete files will eventually degrade read performance, because in the Iceberg V2 spec, every time a data file is read, all the corresponding delete files also need to be read (the Iceberg community is currently considering introducing a concept called "delete vector" in the future, and that might work differently from the current spec). This can be very costly. Position delete files might also contain dangling deletes, that is, references to data that is no longer present in any of the current snapshots.

Solution: rewrite position deletes

For position delete files, compacting them mitigates the problem somewhat by reducing the number of delete files that need to be read and by compressing the delete data better, which offers faster performance. In addition, the procedure removes dangling deletes.

Rewrite position delete files

Iceberg provides a rewrite position delete files procedure in Spark SQL.
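
A sketch of invoking it, with the same placeholder catalog and table names:

    -- Compact position delete files and drop dangling deletes
    CALL my_catalog.system.rewrite_position_delete_files('db.sample');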

But the presence of delete files still poses a performance problem. Also, regulatory requirements might force you to eventually physically delete the data rather than perform a logical deletion. This can be addressed by doing a major compaction and removing the delete files entirely, which is addressed later in the blog.

Problem with small files

We typically want to minimize the number of files we touch during a read. Opening files is costly. File formats like Parquet work better if the underlying file size is large, because reading more of the same file is cheaper than opening a new file. In Parquet, you typically want your files to be around 512 MB and your row-group sizes to be around 128 MB. During the write phase these are controlled by "write.target-file-size-bytes" and "write.parquet.row-group-size-bytes" respectively. You might want to leave the Iceberg defaults alone unless you know what you are doing.
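
For completeness, here is a sketch of overriding these two properties on the placeholder table; the values simply restate the 512 MB and 128 MB targets mentioned above:

    -- Override target data file size and Parquet row-group size (bytes)
    ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
      'write.target-file-size-bytes'       = '536870912',  -- 512 MB
      'write.parquet.row-group-size-bytes' = '134217728'   -- 128 MB
    );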

In Spark, for example, the size of a Spark task in memory will need to be much higher to reach those defaults, because when data is written to disk it is compressed in Parquet/ORC. So getting your files to the desired size is not easy unless your Spark task size is big enough.

Another problem arises with partitions. Unless aligned properly, a Spark task might touch multiple partitions. Let’s say you have 100 Spark tasks and each of them needs to write to 100 partitions, together they will write 10,000 small files. Let’s call this problem partition amplification.

Solution: use distribution-mode in write

The amplification problem can be addressed at write time by setting the appropriate write distribution mode in the write properties. Insert distribution is controlled by "write.distribution-mode" and defaults to none. Delete distribution is controlled by "write.delete.distribution-mode" and defaults to hash, update distribution is controlled by "write.update.distribution-mode" and defaults to hash, and merge distribution is controlled by "write.merge.distribution-mode" and defaults to none.
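
A sketch of changing the insert distribution mode on the placeholder table:

    -- Shuffle inserted data by partition key so each task writes to one partition
    ALTER TABLE my_catalog.db.sample SET TBLPROPERTIES (
      'write.distribution-mode' = 'hash'
    );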

The three write distribution modes that are available in Iceberg as of this writing are none, hash, and range. When your mode is none, no data shuffle occurs. You should use this mode only when you don’t care about the partition amplification problem or when you know that each task in your job only writes to a specific partition. 

When your mode is set to hash, your data is shuffled using the partition key to generate the hashcode so that each resultant task writes only to a specific partition. When your distribution mode is range, your data is distributed such that it is ordered by the partition key, or by the sort key if the table has a SortOrder.

Using hash or range can get tricky because you are now repartitioning the data based on the number of partitions your table might have. This can cause your Spark tasks after the shuffle to be either too small or too large. This problem can be mitigated by enabling adaptive query execution in Spark by setting "spark.sql.adaptive.enabled=true" (this is enabled by default from Spark 3.2). Several configs are available in Spark to adjust the behavior of adaptive query execution. Leaving the defaults as is, unless you know exactly what you are doing, is probably the best option.
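
If it is not already on in your environment, a sketch of enabling it for a Spark SQL session:

    -- Enable adaptive query execution for the current session
    SET spark.sql.adaptive.enabled = true;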

Even though the partition amplification problem could be mitigated by setting correct write distribution mode appropriate for your job, the resultant files could still be small just because the Spark tasks writing them could be small. Your job cannot write more data than it has.

Solution: rewrite data files

To address the small files problem and the delete files problem, Iceberg provides a feature to rewrite data files. This feature is currently available only with Spark. The rest of the blog will go into this in more detail. This feature can be used to compact or even expand your data files, incorporate deletes from the delete files corresponding to the data files being rewritten, provide better data ordering so that more data can be filtered directly at read time, and more. It is one of the most powerful tools in the toolbox that Iceberg provides.

RewriteDataFiles

Iceberg provides a rewrite data files procedure in Spark SQL.

See the RewriteDataFiles JavaDoc for all the supported options.
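
A minimal sketch of the procedure call with the placeholder catalog and table; with no extra arguments it uses the default (bin pack) strategy:

    -- Compact small data files using the default bin pack strategy
    CALL my_catalog.system.rewrite_data_files(table => 'db.sample');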

Now let's discuss what the strategy option means, because understanding it helps you get more out of the rewrite data files procedure. There are three strategy options available: Bin Pack, Sort, and Z Order. Note that when using the Spark procedure, the Z Order strategy is invoked by simply setting the sort_order to "zorder(columns…)"; a sketch follows the list below.

Strategy option

  • Bin Pack
    • It is the cheapest and fastest.
    • It combines files that are too small using the bin packing approach to reduce the number of output files.
    • No data ordering is changed.
    • No data is shuffled.
  • Sort
    • Much more expensive than Bin Pack.
    • Provides total hierarchical ordering.
    • Read queries only benefit if the columns used in the query are ordered. 
    • Requires data to be shuffled using range partitioning before writing.
  • Z Order
    • Most expensive of the three options.
    • The columns used in the Z order should have some kind of intrinsic clusterability and still need to have a sufficient amount of data in each partition, because Z ordering only helps eliminate files from a read scan, not row groups. If they do, then queries can prune a lot of data at read time.
    • It only makes sense if more than one column is used in the Z order. If only one column is needed then regular sort is the better option. 
    • See https://blog.cloudera.com/speeding-up-queries-with-z-order/ to learn more about Z ordering. 
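
As referenced above, here is a sketch of a sort-based rewrite and of invoking Z order through the sort_order argument; col_a and col_b are placeholder column names:

    -- Rewrite data files sorted on a single column
    CALL my_catalog.system.rewrite_data_files(
      table      => 'db.sample',
      strategy   => 'sort',
      sort_order => 'col_a ASC NULLS LAST'
    );

    -- Rewrite data files Z ordered on two columns
    CALL my_catalog.system.rewrite_data_files(
      table      => 'db.sample',
      strategy   => 'sort',
      sort_order => 'zorder(col_a, col_b)'
    );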

Commit conflicts

Iceberg uses optimistic concurrency control when committing new snapshots. So, when we use rewrite data files to update our data, a new snapshot is created. But before that snapshot is committed, a check is done to see if there are any conflicts. If a conflict occurs, all the work done could potentially be discarded. It is important to plan maintenance operations to minimize potential conflicts. Let us discuss some of the sources of conflicts.

  1. If only inserts occurred between the start of rewrite and the commit attempt, then there are no conflicts. This is because inserts result in new data files and the new data files can be added to the snapshot for the rewrite and the commit reattempted.
  2. Every delete file is associated with one or more data files. If a new delete file corresponding to a data file that is being rewritten is added in a future snapshot, then a conflict occurs, because the delete file references a data file that is already being rewritten.

Conflict mitigation

  1. If you can, try pausing jobs that write to your tables during maintenance operations. At a minimum, avoid writing deletes to data files that are being rewritten.
  2. Partition your table in such a way that all new writes and deletes are written to a new partition. For instance, if your incoming data is partitioned by date, all your new data can go into a partition by date. You can run rewrite operations on partitions with older dates.
  3. Take advantage of the filter option in the rewrite data files Spark action to select the files to be rewritten based on your use case so that no delete conflicts occur.
  4. Enabling partial progress will help save your work by committing groups of files prior to the entire rewrite completing. Even if one of the file groups fails, the other file groups can succeed. A sketch combining both options follows this list.
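
A hedged sketch of using the where filter together with partial progress, with placeholder catalog, table, partition column, and date values:

    -- Rewrite only older partitions, committing file groups as they finish
    CALL my_catalog.system.rewrite_data_files(
      table   => 'db.sample',
      where   => 'event_date < "2023-01-01"',
      options => map('partial-progress.enabled', 'true')
    );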

Conclusion

Iceberg provides several features that a modern data lake needs. With a little care and planning, and by understanding a bit of Iceberg's architecture, you can take maximum advantage of all the awesome features it provides.

To try some of these Iceberg features yourself you can sign up for one of our next live hands-on labs. 

You can also watch the webinar to learn more about Apache Iceberg and see the demo to learn the latest capabilities.

Srinivas Rishindra Pothireddi
Senior Software Engineer - CDE