Skip to content

Commit 1790653

Browse files
committed
feat(aws): add aws lambdas and infra
1 parent 88f3093 commit 1790653

File tree

17 files changed

+924
-50
lines changed

17 files changed

+924
-50
lines changed

cmd/lambda/getclaims/main.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
"github.com/aws/aws-lambda-go/lambda"
8+
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
9+
"github.com/storacha/indexing-service/pkg/aws"
10+
"github.com/storacha/indexing-service/pkg/server"
11+
)
12+
13+
func main() {
14+
config := aws.FromEnv(context.Background())
15+
service, err := aws.Construct(config)
16+
if err != nil {
17+
panic(err)
18+
}
19+
handler := server.GetClaimsHandler(service)
20+
lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
21+
}

cmd/lambda/getroot/main.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
"github.com/aws/aws-lambda-go/lambda"
8+
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
9+
"github.com/storacha/indexing-service/pkg/aws"
10+
"github.com/storacha/indexing-service/pkg/server"
11+
)
12+
13+
func main() {
14+
config := aws.FromEnv(context.Background())
15+
handler := server.GetRootHandler(config.Signer)
16+
lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
17+
}

cmd/lambda/notifier/main.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/aws/aws-lambda-go/events"
8+
"github.com/aws/aws-lambda-go/lambda"
9+
logging "github.com/ipfs/go-log/v2"
10+
"github.com/storacha/indexing-service/pkg/aws"
11+
"github.com/storacha/indexing-service/pkg/service/providerindex/notifier"
12+
)
13+
14+
var log = logging.Logger("lambda/notifier")
15+
16+
func makeHandler(notifier *notifier.Notifier) func(ctx context.Context, event events.EventBridgeEvent) {
17+
return func(ctx context.Context, event events.EventBridgeEvent) {
18+
synced, ts, err := notifier.Update(ctx)
19+
if err != nil {
20+
log.Errorf("error during notifier sync head check: %s", err.Error())
21+
return
22+
}
23+
if !synced {
24+
log.Warnf("remote IPNI subscriber did not sync for %s", time.Since(ts))
25+
}
26+
}
27+
}
28+
29+
func main() {
30+
config := aws.FromEnv(context.Background())
31+
// setup IPNI
32+
// TODO: switch to double hashed client for reader privacy?
33+
headStore := aws.NewS3Store(config.Config, config.NotifierHeadBucket, "")
34+
notifier, err := notifier.NewNotifierWithStorage(config.IndexerURL, config.PrivateKey, headStore)
35+
if err != nil {
36+
panic(err)
37+
}
38+
sqsRemoteSyncNotifier := aws.NewSQSRemoteSyncNotifier(config.Config, config.NotifierTopicArn)
39+
notifier.Notify(sqsRemoteSyncNotifier.NotifyRemoteSync)
40+
41+
lambda.Start(makeHandler(notifier))
42+
}

cmd/lambda/postclaims/main.go

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"net/http"
6+
7+
"github.com/aws/aws-lambda-go/lambda"
8+
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
9+
"github.com/storacha/indexing-service/pkg/aws"
10+
"github.com/storacha/indexing-service/pkg/server"
11+
)
12+
13+
func main() {
14+
config := aws.FromEnv(context.Background())
15+
service, err := aws.Construct(config)
16+
if err != nil {
17+
panic(err)
18+
}
19+
handler := server.PostClaimsHandler(config.Signer, service)
20+
lambda.Start(httpadapter.NewV2(http.HandlerFunc(handler)).ProxyWithContext)
21+
}

