Pubsub.jetstream bulk subscribe feature doesn't seem to work #3656

Open
fulop-bede opened this issue Jan 24, 2025 · 2 comments
Labels
kind/bug Something isn't working

Comments

@fulop-bede

Hi, I'm trying to use the bulk subscribe feature with the jetstream pubsub component, but I'm experiencing issues.

Expected Behavior

When using the bulk subscribe feature, I expect:

  1. To receive multiple messages in a single request to my app.
  2. To be able to acknowledge or retry each message independently.

Actual Behavior

Out of the box, only one message is ever delivered per request to the app endpoint, no matter how many are waiting to be delivered. I looked at the component implementation and found a Concurrency config option (which is not in the docs for some reason). When I set it to parallel, the bulk subscription endpoint in my app starts receiving multiple messages in the same request. Unfortunately, this setting seems to break the ACK mechanism, and Dapr starts logging errors like:
level=error msg="Error while sending ACK for JetStream message transaction-events/{3 2549}: nats: message was already acknowledged"
I'm using the explicit ackPolicy. Without the parallel concurrency setting, the ack policy works as intended, but the bulk message delivery does not.

Steps to Reproduce the Problem

I'm using the dotnet SDK to subscribe to a topic:

app.MapPost("/transaction-events",
        (BulkSubscribeMessage<BulkMessageModel<Event>> bulkMessages) =>
        {
            var responseEntries = bulkMessages.Entries
                .Select(message => new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.SUCCESS))
                .ToList();
            return new BulkSubscribeAppResponse(responseEntries);
        })
    .WithTopic(new()
    {
        PubsubName = config.Value.PubsubName,
        Name = config.Value.Topic
    })
    .WithBulkSubscribe(new()
    {
        TopicName = config.Value.Topic,
        MaxMessagesCount = 100,
        MaxAwaitDurationMs = 10
    });
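
For comparison, the same bulk subscription can also be declared outside the SDK with a Dapr Subscription resource, which may help rule out the programmatic registration as the cause. This is only a sketch: the resource name is hypothetical, and the topic value is an assumption because the actual value of config.Value.Topic is not shown here. The pubsub name matches the component below, and the route and bulk settings mirror the code above.

```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: transaction-events-subscription  # hypothetical name
spec:
  topic: transaction-events              # assumption: value of config.Value.Topic
  routes:
    default: /transaction-events         # same endpoint as the MapPost route
  pubsubname: pubsub-nats                # matches metadata.name of the component
  bulkSubscribe:
    enabled: true
    maxMessagesCount: 100                # same as MaxMessagesCount above
    maxAwaitDurationMs: 10               # same as MaxAwaitDurationMs above
```

If this is used, the .WithTopic and .WithBulkSubscribe calls should be dropped from the endpoint so the topic is not subscribed twice.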

My pubsub.jetstream component:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub-nats
spec:
  type: pubsub.jetstream
  version: v1
  metadata:
  - name: natsURL
    value: "nats://localhost:4222"
  - name: streamName
    value: "transaction-events"
  - name: durableName
    value: "transaction-events-durable"
  - name: queueGroupName
    value: "transaction-events-processor"
  - name: concurrency
    value: "parallel"
  - name: concurrencyMode
    value: "parallel"
  - name: ackPolicy
    value: "explicit"
  - name: ackWait
    value: "2s"
  - name: maxDeliver
    value: "5"
  - name: backOff
    value: "50ms, 1s, 10s"
  - name: maxAckPending
    value: 5000

Observations and Questions

Is the parallel concurrency setting supposed to be used for bulk subscriptions, and if so, is the acknowledgment issue expected behavior?
Why is the concurrency configuration undocumented for this component?
What is the correct configuration to achieve bulk delivery with independent acknowledgment?

@fulop-bede fulop-bede added the kind/bug Something isn't working label Jan 24, 2025
@alicejgibbons

Note that, at this point, bulk functionality is only supported end to end for a subset of components: https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-bulk/#how-components-handle-publishing-and-subscribing-to-bulk-messages

For NATS, it's only supported between the app and Dapr. Not sure if this is why you're seeing the error.

@fulop-bede
Author

Thanks Alice, but if bulk subscribe is supposed to work between the app and Dapr, I should still be able to receive multiple messages at once in my app.

2 participants