Skip to content

Commit

Permalink
feat: allow timescale to be a non-primary rails db
Browse files Browse the repository at this point in the history
* Add application record as a common superclass
* Schema dumper works on rails 6 too
* Ensure methods with kwargs are invertible
* Remove unused force param
  • Loading branch information
segiddins authored Apr 19, 2023
1 parent 48e0c4c commit 54775e7
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 78 deletions.
2 changes: 1 addition & 1 deletion bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Timescaledb::Rails::Hypertable.all.each do |hypertable|

Timescaledb::Rails.const_set(
class_name,
Class.new(ActiveRecord::Base) do
Class.new(Timescaledb::Rails::ApplicationRecord) do
include Timescaledb::Rails::Model

self.table_name = [hypertable.hypertable_schema, hypertable.hypertable_name].join('.')
Expand Down
71 changes: 21 additions & 50 deletions lib/timescaledb/rails/extensions/active_record/command_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,27 @@ module Rails
module ActiveRecord
# :nodoc:
module CommandRecorder
def create_hypertable(*args, &block)
record(:create_hypertable, args, &block)
end

def enable_hypertable_compression(*args, &block)
record(:enable_hypertable_compression, args, &block)
end

def disable_hypertable_compression(*args, &block)
record(:disable_hypertable_compression, args, &block)
end

def add_hypertable_compression_policy(*args, &block)
record(:add_hypertable_compression_policy, args, &block)
end

def remove_hypertable_compression_policy(*args, &block)
record(:remove_hypertable_compression_policy, args, &block)
end

def add_hypertable_reorder_policy(*args, &block)
record(:add_hypertable_reorder_policy, args, &block)
end

def remove_hypertable_reorder_policy(*args, &block)
record(:remove_hypertable_reorder_policy, args, &block)
end

def add_hypertable_retention_policy(*args, &block)
record(:add_hypertable_retention_policy, args, &block)
end

def remove_hypertable_retention_policy(*args, &block)
record(:remove_hypertable_retention_policy, args, &block)
end

def create_continuous_aggregate(*args, &block)
record(:create_continuous_aggregate, args, &block)
end

def drop_continuous_aggregate(*args, &block)
record(:drop_continuous_aggregate, args, &block)
end

def add_continuous_aggregate_policy(*args, &block)
record(:add_continuous_aggregate_policy, args, &block)
end

def remove_continuous_aggregate_policy(*args, &block)
record(:remove_continuous_aggregate_policy, args, &block)
%w[
create_hypertable
enable_hypertable_compression
disable_hypertable_compression
add_hypertable_compression_policy
remove_hypertable_compression_policy
add_hypertable_reorder_policy
remove_hypertable_reorder_policy
add_hypertable_retention_policy
remove_hypertable_retention_policy
create_continuous_aggregate
drop_continuous_aggregate
add_continuous_aggregate_policy
remove_continuous_aggregate_policy
].each do |method|
module_eval <<-METHOD, __FILE__, __LINE__ + 1
def #{method}(*args, &block) # def create_table(*args, &block)
record(:"#{method}", args, &block) # record(:create_table, args, &block)
end # end
METHOD
ruby2_keywords(method) if respond_to?(:ruby2_keywords)
end

def invert_create_hypertable(args, &block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def hypertables(filename)

def continuous_aggregates(filename)
File.open(filename, 'a') do |file|
Timescaledb::Rails::ContinuousAggregate.all.each do |continuous_aggregate|
Timescaledb::Rails::ContinuousAggregate.dependency_ordered.each do |continuous_aggregate|
create_continuous_aggregate_statement(continuous_aggregate, file)
add_continuous_aggregate_policy_statement(continuous_aggregate, file)
end
Expand Down Expand Up @@ -145,7 +145,7 @@ def timescale_structure_dump_default_flags

# @return [Boolean]
def timescale_enabled?
Timescaledb::Rails::Hypertable.table_exists?
ApplicationRecord.timescale_connection?(connection) && Hypertable.table_exists?
end
end
# rubocop:enable Layout/LineLength
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ def tables(stream)
def continuous_aggregates(stream)
return unless timescale_enabled?

Timescaledb::Rails::ContinuousAggregate.all.each do |continuous_aggregate|
continuous_aggregate(continuous_aggregate, stream)
continuous_aggregate_policy(continuous_aggregate, stream)
Timescaledb::Rails::ContinuousAggregate.dependency_ordered.each do |ca|
continuous_aggregate(ca, stream)
continuous_aggregate_policy(ca, stream)
end
end

Expand Down Expand Up @@ -149,7 +149,7 @@ def format_hypertable_option_value(value)
end

def timescale_enabled?
Timescaledb::Rails::Hypertable.table_exists?
ApplicationRecord.timescale_connection?(@connection) && Hypertable.table_exists?
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def enable_hypertable_compression(table_name, segment_by: nil, order_by: nil)
#
# disable_hypertable_compression('events')
#
def disable_hypertable_compression(table_name, _segment_by: nil, _order_by: nil)
def disable_hypertable_compression(table_name, segment_by: nil, order_by: nil) # rubocop:disable Lint/UnusedMethodArgument
execute "ALTER TABLE #{table_name} SET (timescaledb.compress = false);"
end

Expand Down Expand Up @@ -116,15 +116,22 @@ def remove_hypertable_reorder_policy(table_name, _index_name = nil)
# 'temperature_events', "SELECT * FROM events where event_type = 'temperature'"
# )
#
def create_continuous_aggregate(view_name, view_query)
execute "CREATE MATERIALIZED VIEW #{view_name} WITH (timescaledb.continuous) AS #{view_query};"
def create_continuous_aggregate(view_name, view_query, force: false)
if force
execute "DROP MATERIALIZED VIEW #{quote_table_name(view_name)} CASCADE;" if view_exists? view_name
else
schema_cache.clear_data_source_cache!(view_name.to_s)
end

execute "CREATE MATERIALIZED VIEW #{quote_table_name(view_name)} " \
"WITH (timescaledb.continuous) AS #{view_query};"
end

# Drops a continuous aggregate
#
# drop_continuous_aggregate('temperature_events')
#
def drop_continuous_aggregate(view_name, _view_query = nil)
def drop_continuous_aggregate(view_name, _view_query = nil, force: false) # rubocop:disable Lint/UnusedMethodArgument
execute "DROP MATERIALIZED VIEW #{view_name};"
end

Expand Down
16 changes: 11 additions & 5 deletions lib/timescaledb/rails/model/hyperfunctions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ module Hyperfunctions
TIME_BUCKET_ALIAS = 'time_bucket'

# @return [ActiveRecord::Relation<ActiveRecord::Base>]
def time_bucket(interval, target_column = nil)
target_column ||= hypertable_time_column_name
def time_bucket(interval, target_column = nil, select_alias: TIME_BUCKET_ALIAS)
target_column &&= Arel.sql(target_column.to_s)
target_column ||= arel_table[hypertable_time_column_name]

select("time_bucket('#{format_interval_value(interval)}', #{target_column}) as #{TIME_BUCKET_ALIAS}")
.group(TIME_BUCKET_ALIAS)
.order(TIME_BUCKET_ALIAS)
time_bucket = Arel::Nodes::NamedFunction.new(
'time_bucket',
[Arel::Nodes.build_quoted(format_interval_value(interval)), target_column]
)

select(time_bucket.dup.as(select_alias))
.group(time_bucket)
.order(time_bucket)
.extending(AggregateFunctions)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/timescaledb/rails/models.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require_relative './models/application_record'

require_relative './models/chunk'
require_relative './models/compression_setting'
require_relative './models/continuous_aggregate'
Expand Down
24 changes: 24 additions & 0 deletions lib/timescaledb/rails/models/application_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Timescaledb
module Rails
# :nodoc:
class ApplicationRecord < ::ActiveRecord::Base
self.abstract_class = true

def self.timescale_connection?(connection)
pool_name = lambda do |pool|
if pool.respond_to?(:db_config)
pool.db_config.name
elsif pool.respond_to?(:spec)
pool.spec.name
else
raise "Don't know how to get pool name from #{pool.inspect}"
end
end

pool_name[connection.pool] == pool_name[self.connection.pool]
end
end
end
end
8 changes: 4 additions & 4 deletions lib/timescaledb/rails/models/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class Chunk < ::ActiveRecord::Base
class Chunk < ApplicationRecord
self.table_name = 'timescaledb_information.chunks'
self.primary_key = 'hypertable_name'

Expand All @@ -17,13 +17,13 @@ def chunk_full_name
end

def compress!
::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"SELECT compress_chunk('#{chunk_full_name}')"
)
end

def decompress!
::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"SELECT decompress_chunk('#{chunk_full_name}')"
)
end
Expand All @@ -40,7 +40,7 @@ def reorder!(index = nil)
options = ["'#{chunk_full_name}'"]
options << "'#{index}'" if index.present?