cmd/lambda/providercache/main.go

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
8+
"github.com/aws/aws-lambda-go/events"
9+
"github.com/aws/aws-lambda-go/lambda"
10+
logging "github.com/ipfs/go-log/v2"
11+
goredis "github.com/redis/go-redis/v9"
12+
"github.com/storacha/indexing-service/pkg/aws"
13+
"github.com/storacha/indexing-service/pkg/redis"
14+
"github.com/storacha/indexing-service/pkg/service/providercacher"
15+
)
16+
17+
var log = logging.Logger("lambda/providercache")
18+
19+
func handleMessage(ctx context.Context, sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher, msg events.SQSMessage) error {
20+
job, err := sqsCachingDecoder.DecodeMessage(ctx, msg.Body)
21+
if err != nil {
22+
return err
23+
}
24+
_, err = providerCacher.CacheProviderForIndexRecords(ctx, job.Provider, job.Index)
25+
if err != nil {
26+
return err
27+
}
28+
return nil
29+
}
30+
31+
func makeHandler(sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher) func(ctx context.Context, sqsEvent events.SQSEvent) error {
32+
return func(ctx context.Context, sqsEvent events.SQSEvent) error {
33+
// process messages in parallel
34+
results := make(chan error, len(sqsEvent.Records))
35+
var wg sync.WaitGroup
36+
for _, msg := range sqsEvent.Records {
37+
wg.Add(1)
38+
go func(msg events.SQSMessage) {
39+
defer wg.Done()
40+
err := handleMessage(ctx, sqsCachingDecoder, providerCacher, msg)
41+
results <- err
42+
}(msg)
43+
}
44+
wg.Wait()
45+
// collect errors
46+
close(results)
47+
var err error
48+
for nextErr := range results {
49+
err = errors.Join(err, nextErr)
50+
}
51+
// return overall error
52+
if err != nil {
53+
return err
54+
}
55+
for _, msg := range sqsEvent.Records {
56+
err := sqsCachingDecoder.CleanupMessage(ctx, msg.Body)
57+
if err != nil {
58+
log.Warnf("unable to cleanup message fully: %s", err.Error())
59+
}
60+
}
61+
return nil
62+
}
63+
}
64+
65+
func main() {
66+
config := aws.FromEnv(context.Background())
67+
providerRedis := goredis.NewClient(&config.ProvidersRedis)
68+
providerStore := redis.NewProviderStore(providerRedis)
69+
providerCacher := providercacher.NewSimpleProviderCacher(providerStore)
70+
sqsCachingDecoder := aws.NewSQSCachingDecoder(config.Config, config.CachingBucket)
71+
lambda.Start(makeHandler(sqsCachingDecoder, providerCacher))
72+
}

cmd/lambda/remotesync/main.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
7+
"github.com/aws/aws-lambda-go/events"
8+
"github.com/aws/aws-lambda-go/lambda"
9+
"github.com/ipfs/go-cid"
10+
logging "github.com/ipfs/go-log/v2"
11+
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
12+
goredis "github.com/redis/go-redis/v9"
13+
"github.com/storacha/indexing-service/pkg/aws"
14+
"github.com/storacha/indexing-service/pkg/redis"
15+
"github.com/storacha/indexing-service/pkg/service/providercacher"
16+
"github.com/storacha/indexing-service/pkg/service/providerindex"
17+
"github.com/storacha/indexing-service/pkg/service/providerindex/store"
18+
)
19+
20+
var log = logging.Logger("lambda/providercache")
21+
22+
func handleMessage(ctx context.Context, sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher, msg events.SQSMessage) error {
23+
job, err := sqsCachingDecoder.DecodeMessage(ctx, msg.Body)
24+
if err != nil {
25+
return err
26+
}
27+
_, err = providerCacher.CacheProviderForIndexRecords(ctx, job.Provider, job.Index)
28+
if err != nil {
29+
return err
30+
}
31+
return nil
32+
}
33+
34+
func makeHandler(remoteSyncer *providerindex.RemoteSyncer) func(ctx context.Context, snsEvent events.SNSEvent) error {
35+
return func(ctx context.Context, snsEvent events.SNSEvent) error {
36+
for _, record := range snsEvent.Records {
37+
snsRecord := record.SNS
38+
var snsRemoteSyncMessage aws.SNSRemoteSyncMessage
39+
err := json.Unmarshal([]byte(snsRecord.Message), &snsRemoteSyncMessage)
40+
if err != nil {
41+
return err
42+
}
43+
headCid, err := cid.Parse(snsRemoteSyncMessage.Head)
44+
if err != nil {
45+
return err
46+
}
47+
head := cidlink.Link{Cid: headCid}
48+
prevCid, err := cid.Parse(snsRemoteSyncMessage.Prev)
49+
if err != nil {
50+
return err
51+
}
52+
prev := cidlink.Link{Cid: prevCid}
53+
remoteSyncer.HandleRemoteSync(ctx, head, prev)
54+
}
55+
return nil
56+
}
57+
}
58+
59+
func main() {
60+
cfg := aws.FromEnv(context.Background())
61+
providerRedis := goredis.NewClient(&cfg.ProvidersRedis)
62+
providerStore := redis.NewProviderStore(providerRedis)
63+
ipniStore := aws.NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix)
64+
chunkLinksTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName)
65+
metadataTable := aws.NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName)
66+
publisherStore := store.NewPublisherStore(ipniStore, chunkLinksTable, metadataTable)
67+
remoteSyncer := providerindex.NewRemoteSyncer(providerStore, publisherStore)
68+
lambda.Start(makeHandler(remoteSyncer))
69+
}

