From e6de88e25f825b9d4f063f185ddbd397b34e478a Mon Sep 17 00:00:00 2001 From: Ilya Zhigalko Date: Fri, 7 Apr 2017 11:47:31 +0300 Subject: [PATCH 1/4] Fix lost performance on debug enabled --- lib/logstash/plugin_mixins/jdbc.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 671aeea..93fa5c8 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -205,7 +205,7 @@ def execute_statement(statement, parameters) query = @database[statement, parameters] sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc @tracking_column_warning_sent = false - @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) + @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters) if @jdbc_paging_enabled query.each_page(@jdbc_page_size) do |paged_dataset| From f66b2f4b8fabf33b58a8e39f5955147dffffa20e Mon Sep 17 00:00:00 2001 From: Ilya Zhigalko Date: Fri, 7 Apr 2017 13:36:32 +0300 Subject: [PATCH 2/4] Wrap statement in transaction with rollback (#103) --- lib/logstash/plugin_mixins/jdbc.rb | 27 ++++++++++++++++----------- spec/inputs/jdbc_spec.rb | 2 +- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 93fa5c8..6bb88cc 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -143,6 +143,7 @@ def prepare_jdbc_connection require "java" require "sequel" require "sequel/adapters/jdbc" + require "sequel/adapters/jdbc/transactions" load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library begin @@ -157,6 +158,7 @@ def prepare_jdbc_connection raise LogStash::ConfigurationError, "#{e}. #{message}" end @database = jdbc_connect() + @database.extend(Sequel::JDBC::Transactions) @database.extension(:pagination) if @jdbc_default_timezone @database.extension(:named_timezones) @@ -207,9 +209,19 @@ def execute_statement(statement, parameters) @tracking_column_warning_sent = false @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters) - if @jdbc_paging_enabled - query.each_page(@jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| + @database.transaction do + if @jdbc_paging_enabled + query.each_page(@jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + sql_last_value = get_column_value(row) if @use_column_value + if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) + sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` + end + yield extract_values_from(row) + end + end + else + query.each do |row| sql_last_value = get_column_value(row) if @use_column_value if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` @@ -217,14 +229,7 @@ def execute_statement(statement, parameters) yield extract_values_from(row) end end - else - query.each do |row| - sql_last_value = get_column_value(row) if @use_column_value - if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) - sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` - end - yield extract_values_from(row) - end + raise Sequel::Rollback end success = true rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index 63da34f..87881d1 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -880,7 +880,7 @@ end it "should report the statements to logging" do - expect(plugin.logger).to receive(:debug).once + expect(plugin.logger).to receive(:debug).thrice plugin.run(queue) end end From ef38c778e4e3442fae34693228c1069f3e7531bb Mon Sep 17 00:00:00 2001 From: Ilya Zhigalko Date: Thu, 4 May 2017 14:36:29 +0300 Subject: [PATCH 3/4] Add information about transaction usage (#103) --- lib/logstash/plugin_mixins/jdbc.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 6bb88cc..14974ee 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -209,6 +209,8 @@ def execute_statement(statement, parameters) @tracking_column_warning_sent = false @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters) + # Execute query in transaction cause PG driver require autocommit off for set fetch count + # See: https://jdbc.postgresql.org/documentation/head/query.html @database.transaction do if @jdbc_paging_enabled query.each_page(@jdbc_page_size) do |paged_dataset| From 30bed65ad027afce7cf242af3d5ab8a0df4e1401 Mon Sep 17 00:00:00 2001 From: Ilya Zhigalko Date: Fri, 12 May 2017 14:56:39 +0300 Subject: [PATCH 4/4] Fix tests --- spec/inputs/jdbc_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index be88cbf..508275c 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -833,7 +833,7 @@ { "statement" => "SELECT * FROM test_table", "jdbc_pool_timeout" => 0, - "jdbc_connection_string" => 'mock://localhost:1527/db', + "jdbc_connection_string" => 'jdbc:derby:memory:testdb;create=true', "sequel_opts" => { "max_connections" => 1 }