Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow dynamic adding queues #322

Merged
merged 11 commits into from
Mar 6, 2017
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ rvm:
- 2.0.0
- 2.1.0
- 2.2.0
- 2.3.3

notifications:
email:
Expand Down
20 changes: 10 additions & 10 deletions lib/shoryuken.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,23 @@ module Shoryuken
polling_strategy: Polling::WeightedRoundRobin
}.freeze

@@queues = []
@@worker_registry = DefaultWorkerRegistry.new
@@queues = []
@@worker_registry = DefaultWorkerRegistry.new
@@active_job_queue_name_prefixing = false
@@sqs_client = nil
@@sqs_client = nil
@@sqs_client_receive_message_opts = {}
@@start_callback = nil
@@stop_callback = nil
@@start_callback = nil
@@stop_callback = nil

class << self
def queues
@@queues
end

def add_queue(queue, priority = 1)
priority.times { queues << queue }
end

def worker_registry
@@worker_registry
end
Expand Down Expand Up @@ -87,7 +91,7 @@ def active_job_queue_name_prefixing=(active_job_queue_name_prefixing)
end

def sqs_client
@@sqs_client
@@sqs_client ||= Aws::SQS::Client.new
end

def sqs_client=(sqs_client)
Expand Down Expand Up @@ -157,10 +161,6 @@ def on_stop(&block)
@@stop_callback = block
end

def sqs_client
@@sqs_client ||= Aws::SQS::Client.new
end

# Register a block to run at a point in the Shoryuken lifecycle.
# :startup, :quiet or :shutdown are valid events.
#
Expand Down
12 changes: 6 additions & 6 deletions lib/shoryuken/environment_loader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def prefix_active_job_queue_names
end

def parse_queue(queue, weight = nil)
[weight.to_i, 1].max.times { Shoryuken.queues << queue }
Shoryuken.add_queue(queue, [weight.to_i, 1])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have not take the max of [weight.to_i, 1] but is it OK?
I think that add_queue takes weight which is defined numeric.

Copy link
Collaborator Author

@phstc phstc Mar 3, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@h3poteto nice! thanks for reviewing it. I meant [weight.to_i, 1].max.

This code needs a test rspec. Not sure how tests are passing, even the integration one.

end

def parse_queues
Expand Down Expand Up @@ -145,7 +145,7 @@ def validate_queues

Shoryuken.queues.uniq.each do |queue|
begin
Shoryuken::Client.queues queue
Shoryuken::Client.queues(queue)
rescue Aws::SQS::Errors::NonExistentQueue
non_existent_queues << queue
end
Expand All @@ -155,13 +155,13 @@ def validate_queues
end

def validate_workers
return if defined?(::ActiveJob)

all_queues = Shoryuken.queues
queues_with_workers = Shoryuken.worker_registry.queues

unless defined?(::ActiveJob)
(all_queues - queues_with_workers).each do |queue|
Shoryuken.logger.warn { "No worker supplied for '#{queue}'" }
end
(all_queues - queues_with_workers).each do |queue|
Shoryuken.logger.warn { "No worker supplied for '#{queue}'" }
end
end
end
Expand Down
4 changes: 1 addition & 3 deletions lib/shoryuken/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ def send_messages(options)
end

def receive_messages(options)
client.receive_message(options.merge(queue_url: url)).
messages.
map { |m| Message.new(client, self, m) }
client.receive_message(options.merge(queue_url: url)).messages.map { |m| Message.new(client, self, m) }
end

def fifo?
Expand Down
5 changes: 4 additions & 1 deletion spec/integration/launcher_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require 'spec_helper'
require 'shoryuken/manager'
require 'shoryuken/launcher'
require 'securerandom'

RSpec.describe Shoryuken::Launcher do
describe 'Consuming messages', slow: :true do
Expand All @@ -19,7 +20,9 @@
end

after do
queue_url = Shoryuken::Client.sqs.get_queue_url(queue_name: StandardWorker.get_shoryuken_options['queue']).queue_url
queue_url = Shoryuken::Client.sqs.get_queue_url(
queue_name: StandardWorker.get_shoryuken_options['queue']
).queue_url

Shoryuken::Client.sqs.delete_queue queue_url: queue_url
end
Expand Down
14 changes: 13 additions & 1 deletion spec/shoryuken_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
require 'spec_helper'

describe Shoryuken do
RSpec.describe Shoryuken do
describe '.add_queue' do
after { Shoryuken.queues.clear }

it 'adds queues' do
described_class.add_queue('default')
expect(described_class.queues).to eq(['default'])

described_class.add_queue('high', 2)
expect(described_class.queues).to eq(%w(default high high))
end
end

describe '.register_worker' do
it 'registers a worker' do
described_class.worker_registry.clear
Expand Down