Skip to content

Commit 92b576e

Browse files
Support updating ingestion error strategy
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 172c0e7 commit 92b576e

File tree

9 files changed

+161
-32
lines changed

9 files changed

+161
-32
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -8,30 +8,28 @@
88

99
package org.opensearch.plugin.kafka;
1010

11-
import org.junit.Assert;
1211
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
1312
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
1413
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
1514
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
1615
import org.opensearch.action.search.SearchResponse;
17-
import org.opensearch.client.Request;
1816
import org.opensearch.cluster.metadata.IndexMetadata;
1917
import org.opensearch.common.settings.Settings;
2018
import org.opensearch.index.query.RangeQueryBuilder;
2119
import org.opensearch.indices.pollingingest.PollingIngestStats;
2220
import org.opensearch.plugins.PluginInfo;
2321
import org.opensearch.test.OpenSearchIntegTestCase;
2422
import org.opensearch.transport.client.Requests;
23+
import org.junit.Assert;
2524

26-
import java.util.Arrays;
2725
import java.util.List;
2826
import java.util.concurrent.TimeUnit;
2927
import java.util.function.Function;
3028
import java.util.stream.Collectors;
3129
import java.util.stream.Stream;
3230

33-
import static org.awaitility.Awaitility.await;
3431
import static org.hamcrest.Matchers.is;
32+
import static org.awaitility.Awaitility.await;
3533

3634
/**
3735
* Integration test for Kafka ingestion
@@ -56,7 +54,7 @@ public void testPluginsAreInstalled() {
5654
);
5755
}
5856

59-
public void testKafkaIngestion() throws Exception {
57+
public void testKafkaIngestion() {
6058
produceData("1", "name1", "24");
6159
produceData("2", "name2", "20");
6260
createIndexWithDefaultSettings(1, 0);

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+13
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.List;
2828
import java.util.Locale;
2929
import java.util.Properties;
30+
import java.util.concurrent.Callable;
3031
import java.util.concurrent.TimeUnit;
3132

3233
import org.testcontainers.containers.KafkaContainer;
@@ -111,6 +112,18 @@ protected void waitForSearchableDocs(long docCount, List<String> nodes) throws E
111112
}, 1, TimeUnit.MINUTES);
112113
}
113114

115+
/**
 * Waits until the supplied condition reports {@code true}.
 * The check is wrapped in {@code assertBusy} with a limit of one minute; each evaluation that
 * returns {@code false} is turned into a test failure via {@code fail(...)}.
 * NOTE(review): presumably {@code assertBusy} retries the block until it stops failing or the
 * time limit elapses - confirm against the test framework.
 *
 * @param checkState condition to evaluate; an exception thrown by it propagates out of the check
 * @throws Exception if thrown by {@code assertBusy} or by the supplied condition
 */
protected void waitForState(Callable<Boolean> checkState) throws Exception {
116+
assertBusy(() -> {
117+
// explicit comparison with false: a false result fails this attempt
if (checkState.call() == false) {
118+
fail("Provided state requirements not met.");
119+
}
120+
}, 1, TimeUnit.MINUTES);
121+
}
122+
123+
/**
 * Reads a single index setting through the get-settings admin API.
 *
 * @param indexName index whose settings are fetched
 * @param setting   full setting key to look up (callers in this diff pass keys such as
 *                  {@code index.ingestion_source.error_strategy})
 * @return the value reported for {@code setting} on {@code indexName}
 */
