Skip to content

Commit

Permalink
bucket notification - ignore failure if notification was removed
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Prinz Setter <[email protected]>
  • Loading branch information
alphaprinz committed Jan 23, 2025
1 parent c61072b commit 7167ef2
Showing 1 changed file with 49 additions and 29 deletions.
78 changes: 49 additions & 29 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class Notificator {
this.connect_files_dir = connect_files_dir ?? DEFAULT_CONNECT_FILES_DIR;
this.nc_config_fs = nc_config_fs;
this.batch_size = batch_size || config.NOTIFICATION_BATCH || 10;

//requiring system_store takes about 2 seconds for the first time
//this time is unnecesarily added to manage_nsfs runtime, which fails nsfs tests on timeout
//so I'm trying to reduce places were system store is required to minimum
this.system_store = require('../server/system_services/system_store').get_instance();
this.system_utils = require('../server/utils/system_utils');
}

async run_batch() {
Expand All @@ -58,18 +64,12 @@ class Notificator {
}

_can_run() {
//requiring system_store takes about 2 seconds for the first time
//this time is unnecesarily added to manage_nsfs runtime
//so I'm trying to reduce places were system store is required to minimum
const system_store = require('../server/system_services/system_store').get_instance();
const system_utils = require('../server/utils/system_utils');

if (!system_store.is_finished_initial_load) {
if (!this.system_store.is_finished_initial_load) {
dbg.log0('system_store did not finish initial load');
return false;
}
const system = system_store.data.systems[0];
if (!system || system_utils.system_in_maintenance(system._id)) return false;
const system = this.system_store.data.systems[0];
if (!system || this.system_utils.system_in_maintenance(system._id)) return false;

return true;
}
Expand Down Expand Up @@ -115,10 +115,12 @@ class Notificator {
async _notify(fs_context, log_file, failure_append) {
const file = new LogFile(fs_context, log_file);
let send_promises = [];
let notif;
await file.collect_and_process(async str => {
try {
const notif = JSON.parse(str);
dbg.log2("notifying with notification =", notif);
notif = null;
dbg.log2("notifying with notification =", str);
notif = JSON.parse(str);
let connect = this.notif_to_connect.get(notif.meta.name);
if (!connect) {
connect = await this.parse_connect_file(notif.meta.connect, true);
Expand All @@ -127,25 +129,21 @@ class Notificator {
let connection = this.connect_str_to_connection.get(notif.meta.name);
if (!connection) {
connection = get_connection(connect);
try {
await connection.connect();
} catch (err) {
//failed to connect
dbg.error("Connection failed for", connect);
await failure_append(str);
return;
}
await connection.connect();
this.connect_str_to_connection.set(notif.meta.name, connection);
}
const send_promise = connection.promise_notify(notif, failure_append);
const send_promise = connection.promise_notify(notif, this.handle_failed_notification.bind(this), failure_append);
if (send_promise) send_promises.push(send_promise);
if (send_promises.length > this.batch_size) {
await Promise.all(send_promises);
send_promises = [];
}
} catch (err) {
dbg.error("Failed to notify. err = ", err, ", str =", str);
await failure_append(str);
//re-write the failed notification if it's still configured on the bucket
if (notif) {
this.handle_failed_notification(notif, failure_append);
}
}
});
//note we can't reject promises here, since Promise.all() is rejected on
Expand All @@ -157,6 +155,27 @@ class Notificator {
return true;
}

async handle_failed_notification(notif, failure_append) {
let bucket;
//re-write the failed notification in the persitent log, unless
//it is no longer configured on the bucket
if (this.nc_config_fs) {
bucket = await this.nc_config_fs.get_bucket_by_name(notif.meta.bucket);
} else {
const system = this.system_store.data.systems[0];
bucket = system.buckets_by_name && system.buckets_by_name[notif.meta.bucket];
}
if (bucket.notifications) {
for (const notif_conf of bucket.notifications) {
if (notif_conf.id[0] === notif.meta.name) {
//notification is still configured, rewrite it
await failure_append(JSON.stringify(notif));
break;
}
}
}
}

async parse_connect_file(connect_filename, decrypt = false) {
let connect;
if (this.nc_config_fs) {
Expand Down Expand Up @@ -189,7 +208,7 @@ class HttpNotificator {
this.agent = new this.protocol.Agent(this.connect_obj.agent_request_object);
}

promise_notify(notif, promise_failure_cb) {
promise_notify(notif, promise_failure_cb, failure_ctxt) {
return new Promise(resolve => {
const req = this.protocol.request({
agent: this.agent,
Expand All @@ -205,12 +224,12 @@ class HttpNotificator {
return;
}
dbg.error("Notify err =", err);
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(notif, failure_ctxt).then(resolve);
});
req.on('timeout', () => {
dbg.error("Notify timeout");
req.destroy();
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(notif, failure_ctxt).then(resolve);
});
req.write(JSON.stringify(notif.notif));
req.end();
Expand Down Expand Up @@ -251,7 +270,7 @@ class KafkaNotificator {
this.connection.setPollInterval(100);
}

promise_notify(notif, promise_failure_cb) {
promise_notify(notif, promise_failure_cb, failure_ctxt) {
const connect_obj = this.connect_obj;
return new Promise(resolve => {
this.connection.produce(
Expand All @@ -262,7 +281,7 @@ class KafkaNotificator {
Date.now(),
(err, offset) => {
if (err) {
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(notif, failure_ctxt).then(resolve);
} else {
resolve();
}
Expand Down Expand Up @@ -409,7 +428,7 @@ function compose_notification_req(req, res, bucket, notif_conf) {
notif.s3.object.sequencer = res.seq;
}

return compose_meta(notif, notif_conf);
return compose_meta(notif, notif_conf, bucket);
}

function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
Expand All @@ -423,15 +442,16 @@ function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
notif.s3.object.eTag = deleted_obj.etag;
notif.s3.object.versionId = deleted_obj.version_id;

return compose_meta(notif, notif_conf);
return compose_meta(notif, notif_conf, bucket);

}

function compose_meta(record, notif_conf) {
function compose_meta(record, notif_conf, bucket) {
return {
meta: {
connect: notif_conf.topic[0],
name: notif_conf.id[0],
bucket: bucket.name,
},
notif: {
Records: [record],
Expand Down

0 comments on commit 7167ef2

Please sign in to comment.