diff --git a/bin/console b/bin/console index 745bd72..327d952 100755 --- a/bin/console +++ b/bin/console @@ -22,7 +22,7 @@ Timescaledb::Rails::Hypertable.all.each do |hypertable| Timescaledb::Rails.const_set( class_name, - Class.new(ActiveRecord::Base) do + Class.new(Timescaledb::Rails::ApplicationRecord) do include Timescaledb::Rails::Model self.table_name = [hypertable.hypertable_schema, hypertable.hypertable_name].join('.') diff --git a/lib/timescaledb/rails/extensions/active_record/command_recorder.rb b/lib/timescaledb/rails/extensions/active_record/command_recorder.rb index 207acc7..a77b93e 100644 --- a/lib/timescaledb/rails/extensions/active_record/command_recorder.rb +++ b/lib/timescaledb/rails/extensions/active_record/command_recorder.rb @@ -5,56 +5,27 @@ module Rails module ActiveRecord # :nodoc: module CommandRecorder - def create_hypertable(*args, &block) - record(:create_hypertable, args, &block) - end - - def enable_hypertable_compression(*args, &block) - record(:enable_hypertable_compression, args, &block) - end - - def disable_hypertable_compression(*args, &block) - record(:disable_hypertable_compression, args, &block) - end - - def add_hypertable_compression_policy(*args, &block) - record(:add_hypertable_compression_policy, args, &block) - end - - def remove_hypertable_compression_policy(*args, &block) - record(:remove_hypertable_compression_policy, args, &block) - end - - def add_hypertable_reorder_policy(*args, &block) - record(:add_hypertable_reorder_policy, args, &block) - end - - def remove_hypertable_reorder_policy(*args, &block) - record(:remove_hypertable_reorder_policy, args, &block) - end - - def add_hypertable_retention_policy(*args, &block) - record(:add_hypertable_retention_policy, args, &block) - end - - def remove_hypertable_retention_policy(*args, &block) - record(:remove_hypertable_retention_policy, args, &block) - end - - def create_continuous_aggregate(*args, &block) - record(:create_continuous_aggregate, args, &block) - end - - def drop_continuous_aggregate(*args, &block) - record(:drop_continuous_aggregate, args, &block) - end - - def add_continuous_aggregate_policy(*args, &block) - record(:add_continuous_aggregate_policy, args, &block) - end - - def remove_continuous_aggregate_policy(*args, &block) - record(:remove_continuous_aggregate_policy, args, &block) + %w[ + create_hypertable + enable_hypertable_compression + disable_hypertable_compression + add_hypertable_compression_policy + remove_hypertable_compression_policy + add_hypertable_reorder_policy + remove_hypertable_reorder_policy + add_hypertable_retention_policy + remove_hypertable_retention_policy + create_continuous_aggregate + drop_continuous_aggregate + add_continuous_aggregate_policy + remove_continuous_aggregate_policy + ].each do |method| + module_eval <<-METHOD, __FILE__, __LINE__ + 1 + def #{method}(*args, &block) # def create_table(*args, &block) + record(:"#{method}", args, &block) # record(:create_table, args, &block) + end # end + METHOD + ruby2_keywords(method) if respond_to?(:ruby2_keywords) end def invert_create_hypertable(args, &block) diff --git a/lib/timescaledb/rails/extensions/active_record/postgresql_database_tasks.rb b/lib/timescaledb/rails/extensions/active_record/postgresql_database_tasks.rb index 2107b84..6aa9ccb 100644 --- a/lib/timescaledb/rails/extensions/active_record/postgresql_database_tasks.rb +++ b/lib/timescaledb/rails/extensions/active_record/postgresql_database_tasks.rb @@ -37,7 +37,7 @@ def hypertables(filename) def continuous_aggregates(filename) File.open(filename, 'a') do |file| - Timescaledb::Rails::ContinuousAggregate.all.each do |continuous_aggregate| + Timescaledb::Rails::ContinuousAggregate.dependency_ordered.each do |continuous_aggregate| create_continuous_aggregate_statement(continuous_aggregate, file) add_continuous_aggregate_policy_statement(continuous_aggregate, file) end @@ -145,7 +145,7 @@ def timescale_structure_dump_default_flags # @return [Boolean] def timescale_enabled? - Timescaledb::Rails::Hypertable.table_exists? + ApplicationRecord.timescale_connection?(connection) && Hypertable.table_exists? end end # rubocop:enable Layout/LineLength diff --git a/lib/timescaledb/rails/extensions/active_record/schema_dumper.rb b/lib/timescaledb/rails/extensions/active_record/schema_dumper.rb index 4384af7..c942a6e 100644 --- a/lib/timescaledb/rails/extensions/active_record/schema_dumper.rb +++ b/lib/timescaledb/rails/extensions/active_record/schema_dumper.rb @@ -19,9 +19,9 @@ def tables(stream) def continuous_aggregates(stream) return unless timescale_enabled? - Timescaledb::Rails::ContinuousAggregate.all.each do |continuous_aggregate| - continuous_aggregate(continuous_aggregate, stream) - continuous_aggregate_policy(continuous_aggregate, stream) + Timescaledb::Rails::ContinuousAggregate.dependency_ordered.each do |ca| + continuous_aggregate(ca, stream) + continuous_aggregate_policy(ca, stream) end end @@ -149,7 +149,7 @@ def format_hypertable_option_value(value) end def timescale_enabled? - Timescaledb::Rails::Hypertable.table_exists? + ApplicationRecord.timescale_connection?(@connection) && Hypertable.table_exists? end end end diff --git a/lib/timescaledb/rails/extensions/active_record/schema_statements.rb b/lib/timescaledb/rails/extensions/active_record/schema_statements.rb index ff8c1ab..4b79296 100644 --- a/lib/timescaledb/rails/extensions/active_record/schema_statements.rb +++ b/lib/timescaledb/rails/extensions/active_record/schema_statements.rb @@ -56,7 +56,7 @@ def enable_hypertable_compression(table_name, segment_by: nil, order_by: nil) # # disable_hypertable_compression('events') # - def disable_hypertable_compression(table_name, _segment_by: nil, _order_by: nil) + def disable_hypertable_compression(table_name, segment_by: nil, order_by: nil) # rubocop:disable Lint/UnusedMethodArgument execute "ALTER TABLE #{table_name} SET (timescaledb.compress = false);" end @@ -116,15 +116,22 @@ def remove_hypertable_reorder_policy(table_name, _index_name = nil) # 'temperature_events', "SELECT * FROM events where event_type = 'temperature'" # ) # - def create_continuous_aggregate(view_name, view_query) - execute "CREATE MATERIALIZED VIEW #{view_name} WITH (timescaledb.continuous) AS #{view_query};" + def create_continuous_aggregate(view_name, view_query, force: false) + if force + execute "DROP MATERIALIZED VIEW #{quote_table_name(view_name)} CASCADE;" if view_exists? view_name + else + schema_cache.clear_data_source_cache!(view_name.to_s) + end + + execute "CREATE MATERIALIZED VIEW #{quote_table_name(view_name)} " \ + "WITH (timescaledb.continuous) AS #{view_query};" end # Drops a continuous aggregate # # drop_continuous_aggregate('temperature_events') # - def drop_continuous_aggregate(view_name, _view_query = nil) + def drop_continuous_aggregate(view_name, _view_query = nil, force: false) # rubocop:disable Lint/UnusedMethodArgument execute "DROP MATERIALIZED VIEW #{view_name};" end diff --git a/lib/timescaledb/rails/model/hyperfunctions.rb b/lib/timescaledb/rails/model/hyperfunctions.rb index 39c156a..cf8a0e2 100644 --- a/lib/timescaledb/rails/model/hyperfunctions.rb +++ b/lib/timescaledb/rails/model/hyperfunctions.rb @@ -10,12 +10,18 @@ module Hyperfunctions TIME_BUCKET_ALIAS = 'time_bucket' # @return [ActiveRecord::Relation] - def time_bucket(interval, target_column = nil) - target_column ||= hypertable_time_column_name + def time_bucket(interval, target_column = nil, select_alias: TIME_BUCKET_ALIAS) + target_column &&= Arel.sql(target_column.to_s) + target_column ||= arel_table[hypertable_time_column_name] - select("time_bucket('#{format_interval_value(interval)}', #{target_column}) as #{TIME_BUCKET_ALIAS}") - .group(TIME_BUCKET_ALIAS) - .order(TIME_BUCKET_ALIAS) + time_bucket = Arel::Nodes::NamedFunction.new( + 'time_bucket', + [Arel::Nodes.build_quoted(format_interval_value(interval)), target_column] + ) + + select(time_bucket.dup.as(select_alias)) + .group(time_bucket) + .order(time_bucket) .extending(AggregateFunctions) end diff --git a/lib/timescaledb/rails/models.rb b/lib/timescaledb/rails/models.rb index 6384a1c..18567c7 100644 --- a/lib/timescaledb/rails/models.rb +++ b/lib/timescaledb/rails/models.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require_relative './models/application_record' + require_relative './models/chunk' require_relative './models/compression_setting' require_relative './models/continuous_aggregate' diff --git a/lib/timescaledb/rails/models/application_record.rb b/lib/timescaledb/rails/models/application_record.rb new file mode 100644 index 0000000..55e48c8 --- /dev/null +++ b/lib/timescaledb/rails/models/application_record.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module Timescaledb + module Rails + # :nodoc: + class ApplicationRecord < ::ActiveRecord::Base + self.abstract_class = true + + def self.timescale_connection?(connection) + pool_name = lambda do |pool| + if pool.respond_to?(:db_config) + pool.db_config.name + elsif pool.respond_to?(:spec) + pool.spec.name + else + raise "Don't know how to get pool name from #{pool.inspect}" + end + end + + pool_name[connection.pool] == pool_name[self.connection.pool] + end + end + end +end diff --git a/lib/timescaledb/rails/models/chunk.rb b/lib/timescaledb/rails/models/chunk.rb index 7ba2cb4..56beb74 100644 --- a/lib/timescaledb/rails/models/chunk.rb +++ b/lib/timescaledb/rails/models/chunk.rb @@ -3,7 +3,7 @@ module Timescaledb module Rails # :nodoc: - class Chunk < ::ActiveRecord::Base + class Chunk < ApplicationRecord self.table_name = 'timescaledb_information.chunks' self.primary_key = 'hypertable_name' @@ -17,13 +17,13 @@ def chunk_full_name end def compress! - ::ActiveRecord::Base.connection.execute( + self.class.connection.execute( "SELECT compress_chunk('#{chunk_full_name}')" ) end def decompress! - ::ActiveRecord::Base.connection.execute( + self.class.connection.execute( "SELECT decompress_chunk('#{chunk_full_name}')" ) end @@ -40,7 +40,7 @@ def reorder!(index = nil) options = ["'#{chunk_full_name}'"] options << "'#{index}'" if index.present? - ::ActiveRecord::Base.connection.execute( + self.class.connection.execute( "SELECT reorder_chunk(#{options.join(', ')})" ) end diff --git a/lib/timescaledb/rails/models/compression_setting.rb b/lib/timescaledb/rails/models/compression_setting.rb index 83ef630..b5acae1 100644 --- a/lib/timescaledb/rails/models/compression_setting.rb +++ b/lib/timescaledb/rails/models/compression_setting.rb @@ -3,7 +3,7 @@ module Timescaledb module Rails # :nodoc: - class CompressionSetting < ::ActiveRecord::Base + class CompressionSetting < ApplicationRecord self.table_name = 'timescaledb_information.compression_settings' self.primary_key = 'hypertable_name' diff --git a/lib/timescaledb/rails/models/concerns/durationable.rb b/lib/timescaledb/rails/models/concerns/durationable.rb index 266fa81..735d208 100644 --- a/lib/timescaledb/rails/models/concerns/durationable.rb +++ b/lib/timescaledb/rails/models/concerns/durationable.rb @@ -11,6 +11,8 @@ module Durationable # @return [String] def parse_duration(duration) + return if duration.nil? + duration_in_seconds = duration_in_seconds(duration) duration_to_interval( diff --git a/lib/timescaledb/rails/models/continuous_aggregate.rb b/lib/timescaledb/rails/models/continuous_aggregate.rb index b4bd79a..c883ce5 100644 --- a/lib/timescaledb/rails/models/continuous_aggregate.rb +++ b/lib/timescaledb/rails/models/continuous_aggregate.rb @@ -5,7 +5,7 @@ module Timescaledb module Rails # :nodoc: - class ContinuousAggregate < ::ActiveRecord::Base + class ContinuousAggregate < ApplicationRecord include Timescaledb::Rails::Models::Durationable self.table_name = 'timescaledb_information.continuous_aggregates' @@ -13,13 +13,22 @@ class ContinuousAggregate < ::ActiveRecord::Base has_many :jobs, foreign_key: 'hypertable_name', class_name: 'Timescaledb::Rails::Job' + def self.dependency_ordered + deps = find_each.index_by(&:materialization_hypertable_name) + + TSort.tsort_each( + ->(&b) { deps.each_value.sort_by(&:hypertable_name).each(&b) }, + ->(n, &b) { Array.wrap(deps[n.hypertable_name]).each(&b) } + ) + end + # Manually refresh a continuous aggregate. # # @param [DateTime] start_time # @param [DateTime] end_time # def refresh!(start_time = 'NULL', end_time = 'NULL') - ::ActiveRecord::Base.connection.execute( + self.class.connection.execute( "CALL refresh_continuous_aggregate('#{view_name}', #{start_time}, #{end_time});" ) end diff --git a/lib/timescaledb/rails/models/dimension.rb b/lib/timescaledb/rails/models/dimension.rb index d186d01..a08dfc9 100644 --- a/lib/timescaledb/rails/models/dimension.rb +++ b/lib/timescaledb/rails/models/dimension.rb @@ -3,7 +3,7 @@ module Timescaledb module Rails # :nodoc: - class Dimension < ::ActiveRecord::Base + class Dimension < ApplicationRecord TIME_TYPE = 'Time' self.table_name = 'timescaledb_information.dimensions' diff --git a/lib/timescaledb/rails/models/hypertable.rb b/lib/timescaledb/rails/models/hypertable.rb index bcaec84..f72ce3f 100644 --- a/lib/timescaledb/rails/models/hypertable.rb +++ b/lib/timescaledb/rails/models/hypertable.rb @@ -5,7 +5,7 @@ module Timescaledb module Rails # :nodoc: - class Hypertable < ::ActiveRecord::Base + class Hypertable < ApplicationRecord include Timescaledb::Rails::Models::Durationable self.table_name = 'timescaledb_information.hypertables' diff --git a/lib/timescaledb/rails/models/job.rb b/lib/timescaledb/rails/models/job.rb index ff99302..2915cc7 100644 --- a/lib/timescaledb/rails/models/job.rb +++ b/lib/timescaledb/rails/models/job.rb @@ -3,7 +3,7 @@ module Timescaledb module Rails # :nodoc: - class Job < ::ActiveRecord::Base + class Job < ApplicationRecord self.table_name = 'timescaledb_information.jobs' self.primary_key = 'hypertable_name' diff --git a/spec/timescaledb/rails/model/hyperfunctions_spec.rb b/spec/timescaledb/rails/model/hyperfunctions_spec.rb index 917473d..c722a31 100644 --- a/spec/timescaledb/rails/model/hyperfunctions_spec.rb +++ b/spec/timescaledb/rails/model/hyperfunctions_spec.rb @@ -40,7 +40,7 @@ it 'uses the default date column' do result = Payload.time_bucket(interval) - expect(result.to_sql).to include("SELECT time_bucket('1 day', created_at) as time_bucket") + expect(result.to_sql).to include("SELECT time_bucket('1 day', \"payloads\".\"created_at\") AS time_bucket") end it 'returns an active record relation' do @@ -78,7 +78,7 @@ it 'uses the specified date column' do result = Payload.time_bucket(interval, date_column) - expect(result.to_sql).to include("SELECT time_bucket('1 day', #{date_column}) as time_bucket") + expect(result.to_sql).to include("SELECT time_bucket('1 day', #{date_column}) AS time_bucket") end context 'when the interval is a string' do