Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Campaign data sync #54

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions app/models/identity_tijuana/campaign.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
module IdentityTijuana
  # ActiveRecord model over the TJ `campaigns` table.
  # A TJ campaign maps onto an Identity `Issue`; the TJ `accounts_key`
  # column maps onto an Identity `IssueCategory`.
  class Campaign < ReadWrite
    self.table_name = 'campaigns'

    # Campaigns soft-deleted within [last_updated_at, exclude_from),
    # ordered for stable cursor-style iteration.
    scope :deleted_campaigns, ->(last_updated_at, exclude_from) {
      where('deleted_at is not null and deleted_at >= ? and deleted_at < ?', last_updated_at, exclude_from)
        .order('deleted_at, id')
    }

    # Next batch of campaigns after the (last_updated_at, last_id) cursor,
    # capped at the configured pull batch size.
    scope :updated_campaigns, ->(last_updated_at, last_id) {
      updated_campaigns_all(last_updated_at, last_id)
        .order('updated_at, id')
        .limit(Settings.tijuana.pull_batch_amount)
    }

    # All campaigns strictly after the (last_updated_at, last_id) cursor
    # (no order/limit); used to count remaining work.
    scope :updated_campaigns_all, ->(last_updated_at, last_id) {
      where('updated_at > ? or (updated_at = ? and id > ?)', last_updated_at, last_updated_at, last_id)
    }

    # Upserts this TJ campaign into ID as an Issue, and syncs its single
    # IssueCategory from accounts_key (clearing any previous categories).
    # @param sync_id [Integer] sync run identifier (logging context only here)
    # @raise [StandardError] re-raised after logging so the sync run fails loudly
    def import(sync_id)
      # The campaigns table in TJ maps onto the issues table in ID.
      issue = ::Issue.find_or_create_by(external_id: id.to_s, external_source: 'tijuana')
      issue.name = name
      issue.save!
      # The campaigns.accounts_key column in TJ maps onto the issue_categories table in ID.
      issue.issue_categories.clear
      if accounts_key.present?
        issue_category = ::IssueCategory.find_or_create_by(name: accounts_key)
        issue.issue_categories << issue_category
      end
    rescue StandardError => e
      # StandardError (not Exception): never intercept signals/system exit.
      Rails.logger.error "Tijuana campaigns sync id:#{id}, error: #{e.message}"
      raise
    end

    # Removes the corresponding ID Issue (and its campaigns/category links)
    # when the TJ campaign has been logically deleted.
    # @param sync_id [Integer] sync run identifier (logging context only here)
    # @raise [StandardError] re-raised after logging
    def erase(sync_id)
      issue = ::Issue.find_by(external_id: id.to_s, external_source: 'tijuana')
      if issue.present?
        issue.campaigns.destroy_all # TODO: Will need to cascade to other tables.
        issue.issue_categories.clear
        issue.destroy
      end
    rescue StandardError => e
      Rails.logger.error "Tijuana campaigns delete id:#{id}, error: #{e.message}"
      raise
    end
  end
end
159 changes: 44 additions & 115 deletions lib/identity_tijuana.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ module IdentityTijuana
SYSTEM_NAME = 'tijuana'
SYNCING = 'tag'
CONTACT_TYPE = 'email'
PULL_JOBS = [[:fetch_user_updates, 10.minutes]]
PULL_JOBS = [[:fetch_campaign_updates, 10.minutes], [:fetch_user_updates, 10.minutes]]
MEMBER_RECORD_DATA_TYPE='object'
MUTEX_EXPIRY_DURATION = 10.minutes

include RedisHelper
include MutexHelper
include CampaignHelper

