Skip to content

Commit e6e34ed

Browse files
Support updating ingestion error strategy
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent f9592dc commit e6e34ed

File tree

12 files changed

+151
-32
lines changed

12 files changed

+151
-32
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,28 @@
88

99
package org.opensearch.plugin.kafka;
1010

11-
import org.junit.Assert;
1211
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
1312
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
1413
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
1514
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
1615
import org.opensearch.action.search.SearchResponse;
17-
import org.opensearch.client.Request;
1816
import org.opensearch.cluster.metadata.IndexMetadata;
1917
import org.opensearch.common.settings.Settings;
2018
import org.opensearch.index.query.RangeQueryBuilder;
2119
import org.opensearch.indices.pollingingest.PollingIngestStats;
2220
import org.opensearch.plugins.PluginInfo;
2321
import org.opensearch.test.OpenSearchIntegTestCase;
2422
import org.opensearch.transport.client.Requests;
23+
import org.junit.Assert;
2524

26-
import java.util.Arrays;
2725
import java.util.List;
2826
import java.util.concurrent.TimeUnit;
2927
import java.util.function.Function;
3028
import java.util.stream.Collectors;
3129
import java.util.stream.Stream;
3230

33-
import static org.awaitility.Awaitility.await;
3431
import static org.hamcrest.Matchers.is;
32+
import static org.awaitility.Awaitility.await;
3533

3634
/**
3735
* Integration test for Kafka ingestion
@@ -56,7 +54,7 @@ public void testPluginsAreInstalled() {
5654
);
5755
}
5856

59-
public void testKafkaIngestion() throws Exception {
57+
public void testKafkaIngestion() {
6058
produceData("1", "name1", "24");
6159
produceData("2", "name2", "20");
6260
createIndexWithDefaultSettings(1, 0);

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+13
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.List;
2828
import java.util.Locale;
2929
import java.util.Properties;
30+
import java.util.concurrent.Callable;
3031
import java.util.concurrent.TimeUnit;
3132

3233
import org.testcontainers.containers.KafkaContainer;
@@ -111,6 +112,18 @@ protected void waitForSearchableDocs(long docCount, List<String> nodes) throws E
111112
}, 1, TimeUnit.MINUTES);
112113
}
113114

115+
protected void waitForState(Callable<Boolean> checkState) throws Exception {
116+
assertBusy(() -> {
117+
if (checkState.call() == false) {
118+
fail("Provided state requirements not met.");
119+
}
120+
}, 1, TimeUnit.MINUTES);
121+
}
122+
123+
protected String getSettings(String indexName, String setting) {
124+
return client().admin().indices().prepareGetSettings(indexName).get().getSetting(indexName, setting);
125+
}
126+
114127
protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
115128
createIndex(
116129
indexName,

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+37-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.nio.file.Path;
2222
import java.util.Arrays;
23-
import java.util.concurrent.TimeUnit;
2423

2524
import static org.hamcrest.Matchers.is;
2625

@@ -119,6 +118,43 @@ public void testCloseIndex() throws Exception {
119118
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
120119
}
121120

121+
public void testErrorStrategy() throws Exception {
122+
produceData("1", "name1", "25");
123+
// malformed message
124+
produceData("2", "", "");
125+
produceData("3", "name3", "25");
126+
127+
internalCluster().startClusterManagerOnlyNode();
128+
final String node = internalCluster().startDataOnlyNode();
129+
130+
createIndex(
131+
indexName,
132+
Settings.builder()
133+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
134+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
135+
.put("ingestion_source.type", "kafka")
136+
.put("ingestion_source.error_strategy", "block")
137+
.put("ingestion_source.pointer.init.reset", "earliest")
138+
.put("ingestion_source.param.topic", topicName)
139+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
140+
.put("index.replication.type", "SEGMENT")
141+
.build(),
142+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
143+
);
144+
145+
ensureGreen(indexName);
146+
waitForState(() -> "block".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
147+
waitForSearchableDocs(1, Arrays.asList(node));
148+
149+
client().admin()
150+
.indices()
151+
.prepareUpdateSettings(indexName)
152+
.setSettings(Settings.builder().put("ingestion_source.error_strategy", "drop"))
153+
.get();
154+
waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
155+
waitForSearchableDocs(2, Arrays.asList(node));
156+
}
157+
122158
private void verifyRemoteStoreEnabled(String node) {
123159
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
124160
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -771,13 +771,17 @@ public Iterator<Setting<?>> settings() {
771771
Property.Final
772772
);
773773

774+
/**
775+
* Defines the error strategy for pull-based ingestion.
776+
*/
774777
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error_strategy";
775778
public static final Setting<IngestionErrorStrategy.ErrorStrategy> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = new Setting<>(
776779
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
777780
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
778781
IngestionErrorStrategy.ErrorStrategy::parseFromString,
779782
(errorStrategy) -> {},
780-
Property.IndexScope
783+
Property.IndexScope,
784+
Property.Dynamic
781785
);
782786

