Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshot #4

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 13 additions & 36 deletions lib/sandthorn_driver_sequel_2/access/snapshot_access.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ module SandthornDriverSequel2
class SnapshotAccess < Access::Base

def find_by_aggregate_id(aggregate_id)
#aggregate = aggregates.find_by_aggregate_id(aggregate_id)
storage.snapshots.first(aggregate_id: aggregate_id)
end

Expand All @@ -11,42 +10,26 @@ def find(snapshot_id)
end

def record_snapshot(aggregate_id, snapshot_data)
#aggregate = aggregates.find_by_aggregate_id!(aggregate_id)
raise SandthornDriverSequel2::Errors::SnapshotDataError unless perform_snapshot?(snapshot_data)

previous_snapshot = find_by_aggregate_id(aggregate_id)
#if perform_snapshot?(aggregate, previous_snapshot)
perform_snapshot(aggregate_id, previous_snapshot, snapshot_data)
#end
end

def obsolete(aggregate_types: [], max_event_distance: 100)
aggregate_types.map!(&:to_s)
snapshot_version = Sequel.qualify(storage.snapshots_table_name, :aggregate_version)
aggregate_version = Sequel.qualify(storage.aggregates_table_name, :aggregate_version)
aggregate_id = Sequel.qualify(storage.aggregates_table_name, :aggregate_id)
query = storage.aggregates.left_outer_join(storage.snapshots, aggregate_id: :aggregate_id)
query = query.select { (aggregate_version - snapshot_version).as(distance) }
query = query.select_append(aggregate_id, :aggregate_type)
query = query.where { (aggregate_version - coalesce(snapshot_version, 0)) > max_event_distance }
if aggregate_types.any?
query = query.where(aggregate_type: aggregate_types)
if previous_snapshot
return if previous_snapshot[:aggregate_version] == snapshot_data[:aggregate_version]
end
query.all
perform_snapshot(aggregate_id, previous_snapshot, snapshot_data)
end

private

def aggregates
@aggregates ||= AggregateAccess.new(storage)
end

def perform_snapshot?(aggregate, snapshot)
return true if snapshot.nil?
snapshot = SnapshotWrapper.new(snapshot)
aggregate.aggregate_version > snapshot.aggregate_version
def perform_snapshot?(snapshot_data)
return false if snapshot_data.nil?
return false unless snapshot_data.class == Hash
return false if snapshot_data[:aggregate_version].nil?
return false if snapshot_data[:event_data].nil?
return true
end

def perform_snapshot(aggregate_id, previous_snapshot, snapshot_data)
#check_snapshot_version!(aggregate, snapshot_data)
if valid_snapshot?(previous_snapshot)
update_snapshot(previous_snapshot, snapshot_data)
else
Expand All @@ -65,6 +48,7 @@ def build_snapshot(snapshot_data)
{
snapshot_data: snapshot_data.data,
aggregate_version: snapshot_data.aggregate_version
# aggregate_type: snapshot_data.aggregate_version
}
end

Expand All @@ -74,14 +58,7 @@ def valid_snapshot?(snapshot)

def update_snapshot(snapshot, snapshot_data)
data = build_snapshot(snapshot_data)
storage.snapshots.where(aggregate_id: snapshot.id).update(data)
end

def check_snapshot_version!(aggregate, snapshot_data)
snapshot = SnapshotWrapper.new(snapshot_data)
if aggregate.aggregate_version < snapshot.aggregate_version
raise Errors::WrongSnapshotVersionError.new(aggregate, snapshot.aggregate_version)
end
storage.snapshots.where(id: snapshot.id).update(data)
end

end
Expand Down
3 changes: 2 additions & 1 deletion lib/sandthorn_driver_sequel_2/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ module SandthornDriverSequel2::Errors
InternalError = Class.new(Error)
NoAggregateError = Class.new(Error)
EventFormatError = Class.new(Error)

SnapshotDataError = Class.new(Error)

