Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2aec76c
feat: add suspend and resume
zhansan114514 Oct 2, 2025
bc24c3c
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 4, 2025
224d2c7
fix with ai comments
zhansan114514 Oct 5, 2025
a893d51
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 9, 2025
f87a16b
fix format
zhansan114514 Oct 9, 2025
378dc1e
rename task-related messages and methods to use 'job' terminology
zhansan114514 Oct 12, 2025
fb9eb76
update task management to use job terminology and status handling
zhansan114514 Oct 12, 2025
6aea2cf
update error handling for task suspension and resumption
zhansan114514 Oct 13, 2025
37ed254
add config and container support
zhansan114514 Oct 14, 2025
f2bb40e
format
zhansan114514 Oct 14, 2025
5e7fd5c
handle error fixing
zhansan114514 Oct 15, 2025
9fac2d9
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 16, 2025
fca04fd
format
zhansan114514 Oct 16, 2025
724924b
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 16, 2025
3ba38e5
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 21, 2025
04c7ed6
Refactor job management methods in Craned and Supervisor
zhansan114514 Oct 21, 2025
9ab002e
fix
zhansan114514 Oct 21, 2025
31b7f0f
fix
zhansan114514 Oct 21, 2025
260a480
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 27, 2025
4ecc292
update error handling in Suspend and Resume tasks
zhansan114514 Oct 27, 2025
36f86d2
Merge feat/suspend into upstream/master
zhansan114514 Nov 4, 2025
76c9f1b
merge
zhansan114514 Nov 4, 2025
3a53c87
Merge branch 'master' of https://github.com/PKUHPC/CraneSched into fe…
zhansan114514 Nov 4, 2025
cb1f358
delete flag
zhansan114514 Nov 4, 2025
f0f44df
remove old way and try new api
zhansan114514 Nov 5, 2025
c9019ac
fix
zhansan114514 Nov 5, 2025
b9cd9de
fix
zhansan114514 Nov 5, 2025
fa2180c
find in map but not at
zhansan114514 Nov 5, 2025
26ea975
fix wrong error code
zhansan114514 Nov 5, 2025
0f3ca65
skip daemon step
zhansan114514 Nov 5, 2025
6ffd32e
Merge branch 'master' into feat/suspend
zhansan114514 Nov 15, 2025
29201ec
fix error
zhansan114514 Nov 15, 2025
5f32f58
format
zhansan114514 Nov 15, 2025
4cb0364
fix eror
zhansan114514 Nov 15, 2025
b52209d
fix
zhansan114514 Nov 15, 2025
ab7ab1a
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,24 @@ message TerminateStepsReply {
string reason = 2;
}

message SuspendStepsRequest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suspend应该只支持Job级别的

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

repeated uint32 task_id_list = 1;
}

message SuspendStepsReply {
bool ok = 1;
string reason = 2;
}

message ResumeStepsRequest {
repeated uint32 task_id_list = 1;
}

message ResumeStepsReply {
bool ok = 1;
string reason = 2;
}

message TerminateOrphanedStepRequest {
repeated uint32 task_id_list = 1;
}
Expand Down Expand Up @@ -225,6 +243,8 @@ message ModifyTaskRequest {
TimeLimit = 0;
Priority = 1;
Hold = 2;
Suspend = 3;
Resume = 4;
}