783787
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory inges
5757
super(engineConfig);
5858
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
5959
this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
60-
60+
registerDynamicIndexSettingsHandlers();
6161
}
6262

6363
/**
@@ -304,4 +304,21 @@ protected Map<String, String> commitDataAsMap() {
304304
public PollingIngestStats pollingIngestStats() {
305305
return streamPoller.getStats();
306306
}
307+
308+
private void registerDynamicIndexSettingsHandlers() {
309+
engineConfig.getIndexSettings()
310+
.getScopedSettings()
311+
.addSettingsUpdateConsumer(IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING, this::updateErrorHandlingStrategy);
312+
}
313+
314+
/**
315+
* Handler for updating ingestion error strategy in the stream poller on dynamic index settings update.
316+
*/
317+
private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy errorStrategy) {
318+
IngestionErrorStrategy updatedIngestionErrorStrategy = IngestionErrorStrategy.create(
319+
errorStrategy,
320+
engineConfig.getIndexSettings().getIndexMetadata().getIngestionSource().getType()
321+
);
322+
streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
323+
}
307324
}

server/src/main/java/org/opensearch/indices/pollingingest/BlockIngestionErrorStrategy.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void handleError(Throwable e, ErrorStage stage) {
3030
}
3131

3232
@Override
33-
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34-
return true;
33+
public boolean shouldIgnoreError(Throwable e, ErrorStage stage) {
34+
return false;
3535
}
3636
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class DefaultStreamPoller implements StreamPoller {
4242
private volatile boolean started;
4343
private volatile boolean closed;
4444
private volatile boolean paused;
45+
private volatile IngestionErrorStrategy errorStrategy;
4546

4647
private IngestionShardConsumer consumer;
4748

@@ -67,8 +68,6 @@ public class DefaultStreamPoller implements StreamPoller {
6768
@Nullable
6869
private IngestionShardPointer maxPersistedPointer;
6970

70-
private IngestionErrorStrategy errorStrategy;
71-
7271
public DefaultStreamPoller(
7372
IngestionShardPointer startPointer,
7473
Set<IngestionShardPointer> persistedPointers,
@@ -231,14 +230,14 @@ protected void startPoll() {
231230
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
232231
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);
233232

234-
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
235-
// Blocking error encountered. Pause poller to stop processing remaining updates.
236-
pause();
237-
} else {
233+
if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
238234
// Advance the batch start pointer to ignore the error and continue from next record
239235
batchStartPointer = lastSuccessfulPointer == null
240236
? consumer.nextPointer(batchStartPointer)
241237
: consumer.nextPointer(lastSuccessfulPointer);
238+
} else {
239+
// Blocking error encountered. Pause poller to stop processing remaining updates.
240+
pause();
242241
}
243242
}
244243
}
@@ -332,4 +331,15 @@ public PollingIngestStats getStats() {
332331
public State getState() {
333332
return state;
334333
}
334+
335+
@Override
336+
public IngestionErrorStrategy getErrorStrategy() {
337+
return this.errorStrategy;
338+
}
339+
340+
@Override
341+
public void updateErrorStrategy(IngestionErrorStrategy errorStrategy) {
342+
this.errorStrategy = errorStrategy;
343+
processorRunnable.setErrorStrategy(errorStrategy);
344+
}
335345
}

server/src/main/java/org/opensearch/indices/pollingingest/DropIngestionErrorStrategy.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ public void handleError(Throwable e, ErrorStage stage) {
3030
}
3131

3232
@Override
33-
public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
34-
return false;
33+
public boolean shouldIgnoreError(Throwable e, ErrorStage stage) {
34+
return true;
3535
}
3636

3737
}

