diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 6ef403f..5497684 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -143,6 +143,7 @@ def open_jdbc_connection require "java" require "sequel" require "sequel/adapters/jdbc" + require "sequel/adapters/jdbc/transactions" load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library begin @@ -157,6 +158,7 @@ def open_jdbc_connection raise LogStash::ConfigurationError, "#{e}. #{message}" end @database = jdbc_connect() + @database.extend(Sequel::JDBC::Transactions) @database.extension(:pagination) if @jdbc_default_timezone @database.extension(:named_timezones) @@ -218,11 +220,23 @@ def execute_statement(statement, parameters) query = @database[statement, parameters] sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc @tracking_column_warning_sent = false - @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) - - if @jdbc_paging_enabled - query.each_page(@jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| + @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters) + + # Execute query in transaction cause PG driver require autocommit off for set fetch count + # See: https://jdbc.postgresql.org/documentation/head/query.html + @database.transaction do + if @jdbc_paging_enabled + query.each_page(@jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + sql_last_value = get_column_value(row) if @use_column_value + if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) + sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` + end + yield extract_values_from(row) + end + end + else + query.each do |row| sql_last_value = get_column_value(row) if @use_column_value if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` @@ -230,14 +244,7 @@ def execute_statement(statement, parameters) yield extract_values_from(row) end end - else - query.each do |row| - sql_last_value = get_column_value(row) if @use_column_value - if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) - sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` - end - yield extract_values_from(row) - end + raise Sequel::Rollback end success = true rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index c7784ee..508275c 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -833,7 +833,7 @@ { "statement" => "SELECT * FROM test_table", "jdbc_pool_timeout" => 0, - "jdbc_connection_string" => 'mock://localhost:1527/db', + "jdbc_connection_string" => 'jdbc:derby:memory:testdb;create=true', "sequel_opts" => { "max_connections" => 1 } @@ -889,7 +889,7 @@ end it "should report the statements to logging" do - expect(plugin.logger).to receive(:debug).once + expect(plugin.logger).to receive(:debug).thrice plugin.run(queue) end end