uint32 uid = 1;
Expand Down Expand Up @@ -1007,6 +1027,8 @@ service Craned {
If the task is a batch task, just kill it.
*/
rpc TerminateSteps(TerminateStepsRequest) returns (TerminateStepsReply);
rpc SuspendSteps(SuspendStepsRequest) returns (SuspendStepsReply);
rpc ResumeSteps(ResumeStepsRequest) returns (ResumeStepsReply);
rpc TerminateOrphanedStep(TerminateOrphanedStepRequest) returns (TerminateOrphanedStepReply);
rpc ChangeJobTimeLimit(ChangeJobTimeLimitRequest) returns (ChangeJobTimeLimitReply);

Expand Down
1 change: 1 addition & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ enum TaskStatus {
ExceedTimeLimit = 4;
Cancelled = 5;
OutOfMemory = 6;
Suspended = 7;

Invalid = 15;
}
Expand Down
16 changes: 16 additions & 0 deletions protos/Supervisor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ message TerminateTaskReply {
string reason = 2;
}

message SuspendTaskRequest {}
Copy link
Collaborator

@L-Xiafeng L-Xiafeng Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Task->Job


message SuspendTaskReply {
bool ok = 1;
string reason = 2;
}

message ResumeTaskRequest {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResumeJob


message ResumeTaskReply {
bool ok = 1;
string reason = 2;
}

message ShutdownSupervisorRequest {}

message ShutdownSupervisorReply {}
Expand All @@ -129,5 +143,7 @@ service Supervisor {
rpc CheckStatus(CheckStatusRequest) returns (CheckStatusReply);
rpc ChangeTaskTimeLimit(ChangeTaskTimeLimitRequest) returns (ChangeTaskTimeLimitReply);
rpc TerminateTask(TerminateTaskRequest) returns (TerminateTaskReply);
rpc SuspendTask(SuspendTaskRequest) returns (SuspendTaskReply);
rpc ResumeTask(ResumeTaskRequest) returns (ResumeTaskReply);
rpc ShutdownSupervisor(ShutdownSupervisorRequest) returns (ShutdownSupervisorReply);
}
58 changes: 58 additions & 0 deletions src/CraneCtld/RpcService/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,64 @@ CraneErrCode CranedStub::ReleaseCgroupForJobs(
return CraneErrCode::SUCCESS;
}

CraneErrCode CranedStub::SuspendSteps(const std::vector<task_id_t> &task_ids) {
Copy link
Collaborator

@L-Xiafeng L-Xiafeng Oct 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vector<job_id_t>

using crane::grpc::SuspendStepsReply;
using crane::grpc::SuspendStepsRequest;

ClientContext context;
Status status;
SuspendStepsRequest request;
SuspendStepsReply reply;
context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(kCtldRpcTimeoutSeconds));

request.mutable_task_id_list()->Assign(task_ids.begin(), task_ids.end());

status = m_stub_->SuspendSteps(&context, request, &reply);
if (!status.ok()) {
CRANE_DEBUG("SuspendSteps RPC for Node {} failed: {}", m_craned_id_,
status.error_message());
HandleGrpcErrorCode_(status.error_code());
return CraneErrCode::ERR_RPC_FAILURE;
}
UpdateLastActiveTime();

if (reply.ok()) return CraneErrCode::SUCCESS;

CRANE_WARN("SuspendSteps RPC for Node {} declined: {}", m_craned_id_,
reply.reason());
return CraneErrCode::ERR_GENERIC_FAILURE;
}

CraneErrCode CranedStub::ResumeSteps(const std::vector<task_id_t> &task_ids) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

job_id_t

using crane::grpc::ResumeStepsReply;
using crane::grpc::ResumeStepsRequest;

ClientContext context;
Status status;
ResumeStepsRequest request;
ResumeStepsReply reply;
context.set_deadline(std::chrono::system_clock::now() +
std::chrono::seconds(kCtldRpcTimeoutSeconds));

request.mutable_task_id_list()->Assign(task_ids.begin(), task_ids.end());

status = m_stub_->ResumeSteps(&context, request, &reply);
if (!status.ok()) {
CRANE_DEBUG("ResumeSteps RPC for Node {} failed: {}", m_craned_id_,
status.error_message());
HandleGrpcErrorCode_(status.error_code());
return CraneErrCode::ERR_RPC_FAILURE;
}
UpdateLastActiveTime();

if (reply.ok()) return CraneErrCode::SUCCESS;

CRANE_WARN("ResumeSteps RPC for Node {} declined: {}", m_craned_id_,
reply.reason());
return CraneErrCode::ERR_GENERIC_FAILURE;
}