protected String getSettings(String indexName, String setting) {
124+
return client().admin().indices().prepareGetSettings(indexName).get().getSetting(indexName, setting);
125+
}
126+
114127
protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
115128
createIndex(
116129
indexName,

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.nio.file.Path;
2222
import java.util.Arrays;
23-
import java.util.concurrent.TimeUnit;
2423

2524
import static org.hamcrest.Matchers.is;
2625

@@ -119,6 +118,45 @@ public void testCloseIndex() throws Exception {
119118
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
120119
}
121120

121+
/**
 * Verifies that the ingestion error strategy index setting can be changed on a live index.
 * The index is created with the {@code block} strategy against a topic holding a malformed
 * message between two valid ones, then the setting is updated to {@code drop} and the new
 * value is read back through the get-settings API.
 */
public void testErrorStrategy() throws Exception {
122+
produceData("1", "name1", "25");
123+
// malformed message
124+
produceData("2", "", "");
125+
produceData("3", "name3", "25");
126+
127+
internalCluster().startClusterManagerOnlyNode();
128+
final String node = internalCluster().startDataOnlyNode();
129+
130+
createIndex(
131+
indexName,
132+
Settings.builder()
133+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
134+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
135+
.put("ingestion_source.type", "kafka")
136+
.put("ingestion_source.error_strategy", "block")
137+
.put("ingestion_source.pointer.init.reset", "earliest")
138+
.put("ingestion_source.param.topic", topicName)
139+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
140+
.put("index.replication.type", "SEGMENT")
141+
.build(),
142+
// mapping JSON: braces are balanced (root object -> properties -> field definitions)
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}"
143+
);
144+
145+
ensureGreen(indexName);
146+
waitForState(() -> "block".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
147+
// only the first document is expected to become searchable while the strategy is "block"
waitForSearchableDocs(1, Arrays.asList(node));
148+
149+
client().admin()
150+
.indices()
151+
.prepareUpdateSettings(indexName)
152+
.setSettings(Settings.builder().put("ingestion_source.error_strategy", "drop"))
153+
.get();
154+
waitForState(() -> "drop".equalsIgnoreCase(getSettings(indexName, "index.ingestion_source.error_strategy")));
155+
156+
// TODO: resume and validate new document ingestion once pause/resume APIs are ready
157+
// waitForSearchableDocs(2, Arrays.asList(node));
158+
}
159+
122160
private void verifyRemoteStoreEnabled(String node) {
123161
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
124162
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -771,13 +771,17 @@ public Iterator<Setting<?>> settings() {
771771
Property.Final
772772
);
773773

774+
/**
775+
* Defines the error strategy for pull-based ingestion.
776+
*/
774777
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error_strategy";
775778
public static final Setting<IngestionErrorStrategy.ErrorStrategy> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = new Setting<>(
776779
SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
777780
IngestionErrorStrategy.ErrorStrategy.DROP.name(),
778781
IngestionErrorStrategy.ErrorStrategy::parseFromString,
779782
(errorStrategy) -> {},
780-
Property.IndexScope
783+
Property.IndexScope,
784+
Property.Dynamic
781785
);
782786

783787
public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(

server/src/main/java/org/opensearch/index/engine/IngestionEngine.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public IngestionEngine(EngineConfig engineConfig, IngestionConsumerFactory inges
5757
super(engineConfig);
5858
this.ingestionConsumerFactory = Objects.requireNonNull(ingestionConsumerFactory);
5959
this.documentMapperForType = engineConfig.getDocumentMapperForTypeSupplier().get();
60-
60+
registerDynamicIndexSettingsHandlers();
6161
}
6262

6363
/**
@@ -304,4 +304,21 @@ protected Map<String, String> commitDataAsMap() {
304304
public PollingIngestStats pollingIngestStats() {
305305
return streamPoller.getStats();
306306
}
307+
308+
/**
 * Registers this engine's handlers for dynamically updatable index settings.
 * Currently a single consumer is added: changes to
 * {@code IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING} are routed to
 * {@code updateErrorHandlingStrategy}.
 */
private void registerDynamicIndexSettingsHandlers() {
309+
engineConfig.getIndexSettings()
310+
.getScopedSettings()
311+
.addSettingsUpdateConsumer(IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING, this::updateErrorHandlingStrategy);
312+
}
313+
314+
/**
315+
* Handler for updating ingestion error strategy in the stream poller on dynamic index settings update.
316+
* Builds a new strategy instance from the updated setting value and the ingestion source type
* read from the index metadata, then hands it to the stream poller.
*
* @param errorStrategy the new error strategy value parsed from the updated index setting
*/
317+
private void updateErrorHandlingStrategy(IngestionErrorStrategy.ErrorStrategy errorStrategy) {
318+
IngestionErrorStrategy updatedIngestionErrorStrategy = IngestionErrorStrategy.create(
319+
errorStrategy,
320+
engineConfig.getIndexSettings().getIndexMetadata().getIngestionSource().getType()
321+
);
322+
// NOTE(review): the consumer is registered from the constructor; confirm streamPoller is always
// initialised before a settings update can reach this handler, otherwise this dereference fails.
streamPoller.updateErrorStrategy(updatedIngestionErrorStrategy);
323+
}
307324
}

server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ public class DefaultStreamPoller implements StreamPoller {
4242
private volatile boolean started;
4343
private volatile boolean closed;
4444
private volatile boolean paused;
45+
private volatile IngestionErrorStrategy errorStrategy;
4546

4647
private IngestionShardConsumer consumer;
4748

@@ -67,8 +68,6 @@ public class DefaultStreamPoller implements StreamPoller {
6768
@Nullable
6869
private IngestionShardPointer maxPersistedPointer;
6970

70-
private IngestionErrorStrategy errorStrategy;
71-
7271
public DefaultStreamPoller(
7372
IngestionShardPointer startPointer,
7473
Set<IngestionShardPointer> persistedPointers,
@@ -268,6 +267,7 @@ public void pause() {
268267
throw new RuntimeException("consumer is closed!");
269268
}
270269
paused = true;
270+
processorRunnable.pauseWriter();
271271
}
272272

273273
@Override
@@ -276,6 +276,7 @@ public void resume() {
276276
throw new RuntimeException("consumer is closed!");
277277
}
278278
paused = false;
279+
processorRunnable.resumeWriter();
279280
}
280281

281282
@Override
@@ -332,4 +333,15 @@ public PollingIngestStats getStats() {
332333
public State getState() {
333334
return state;
334335
}
336+
337+
/**
 * Returns the error strategy currently held by this poller.
 * The backing field is declared {@code volatile}, so the most recently assigned strategy is returned.
 */
@Override
338+
public IngestionErrorStrategy getErrorStrategy() {
339+
return this.errorStrategy;
340+
}
341+
342+
/**
 * Replaces the error strategy used by this poller and forwards the same instance to the
 * message processor runnable so both use the new strategy.
 *
 * @param errorStrategy the strategy to use from now on
 */
@Override
343+
public void updateErrorStrategy(IngestionErrorStrategy errorStrategy) {
344+
this.errorStrategy = errorStrategy;
345+
// keep the processor in step with the poller
processorRunnable.setErrorStrategy(errorStrategy);
346+
}
335347
}

server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java

+54-21
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,16 @@
4646
*/
4747
public class MessageProcessorRunnable implements Runnable {
4848
private static final Logger logger = LogManager.getLogger(MessageProcessorRunnable.class);
49+
private static final String ID = "_id";
50+
private static final String OP_TYPE = "_op_type";
51+
private static final String SOURCE = "_source";
52+
53+
private volatile IngestionErrorStrategy errorStrategy;
54+
private volatile boolean isWriterPaused;
4955

5056
private final BlockingQueue<IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message>> blockingQueue;
5157
private final MessageProcessor messageProcessor;
5258
private final CounterMetric stats = new CounterMetric();
53-
private IngestionErrorStrategy errorStrategy;
54-
55-
private static final String ID = "_id";
56-
private static final String OP_TYPE = "_op_type";
57-
private static final String SOURCE = "_source";
5859

5960
/**
6061
* Constructor.
@@ -226,23 +227,39 @@ private static BytesReference convertToBytes(Object object) throws IOException {
226227
@Override
227228
public void run() {
228229
while (!(Thread.currentThread().isInterrupted())) {
229-
IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message> result = null;
230-
try {
231-
result = blockingQueue.poll(1000, TimeUnit.MILLISECONDS);
232-
} catch (InterruptedException e) {
233-
// TODO: add metric
234-
logger.debug("MessageProcessorRunnable poll interruptedException", e);
235-
Thread.currentThread().interrupt(); // Restore interrupt status
236-
}
237-
if (result != null) {
230+
if (isWriterPaused) {
238231
try {
239-
stats.inc();
240-
messageProcessor.process(result.getMessage(), result.getPointer());
241-
} catch (Exception e) {
242-
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
243-
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
244-
Thread.currentThread().interrupt();
245-
}
232+
// TODO: make sleep time configurable
233+
Thread.sleep(5000);
234+
} catch (InterruptedException e) {
235+
Thread.currentThread().interrupt(); // Restore interrupt status
236+
} catch (Throwable e) {
237+
logger.error("Error in pausing the ingestion writer thread", e);
238+
}
239+
continue;
240+
}
241+
242+
pollAndProcessMessage();
243+
}
244+
}
245+
246+
private void pollAndProcessMessage() {
247+
IngestionShardConsumer.ReadResult<? extends IngestionShardPointer, ? extends Message> result = null;
248+
try {
249+
result = blockingQueue.poll(1000, TimeUnit.MILLISECONDS);
250+
} catch (InterruptedException e) {
251+
// TODO: add metric
252+
logger.debug("MessageProcessorRunnable poll interruptedException", e);
253+
Thread.currentThread().interrupt(); // Restore interrupt status
254+
}
255+
if (result != null) {
256+
try {
257+
stats.inc();
258+
messageProcessor.process(result.getMessage(), result.getPointer());
259+
} catch (Exception e) {
260+
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.PROCESSING);
261+
if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.PROCESSING)) {
262+
isWriterPaused = true;
246263
}
247264
}
248265
}
@@ -251,4 +268,20 @@ public void run() {
251268
public CounterMetric getStats() {
252269
return stats;
253270
}
271+
272+
/**
 * Returns the error strategy this runnable currently applies to processing failures.
 */
public IngestionErrorStrategy getErrorStrategy() {
273+
return this.errorStrategy;
274+
}
275+
276+
/**
 * Replaces the error strategy consulted when message processing throws.
 * The field is {@code volatile}, so the processing loop reads the new value on its next access.
 *
 * @param errorStrategy the strategy to apply from now on
 */
public void setErrorStrategy(IngestionErrorStrategy errorStrategy) {
277+
this.errorStrategy = errorStrategy;
278+
}
279+
280+
/**
 * Marks the writer as paused. While the flag is set, the processing loop sleeps instead of
 * polling the queue for messages.
 */
public void pauseWriter() {
281+
this.isWriterPaused = true;
282+
}
283+
284+
/**
 * Clears the paused flag so the processing loop goes back to polling and processing messages.
 * The loop re-checks the flag only after its current sleep finishes, so the change may take
 * up to one sleep interval to be observed.
 */
public void resumeWriter() {
285+
this.isWriterPaused = false;
286+
}
254287
}

server/src/main/java/org/opensearch/indices/pollingingest/StreamPoller.java

+7
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public interface StreamPoller extends Closeable {
5252

5353
PollingIngestStats getStats();
5454

55+
/**
 * Returns the error strategy currently in use by the poller.
 */
IngestionErrorStrategy getErrorStrategy();
56+
57+
/**
58+
* Update the error strategy for the poller.
59+
*/
60+
void updateErrorStrategy(IngestionErrorStrategy errorStrategy);
61+
5562
/**
5663
* a state to indicate the current state of the poller
5764
*/

server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ public class DefaultStreamPollerTests extends OpenSearchTestCase {
5454
public void setUp() throws Exception {
5555
super.setUp();
5656
messages = new ArrayList<>();
57-
;
5857
messages.add("{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}".getBytes(StandardCharsets.UTF_8));
5958
messages.add("{\"_id\":\"2\",\"_source\":{\"name\":\"alice\", \"age\": 21}}".getBytes(StandardCharsets.UTF_8));
6059
fakeConsumer = new FakeIngestionSource.FakeIngestionConsumer(messages, 0);
@@ -346,4 +345,12 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE
346345
// the write to blockingQueue
347346
assertEquals(DefaultStreamPoller.State.POLLING, poller.getState());
348347
}
348+
349+
/**
 * Checks that updating the strategy on the poller is reflected by both the poller and the
 * processor runnable.
 * NOTE(review): the initial assertions assume the fixture builds both objects with a drop
 * strategy - the setup code is not fully visible here.
 */
public void testUpdateErrorStrategy() {
350+
assertTrue(poller.getErrorStrategy() instanceof DropIngestionErrorStrategy);
351+
assertTrue(processorRunnable.getErrorStrategy() instanceof DropIngestionErrorStrategy);
352+
poller.updateErrorStrategy(new BlockIngestionErrorStrategy("ingestion_source"));
353+
assertTrue(poller.getErrorStrategy() instanceof BlockIngestionErrorStrategy);
354+
assertTrue(processorRunnable.getErrorStrategy() instanceof BlockIngestionErrorStrategy);
355+
}
349356
}

0 commit comments

Comments
 (0)