Refactor publishing #59

Merged
42 changes: 42 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,48 @@
# Changelog
All notable changes to this project will be documented in this file.

## [6.0.0] - 2021-10-15
### Added

- A lot of specs for all the refactoring.
- Docs
- 100% coverage

### Changed
- Heavy refactoring of Publisher and BatchPublisher.
All code is now split into separate modules and classes.

1. Job callables are now called:
- single_publishing_job_class_callable
- batch_publishing_job_class_callable

2. Now there are three main classes for messaging:
- TableSync::Publishing::Single - sends one row with initialization
- TableSync::Publishing::Batch - sends batch of rows with initialization
- TableSync::Publishing::Raw - sends raw data without checks

Separate classes for publishing, object data, Rabbit params, debounce, serialization.

3. Jobs no longer have to be ActiveJob subclasses. They only need to have a #perform_at method.

4. Renamed some methods for consistency:
- attrs_for_routing_key -> attributes_for_routing_key
- attrs_for_metadata -> attributes_for_headers

5. Moved TableSync setup into separate classes.

6. Changed ORMAdapters.

7. Destroyed objects are initialized.
Now custom attributes for destruction will be called on instances.
- Obj.table_sync_destroy_attributes() -> Obj#attributes_for_destroy

8. Event constants are now kept in one place.

### Removed

- Plugin Errors

## [5.1.0] - 2021-09-09

### Changed
6 changes: 3 additions & 3 deletions Gemfile.lock
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
table_sync (5.1.0)
table_sync (6.0)
memery
rabbit_messaging
rails
@@ -98,15 +98,15 @@ GEM
nokogiri (>= 1.5.9)
mail (2.7.1)
mini_mime (>= 0.1.1)
marcel (1.0.1)
marcel (1.0.2)
memery (1.4.1)
ruby2_keywords (~> 0.0.2)
method_source (1.0.0)
mini_mime (1.1.1)
mini_portile2 (2.6.1)
minitest (5.14.4)
nio4r (2.5.8)
nokogiri (1.12.4)
nokogiri (1.12.5)
mini_portile2 (~> 2.6.1)
racc (~> 1.4)
parallel (1.20.1)
3 changes: 3 additions & 0 deletions README.md
@@ -22,6 +22,9 @@ require 'table_sync'

- [Message protocol](docs/message_protocol.md)
- [Publishing](docs/publishing.md)
- [Publishers](docs/publishing/publishers.md)
- [Configuration](docs/publishing/configuration.md)
- [Manual Sync (examples)](docs/publishing/manual.md)
- [Receiving](docs/receiving.md)
- [Notifications](docs/notifications.md)

140 changes: 35 additions & 105 deletions docs/publishing.md
@@ -1,133 +1,63 @@
# Publishing changes
# Publishing

Include `TableSync.sync(self)` into a Sequel or ActiveRecord model. `:if` and `:unless` are supported for Sequel and ActiveRecord
TableSync can be used to send data using RabbitMQ.

Functioning `Rails.cache` is required
You can do this in two ways: automatically or manually.
Each has its own pros and cons.

Example:

```ruby
class SomeModel < Sequel::Model
TableSync.sync(self, { if: -> (*) { some_code } })
end
```

#### #attributes_for_sync

Models can implement `#attributes_for_sync` to override which attributes are published. If not present, all attributes are published
Automatic publishing sends changes in real time, as soon as the tracked entity changes.
It usually syncs one entity at a time.

#### #attrs_for_routing_key
Manual publishing lets you sync many entities per message,
but demands more work and data preparation.

Models can implement `#attrs_for_routing_key` to override which attributes are given to `routing_key_callable`. If not present, published attributes are given
## Automatic

#### #attrs_for_metadata
Include `TableSync.sync(self)` into a Sequel or ActiveRecord model.

Models can implement `#attrs_for_metadata` to override which attributes are given to `metadata_callable`. If not present, published attributes are given
Options:

#### .table_sync_model_name
- `if:` and `unless:` - Runs given proc in the scope of an instance. Skips sync on `false` for `if:` and on `true` for `unless:`.
- `on:` - specify events (`create`, `update`, `destroy`) to trigger sync on. Triggered for all of them without this option.
- `debounce_time` - minimum time period allowed between synchronizations.

Models can implement `.table_sync_model_name` class method to override the model name used for publishing events. Default is model class name
Functioning `Rails.cache` is required.

#### .table_sync_destroy_attributes(original_attributes)
How it works:

Models can implement `.table_sync_destroy_attributes` class method to override the attributes used for publishing destroy events. Default is object's original attributes
- `TableSync.sync(self)` - registers new callbacks (for `create`, `update`, `destroy`) on an ActiveRecord model, and defines `after_create`, `after_update` and `after_destroy` callback methods on a Sequel model.

## Configuration

- `TableSync.publishing_job_class_callable` is a callable which should resolve to an ActiveJob subclass that calls TableSync back to actually publish changes (required)
- Callbacks call `TableSync::Publishing::Single#publish_later` with given options and object attributes. It enqueues a job which then publishes a message.

Example:

```ruby
class TableSync::Job < ActiveJob::Base
def perform(*args)
TableSync::Publishing::Publisher.new(*args).publish_now
end
class SomeModel < Sequel::Model
TableSync.sync(self, { if: -> (*) { some_code }, unless: -> (*) { some_code }, on: [:create, :update] })
end
```

- `TableSync.batch_publishing_job_class_callable` is a callable which should resolve to an ActiveJob subclass that calls TableSync batch publisher back to actually publish changes (required for batch publisher)

- `TableSync.routing_key_callable` is a callable which resolves which routing key to use when publishing changes. It receives object class and published attributes (required)

Example:

```ruby
TableSync.routing_key_callable = -> (klass, attributes) { klass.gsub('::', '_').tableize }
```

- `TableSync.routing_metadata_callable` is a callable that adds RabbitMQ headers which can be used in routing (optional). It receives object class and published attributes. One possible way of using it is defining a headers exchange and routing rules based on key-value pairs (which correspond to sent headers)

Example:

```ruby
TableSync.routing_metadata_callable = -> (klass, attributes) { attributes.slice("project_id") }
class SomeOtherModel < Sequel::Model
TableSync.sync(self)
end
```

- `TableSync.exchange_name` defines the exchange name used for publishing (optional, falls back to default Rabbit gem configuration).

- `TableSync.notifier` is a module that provides publish and receive notifications.

# Manual publishing

`TableSync::Publishing::Publisher.new(object_class, original_attributes, confirm: true, state: :updated, debounce_time: 45)`
where state is one of `:created / :updated / :destroyed` and `confirm` is Rabbit's confirm delivery flag and optional param `debounce_time` determines debounce time in seconds, 1 minute by default.

# Manual publishing with batches

You can use `TableSync::Publishing::BatchPublisher` to publish changes in batches (array of hashes in `attributes`).

When using `TableSync::Publishing::BatchPublisher`, `TableSync.routing_key_callable` is called as follows: `TableSync.routing_key_callable.call(klass, {})`, i.e. empty hash is passed instead of attributes. And `TableSync.routing_metadata_callable` is not called at all: metadata is set to empty hash.
## Manual

`TableSync::Publishing::BatchPublisher.new(object_class, original_attributes_array, **options)`, where `original_attributes_array` is an array with hash of attributes of published objects and `options` is a hash of options.

`options` consists of:
- `confirm`, which is a flag for RabbitMQ, `true` by default
- `routing_key`, which is a custom key used (if given) to override one from `TableSync.routing_key_callable`, `nil` by default
- `push_original_attributes` (default value is `false`), if this option is set to `true`,
original_attributes_array will be pushed to Rabbit instead of fetching records from database and sending their mapped attributes.
- `headers`, which is an option for custom headers (can be used for headers exchanges routes), `nil` by default
- `event`, which is an option for event specification (`:destroy` or `:update`), `:update` by default
Call one of the publishers directly. This is the best option if you need to sync a lot of data.
This way the data can be synced even if no changes have occurred.

Example:

```ruby
TableSync::Publishing::BatchPublisher.new(
"SomeClass",
[{ id: 1 }, { id: 2 }],
confirm: false,
routing_key: "custom_routing_key",
push_original_attributes: true,
headers: { key: :value },
event: :destroy,
)
TableSync::Publishing::Batch.new(
object_class: "User",
original_attributes: [{ id: 1 }, { id: 2 }],
event: :update,
).publish_now
```

# Manual publishing with batches (translated from Russian)

You can use the `TableSync::Publishing::BatchPublisher` class to publish changes in batches (as an array in `attributes`).

When `TableSync::Publishing::BatchPublisher` is used, `TableSync.routing_key_callable` is called as follows: `TableSync.routing_key_callable.call(klass, {})`, i.e. an empty hash is passed instead of the attributes. `TableSync.routing_metadata_callable` is not called at all: the metadata is set to an empty hash.

`TableSync::Publishing::BatchPublisher.new(object_class, original_attributes_array, **options)`, where `original_attributes_array` is an array with the attributes of the published objects and `options` is a hash of additional options.

`options` consists of:
- `confirm`, a flag for RabbitMQ, `true` by default
- `routing_key`, a key that (if given) replaces the one obtained from `TableSync.routing_key_callable`, `nil` by default
- `push_original_attributes` (default value is `false`); if this option is set to `true`, original_attributes_array is sent to Rabbit instead of fetching the record values from the database right before sending.
- `headers`, an option for setting headers (can be used to define routes in headers exchanges), `nil` by default
- `event`, an option for specifying the event type (`:destroy` or `:update`), `:update` by default
## Read More

Example:

```ruby
TableSync::Publishing::BatchPublisher.new(
"SomeClass",
[{ id: 1 }, { id: 2 }],
confirm: false,
routing_key: "custom_routing_key",
push_original_attributes: true,
headers: { key: :value },
event: :destroy,
)
```
- [Publishers](publishing/publishers.md)
- [Configuration](publishing/configuration.md)
- [Manual Sync (examples)](publishing/manual.md)
143 changes: 143 additions & 0 deletions docs/publishing/configuration.md
@@ -0,0 +1,143 @@
# Configuration

Customization, configuration and other options.

## Model Customization

There are methods you can define on a synched model to customize published messages for it.

### `#attributes_for_sync`

Models can implement `#attributes_for_sync` to override which attributes are published for `update` and `create` events. If not present, all attributes are published.

### `#attributes_for_destroy`

Models can implement `#attributes_for_destroy` to override which attributes are published for `destroy` events. If not present, `needle` (primary key) is published.

### `#attributes_for_routing_key`

Models can implement `#attributes_for_routing_key` to override which attributes are given to the `routing_key_callable`. If not present, published attributes are given.

### `#attributes_for_headers`

Models can implement `#attributes_for_headers` to override which attributes are given to the `headers_callable`. If not present, published attributes are given.

### `.table_sync_model_name`

Models can implement the `.table_sync_model_name` class method to override the model name used for publishing events. The default is the model class name.
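
The hooks described in this section can be combined on a single model. The following is a minimal sketch that uses a plain Ruby class as a stand-in for a Sequel or ActiveRecord model. The `User` class, its fields and the returned values are hypothetical, and `TableSync.sync(self)` is omitted so that the snippet runs without the gem.

```ruby
# Plain Ruby stand-in for a synced model (hypothetical; a real model would
# inherit from Sequel::Model or ActiveRecord::Base and call TableSync.sync(self)).
class User
  attr_reader :id, :name, :project_id, :password_digest

  def initialize(id:, name:, project_id:, password_digest:)
    @id = id
    @name = name
    @project_id = project_id
    @password_digest = password_digest
  end

  # Published for `create` and `update` events; leaves out password_digest.
  def attributes_for_sync
    { id: id, name: name, project_id: project_id }
  end

  # Published for `destroy` events.
  def attributes_for_destroy
    { id: id }
  end

  # Given to routing_key_callable instead of the published attributes.
  def attributes_for_routing_key
    { project_id: project_id }
  end

  # Given to headers_callable instead of the published attributes.
  def attributes_for_headers
    { project_id: project_id }
  end

  # Overrides the model name used in published events.
  def self.table_sync_model_name
    "Accounts::User"
  end
end

user = User.new(id: 1, name: "Mark", project_id: 7, password_digest: "secret")
p user.attributes_for_sync
```

Keeping sensitive or bulky columns out of `#attributes_for_sync` is one plausible reason to define it; the other hooks follow the same pattern.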

## Callables

Callables are defined once. TableSync will use them to dynamically resolve things like jobs, routing_key and headers.

### Single publishing job (required for automatic and delayed publishing)

- `TableSync.single_publishing_job_class_callable` is a callable which should resolve to a class that calls TableSync back to actually publish changes.

It is expected to have `.perform_at(hash_with_options)` and it will be passed a hash with the following keys:

- `original_attributes` - serialized `original_attributes`
- `object_class` - model name
- `debounce_time` - pause between publishing messages
- `event` - type of event that happened to synched entity
- `perform_at` - time to perform the job at (depends on debounce)
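
How `perform_at` could follow from `debounce_time` can be sketched as below. This is an assumption about the general debounce idea, not the gem's actual implementation: `next_perform_at` and its parameters are hypothetical names, and the rule used (never publish earlier than `debounce_time` seconds after the previous sync) is only one possible policy.

```ruby
# Hypothetical helper, not part of TableSync: picks the time at which the next
# publish job should run, given when the previous sync happened.
def next_perform_at(last_sync_time:, debounce_time:, now: Time.now)
  return now if last_sync_time.nil? # never synced before: publish right away

  earliest_allowed = last_sync_time + debounce_time
  earliest_allowed > now ? earliest_allowed : now
end

now = Time.at(1_000)

# Previous sync was 30 seconds ago, so the job is delayed by another 30 seconds.
puts next_perform_at(last_sync_time: Time.at(970), debounce_time: 60, now: now).to_i

# Previous sync was 100 seconds ago, so the job can run immediately.
puts next_perform_at(last_sync_time: Time.at(900), debounce_time: 60, now: now).to_i
```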

Example:

```ruby
TableSync.single_publishing_job_class_callable = -> { TableSync::Job }

class TableSync::Job < ActiveJob::Base
def perform(jsoned_attributes)
TableSync::Publishing::Single.new(
JSON.parse(jsoned_attributes),
).publish_now
end

def self.perform_at(attributes)
set(wait_until: attributes.delete(:perform_at))
.perform_later(attributes.to_json)
end
end

# will enqueue the job described above

TableSync::Publishing::Single.new(
object_class: "User",
original_attributes: { id: 1, name: "Mark" }, # will be serialized!
debounce_time: 60,
event: :update,
).publish_later
```
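
Because the job class is only expected to respond to `.perform_at` with the options hash, it does not have to be an ActiveJob subclass. Below is a minimal sketch with an in-memory queue standing in for a real job backend. `InMemoryPublishJob` and `QUEUE` are hypothetical names, and the assignment to TableSync is left as a comment so that the snippet runs without the gem.

```ruby
# Hypothetical job class: not an ActiveJob, it only implements `.perform_at`.
class InMemoryPublishJob
  QUEUE = [] # stands in for a real job backend

  # Receives the options hash described above, including :perform_at.
  def self.perform_at(attributes)
    attributes = attributes.dup # avoid mutating the caller's hash
    run_at = attributes.delete(:perform_at)
    QUEUE << { run_at: run_at, payload: attributes }
  end
end

# With the gem loaded, the callable would be registered like this:
# TableSync.single_publishing_job_class_callable = -> { InMemoryPublishJob }

InMemoryPublishJob.perform_at(
  object_class: "User",
  original_attributes: { id: 1 },
  debounce_time: 60,
  event: :update,
  perform_at: Time.at(0),
)
p InMemoryPublishJob::QUEUE.first[:payload]
```

A real backend would later take the payload from the queue and pass it to `TableSync::Publishing::Single`, as the ActiveJob example does in `#perform`.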

### Batch publishing job (required only for `TableSync::Publishing::Batch#publish_later`)

- `TableSync.batch_publishing_job_class_callable` is a callable which should resolve to a class that calls TableSync back to actually publish changes.

It is expected to have `.perform_later(hash_with_options)` and it will be passed a hash with the following keys:

- `original_attributes` - array of serialized `original_attributes`
- `object_class` - model name
- `event` - type of event that happened to synched entity
- `routing_key` - custom routing_key (optional)
- `headers` - custom headers (optional)

More often than not this job is not very useful, since it makes more sense to use `#publish_now` from an already existing job that does a lot of things (not just publishing messages).

Example:

```ruby
TableSync.batch_publishing_job_class_callable = -> { TableSync::BatchJob }

class TableSync::BatchJob < ActiveJob::Base
def perform(jsoned_attributes)
TableSync::Publishing::Batch.new(
JSON.parse(jsoned_attributes),
).publish_now
end

def self.perform_later(attributes)
super(attributes.to_json)
end
end

TableSync::Publishing::Batch.new(
object_class: "User",
original_attributes: [{ id: 1, name: "Mark" }, { id: 2, name: "Bob" }], # will be serialized!
event: :create,
routing_key: :custom_key, # optional
headers: { type: "admin" }, # optional
).publish_later
```

### Routing key callable (required)

- `TableSync.routing_key_callable` is a callable that resolves which routing key to use when publishing changes. It receives object class and published attributes or `#attributes_for_routing_key` (if defined).

Example:

```ruby
TableSync.routing_key_callable = -> (klass, attributes) { klass.gsub('::', '_').tableize }
```
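
The `tableize` call in the example above comes from ActiveSupport. As a dependency-free sketch of the same idea, the callable below builds a key from the class name and one attribute. It assumes that `klass` is passed as a String (as the `gsub` call above suggests); the key layout and the `project_id` attribute are hypothetical, and the lambda is kept in a local variable rather than assigned to `TableSync.routing_key_callable`.

```ruby
# Receives the model name as a String plus the attributes for routing.
routing_key_callable = lambda do |klass, attributes|
  base = klass.gsub("::", "_").downcase
  project = attributes[:project_id]
  project ? "#{base}.#{project}" : base
end

puts routing_key_callable.call("Accounts::User", { project_id: 7 })
puts routing_key_callable.call("Accounts::User", {})
```

Falling back to the bare class name when the attribute is missing keeps the callable usable when it is given an empty attributes hash.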

### Headers callable (required)

- `TableSync.headers_callable` is a callable that adds RabbitMQ headers which can be used in routing. It receives object class and published attributes or `#attributes_for_headers` (if defined).

One possible way of using it is defining a headers exchange and routing rules based on key-value pairs (which correspond to sent headers).

Example:

```ruby
TableSync.headers_callable = -> (klass, attributes) { attributes.slice("project_id") }
```
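
To illustrate how headers produced by such a callable could drive routing, here is a small simulation of headers-exchange matching in plain Ruby. It does not talk to a broker: `binding_matches?` is a hypothetical helper that mimics RabbitMQ's `x-match` `all`/`any` semantics, and string keys are assumed for the attributes.

```ruby
# Same shape as the callable above, kept in a local variable for this sketch.
headers_callable = ->(_klass, attributes) { attributes.slice("project_id") }

# Hypothetical helper mimicking a headers exchange binding check.
def binding_matches?(binding_args, headers, x_match: "all")
  checks = binding_args.map { |key, value| headers[key] == value }
  x_match == "any" ? checks.any? : checks.all?
end

headers = headers_callable.call("User", { "id" => 1, "project_id" => 7 })

puts binding_matches?({ "project_id" => 7 }, headers) # queue bound to project 7
puts binding_matches?({ "project_id" => 8 }, headers) # queue bound to project 8
```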

## Other

- `TableSync.exchange_name` defines the exchange name used for publishing (optional, falls back to default Rabbit gem configuration).

- `TableSync.notifier` is a module that provides publish and receive notifications.

- `TableSync.raise_on_empty_message` - raises an error on empty message if set to true.

- `TableSync.orm` - set ORM (ActiveRecord or Sequel) used to process given entities. Required!