Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(deploy): add terraform configs #24

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@

/cmd/indexing-service
/indexing-service
*.car
*.car
/build
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Cross-compilation settings for the AWS Lambda provided.al2 runtime on
# Graviton. Use ?= so callers can override, e.g. `make GOARCH=amd64 lambdas`.
GOOS?=linux
GOARCH?=arm64
GOCC?=go
# lambda.norpc strips the legacy RPC mode from the Lambda runtime library,
# shrinking the binary for provided.* runtimes.
GOFLAGS=-tags=lambda.norpc
# Static binaries: the Lambda execution environment has no C toolchain libs.
CGO_ENABLED=0
LAMBDAS=getclaims getroot notifier postclaims providercache remotesync

# Build every lambda entry point into build/<name>/bootstrap.
lambdas: $(LAMBDAS)

# Mark targets phony so they always rebuild (a file or directory with the
# same name would otherwise satisfy the target), and build the package
# directory rather than main.go alone so sibling files in each command
# package are included.
.PHONY: lambdas $(LAMBDAS)
$(LAMBDAS): %:
	GOOS=$(GOOS) GOARCH=$(GOARCH) CGO_ENABLED=$(CGO_ENABLED) $(GOCC) build $(GOFLAGS) -o build/$@/bootstrap ./cmd/lambda/$@
Binary file added bootstrap
Binary file not shown.
21 changes: 21 additions & 0 deletions cmd/lambda/getclaims/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"
"net/http"

"github.com/aws/aws-lambda-go/lambda"
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/server"
)

// main runs the claims-query lambda: it builds the full indexing service
// from environment configuration and serves the GET claims handler through
// the API Gateway V2 HTTP adapter.
func main() {
	cfg := aws.FromEnv(context.Background())
	svc, err := aws.Construct(cfg)
	if err != nil {
		panic(err)
	}
	claimsHandler := http.HandlerFunc(server.GetClaimsHandler(svc))
	lambda.Start(httpadapter.NewV2(claimsHandler).ProxyWithContext)
}
17 changes: 17 additions & 0 deletions cmd/lambda/getroot/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package main

import (
"context"
"net/http"

"github.com/aws/aws-lambda-go/lambda"
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/server"
)

// main runs the root-endpoint lambda. Unlike the claims lambdas it does not
// construct the full service — the root handler only needs the signer from
// the environment configuration.
func main() {
	cfg := aws.FromEnv(context.Background())
	rootHandler := http.HandlerFunc(server.GetRootHandler(cfg.Signer))
	lambda.Start(httpadapter.NewV2(rootHandler).ProxyWithContext)
}
42 changes: 42 additions & 0 deletions cmd/lambda/notifier/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package main

import (
"context"
"time"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
logging "github.com/ipfs/go-log/v2"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/service/providerindex/notifier"
)

var log = logging.Logger("lambda/notifier")

// makeHandler builds the EventBridge-triggered handler that checks whether
// the remote IPNI subscriber has caught up with our advertisement head.
// Errors are logged rather than returned so a purely informational check is
// not retried. The parameter is named n (not "notifier") so it does not
// shadow the notifier package.
func makeHandler(n *notifier.Notifier) func(ctx context.Context, event events.EventBridgeEvent) {
	return func(ctx context.Context, event events.EventBridgeEvent) {
		synced, ts, err := n.Update(ctx)
		if err != nil {
			log.Errorf("error during notifier sync head check: %s", err.Error())
			return
		}
		if !synced {
			// ts appears to be the last time the remote was in sync, so the
			// warning reports how long it has lagged — TODO confirm against
			// Notifier.Update's contract.
			log.Warnf("remote IPNI subscriber did not sync for %s", time.Since(ts))
		}
	}
}