go.mod

+29-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go 1.23
55
toolchain go1.23.0
66

77
require (
8+
github.com/aws/aws-sdk-go-v2 v1.32.2
89
github.com/ipfs/go-cid v0.4.1
910
github.com/ipfs/go-ds-flatfs v0.5.1
1011
github.com/ipld/go-ipld-prime v0.21.1-0.20240917223228-6148356a4c2e
@@ -21,8 +22,26 @@ require (
2122

2223
require (
2324
github.com/alexbrainman/goissue34681 v0.0.0-20191006012335-3fc7a47baff5 // indirect
25+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.6 // indirect
26+
github.com/aws/aws-sdk-go-v2/credentials v1.17.41 // indirect
27+
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.17 // indirect
28+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.21 // indirect
29+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.21 // indirect
30+
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
31+
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.21 // indirect
32+
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.24.2 // indirect
33+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.0 // indirect
34+
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.2 // indirect
35+
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.10.2 // indirect
36+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.2 // indirect
37+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.2 // indirect
38+
github.com/aws/aws-sdk-go-v2/service/sso v1.24.2 // indirect
39+
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.2 // indirect
40+
github.com/aws/aws-sdk-go-v2/service/sts v1.32.2 // indirect
41+
github.com/aws/smithy-go v1.22.0 // indirect
2442
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
2543
github.com/ipfs/go-peertaskqueue v0.8.1 // indirect
44+
github.com/jmespath/go-jmespath v0.4.0 // indirect
2645
github.com/pion/ice/v2 v2.3.35 // indirect
2746
github.com/quic-go/qpack v0.5.1 // indirect
2847
github.com/russross/blackfriday/v2 v2.1.0 // indirect
@@ -32,6 +51,15 @@ require (
3251
)
3352

3453
require (
54+
github.com/aws/aws-lambda-go v1.47.0
55+
github.com/aws/aws-sdk-go-v2/config v1.27.43
56+
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12
57+
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.36.2
58+
github.com/aws/aws-sdk-go-v2/service/s3 v1.65.3
59+
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.34.2
60+
github.com/aws/aws-sdk-go-v2/service/sns v1.33.2
61+
github.com/aws/aws-sdk-go-v2/service/sqs v1.36.2
62+
github.com/awslabs/aws-lambda-go-api-proxy v0.16.2
3563
github.com/cespare/xxhash/v2 v2.3.0 // indirect
3664
github.com/davecgh/go-spew v1.1.1 // indirect
3765
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
@@ -40,7 +68,7 @@ require (
4068
github.com/go-logr/stdr v1.2.2 // indirect
4169
github.com/gobwas/glob v0.2.3 // indirect
4270
github.com/gogo/protobuf v1.3.2 // indirect
43-
github.com/google/uuid v1.6.0 // indirect
71+
github.com/google/uuid v1.6.0
4472
github.com/hashicorp/errwrap v1.1.0 // indirect
4573
github.com/hashicorp/go-multierror v1.1.1 // indirect
4674
github.com/hashicorp/golang-lru v1.0.2 // indirect

0 commit comments

Comments
 (0)