Skip to content

Commit 172c0e7

Browse files
Fix global checkpoint for p2p segrep in ingestion mode
Signed-off-by: Varun Bharadwaj <[email protected]>
1 parent 3966ed9 commit 172c0e7

File tree

4 files changed

+65
-35
lines changed

4 files changed

+65
-35
lines changed

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java

+16-20
Original file line numberDiff line numberDiff line change
@@ -8,27 +8,30 @@
88

99
package org.opensearch.plugin.kafka;
1010

11+
import org.junit.Assert;
1112
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
1213
import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest;
1314
import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse;
1415
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
1516
import org.opensearch.action.search.SearchResponse;
17+
import org.opensearch.client.Request;
1618
import org.opensearch.cluster.metadata.IndexMetadata;
1719
import org.opensearch.common.settings.Settings;
1820
import org.opensearch.index.query.RangeQueryBuilder;
1921
import org.opensearch.indices.pollingingest.PollingIngestStats;
2022
import org.opensearch.plugins.PluginInfo;
2123
import org.opensearch.test.OpenSearchIntegTestCase;
22-
import org.junit.Assert;
24+
import org.opensearch.transport.client.Requests;
2325

26+
import java.util.Arrays;
2427
import java.util.List;
2528
import java.util.concurrent.TimeUnit;
2629
import java.util.function.Function;
2730
import java.util.stream.Collectors;
2831
import java.util.stream.Stream;
2932

30-
import static org.hamcrest.Matchers.is;
3133
import static org.awaitility.Awaitility.await;
34+
import static org.hamcrest.Matchers.is;
3235

