From 77f6a9bf070af25bd9c491227707e9f0fa65044e Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 13 Dec 2024 19:27:46 +0100 Subject: [PATCH 1/2] Ensure lifecycle tasks wait for messages to be pushed Issue: BB-641 --- extensions/lifecycle/tasks/LifecycleTask.js | 25 +++-- extensions/lifecycle/tasks/LifecycleTaskV2.js | 102 ++++++++++-------- 2 files changed, 74 insertions(+), 53 deletions(-) diff --git a/extensions/lifecycle/tasks/LifecycleTask.js b/extensions/lifecycle/tasks/LifecycleTask.js index 4b7e1af4cf..fd05b4ca56 100644 --- a/extensions/lifecycle/tasks/LifecycleTask.js +++ b/extensions/lifecycle/tasks/LifecycleTask.js @@ -400,6 +400,8 @@ class LifecycleTask extends BackbeatTask { }); } + const promises = []; + // sending bucket entry - only once - for checking next listing if (data.IsTruncated && allVersions.length > 0 && nbRetries === 0) { // Uses last version whether Version or DeleteMarker @@ -414,31 +416,30 @@ class LifecycleTask extends BackbeatTask { prevDate: last.LastModified, }, }); - this._sendBucketEntry(entry, err => { + promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => { if (!err) { log.debug('sent kafka entry for bucket ' + 'consumption', { method: 'LifecycleTask._getObjectVersions', }); } - }); + resolve(); // safe to ignore the error, we will retry lifecycle eventually + }))); } - // if no versions to process, skip further processing for this - // batch + // if no versions to process, skip further processing for this batch if (allVersionsWithStaleDate.length === 0) { - return done(null); + return Promise.allSettled(promises).then(() => done(), done); } // for each version, get their relative rules, compare with // bucket rules, match with `staleDate` to // NoncurrentVersionExpiration Days and send expiration if // rules all apply - return this._compareRulesToList(bucketData, bucketLCRules, - allVersionsWithStaleDate, log, versioningStatus, - err => { + promises.push(new Promise((resolve, reject) => this._compareRulesToList(bucketData, + bucketLCRules, allVersionsWithStaleDate, log, versioningStatus, err => { if (err) { - return done(err); + return reject(err); } if (!data.IsTruncated) { @@ -453,8 +454,10 @@ class LifecycleTask extends BackbeatTask { ); } - return done(); - }); + return resolve(); + }))); + + return Promise.allSettled(promises).then(() => done(), done); }); } diff --git a/extensions/lifecycle/tasks/LifecycleTaskV2.js b/extensions/lifecycle/tasks/LifecycleTaskV2.js index 6cd2034318..83038fc9bb 100644 --- a/extensions/lifecycle/tasks/LifecycleTaskV2.js +++ b/extensions/lifecycle/tasks/LifecycleTaskV2.js @@ -1,6 +1,7 @@ 'use strict'; // eslint-disable-line const async = require('async'); +const util = require('util'); const { errors } = require('arsenal'); const LifecycleTask = require('./LifecycleTask'); @@ -39,33 +40,33 @@ class LifecycleTaskV2 extends LifecycleTask { * @param {array} remainings - array of { prefix, listType, beforeDate } * @param {object} bucketData - bucket data * @param {Logger.newRequestLogger} log - logger object + * @param {function} done - callback(error) * @return {undefined} */ - _handleRemainingListings(remainings, bucketData, log) { - if (remainings && remainings.length) { - remainings.forEach(l => { - const { - prefix, - listType, - beforeDate, - storageClass, - } = l; - - const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, - details: { beforeDate, prefix, listType, storageClass }, - }); + _handleRemainingListings(remainings, bucketData, log, done) { + async.forEach(remainings || [], (l, cb) => { + const { + prefix, + listType, + beforeDate, + storageClass, + } = l; + + const entry = Object.assign({}, bucketData, { + contextInfo: { requestId: log.getSerializedUids() }, + details: { beforeDate, prefix, listType, storageClass }, + }); - this._sendBucketEntry(entry, err => { - if (!err) { - log.debug( - 'sent kafka entry for bucket consumption', { - method: 'LifecycleTaskV2._getVersionList', - }); - } - }); + this._sendBucketEntry(entry, err => { + if (!err) { + log.debug( + 'sent kafka entry for bucket consumption', { + method: 'LifecycleTaskV2._getVersionList', + }); + } + cb(); }); - } + }, done); } /** @@ -101,15 +102,19 @@ class LifecycleTaskV2 extends LifecycleTask { return process.nextTick(done); } + const promises = []; + // re-queue remaining listings only once if (nbRetries === 0) { - this._handleRemainingListings(remainings, bucketData, log); + promises.push(util.promisify(this._handleRemainingListings).bind(this)( + remainings, bucketData, log, + )); } return this.backbeatMetadataProxy.listLifecycle(listType, params, log, (err, contents, isTruncated, markerInfo) => { if (err) { - return done(err); + return Promise.allSettled(promises).then(() => done(err), () => done(err)); } // re-queue truncated listing only once. @@ -125,17 +130,22 @@ class LifecycleTaskV2 extends LifecycleTask { }, }); - this._sendBucketEntry(entry, err => { + promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => { if (!err) { log.debug( 'sent kafka entry for bucket consumption', { - method: 'LifecycleTaskV2._getObjectList', - }); + method: 'LifecycleTaskV2._getObjectList', + }); } - }); + resolve(); // safe to ignore the error, we will retry lifecycle eventually + }))); } - return this._compareRulesToList(bucketData, bucketLCRules, - contents, log, done); + + promises.push(util.promisify(this._compareRulesToList).bind(this)( + bucketData, bucketLCRules, contents, log, + )); + + return Promise.allSettled(promises).then(() => done(), done); }); } @@ -173,15 +183,19 @@ class LifecycleTaskV2 extends LifecycleTask { return process.nextTick(done); } + const promises = []; + // re-queue remaining listings only once if (nbRetries === 0) { - this._handleRemainingListings(remainings, bucketData, log); + promises.push(util.promisify(this._handleRemainingListings).bind(this)( + remainings, bucketData, log, + )); } return this.backbeatMetadataProxy.listLifecycle(listType, params, log, (err, contents, isTruncated, markerInfo) => { if (err) { - return done(err); + return Promise.allSettled(promises).then(() => done(err), () => done(err)); } // create Set of unique keys not matching the next marker to @@ -209,19 +223,21 @@ class LifecycleTaskV2 extends LifecycleTask { }, }); - this._sendBucketEntry(entry, err => { + promises.push(new Promise(resolve => this._sendBucketEntry(entry, err => { if (!err) { log.debug( 'sent kafka entry for bucket consumption', { - method: 'LifecycleTaskV2._getObjectList', - }); + method: 'LifecycleTaskV2._getObjectVersions', + }); } - }); + resolve(); // safe to ignore the error, we will retry lifecycle eventually + }))); } - return this._compareRulesToList(bucketData, bucketLCRules, - contents, log, err => { + + promises.push(new Promise((resolve, reject) => this._compareRulesToList( + bucketData, bucketLCRules, contents, log, err => { if (err) { - return done(err); + return reject(err); } if (!isTruncated) { @@ -236,8 +252,10 @@ class LifecycleTaskV2 extends LifecycleTask { ); } - return done(); - }); + return resolve(); + }))); + + return Promise.allSettled(promises).then(() => done(), done); }); } From 0c257d77df94b2d436de171902d311598be6f2d5 Mon Sep 17 00:00:00 2001 From: Francois Ferrand Date: Fri, 27 Dec 2024 16:23:08 +0100 Subject: [PATCH 2/2] Make allSettled usage safer allSettled does not follow the usuage fullfil pattern: it will never reject, and always fullfil with an array of the results of each promises. This is not an issue in the case of lifecycle, where we actually ignore all errors; but it makes the code look inconsistent, as it suggests errors are possible but not handle them. To avoid future issues, add proper processing of the results of allSettled to build a single error when appropriate. Issue: BB-641 --- extensions/lifecycle/tasks/LifecycleTask.js | 6 ++++-- extensions/lifecycle/tasks/LifecycleTaskV2.js | 12 ++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/extensions/lifecycle/tasks/LifecycleTask.js b/extensions/lifecycle/tasks/LifecycleTask.js index fd05b4ca56..eeb506c50e 100644 --- a/extensions/lifecycle/tasks/LifecycleTask.js +++ b/extensions/lifecycle/tasks/LifecycleTask.js @@ -429,7 +429,7 @@ class LifecycleTask extends BackbeatTask { // if no versions to process, skip further processing for this batch if (allVersionsWithStaleDate.length === 0) { - return Promise.allSettled(promises).then(() => done(), done); + return Promise.allSettled(promises).then(() => done()); } // for each version, get their relative rules, compare with @@ -457,7 +457,9 @@ class LifecycleTask extends BackbeatTask { return resolve(); }))); - return Promise.allSettled(promises).then(() => done(), done); + return Promise.allSettled(promises).then(results => done( + results.find(r => r.status === 'rejected')?.reason + )); }); } diff --git a/extensions/lifecycle/tasks/LifecycleTaskV2.js b/extensions/lifecycle/tasks/LifecycleTaskV2.js index 83038fc9bb..e41e6cb1b4 100644 --- a/extensions/lifecycle/tasks/LifecycleTaskV2.js +++ b/extensions/lifecycle/tasks/LifecycleTaskV2.js @@ -114,7 +114,7 @@ class LifecycleTaskV2 extends LifecycleTask { return this.backbeatMetadataProxy.listLifecycle(listType, params, log, (err, contents, isTruncated, markerInfo) => { if (err) { - return Promise.allSettled(promises).then(() => done(err), () => done(err)); + return Promise.allSettled(promises).then(() => done(err)); } // re-queue truncated listing only once. @@ -145,7 +145,9 @@ class LifecycleTaskV2 extends LifecycleTask { bucketData, bucketLCRules, contents, log, )); - return Promise.allSettled(promises).then(() => done(), done); + return Promise.allSettled(promises).then(results => done( + results.find(r => r.status === 'rejected')?.reason + )); }); } @@ -195,7 +197,7 @@ class LifecycleTaskV2 extends LifecycleTask { return this.backbeatMetadataProxy.listLifecycle(listType, params, log, (err, contents, isTruncated, markerInfo) => { if (err) { - return Promise.allSettled(promises).then(() => done(err), () => done(err)); + return Promise.allSettled(promises).then(() => done(err)); } // create Set of unique keys not matching the next marker to @@ -255,7 +257,9 @@ class LifecycleTaskV2 extends LifecycleTask { return resolve(); }))); - return Promise.allSettled(promises).then(() => done(), done); + return Promise.allSettled(promises).then(results => done( + results.find(r => r.status === 'rejected')?.reason + )); }); }