::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"SELECT reorder_chunk(#{options.join(', ')})"
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/compression_setting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class CompressionSetting < ::ActiveRecord::Base
class CompressionSetting < ApplicationRecord
self.table_name = 'timescaledb_information.compression_settings'
self.primary_key = 'hypertable_name'

Expand Down
2 changes: 2 additions & 0 deletions lib/timescaledb/rails/models/concerns/durationable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ module Durationable

# @return [String]
def parse_duration(duration)
return if duration.nil?

duration_in_seconds = duration_in_seconds(duration)

duration_to_interval(
Expand Down
13 changes: 11 additions & 2 deletions lib/timescaledb/rails/models/continuous_aggregate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,30 @@
module Timescaledb
module Rails
# :nodoc:
class ContinuousAggregate < ::ActiveRecord::Base
class ContinuousAggregate < ApplicationRecord
include Timescaledb::Rails::Models::Durationable

self.table_name = 'timescaledb_information.continuous_aggregates'
self.primary_key = 'materialization_hypertable_name'

has_many :jobs, foreign_key: 'hypertable_name', class_name: 'Timescaledb::Rails::Job'

def self.dependency_ordered
deps = find_each.index_by(&:materialization_hypertable_name)

TSort.tsort_each(
->(&b) { deps.each_value.sort_by(&:hypertable_name).each(&b) },
->(n, &b) { Array.wrap(deps[n.hypertable_name]).each(&b) }
)
end

# Manually refresh a continuous aggregate.
#
# @param [DateTime] start_time
# @param [DateTime] end_time
#
def refresh!(start_time = 'NULL', end_time = 'NULL')
::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"CALL refresh_continuous_aggregate('#{view_name}', #{start_time}, #{end_time});"
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/dimension.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class Dimension < ::ActiveRecord::Base
class Dimension < ApplicationRecord
TIME_TYPE = 'Time'

self.table_name = 'timescaledb_information.dimensions'
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/hypertable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module Timescaledb
module Rails
# :nodoc:
class Hypertable < ::ActiveRecord::Base
class Hypertable < ApplicationRecord
include Timescaledb::Rails::Models::Durationable

self.table_name = 'timescaledb_information.hypertables'
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class Job < ::ActiveRecord::Base
class Job < ApplicationRecord
self.table_name = 'timescaledb_information.jobs'
self.primary_key = 'hypertable_name'

Expand Down
4 changes: 2 additions & 2 deletions spec/timescaledb/rails/model/hyperfunctions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
it 'uses the default date column' do
result = Payload.time_bucket(interval)

expect(result.to_sql).to include("SELECT time_bucket('1 day', created_at) as time_bucket")
expect(result.to_sql).to include("SELECT time_bucket('1 day', \"payloads\".\"created_at\") AS time_bucket")
end

it 'returns an active record relation' do
Expand Down Expand Up @@ -78,7 +78,7 @@
it 'uses the specified date column' do
result = Payload.time_bucket(interval, date_column)

expect(result.to_sql).to include("SELECT time_bucket('1 day', #{date_column}) as time_bucket")
expect(result.to_sql).to include("SELECT time_bucket('1 day', #{date_column}) AS time_bucket")
end

context 'when the interval is a string' do
Expand Down

0 comments on commit 54775e7

Please sign in to comment.