// main wires up the notifier lambda: it loads configuration from the
// environment, restores head-tracking state from S3, and registers an SQS
// publisher to be invoked whenever a remote sync is detected.
func main() {
	config := aws.FromEnv(context.Background())
	// setup IPNI
	// TODO: switch to double hashed client for reader privacy?
	headStore := aws.NewS3Store(config.Config, config.NotifierHeadBucket, "")
	// Named nt rather than "notifier" to avoid shadowing the notifier package.
	nt, err := notifier.NewNotifierWithStorage(config.IndexerURL, config.PrivateKey, headStore)
	if err != nil {
		panic(err)
	}
	sqsRemoteSyncNotifier := aws.NewSQSRemoteSyncNotifier(config.Config, config.NotifierTopicArn)
	nt.Notify(sqsRemoteSyncNotifier.NotifyRemoteSync)

	lambda.Start(makeHandler(nt))
}
21 changes: 21 additions & 0 deletions cmd/lambda/postclaims/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"context"
"net/http"

"github.com/aws/aws-lambda-go/lambda"
"github.com/awslabs/aws-lambda-go-api-proxy/httpadapter"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/server"
)

// main runs the claims-submission lambda: it constructs the full indexing
// service (required to process a POST) and serves the handler through the
// API Gateway V2 HTTP adapter.
func main() {
	cfg := aws.FromEnv(context.Background())
	svc, err := aws.Construct(cfg)
	if err != nil {
		panic(err)
	}
	postHandler := http.HandlerFunc(server.PostClaimsHandler(cfg.Signer, svc))
	lambda.Start(httpadapter.NewV2(postHandler).ProxyWithContext)
}
72 changes: 72 additions & 0 deletions cmd/lambda/providercache/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package main

import (
"context"
"errors"
"sync"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
logging "github.com/ipfs/go-log/v2"
goredis "github.com/redis/go-redis/v9"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/redis"
"github.com/storacha/indexing-service/pkg/service/providercacher"
)

var log = logging.Logger("lambda/providercache")

// handleMessage decodes a single SQS caching job and caches the provider
// records for its index. The count returned by CacheProviderForIndexRecords
// is deliberately discarded — only success or failure matters here, since
// the error drives SQS retry behavior.
func handleMessage(ctx context.Context, sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher, msg events.SQSMessage) error {
	job, err := sqsCachingDecoder.DecodeMessage(ctx, msg.Body)
	if err != nil {
		return err
	}
	_, err = providerCacher.CacheProviderForIndexRecords(ctx, job.Provider, job.Index)
	return err
}

// makeHandler returns the SQS batch handler. Records are processed
// concurrently; if any record fails, the combined error is returned (so the
// batch is retried) and cleanup is skipped. Otherwise the decoded payloads
// are cleaned up on a best-effort basis.
func makeHandler(sqsCachingDecoder *aws.SQSCachingDecoder, providerCacher providercacher.ProviderCacher) func(ctx context.Context, sqsEvent events.SQSEvent) error {
	return func(ctx context.Context, sqsEvent events.SQSEvent) error {
		// Fan out one goroutine per record; each writes its result into a
		// dedicated slot, so no channel coordination is required.
		errs := make([]error, len(sqsEvent.Records))
		var wg sync.WaitGroup
		for i, record := range sqsEvent.Records {
			wg.Add(1)
			go func(i int, record events.SQSMessage) {
				defer wg.Done()
				errs[i] = handleMessage(ctx, sqsCachingDecoder, providerCacher, record)
			}(i, record)
		}
		wg.Wait()
		// errors.Join skips nil entries and yields nil when every record
		// succeeded.
		if err := errors.Join(errs...); err != nil {
			return err
		}
		// Cleanup failures are logged but not returned — the work itself
		// completed successfully.
		for _, record := range sqsEvent.Records {
			if cleanupErr := sqsCachingDecoder.CleanupMessage(ctx, record.Body); cleanupErr != nil {
				log.Warnf("unable to cleanup message fully: %s", cleanupErr.Error())
			}
		}
		return nil
	}
}