CraneErrCode CranedStub::ChangeJobTimeLimit(uint32_t task_id,
uint64_t seconds) {
using crane::grpc::ChangeJobTimeLimitReply;
Expand Down
4 changes: 4 additions & 0 deletions src/CraneCtld/RpcService/CranedKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class CranedStub {
CraneErrCode ReleaseCgroupForJobs(
const std::vector<std::pair<task_id_t, uid_t>> &task_uid_pairs);

CraneErrCode SuspendSteps(const std::vector<task_id_t> &task_ids);

CraneErrCode ResumeSteps(const std::vector<task_id_t> &task_ids);

CraneErrCode TerminateSteps(const std::vector<task_id_t> &task_ids);

CraneErrCode TerminateOrphanedSteps(const std::vector<task_id_t> &task_ids);
Expand Down
38 changes: 38 additions & 0 deletions src/CraneCtld/RpcService/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,44 @@ grpc::Status CraneCtldServiceImpl::ModifyTask(
fmt::format("Failed to hold/release job: {}.", CraneErrStr(err)));
}
}
} else if (request->attribute() == ModifyTaskRequest::Suspend) {
for (auto task_id : request->task_ids()) {
err = g_task_scheduler->SuspendRunningTask(task_id);
if (err == CraneErrCode::SUCCESS) {
response->add_modified_tasks(task_id);
} else {
response->add_not_modified_tasks(task_id);
if (err == CraneErrCode::ERR_NON_EXISTENT) {
response->add_not_modified_reasons(
fmt::format("Task #{} was not found in running queue.", task_id));
} else if (err == CraneErrCode::ERR_INVALID_PARAM) {
response->add_not_modified_reasons(
fmt::format("Task #{} is not running.", task_id));
} else {
response->add_not_modified_reasons(
fmt::format("Failed to suspend task: {}.", CraneErrStr(err)));
}
}
}
} else if (request->attribute() == ModifyTaskRequest::Resume) {
for (auto task_id : request->task_ids()) {
err = g_task_scheduler->ResumeSuspendedTask(task_id);
if (err == CraneErrCode::SUCCESS) {
response->add_modified_tasks(task_id);
} else {
response->add_not_modified_tasks(task_id);
if (err == CraneErrCode::ERR_NON_EXISTENT) {
response->add_not_modified_reasons(
fmt::format("Task #{} was not found in running queue.", task_id));
} else if (err == CraneErrCode::ERR_INVALID_PARAM) {
response->add_not_modified_reasons(
fmt::format("Task #{} is not suspended.", task_id));
} else {
response->add_not_modified_reasons(
fmt::format("Failed to resume task: {}.", CraneErrStr(err)));
}
}
}
Comment on lines +640 to +662
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Ensure consistent Job-only validation for resume operations.

Similar to the suspend operation, this resume implementation should validate that only Job objects can be resumed, not individual Steps, as indicated in the PR requirements.

Comment on lines +617 to +662
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Verify Job-only validation for suspend/resume operations.

Per the PR requirements, suspend and resume operations should only support Job objects, not individual Steps. The current implementation delegates task_id validation to g_task_scheduler->SuspendRunningTasks() and ResumeSuspendedTasks(), but it's unclear from the provided code whether these methods enforce the Job-only restriction.

Consider adding explicit validation at this layer to return a clear error message if a Step ID (rather than a Job ID) is provided, or document that the TaskScheduler methods already enforce this restriction.


🏁 Script executed:

#!/bin/bash
# Verify if TaskScheduler methods validate Job-only operations
# Check for step_id validation in the suspend/resume implementations

rg -n -A5 -B5 "SuspendRunningTasks|ResumeSuspendedTasks" --type=cpp -g "TaskScheduler.cpp"

Length of output: 1489



---

🏁 Script executed:

```shell
#!/bin/bash
# Get the full implementation of SuspendRunningTasks
sed -n '1230,1376p' src/CraneCtld/TaskScheduler.cpp

Length of output: 5181


🏁 Script executed:

#!/bin/bash
# Get the full implementation of ResumeSuspendedTasks
sed -n '1377,1450p' src/CraneCtld/TaskScheduler.cpp

Length of output: 2781


🏁 Script executed:

#!/bin/bash
# Search for task_id type definition and any step_id checks
rg -n "task_id_t" src/CraneCtld/TaskScheduler.cpp | head -20

Length of output: 1165


🏁 Script executed:

#!/bin/bash
# Check if there's any validation in the implementations that distinguishes Job from Step
rg -n -A2 "GetTask\|GetJob\|step_id\|job_id" src/CraneCtld/TaskScheduler.cpp | grep -A2 -B2 "1230\|1377\|1240\|1380\|1390\|1400"

Length of output: 0



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for task_id_t and job_id_t type definitions
rg -n "using task_id_t|using job_id_t|typedef.*task_id_t|typedef.*job_id_t" --type=cpp --type=h

Length of output: 172


🏁 Script executed:

#!/bin/bash
# Search for TaskInCtld class definition to see if it has Job/Step distinction
rg -n "class TaskInCtld" -A 15 --type=h

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if there's a Task class hierarchy with Job and Step types
rg -n "class Task|class Job|class Step" --type=h | head -30

Length of output: 226


🏁 Script executed:

#!/bin/bash
# Search for documentation or comments about Job-only operations
rg -n "Job.*only|Step.*not.*support|suspend.*job" -i --type=cpp src/CraneCtld/RpcService/CtldGrpcServer.cpp

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check the RPC proto file to see if there's documentation about Job-only operations
fd -e proto -x grep -l "Suspend\|Resume" {} \;

Length of output: 72



---

🏁 Script executed:

```shell
#!/bin/bash
# Check the proto files for Suspend/Resume RPC method documentation
cat protos/Crane.proto | grep -A 10 "ModifyTaskRequest\|Suspend\|Resume" | head -50

Length of output: 1257


🏁 Script executed:

#!/bin/bash
# Search for where m_running_task_map_ is populated to see if Steps can be added
rg -n "m_running_task_map_\[|m_running_task_map_\.insert\|m_running_task_map_\.emplace" --type=cpp

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if there's any concept of Step IDs vs Job IDs in the RPC/public interfaces
rg -n "step_id|StepId" --type=proto

Length of output: 29


🏁 Script executed:

#!/bin/bash
# Look at the complete RPC handler method signature and context
sed -n '550,650p' src/CraneCtld/RpcService/CtldGrpcServer.cpp

Length of output: 4542



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for where tasks are added to m_running_task_map_ to see if Steps can be there
rg -n "m_running_task_map_" --type=cpp src/CraneCtld/TaskScheduler.cpp | head -40

Length of output: 2269


🏁 Script executed:

#!/bin/bash
# Check if Jobs and Steps have separate ID spaces or if both use the same numeric IDs
rg -n "class.*Job|struct.*Job" --type=h -A 5 | head -50

Length of output: 1713


🏁 Script executed:

#!/bin/bash
# Look for any comments or documentation in the RPC handler about Job-only restriction
sed -n '500,610p' src/CraneCtld/RpcService/CtldGrpcServer.cpp | head -80

Length of output: 3389


🏁 Script executed:

#!/bin/bash
# Check if there's any step_id validation elsewhere or if Steps bypass the RPC path
rg -n "TaskInCtld\|step.*suspend\|step.*resume" -i --type=cpp

Length of output: 0


Add explicit Job-only validation for suspend/resume operations.

The proto documentation explicitly specifies "Suspend operations are applied on job granularity" and "Resume operations are applied on job granularity," but the RPC handler and TaskScheduler methods lack explicit validation to enforce this requirement. While the current architecture prevents Steps from being suspended (only jobs exist in m_running_task_map_), this constraint is implicit rather than explicit code validation.

Add a check at the RPC handler level (lines 599 and 622) or in the TaskScheduler methods to validate that the provided task_ids correspond to Jobs, and return a specific error if a Step ID is provided. This makes the Job-only requirement explicit in the code and protects against future architectural changes that might introduce Steps into the running tasks map.

} else {
for (auto task_id : request->task_ids()) {
response->add_not_modified_tasks(task_id);
Expand Down
187 changes: 187 additions & 0 deletions src/CraneCtld/TaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,193 @@ std::future<CraneErrCode> TaskScheduler::HoldReleaseTaskAsync(task_id_t task_id,
return std::move(future);
}

CraneErrCode TaskScheduler::SuspendRunningTask(task_id_t task_id) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

不支持批量吗?

std::vector<CranedId> executing_nodes;

{
LockGuard running_guard(&m_running_task_map_mtx_);
auto iter = m_running_task_map_.find(task_id);
if (iter == m_running_task_map_.end()) {
CRANE_TRACE("Task #{} not in Rn queue for suspend", task_id);
return CraneErrCode::ERR_NON_EXISTENT;
}

TaskInCtld* task = iter->second.get();
if (task->Status() == crane::grpc::Suspended) {
CRANE_TRACE("Task #{} already suspended", task_id);
return CraneErrCode::ERR_INVALID_PARAM;
}
if (task->Status() != crane::grpc::Running) {
CRANE_TRACE("Task #{} is not running (status {}) for suspend", task_id,
static_cast<int>(task->Status()));
return CraneErrCode::ERR_INVALID_PARAM;
}

executing_nodes = task->executing_craned_ids;
}

if (executing_nodes.empty()) {
CRANE_WARN("Task #{} has no executing craned when suspending", task_id);
return CraneErrCode::ERR_INVALID_PARAM;
}

std::vector<task_id_t> job_ids{task_id};
std::vector<CranedId> suspended_nodes;
CraneErrCode failure_code = CraneErrCode::SUCCESS;
bool has_failure = false;
for (const auto& craned_id : executing_nodes) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

先查下任务所有节点是不是Up

auto stub = g_craned_keeper->GetCranedStub(craned_id);
if (!stub || stub->Invalid()) {
CRANE_WARN("SuspendSteps stub for {} unavailable", craned_id);
if (failure_code == CraneErrCode::SUCCESS)
failure_code = CraneErrCode::ERR_RPC_FAILURE;
has_failure = true;
continue;
}

CraneErrCode err = stub->SuspendSteps(job_ids);
if (err != CraneErrCode::SUCCESS) {
CRANE_ERROR("Failed to suspend task #{} on craned {}: {}", task_id,
craned_id, CraneErrStr(err));
if (failure_code == CraneErrCode::SUCCESS) failure_code = err;
has_failure = true;
continue;
}

suspended_nodes.push_back(craned_id);
}

if (has_failure) {
for (const auto& craned_id : suspended_nodes) {
auto stub = g_craned_keeper->GetCranedStub(craned_id);
if (!stub || stub->Invalid()) {
CRANE_ERROR(
"Failed to rollback suspended state on craned {}: stub unavailable",
craned_id);
continue;
}

CraneErrCode err = stub->ResumeSteps(job_ids);
if (err != CraneErrCode::SUCCESS) {
CRANE_ERROR("Rollback ResumeSteps for task #{} on craned {} failed: {}",
task_id, craned_id, CraneErrStr(err));
}
}

return failure_code;
}

{
LockGuard running_guard(&m_running_task_map_mtx_);
auto iter = m_running_task_map_.find(task_id);
if (iter == m_running_task_map_.end())
return CraneErrCode::ERR_NON_EXISTENT;

TaskInCtld* task = iter->second.get();
if (task->Status() != crane::grpc::Suspended) {
task->SetStatus(crane::grpc::Suspended);
if (!g_embedded_db_client->UpdateRuntimeAttrOfTaskIfExists(
0, task->TaskDbId(), task->RuntimeAttr())) {
CRANE_ERROR("Failed to persist suspended status for task #{}", task_id);
}
}
}

return CraneErrCode::SUCCESS;
}