3336
/**
3437
* Integration test for Kafka ingestion
@@ -53,30 +56,17 @@ public void testPluginsAreInstalled() {
5356
);
5457
}
5558

56-
public void testKafkaIngestion() {
59+
public void testKafkaIngestion() throws Exception {
5760
produceData("1", "name1", "24");
5861
produceData("2", "name2", "20");
59-
60-
createIndex(
61-
"test",
62-
Settings.builder()
63-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
64-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
65-
.put("ingestion_source.type", "kafka")
66-
.put("ingestion_source.pointer.init.reset", "earliest")
67-
.put("ingestion_source.param.topic", "test")
68-
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
69-
.put("index.replication.type", "SEGMENT")
70-
.build(),
71-
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
72-
);
62+
createIndexWithDefaultSettings(1, 0);
7363

7464
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(21);
7565
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
76-
refresh("test");
77-
SearchResponse response = client().prepareSearch("test").setQuery(query).get();
66+
refresh(indexName);
67+
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
7868
assertThat(response.getHits().getTotalHits().value(), is(1L));
79-
PollingIngestStats stats = client().admin().indices().prepareStats("test").get().getIndex("test").getShards()[0]
69+
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
8070
.getPollingIngestStats();
8171
assertNotNull(stats);
8272
assertThat(stats.getMessageProcessorStats().getTotalProcessedCount(), is(2L));
@@ -141,4 +131,10 @@ public void testKafkaIngestion_RewindByOffset() {
141131
assertThat(response.getHits().getTotalHits().value(), is(1L));
142132
});
143133
}
134+
135+
public void testCloseIndex() throws Exception {
136+
createIndexWithDefaultSettings(1, 0);
137+
ensureGreen(indexName);
138+
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
139+
}
144140
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/KafkaIngestionBaseIT.java

+18
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.apache.kafka.clients.producer.ProducerRecord;
1616
import org.apache.kafka.common.serialization.StringSerializer;
1717
import org.opensearch.action.search.SearchResponse;
18+
import org.opensearch.cluster.metadata.IndexMetadata;
19+
import org.opensearch.common.settings.Settings;
1820
import org.opensearch.plugins.Plugin;
1921
import org.opensearch.test.OpenSearchIntegTestCase;
2022
import org.junit.After;
@@ -108,4 +110,20 @@ protected void waitForSearchableDocs(long docCount, List<String> nodes) throws E
108110
}
109111
}, 1, TimeUnit.MINUTES);
110112
}
113+
114+
protected void createIndexWithDefaultSettings(int numShards, int numReplicas) {
115+
createIndex(
116+
indexName,
117+
Settings.builder()
118+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards)
119+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, numReplicas)
120+
.put("ingestion_source.type", "kafka")
121+
.put("ingestion_source.pointer.init.reset", "earliest")
122+
.put("ingestion_source.param.topic", topicName)
123+
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
124+
.put("index.replication.type", "SEGMENT")
125+
.build(),
126+
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
127+
);
128+
}
111129
}

plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/RemoteStoreKafkaIT.java

+16-14
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
import org.opensearch.index.query.RangeQueryBuilder;
1717
import org.opensearch.test.InternalTestCluster;
1818
import org.opensearch.test.OpenSearchIntegTestCase;
19+
import org.opensearch.transport.client.Requests;
1920

2021
import java.nio.file.Path;
2122
import java.util.Arrays;
23+
import java.util.concurrent.TimeUnit;
2224

2325
import static org.hamcrest.Matchers.is;
2426

@@ -46,20 +48,7 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
4648

4749
internalCluster().startClusterManagerOnlyNode();
4850
final String nodeA = internalCluster().startDataOnlyNode();
49-
50-
createIndex(
51-
indexName,
52-
Settings.builder()
53-
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
54-
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
55-
.put("ingestion_source.type", "kafka")
56-
.put("ingestion_source.pointer.init.reset", "earliest")
57-
.put("ingestion_source.param.topic", topicName)
58-
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
59-
.put("index.replication.type", "SEGMENT")
60-
.build(),
61-
mapping
62-
);
51+
createIndexWithDefaultSettings(1, 1);
6352

6453
ensureYellowAndNoInitializingShards(indexName);
6554
final String nodeB = internalCluster().startDataOnlyNode();
@@ -117,6 +106,19 @@ public void testSegmentReplicationWithRemoteStore() throws Exception {
117106
waitForSearchableDocs(6, Arrays.asList(nodeB, nodeC));
118107
}
119108

109+
public void testCloseIndex() throws Exception {
110+
produceData("1", "name1", "24");
111+
produceData("2", "name2", "20");
112+
internalCluster().startClusterManagerOnlyNode();
113+
final String nodeA = internalCluster().startDataOnlyNode();
114+
final String nodeB = internalCluster().startDataOnlyNode();
115+
116+
createIndexWithDefaultSettings(1, 1);
117+
ensureGreen(indexName);
118+
waitForSearchableDocs(2, Arrays.asList(nodeA, nodeB));
119+
client().admin().indices().close(Requests.closeIndexRequest(indexName)).get();
120+
}
121+
120122
private void verifyRemoteStoreEnabled(String node) {
121123
GetSettingsResponse settingsResponse = client(node).admin().indices().prepareGetSettings(indexName).get();
122124
String remoteStoreEnabled = settingsResponse.getIndexToSettings().get(indexName).get("index.remote_store.enabled");

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@
238238
import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
239239
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
240240
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
241+
import static org.opensearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
241242
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
242243
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_SEEDED;
243244
import static org.opensearch.index.shard.IndexShard.ShardMigrationState.REMOTE_MIGRATING_UNSEEDED;
@@ -451,7 +452,7 @@ public IndexShard(
451452
aId,
452453
indexSettings,
453454
primaryTerm,
454-
UNASSIGNED_SEQ_NO,
455+
getInitialGlobalCheckpointForShard(indexSettings),
455456
globalCheckpointListeners::globalCheckpointUpdated,
456457
threadPool::absoluteTimeInMillis,
457458
(retentionLeases, listener) -> retentionLeaseSyncer.sync(shardId, aId, getPendingPrimaryTerm(), retentionLeases, listener),
@@ -499,6 +500,19 @@ public boolean shouldCache(Query query) {
499500
this.segmentReplicationStatsProvider = segmentReplicationStatsProvider;
500501
}
501502

503+
/**
504+
* By default, UNASSIGNED_SEQ_NO is used as the initial global checkpoint for new shard initialization. Ingestion
505+
* source does not track sequence numbers explicitly and hence defaults to NO_OPS_PERFORMED for compatibility.
506+
*
507+
*/
508+
private long getInitialGlobalCheckpointForShard(IndexSettings indexSettings) {
509+
if (indexSettings.getIndexMetadata().useIngestionSource()) {
510+
return NO_OPS_PERFORMED;
511+
}
512+
513+
return UNASSIGNED_SEQ_NO;
514+
}
515+
502516
public ThreadPool getThreadPool() {
503517
return this.threadPool;
504518
}

0 commit comments

Comments
 (0)