activejob support #4

Closed
juggy opened this issue Oct 8, 2014 · 23 comments

Comments

@juggy

juggy commented Oct 8, 2014

Is that in the plans?

@phstc
Collaborator

phstc commented Oct 8, 2014

Hey @juggy

Hm, nice idea. I will look into it.

@elsurudo
Contributor

elsurudo commented Nov 4, 2014

👍

Would definitely be useful once rails 4.2 rolls out.

@phstc
Collaborator

phstc commented Nov 6, 2014

I believe the Shoryuken adapter would look something like this:

# https://github.com/rails/rails/blob/master/activejob/lib/active_job/queue_adapters/shoryuken.rb

require 'shoryuken'

module ActiveJob
  module QueueAdapters
    # == Shoryuken adapter for Active Job
    #
    # Shoryuken is a super efficient AWS SQS thread based message processor for Ruby.
    #
    # Read more about Shoryuken {here}[https://github.com/phstc/shoryuken].
    # Read more about Amazon SQS {here}[https://aws.amazon.com/sqs/].
    #
    # To use Shoryuken set the queue_adapter config to +:shoryuken+.
    #
    #   Rails.application.config.active_job.queue_adapter = :shoryuken
    class ShoryukenAdapter
      class << self
        def enqueue(job) #:nodoc:
          register_worker!(job)

          Shoryuken::Client.send_message(job.queue_name, job.serialize)
        end

        def enqueue_at(job, timestamp) #:nodoc:
          register_worker!(job)

          # SQS expects delay_seconds as a whole number of seconds
          delay = (timestamp - Time.current.to_f).round

          raise 'The maximum allowed delay is 15 minutes' if delay > 15.minutes

          Shoryuken::Client.send_message(job.queue_name, job.serialize, delay_seconds: delay)
        end

        private

        def register_worker!(job)
          Shoryuken.register_worker(job.queue_name, JobWrapper)
        end
      end

      class JobWrapper #:nodoc:
        include Shoryuken::Worker

        def perform(sqs_msg, body)
          Base.execute body

          sqs_msg.delete
        end
      end
    end
  end
end

But we need to change Shoryuken to allow starting it with queues without workers, as they will be declared dynamically.

So here: https://github.com/phstc/shoryuken/blob/master/lib/shoryuken/manager.rb#L169 it needs to skip queues that have no associated worker.

def next_queue
  return nil if @queues.empty?

  queue = @queues.shift
  @queues << queue

  # try again if no worker defined
  return next_queue unless Shoryuken.workers.include? queue

  queue
end

We also need to replace this validation: https://github.com/phstc/shoryuken/blob/master/lib/shoryuken/cli.rb#L245-L247 with:

Shoryuken.queues.each do |queue|
  logger.warn "No worker supplied for #{queue}" unless Shoryuken.workers.include? queue
end

Anyone interested in that? 🙏

@juggy
Author

juggy commented Nov 6, 2014

Seems like a good compromise. Do you see any drawbacks with this code (especially the last two points)?

@phstc
Collaborator

phstc commented Nov 6, 2014

I don't see any drawbacks, except that return next_queue unless Shoryuken.workers.include? queue might cause endless recursion when none of the queues has a worker.

Maybe a better approach:

def next_queue
  return nil if @queues.empty?

  # get/remove the first queue in the list
  queue = @queues.shift

  unless Shoryuken.workers.include? queue
    # when no worker registered pause the queue to avoid endless recursion

    logger.info "Pausing '#{queue}' for #{Shoryuken.options[:delay_no_workers].to_f} seconds, because of no worker registered"

    after(Shoryuken.options[:delay_no_workers].to_f) { async.restart_queue!(queue) }

    return next_queue
  end

  # add queue back to the end of the list
  @queues << queue

  queue
end

delay_no_workers would be a new option in shoryuken.yml, perhaps defaulting to 60 seconds?

What do you think?
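
For comparison, here is a minimal plain-Ruby sketch of a non-recursive rotation that inspects each queue at most once per call, so it terminates even when no queue has a worker. QueueRotation and its constructor arguments are hypothetical stand-ins for illustration, not Shoryuken's manager, and the sketch leaves out the pause and restart behaviour proposed above.

```ruby
# Hypothetical stand-in for the manager's queue rotation; not Shoryuken's code.
class QueueRotation
  def initialize(queues, workers)
    @queues = queues.dup # polling order
    @workers = workers   # names of queues that have a registered worker
  end

  # Returns the next queue that has a worker, or nil when none does.
  def next_queue
    # Look at each queue at most once per call, so the scan always terminates.
    @queues.size.times do
      queue = @queues.shift
      @queues << queue # rotate to the end of the list
      return queue if @workers.include?(queue)
    end
    nil
  end
end

rotation = QueueRotation.new(%w[default mailers reports], %w[default reports])
picked = Array.new(4) { rotation.next_queue }

# A rotation in which no queue has a worker simply yields nil.
idle = QueueRotation.new(%w[default], []).next_queue
```

The trade-off is that worker-less queues stay in the rotation and are re-checked on every call, whereas pausing them removes them from the list for delay_no_workers seconds.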

@elsurudo
Contributor

elsurudo commented Nov 7, 2014

Seems like you might be able to make use of the JobWorker discussed in our other issue for this: #7 (comment). If we add that functionality first (but with ActiveJob support in mind), we should be able to use it here.

@phstc
Collaborator

phstc commented Jan 4, 2015

Added ActiveJob Support 😄

phstc closed this as completed Jan 4, 2015
@renanmzmendes

For the main Ruby process (in a Rails app) to be able to save the job to the queue, Shoryuken must know about the main AWS configuration. It would be nice to have a generator that creates a loader for shoryuken.yml in the initializers folder.

Maybe refactor a Shoryuken::CLI method so it can also be used in the initializer?

Mine was something like this:

require 'erb'
require 'yaml'

def parse_config(config_file)
  if File.exist?(config_file)
    YAML.load(ERB.new(IO.read(config_file)).result)
  else
    raise ArgumentError, "Config file #{config_file} does not exist"
  end
end

config = parse_config([Dir.pwd, 'config/shoryuken.yml'].join('/')).deep_symbolize_keys
Shoryuken.options.merge!(config)

AWS.config Shoryuken.options[:aws]
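
The loading step can be tried outside Rails with only the standard library. The sketch below is an assumption-laden variant of the snippet above: load_shoryuken_config is a hypothetical name, the temporary file stands in for config/shoryuken.yml, and YAML.safe_load with symbolize_names replaces deep_symbolize_keys so that ActiveSupport is not needed.

```ruby
require 'erb'
require 'yaml'
require 'tempfile'

# Hypothetical helper: renders ERB in the file, parses the YAML,
# and returns a hash with symbol keys (nested keys included).
def load_shoryuken_config(path)
  raise ArgumentError, "Config file #{path} does not exist" unless File.exist?(path)

  rendered = ERB.new(File.read(path)).result
  YAML.safe_load(rendered, symbolize_names: true) || {}
end

# Demo with a throwaway file standing in for config/shoryuken.yml.
config = Tempfile.create(['shoryuken', '.yml']) do |file|
  file.write("aws:\n  region: <%= 'us-' + 'east-1' %>\nconcurrency: 25\n")
  file.flush
  load_shoryuken_config(file.path)
end
```

In an initializer, the returned hash could then be merged into Shoryuken.options as in the snippet above.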

Cheers

@phstc
Collaborator

phstc commented Mar 6, 2015

Would you like to extract https://github.com/phstc/shoryuken/blob/master/lib/shoryuken/environment_loader.rb#L46 and call it in a Rails initializer?

@renanmzmendes

Actually, one would need to load shoryuken.yml first so that Shoryuken.options[:aws] isn't empty when initialize_aws is called, do you agree? Maybe a larger refactoring is needed to accomplish this...

P.S.: my previous post was based on the 0.0.5 gem version

@phstc
Collaborator

phstc commented Mar 7, 2015

I agree!

Can you update to the master version? If so, maybe something like this would work:

options = Shoryuken.load_config_file 'shoryuken.yml'
Shoryuken.initialize_aws(options)

Then we would need to update here and here to use these new methods.

wdyt?

@claudia-beas

Hi,
so how exactly would I use Shoryuken with ActiveJob? I need to set the queuing backend, but I don't know what I'm supposed to write for Shoryuken.
Or is it possible to use something like perform_later with Shoryuken without an extra Active Job?

@phstc
Collaborator

phstc commented Mar 19, 2015

hi @claudia-beas

You can use Shoryuken without Rails (or ActiveJob) as a standalone process.

Please have a look at the documentation and related wiki pages:

@claudia-beas

hi @phstc, thanks for your quick answer. I've been using Shoryuken workers so far, but without delay. I know there's a delay option I can set inside the worker, but I'm not sure how. As I said, the ideal result would be something like perform_later, but without the fifteen-minute limit, so that delays of several days are also possible.

@phstc
Collaborator

phstc commented Mar 19, 2015

The fifteen-minute delay limit is a restriction in SQS 😞 So the options in Shoryuken https://github.com/phstc/shoryuken/wiki/Sending-a-message#delaying-a-message are limited to that.

Have a look at this post, it might help you: http://www.pablocantero.com/blog/2014/11/29/sqs-to-the-rescue/#delaying-a-message-perform_in
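
To make the limit concrete, here is a small sketch of converting an absolute timestamp into a delay while enforcing the 900-second (15-minute) SQS maximum. delay_seconds_for and its now: and max: parameters are hypothetical names used only for illustration; they are not part of Shoryuken or Active Job.

```ruby
# Hypothetical helper: converts an epoch timestamp into an SQS delay in
# whole seconds. SQS accepts a delay between 0 and 900 seconds.
def delay_seconds_for(timestamp, now: Time.now.to_f, max: 900)
  delay = (timestamp - now).round
  raise ArgumentError, 'The maximum allowed delay is 15 minutes' if delay > max

  [delay, 0].max # timestamps in the past are sent without any delay
end

now = 1_000_000.0
short = delay_seconds_for(now + 299.6, now: now)
past = delay_seconds_for(now - 5, now: now)

# Anything beyond the limit is rejected instead of being silently truncated.
too_long = begin
  delay_seconds_for(now + 901, now: now)
rescue ArgumentError => e
  e.message
end
```

Raising rather than clamping keeps a job scheduled for next week from running after only 15 minutes.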

@claudia-beas

Thank you (for the help and for answering my questions super-awesomely fast^^)! I'll look into it!

@joekhoobyar
Contributor

@claudia-beas - If you need to delay a message longer than 15 minutes, you could also try this: https://github.com/joekhoobyar/shoryuken-later

@claudia-beas

Cool, thanks.

@shqear93

@phstc How can I change the message group ID or deduplication ID when using Shoryuken with Active Job?

@phstc
Collaborator

phstc commented Dec 12, 2019

@shqear93 Unfortunately, the Active Job spec does not support custom parameters.

I would love it if we could set adapter-specific parameters using set, for example MyJob.set(message_group_id: some_id).perform_later(something), but custom options don't seem to be supported.

Unless I'm missing something with the adapter spec, we can't use extra params.

So to use FIFO with a custom message group ID and deduplication ID (Shoryuken has default values for those), you would need to use standard workers.

@phstc
Collaborator

phstc commented Dec 12, 2019

Active Job does not pass the options to the adapters https://github.com/rails/rails/blob/efbab359290d73fc7270599f4e05a59862f8080a/activejob/lib/active_job/enqueuing.rb#L55-L57 😢

@shqear93

shqear93 commented Dec 12, 2019

@phstc You're right! I had to patch this method as well to make it work:

config/initializers/override_activejob.rb

require "active_job/arguments"

module ActiveJob
  # Provides behavior for enqueuing jobs.
  module Enqueuing
    extend ActiveSupport::Concern

    def enqueue(options = {})
      self.scheduled_at = options.delete(:wait).seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options.delete(:wait_until).to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options.delete(:queue)) if options[:queue]
      self.priority     = options.delete(:priority).to_i if options[:priority]
      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self, options
        end
      end
      self
    end
  end
end
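
With a patch like this, the adapter's enqueue receives whatever options are left over after Active Job removes its own. The following is only a sketch of how an adapter might map those options onto SQS send_message parameters: sqs_message_params is a hypothetical helper, not part of Shoryuken, and it assumes the leftover options use the SQS parameter names message_group_id and message_deduplication_id.

```ruby
require 'json'

# Hypothetical helper, not part of Shoryuken or Active Job: builds the
# parameter hash an adapter could pass to SQS send_message.
def sqs_message_params(serialized_job, options = {})
  params = { message_body: JSON.generate(serialized_job) }

  # Copy only the FIFO-related options and ignore anything else.
  %i[message_group_id message_deduplication_id].each do |key|
    params[key] = options[key].to_s if options.key?(key)
  end

  params
end

params = sqs_message_params(
  { 'job_class' => 'MyJob', 'arguments' => [42] },
  message_group_id: 'customer-7', priority: 3
)
```

Note that the patched enqueue only forwards options on the immediate path; enqueue_at is still called without them.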

@DharmaPriya-V

Can we have Shoryuken's auto_visibility_timeout in Active Job as well? As far as I know we can't, because we don't know the receipt handle.
Currently my application is fully based on Active Job. Can I use Active Job modules like logging and exceptions in a worker?
