Skip to content

Commit

Permalink
Feature: Read replica support (#476)
Browse files Browse the repository at this point in the history
* added variables for tracking time since last write

* added variables to store reader and writer db connections

* switched from time.utc to time.monotonic for tracking time since last write

* Added methods to switch between connections.
Granite::Connections now accept connections with writer/reader. See spec/spec_helper spec/adapters_spec
Added env var to store url to sqlite replica db url

* added instance variant of switch_to_writer_adapter for use with callback. Simplified callbacks

* Fixed typo in def switch_to_writer_adapter
automatically switch to the writer adapter before saving to database.

* added logic to automatically switch to reader adapter

* functions in querying now dynamically change adapter based on need

* ensure all methods in query builder that need primary database switch connection

* added better error message for invalid connections

* groundwork for replica testing

* fixed error where mysql tests don't run

* Granite::Base.adapter class method now actually fetches current adapter if available instead of using first connection in Granite::Connections

* fixed typo. Invalid adapter_type for pg_with_replica

* fixed True to true. Added table name to ReplicatedChat.

* update specs

* spec updates

* Update .gitignore

* moved connection management logic to seperate module. Fixed bug where there was no switch to reader adapter. Added test for connection switching

* fixed error where reader connection switch ignored specified wait period. Fixed invalid test.

* moved default value  for connection switch wait period to granite::connections to allow changing it globally

* moved connection macro to connection management module

* cleaned up code for fetching first connection

* finalized syntax for adding new connections to granite::connections

* optimization: when reader & writer database are the same do not duplicate connection pool

---------

Co-authored-by: Holden Omans <[email protected]>
Co-authored-by: Seth T <[email protected]>
  • Loading branch information
3 people authored Jun 12, 2023
1 parent 5afa787 commit 8b9b55b
Show file tree
Hide file tree
Showing 15 changed files with 221 additions and 53 deletions.
3 changes: 3 additions & 0 deletions .env
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
PG_DATABASE_URL=postgres://granite:password@localhost:5432/granite_db
PG_REPLICA_URL=postgres://granite:password@localhost:5432/granite_db
MYSQL_DATABASE_URL=mysql://granite:password@localhost:3306/granite_db
MYSQL_REPLICA_URL=mysql://granite:password@localhost:3306/granite_db
SQLITE_DATABASE_URL=sqlite3:./granite.db
SQLITE_REPLICA_URL=sqlite3:./granite_replica.db
CURRENT_ADAPTER=pg
PG_VERSION=15.2
MYSQL_VERSION=5.7
Expand Down
14 changes: 6 additions & 8 deletions .github/workflows/spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,14 @@ jobs:
with:
crystal: ${{ matrix.crystal }}
- name: Install shards
run: shards update --ignore-crystal-version
run: shards update --ignore-crystal-version --skip-postinstall --skip-executables
- name: Run tests
timeout-minutes: 2
run: crystal spec
env:
CURRENT_ADAPTER: sqlite
SQLITE_DATABASE_URL: sqlite3:./granite.db
MYSQL_DATABASE_URL: mysql://granite:password@localhost:3306/granite_db
PG_DATABASE_URL: postgres://granite:password@localhost:5432/granite_db
SQLITE_REPLICA_URL: sqlite3:./granite_replica.db
mysql-spec:
runs-on: ubuntu-latest
strategy:
Expand Down Expand Up @@ -71,15 +70,15 @@ jobs:
with:
crystal: ${{ matrix.crystal }}
- name: Install shards
run: shards update --ignore-crystal-version
run: shards update --ignore-crystal-version --skip-postinstall --skip-executables
- name: Run tests
timeout-minutes: 2
run: crystal spec
env:
CURRENT_ADAPTER: mysql
SQLITE_DATABASE_URL: sqlite3:./granite.db
MYSQL_DATABASE_URL: mysql://granite:password@localhost:3306/granite_db
PG_DATABASE_URL: postgres://granite:password@localhost:5432/granite_db
MYSQL_REPLICA_URL: mysql://granite:password@localhost:3306/granite_db
psql-spec:
runs-on: ubuntu-latest
strategy:
Expand Down Expand Up @@ -110,12 +109,11 @@ jobs:
with:
crystal: ${{ matrix.crystal }}
- name: Install shards
run: shards update --ignore-crystal-version
run: shards update --ignore-crystal-version --skip-postinstall --skip-executables
- name: Run tests
timeout-minutes: 2
run: crystal spec
env:
CURRENT_ADAPTER: pg
SQLITE_DATABASE_URL: sqlite3:./granite.db
MYSQL_DATABASE_URL: mysql://granite:password@localhost:3306/granite_db
PG_DATABASE_URL: postgres://granite:password@localhost:5432/granite_db
PG_REPLICA_URL: postgres://granite:password@localhost:5432/granite_db
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ shard.lock

# Ignore bin because they will be build with shards install
bin
.env
11 changes: 11 additions & 0 deletions export.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export PG_DATABASE_URL=postgres://granite:password@localhost:5432/granite_db
export PG_REPLICA_URL=postgres://granite:password@localhost:5432/granite__replica_db
export MYSQL_DATABASE_URL=mysql://granite:password@localhost:3306/granite_db
export MYSQL_REPLICA_URL=mysql://granite:password@localhost:3306/granite_replica_db
export SQLITE_DATABASE_URL=sqlite3:./granite.db
export SQLITE_REPLICA_URL=sqlite3:./granite_replica.db
export PG_VERSION=15.2
export MYSQL_VERSION=5.7
export SQLITE_VERSION=3110000
export SQLITE_VERSION_YEAR=2016
export CURRENT_ADAPTER=sqlite
40 changes: 20 additions & 20 deletions spec/adapter/adapters_spec.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
require "../spec_helper"

class Foo < Granite::Base
connection sqlite

connection {{env("CURRENT_ADAPTER").id}}
column id : Int64, primary: true
end

Expand All @@ -13,41 +12,42 @@ end
describe Granite::Connections do
describe "registration" do
it "should allow connections to be be saved and looked up" do
Granite::Connections.registered_connections.size.should eq 3
Granite::Connections.registered_connections.size.should eq 2

if connection = Granite::Connections["mysql"]
connection.url.should eq ENV["MYSQL_DATABASE_URL"]
else
connection.should_not be_falsey
end
if connection = Granite::Connections["pg"]
connection.url.should eq ENV["PG_DATABASE_URL"]
if connection = Granite::Connections[CURRENT_ADAPTER]
connection[:writer].url.should eq ADAPTER_URL
else
connection.should_not be_falsey
end
if connection = Granite::Connections["sqlite"]
connection.url.should eq ENV["SQLITE_DATABASE_URL"]
else
connection.should_not be_falsey

case ENV["CURRENT_ADAPTER"]?
when "sqlite"
if connection = Granite::Connections["sqlite_with_replica"]
connection[:writer].url.should eq ENV["SQLITE_DATABASE_URL"]?
connection[:reader].url.should eq ADAPTER_REPLICA_URL
else
connection.should_not be_falsey
end
end
end

it "should disallow multiple connections with the same name" do
expect_raises(Exception, "Adapter with name 'mysql' has already been registered.") do
Granite::Connections << Granite::Adapter::Pg.new(name: "mysql", url: ENV["PG_DATABASE_URL"])
Granite::Connections << Granite::Adapter::Pg.new(name: "mysql2", url: "mysql://localhost:3306/test")
expect_raises(Exception, "Adapter with name 'mysql2' has already been registered.") do
Granite::Connections << Granite::Adapter::Pg.new(name: "mysql2", url: "mysql://localhost:3306/test")
end
end

it "should assign the correct connections to a model" do
adapter = Foo.adapter
adapter.name.should eq "sqlite"
adapter.url.should eq ENV["SQLITE_DATABASE_URL"]
adapter.name.should eq CURRENT_ADAPTER
adapter.url.should eq ADAPTER_URL
end

it "should use the first registered connection if none are specified" do
adapter = Bar.adapter
adapter.name.should eq "mysql"
adapter.url.should eq ENV["MYSQL_DATABASE_URL"]
adapter.name.should eq CURRENT_ADAPTER
adapter.url.should eq ADAPTER_URL
end
end
end
12 changes: 12 additions & 0 deletions spec/granite/connection_management_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
require "spec"

describe "Granite::Base track time since last write" do
it "should switch to reader db connection after connection_switch_wait_period after write operation" do
ReplicatedChat.connection_switch_wait_period = 250
ReplicatedChat.new(content: "hello world!").save!
sleep 500.milliseconds
current_url = ReplicatedChat.adapter.url
reader_url = Granite::Connections[ENV["CURRENT_ADAPTER"] + "_with_replica"].not_nil![:reader].url
current_url.should eq reader_url
end
end
2 changes: 1 addition & 1 deletion spec/run_test_dbs.sh
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ docker run --name mysql -d \
-e MYSQL_USER=granite \
-e MYSQL_PASSWORD=password \
-p 3306:3306 \
mysql:%{MYSQL_VERSION}
mysql:${MYSQL_VERSION}

docker run --name psql -d \
-e POSTGRES_USER=granite \
Expand Down
24 changes: 20 additions & 4 deletions spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,25 @@ require "mysql"
require "pg"
require "sqlite3"

Granite::Connections << Granite::Adapter::Mysql.new(name: "mysql", url: ENV["MYSQL_DATABASE_URL"])
Granite::Connections << Granite::Adapter::Pg.new(name: "pg", url: ENV["PG_DATABASE_URL"])
Granite::Connections << Granite::Adapter::Sqlite.new(name: "sqlite", url: ENV["SQLITE_DATABASE_URL"])
CURRENT_ADAPTER = ENV["CURRENT_ADAPTER"]
ADAPTER_URL = ENV["#{CURRENT_ADAPTER.upcase}_DATABASE_URL"]
ADAPTER_REPLICA_URL = ENV["#{CURRENT_ADAPTER.upcase}_REPLICA_URL"]? || ADAPTER_URL

case CURRENT_ADAPTER
when "pg"
Granite::Connections << Granite::Adapter::Pg.new(name: CURRENT_ADAPTER, url: ADAPTER_URL)
Granite::Connections << {name: "pg_with_replica", writer: ADAPTER_URL, reader: ADAPTER_REPLICA_URL, adapter_type: Granite::Adapter::Pg}
when "mysql"
Granite::Connections << Granite::Adapter::Mysql.new(name: CURRENT_ADAPTER, url: ADAPTER_URL)
Granite::Connections << {name: "mysql_with_replica", writer: ADAPTER_URL, reader: ADAPTER_REPLICA_URL, adapter_type: Granite::Adapter::Mysql}
when "sqlite"
Granite::Connections << Granite::Adapter::Sqlite.new(name: CURRENT_ADAPTER, url: ADAPTER_URL)
Granite::Connections << {name: "sqlite_with_replica", writer: ADAPTER_URL, reader: ADAPTER_REPLICA_URL, adapter_type: Granite::Adapter::Sqlite}
when Nil
raise "Please set CURRENT_ADAPTER"
else
raise "Unknown adapter #{CURRENT_ADAPTER}"
end

require "spec"
require "../src/granite"
Expand Down Expand Up @@ -32,6 +48,6 @@ end
{% if env("CURRENT_ADAPTER") == "mysql" && !flag?(:issue_473) %}
Spec.after_each do
# https://github.com/amberframework/granite/issues/473
Granite::Connections["mysql"].try &.database.pool.close
Granite::Connections["mysql"].not_nil![:writer].try &.database.pool.close
end
{% end %}
8 changes: 8 additions & 0 deletions spec/spec_models.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ end
{% begin %}
{% adapter_literal = env("CURRENT_ADAPTER").id %}

class ReplicatedChat < Granite::Base
connection {{ "#{adapter_literal}_with_replica" }}
table replicated_chats

column id : Int64, primary: true
column content : String
end

class Chat < Granite::Base
connection {{ adapter_literal }}
table chats
Expand Down
10 changes: 10 additions & 0 deletions src/granite/base.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require "./connections"
require "./integrators"
require "./converters"
require "./type"
require "./connection_management"

# Granite::Base is the base class for your model objects.
abstract class Granite::Base
Expand All @@ -29,6 +30,8 @@ abstract class Granite::Base
include Migrator
include Select

include ConnectionManagement

extend Columns::ClassMethods
extend Tables::ClassMethods
extend Granite::Migrator::ClassMethods
Expand Down Expand Up @@ -70,5 +73,12 @@ abstract class Granite::Base

disable_granite_docs? def initialize
end

before_save :switch_to_writer_adapter
before_destroy :switch_to_writer_adapter
after_save :update_last_write_time
after_save :schedule_adapter_switch
after_destroy :update_last_write_time
after_destroy :schedule_adapter_switch
end
end
89 changes: 89 additions & 0 deletions src/granite/connection_management.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
module Granite::ConnectionManagement
macro included
# Default value for the time a model waits before using a reader
# database connection for read operations
# all models use this value. Change it
# to change it in all Granite::Base models.
class_property connection_switch_wait_period : Int64 = Granite::Connections.connection_switch_wait_period
@@last_write_time = Time.monotonic

class_property current_adapter : Granite::Adapter::Base?
class_property reader_adapter : Granite::Adapter::Base = Granite::Connections.first_reader
class_property writer_adapter : Granite::Adapter::Base = Granite::Connections.first_writer

def self.last_write_time
@@last_write_time
end

# This is done this way because callbacks don't work on class mthods
def self.update_last_write_time
@@last_write_time = Time.monotonic
end

def update_last_write_time
self.class.update_last_write_time
end

def self.time_since_last_write
Time.monotonic - @@last_write_time
end

def time_since_last_write
self.class.time_since_last_write
end

def self.switch_to_reader_adapter
if time_since_last_write > @@connection_switch_wait_period.milliseconds
@@current_adapter = @@reader_adapter
end
end

def switch_to_reader_adapter
self.class.switch_to_reader_adapter
end

def self.switch_to_writer_adapter
@@current_adapter = @@writer_adapter
end

def switch_to_writer_adapter
self.class.switch_to_writer_adapter
end

def self.schedule_adapter_switch
spawn do
sleep @@connection_switch_wait_period.milliseconds
switch_to_reader_adapter
end

Fiber.yield
end

def schedule_adapter_switch
self.class.schedule_adapter_switch
end

def self.adapter
begin
@@current_adapter.not_nil!
rescue NilAssertionError
Granite::Connections.registered_connections.first?.not_nil![:writer]
end
end
end

macro connection(name)
{% name = name.id.stringify %}

error_message = "Connection #{{{name}}} not found in Granite::Connections.
Available connections are:
#{Granite::Connections.registered_connections.map{ |conn| "#{conn[:writer].name}"}.join(", ")}"

raise error_message if Granite::Connections[{{name}}].nil?

self.writer_adapter = Granite::Connections[{{name}}].not_nil![:writer]
self.reader_adapter = Granite::Connections[{{name}}].not_nil![:reader]
self.current_adapter = @@writer_adapter
end
end
34 changes: 29 additions & 5 deletions src/granite/connections.cr
Original file line number Diff line number Diff line change
@@ -1,16 +1,40 @@
module Granite
class Connections
class_getter registered_connections = [] of Granite::Adapter::Base
class_property connection_switch_wait_period : Int64 = 2000
class_getter registered_connections = [] of {writer: Granite::Adapter::Base, reader: Granite::Adapter::Base}

# Registers the given *adapter*. Raises if an adapter with the same name has already been registered.
def self.<<(adapter : Granite::Adapter::Base) : Nil
raise "Adapter with name '#{adapter.name}' has already been registered." if @@registered_connections.any? { |conn| conn.name == adapter.name }
@@registered_connections << adapter
raise "Adapter with name '#{adapter.name}' has already been registered." if @@registered_connections.any? { |conn| conn[:writer].name == adapter.name }
@@registered_connections << {writer: adapter, reader: adapter}
end

def self.<<(data : NamedTuple(name: String, reader: String, writer: String, adapter_type: Granite::Adapter::Base.class)) : Nil
raise "Adapter with name '#{data[:name]}' has already been registered." if @@registered_connections.any? { |conn| conn[:writer].name == data[:name] }

writer_adapter = data[:adapter_type].new(name: data[:name], url: data[:writer])

# if reader/writer reference the same db. Make them point to the same granite adapter.
# This avoids connection pool duplications on the same database.
if (data[:reader] == data[:writer])
return @@registered_connections << {writer: writer_adapter, reader: writer_adapter}
end

reader_adapter = data[:adapter_type].new(name: data[:name], url: data[:reader])
@@registered_connections << {writer: writer_adapter, reader: reader_adapter}
end

# Returns a registered connection with the given *name*, otherwise `nil`.
def self.[](name : String) : Granite::Adapter::Base?
registered_connections.find { |conn| conn.name == name }
def self.[](name : String) : {writer: Granite::Adapter::Base, reader: Granite::Adapter::Base}?
registered_connections.find { |conn| conn[:writer].name == name }
end

def self.first_writer
@@registered_connections.first?.not_nil![:writer]
end

def self.first_reader
@@registered_connections.first?.not_nil![:reader]
end
end
end
1 change: 1 addition & 0 deletions src/granite/query/builder.cr
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class Granite::Query::Builder(Model)
end

def delete
Model.switch_to_writer_adapter
assembler.delete
end

Expand Down
Loading

0 comments on commit 8b9b55b

Please sign in to comment.