def self.push(sync_id, member_ids, external_system_params)
begin
members = Member.where(id: member_ids).with_email.order(:id)
Expand Down Expand Up @@ -56,33 +60,20 @@ def self.get_push_jobs
def self.pull(sync_id, external_system_params)
begin
pull_job = JSON.parse(external_system_params)['pull_job'].to_s
self.send(pull_job, sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
rescue => e
raise e
end
end

def self.fetch_user_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
mutex_acquired = acquire_mutex_lock(pull_job, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_user_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
self.send(pull_job, sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
release_mutex_lock(pull_job) if mutex_acquired # Check to make sure that mutex lock is always released.
end
schedule_pull_batch(:fetch_user_updates) if need_another_batch
schedule_pull_batch(:fetch_tagging_updates)
schedule_pull_batch(:fetch_donation_updates)
end

def self.fetch_user_updates_impl(sync_id)
def self.fetch_user_updates(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:users:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:users:last_id' } || 0).to_i
Expand All @@ -100,29 +91,30 @@ def self.fetch_user_updates_impl(sync_id)
updated_member_ids = Member.connection.execute(<<~SQL
SELECT id as member_id
FROM members
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM addresses
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT distinct member_id
FROM custom_fields
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM member_subscriptions
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM phone_numbers
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
ORDER BY member_id;
WHERE (updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}')
OR id IN (
SELECT DISTINCT member_id
FROM addresses
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT distinct member_id
FROM custom_fields
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM member_subscriptions
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
UNION
SELECT DISTINCT member_id
FROM phone_numbers
WHERE updated_at > '#{last_updated_at}'
AND updated_at <= '#{users_dependent_data_cutoff}'
ORDER BY member_id
);
SQL
).map {|member_id_row| member_id_row['member_id']}

Expand Down Expand Up @@ -154,7 +146,11 @@ def self.fetch_user_updates_impl(sync_id)
false
)

updated_users.count < updated_users_all.count
release_mutex_lock(:fetch_user_updates)
need_another_batch = updated_users.count < updated_users_all.count
schedule_pull_batch(:fetch_user_updates) if need_another_batch
schedule_pull_batch(:fetch_tagging_updates)
schedule_pull_batch(:fetch_donation_updates)
end

def self.fetch_users_for_dedupe
Expand All @@ -173,22 +169,6 @@ def self.fetch_users_for_dedupe
end

def self.fetch_donation_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_donation_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
end
schedule_pull_batch(:fetch_donation_updates) if need_another_batch
end

def self.fetch_donation_updates_impl(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:donations:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:donations:last_id' } || 0).to_i
Expand Down Expand Up @@ -221,26 +201,12 @@ def self.fetch_donation_updates_impl(sync_id)
false
)

updated_donations.count < updated_donations_all.count
release_mutex_lock(:fetch_donation_updates)
need_another_batch = updated_donations.count < updated_donations_all.count
schedule_pull_batch(:fetch_donation_updates) if need_another_batch
end

def self.fetch_tagging_updates(sync_id)
begin
mutex_acquired = acquire_mutex_lock(__method__.to_s, sync_id)
unless mutex_acquired
yield 0, {}, {}, true
return
end
need_another_batch = fetch_tagging_updates_impl(sync_id) do |records_for_import_count, records_for_import, records_for_import_scope, pull_deferred|
yield records_for_import_count, records_for_import, records_for_import_scope, pull_deferred
end
ensure
release_mutex_lock(__method__.to_s) if mutex_acquired
end
schedule_pull_batch(:fetch_tagging_updates) if need_another_batch
end

def self.fetch_tagging_updates_impl(sync_id)
latest_tagging_scope_limit = 50000
started_at = DateTime.now
last_id = (Sidekiq.redis { |r| r.get 'tijuana:taggings:last_id' } || 0).to_i
Expand Down Expand Up @@ -369,50 +335,13 @@ def self.fetch_tagging_updates_impl(sync_id)
false
)

results.count < tags_remaining_count
release_mutex_lock(:fetch_tagging_updates)
need_another_batch = results.count < tags_remaining_count
schedule_pull_batch(:fetch_tagging_updates) if need_another_batch
end

private

def self.acquire_mutex_lock(method_name, sync_id)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
new_mutex_expiry = DateTime.now + MUTEX_EXPIRY_DURATION
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
unless mutex_acquired
mutex_expiry = get_redis_date(mutex_name)
if mutex_expiry.past?
unless worker_currently_running?(method_name, sync_id)
delete_redis_date(mutex_name)
mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
end
end
end
mutex_acquired
end

def self.release_mutex_lock(method_name)
mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
delete_redis_date(mutex_name)
end

def self.get_redis_date(redis_identifier, default_value=Time.at(0))
date_str = Sidekiq.redis { |r| r.get redis_identifier }
date_str ? Time.parse(date_str) : default_value
end

def self.set_redis_date(redis_identifier, date_time_value, as_mutex=false)
date_str = date_time_value.utc.to_fs(:inspect) # Ensures fractional seconds are retained
if as_mutex
Sidekiq.redis { |r| r.setnx redis_identifier, date_str }
else
Sidekiq.redis { |r| r.set redis_identifier, date_str }
end
end

def self.delete_redis_date(redis_identifier)
Sidekiq.redis { |r| r.del redis_identifier }
end

def self.schedule_pull_batch(pull_job)
sync = Sync.create!(
external_system: SYSTEM_NAME,
Expand Down
66 changes: 66 additions & 0 deletions lib/identity_tijuana/campaign_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
module IdentityTijuana
# Mixin providing the campaign pull job. Methods live in ClassMethods and are
# copied onto the host (the IdentityTijuana module) via extend/included.
module CampaignHelper
module ClassMethods
# Pulls one batch of campaign changes from TJ into ID.
# Cursor state (last_updated_at, last_id) is kept in Redis; yields batch
# stats back to the sync framework, then schedules follow-up pull jobs.
# NOTE(review): assumes the caller (pull) has already acquired the
# :fetch_campaign_updates mutex — this method releases it near the end.
# @param sync_id [Integer] sync run identifier, passed through to import/erase
def fetch_campaign_updates(sync_id)
started_at = DateTime.now
last_updated_at = get_redis_date('tijuana:campaigns:last_updated_at')
last_id = (Sidekiq.redis { |r| r.get 'tijuana:campaigns:last_id' } || 0).to_i
campaigns_dependent_data_cutoff = DateTime.now
updated_campaigns = IdentityTijuana::Campaign.updated_campaigns(last_updated_at, last_id)
updated_campaigns_all = IdentityTijuana::Campaign.updated_campaigns_all(last_updated_at, last_id)

# Upsert each changed campaign into ID.
updated_campaigns.each do |campaign|
campaign.import(sync_id)
end

# If this batch did not drain the backlog, cap dependent-data processing
# at the last updated_at actually handled in this batch.
unless updated_campaigns.empty?
campaigns_dependent_data_cutoff = updated_campaigns.last.updated_at if updated_campaigns.count < updated_campaigns_all.count
end

# Erase any logically deleted campaigns from ID.
deleted_campaigns = IdentityTijuana::Campaign.deleted_campaigns(last_updated_at, campaigns_dependent_data_cutoff)
deleted_campaigns.each do |campaign|
campaign.erase(sync_id)
end

# Advance the Redis cursor only when something was processed.
unless updated_campaigns.empty?
set_redis_date('tijuana:campaigns:last_updated_at', updated_campaigns.last.updated_at)
Sidekiq.redis { |r| r.set 'tijuana:campaigns:last_id', updated_campaigns.last.id }
end

set_redis_date('tijuana:campaigns:dependent_data_cutoff', campaigns_dependent_data_cutoff)

# Report batch stats to the sync framework (count, ids, scope metadata).
execution_time_seconds = ((DateTime.now - started_at) * 24 * 60 * 60).to_i
yield(
updated_campaigns.size,
updated_campaigns.pluck(:id),
{
scope: 'tijuana:campaigns:last_updated_at',
scope_limit: Settings.tijuana.pull_batch_amount,
from: last_updated_at,
to: updated_campaigns.empty? ? nil : updated_campaigns.last.updated_at,
started_at: started_at,
completed_at: DateTime.now,
execution_time_seconds: execution_time_seconds,
remaining_behind: updated_campaigns_all.count
},
false
)

# Release the lock before scheduling so the next batch can acquire it.
release_mutex_lock(:fetch_campaign_updates)
need_another_batch = updated_campaigns.count < updated_campaigns_all.count
if need_another_batch
schedule_pull_batch(:fetch_campaign_updates)
else
# Backlog drained: kick off the downstream campaign-dependent pulls.
schedule_pull_batch(:fetch_page_sequence_updates)
schedule_pull_batch(:fetch_push_updates)
end
end
end

extend ClassMethods
# Hook so `include CampaignHelper` also exposes the methods at class level.
def self.included(other)
other.extend(ClassMethods)
end
end
end
33 changes: 33 additions & 0 deletions lib/identity_tijuana/mutex_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
module IdentityTijuana
  # Redis-backed mutex used to ensure only one worker runs a given pull job
  # at a time. The lock is a Redis key holding an expiry timestamp; a stale
  # lock is reclaimed if the previous worker is no longer running.
  module MutexHelper
    include RedisHelper

    module ClassMethods
      # BUGFIX: these must be *instance* methods of ClassMethods (plain `def`,
      # not `def self.`) — otherwise `extend ClassMethods` and the `included`
      # hook copy nothing onto the host and callers get NoMethodError. This
      # matches the pattern used by CampaignHelper::ClassMethods.

      # Attempts to take the named lock.
      # @param method_name [String, Symbol] pull job name, used in the Redis key
      # @param sync_id [Integer] sync run id, used to check for a live worker
      # @return [Boolean] true if the lock was acquired
      def acquire_mutex_lock(method_name, sync_id)
        mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
        new_mutex_expiry = DateTime.now + MUTEX_EXPIRY_DURATION
        mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
        unless mutex_acquired
          # Lock exists: reclaim it only if it has expired and no worker for
          # this job is still running.
          mutex_expiry = get_redis_date(mutex_name)
          if mutex_expiry.past?
            unless worker_currently_running?(method_name, sync_id)
              delete_redis_date(mutex_name)
              mutex_acquired = set_redis_date(mutex_name, new_mutex_expiry, true)
            end
          end
        end
        mutex_acquired
      end

      # Unconditionally drops the named lock.
      # @param method_name [String, Symbol] pull job name, used in the Redis key
      def release_mutex_lock(method_name)
        mutex_name = "#{SYSTEM_NAME}:mutex:#{method_name}"
        delete_redis_date(mutex_name)
      end
    end

    extend ClassMethods
    # Hook so `include MutexHelper` also exposes the methods at class level.
    def self.included(other)
      other.extend(ClassMethods)
    end
  end
end
Loading