From 5a6f6a4c5bada73fe4aab648b4cb819ffa3fb45a Mon Sep 17 00:00:00 2001 From: Brandon Max Date: Fri, 10 Jan 2020 21:23:50 -0500 Subject: [PATCH] Fix jdbc_fetch_size usage with postgresql --- lib/logstash/plugin_mixins/jdbc/jdbc.rb | 2 ++ .../plugin_mixins/jdbc/statement_handler.rb | 18 +++++++++++------- spec/inputs/jdbc_spec.rb | 3 +-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index 341cd58..c508506 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -160,6 +160,7 @@ def open_jdbc_connection require "java" require "sequel" require "sequel/adapters/jdbc" + require "sequel/adapters/jdbc/transactions" Sequel.application_timezone = @plugin_timezone.to_sym if @drivers_loaded.false? @@ -183,6 +184,7 @@ def open_jdbc_connection end @database = jdbc_connect() @database.extension(:pagination) + @database.extend(Sequel::JDBC::Transactions) if @jdbc_default_timezone @database.extension(:named_timezones) @database.timezone = @jdbc_default_timezone diff --git a/lib/logstash/plugin_mixins/jdbc/statement_handler.rb b/lib/logstash/plugin_mixins/jdbc/statement_handler.rb index 7b146a6..65e5dcd 100644 --- a/lib/logstash/plugin_mixins/jdbc/statement_handler.rb +++ b/lib/logstash/plugin_mixins/jdbc/statement_handler.rb @@ -31,16 +31,20 @@ class NormalStatementHandler < StatementHandler # @yieldparam row [Hash{Symbol=>Object}] def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size) query = build_query(db, sql_last_value) - if jdbc_paging_enabled - query.each_page(jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| + # Execute query in transaction cause PG driver require autocommit off for set fetch count + # See: https://jdbc.postgresql.org/documentation/head/query.html + db.transaction(rollback: :always) do + if jdbc_paging_enabled + query.each_page(jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + yield row + end + end + else + query.each do |row| yield row end end - else - query.each do |row| - yield row - end end end diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 02597e0..ea7cc67 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -970,7 +970,6 @@ { "statement" => "SELECT * FROM test_table", "jdbc_pool_timeout" => 0, - "jdbc_connection_string" => 'mock://localhost:1527/db', "sequel_opts" => { "max_connections" => 1 } @@ -1026,7 +1025,7 @@ end it "should report the statements to logging" do - expect(plugin.logger).to receive(:debug).once + expect(plugin.logger).to receive(:debug).thrice plugin.run(queue) end end