Skip to content

Commit d096dcc

Browse files
authored
Merge pull request #976 from lsst/tickets/DM-52880
Tickets/dm 52880
2 parents ec484f1 + 9c7f004 commit d096dcc

File tree

109 files changed

+1022
-855
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+1022
-855
lines changed

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,4 @@ add_subdirectory(wpublish)
8787
add_subdirectory(wsched)
8888
add_subdirectory(www)
8989

90+

src/cconfig/CzarConfig.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ http::AuthContext CzarConfig::httpAuthContext() const {
143143
_replicationAdminAuthKey->getVal());
144144
}
145145

146-
void CzarConfig::setId(qmeta::CzarId id) {
146+
void CzarConfig::setId(CzarId id) {
147147
_czarId = id;
148148
// Update the relevant section of the JSON-ified configuration.
149149
_jsonConfig["actual"]["identity"]["id"] = std::to_string(_czarId);

src/cconfig/CzarConfig.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@
3434
#include <nlohmann/json.hpp>
3535

3636
// Qserv headers
37+
#include "global/intTypes.h"
3738
#include "mysql/MySqlConfig.h"
38-
#include "qmeta/types.h"
3939
#include "util/ConfigStore.h"
4040
#include "util/ConfigValMap.h"
4141

@@ -177,6 +177,9 @@ class CzarConfig {
177177
/// the method then the monitoring will be disabled.
178178
unsigned int czarStatsUpdateIvalSec() const { return _czarStatsUpdateIvalSec->getVal(); }
179179

180+
/// Maximum number of attempts to run a given job before aborting the entire user query.
181+
unsigned int jobMaxAttempts() const { return _jobMaxAttempts->getVal(); }
182+
180183
/// A worker is considered fully ALIVE if the last update from the worker has been
181184
/// heard in less than _activeWorkerTimeoutAliveSecs seconds.
182185
int getActiveWorkerTimeoutAliveSecs() const { return _activeWorkerTimeoutAliveSecs->getVal(); }
@@ -230,14 +233,14 @@ class CzarConfig {
230233
std::string const& name() const { return _czarName; }
231234

232235
/// @return The unique identifier of Czar.
233-
qmeta::CzarId id() const { return _czarId; }
236+
CzarId id() const { return _czarId; }
234237

235238
/// Set a unique identifier of Czar.
236239
/// @note In the current implementation of Qserv a value of the identifier is not
237240
/// available at a time when the configuration is initialized. The identifier is generated
238241
/// when registering Czar by name in a special table of teh Qserv database.
239242
/// This logic should be fixed in some future version of Qserv.
240-
void setId(qmeta::CzarId id);
243+
void setId(CzarId id);
241244

242245
/// @return The interval in seconds for cleaning up the in-progress queries in QMeta.
243246
unsigned int getInProgressCleanupIvalSec() const { return _inProgressCleanupIvalSec->getVal(); }
@@ -266,8 +269,7 @@ class CzarConfig {
266269
/// The unique identifier of the Czar instance, the real vale cannot be
267270
/// acquired until later. Using a crazy initial value in hopes of highlighting
268271
/// issues.
269-
/// TODO: Is this really the right place for this? (previously undefined)
270-
qmeta::CzarId _czarId = std::numeric_limits<qmeta::CzarId>::max();
272+
CzarId _czarId = std::numeric_limits<CzarId>::max();
271273

272274
nlohmann::json _jsonConfig; ///< JSON-ified configuration
273275

@@ -370,7 +372,6 @@ class CzarConfig {
370372
// UberJobs
371373
CVTIntPtr _uberJobMaxChunks =
372374
util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 10000);
373-
374375
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
375376
_configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60);
376377
CVTIntPtr _interactiveChunkLimit =
@@ -383,6 +384,8 @@ class CzarConfig {
383384
util::ConfigValTBool::create(_configValMap, "tuning", "notifyWorkersOnCzarRestart", notReq, 1);
384385
CVTIntPtr _czarStatsUpdateIvalSec =
385386
util::ConfigValTInt::create(_configValMap, "tuning", "czarStatsUpdateIvalSec", notReq, 1);
387+
CVTUIntPtr _jobMaxAttempts =
388+
util::ConfigValTUInt::create(_configValMap, "tuning", "jobMaxAttempts", notReq, 5);
386389

387390
// Replicator
388391
CVTStrPtr _replicationInstanceId =

src/ccontrol/MergingHandler.cc

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,17 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
234234
}
235235
};
236236
csvMemDisk->transferDataFromWorker(transferFunc);
237+
if (csvMemDisk->isCancelled()) {
238+
// Since csvMemDisk was cancelled, avoid merging to avoid risks of contamination.
239+
LOGS(_log, LOG_LVL_DEBUG, __func__ << " csvMemDisk cancelled");
240+
return qdisp::MergeEndStatus(false);
241+
}
242+
243+
bool mergeOk = _startMerge();
244+
if (!mergeOk) {
245+
LOGS(_log, LOG_LVL_DEBUG, __func__ << " merge cancelled");
246+
return qdisp::MergeEndStatus(false);
247+
}
237248

238249
// Attempt the actual merge.
239250
bool fileMergeSuccess = _infileMerger->mergeHttp(uberJob, fileSize, csvMemDisk);
@@ -264,11 +275,31 @@ qdisp::MergeEndStatus MergingHandler::_mergeHttp(qdisp::UberJob::Ptr const& uber
264275
return mergeEStatus;
265276
}
266277

267-
void MergingHandler::cancelFileMerge() {
268-
auto csvStrm = _csvMemDisk.lock();
269-
if (csvStrm != nullptr) {
270-
csvStrm->cancel();
278+
bool MergingHandler::cancelFileMerge() {
279+
lock_guard mergeStateLock(_mergeStateMtx);
280+
if (_mergeState == PREMERGE || _mergeState == CANCELLED) {
281+
_mergeState = CANCELLED;
282+
auto csvStrm = _csvMemDisk.lock();
283+
if (csvStrm != nullptr) {
284+
csvStrm->cancel();
285+
}
286+
// Merging to the result table hasn't been started, so
287+
// this can be cancelled.
288+
return true;
289+
}
290+
// Cancelling at this point would probably corrupt the result table.
291+
return false;
292+
}
293+
294+
bool MergingHandler::_startMerge() {
295+
lock_guard mergeStateLock(_mergeStateMtx);
296+
if (_mergeState == PREMERGE) {
297+
_mergeState = MERGING;
298+
// Merging hasn't been cancelled, so it's ok to start.
299+
return true;
271300
}
301+
// Merge was cancelled.
302+
return false;
272303
}
273304

274305
void MergingHandler::_setError(int code, std::string const& msg, int errorState) {

src/ccontrol/MergingHandler.h

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ namespace lsst::qserv::ccontrol {
6060
class MergingHandler : public qdisp::ResponseHandler {
6161
public:
6262
typedef std::shared_ptr<MergingHandler> Ptr;
63+
64+
enum MergeState { PREMERGE, MERGING, CANCELLED };
65+
6366
virtual ~MergingHandler();
6467

6568
/// @param merger downstream merge acceptor
@@ -78,7 +81,8 @@ class MergingHandler : public qdisp::ResponseHandler {
7881
void errorFlush(std::string const& msg, int code) override;
7982

8083
/// Stop an ongoing file merge, if possible.
81-
void cancelFileMerge() override;
84+
/// @return true if the merge was cancelled.
85+
bool cancelFileMerge() override;
8286

8387
/// Print a string representation of the receiver to an ostream
8488
std::ostream& print(std::ostream& os) const override;
@@ -91,6 +95,11 @@ class MergingHandler : public qdisp::ResponseHandler {
9195
/// Set error code and string.
9296
void _setError(int code, std::string const& msg, int errorState);
9397

98+
/// Return true if merging should be started and set _mergeState to MERGING.
99+
/// This should only be called once after the file has been collected and
100+
/// before merging with the result table starts.
101+
bool _startMerge();
102+
94103
// All instances of the HTTP client class are members of the same pool. This allows
95104
// connection reuse and a significant reduction of the kernel memory pressure.
96105
// Note that the pool gets instantiated at the very first call to method _getHttpConnPool()
@@ -106,6 +115,11 @@ class MergingHandler : public qdisp::ResponseHandler {
106115

107116
std::weak_ptr<qdisp::Executive> _executive; ///< Weak pointer to the executive for errors.
108117
std::weak_ptr<mysql::CsvMemDisk> _csvMemDisk; ///< Weak pointer to cancel infile merge.
118+
119+
/// Indicates merge state of the result table relating to the UberJob associated with
120+
/// instance of MergingHandler.
121+
MergeState _mergeState = PREMERGE;
122+
std::mutex _mergeStateMtx; ///< Protectes _mergeState
109123
};
110124

111125
} // namespace lsst::qserv::ccontrol

src/ccontrol/UserQuery.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
// Qserv headers
4040
#include "ccontrol/QueryState.h"
4141
#include "global/intTypes.h"
42-
#include "qmeta/types.h"
4342

4443
// Forward decl
4544
namespace lsst::qserv::qmeta {

src/ccontrol/UserQueryAsyncResult.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.ccontrol.UserQueryAsyncResult");
4747
namespace lsst::qserv::ccontrol {
4848

4949
// Constructors
50-
UserQueryAsyncResult::UserQueryAsyncResult(QueryId queryId, qmeta::CzarId czarId,
50+
UserQueryAsyncResult::UserQueryAsyncResult(QueryId queryId, CzarId czarId,
5151
std::shared_ptr<qmeta::QMeta> const& qMeta)
5252
: UserQuery(),
5353
_queryId(queryId),

src/ccontrol/UserQueryAsyncResult.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
// Qserv headers
3030
#include "ccontrol/UserQuery.h"
3131
#include "qmeta/QInfo.h"
32-
#include "qmeta/types.h"
3332

3433
namespace lsst::qserv::qmeta {
3534
class MessageStore;
@@ -58,7 +57,7 @@ class UserQueryAsyncResult : public UserQuery {
5857
* @param czarId ID for current czar
5958
* @param qMeta QMeta instance
6059
*/
61-
UserQueryAsyncResult(QueryId queryId, qmeta::CzarId czarId, std::shared_ptr<qmeta::QMeta> const& qMeta);
60+
UserQueryAsyncResult(QueryId queryId, CzarId czarId, std::shared_ptr<qmeta::QMeta> const& qMeta);
6261

6362
// Destructor
6463
~UserQueryAsyncResult();
@@ -103,7 +102,7 @@ class UserQueryAsyncResult : public UserQuery {
103102
protected:
104103
private:
105104
QueryId _queryId;
106-
qmeta::CzarId _czarId;
105+
CzarId _czarId;
107106
std::shared_ptr<qmeta::QMeta> _qMeta;
108107
qmeta::QInfo _qInfo;
109108
std::shared_ptr<qmeta::MessageStore> _messageStore;

src/ccontrol/UserQueryFactory.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ UserQueryFactory::UserQueryFactory(qproc::DatabaseModels::Ptr const& dbModels, s
234234
_userQuerySharedResources->queryMetadata->cleanupQueriesAtStart(_userQuerySharedResources->czarId);
235235

236236
// Add logging context with czar ID
237-
qmeta::CzarId czarId = _userQuerySharedResources->czarId;
237+
auto const czarId = _userQuerySharedResources->czarId;
238238
LOG_MDC_INIT([czarId]() { LOG_MDC("CZID", std::to_string(czarId)); });
239239

240240
// BOOST ASIO service is started to process asynchronous timer requests

src/ccontrol/UserQueryInvalid.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
// Qserv headers
3434
#include "ccontrol/UserQuery.h"
3535
#include "qmeta/MessageStore.h"
36-
#include "qmeta/types.h"
3736

3837
// Forward decl
3938

0 commit comments

Comments
 (0)