Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bucket notification - ignore failure if notification was removed #8709

Merged
merged 1 commit into from
Jan 23, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 49 additions & 29 deletions src/util/notifications_util.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class Notificator {
this.connect_files_dir = connect_files_dir ?? DEFAULT_CONNECT_FILES_DIR;
this.nc_config_fs = nc_config_fs;
this.batch_size = batch_size || config.NOTIFICATION_BATCH || 10;

//requiring system_store takes about 2 seconds for the first time
//this time is unnecesarily added to manage_nsfs runtime, which fails nsfs tests on timeout
//so I'm trying to reduce places were system store is required to minimum
this.system_store = require('../server/system_services/system_store').get_instance();
this.system_utils = require('../server/utils/system_utils');
}

async run_batch() {
Expand All @@ -58,18 +64,12 @@ class Notificator {
}

_can_run() {
//requiring system_store takes about 2 seconds for the first time
//this time is unnecesarily added to manage_nsfs runtime
//so I'm trying to reduce places were system store is required to minimum
const system_store = require('../server/system_services/system_store').get_instance();
const system_utils = require('../server/utils/system_utils');

if (!system_store.is_finished_initial_load) {
if (!this.system_store.is_finished_initial_load) {
dbg.log0('system_store did not finish initial load');
return false;
}
const system = system_store.data.systems[0];
if (!system || system_utils.system_in_maintenance(system._id)) return false;
const system = this.system_store.data.systems[0];
if (!system || this.system_utils.system_in_maintenance(system._id)) return false;

return true;
}
Expand Down Expand Up @@ -115,10 +115,12 @@ class Notificator {
async _notify(fs_context, log_file, failure_append) {
const file = new LogFile(fs_context, log_file);
let send_promises = [];
let notif;
await file.collect_and_process(async str => {
try {
const notif = JSON.parse(str);
dbg.log2("notifying with notification =", notif);
notif = null;
dbg.log2("notifying with notification =", str);
notif = JSON.parse(str);
let connect = this.notif_to_connect.get(notif.meta.name);
if (!connect) {
connect = await this.parse_connect_file(notif.meta.connect, true);
Expand All @@ -127,25 +129,21 @@ class Notificator {
let connection = this.connect_str_to_connection.get(notif.meta.name);
if (!connection) {
connection = get_connection(connect);
try {
await connection.connect();
} catch (err) {
//failed to connect
dbg.error("Connection failed for", connect);
await failure_append(str);
return;
}
await connection.connect();
this.connect_str_to_connection.set(notif.meta.name, connection);
}
const send_promise = connection.promise_notify(notif, failure_append);
const send_promise = connection.promise_notify(notif, this.handle_failed_notification.bind(this), failure_append);
if (send_promise) send_promises.push(send_promise);
if (send_promises.length > this.batch_size) {
await Promise.all(send_promises);
send_promises = [];
}
} catch (err) {
dbg.error("Failed to notify. err = ", err, ", str =", str);
await failure_append(str);
//re-write the failed notification if it's still configured on the bucket
if (notif) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need of this check.
"notif" will not be null at this point. If it could be, then we should have checked it before line no 124.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notif can be null in case json parsing fails (currently line 123).
If json parsing is ok, notif will not be null on 124 and forward.

this.handle_failed_notification(notif, failure_append);
}
}
});
//note we can't reject promises here, since Promise.all() is rejected on
Expand All @@ -157,6 +155,27 @@ class Notificator {
return true;
}

async handle_failed_notification(notif, failure_append) {
let bucket;
//re-write the failed notification in the persitent log, unless
//it is no longer configured on the bucket
if (this.nc_config_fs) {
bucket = await this.nc_config_fs.get_bucket_by_name(notif.meta.bucket);
} else {
const system = this.system_store.data.systems[0];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line number 71 and 72 suggest that system could be null. So better check it here also?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If system is null, then _can_run() will return false and bg worker will not run (and we will not get to handle_failed_notification() where I don't check is system is not null).

bucket = system.buckets_by_name && system.buckets_by_name[notif.meta.bucket];
}
if (bucket.notifications) {
for (const notif_conf of bucket.notifications) {
if (notif_conf.id[0] === notif.meta.name) {
//notification is still configured, rewrite it
await failure_append(JSON.stringify(notif));
break;
}
}
}
}

async parse_connect_file(connect_filename, decrypt = false) {
let connect;
if (this.nc_config_fs) {
Expand Down Expand Up @@ -189,7 +208,7 @@ class HttpNotificator {
this.agent = new this.protocol.Agent(this.connect_obj.agent_request_object);
}

promise_notify(notif, promise_failure_cb) {
promise_notify(notif, promise_failure_cb, failure_ctxt) {
return new Promise(resolve => {
const req = this.protocol.request({
agent: this.agent,
Expand All @@ -205,12 +224,12 @@ class HttpNotificator {
return;
}
dbg.error("Notify err =", err);
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(notif, failure_ctxt).then(resolve);
});
req.on('timeout', () => {
dbg.error("Notify timeout");
req.destroy();
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(notif, failure_ctxt).then(resolve);
});
req.write(JSON.stringify(notif.notif));
req.end();
Expand Down Expand Up @@ -251,7 +270,7 @@ class KafkaNotificator {
this.connection.setPollInterval(100);
}

promise_notify(notif, promise_failure_cb) {
promise_notify(notif, promise_failure_cb, failure_ctxt) {
const connect_obj = this.connect_obj;
return new Promise(resolve => {
this.connection.produce(
Expand All @@ -262,7 +281,7 @@ class KafkaNotificator {
Date.now(),
(err, offset) => {
if (err) {
promise_failure_cb(JSON.stringify(notif)).then(resolve);
promise_failure_cb(notif, failure_ctxt).then(resolve);
} else {
resolve();
}
Expand Down Expand Up @@ -409,7 +428,7 @@ function compose_notification_req(req, res, bucket, notif_conf) {
notif.s3.object.sequencer = res.seq;
}

return compose_meta(notif, notif_conf);
return compose_meta(notif, notif_conf, bucket);
}

function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
Expand All @@ -423,15 +442,16 @@ function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
notif.s3.object.eTag = deleted_obj.etag;
notif.s3.object.versionId = deleted_obj.version_id;

return compose_meta(notif, notif_conf);
return compose_meta(notif, notif_conf, bucket);

}

function compose_meta(record, notif_conf) {
function compose_meta(record, notif_conf, bucket) {
return {
meta: {
connect: notif_conf.topic[0],
name: notif_conf.id[0],
bucket: bucket.name,
},
notif: {
Records: [record],
Expand Down
Loading