class ConcurrencyError < Error
attr_reader :event, :aggregate
def initialize(event, aggregate)
Expand Down
11 changes: 2 additions & 9 deletions lib/sandthorn_driver_sequel_2/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ def save_snapshot aggregate_snapshot, aggregate_id
# TODO: needs a better name
def get_aggregate_events_from_snapshot(aggregate_id)
driver.execute do |db|
#snapshots = get_snapshot_access(db)
snapshots = get_snapshot_access(db)
event_access = get_event_access(db)
snapshot = false#snapshots.find_by_aggregate_id(aggregate_id)
snapshot = snapshots.find_by_aggregate_id(aggregate_id)
if snapshot
events = event_access.after_snapshot(snapshot)
snapshot_event = build_snapshot_event(snapshot)
Expand Down Expand Up @@ -105,13 +105,6 @@ def get_new_events_after_event_id_matching_classname event_id, class_name, take:
get_events(after_sequence_number: event_id, aggregate_types: Utilities.array_wrap(class_name), take: take)
end

def obsolete_snapshots(*args)
driver.execute do |db|
snapshots = get_snapshot_access(db)
snapshots.obsolete(*args)
end
end

private

def transform_snapshot(snapshot)
Expand Down
1 change: 1 addition & 0 deletions lib/sandthorn_driver_sequel_2/wrappers/snapshot_wrapper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ def aggregate_version
def data
self[:event_data]
end

end
end
64 changes: 0 additions & 64 deletions spec/asking_for_aggregates_to_snapshot_spec.rb

This file was deleted.

13 changes: 0 additions & 13 deletions spec/event_access_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ module SandthornDriverSequel2
let(:context) { :test }
let(:db) { Sequel.connect(event_store_url)}
let(:aggregate_id) { SecureRandom.uuid }
let(:aggregate) do
aggregate_access.register_aggregate(aggregate_id, "foo")
end
let(:storage) { Storage.new(db, :test, {}) }
let(:aggregate_access) { AggregateAccess.new(storage) }
let(:snapshot_access) { SnapshotAccess.new(storage)}
let(:access) { EventAccess.new(storage) }

Expand Down Expand Up @@ -51,15 +47,6 @@ module SandthornDriverSequel2
expect(events.map(&:timestamp).all?).to be_truthy
end

# it "updates the aggregate version" do
# access.store_events(events)
# events = access.find_events_by_aggregate_id(aggregate_id)
# version = events.map(&:aggregate_version).max

# reloaded_aggregate = aggregate_access.find(aggregate.id)
# expect(reloaded_aggregate.aggregate_version).to eq(version)
# end

context "when the aggregate version of an event is incorrect" do
it "throws an error" do
event = { aggregate_version: 100, aggregate_id: aggregate_id, aggregate_type: "Foo", event_name: "new", event_data: "noop" }
Expand Down
137 changes: 63 additions & 74 deletions spec/saving_snapshot_spec.rb
Original file line number Diff line number Diff line change
@@ -1,77 +1,66 @@
# require 'spec_helper'
# require 'yaml'
require 'spec_helper'
require 'yaml'

