Skip to content

Commit

Permalink
[ENH] Make SysDb return eligible collections to GC.
Browse files Browse the repository at this point in the history
Limit the number of collections, and return
those that have at least 1 version that is
eligible for GC.
  • Loading branch information
rohitcpbot committed Mar 6, 2025
1 parent 14bb882 commit f2f4b6e
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 26 deletions.
4 changes: 2 additions & 2 deletions go/pkg/sysdb/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ func (s *Coordinator) FlushCollectionCompaction(ctx context.Context, flushCollec
return s.catalog.FlushCollectionCompaction(ctx, flushCollectionCompaction)
}

func (s *Coordinator) ListCollectionsToGc(ctx context.Context) ([]*model.CollectionToGc, error) {
return s.catalog.ListCollectionsToGc(ctx)
func (s *Coordinator) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64) ([]*model.CollectionToGc, error) {
return s.catalog.ListCollectionsToGc(ctx, cutoffTimeSecs, limit)
}

func (s *Coordinator) ListCollectionVersions(ctx context.Context, collectionID types.UniqueID, tenantID string, maxCount *int64, versionsBefore *int64, versionsAtOrAfter *int64) ([]*coordinatorpb.CollectionVersionInfo, error) {
Expand Down
42 changes: 38 additions & 4 deletions go/pkg/sysdb/coordinator/table_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,14 +409,14 @@ func (tc *Catalog) GetCollectionSize(ctx context.Context, collectionID types.Uni
return total_records_post_compaction, nil
}

func (tc *Catalog) ListCollectionsToGc(ctx context.Context) ([]*model.CollectionToGc, error) {
func (tc *Catalog) ListCollectionsToGc(ctx context.Context, cutoffTimeSecs *uint64, limit *uint64) ([]*model.CollectionToGc, error) {
tracer := otel.Tracer
if tracer != nil {
_, span := tracer.Start(ctx, "Catalog.ListCollectionsToGc")
defer span.End()
}

collectionsToGc, err := tc.metaDomain.CollectionDb(ctx).ListCollectionsToGc()
collectionsToGc, err := tc.metaDomain.CollectionDb(ctx).ListCollectionsToGc(cutoffTimeSecs, limit)

if err != nil {
return nil, err
Expand Down Expand Up @@ -1403,7 +1403,7 @@ func (tc *Catalog) markVersionForDeletionInSingleCollection(

// Update the version file name in Postgres table as a CAS operation.
// TODO(rohit): Investigate if we really need a Tx here.
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionFileName(collectionID, existingVersionFileName, newVerFileFullPath)
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVerFileFullPath, nil, nil)
if err != nil {
// Delete the newly created version file from S3 since it is not needed.
tc.s3Store.DeleteVersionFile(tenantID, collectionID, newVersionFileName)
Expand Down Expand Up @@ -1463,6 +1463,25 @@ func (tc *Catalog) updateProtoRemoveVersionEntries(versionFilePb *coordinatorpb.
return nil
}

// getNumberOfActiveVersions returns the number of distinct version numbers
// present in the version history of versionFilePb.
//
// Entries that repeat the same version number are counted once. A nil version
// file, or one without a version history, yields 0 instead of panicking, so
// callers can treat "no history" the same way as "no versions".
func (tc *Catalog) getNumberOfActiveVersions(versionFilePb *coordinatorpb.CollectionVersionFile) int {
	// Read the slice through the generated getter: unlike a direct field
	// access on the result of GetVersionHistory(), it tolerates a nil history.
	versions := versionFilePb.GetVersionHistory().GetVersions()

	// Set of version numbers seen so far; the empty-struct value carries no data.
	seen := make(map[int64]struct{}, len(versions))
	for _, version := range versions {
		seen[version.Version] = struct{}{}
	}
	return len(seen)
}

// getOldestVersionTs returns the creation time of the version history entry at
// index 1, converted from its CreatedAtSecs value (seconds since the Unix epoch).
//
// The zero time.Time is returned as an "unset" marker when the version history
// is missing or holds at most one entry.
// NOTE(review): index 0 is skipped, presumably because it is the initial
// version that is never garbage collected — confirm against the writer of
// the version file.
func (tc *Catalog) getOldestVersionTs(versionFilePb *coordinatorpb.CollectionVersionFile) time.Time {
	history := versionFilePb.GetVersionHistory()
	if history != nil && len(history.Versions) > 1 {
		return time.Unix(history.Versions[1].CreatedAtSecs, 0)
	}
	// Nothing beyond the first entry: report an unset timestamp.
	return time.Time{}
}

func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenantID string, collectionID string, versions []int64) error {
// Limit the loop to 5 attempts to avoid infinite loops
numAttempts := 0
Expand Down Expand Up @@ -1493,6 +1512,21 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
return err
}

numActiveVersions := tc.getNumberOfActiveVersions(versionFilePb)
if numActiveVersions <= 1 {
// No remaining valid versions after GC.
return errors.New("no valid versions after gc")
}

// Get the creation time of the oldest version.
oldestVersionTs := tc.getOldestVersionTs(versionFilePb)
if oldestVersionTs.IsZero() {
// This should never happen.
log.Error("oldest version timestamp is zero after GC.", zap.String("collection_id", collectionID))
// No versions to delete.
return errors.New("oldest version timestamp is zero after GC")
}

// Write the new version file to S3
// Create the new version file name with the format: <version_number>_<uuid>_gc_delete
newVersionFileName := fmt.Sprintf(
Expand All @@ -1506,7 +1540,7 @@ func (tc *Catalog) DeleteVersionEntriesForCollection(ctx context.Context, tenant
}

// Update the version file name in Postgres table as a CAS operation
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionFileName(collectionID, existingVersionFileName, newVerFileFullPath)
rowsAffected, err := tc.metaDomain.CollectionDb(ctx).UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVerFileFullPath, &oldestVersionTs, &numActiveVersions)
if err != nil {
// Delete the newly created version file from S3 since it is not needed
tc.s3Store.DeleteVersionFile(tenantID, collectionID, newVersionFileName)
Expand Down
2 changes: 1 addition & 1 deletion go/pkg/sysdb/grpc/collection_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ func (s *Server) FlushCollectionCompaction(ctx context.Context, req *coordinator

func (s *Server) ListCollectionsToGc(ctx context.Context, req *coordinatorpb.ListCollectionsToGcRequest) (*coordinatorpb.ListCollectionsToGcResponse, error) {
// Forwards the request's optional cutoff time and limit to the coordinator.
collectionsToGc, err := s.coordinator.ListCollectionsToGc(ctx)
collectionsToGc, err := s.coordinator.ListCollectionsToGc(ctx, req.CutoffTimeSecs, req.Limit)
if err != nil {
log.Error("ListCollectionsToGc failed", zap.Error(err))
return nil, grpcutils.BuildInternalGrpcError(err.Error())
Expand Down
43 changes: 37 additions & 6 deletions go/pkg/sysdb/metastore/db/dao/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,31 @@ func (s *collectionDb) GetCollections(id *string, name *string, tenantID string,
return s.getCollections(id, name, tenantID, databaseName, limit, offset, false)
}

func (s *collectionDb) ListCollectionsToGc() ([]*dbmodel.CollectionToGc, error) {
func (s *collectionDb) ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64) ([]*dbmodel.CollectionToGc, error) {
var collections []*dbmodel.CollectionToGc
// Use the read replica for this so as to not overwhelm the writer.
// Skip collections that have not been compacted even once.
err := s.read_db.Table("collections").Select("id, name, version, version_file_name").Find(&collections).Where("version > 0").Error
query := s.read_db.Table("collections").
Select("id, name, version, version_file_name, oldest_version_ts, num_versions").
Where("version > 0")

// Apply cutoff time filter only if provided
if cutoffTimeSecs != nil {
cutoffTime := time.Unix(int64(*cutoffTimeSecs), 0)
query = query.Where("oldest_version_ts < ?", cutoffTime)
}

query = query.Order("num_versions DESC")

// Apply limit only if provided
if limit != nil {
query = query.Limit(int(*limit))
}

err := query.Find(&collections).Error
if err != nil {
return nil, err
}
log.Info("collections to gc", zap.Any("collections", collections))
log.Debug("collections to gc", zap.Any("collections", collections))
return collections, nil
}

Expand Down Expand Up @@ -400,8 +416,23 @@ func (s *collectionDb) UpdateLogPositionVersionAndTotalRecords(collectionID stri
return version, nil
}

func (s *collectionDb) UpdateVersionFileName(collectionID, existingVersionFileName, newVersionFileName string) (int64, error) {
result := s.db.Model(&dbmodel.Collection{}).Where("id = ? AND version_file_name = ?", collectionID, existingVersionFileName).Updates(map[string]interface{}{"version_file_name": newVersionFileName})
func (s *collectionDb) UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error) {
// Create updates map with required version_file_name
updates := map[string]interface{}{
"version_file_name": newVersionFileName,
}

// Only add optional fields if they are not nil
if oldestVersionTs != nil {
updates["oldest_version_ts"] = oldestVersionTs
}
if numActiveVersions != nil {
updates["num_versions"] = numActiveVersions
}

result := s.db.Model(&dbmodel.Collection{}).
Where("id = ? AND version_file_name = ?", collectionID, existingVersionFileName).
Updates(updates)
if result.Error != nil {
return 0, result.Error
}
Expand Down
16 changes: 10 additions & 6 deletions go/pkg/sysdb/metastore/db/dbmodel/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,17 @@ type Collection struct {
TotalRecordsPostCompaction uint64 `gorm:"total_records_post_compaction;default:0"`
SizeBytesPostCompaction uint64 `gorm:"size_bytes_post_compaction;default:0"`
LastCompactionTimeSecs uint64 `gorm:"last_compaction_time_secs;default:0"`
NumVersions uint32 `gorm:"num_versions;default:0"`
OldestVersionTs time.Time `gorm:"oldest_version_ts;type:timestamp"`
}

type CollectionToGc struct {
ID string `gorm:"id;primaryKey"`
Name string `gorm:"name;not null;index:idx_name,unique;"`
Version int32 `gorm:"version;default:0"`
VersionFileName string `gorm:"version_file_name"`
ID string `gorm:"id;primaryKey"`
Name string `gorm:"name;not null;index:idx_name,unique;"`
Version int32 `gorm:"version;default:0"`
VersionFileName string `gorm:"version_file_name"`
OldestVersionTs time.Time `gorm:"oldest_version_ts;type:timestamp"`
NumVersions uint32 `gorm:"num_versions;default:0"`
}

func (v Collection) TableName() string {
Expand All @@ -55,6 +59,6 @@ type ICollectionDb interface {
UpdateLogPositionAndVersionInfo(collectionID string, logPosition int64, currentCollectionVersion int32, currentVersionFileName string, newCollectionVersion int32, newVersionFileName string) (int64, error)
GetCollectionEntry(collectionID *string, databaseName *string) (*Collection, error)
GetCollectionSize(collectionID string) (uint64, error)
ListCollectionsToGc() ([]*CollectionToGc, error)
UpdateVersionFileName(collectionID, existingVersionFileName, newVersionFileName string) (int64, error)
ListCollectionsToGc(cutoffTimeSecs *uint64, limit *uint64) ([]*CollectionToGc, error)
UpdateVersionRelatedFields(collectionID, existingVersionFileName, newVersionFileName string, oldestVersionTs *time.Time, numActiveVersions *int) (int64, error)
}
2 changes: 2 additions & 0 deletions go/pkg/sysdb/metastore/db/migrations/20250306164715.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Modify "collections" table
ALTER TABLE "public"."collections" ADD COLUMN "num_versions" bigint NULL DEFAULT 0, ADD COLUMN "oldest_version_ts" timestamp NULL;
3 changes: 2 additions & 1 deletion go/pkg/sysdb/metastore/db/migrations/atlas.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
h1:5Ybj6XKzjbbfSXj2Pfqho8XPtgKfd/CX2Ddm0uiaYkQ=
h1:Iiz+3XcgK4x/Wl6/QA1uvWFBHgZuYdy3cRgBzrQYC48=
20240313233558.sql h1:Gv0TiSYsqGoOZ2T2IWvX4BOasauxool8PrBOIjmmIdg=
20240321194713.sql h1:kVkNpqSFhrXGVGFFvL7JdK3Bw31twFcEhI6A0oCFCkg=
20240327075032.sql h1:nlr2J74XRU8erzHnKJgMr/tKqJxw9+R6RiiEBuvuzgo=
Expand All @@ -11,3 +11,4 @@ h1:5Ybj6XKzjbbfSXj2Pfqho8XPtgKfd/CX2Ddm0uiaYkQ=
20250109224431.sql h1:RjJ2Q3jAWj48T2vmEo7X9rI9cKFC6zIcBUTq4RaE14A=
20250115204645.sql h1:Wc/JBG18UxkoFpxHWO6hADnOeggii95ysCqN1knHtiA=
20250303211712.sql h1:0InApNBRlxhGoA59Wd5v4loKD2ZJv+j1gA+koZaVz2w=
20250306164715.sql h1:25UkZ4y+urtbezu5QR/2MWuIrxSZZ586gmNBa05uTkk=
22 changes: 17 additions & 5 deletions idl/chromadb/proto/coordinator.proto
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,23 @@ message GetCollectionSizeResponse {
}

message ListCollectionsToGcRequest {
// Only return collections with last GC time less than this value.
// This introduces a limit on the number of collections that can be returned.
// This allows for a cheap pagination, and also a way to deal with GC to
// garbage collect collections that have not been cleaned up for a long time.
// uint64 last_gc_time_lt = 1;
// Return collections that need to be GCed based on this cutoff time.
// Currently, sysdb will return all collections that have versions created
// before this cutoff time.
// SysDb can apply additional logic to return the collections that should
// be prioritized for GC.
optional uint64 cutoff_time_secs = 1;

// Limit the number of collections that can be returned.
// GC will get n number of collections to GC. After GC is done, it will
// update the collections such that these collections are not returned again.
// This is to ensure that we do not GC the same collections over and over again.
// This also allows for a cheap and stateless pagination without using offsets.
optional uint64 limit = 2;

// Design NOTE: When GC calls DeleteCollectionVersion, sysdb will update the
// time associated with the oldest version of the collection. This allows
// sysdb to return the collections that have not been GCed for a long time.
}

message CollectionToGcInfo {
Expand Down
5 changes: 4 additions & 1 deletion rust/sysdb/src/sysdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,10 @@ impl GrpcSysDb {
) -> Result<Vec<CollectionToGcInfo>, GetCollectionsToGcError> {
let res = self
.client
.list_collections_to_gc(chroma_proto::ListCollectionsToGcRequest {})
.list_collections_to_gc(chroma_proto::ListCollectionsToGcRequest {
cutoff_time_secs: None,
limit: None,
})
.await;

match res {
Expand Down

0 comments on commit f2f4b6e

Please sign in to comment.