Skip to content

Commit

Permalink
Add number option to preserve in Iceberg expire_snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Mar 10, 2025
1 parent 58ecb8f commit bf45246
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
Expand Down Expand Up @@ -1707,12 +1708,13 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats
private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
{
Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
Optional<Integer> retainLast = Optional.ofNullable((Integer) executeProperties.get("retain_last"));
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());

return Optional.of(new IcebergTableExecuteHandle(
tableHandle.getSchemaTableName(),
EXPIRE_SNAPSHOTS,
new IcebergExpireSnapshotsHandle(retentionThreshold),
new IcebergExpireSnapshotsHandle(retentionThreshold, retainLast),
icebergTable.location(),
icebergTable.io().properties()));
}
Expand Down Expand Up @@ -2174,11 +2176,12 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
}
};

ExpireSnapshots expireSnapshots = table.expireSnapshots()
.expireOlderThan(expireTimestampMillis)
.deleteWith(deleteFunction);
expireSnapshotsHandle.retainLast().ifPresent(expireSnapshots::retainLast);
try {
table.expireSnapshots()
.expireOlderThan(expireTimestampMillis)
.deleteWith(deleteFunction)
.commit();
expireSnapshots.commit();

fileSystem.deleteFiles(pathsToDelete);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Provider;
import io.airlift.units.Duration;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.TableProcedureMetadata;

import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
import static io.trino.spi.session.PropertyMetadata.integerProperty;

public class ExpireSnapshotsTableProcedure
implements Provider<TableProcedureMetadata>
Expand All @@ -36,6 +39,16 @@ public TableProcedureMetadata get()
"retention_threshold",
"Only snapshots older than threshold should be removed",
Duration.valueOf("7d"),
false),
integerProperty(
"retain_last",
"Number of ancestor snapshots to preserve regardless of retention_threshold (defaults to 1)",
null,
value -> {
if (value < 1) {
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "retain_last must be at least 1, cannot be: %s".formatted(value));
}
},
false)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,29 @@
*/
package io.trino.plugin.iceberg.procedure;

import com.google.common.base.MoreObjects;
import io.airlift.units.Duration;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

public record IcebergExpireSnapshotsHandle(Duration retentionThreshold)
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, Optional<Integer> retainLast)
implements IcebergProcedureHandle
{
public IcebergExpireSnapshotsHandle
{
requireNonNull(retentionThreshold, "retentionThreshold is null");
requireNonNull(retainLast, "retainLast is null");
}

@Override
public String toString()
{
return MoreObjects.toStringHelper(this)
.omitEmptyValues()
.add("retentionThreshold", retentionThreshold)
.add("retainLast", retainLast)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6473,6 +6473,32 @@ public void testExpireSnapshotsPartitionedTable()
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
}

@Test
public void testExpireSnapshotsRetainLast()
throws Exception
{
String tableName = "test_expiring_snapshots_" + randomNameSuffix();
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)");
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
assertUpdate("INSERT INTO " + tableName + " VALUES ('three', 3)", 1);
assertThat(query("SELECT count(*) FROM " + tableName)).matches("VALUES BIGINT '3'");

List<Long> initialSnapshots = getSnapshotIds(tableName);
String tableLocation = getTableLocation(tableName);
List<String> initialFiles = getAllMetadataFilesFromTableDirectory(tableLocation);
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE expire_snapshots(retention_threshold => '0s', retain_last => 2)");

assertThat(query("SELECT count(*) FROM " + tableName)).matches("VALUES BIGINT '3'");
List<String> updatedFiles = getAllMetadataFilesFromTableDirectory(tableLocation);
List<Long> updatedSnapshots = getSnapshotIds(tableName);
assertThat(updatedFiles).hasSize(initialFiles.size() - 2);
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
assertThat(updatedSnapshots).hasSize(2);
assertThat(initialSnapshots).containsAll(updatedSnapshots);
}

@Test
public void testExpireSnapshotsOnSnapshot()
{
Expand Down Expand Up @@ -6525,6 +6551,9 @@ public void testExpireSnapshotsParameterValidation()
assertQueryFails(
"ALTER TABLE nation EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '33s')",
"\\QRetention specified (33.00s) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with iceberg.expire-snapshots.min-retention configuration property or iceberg.expire_snapshots_min_retention session property");
assertQueryFails(
"ALTER TABLE nation EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '10d', retain_last => 0)",
".* retain_last must be at least 1, cannot be: 0");
}

@Test
Expand Down

0 comments on commit bf45246

Please sign in to comment.