diff --git a/lib/fluent/plugin/in_kubernetes_objects.rb b/lib/fluent/plugin/in_kubernetes_objects.rb index f57a2b0..6fa46fd 100644 --- a/lib/fluent/plugin/in_kubernetes_objects.rb +++ b/lib/fluent/plugin/in_kubernetes_objects.rb @@ -200,7 +200,12 @@ def create_pull_thread(conf) tag = generate_tag resource_name while thread_current_running? log.debug "Going to pull #{resource_name}" - response = @client.public_send "get_#{resource_name}", options + begin + response = @client.public_send "get_#{resource_name}", options + rescue Kubeclient::ResourceNotFoundError, NoMethodError + log.error "resource '#{resource_name}' not found. Stopped pulling it" + break + end now = Fluent::Engine.now es = Fluent::MultiEventStream.new @@ -243,22 +248,27 @@ def create_watcher_thread(conf) thread_create :"watch_#{resource_name}" do while thread_current_running? - @client.public_send("watch_#{resource_name}", options).tap do |watcher| - tag = generate_tag "#{resource_name}" - begin - watcher.each do |entity| - begin - entity = JSON.parse(entity) - router.emit tag, Fluent::Engine.now, entity - options[:resource_version] = entity['object']['metadata']['resourceVersion'] - @storage.put resource_name, entity['object']['metadata']['resourceVersion'] - rescue => e - log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher." + begin + @client.public_send("watch_#{resource_name}", options).tap do |watcher| + tag = generate_tag "#{resource_name}" + begin + watcher.each do |entity| + begin + entity = JSON.parse(entity) + router.emit tag, Fluent::Engine.now, entity + options[:resource_version] = entity['object']['metadata']['resourceVersion'] + @storage.put resource_name, entity['object']['metadata']['resourceVersion'] + rescue => e + log.info "Got exception #{e} parsing entity #{entity}. Resetting watcher." + end end + rescue => e + log.info "Got exception #{e}. Resetting watcher." end - rescue => e - log.info "Got exception #{e}. Resetting watcher." end + rescue Kubeclient::ResourceNotFoundError, NoMethodError + log.error "resource '#{resource_name}' not found. Stopped watching it" + break end end end diff --git a/test/fluent/plugin/in_kubernetes_objects_test.rb b/test/fluent/plugin/in_kubernetes_objects_test.rb index 8063d50..687e3a5 100644 --- a/test/fluent/plugin/in_kubernetes_objects_test.rb +++ b/test/fluent/plugin/in_kubernetes_objects_test.rb @@ -144,5 +144,28 @@ f.unlink end end + it "checks for invalid pull request" do + d = create_input_driver(<<~CONF) + kubernetes_url #{k8s_url} + + resource_name fakeResource + + CONF + + d.run expect_emits: 0, timeout: 3 + + expect(d.logs.any? { |log| log.include? "resource 'fakeResource' not found." }).must_equal(true) + end + it "checks for invalid watch request" do + d = create_input_driver(<<~CONF) + kubernetes_url #{k8s_url} + + resource_name fakeResource + + CONF + + d.run expect_emits: 0, timeout: 3 + expect(d.logs.any? { |log| log.include? "resource 'fakeResource' not found." }).must_equal(true) + end end end