// main assembles the provider-cache lambda: caching jobs decoded from
// SQS (with payloads in S3) are written into a Redis-backed provider store.
func main() {
	cfg := aws.FromEnv(context.Background())
	store := redis.NewProviderStore(goredis.NewClient(&cfg.ProvidersRedis))
	cacher := providercacher.NewSimpleProviderCacher(store)
	decoder := aws.NewSQSCachingDecoder(cfg.Config, cfg.CachingBucket)
	lambda.Start(makeHandler(decoder, cacher))
}
53 changes: 53 additions & 0 deletions cmd/lambda/remotesync/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package main

import (
"context"
"encoding/json"

"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/ipfs/go-cid"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
goredis "github.com/redis/go-redis/v9"
"github.com/storacha/indexing-service/pkg/aws"
"github.com/storacha/indexing-service/pkg/redis"
"github.com/storacha/indexing-service/pkg/service/providerindex"
"github.com/storacha/indexing-service/pkg/service/providerindex/store"
)

// makeHandler returns the SNS event handler. Each record carries a remote
// sync message naming the new head CID and the previous head CID; both are
// parsed into IPLD links and handed to the remote syncer.
func makeHandler(remoteSyncer *providerindex.RemoteSyncer) func(ctx context.Context, snsEvent events.SNSEvent) error {
	return func(ctx context.Context, snsEvent events.SNSEvent) error {
		for _, record := range snsEvent.Records {
			var msg aws.SNSRemoteSyncMessage
			if err := json.Unmarshal([]byte(record.SNS.Message), &msg); err != nil {
				return err
			}
			headCid, err := cid.Parse(msg.Head)
			if err != nil {
				return err
			}
			prevCid, err := cid.Parse(msg.Prev)
			if err != nil {
				return err
			}
			remoteSyncer.HandleRemoteSync(ctx, cidlink.Link{Cid: headCid}, cidlink.Link{Cid: prevCid})
		}
		return nil
	}
}

// main wires the remote-sync lambda: SNS notifications about IPNI head
// advances are applied to the Redis provider store, using the publisher
// store backed by S3 (advertisements) and DynamoDB (chunk links, metadata).
func main() {
	cfg := aws.FromEnv(context.Background())
	providerStore := redis.NewProviderStore(goredis.NewClient(&cfg.ProvidersRedis))
	publisherStore := store.NewPublisherStore(
		aws.NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix),
		aws.NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName),
		aws.NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName),
	)
	lambda.Start(makeHandler(providerindex.NewRemoteSyncer(providerStore, publisherStore)))
}
43 changes: 43 additions & 0 deletions deploy/dynamodb.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Advertisement metadata, keyed by provider ID with the (binary) context ID
# as the sort key.
resource "aws_dynamodb_table" "metadata" {
  name = "${terraform.workspace}-${var.app}-metadata"
  # On-demand capacity: no throughput to provision for spiky lambda traffic.
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "provider"
    type = "S"
  }

  # Binary attribute — context IDs are opaque byte strings.
  attribute {
    name = "contextID"
    type = "B"
  }

  hash_key  = "provider"
  range_key = "contextID"

  tags = {
    Name = "${terraform.workspace}-${var.app}-metadata"
  }
}

# Advertisement chunk links, with the same key schema as the metadata table:
# provider ID hash key, binary context ID range key.
resource "aws_dynamodb_table" "chunk_links" {
  name = "${terraform.workspace}-${var.app}-chunk-links"
  # On-demand capacity: no throughput to provision for spiky lambda traffic.
  billing_mode = "PAY_PER_REQUEST"

  attribute {
    name = "provider"
    type = "S"
  }

  # Binary attribute — context IDs are opaque byte strings.
  attribute {
    name = "contextID"
    type = "B"
  }

  hash_key  = "provider"
  range_key = "contextID"

  tags = {
    Name = "${terraform.workspace}-${var.app}-chunk-links"
  }
}
85 changes: 85 additions & 0 deletions deploy/elasticcache.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
locals {
  # Declared as a set (not a list) because it is consumed by for_each, which
  # requires a map or a set of strings — a plain list fails at plan time.
  caches = toset(["providers", "indexes", "claims"])
}
# Customer-managed KMS key used to encrypt the serverless caches at rest.
resource "aws_kms_key" "cache_key" {
  description = "KMS CMK for ${terraform.workspace} ${var.app}"
  # Annual automatic key rotation, per KMS best practice.
  enable_key_rotation = true
}

