diff --git a/lib/fluent/plugin/in_kubernetes_objects.rb b/lib/fluent/plugin/in_kubernetes_objects.rb
index f57a2b0..6fa46fd 100644
--- a/lib/fluent/plugin/in_kubernetes_objects.rb
+++ b/lib/fluent/plugin/in_kubernetes_objects.rb
@@ -200,7 +200,12 @@ def create_pull_thread(conf)
tag = generate_tag resource_name
while thread_current_running?
log.debug "Going to pull #{resource_name}"
- response = @client.public_send "get_#{resource_name}", options
+ begin
+ response = @client.public_send "get_#{resource_name}", options
+ rescue Kubeclient::ResourceNotFoundError, NoMethodError
+ log.error "resource '#{resource_name}' not found. Stopped pulling it"
+ break
+ end
now = Fluent::Engine.now
es = Fluent::MultiEventStream.new
@@ -243,22 +248,27 @@ def create_watcher_thread(conf)
thread_create :"watch_#{resource_name}" do
while thread_current_running?
- @client.public_send("watch_#{resource_name}", options).tap do |watcher|
- tag = generate_tag "#{resource_name}"
- begin
- watcher.each do |entity|
- begin
- entity = JSON.parse(entity)
- router.emit tag, Fluent::Engine.now, entity
- options[:resource_version] = entity['object']['metadata']['resourceVersion']
- @storage.put resource_name, entity['object']['metadata']['resourceVersion']
- rescue => e
- log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher."
+ begin
+ @client.public_send("watch_#{resource_name}", options).tap do |watcher|
+ tag = generate_tag "#{resource_name}"
+ begin
+ watcher.each do |entity|
+ begin
+ entity = JSON.parse(entity)
+ router.emit tag, Fluent::Engine.now, entity
+ options[:resource_version] = entity['object']['metadata']['resourceVersion']
+ @storage.put resource_name, entity['object']['metadata']['resourceVersion']
+ rescue => e
+ log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher."
+ end
end
+ rescue => e
+ log.info "Got exception #{e}. Resetting watcher."
end
- rescue => e
- log.info "Got exception #{e}. Resetting watcher."
end
+ rescue Kubeclient::ResourceNotFoundError, NoMethodError
+ log.error "resource '#{resource_name}' not found. Stopped watching it"
+ break
end
end
end
diff --git a/test/fluent/plugin/in_kubernetes_objects_test.rb b/test/fluent/plugin/in_kubernetes_objects_test.rb
index 8063d50..687e3a5 100644
--- a/test/fluent/plugin/in_kubernetes_objects_test.rb
+++ b/test/fluent/plugin/in_kubernetes_objects_test.rb
@@ -144,5 +144,28 @@
f.unlink
end
end
+ it "checks for invalid pull request" do
+ d = create_input_driver(<<~CONF)
+ kubernetes_url #{k8s_url}
+
+ resource_name fakeResource
+
+ CONF
+
+ d.run expect_emits: 0, timeout: 3
+
+ expect(d.logs.any? { |log| log.include? "resource 'fakeResource' not found." }).must_equal(true)
+ end
+ it "checks for invalid watch request" do
+ d = create_input_driver(<<~CONF)
+ kubernetes_url #{k8s_url}
+
+ resource_name fakeResource
+
+ CONF
+
+ d.run expect_emits: 0, timeout: 3
+ expect(d.logs.any? { |log| log.include? "resource 'fakeResource' not found." }).must_equal(true)
+ end
end
end