Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow timescale to be a non-primary rails db #58

2 changes: 1 addition & 1 deletion bin/console
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Timescaledb::Rails::Hypertable.all.each do |hypertable|

Timescaledb::Rails.const_set(
class_name,
Class.new(ActiveRecord::Base) do
Class.new(Timescaledb::Rails::ApplicationRecord) do
include Timescaledb::Rails::Model

self.table_name = [hypertable.hypertable_schema, hypertable.hypertable_name].join('.')
Expand Down
71 changes: 21 additions & 50 deletions lib/timescaledb/rails/extensions/active_record/command_recorder.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,56 +5,27 @@ module Rails
module ActiveRecord
# :nodoc:
module CommandRecorder
def create_hypertable(*args, &block)
record(:create_hypertable, args, &block)
end

def enable_hypertable_compression(*args, &block)
record(:enable_hypertable_compression, args, &block)
end

def disable_hypertable_compression(*args, &block)
record(:disable_hypertable_compression, args, &block)
end

def add_hypertable_compression_policy(*args, &block)
record(:add_hypertable_compression_policy, args, &block)
end

def remove_hypertable_compression_policy(*args, &block)
record(:remove_hypertable_compression_policy, args, &block)
end

def add_hypertable_reorder_policy(*args, &block)
record(:add_hypertable_reorder_policy, args, &block)
end

def remove_hypertable_reorder_policy(*args, &block)
record(:remove_hypertable_reorder_policy, args, &block)
end

def add_hypertable_retention_policy(*args, &block)
record(:add_hypertable_retention_policy, args, &block)
end

def remove_hypertable_retention_policy(*args, &block)
record(:remove_hypertable_retention_policy, args, &block)
end

def create_continuous_aggregate(*args, &block)
record(:create_continuous_aggregate, args, &block)
end

def drop_continuous_aggregate(*args, &block)
record(:drop_continuous_aggregate, args, &block)
end

def add_continuous_aggregate_policy(*args, &block)
record(:add_continuous_aggregate_policy, args, &block)
end

def remove_continuous_aggregate_policy(*args, &block)
record(:remove_continuous_aggregate_policy, args, &block)
%w[
create_hypertable
enable_hypertable_compression
disable_hypertable_compression
add_hypertable_compression_policy
remove_hypertable_compression_policy
add_hypertable_reorder_policy
remove_hypertable_reorder_policy
add_hypertable_retention_policy
remove_hypertable_retention_policy
create_continuous_aggregate
drop_continuous_aggregate
add_continuous_aggregate_policy
remove_continuous_aggregate_policy
].each do |method|
module_eval <<-METHOD, __FILE__, __LINE__ + 1
def #{method}(*args, &block) # def create_table(*args, &block)
record(:"#{method}", args, &block) # record(:create_table, args, &block)
end # end
METHOD
ruby2_keywords(method) if respond_to?(:ruby2_keywords)
end

def invert_create_hypertable(args, &block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def hypertables(filename)

def continuous_aggregates(filename)
File.open(filename, 'a') do |file|
Timescaledb::Rails::ContinuousAggregate.all.each do |continuous_aggregate|
Timescaledb::Rails::ContinuousAggregate.dependency_ordered.each do |continuous_aggregate|
create_continuous_aggregate_statement(continuous_aggregate, file)
add_continuous_aggregate_policy_statement(continuous_aggregate, file)
end
Expand Down Expand Up @@ -145,7 +145,7 @@ def timescale_structure_dump_default_flags

# @return [Boolean]
def timescale_enabled?
Timescaledb::Rails::Hypertable.table_exists?
ApplicationRecord.timescale_connection?(connection) && Hypertable.table_exists?
end
end
# rubocop:enable Layout/LineLength
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ def tables(stream)
def continuous_aggregates(stream)
return unless timescale_enabled?

Timescaledb::Rails::ContinuousAggregate.all.each do |continuous_aggregate|
continuous_aggregate(continuous_aggregate, stream)
continuous_aggregate_policy(continuous_aggregate, stream)
Timescaledb::Rails::ContinuousAggregate.dependency_ordered.each do |ca|
continuous_aggregate(ca, stream)
continuous_aggregate_policy(ca, stream)
end
end

Expand Down Expand Up @@ -149,7 +149,7 @@ def format_hypertable_option_value(value)
end

def timescale_enabled?
Timescaledb::Rails::Hypertable.table_exists?
ApplicationRecord.timescale_connection?(@connection) && Hypertable.table_exists?
end
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def enable_hypertable_compression(table_name, segment_by: nil, order_by: nil)
#
# disable_hypertable_compression('events')
#
def disable_hypertable_compression(table_name, _segment_by: nil, _order_by: nil)
def disable_hypertable_compression(table_name, segment_by: nil, order_by: nil) # rubocop:disable Lint/UnusedMethodArgument
execute "ALTER TABLE #{table_name} SET (timescaledb.compress = false);"
end

Expand Down Expand Up @@ -116,15 +116,22 @@ def remove_hypertable_reorder_policy(table_name, _index_name = nil)
# 'temperature_events', "SELECT * FROM events where event_type = 'temperature'"
# )
#
def create_continuous_aggregate(view_name, view_query)
execute "CREATE MATERIALIZED VIEW #{view_name} WITH (timescaledb.continuous) AS #{view_query};"
def create_continuous_aggregate(view_name, view_query, force: false)
if force
execute "DROP MATERIALIZED VIEW #{quote_table_name(view_name)} CASCADE;" if view_exists? view_name
santiagodoldan marked this conversation as resolved.
Show resolved Hide resolved
else
schema_cache.clear_data_source_cache!(view_name.to_s)
end

execute "CREATE MATERIALIZED VIEW #{quote_table_name(view_name)} " \
"WITH (timescaledb.continuous) AS #{view_query};"
end

# Drops a continuous aggregate
#
# drop_continuous_aggregate('temperature_events')
#
def drop_continuous_aggregate(view_name, _view_query = nil)
def drop_continuous_aggregate(view_name, _view_query = nil, force: false) # rubocop:disable Lint/UnusedMethodArgument
santiagodoldan marked this conversation as resolved.
Show resolved Hide resolved
execute "DROP MATERIALIZED VIEW #{view_name};"
end

Expand Down
16 changes: 11 additions & 5 deletions lib/timescaledb/rails/model/hyperfunctions.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ module Hyperfunctions
TIME_BUCKET_ALIAS = 'time_bucket'

# @return [ActiveRecord::Relation<ActiveRecord::Base>]
def time_bucket(interval, target_column = nil)
target_column ||= hypertable_time_column_name
def time_bucket(interval, target_column = nil, select_alias: TIME_BUCKET_ALIAS)
santiagodoldan marked this conversation as resolved.
Show resolved Hide resolved
target_column &&= Arel.sql(target_column.to_s)
target_column ||= arel_table[hypertable_time_column_name]

select("time_bucket('#{format_interval_value(interval)}', #{target_column}) as #{TIME_BUCKET_ALIAS}")
.group(TIME_BUCKET_ALIAS)
.order(TIME_BUCKET_ALIAS)
time_bucket = Arel::Nodes::NamedFunction.new(
'time_bucket',
[Arel::Nodes.build_quoted(format_interval_value(interval)), target_column]
)

select(time_bucket.dup.as(select_alias))
.group(time_bucket)
santiagodoldan marked this conversation as resolved.
Show resolved Hide resolved
.order(time_bucket)
.extending(AggregateFunctions)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/timescaledb/rails/models.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require_relative './models/application_record'

require_relative './models/chunk'
require_relative './models/compression_setting'
require_relative './models/continuous_aggregate'
Expand Down
24 changes: 24 additions & 0 deletions lib/timescaledb/rails/models/application_record.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# frozen_string_literal: true

module Timescaledb
module Rails
# :nodoc:
class ApplicationRecord < ::ActiveRecord::Base
self.abstract_class = true

def self.timescale_connection?(connection)
pool_name = lambda do |pool|
if pool.respond_to?(:db_config)
pool.db_config.name
elsif pool.respond_to?(:spec)
pool.spec.name
else
raise "Don't know how to get pool name from #{pool.inspect}"
end
end

pool_name[connection.pool] == pool_name[self.connection.pool]
end
end
end
end
8 changes: 4 additions & 4 deletions lib/timescaledb/rails/models/chunk.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class Chunk < ::ActiveRecord::Base
class Chunk < ApplicationRecord
self.table_name = 'timescaledb_information.chunks'
self.primary_key = 'hypertable_name'

Expand All @@ -17,13 +17,13 @@ def chunk_full_name
end

def compress!
::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"SELECT compress_chunk('#{chunk_full_name}')"
)
end

def decompress!
::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"SELECT decompress_chunk('#{chunk_full_name}')"
)
end
Expand All @@ -40,7 +40,7 @@ def reorder!(index = nil)
options = ["'#{chunk_full_name}'"]
options << "'#{index}'" if index.present?