# module SandthornDriverSequel2
# describe EventStore do
# before(:each) { prepare_for_test }
# let(:aggregate_id) { @id ||= UUIDTools::UUID.random_create.to_s }
# let(:test_events) { [{aggregate_version: 1, event_data: nil, event_name: "new"},{aggregate_version: 2, event_data: nil, event_name: "foo"}] }
# let(:additional_events) { [{aggregate_version: 3, event_data: nil, event_name: "klopp"},{aggregate_version: 4, event_data: nil, event_name: "flipp"}] }
# let(:snapshot_data) { { event_data: YAML.dump(Object.new), aggregate_version: 2 } }
# let(:save_snapshot) { event_store.save_snapshot snapshot_data, aggregate_id }
# let(:save_events) { event_store.save_events test_events, aggregate_id, SandthornDriverSequel2::EventStore }
# let(:save_additional_events) { event_store.save_events additional_events, aggregate_id, SandthornDriverSequel2::EventStore }
# context "when loading an aggregate using get_aggregate" do
# context "and it has a snapshot" do
# before(:each) do
# save_events
# save_snapshot
# save_additional_events
# end
# let(:events) { event_store.get_aggregate aggregate_id, SandthornDriverSequel2::EventStore }
# it "should have the first event as :aggregate_set_from_snapshot" do
# expect(events.first[:event_name]).to eql "aggregate_set_from_snapshot"
# end
# it "should have additional events after first snapshot-event" do
# expect(events.length).to eql 1+additional_events.length
# expect(events[1][:aggregate_version]).to eql additional_events[0][:aggregate_version]
# expect(events.last[:aggregate_version]).to eql additional_events.last[:aggregate_version]
# end
# end
module SandthornDriverSequel2
describe EventStore do
before(:each) { prepare_for_test }
let(:aggregate_id) { @id ||= UUIDTools::UUID.random_create.to_s }
let(:test_events) { [{aggregate_version: 1, event_data: nil, event_name: "new"},{aggregate_version: 2, event_data: nil, event_name: "foo"}] }
let(:additional_events) { [{aggregate_version: 3, event_data: nil, event_name: "klopp"},{aggregate_version: 4, event_data: nil, event_name: "flipp"}] }
let(:snapshot_data) { { event_data: YAML.dump(Object.new), aggregate_version: 2 } }
let(:save_snapshot) { event_store.save_snapshot snapshot_data, aggregate_id }
let(:save_events) { event_store.save_events test_events, aggregate_id, SandthornDriverSequel2::EventStore }
let(:save_additional_events) { event_store.save_events additional_events, aggregate_id, SandthornDriverSequel2::EventStore }
context "when loading an aggregate using get_aggregate" do
context "and it has a snapshot" do
before(:each) do
save_events
save_snapshot
save_additional_events
end
let(:events) { event_store.get_aggregate aggregate_id, SandthornDriverSequel2::EventStore }
it "should have the first event as :aggregate_set_from_snapshot" do
expect(events.first[:event_name]).to eql "aggregate_set_from_snapshot"
end
it "should have additional events after first snapshot-event" do
expect(events.length).to eql 1+additional_events.length
expect(events[1][:aggregate_version]).to eql additional_events[0][:aggregate_version]
expect(events.last[:aggregate_version]).to eql additional_events.last[:aggregate_version]
end
end

# end
# context "when saving a snapshot" do
end
context "when saving a snapshot" do

# context "and events are saved beforehand" do
# before(:each) { save_events }
# it "should be able to save snapshot" do
# expect { save_snapshot }.to_not raise_error
# end
# it "should be able to save and get snapshot" do
# save_snapshot
# snap = event_store.get_snapshot(aggregate_id)
# expect(snap).to eql snapshot_data
# end
# end
# context "when trying to save a snapshot on a non-existing aggregate" do
# it "should raise a NonAggregateError" do
# expect { save_snapshot }.to raise_error SandthornDriverSequel2::Errors::NoAggregateError
# end
# end
# context "when trying to save a snapshot with a non-existing aggregate_version" do
# before(:each) { save_events }
# it "should raise a WrongAggregateVersion error" do
# data = snapshot_data
# data[:aggregate_version] = 100
# expect { event_store.save_snapshot data, aggregate_id }.to raise_error SandthornDriverSequel2::Errors::WrongSnapshotVersionError
# end
# end
# context "when saving a snapshot twice" do
# before(:each) { save_events; save_snapshot }
# it "should not raise error" do
# expect { save_snapshot }.to_not raise_error
# end
# end
# context "when saving a snapshot on a version less than current version" do
# before(:each) { save_events; }
# it "should save without protesting" do
# data = snapshot_data
# data[:aggregate_version] = 1
# event_store.save_snapshot(data, aggregate_id)
# snap = event_store.get_snapshot(aggregate_id)
# expect(snap).to eql data
# end
# end
# end
# end
# end
context "and events are saved beforehand" do
before(:each) { save_events }
it "should be able to save snapshot" do
expect { save_snapshot }.to_not raise_error
end
it "should be able to save and get snapshot" do
save_snapshot
snap = event_store.get_snapshot(aggregate_id)
expect(snap).to eql snapshot_data
end
end


context "when saving a snapshot twice" do
before(:each) { save_events; save_snapshot }
it "should not raise error" do
expect { save_snapshot }.to_not raise_error
end
end
context "when saving a snapshot on a version less than current version" do
before(:each) { save_events; }
it "should save without protesting" do
data = snapshot_data
data[:aggregate_version] = 1
event_store.save_snapshot(data, aggregate_id)
snap = event_store.get_snapshot(aggregate_id)
expect(snap).to eql data
end
end
end
end
end
Loading