Skip to content

Commit

Permalink
Fix jdbc_fetch_size usage with postgresql
Browse files Browse the repository at this point in the history
  • Loading branch information
bmax committed Feb 9, 2020
1 parent ae2523b commit 5a6f6a4
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 9 deletions.
2 changes: 2 additions & 0 deletions lib/logstash/plugin_mixins/jdbc/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def open_jdbc_connection
require "java"
require "sequel"
require "sequel/adapters/jdbc"
require "sequel/adapters/jdbc/transactions"

Sequel.application_timezone = @plugin_timezone.to_sym
if @drivers_loaded.false?
Expand All @@ -183,6 +184,7 @@ def open_jdbc_connection
end
@database = jdbc_connect()
@database.extension(:pagination)
@database.extend(Sequel::JDBC::Transactions)
if @jdbc_default_timezone
@database.extension(:named_timezones)
@database.timezone = @jdbc_default_timezone
Expand Down
18 changes: 11 additions & 7 deletions lib/logstash/plugin_mixins/jdbc/statement_handler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ class NormalStatementHandler < StatementHandler
# @yieldparam row [Hash{Symbol=>Object}]
def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
query = build_query(db, sql_last_value)
if jdbc_paging_enabled
query.each_page(jdbc_page_size) do |paged_dataset|
paged_dataset.each do |row|
# Execute query in transaction cause PG driver require autocommit off for set fetch count
# See: https://jdbc.postgresql.org/documentation/head/query.html
db.transaction(rollback: :always) do
if jdbc_paging_enabled
query.each_page(jdbc_page_size) do |paged_dataset|
paged_dataset.each do |row|
yield row
end
end
else
query.each do |row|
yield row
end
end
else
query.each do |row|
yield row
end
end
end

Expand Down
3 changes: 1 addition & 2 deletions spec/inputs/jdbc_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,6 @@
{
"statement" => "SELECT * FROM test_table",
"jdbc_pool_timeout" => 0,
"jdbc_connection_string" => 'mock://localhost:1527/db',
"sequel_opts" => {
"max_connections" => 1
}
Expand Down Expand Up @@ -1026,7 +1025,7 @@
end

it "should report the statements to logging" do
expect(plugin.logger).to receive(:debug).once
expect(plugin.logger).to receive(:debug).thrice
plugin.run(queue)
end
end
Expand Down

0 comments on commit 5a6f6a4

Please sign in to comment.