# One serverless Redis cache per logical store (providers, indexes, claims).
resource "aws_elasticache_serverless_cache" "cache" {
  # for_each requires a map or set of strings; wrap in toset() so this also
  # works when local.caches is declared as a plain list (a list would fail
  # at plan time with "the given for_each argument value is unsuitable").
  for_each = toset(local.caches)

  engine = "REDIS"
  name   = "${terraform.workspace}-${var.app}-${each.key}-cache"
  # Smaller limits outside prod keep dev/staging costs down.
  cache_usage_limits {
    data_storage {
      maximum = terraform.workspace == "prod" ? 10 : 1
      unit    = "GB"
    }
    ecpu_per_second {
      maximum = terraform.workspace == "prod" ? 5000 : 500
    }
  }
  # NOTE(review): AWS documents snapshot times as hh:mm — confirm "2:00" is
  # accepted or change to "02:00".
  daily_snapshot_time  = "2:00"
  description          = "${terraform.workspace} ${var.app} ${each.key} serverless cluster"
  kms_key_id           = aws_kms_key.cache_key.arn
  major_engine_version = "7"
  security_group_ids   = [aws_security_group.cache_security_group.id]

  snapshot_retention_limit = 7
  # NOTE(review): subnets come from aws_subnet.vpc_private_subnet while the
  # security group is created in module.vpc — confirm both refer to the same
  # VPC (definitions are outside this file).
  subnet_ids = aws_subnet.vpc_private_subnet[*].id

  user_group_id = aws_elasticache_user_group.cache_user_group.user_group_id
}

# User group binding the two Redis users below to the serverless caches.
resource "aws_elasticache_user_group" "cache_user_group" {
  engine        = "REDIS"
  user_group_id = "${terraform.workspace}-${var.app}-redis"

  user_ids = [
    "${terraform.workspace}-${var.app}-default-disabled",
    "${terraform.workspace}-${var.app}-iam-user"
  ]

  # NOTE(review): changes to user_ids are ignored after creation, presumably
  # to avoid perpetual diffs while membership is reconciled elsewhere —
  # confirm this is intentional.
  lifecycle {
    ignore_changes = [user_ids]
  }
}

# The mandatory "default" Redis user, effectively disabled: the access
# string starts with "off", so the remaining grants never take effect.
resource "aws_elasticache_user" "cache_default_user" {
  user_id       = "${terraform.workspace}-${var.app}-default-disabled"
  user_name     = "default"
  access_string = "off +get ~keys*"
  authentication_mode {
    type = "no-password-required"
  }
  engine = "REDIS"
}

# Application user: full access to all keys and commands, authenticated via
# IAM rather than a password.
resource "aws_elasticache_user" "cache_iam_user" {
  user_id       = "${terraform.workspace}-${var.app}-iam-user"
  user_name     = "iam-user"
  access_string = "on ~* +@all"
  authentication_mode {
    type = "iam"
  }
  engine = "REDIS"
}

# Security group attached to the caches; its ingress rule is defined
# separately below.
resource "aws_security_group" "cache_security_group" {

  name        = "${terraform.workspace}-${var.app}-cache-security-group"
  description = "Security group for VPC access to redis"
  vpc_id      = module.vpc.vpc_id
}

# Allow Redis (6379/tcp) ingress from anywhere inside the VPC.
resource "aws_security_group_rule" "cache_security_group_rule" {
  security_group_id = aws_security_group.cache_security_group.id
  type              = "ingress"

  # NOTE(review): the security group above is created in module.vpc, but
  # this CIDR is read from a standalone aws_vpc.vpc resource — confirm both
  # refer to the same VPC (their definitions are outside this file).
  cidr_blocks = [aws_vpc.vpc.cidr_block]
  description = "Redis"
  from_port   = 6379
  to_port     = 6379
  protocol    = "tcp"
}
Loading
Loading