Skip to content

Commit 5cbeff5

Browse files
lunny authored and AbdulrhmnGhanem committed
Fix data race in bleve indexer (go-gitea#16474)
* Fix data race in bleve indexer
1 parent 9a239cb commit 5cbeff5

File tree

3 files changed

+69
-6
lines changed

3 files changed

+69
-6
lines changed

modules/indexer/bleve/batch.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
// Copyright 2021 The Gitea Authors. All rights reserved.
2+
// Use of this source code is governed by a MIT-style
3+
// license that can be found in the LICENSE file.
4+
5+
package bleve
6+
7+
import (
8+
"github.com/blevesearch/bleve/v2"
9+
)
10+
11+
// FlushingBatch is a batch of operations that automatically flushes to the
12+
// underlying index once it reaches a certain size.
13+
type FlushingBatch struct {
14+
maxBatchSize int
15+
batch *bleve.Batch
16+
index bleve.Index
17+
}
18+
19+
// NewFlushingBatch creates a new flushing batch for the specified index. Once
20+
// the number of operations in the batch reaches the specified limit, the batch
21+
// automatically flushes its operations to the index.
22+
func NewFlushingBatch(index bleve.Index, maxBatchSize int) *FlushingBatch {
23+
return &FlushingBatch{
24+
maxBatchSize: maxBatchSize,
25+
batch: index.NewBatch(),
26+
index: index,
27+
}
28+
}
29+
30+
// Index add a new index to batch
31+
func (b *FlushingBatch) Index(id string, data interface{}) error {
32+
if err := b.batch.Index(id, data); err != nil {
33+
return err
34+
}
35+
return b.flushIfFull()
36+
}
37+
38+
// Delete add a delete index to batch
39+
func (b *FlushingBatch) Delete(id string) error {
40+
b.batch.Delete(id)
41+
return b.flushIfFull()
42+
}
43+
44+
func (b *FlushingBatch) flushIfFull() error {
45+
if b.batch.Size() < b.maxBatchSize {
46+
return nil
47+
}
48+
return b.Flush()
49+
}
50+
51+
// Flush submit the batch and create a new one
52+
func (b *FlushingBatch) Flush() error {
53+
err := b.index.Batch(b.batch)
54+
if err != nil {
55+
return err
56+
}
57+
b.batch = b.index.NewBatch()
58+
return nil
59+
}

modules/indexer/code/bleve.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"code.gitea.io/gitea/modules/analyze"
1919
"code.gitea.io/gitea/modules/charset"
2020
"code.gitea.io/gitea/modules/git"
21+
gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
2122
"code.gitea.io/gitea/modules/log"
2223
"code.gitea.io/gitea/modules/setting"
2324
"code.gitea.io/gitea/modules/timeutil"
@@ -176,7 +177,8 @@ func NewBleveIndexer(indexDir string) (*BleveIndexer, bool, error) {
176177
return indexer, created, err
177178
}
178179

179-
func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string, update fileUpdate, repo *models.Repository, batch rupture.FlushingBatch) error {
180+
func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *bufio.Reader, commitSha string,
181+
update fileUpdate, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
180182
// Ignore vendored files in code search
181183
if setting.Indexer.ExcludeVendored && analyze.IsVendor(update.Filename) {
182184
return nil
@@ -229,7 +231,7 @@ func (b *BleveIndexer) addUpdate(batchWriter git.WriteCloserError, batchReader *
229231
})
230232
}
231233

232-
func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch rupture.FlushingBatch) error {
234+
func (b *BleveIndexer) addDelete(filename string, repo *models.Repository, batch *gitea_bleve.FlushingBatch) error {
233235
id := filenameIndexerID(repo.ID, filename)
234236
return batch.Delete(id)
235237
}
@@ -267,7 +269,7 @@ func (b *BleveIndexer) Close() {
267269

268270
// Index indexes the data
269271
func (b *BleveIndexer) Index(repo *models.Repository, sha string, changes *repoChanges) error {
270-
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
272+
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
271273
if len(changes.Updates) > 0 {
272274

273275
batchWriter, batchReader, cancel := git.CatFileBatch(repo.RepoPath())
@@ -296,7 +298,7 @@ func (b *BleveIndexer) Delete(repoID int64) error {
296298
if err != nil {
297299
return err
298300
}
299-
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
301+
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
300302
for _, hit := range result.Hits {
301303
if err = batch.Delete(hit.ID); err != nil {
302304
return err

modules/indexer/issues/bleve.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@ import (
99
"os"
1010
"strconv"
1111

12+
gitea_bleve "code.gitea.io/gitea/modules/indexer/bleve"
1213
"code.gitea.io/gitea/modules/log"
1314
"code.gitea.io/gitea/modules/util"
15+
1416
"github.com/blevesearch/bleve/v2"
1517
"github.com/blevesearch/bleve/v2/analysis/analyzer/custom"
1618
"github.com/blevesearch/bleve/v2/analysis/token/lowercase"
@@ -197,7 +199,7 @@ func (b *BleveIndexer) Close() {
197199

198200
// Index will save the index data
199201
func (b *BleveIndexer) Index(issues []*IndexerData) error {
200-
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
202+
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
201203
for _, issue := range issues {
202204
if err := batch.Index(indexerID(issue.ID), struct {
203205
RepoID int64
@@ -218,7 +220,7 @@ func (b *BleveIndexer) Index(issues []*IndexerData) error {
218220

219221
// Delete deletes indexes by ids
220222
func (b *BleveIndexer) Delete(ids ...int64) error {
221-
batch := rupture.NewFlushingBatch(b.indexer, maxBatchSize)
223+
batch := gitea_bleve.NewFlushingBatch(b.indexer, maxBatchSize)
222224
for _, id := range ids {
223225
if err := batch.Delete(indexerID(id)); err != nil {
224226
return err

0 commit comments

Comments
 (0)