From bf45246c1205cfc284503dad3647070dc1dd0e29 Mon Sep 17 00:00:00 2001 From: Yuya Ebihara Date: Mon, 10 Mar 2025 13:51:52 +0900 Subject: [PATCH] Add number option to preserve in Iceberg expire_snapshots --- .../trino/plugin/iceberg/IcebergMetadata.java | 13 +++++---- .../ExpireSnapshotsTableProcedure.java | 13 +++++++++ .../IcebergExpireSnapshotsHandle.java | 16 +++++++++- .../iceberg/BaseIcebergConnectorTest.java | 29 +++++++++++++++++++ 4 files changed, 65 insertions(+), 6 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 4376983317df..2bb369cdc088 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -142,6 +142,7 @@ import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.DeleteFiles; +import org.apache.iceberg.ExpireSnapshots; import org.apache.iceberg.FileFormat; import org.apache.iceberg.FileMetadata; import org.apache.iceberg.FileScanTask; @@ -1707,12 +1708,13 @@ private Optional getTableHandleForDropExtendedStats private Optional getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map executeProperties) { Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD); + Optional retainLast = Optional.ofNullable((Integer) executeProperties.get("retain_last")); Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName()); return Optional.of(new IcebergTableExecuteHandle( tableHandle.getSchemaTableName(), EXPIRE_SNAPSHOTS, - new IcebergExpireSnapshotsHandle(retentionThreshold), + new IcebergExpireSnapshotsHandle(retentionThreshold, retainLast), icebergTable.location(), icebergTable.io().properties())); } @@ -2174,11 +2176,12 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut } }; + ExpireSnapshots expireSnapshots = table.expireSnapshots() + .expireOlderThan(expireTimestampMillis) + .deleteWith(deleteFunction); + expireSnapshotsHandle.retainLast().ifPresent(expireSnapshots::retainLast); try { - table.expireSnapshots() - .expireOlderThan(expireTimestampMillis) - .deleteWith(deleteFunction) - .commit(); + expireSnapshots.commit(); fileSystem.deleteFiles(pathsToDelete); } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java index 1aa84ea92887..30c2f6ed8c2b 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java @@ -16,11 +16,14 @@ import com.google.common.collect.ImmutableList; import com.google.inject.Provider; import io.airlift.units.Duration; +import io.trino.spi.TrinoException; import io.trino.spi.connector.TableProcedureMetadata; import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS; +import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT; import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly; +import static io.trino.spi.session.PropertyMetadata.integerProperty; public class ExpireSnapshotsTableProcedure implements Provider @@ -36,6 +39,16 @@ public TableProcedureMetadata get() "retention_threshold", "Only snapshots older than threshold should be removed", Duration.valueOf("7d"), + false), + integerProperty( + "retain_last", + "Number of ancestor snapshots to preserve regardless of retention_threshold (defaults to 1)", + null, + value -> { + if (value < 1) { + throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "retain_last must be at least 1, cannot be: %s".formatted(value)); + } + }, false))); } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java index 0757a82d66e3..70d5a5d3ee89 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java @@ -13,15 +13,29 @@ */ package io.trino.plugin.iceberg.procedure; +import com.google.common.base.MoreObjects; import io.airlift.units.Duration; +import java.util.Optional; + import static java.util.Objects.requireNonNull; -public record IcebergExpireSnapshotsHandle(Duration retentionThreshold) +public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, Optional retainLast) implements IcebergProcedureHandle { public IcebergExpireSnapshotsHandle { requireNonNull(retentionThreshold, "retentionThreshold is null"); + requireNonNull(retainLast, "retainLast is null"); + } + + @Override + public String toString() + { + return MoreObjects.toStringHelper(this) + .omitEmptyValues() + .add("retentionThreshold", retentionThreshold) + .add("retainLast", retainLast) + .toString(); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index eb55ef9b64d6..d200dd58b9f9 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -6473,6 +6473,32 @@ public void testExpireSnapshotsPartitionedTable() assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size()); } + @Test + public void testExpireSnapshotsRetainLast() + throws Exception + { + String tableName = "test_expiring_snapshots_" + randomNameSuffix(); + Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); + assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)"); + assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('three', 3)", 1); + assertThat(query("SELECT count(*) FROM " + tableName)).matches("VALUES BIGINT '3'"); + + List initialSnapshots = getSnapshotIds(tableName); + String tableLocation = getTableLocation(tableName); + List initialFiles = getAllMetadataFilesFromTableDirectory(tableLocation); + assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE expire_snapshots(retention_threshold => '0s', retain_last => 2)"); + + assertThat(query("SELECT count(*) FROM " + tableName)).matches("VALUES BIGINT '3'"); + List updatedFiles = getAllMetadataFilesFromTableDirectory(tableLocation); + List updatedSnapshots = getSnapshotIds(tableName); + assertThat(updatedFiles).hasSize(initialFiles.size() - 2); + assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size()); + assertThat(updatedSnapshots).hasSize(2); + assertThat(initialSnapshots).containsAll(updatedSnapshots); + } + @Test public void testExpireSnapshotsOnSnapshot() { @@ -6525,6 +6551,9 @@ public void testExpireSnapshotsParameterValidation() assertQueryFails( "ALTER TABLE nation EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '33s')", "\\QRetention specified (33.00s) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with iceberg.expire-snapshots.min-retention configuration property or iceberg.expire_snapshots_min_retention session property"); + assertQueryFails( + "ALTER TABLE nation EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '10d', retain_last => 0)", + ".* retain_last must be at least 1, cannot be: 0"); } @Test