Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ActiveJob, FIFO queues & message_group_id #645

Closed
davidrichey opened this issue Jan 22, 2021 · 4 comments
Closed

ActiveJob, FIFO queues & message_group_id #645

davidrichey opened this issue Jan 22, 2021 · 4 comments

Comments

@davidrichey
Copy link
Contributor

I have a question around FIFO queues & the message_group_id with the implementation of ActiveJob. Is there an easy way to set the message_group_id? I've gone over the documentation and can set it using:

queue_name = 'my-queue'
job_args = {
  "job_class": "MyJob",
  "job_id": SecureRandom.uuid,
  "provider_job_id": nil,
  "queue_name": queue_name,
  "priority": nil,
  "arguments": ['args'],
  "executions": 0,
  "locale": "en"
}

message = {
  message_body: job_args,
  message_group_id: "GroupOne",
  message_attributes: {
    shoryuken_class: {
      string_value: 'ActiveJob::QueueAdapters::ShoryukenAdapter::JobWrapper',
      data_type: "String"
    }
  }
}

Shoryuken::Client.queues(queue_name).send_message(message)

but haven't found a short hand way to use this. I've tried the ones mentioned here but no luck with using ActiveJob.

I've been playing around with the ShoryukenAdapter to see if there is an easy implementation with ActiveJob. Rails implements the set method on the ActiveJob class but only these options: :wait, :wait_until, :queue, :priority

I don't see priority being used in the ActiveJob adapter, could we use that?

MyJob.set(priority: 1).perform_later('args')

Then implement something like 👇 in the [Active Job adapter]?(https://github.com/phstc/shoryuken/blob/master/lib/shoryuken/extensions/active_job_adapter.rb#L63)

unless options[:priority].nil?
  opts[:message_group_id] = "#{Shoryuken::Queue::MESSAGE_GROUP_ID}-#{options[:priority]}
end
@davidrichey
Copy link
Contributor Author

davidrichey commented Jan 22, 2021

Referencing #4 , starting here

@cjlarose
Copy link
Contributor

cjlarose commented Jan 23, 2021

I think #635 attempts to solves a similar, but distinct, problem with specifying message_attributes when using the ActiveJob interface.

I think these proposals are interesting. If I understand it correctly, you're defining your workers using the ActiveJob::Base-based interface as illustrated in the wiki. This gives you the flexibility to use other ActiveJob queue adapters since you wouldn't have to change out the worker code.

When enqueuing a message with ActiveJobs interface (perform_later), it is possible to enqueue jobs to Shoryuken using that queue-processor-agnostic interface. However, it is not currently possible to specify SQS-specific options like the message_group_id, message_deduplication_id, or additional message_attributes. There doesn't appear to be much that ActiveJob offers in terms of being able to specify queue-adapter-specific options, but that kinda makes sense: it's meant to provide the common-denominator interface for all queue adapters.

Your code snippet offers a workaround by using the Shoryuken::Queue-based interface. ActiveJob::QueueAdapaters::ShoryukenAdapter actually provides a nicer way to accomplish what you want already:

job = MyJob.new 'args'
MyJob.queue_adapter.enqueue(job, message_group_id: 'GroupOne') # edit: problems with this code described below

It's not well-documented in the wiki that this is possible, and it's not the nicest interface, but I think it'll work for your use case. Let me know if this does what you want.

@davidrichey
Copy link
Contributor Author

Yep! That works and exactly what I was looking for. This would be nice to add in the code snippet under the How does that work? section in the FIFO Queues wiki page.

@cjlarose
Copy link
Contributor

Actually, one problem with that snippet is that it work won't if you swap out the ActiveJob queue adapter (for example in your testing environment). I opened #646 which I think addresses the bigger issue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants