diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb index 341cd58..c508506 100644 --- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb @@ -160,6 +160,7 @@ def open_jdbc_connection require "java" require "sequel" require "sequel/adapters/jdbc" + require "sequel/adapters/jdbc/transactions" Sequel.application_timezone = @plugin_timezone.to_sym if @drivers_loaded.false? @@ -183,6 +184,7 @@ def open_jdbc_connection end @database = jdbc_connect() @database.extension(:pagination) + @database.extend(Sequel::JDBC::Transactions) if @jdbc_default_timezone @database.extension(:named_timezones) @database.timezone = @jdbc_default_timezone diff --git a/lib/logstash/plugin_mixins/jdbc/statement_handler.rb b/lib/logstash/plugin_mixins/jdbc/statement_handler.rb index 7b146a6..65e5dcd 100644 --- a/lib/logstash/plugin_mixins/jdbc/statement_handler.rb +++ b/lib/logstash/plugin_mixins/jdbc/statement_handler.rb @@ -31,16 +31,20 @@ class NormalStatementHandler < StatementHandler # @yieldparam row [Hash{Symbol=>Object}] def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size) query = build_query(db, sql_last_value) - if jdbc_paging_enabled - query.each_page(jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| + # Execute query in transaction cause PG driver require autocommit off for set fetch count + # See: https://jdbc.postgresql.org/documentation/head/query.html + db.transaction(rollback: :always) do + if jdbc_paging_enabled + query.each_page(jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + yield row + end + end + else + query.each do |row| yield row end end - else - query.each do |row| - yield row - end end end