::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"SELECT reorder_chunk(#{options.join(', ')})"
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/compression_setting.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class CompressionSetting < ::ActiveRecord::Base
class CompressionSetting < ApplicationRecord
self.table_name = 'timescaledb_information.compression_settings'
self.primary_key = 'hypertable_name'

Expand Down
2 changes: 2 additions & 0 deletions lib/timescaledb/rails/models/concerns/durationable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ module Durationable

# @return [String]
def parse_duration(duration)
return if duration.nil?

duration_in_seconds = duration_in_seconds(duration)

duration_to_interval(
Expand Down
13 changes: 11 additions & 2 deletions lib/timescaledb/rails/models/continuous_aggregate.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,30 @@
module Timescaledb
module Rails
# :nodoc:
class ContinuousAggregate < ::ActiveRecord::Base
class ContinuousAggregate < ApplicationRecord
include Timescaledb::Rails::Models::Durationable

self.table_name = 'timescaledb_information.continuous_aggregates'
self.primary_key = 'materialization_hypertable_name'

has_many :jobs, foreign_key: 'hypertable_name', class_name: 'Timescaledb::Rails::Job'

def self.dependency_ordered
deps = find_each.index_by(&:materialization_hypertable_name)

TSort.tsort_each(
->(&b) { deps.each_value.sort_by(&:hypertable_name).each(&b) },
->(n, &b) { Array.wrap(deps[n.hypertable_name]).each(&b) }
)
end

# Manually refresh a continuous aggregate.
#
# @param [DateTime] start_time
# @param [DateTime] end_time
#
def refresh!(start_time = 'NULL', end_time = 'NULL')
::ActiveRecord::Base.connection.execute(
self.class.connection.execute(
"CALL refresh_continuous_aggregate('#{view_name}', #{start_time}, #{end_time});"
)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/dimension.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class Dimension < ::ActiveRecord::Base
class Dimension < ApplicationRecord
TIME_TYPE = 'Time'

self.table_name = 'timescaledb_information.dimensions'
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/hypertable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
module Timescaledb
module Rails
# :nodoc:
class Hypertable < ::ActiveRecord::Base
class Hypertable < ApplicationRecord
include Timescaledb::Rails::Models::Durationable

self.table_name = 'timescaledb_information.hypertables'
Expand Down
2 changes: 1 addition & 1 deletion lib/timescaledb/rails/models/job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
module Timescaledb
module Rails
# :nodoc:
class Job < ::ActiveRecord::Base
class Job < ApplicationRecord
self.table_name = 'timescaledb_information.jobs'
self.primary_key = 'hypertable_name'

Expand Down
4 changes: 2 additions & 2 deletions spec/timescaledb/rails/model/hyperfunctions_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
it 'uses the default date column' do
result = Payload.time_bucket(interval)

expect(result.to_sql).to include("SELECT time_bucket('1 day', created_at) as time_bucket")
expect(result.to_sql).to include("SELECT time_bucket('1 day', \"payloads\".\"created_at\") AS time_bucket")
end

it 'returns an active record relation' do
Expand Down Expand Up @@ -78,7 +78,7 @@
it 'uses the specified date column' do
result = Payload.time_bucket(interval, date_column)

expect(result.to_sql).to include("SELECT time_bucket('1 day', #{date_column}) as time_bucket")
expect(result.to_sql).to include("SELECT time_bucket('1 day', #{date_column}) AS time_bucket")
end

context 'when the interval is a string' do
Expand Down