server/src/main/java/org/opensearch/indices/pollingingest/IngestionErrorStrategy.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ public interface IngestionErrorStrategy {
2525
void handleError(Throwable e, ErrorStage stage);
2626

2727
/**
28-
* Indicates if ingestion must be paused, blocking further writes.
28+
* Indicates if the error should be ignored.
2929
*/
30-
boolean shouldPauseIngestion(Throwable e, ErrorStage stage);
30+
boolean shouldIgnoreError(Throwable e, ErrorStage stage);
3131

3232
static IngestionErrorStrategy create(ErrorStrategy errorStrategy, String ingestionSource) {
3333
switch (errorStrategy) {

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

+38-11
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@
4646
*/
4747
public class MessageProcessorRunnable implements Runnable {
4848
private static final Logger logger = LogManager.getLogger(MessageProcessorRunnable.class);
49+
private static final String ID = "_id";
50+
private static final String OP_TYPE = "_op_type";
51+
private static final String SOURCE = "_source";
52+
private static final int WAIT_BEFORE_RETRY_DURATION_MS = 5000;
4953

54+
private volatile IngestionErrorStrategy errorStrategy;
5055
private final BlockingQueue<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> blockingQueue;
5156
private final MessageProcessor messageProcessor;
5257
private final CounterMetric stats = new CounterMetric();
53-
private IngestionErrorStrategy errorStrategy;
54-
55-
private static final String ID = "_id";
56-
private static final String OP_TYPE = "_op_type";
57-
private static final String SOURCE = "_source";
5858

5959
/**
6060
* Constructor.
@@ -223,32 +223,59 @@ private static BytesReference convertToBytes(Object object) throws IOException {
223223
return blockingQueue;
224224
}
225225

226+
/**
227+
* Polls messages from the blocking queue and processes messages. If message processing fails, the failed message
228+
* is retried indefinitely after a retry wait time, unless a DROP error policy is used to skip the failed message.
229+
*/
226230
@Override
227231
public void run() {
232+
IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message> readResult = null;
233+
228234
while (!(Thread.currentThread().isInterrupted())) {
229-
IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message> result = null;
230235
try {
231-
result = blockingQueue.poll(1000, TimeUnit.MILLISECONDS);
236+
if (readResult == null) {
237+
readResult = blockingQueue.poll(1000, TimeUnit.MILLISECONDS);
238+
}
232239
} catch (InterruptedException e) {
233240
// TODO: add metric
234241
logger.debug("MessageProcessorRunnable poll interruptedException", e);
235242
Thread.currentThread().interrupt(); // Restore interrupt status
236243
}
237-
if (result != null) {
244+
if (readResult != null) {
238245
try {
239246
stats.inc();
240-
messageProcessor.process(result.getMessage(), result.getPointer());
247+
messageProcessor.process(readResult.getMessage(), readResult.getPointer());
248+
readResult = null;
241249
} catch (Exception e) {
242250
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
243-
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
244-
Thread.currentThread().interrupt();
251+
if (errorStrategy.shouldIgnoreError(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
252+
readResult = null;
253+
} else {
254+
waitBeforeRetry();
245255
}
246256
}
247257
}
248258
}
249259
}
250260

261+
private void waitBeforeRetry() {
262+
try {
263+
Thread.sleep(WAIT_BEFORE_RETRY_DURATION_MS);
264+
} catch (InterruptedException e) {
265+
logger.debug("MessageProcessor thread interrupted while waiting for retry", e);
266+
Thread.currentThread().interrupt(); // Restore interrupt status
267+
}
268+
}
269+
251270
public CounterMetric getStats() {
252271
return stats;
253272
}
273+
274+
public IngestionErrorStrategy getErrorStrategy() {
275+
return this.errorStrategy;
276+
}
277+
278+
public void setErrorStrategy(IngestionErrorStrategy errorStrategy) {
279+
this.errorStrategy = errorStrategy;
280+
}
254281
}

server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java

+7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public interface StreamPoller extends Closeable {
5252

5353
PollingIngestStats getStats();
5454

55+
IngestionErrorStrategy getErrorStrategy();
56+
57+
/**
58+
* Update the error strategy for the poller.
59+
*/
60+
void updateErrorStrategy(IngestionErrorStrategy errorStrategy);
61+
5562
/**
5663
* a state to indicate the current state of the poller
5764
*/

server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public class DefaultStreamPollerTests extends OpenSearchTestCase {
5454
public void setUp() throws Exception {
5555
super.setUp();
5656
messages = new ArrayList<>();
57-
;
5857
messages.add("{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}".getBytes(StandardCharsets.UTF_8));
5958
messages.add("{\"_id\":\"2\",\"_source\":{\"name\":\"alice\", \"age\": 21}}".getBytes(StandardCharsets.UTF_8));
6059
fakeConsumer = new FakeIngestionSource.FakeIngestionConsumer(messages, 0);
@@ -346,4 +345,12 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE
346345
// the write to blockingQueue
347346
assertEquals(DefaultStreamPoller.State.POLLING, poller.getState());
348347
}
348+
349+
public void testUpdateErrorStrategy() {
350+
assertTrue(poller.getErrorStrategy() instanceof DropIngestionErrorStrategy);
351+
assertTrue(processorRunnable.getErrorStrategy() instanceof DropIngestionErrorStrategy);
352+
poller.updateErrorStrategy(new BlockIngestionErrorStrategy("ingestion_source"));
353+
assertTrue(poller.getErrorStrategy() instanceof BlockIngestionErrorStrategy);
354+
assertTrue(processorRunnable.getErrorStrategy() instanceof BlockIngestionErrorStrategy);
355+
}
349356
}

0 commit comments

Comments
 (0)