Commit 2edfe09

Add Parquet Bloom filter write support to Iceberg connector

jkylling authored and raunaqmorarka committed Jun 24, 2024
1 parent a48a0a6, commit 2edfe09

File tree: 10 files changed, +394 -26 lines changed
docs/src/main/sphinx/connector/iceberg.md (+4)

@@ -714,6 +714,10 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
 * - `orc_bloom_filter_fpp`
   - The ORC bloom filters false positive probability. Requires ORC format.
     Defaults to `0.05`.
+* - `parquet_bloom_filter_columns`
+  - Comma-separated list of columns to use for Parquet bloom filter. It improves
+    the performance of queries using Equality and IN predicates when reading
+    Parquet files. Requires Parquet format. Defaults to `[]`.
 :::
 
 The table definition below specifies to use Parquet files, partitioning by columns
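To make the documented behavior concrete, here is a hypothetical sketch, not part of this commit, of exercising the property in the idiom of the connector test added later in this change. It assumes a query runner with an iceberg catalog, and the bloom_demo table name is made up.

```java
// Hypothetical usage sketch; assumes the same test framework as
// TestIcebergParquetWithBloomFilters at the end of this commit.
@Test
public void testEqualityLookupWithBloomFilter()
{
    assertUpdate("CREATE TABLE bloom_demo (id bigint, name varchar) " +
            "WITH (format = 'PARQUET', parquet_bloom_filter_columns = ARRAY['id'])");
    assertUpdate("INSERT INTO bloom_demo VALUES (1, 'a'), (2, 'b')", 2);
    // Equality and IN predicates on 'id' can now skip Parquet row groups
    // whose Bloom filter shows the value is absent.
    assertQuery("SELECT name FROM bloom_demo WHERE id = 2", "VALUES 'b'");
    assertUpdate("DROP TABLE bloom_demo");
}
```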

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java (+6 -3)

@@ -73,6 +73,7 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterColumns;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterFpp;
+import static io.trino.plugin.iceberg.IcebergUtil.getParquetBloomFilterColumns;
 import static io.trino.plugin.iceberg.TypeConverter.toTrinoType;
 import static io.trino.plugin.iceberg.util.OrcTypeConverter.toOrcType;
 import static io.trino.plugin.iceberg.util.PrimitiveTypeMapBuilder.makeTypeMap;
@@ -126,7 +127,7 @@ public IcebergFileWriter createDataFileWriter(
     {
         return switch (fileFormat) {
             // TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
-            case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session);
+            case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties);
             case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session));
             case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, session);
         };
@@ -140,7 +141,7 @@ public IcebergFileWriter createPositionDeleteWriter(
             Map<String, String> storageProperties)
     {
         return switch (fileFormat) {
-            case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
+            case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
             case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
             case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
         };
@@ -151,7 +152,8 @@ private IcebergFileWriter createParquetWriter(
             TrinoFileSystem fileSystem,
             Location outputPath,
             Schema icebergSchema,
-            ConnectorSession session)
+            ConnectorSession session,
+            Map<String, String> storageProperties)
     {
         List<String> fileColumnNames = icebergSchema.columns().stream()
                 .map(Types.NestedField::name)
@@ -170,6 +172,7 @@ private IcebergFileWriter createParquetWriter(
                 .setMaxPageValueCount(getParquetWriterPageValueCount(session))
                 .setMaxBlockSize(getParquetWriterBlockSize(session))
                 .setBatchSize(getParquetWriterBatchSize(session))
+                .setBloomFilterColumns(getParquetBloomFilterColumns(storageProperties))
                 .build();
 
         HiveCompressionCodec hiveCompressionCodec = getCompressionCodec(session);
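The change above simply threads the Iceberg table's storage properties through to the Parquet writer configuration. A minimal sketch of the resulting wiring, assuming the trino-parquet and trino-iceberg modules are on the classpath; the property key shown is Iceberg's standard per-column flag:

```java
import io.trino.parquet.writer.ParquetWriterOptions;

import java.util.Map;

import static io.trino.plugin.iceberg.IcebergUtil.getParquetBloomFilterColumns;

public class WriterOptionsSketch
{
    public static void main(String[] args)
    {
        // Iceberg stores one flag per column; the helper added to IcebergUtil
        // in this commit turns the flags back into a set of column names.
        Map<String, String> storageProperties = Map.of(
                "write.parquet.bloom-filter-enabled.column.id", "true");
        // Other writer settings (page size, batch size, ...) elided here.
        ParquetWriterOptions options = ParquetWriterOptions.builder()
                .setBloomFilterColumns(getParquetBloomFilterColumns(storageProperties))
                .build();
    }
}
```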

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java (+19)

@@ -45,6 +45,7 @@ public class IcebergTableProperties
     public static final String FORMAT_VERSION_PROPERTY = "format_version";
     public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
     public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
+    public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
 
     private final List<PropertyMetadata<?>> tableProperties;
 
@@ -107,6 +108,18 @@ public IcebergTableProperties(
                         orcWriterConfig.getDefaultBloomFilterFpp(),
                         IcebergTableProperties::validateOrcBloomFilterFpp,
                         false))
+                .add(new PropertyMetadata<>(
+                        PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY,
+                        "Parquet Bloom filter index columns",
+                        new ArrayType(VARCHAR),
+                        List.class,
+                        ImmutableList.of(),
+                        false,
+                        value -> ((List<?>) value).stream()
+                                .map(String.class::cast)
+                                .map(name -> name.toLowerCase(ENGLISH))
+                                .collect(toImmutableList()),
+                        value -> value))
                 .build();
     }
 
@@ -169,4 +182,10 @@ private static void validateOrcBloomFilterFpp(double fpp)
             throw new TrinoException(INVALID_TABLE_PROPERTY, "Bloom filter fpp value must be between 0.0 and 1.0");
         }
     }
+
+    public static List<String> getParquetBloomFilterColumns(Map<String, Object> tableProperties)
+    {
+        List<String> parquetBloomFilterColumns = (List<String>) tableProperties.get(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY);
+        return parquetBloomFilterColumns == null ? ImmutableList.of() : ImmutableList.copyOf(parquetBloomFilterColumns);
+    }
 }
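The decoder lambda in the new PropertyMetadata is worth noting: column names are lower-cased on the way in, which is why the test later in this commit can declare ARRAY['a','B'] and still match columns a and b. A standalone illustration of that decoder, using only JDK types:

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class BloomFilterColumnsDecoder
{
    // Mirrors the decoder registered for parquet_bloom_filter_columns:
    // cast each element of the raw ARRAY(VARCHAR) value and lower-case it.
    static List<String> decode(Object value)
    {
        return ((List<?>) value).stream()
                .map(String.class::cast)
                .map(name -> name.toLowerCase(Locale.ENGLISH))
                .collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        System.out.println(decode(List.of("a", "B"))); // prints [a, b]
    }
}
```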

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java (+46 -4)

@@ -102,6 +102,7 @@
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static io.airlift.slice.Slices.utf8Slice;
+import static io.trino.parquet.writer.ParquetWriter.SUPPORTED_BLOOM_FILTER_TYPES;
 import static io.trino.plugin.base.io.ByteBuffers.getWrappedBytes;
 import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
 import static io.trino.plugin.iceberg.ColumnIdentity.createColumnIdentity;
@@ -118,6 +119,7 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
+import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
@@ -167,6 +169,7 @@
 import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
 import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
 import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_FPP;
+import static org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX;
 import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
 import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
 import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
@@ -303,6 +306,12 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
             properties.put(ORC_BLOOM_FILTER_FPP_PROPERTY, Double.parseDouble(orcBloomFilterFpp.get()));
         }
 
+        // iceberg Parquet format bloom filter properties
+        Set<String> parquetBloomFilterColumns = getParquetBloomFilterColumns(icebergTable.properties());
+        if (!parquetBloomFilterColumns.isEmpty()) {
+            properties.put(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY, ImmutableList.copyOf(parquetBloomFilterColumns));
+        }
+
         return properties.buildOrThrow();
     }
 
@@ -319,6 +328,14 @@ public static Optional<String> getOrcBloomFilterColumns(Map<String, String> prop
         return orcBloomFilterColumns;
     }
 
+    public static Set<String> getParquetBloomFilterColumns(Map<String, String> properties)
+    {
+        return properties.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX) && "true".equals(entry.getValue()))
+                .map(entry -> entry.getKey().substring(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX.length()))
+                .collect(toImmutableSet());
+    }
+
     public static Optional<String> getOrcBloomFilterFpp(Map<String, String> properties)
     {
         return Stream.of(
@@ -776,14 +793,24 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
         propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));
 
         // iceberg ORC format bloom filter properties used by create table
-        List<String> columns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
-        if (!columns.isEmpty()) {
+        List<String> orcBloomFilterColumns = IcebergTableProperties.getOrcBloomFilterColumns(tableMetadata.getProperties());
+        if (!orcBloomFilterColumns.isEmpty()) {
             checkFormatForProperty(fileFormat.toIceberg(), FileFormat.ORC, ORC_BLOOM_FILTER_COLUMNS_PROPERTY);
-            validateOrcBloomFilterColumns(tableMetadata, columns);
-            propertiesBuilder.put(ORC_BLOOM_FILTER_COLUMNS, Joiner.on(",").join(columns));
+            validateOrcBloomFilterColumns(tableMetadata, orcBloomFilterColumns);
+            propertiesBuilder.put(ORC_BLOOM_FILTER_COLUMNS, Joiner.on(",").join(orcBloomFilterColumns));
             propertiesBuilder.put(ORC_BLOOM_FILTER_FPP, String.valueOf(IcebergTableProperties.getOrcBloomFilterFpp(tableMetadata.getProperties())));
         }
 
+        // iceberg Parquet format bloom filter properties used by create table
+        List<String> parquetBloomFilterColumns = IcebergTableProperties.getParquetBloomFilterColumns(tableMetadata.getProperties());
+        if (!parquetBloomFilterColumns.isEmpty()) {
+            checkFormatForProperty(fileFormat.toIceberg(), FileFormat.PARQUET, PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY);
+            validateParquetBloomFilterColumns(tableMetadata, parquetBloomFilterColumns);
+            for (String column : parquetBloomFilterColumns) {
+                propertiesBuilder.put(PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX + column, "true");
+            }
+        }
+
         if (tableMetadata.getComment().isPresent()) {
             propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
         }
@@ -884,6 +911,21 @@ private static void validateOrcBloomFilterColumns(ConnectorTableMetadata tableMe
         }
     }
 
+    private static void validateParquetBloomFilterColumns(ConnectorTableMetadata tableMetadata, List<String> parquetBloomFilterColumns)
+    {
+        Map<String, Type> columnTypes = tableMetadata.getColumns().stream()
+                .collect(toImmutableMap(ColumnMetadata::getName, ColumnMetadata::getType));
+        for (String column : parquetBloomFilterColumns) {
+            Type type = columnTypes.get(column);
+            if (type == null) {
+                throw new TrinoException(INVALID_TABLE_PROPERTY, format("Parquet Bloom filter column %s not present in schema", column));
+            }
+            if (!SUPPORTED_BLOOM_FILTER_TYPES.contains(type)) {
+                throw new TrinoException(INVALID_TABLE_PROPERTY, format("Parquet Bloom filter column %s has unsupported type %s", column, type.getDisplayName()));
+            }
+        }
+    }
+
     public static int parseVersion(String metadataFileName)
             throws TrinoException
     {
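Unlike ORC, where Iceberg keeps a single comma-separated write.orc.bloom.filter.columns property, Parquet Bloom filters are recorded as one Iceberg property per column, prefixed with write.parquet.bloom-filter-enabled.column. (note the trailing dot). The two helpers added above write and read that encoding; a self-contained sketch of the round trip, with the prefix constant copied from org.apache.iceberg.TableProperties:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class BloomFilterPropertyRoundTrip
{
    // Same value as org.apache.iceberg.TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX
    static final String PREFIX = "write.parquet.bloom-filter-enabled.column.";

    // Write path (createTableProperties): one "true" flag per column
    static Map<String, String> toIcebergProperties(List<String> columns)
    {
        Map<String, String> properties = new LinkedHashMap<>();
        for (String column : columns) {
            properties.put(PREFIX + column, "true");
        }
        return properties;
    }

    // Read path (getParquetBloomFilterColumns): scan keys for the prefix
    static Set<String> fromIcebergProperties(Map<String, String> properties)
    {
        return properties.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(PREFIX) && "true".equals(entry.getValue()))
                .map(entry -> entry.getKey().substring(PREFIX.length()))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args)
    {
        Map<String, String> stored = toIcebergProperties(List.of("a", "b"));
        System.out.println(stored);
        System.out.println(fromIcebergProperties(stored)); // [a, b]
    }
}
```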

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetWithBloomFilters.java (+28 -19)

@@ -14,18 +14,20 @@
 package io.trino.plugin.iceberg;
 
 import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import io.trino.plugin.hive.TestingHivePlugin;
 import io.trino.spi.connector.CatalogSchemaTableName;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.testing.BaseTestParquetWithBloomFilters;
+import io.trino.testing.MaterializedResult;
 import io.trino.testing.QueryRunner;
+import org.junit.jupiter.api.Test;
 
-import java.nio.file.Path;
 import java.util.List;
 
+import static io.trino.testing.MaterializedResult.resultBuilder;
+import static io.trino.testing.QueryAssertions.assertContains;
 import static io.trino.testing.TestingNames.randomNameSuffix;
 import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
 
 public class TestIcebergParquetWithBloomFilters
         extends BaseTestParquetWithBloomFilters
@@ -34,30 +36,37 @@ public class TestIcebergParquetWithBloomFilters
     protected QueryRunner createQueryRunner()
             throws Exception
     {
-        QueryRunner queryRunner = IcebergQueryRunner.builder().build();
-        Path dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("iceberg_data");
-
-        // create hive catalog
-        queryRunner.installPlugin(new TestingHivePlugin(dataDirectory));
-        queryRunner.createCatalog("hive", "hive", ImmutableMap.<String, String>builder()
-                .put("hive.security", "allow-all")
-                .buildOrThrow());
-
-        return queryRunner;
+        return IcebergQueryRunner.builder().build();
     }
 
     @Override
     protected CatalogSchemaTableName createParquetTableWithBloomFilter(String columnName, List<Integer> testValues)
     {
         // create the managed table
         String tableName = "parquet_with_bloom_filters_" + randomNameSuffix();
-        CatalogSchemaTableName hiveCatalogSchemaTableName = new CatalogSchemaTableName("hive", new SchemaTableName("tpch", tableName));
-        CatalogSchemaTableName icebergCatalogSchemaTableName = new CatalogSchemaTableName("iceberg", new SchemaTableName("tpch", tableName));
-        assertUpdate(format("CREATE TABLE %s WITH (format = 'PARQUET', parquet_bloom_filter_columns = ARRAY['%s']) AS SELECT * FROM (VALUES %s) t(%s)", hiveCatalogSchemaTableName, columnName, Joiner.on(", ").join(testValues), columnName), testValues.size());
+        CatalogSchemaTableName catalogSchemaTableName = new CatalogSchemaTableName("iceberg", new SchemaTableName("tpch", tableName));
+        assertUpdate(format("CREATE TABLE %s WITH (format = 'PARQUET', parquet_bloom_filter_columns = ARRAY['%s']) AS SELECT * FROM (VALUES %s) t(%s)", catalogSchemaTableName, columnName, Joiner.on(", ").join(testValues), columnName), testValues.size());
+
+        return catalogSchemaTableName;
+    }
+
+    @Test
+    public void testBloomFilterPropertiesArePersistedDuringCreate()
+    {
+        String tableName = "test_metadata_write_properties_" + randomNameSuffix();
+        assertQuerySucceeds("CREATE TABLE " + tableName + " (A bigint, b bigint, c bigint) WITH (" +
+                "format = 'parquet'," +
+                "parquet_bloom_filter_columns = array['a','B'])");
 
-        // migrate the hive table to the iceberg table
-        assertUpdate("CALL iceberg.system.migrate('tpch', '" + tableName + "', 'false')");
+        MaterializedResult actualProperties = computeActual("SELECT * FROM \"" + tableName + "$properties\"");
+        assertThat(actualProperties).isNotNull();
+        MaterializedResult expectedProperties = resultBuilder(getSession())
+                .row("write.parquet.bloom-filter-enabled.column.a", "true")
+                .row("write.parquet.bloom-filter-enabled.column.b", "true")
+                .build();
+        assertContains(actualProperties, expectedProperties);
 
-        return icebergCatalogSchemaTableName;
+        assertThat((String) computeScalar("SHOW CREATE TABLE " + tableName))
+                .contains("parquet_bloom_filter_columns");
     }
 }
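The same persistence check can be reproduced from any Trino client. A sketch using the Trino JDBC driver, where the host, port, user, and schema are assumptions to adapt to your deployment:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class VerifyBloomFilterProperties
{
    public static void main(String[] args)
            throws Exception
    {
        // Assumed connection details; adjust to your deployment.
        try (Connection connection = DriverManager.getConnection(
                "jdbc:trino://localhost:8080/iceberg/tpch", "admin", null);
                Statement statement = connection.createStatement()) {
            statement.execute("CREATE TABLE bloom_check (a bigint, b bigint) " +
                    "WITH (format = 'PARQUET', parquet_bloom_filter_columns = ARRAY['a'])");
            // The hidden $properties table exposes the raw Iceberg properties;
            // expect a row: write.parquet.bloom-filter-enabled.column.a = true
            try (ResultSet properties = statement.executeQuery(
                    "SELECT * FROM \"bloom_check$properties\"")) {
                while (properties.next()) {
                    System.out.println(properties.getString(1) + " = " + properties.getString(2));
                }
            }
            statement.execute("DROP TABLE bloom_check");
        }
    }
}
```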