CraneErrCode TaskScheduler::ResumeSuspendedTask(task_id_t task_id) {
std::vector<CranedId> executing_nodes;

{
LockGuard running_guard(&m_running_task_map_mtx_);
auto iter = m_running_task_map_.find(task_id);
if (iter == m_running_task_map_.end()) {
CRANE_TRACE("Task #{} not in Rn queue for resume", task_id);
return CraneErrCode::ERR_NON_EXISTENT;
}

TaskInCtld* task = iter->second.get();
if (task->Status() != crane::grpc::Suspended) {
CRANE_TRACE("Task #{} is not suspended (status {}) for resume", task_id,
static_cast<int>(task->Status()));
return CraneErrCode::ERR_INVALID_PARAM;
}

executing_nodes = task->executing_craned_ids;
}

if (executing_nodes.empty()) {
CRANE_WARN("Task #{} has no executing craned when resuming", task_id);
return CraneErrCode::ERR_INVALID_PARAM;
}

std::vector<task_id_t> job_ids{task_id};
std::vector<CranedId> resumed_nodes;
CraneErrCode failure_code = CraneErrCode::SUCCESS;
bool has_failure = false;
for (const auto& craned_id : executing_nodes) {
auto stub = g_craned_keeper->GetCranedStub(craned_id);
if (!stub || stub->Invalid()) {
CRANE_WARN("ResumeSteps stub for {} unavailable", craned_id);
if (failure_code == CraneErrCode::SUCCESS)
failure_code = CraneErrCode::ERR_RPC_FAILURE;
has_failure = true;
continue;
}

CraneErrCode err = stub->ResumeSteps(job_ids);
if (err != CraneErrCode::SUCCESS) {
CRANE_ERROR("Failed to resume task #{} on craned {}: {}", task_id,
craned_id, CraneErrStr(err));
if (failure_code == CraneErrCode::SUCCESS) failure_code = err;
has_failure = true;
continue;
}

resumed_nodes.push_back(craned_id);
}

if (has_failure) {
for (const auto& craned_id : resumed_nodes) {
auto stub = g_craned_keeper->GetCranedStub(craned_id);
if (!stub || stub->Invalid()) {
CRANE_ERROR(
"Failed to rollback resumed state on craned {}: stub unavailable",
craned_id);
continue;
}

CraneErrCode err = stub->SuspendSteps(job_ids);
if (err != CraneErrCode::SUCCESS) {
CRANE_ERROR(
"Rollback SuspendSteps for task #{} on craned {} failed: {}",
task_id, craned_id, CraneErrStr(err));
}
}

return failure_code;
}

{
LockGuard running_guard(&m_running_task_map_mtx_);
auto iter = m_running_task_map_.find(task_id);
if (iter == m_running_task_map_.end())
return CraneErrCode::ERR_NON_EXISTENT;

TaskInCtld* task = iter->second.get();
if (task->Status() != crane::grpc::Running) {
task->SetStatus(crane::grpc::Running);
if (!g_embedded_db_client->UpdateRuntimeAttrOfTaskIfExists(
0, task->TaskDbId(), task->RuntimeAttr())) {
CRANE_ERROR("Failed to persist resumed status for task #{}", task_id);
}
}
}

return CraneErrCode::SUCCESS;
}

CraneErrCode TaskScheduler::ChangeTaskTimeLimit(task_id_t task_id,
int64_t secs) {
if (!CheckIfTimeLimitSecIsValid(secs)) return CraneErrCode::ERR_INVALID_PARAM;
Expand Down
Loading
Loading