Commit 067773e

7c00codope authored and committed
Refactor HudiSplitSource (trinodb#16)
* Refactor HudiSplitSource
* Address some code style issues
* Remove metaClient and partitions from HudiTableHandle
* Use intersect for HudiPredicates
* Refactor HudiUtil.getSplits to HudiSplitFactory
* Fix error-prone-checks failure
* Extract HudiTableInfo
1 parent 3d8f512 commit 067773e

23 files changed: +514 -701 lines

plugin/trino-hudi/pom.xml (+8)

@@ -49,6 +49,10 @@
         <groupId>io.airlift</groupId>
         <artifactId>bootstrap</artifactId>
     </dependency>
+    <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>concurrent</artifactId>
+    </dependency>
     <dependency>
         <groupId>io.airlift</groupId>
         <artifactId>configuration</artifactId>
@@ -82,6 +86,10 @@
         <groupId>com.google.inject</groupId>
         <artifactId>guice</artifactId>
     </dependency>
+    <dependency>
+        <groupId>javax.annotation</groupId>
+        <artifactId>javax.annotation-api</artifactId>
+    </dependency>
     <dependency>
         <groupId>javax.inject</groupId>
         <artifactId>javax.inject</artifactId>
plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/ForHudiSplitManager.java (new file, +31)

@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi;
+
+import javax.inject.Qualifier;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+
+import static java.lang.annotation.ElementType.FIELD;
+import static java.lang.annotation.ElementType.METHOD;
+import static java.lang.annotation.ElementType.PARAMETER;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+@Retention(RUNTIME)
+@Target({FIELD, PARAMETER, METHOD})
+@Qualifier
+public @interface ForHudiSplitManager
+{
+}
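
The @Qualifier meta-annotation makes ForHudiSplitManager a binding annotation: it lets the module expose an ExecutorService reserved for split management without colliding with any other ExecutorService binding. A minimal sketch of an injection site (the constructor shape here is hypothetical; HudiSplitManager's actual signature is not part of this diff):

    import javax.inject.Inject;

    import java.util.concurrent.ExecutorService;

    import static java.util.Objects.requireNonNull;

    public class HudiSplitManagerSketch
    {
        private final ExecutorService executor;

        @Inject
        public HudiSplitManagerSketch(@ForHudiSplitManager ExecutorService executor)
        {
            // Receives the executor bound under the @ForHudiSplitManager qualifier below.
            this.executor = requireNonNull(executor, "executor is null");
        }
    }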

plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java (+28)

@@ -39,6 +39,8 @@ public class HudiConfig
     private boolean sizeBasedSplitWeightsEnabled = true;
     private DataSize standardSplitWeightSize = DataSize.of(128, MEGABYTE);
     private double minimumAssignedSplitWeight = 0.05;
+    private int maxSplitsPerSecond = Integer.MAX_VALUE;
+    private int maxOutstandingSplits = 1000;

     @NotNull
     public String getBaseFileFormat()
@@ -194,4 +196,30 @@ public double getMinimumAssignedSplitWeight()
     {
         return minimumAssignedSplitWeight;
     }
+
+    @Min(1)
+    public int getMaxSplitsPerSecond()
+    {
+        return maxSplitsPerSecond;
+    }
+
+    @Config("hudi.max-splits-per-second")
+    public HudiConfig setMaxSplitsPerSecond(int maxSplitsPerSecond)
+    {
+        this.maxSplitsPerSecond = maxSplitsPerSecond;
+        return this;
+    }
+
+    @Min(1)
+    public int getMaxOutstandingSplits()
+    {
+        return maxOutstandingSplits;
+    }
+
+    @Config("hudi.max-outstanding-splits")
+    public HudiConfig setMaxOutstandingSplits(int maxOutstandingSplits)
+    {
+        this.maxOutstandingSplits = maxOutstandingSplits;
+        return this;
+    }
 }
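
Both new properties are validated by @Min(1) and exposed through @Config as the catalog settings hudi.max-splits-per-second and hudi.max-outstanding-splits. Since the setters return this, they chain fluently; a quick sketch (the values are illustrative, not recommendations):

    // Defaults are Integer.MAX_VALUE (effectively unthrottled) and 1000.
    HudiConfig config = new HudiConfig()
            .setMaxSplitsPerSecond(1_000)    // cap the rate of split generation
            .setMaxOutstandingSplits(500);   // bound splits buffered ahead of the scheduler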

plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConnector.java (-3)

@@ -50,7 +50,6 @@ public class HudiConnector

     private final LifeCycleManager lifeCycleManager;
     private final HudiTransactionManager transactionManager;
-    private final HudiMetadataFactory metadataFactory;
     private final ConnectorSplitManager splitManager;
     private final ConnectorPageSourceProvider pageSourceProvider;
     private final ConnectorNodePartitioningProvider nodePartitioningProvider;
@@ -62,7 +61,6 @@ public class HudiConnector
     public HudiConnector(
             LifeCycleManager lifeCycleManager,
             HudiTransactionManager transactionManager,
-            HudiMetadataFactory metadataFactory,
             ConnectorSplitManager splitManager,
             ConnectorPageSourceProvider pageSourceProvider,
             ConnectorNodePartitioningProvider nodePartitioningProvider,
@@ -73,7 +71,6 @@ public HudiConnector(
     {
         this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
         this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
-        this.metadataFactory = requireNonNull(metadataFactory, "metadataFactory is null");
         this.splitManager = requireNonNull(splitManager, "splitManager is null");
         this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
         this.nodePartitioningProvider = requireNonNull(nodePartitioningProvider, "nodePartitioningProvider is null");

plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiErrorCode.java (+1 -1)

@@ -33,7 +33,7 @@ public enum HudiErrorCode

     HudiErrorCode(int code, ErrorType type)
     {
-        errorCode = new ErrorCode(code + 0x0100_0000, name(), type);
+        errorCode = new ErrorCode(code + 0x0507_0000, name(), type);
     }

     @Override
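
Each Trino plugin offsets its error codes by a distinct base so numeric codes never collide across connectors; 0x0100_0000 is the base already used by the Hive connector, so this change moves Hudi onto its own 0x0507_0000 range. For example (a hypothetical constant, not one from this diff):

    // A constant declared as HUDI_EXAMPLE_ERROR(2, EXTERNAL) would now yield
    // new ErrorCode(2 + 0x0507_0000, ...), i.e. numeric code 0x0507_0002.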

plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java (+15 -46)

@@ -20,7 +20,6 @@
 import io.airlift.log.Logger;
 import io.trino.plugin.hive.HdfsEnvironment;
 import io.trino.plugin.hive.HiveColumnHandle;
-import io.trino.plugin.hive.HivePartition;
 import io.trino.plugin.hive.acid.AcidSchema;
 import io.trino.plugin.hive.metastore.Column;
 import io.trino.plugin.hive.metastore.HiveMetastore;
@@ -32,23 +31,23 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableHandle;
 import io.trino.spi.connector.ConnectorTableMetadata;
-import io.trino.spi.connector.ConnectorTableProperties;
 import io.trino.spi.connector.Constraint;
 import io.trino.spi.connector.ConstraintApplicationResult;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SchemaTablePrefix;
+import io.trino.spi.connector.TableColumnsMetadata;
 import io.trino.spi.connector.TableNotFoundException;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.TypeManager;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableMetaClient;

 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.Function;
+import java.util.stream.Stream;

 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
@@ -67,7 +66,6 @@
 import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
 import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
 import static io.trino.plugin.hudi.HudiErrorCode.HUDI_UNKNOWN_TABLE_TYPE;
-import static io.trino.plugin.hudi.HudiUtil.splitPredicate;
 import static io.trino.spi.connector.SchemaTableName.schemaTableName;
 import static java.lang.String.format;
 import static java.util.Collections.singletonList;
@@ -120,8 +118,7 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
                 table.get().getStorage().getLocation(),
                 HoodieTableType.COPY_ON_WRITE,
                 TupleDomain.all(),
-                TupleDomain.all(),
-                Optional.of(getTableMetaClient(session, table.get())));
+                TupleDomain.all());
     }

     @Override
@@ -135,12 +132,14 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
     public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(ConnectorSession session, ConnectorTableHandle tableHandle, Constraint constraint)
     {
         HudiTableHandle handle = (HudiTableHandle) tableHandle;
-        HudiPredicates predicates = splitPredicate(constraint.getSummary());
-        HudiTableHandle newHudiTableHandle = handle.withPredicates(predicates);
+        HudiPredicates predicates = HudiPredicates.from(constraint.getSummary());
+        HudiTableHandle newHudiTableHandle = handle.withPredicates(
+                predicates.getPartitionColumnPredicates(),
+                predicates.getRegularColumnPredicates());

         if (handle.getPartitionPredicates().equals(newHudiTableHandle.getPartitionPredicates())
                 && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates())) {
-            log.info("No new predicates to apply");
+            log.debug("No new predicates to apply");
             return Optional.empty();
         }

@@ -150,12 +149,6 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
                 false));
     }

-    @Override
-    public ConnectorTableProperties getTableProperties(ConnectorSession session, ConnectorTableHandle tableHandle)
-    {
-        return new ConnectorTableProperties();
-    }
-
     @Override
     public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
     {
@@ -175,11 +168,8 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
     @Override
     public Optional<Object> getInfo(ConnectorTableHandle table)
     {
-        return ((HudiTableHandle) table).getPartitions()
-                .map(partitions -> new HudiInputInfo(
-                        partitions.stream()
-                                .map(HivePartition::getPartitionId)
-                                .collect(toImmutableList())));
+        HudiTableHandle hudiTable = (HudiTableHandle) table;
+        return Optional.of(HudiTableInfo.from(hudiTable));
     }

     @Override
@@ -191,40 +181,26 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
                 tableNames.add(schemaTableName(schemaName, tableName));
             }
         }
-
-        tableNames.addAll(listMaterializedViews(session, optionalSchemaName));
         return tableNames.build();
     }

     @Override
-    public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+    public Stream<TableColumnsMetadata> streamTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
     {
         List<SchemaTableName> tables = prefix.getTable()
                 .map(ignored -> singletonList(prefix.toSchemaTableName()))
                 .orElseGet(() -> listTables(session, prefix.getSchema()));
-
-        ImmutableMap.Builder<SchemaTableName, List<ColumnMetadata>> columns = ImmutableMap.builder();
-        for (SchemaTableName table : tables) {
-            try {
-                columns.put(table, getTableMetadata(table).getColumns());
-            }
-            catch (TableNotFoundException e) {
-                // table disappeared during listing operation
-            }
-        }
-        return columns.buildOrThrow();
+        return tables.stream().map(table -> {
+            List<ColumnMetadata> columns = getTableMetadata(table).getColumns();
+            return TableColumnsMetadata.forTable(table, columns);
+        });
     }

     HiveMetastore getMetastore()
     {
         return metastore;
     }

-    void rollback()
-    {
-        // TODO: cleanup open transaction when write will be supported
-    }
-
     private static Function<HiveColumnHandle, ColumnMetadata> columnMetadataGetter(Table table)
     {
         ImmutableList.Builder<String> columnNames = ImmutableList.builder();
@@ -288,13 +264,6 @@ private boolean isHudiTable(ConnectorSession session, Table table)
         return true;
     }

-    private HoodieTableMetaClient getTableMetaClient(ConnectorSession session, Table table)
-    {
-        String basePath = table.getStorage().getLocation();
-        Configuration conf = hdfsEnvironment.getConfiguration(new HdfsEnvironment.HdfsContext(session), new Path(basePath));
-        return HoodieTableMetaClient.builder().setConf(conf).setBasePath(basePath).build();
-    }
-
     private ConnectorTableMetadata getTableMetadata(SchemaTableName tableName)
     {
         Table table = metastore.getTable(tableName.getSchemaName(), tableName.getTableName())
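
applyFilter now asks HudiPredicates.from to split the constraint summary instead of the removed static HudiUtil.splitPredicate. HudiPredicates itself is not shown in this commit excerpt; the following is a plausible sketch of the factory, inferred from how its two getters are consumed above:

    import java.util.HashMap;
    import java.util.Map;

    import io.trino.plugin.hive.HiveColumnHandle;
    import io.trino.spi.connector.ColumnHandle;
    import io.trino.spi.predicate.Domain;
    import io.trino.spi.predicate.TupleDomain;

    public static HudiPredicates from(TupleDomain<ColumnHandle> predicate)
    {
        Map<ColumnHandle, Domain> partitionDomains = new HashMap<>();
        Map<ColumnHandle, Domain> regularDomains = new HashMap<>();
        predicate.getDomains().ifPresent(domains -> domains.forEach((column, domain) -> {
            // Partition-key domains can prune whole partitions during split
            // generation; everything else is evaluated against file data.
            if (column instanceof HiveColumnHandle && ((HiveColumnHandle) column).isPartitionKey()) {
                partitionDomains.put(column, domain);
            }
            else {
                regularDomains.put(column, domain);
            }
        }));
        return new HudiPredicates(
                TupleDomain.withColumnDomains(partitionDomains),
                TupleDomain.withColumnDomains(regularDomains));
    }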

plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiModule.java (+15)

@@ -16,6 +16,7 @@

 import com.google.inject.Binder;
 import com.google.inject.Module;
+import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import io.trino.plugin.base.security.AllowAllAccessControl;
 import io.trino.plugin.base.session.SessionPropertiesProvider;
@@ -32,9 +33,15 @@
 import io.trino.spi.connector.ConnectorPageSourceProvider;
 import io.trino.spi.connector.ConnectorSplitManager;

+import javax.inject.Singleton;
+
+import java.util.concurrent.ExecutorService;
+
 import static com.google.inject.multibindings.Multibinder.newSetBinder;
 import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
 import static io.airlift.configuration.ConfigBinder.configBinder;
+import static java.util.concurrent.Executors.newCachedThreadPool;
 import static org.weakref.jmx.guice.ExportBinder.newExporter;

 public class HudiModule
@@ -67,4 +74,12 @@ public void configure(Binder binder)

         newOptionalBinder(binder, ConnectorAccessControl.class).setDefault().to(AllowAllAccessControl.class).in(Scopes.SINGLETON);
     }
+
+    @ForHudiSplitManager
+    @Singleton
+    @Provides
+    public ExecutorService createExecutorService()
+    {
+        return newCachedThreadPool(daemonThreadsNamed("hudi-split-manager-%d"));
+    }
 }
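
The provider publishes a single cached pool of daemon threads (named hudi-split-manager-0, hudi-split-manager-1, and so on) under the @ForHudiSplitManager qualifier. Combined with the new hudi.max-splits-per-second and hudi.max-outstanding-splits settings, this points at background split loading through a throttled queue, the same pattern the Hive connector uses. A sketch of how the pieces could fit together (assumed wiring; not shown in this excerpt):

    import io.trino.plugin.hive.util.ThrottledAsyncQueue;
    import io.trino.spi.connector.ConnectorSplit;

    import java.util.concurrent.ExecutorService;

    public class SplitQueueSketch
    {
        private final ThrottledAsyncQueue<ConnectorSplit> queue;

        public SplitQueueSketch(HudiConfig config, ExecutorService executor)
        {
            // Rate-limits split production and bounds how many splits may
            // sit in the queue before consumers drain them.
            this.queue = new ThrottledAsyncQueue<>(
                    config.getMaxSplitsPerSecond(),
                    config.getMaxOutstandingSplits(),
                    executor);
        }
    }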

plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiPageSource.java (+3 -16)

@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;

+import static io.trino.plugin.base.util.Closables.closeAllSuppress;
 import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA;
 import static java.util.Objects.requireNonNull;

@@ -89,11 +90,11 @@ public Page getNextPage()
             return new Page(positionCount, blocksWithPartitionColumns);
         }
         catch (TrinoException e) {
-            closeWithSuppression(e);
+            closeAllSuppress(e, this);
             throw e;
         }
         catch (RuntimeException e) {
-            closeWithSuppression(e);
+            closeAllSuppress(e, this);
             throw new TrinoException(HUDI_BAD_DATA, e);
         }
     }
@@ -110,18 +111,4 @@ public void close()
     {
         dataPageSource.close();
     }
-
-    private void closeWithSuppression(Throwable throwable)
-    {
-        requireNonNull(throwable, "throwable is null");
-        try {
-            close();
-        }
-        catch (Exception e) {
-            // Self-suppression not permitted
-            if (e != throwable) {
-                throwable.addSuppressed(e);
-            }
-        }
-    }
 }
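
The deleted closeWithSuppression helper is replaced by the shared Closables.closeAllSuppress utility, which keeps the same contract: close the page source, attach any secondary failure as a suppressed exception, and skip illegal self-suppression. Roughly equivalent behavior, sketched from the code it supersedes:

    public static void closeAllSuppress(Throwable rootCause, AutoCloseable... closeables)
    {
        for (AutoCloseable closeable : closeables) {
            try {
                if (closeable != null) {
                    closeable.close();
                }
            }
            catch (Throwable t) {
                // Throwable.addSuppressed rejects self-suppression (t == rootCause).
                if (rootCause != t) {
                    rootCause.addSuppressed(t);
                }
            }
        }
    }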
