Skip to content

Commit 72daed7

Browse files
jgates108fritzm
authored andcommitted
Merge pull request #978 from lsst/tickets/DM-53242
tickets/DM-53242
2 parents b3ce821 + 8497d45 commit 72daed7

35 files changed

+1037
-495
lines changed

src/ccontrol/CMakeLists.txt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ target_link_libraries(ccontrol PUBLIC
2626
boost_regex
2727
cconfig
2828
css
29+
global
2930
log
3031
parser
3132
sphgeom
@@ -35,9 +36,6 @@ install(
3536
TARGETS ccontrol
3637
)
3738

38-
install(
39-
TARGETS ccontrol
40-
)
4139

4240
FUNCTION(ccontrol_tests)
4341
FOREACH(TEST IN ITEMS ${ARGV})
@@ -46,6 +44,7 @@ FUNCTION(ccontrol_tests)
4644
cconfig
4745
ccontrol
4846
czar
47+
global
4948
parser
5049
qana
5150
qdisp

src/czar/ActiveWorker.cc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,13 @@ ActiveWorkerMap::ActiveWorkerMap(std::shared_ptr<cconfig::CzarConfig> const& cza
281281

282282
void ActiveWorkerMap::updateMap(protojson::WorkerContactInfo::WCMap const& wcMap,
283283
protojson::CzarContactInfo::Ptr const& czInfo,
284-
std::string const& replicationInstanceId,
285-
std::string const& replicationAuthKey) {
284+
protojson::AuthContext const& authContext_) {
286285
// Go through wcMap, update existing entries in _awMap, create new entries for those that don't exist,
287286
lock_guard<mutex> awLg(_awMapMtx);
288287
for (auto const& [wcKey, wcVal] : wcMap) {
289288
auto iter = _awMap.find(wcKey);
290289
if (iter == _awMap.end()) {
291-
auto newAW = ActiveWorker::create(wcVal, czInfo, replicationInstanceId, replicationAuthKey);
290+
auto newAW = ActiveWorker::create(wcVal, czInfo, authContext_);
292291
LOGS(_log, LOG_LVL_INFO,
293292
cName(__func__) << " ActiveWorker created for " << wcKey << " " << newAW->dump());
294293
_awMap[wcKey] = newAW;

src/czar/ActiveWorker.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,9 @@ class ActiveWorker : public std::enable_shared_from_this<ActiveWorker> {
9393
static std::string getStateStr(State st);
9494

9595
static Ptr create(protojson::WorkerContactInfo::Ptr const& wInfo,
96-
protojson::CzarContactInfo::Ptr const& czInfo, std::string const& replicationInstanceId,
97-
std::string const& replicationAuthKey) {
98-
return Ptr(new ActiveWorker(wInfo, czInfo, replicationInstanceId, replicationAuthKey));
96+
protojson::CzarContactInfo::Ptr const& czInfo,
97+
protojson::AuthContext const& authContext_) {
98+
return Ptr(new ActiveWorker(wInfo, czInfo, authContext_));
9999
}
100100

101101
/// This function should only be called before the _monitor thread is started
@@ -152,10 +152,8 @@ class ActiveWorker : public std::enable_shared_from_this<ActiveWorker> {
152152

153153
private:
154154
ActiveWorker(protojson::WorkerContactInfo::Ptr const& wInfo,
155-
protojson::CzarContactInfo::Ptr const& czInfo, std::string const& replicationInstanceId,
156-
std::string const& replicationAuthKey)
157-
: _wqsData(protojson::WorkerQueryStatusData::create(wInfo, czInfo, replicationInstanceId,
158-
replicationAuthKey)) {
155+
protojson::CzarContactInfo::Ptr const& czInfo, protojson::AuthContext const& authContext_)
156+
: _wqsData(protojson::WorkerQueryStatusData::create(wInfo, czInfo, authContext_)) {
159157
if (_wqsData == nullptr) {
160158
throw util::Bug(ERR_LOC, "ActiveWorker _wqsData null");
161159
}
@@ -206,8 +204,7 @@ class ActiveWorkerMap {
206204
/// Use information gathered from the registry to update the map. The registry
207205
/// contains last contact time (used for determining aliveness) and worker contact information.
208206
void updateMap(protojson::WorkerContactInfo::WCMap const& wcMap,
209-
protojson::CzarContactInfo::Ptr const& czInfo, std::string const& replicationInstanceId,
210-
std::string const& replicationAuthKey);
207+
protojson::CzarContactInfo::Ptr const& czInfo, protojson::AuthContext const& authContext_);
211208

212209
/// If this is to be called, it must be called before Czar::_monitor is started:
213210
/// It tells the workers all queries from `czId` with QueryIds less than `lastQId`

src/czar/Czar.cc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
#include "http/MetaModule.h"
5555
#include "http/Method.h"
5656
#include "mysql/CsvMemDisk.h"
57+
#include "protojson/UberJobErrorMsg.h"
58+
#include "protojson/UberJobReadyMsg.h"
5759
#include "qdisp/CzarStats.h"
5860
#include "qdisp/Executive.h"
5961
#include "qproc/DatabaseModels.h"
@@ -697,4 +699,55 @@ void Czar::killIncompleteUbjerJobsOn(std::string const& restartedWorkerId) {
697699
}
698700
}
699701

702+
nlohmann::json Czar::handleUberJobReadyMsg(std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg,
703+
string const& note, bool const retry) {
704+
auto queryId = jrMsg->queryId;
705+
auto czarId = jrMsg->czarId;
706+
auto uberJobId = jrMsg->uberJobId;
707+
708+
qdisp::Executive::Ptr exec = czar::Czar::getCzar()->getExecutiveFromMap(queryId);
709+
if (exec == nullptr) {
710+
LOGS(_log, LOG_LVL_WARN,
711+
note << " null exec QID:" << queryId << " ujId=" << uberJobId << " cz=" << czarId);
712+
throw invalid_argument(string("HttpCzarWorkerModule::_handleJobReady No executive for qid=") +
713+
to_string(queryId) + " czar=" + to_string(czarId));
714+
}
715+
716+
qdisp::UberJob::Ptr uj = exec->findUberJob(uberJobId);
717+
if (uj == nullptr) {
718+
LOGS(_log, LOG_LVL_WARN,
719+
note << " null uj QID:" << queryId << " ujId=" << uberJobId << " cz=" << czarId);
720+
throw invalid_argument(string("HttpCzarWorkerModule::_handleJobReady No UberJob for qid=") +
721+
to_string(queryId) + " ujId=" + to_string(uberJobId) +
722+
" czar=" + to_string(czarId));
723+
}
724+
uj->setResultFileSize(jrMsg->fileUrlInfo.fileSize);
725+
exec->checkResultFileSize(jrMsg->fileUrlInfo.fileSize);
726+
727+
auto importRes = uj->importResultFile(jrMsg->fileUrlInfo, retry);
728+
return importRes;
729+
}
730+
731+
nlohmann::json Czar::handleUberJobErrorMsg(std::shared_ptr<protojson::UberJobErrorMsg> const& jrMsg,
732+
string const& note) {
733+
auto queryId = jrMsg->queryId;
734+
auto czarId = jrMsg->czarId;
735+
auto uberJobId = jrMsg->uberJobId;
736+
737+
// Find UberJob
738+
qdisp::Executive::Ptr exec = czar::Czar::getCzar()->getExecutiveFromMap(queryId);
739+
if (exec == nullptr) {
740+
throw invalid_argument(note + " No executive for qid=" + to_string(queryId) +
741+
" czar=" + to_string(czarId));
742+
}
743+
qdisp::UberJob::Ptr uj = exec->findUberJob(uberJobId);
744+
if (uj == nullptr) {
745+
throw invalid_argument(note + " No UberJob for qid=" + to_string(queryId) +
746+
" ujId=" + to_string(uberJobId) + " czar=" + to_string(czarId));
747+
}
748+
749+
auto importRes = uj->workerError(jrMsg->errorCode, jrMsg->errorMsg);
750+
return importRes;
751+
}
752+
700753
} // namespace lsst::qserv::czar

src/czar/Czar.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ namespace lsst::qserv::http {
5959
class ClientConnPool;
6060
} // namespace lsst::qserv::http
6161

62+
namespace lsst::qserv::protojson {
63+
class UberJobErrorMsg;
64+
class UberJobReadyMsg;
65+
class UberJobStatusMsg;
66+
} // namespace lsst::qserv::protojson
67+
6268
namespace lsst::qserv::util {
6369
class FileMonitor;
6470
} // namespace lsst::qserv::util
@@ -162,6 +168,18 @@ class Czar {
162168

163169
std::string const& getFqdn() const { return _fqdn; }
164170

171+
/// Starts the process of collecting a result file from the worker.
172+
/// @throws std::invalid_argument
173+
/// @param retry - true indicates this is a retry of a failed communication and
174+
/// should not kill the associated UberJob due to an unexpected state.
175+
nlohmann::json handleUberJobReadyMsg(std::shared_ptr<protojson::UberJobReadyMsg> const& jrMsg,
176+
std::string const& note, bool const retry = false);
177+
178+
/// Handle an UberJob processing error from the worker.
179+
/// @throws std::invalid_argument
180+
nlohmann::json handleUberJobErrorMsg(std::shared_ptr<protojson::UberJobErrorMsg> const& jrMsg,
181+
std::string const& note);
182+
165183
/// Startup time of czar, sent to workers so they can detect that the czar was
166184
/// was restarted when this value changes.
167185
static uint64_t const czarStartupTime;

src/czar/CzarRegistry.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ void CzarRegistry::_registryUpdateLoop() {
110110

111111
void CzarRegistry::_registryWorkerInfoLoop() {
112112
// Get worker information from the registry
113-
string const replicationInstanceId = _czarConfig->replicationInstanceId();
114-
string const replicationAuthKey = _czarConfig->replicationAuthKey();
113+
protojson::AuthContext const authContext(_czarConfig->replicationInstanceId(),
114+
_czarConfig->replicationAuthKey());
115115
uint64_t const czarStartTime = Czar::czarStartupTime;
116116
string const fqdn = util::get_current_host_fqdn();
117117

@@ -141,8 +141,7 @@ void CzarRegistry::_registryWorkerInfoLoop() {
141141
if (wMap != nullptr) {
142142
_contactMap = wMap;
143143
_latestMapUpdate = CLOCK::now();
144-
_activeWorkerMap->updateMap(*_contactMap, czInfo, replicationInstanceId,
145-
replicationAuthKey);
144+
_activeWorkerMap->updateMap(*_contactMap, czInfo, authContext);
146145
}
147146
}
148147
}

src/czar/HttpCzarWorkerModule.cc

Lines changed: 32 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -102,34 +102,15 @@ json HttpCzarWorkerModule::_workerCzarComIssue() {
102102
}
103103

104104
json HttpCzarWorkerModule::_handleJobError(string const& func) {
105-
LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleJobError start");
105+
string const fName("HttpCzarWorkerModule::_handleJobError");
106+
LOGS(_log, LOG_LVL_DEBUG, fName << " start");
106107
// Metadata-only responses for the file-based protocol should not have any data
107108

108109
// Parse and verify the json message and then kill the UberJob.
109110
try {
110-
string const& repliInstanceId = cconfig::CzarConfig::instance()->replicationInstanceId();
111-
string const& repliAuthKey = cconfig::CzarConfig::instance()->replicationAuthKey();
112111
auto const& jsReq = body().objJson;
113-
auto jrMsg = protojson::UberJobErrorMsg::createFromJson(jsReq, repliInstanceId, repliAuthKey);
114-
115-
auto const queryId = jrMsg->getQueryId();
116-
auto const czarId = jrMsg->getCzarId();
117-
auto const uberJobId = jrMsg->getUberJobId();
118-
119-
// Find UberJob
120-
qdisp::Executive::Ptr exec = czar::Czar::getCzar()->getExecutiveFromMap(queryId);
121-
if (exec == nullptr) {
122-
throw invalid_argument(string("HttpCzarWorkerModule::_handleJobError No executive for qid=") +
123-
to_string(queryId) + " czar=" + to_string(czarId));
124-
}
125-
qdisp::UberJob::Ptr uj = exec->findUberJob(uberJobId);
126-
if (uj == nullptr) {
127-
throw invalid_argument(string("HttpCzarWorkerModule::_handleJobError No UberJob for qid=") +
128-
to_string(queryId) + " ujId=" + to_string(uberJobId) +
129-
" czar=" + to_string(czarId));
130-
}
131-
132-
auto importRes = uj->workerError(jrMsg->getErrorCode(), jrMsg->getErrorMsg());
112+
auto jrMsg = protojson::UberJobErrorMsg::createFromJson(jsReq);
113+
auto importRes = czar::Czar::getCzar()->handleUberJobErrorMsg(jrMsg, fName);
133114
return importRes;
134115
} catch (std::invalid_argument const& iaEx) {
135116
LOGS(_log, LOG_LVL_ERROR,
@@ -148,33 +129,7 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) {
148129
try {
149130
auto const& jsReq = body().objJson;
150131
auto jrMsg = protojson::UberJobReadyMsg::createFromJson(jsReq);
151-
152-
// Find UberJob
153-
auto queryId = jrMsg->getQueryId();
154-
auto czarId = jrMsg->getCzarId();
155-
auto uberJobId = jrMsg->getUberJobId();
156-
qdisp::Executive::Ptr exec = czar::Czar::getCzar()->getExecutiveFromMap(queryId);
157-
if (exec == nullptr) {
158-
LOGS(_log, LOG_LVL_WARN,
159-
fName << " null exec QID:" << queryId << " ujId=" << uberJobId << " cz=" << czarId);
160-
throw invalid_argument(string("HttpCzarWorkerModule::_handleJobReady No executive for qid=") +
161-
to_string(queryId) + " czar=" + to_string(czarId));
162-
}
163-
164-
qdisp::UberJob::Ptr uj = exec->findUberJob(uberJobId);
165-
if (uj == nullptr) {
166-
LOGS(_log, LOG_LVL_WARN,
167-
fName << " null uj QID:" << queryId << " ujId=" << uberJobId << " cz=" << czarId);
168-
throw invalid_argument(string("HttpCzarWorkerModule::_handleJobReady No UberJob for qid=") +
169-
to_string(queryId) + " ujId=" + to_string(uberJobId) +
170-
" czar=" + to_string(czarId));
171-
}
172-
173-
uj->setResultFileSize(jrMsg->getFileSize());
174-
exec->checkResultFileSize(jrMsg->getFileSize());
175-
176-
auto importRes =
177-
uj->importResultFile(jrMsg->getFileUrl(), jrMsg->getRowCount(), jrMsg->getFileSize());
132+
auto importRes = czar::Czar::getCzar()->handleUberJobReadyMsg(jrMsg, fName);
178133
return importRes;
179134
} catch (std::invalid_argument const& iaEx) {
180135
LOGS(_log, LOG_LVL_ERROR,
@@ -185,14 +140,14 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) {
185140
}
186141

187142
json HttpCzarWorkerModule::_handleWorkerCzarComIssue(string const& func) {
188-
LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleWorkerCzarComIssue start");
143+
string const fName("HttpCzarWorkerModule::_handleWorkerCzarComIssue");
144+
LOGS(_log, LOG_LVL_DEBUG, fName << " start");
189145
// Parse and verify the json message and then deal with the problems.
190146
try {
191-
string const replicationInstanceId = cconfig::CzarConfig::instance()->replicationInstanceId();
192-
string const replicationAuthKey = cconfig::CzarConfig::instance()->replicationAuthKey();
147+
protojson::AuthContext const authC(cconfig::CzarConfig::instance()->replicationInstanceId(),
148+
cconfig::CzarConfig::instance()->replicationAuthKey());
193149
auto const& jsReq = body().objJson;
194-
auto wccIssue = protojson::WorkerCzarComIssue::createFromJson(jsReq, replicationInstanceId,
195-
replicationAuthKey);
150+
auto wccIssue = protojson::WorkerCzarComIssue::createFromJson(jsReq, authC);
196151

197152
auto wId = wccIssue->getWorkerInfo()->wId;
198153
if (wccIssue->getThoughtCzarWasDead()) {
@@ -209,7 +164,29 @@ json HttpCzarWorkerModule::_handleWorkerCzarComIssue(string const& func) {
209164
execPtr->killIncompleteUberJobsOnWorker(wId);
210165
}
211166
}
167+
// The response here includes the QueryId and UberJobId of all
168+
// uberjobs in the original message. If the czar cannot handle
169+
// one now, it won't be able to handle it later, so there's no
170+
// point in the worker sending it again.
171+
// Under normal circumstances, the czar should be able to
172+
// find and handle all failed transmits. Anything it can't find should
173+
// show up in completed query IDs, or failed uberJobs, and failing that
174+
// it should be garbage collected.
212175
auto jsRet = wccIssue->responseToJson();
176+
auto failedTransmits = wccIssue->takeFailedTransmitsMap();
177+
for (auto& [key, elem] : *failedTransmits) {
178+
protojson::UberJobStatusMsg::Ptr& statusMsg = elem;
179+
auto rdyMsg = dynamic_pointer_cast<protojson::UberJobReadyMsg>(statusMsg);
180+
if (rdyMsg != nullptr) {
181+
bool const retry = true;
182+
// Put the file on a queue to be collected later.
183+
czar::Czar::getCzar()->handleUberJobReadyMsg(rdyMsg, fName, retry);
184+
} else {
185+
auto errMsg = dynamic_pointer_cast<protojson::UberJobErrorMsg>(statusMsg);
186+
// Kill the UberJob or user query depending on the error.
187+
czar::Czar::getCzar()->handleUberJobErrorMsg(errMsg, fName);
188+
}
189+
}
213190
LOGS(_log, LOG_LVL_TRACE, "HttpCzarWorkerModule::_handleWorkerCzarComIssue jsRet=" << jsRet.dump());
214191
return jsRet;
215192
} catch (std::invalid_argument const& iaEx) {

src/global/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ target_sources(global PRIVATE
66
ResourceUnit.cc
77
sqltoken.cc
88
stringUtil.cc
9+
UberJobBase.cc
910
)
1011

1112
install(

src/global/UberJobBase.cc

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* LSST Data Management System
3+
*
4+
* This product includes software developed by the
5+
* LSST Project (http://www.lsst.org/).
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY
15+
* or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* GNU General Public License for more details.
17+
*
18+
* You should have received a copy of the LSST License Statement and
19+
* the GNU General Public License along with this program. If not,
20+
* see <http://www.lsstcorp.org/LegalNotices/>.
21+
*/
22+
23+
// Class header
24+
#include "global/UberJobBase.h"
25+
26+
// System headers
27+
#include <sstream>
28+
29+
// Third-party headers
30+
31+
// Qserv headers
32+
33+
// LSST headers
34+
35+
using namespace std;
36+
37+
namespace lsst::qserv {
38+
39+
std::ostream& UberJobBase::dump(std::ostream& os) const {
40+
os << _idStr;
41+
return os;
42+
}
43+
44+
std::string UberJobBase::dump() const {
45+
std::ostringstream os;
46+
dump(os);
47+
return os.str();
48+
}
49+
50+
std::ostream& operator<<(std::ostream& os, UberJobBase const& uj) { return uj.dump(os); }
51+
52+
} // namespace lsst::qserv

0 commit comments

Comments
 (0)