Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2aec76c
feat: add suspend and resume
zhansan114514 Oct 2, 2025
bc24c3c
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 4, 2025
224d2c7
fix with ai comments
zhansan114514 Oct 5, 2025
a893d51
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 9, 2025
f87a16b
fix format
zhansan114514 Oct 9, 2025
378dc1e
rename task-related messages and methods to use 'job' terminology
zhansan114514 Oct 12, 2025
fb9eb76
update task management to use job terminology and status handling
zhansan114514 Oct 12, 2025
6aea2cf
update error handling for task suspension and resumption
zhansan114514 Oct 13, 2025
37ed254
add config and container support
zhansan114514 Oct 14, 2025
f2bb40e
format
zhansan114514 Oct 14, 2025
5e7fd5c
handle error fixing
zhansan114514 Oct 15, 2025
9fac2d9
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 16, 2025
fca04fd
format
zhansan114514 Oct 16, 2025
724924b
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 16, 2025
3ba38e5
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 21, 2025
04c7ed6
Refactor job management methods in Craned and Supervisor
zhansan114514 Oct 21, 2025
9ab002e
fix
zhansan114514 Oct 21, 2025
31b7f0f
fix
zhansan114514 Oct 21, 2025
260a480
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Oct 27, 2025
4ecc292
update error handling in Suspend and Resume tasks
zhansan114514 Oct 27, 2025
36f86d2
Merge feat/suspend into upstream/master
zhansan114514 Nov 4, 2025
76c9f1b
merge
zhansan114514 Nov 4, 2025
3a53c87
Merge branch 'master' of https://github.com/PKUHPC/CraneSched into fe…
zhansan114514 Nov 4, 2025
cb1f358
delete flag
zhansan114514 Nov 4, 2025
f0f44df
remove old way and try new api
zhansan114514 Nov 5, 2025
c9019ac
fix
zhansan114514 Nov 5, 2025
b9cd9de
fix
zhansan114514 Nov 5, 2025
fa2180c
find in map but not at
zhansan114514 Nov 5, 2025
26ea975
fix wrong error code
zhansan114514 Nov 5, 2025
0f3ca65
skip daemon step
zhansan114514 Nov 5, 2025
6ffd32e
Merge branch 'master' into feat/suspend
zhansan114514 Nov 15, 2025
29201ec
fix error
zhansan114514 Nov 15, 2025
5f32f58
format
zhansan114514 Nov 15, 2025
4cb0364
fix eror
zhansan114514 Nov 15, 2025
b52209d
fix
zhansan114514 Nov 15, 2025
ab7ab1a
Merge branch 'PKUHPC:master' into feat/suspend
zhansan114514 Nov 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions protos/Crane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,24 @@ message TerminateStepsReply {
string reason = 2;
}

message SuspendStepsRequest {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suspend应该只支持Job级别的

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

repeated uint32 task_id_list = 1;
}

// Reply to the Craned.SuspendSteps RPC.
message SuspendStepsReply {
// Overall outcome. The CraneCtld caller treats `false` as the craned node
// having declined the request.
bool ok = 1;
// Human-readable explanation; the CraneCtld caller logs it when `ok` is false.
string reason = 2;
}

// Request for the Craned.ResumeSteps RPC.
message ResumeStepsRequest {
// IDs to resume. Despite the "task" field name, CraneCtld fills this list
// with job IDs (job_id_t).
repeated uint32 task_id_list = 1;
}

// Reply to the Craned.ResumeSteps RPC.
message ResumeStepsReply {
// Overall outcome. The CraneCtld caller treats `false` as the craned node
// having declined the request.
bool ok = 1;
// Human-readable explanation; the CraneCtld caller logs it when `ok` is false.
string reason = 2;
}

// Request for the Craned.TerminateOrphanedStep RPC.
message TerminateOrphanedStepRequest {
// IDs of the entities to terminate.
// NOTE(review): whether these are job, task or step IDs is not visible
// here; confirm against the Craned implementation.
repeated uint32 task_id_list = 1;
}
Expand Down Expand Up @@ -225,6 +243,8 @@ message ModifyTaskRequest {
TimeLimit = 0;
Priority = 1;
Hold = 2;
Suspend = 3;
Resume = 4;
}

uint32 uid = 1;
Expand Down Expand Up @@ -1007,6 +1027,8 @@ service Craned {
If the task is a batch task, just kill it.
*/
rpc TerminateSteps(TerminateStepsRequest) returns (TerminateStepsReply);
rpc SuspendSteps(SuspendStepsRequest) returns (SuspendStepsReply);
rpc ResumeSteps(ResumeStepsRequest) returns (ResumeStepsReply);
rpc TerminateOrphanedStep(TerminateOrphanedStepRequest) returns (TerminateOrphanedStepReply);
rpc ChangeJobTimeLimit(ChangeJobTimeLimitRequest) returns (ChangeJobTimeLimitReply);

Expand Down
1 change: 1 addition & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ enum TaskStatus {
ExceedTimeLimit = 4;
Cancelled = 5;
OutOfMemory = 6;
Suspended = 7;

Invalid = 15;
}
Expand Down
16 changes: 16 additions & 0 deletions protos/Supervisor.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,20 @@ message TerminateTaskReply {
string reason = 2;
}

// Request for the Supervisor.SuspendJob RPC. Intentionally empty: it carries
// no job identifier (presumably the supervisor already knows which job it
// manages; TODO confirm).
message SuspendJobRequest {}

// Reply to the Supervisor.SuspendJob RPC.
message SuspendJobReply {
// Whether the suspend request succeeded.
bool ok = 1;
// Presumably a failure description when `ok` is false; confirm against the
// Supervisor implementation.
string reason = 2;
}

// Request for the Supervisor.ResumeTask RPC. Intentionally empty.
// NOTE(review): the name is inconsistent with its counterpart
// SuspendJobRequest. Renaming it to ResumeJobRequest also requires updating
// the Supervisor service definition and every caller.
message ResumeTaskRequest {}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResumeJob


// Reply to the Supervisor.ResumeTask RPC.
message ResumeTaskReply {
// Whether the resume request succeeded.
bool ok = 1;
// Presumably a failure description when `ok` is false; confirm against the
// Supervisor implementation.
string reason = 2;
}

// Request for the Supervisor.ShutdownSupervisor RPC. Carries no fields.
message ShutdownSupervisorRequest {}

// Reply to the Supervisor.ShutdownSupervisor RPC. Carries no fields, so the
// outcome is conveyed only by the gRPC status.
message ShutdownSupervisorReply {}
Expand All @@ -129,5 +143,7 @@ service Supervisor {
rpc CheckStatus(CheckStatusRequest) returns (CheckStatusReply);
rpc ChangeTaskTimeLimit(ChangeTaskTimeLimitRequest) returns (ChangeTaskTimeLimitReply);
rpc TerminateTask(TerminateTaskRequest) returns (TerminateTaskReply);
rpc SuspendJob(SuspendJobRequest) returns (SuspendJobReply);
rpc ResumeTask(ResumeTaskRequest) returns (ResumeTaskReply);
rpc ShutdownSupervisor(ShutdownSupervisorRequest) returns (ShutdownSupervisorReply);
}
58 changes: 58 additions & 0 deletions src/CraneCtld/RpcService/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,64 @@ CraneErrCode CranedStub::ReleaseCgroupForJobs(
return CraneErrCode::SUCCESS;
}

// Asks the craned node behind this stub to suspend the given jobs.
//
// Issues a single SuspendSteps RPC carrying every ID in `job_ids`, bounded by
// a deadline of kCtldRpcTimeoutSeconds from now.
//
// Returns:
//   SUCCESS             - the RPC completed and the node reported ok.
//   ERR_RPC_FAILURE     - the gRPC call itself failed; the gRPC error code is
//                         forwarded to HandleGrpcErrorCode_.
//   ERR_GENERIC_FAILURE - the RPC completed but the node declined; its reason
//                         is logged at WARN level.
CraneErrCode CranedStub::SuspendSteps(const std::vector<job_id_t> &job_ids) {
  crane::grpc::SuspendStepsRequest req;
  crane::grpc::SuspendStepsReply rsp;

  ClientContext ctx;
  ctx.set_deadline(std::chrono::system_clock::now() +
                   std::chrono::seconds(kCtldRpcTimeoutSeconds));

  req.mutable_task_id_list()->Assign(job_ids.begin(), job_ids.end());

  const Status rpc_status = m_stub_->SuspendSteps(&ctx, req, &rsp);
  if (!rpc_status.ok()) {
    CRANE_DEBUG("SuspendSteps RPC for Node {} failed: {}", m_craned_id_,
                rpc_status.error_message());
    HandleGrpcErrorCode_(rpc_status.error_code());
    return CraneErrCode::ERR_RPC_FAILURE;
  }

  // The node answered, so it counts as alive even if it declines below.
  UpdateLastActiveTime();

  if (!rsp.ok()) {
    CRANE_WARN("SuspendSteps RPC for Node {} declined: {}", m_craned_id_,
               rsp.reason());
    return CraneErrCode::ERR_GENERIC_FAILURE;
  }

  return CraneErrCode::SUCCESS;
}

// Asks the craned node behind this stub to resume the given jobs.
//
// Issues a single ResumeSteps RPC carrying every ID in `job_ids`, bounded by
// a deadline of kCtldRpcTimeoutSeconds from now.
//
// Returns:
//   SUCCESS             - the RPC completed and the node reported ok.
//   ERR_RPC_FAILURE     - the gRPC call itself failed; the gRPC error code is
//                         forwarded to HandleGrpcErrorCode_.
//   ERR_GENERIC_FAILURE - the RPC completed but the node declined; its reason
//                         is logged at WARN level.
CraneErrCode CranedStub::ResumeSteps(const std::vector<job_id_t> &job_ids) {
  crane::grpc::ResumeStepsRequest req;
  crane::grpc::ResumeStepsReply rsp;

  ClientContext ctx;
  ctx.set_deadline(std::chrono::system_clock::now() +
                   std::chrono::seconds(kCtldRpcTimeoutSeconds));

  req.mutable_task_id_list()->Assign(job_ids.begin(), job_ids.end());

  const Status rpc_status = m_stub_->ResumeSteps(&ctx, req, &rsp);
  if (!rpc_status.ok()) {
    CRANE_DEBUG("ResumeSteps RPC for Node {} failed: {}", m_craned_id_,
                rpc_status.error_message());
    HandleGrpcErrorCode_(rpc_status.error_code());
    return CraneErrCode::ERR_RPC_FAILURE;
  }

  // The node answered, so it counts as alive even if it declines below.
  UpdateLastActiveTime();

  if (!rsp.ok()) {
    CRANE_WARN("ResumeSteps RPC for Node {} declined: {}", m_craned_id_,
               rsp.reason());
    return CraneErrCode::ERR_GENERIC_FAILURE;
  }

  return CraneErrCode::SUCCESS;
}

CraneErrCode CranedStub::ChangeJobTimeLimit(uint32_t task_id,
uint64_t seconds) {
using crane::grpc::ChangeJobTimeLimitReply;
Expand Down
4 changes: 4 additions & 0 deletions src/CraneCtld/RpcService/CranedKeeper.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class CranedStub {
CraneErrCode ReleaseCgroupForJobs(
const std::vector<std::pair<task_id_t, uid_t>> &task_uid_pairs);

// Sends a SuspendSteps RPC to this craned node for the given job IDs.
// Returns SUCCESS if the node reports ok, ERR_RPC_FAILURE if the gRPC call
// fails, and ERR_GENERIC_FAILURE if the node declines the request.
CraneErrCode SuspendSteps(const std::vector<job_id_t> &job_ids);

// Sends a ResumeSteps RPC to this craned node for the given job IDs.
// Returns SUCCESS if the node reports ok, ERR_RPC_FAILURE if the gRPC call
// fails, and ERR_GENERIC_FAILURE if the node declines the request.
CraneErrCode ResumeSteps(const std::vector<job_id_t> &job_ids);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Clarify naming: method names suggest "Steps" but parameters suggest "Jobs".

The method names SuspendSteps and ResumeSteps suggest they operate on steps, but the parameter type std::vector<job_id_t> suggests they operate on jobs. Based on the PR comment stating "The two operations (suspend and resume) should only support operations on Job objects," consider renaming these methods to SuspendJobs and ResumeJobs for consistency.

Additionally, note the inconsistency with TerminateSteps at line 93 which uses std::vector<task_id_t>. If job_id_t and task_id_t are distinct types, clarify the intended semantics.


🏁 Script executed:

#!/bin/bash
# Check the type definitions and usage patterns for job_id_t, task_id_t, and step_id_t
rg -n "using (job_id_t|task_id_t|step_id_t)" --type=cpp --type=h

Length of output: 259


🏁 Script executed:

#!/bin/bash
rg -nP '(SuspendSteps|ResumeSteps|TerminateSteps)' -C3 src/CraneCtld/RpcService/CranedKeeper.h

Length of output: 428


Rename methods to match parameter types

  • SuspendSteps/ResumeSteps take vector<job_id_t> → rename to SuspendJobs/ResumeJobs.
  • TerminateSteps takes vector<task_id_t> → rename to TerminateTasks (and likewise TerminateOrphanedSteps → TerminateOrphanedTasks).


// Requests termination of the given task IDs on this craned node.
// NOTE(review): presumably issues the Craned TerminateSteps RPC; the
// implementation is not visible here, so confirm the error codes returned.
CraneErrCode TerminateSteps(const std::vector<task_id_t> &task_ids);

// Requests termination of orphaned steps for the given task IDs.
// NOTE(review): presumably issues the Craned TerminateOrphanedStep RPC; the
// implementation is not visible here, so confirm.
CraneErrCode TerminateOrphanedSteps(const std::vector<task_id_t> &task_ids);
Expand Down
46 changes: 46 additions & 0 deletions src/CraneCtld/RpcService/CtldGrpcServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,52 @@ grpc::Status CraneCtldServiceImpl::ModifyTask(
fmt::format("Failed to hold/release job: {}.", CraneErrStr(err)));
}
}
} else if (request->attribute() == ModifyTaskRequest::Suspend) {
std::vector<task_id_t> task_ids(request->task_ids().begin(),
request->task_ids().end());
auto results = g_task_scheduler->SuspendRunningTasks(task_ids);
for (size_t i = 0; i < task_ids.size(); ++i) {
task_id_t task_id = task_ids[i];
err = results[i];
if (err == CraneErrCode::SUCCESS) {
response->add_modified_tasks(task_id);
} else {
response->add_not_modified_tasks(task_id);
if (err == CraneErrCode::ERR_NON_EXISTENT) {
response->add_not_modified_reasons(
fmt::format("Task #{} was not found in running queue.", task_id));
} else if (err == CraneErrCode::ERR_INVALID_PARAM) {
response->add_not_modified_reasons(
fmt::format("Task #{} is not running.", task_id));
} else {
response->add_not_modified_reasons(
fmt::format("Failed to suspend task: {}.", CraneErrStr(err)));
}
}
}
} else if (request->attribute() == ModifyTaskRequest::Resume) {
std::vector<task_id_t> task_ids(request->task_ids().begin(),
request->task_ids().end());
auto results = g_task_scheduler->ResumeSuspendedTasks(task_ids);
for (size_t i = 0; i < task_ids.size(); ++i) {
task_id_t task_id = task_ids[i];
err = results[i];
if (err == CraneErrCode::SUCCESS) {
response->add_modified_tasks(task_id);
} else {
response->add_not_modified_tasks(task_id);
if (err == CraneErrCode::ERR_NON_EXISTENT) {
response->add_not_modified_reasons(
fmt::format("Task #{} was not found in running queue.", task_id));
} else if (err == CraneErrCode::ERR_INVALID_PARAM) {
response->add_not_modified_reasons(
fmt::format("Task #{} is not suspended.", task_id));
} else {
response->add_not_modified_reasons(
fmt::format("Failed to resume task: {}.", CraneErrStr(err)));
}
}
}
Comment on lines +640 to +662
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Ensure consistent Job-only validation for resume operations.

Similar to the suspend operation, this resume implementation should validate that only Job objects can be resumed, not individual Steps, as indicated in the PR requirements.

Comment on lines +617 to +662
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Verify Job-only validation for suspend/resume operations.

Per the PR requirements, suspend and resume operations should only support Job objects, not individual Steps. The current implementation delegates task_id validation to g_task_scheduler->SuspendRunningTasks() and ResumeSuspendedTasks(), but it's unclear from the provided code whether these methods enforce the Job-only restriction.

Consider adding explicit validation at this layer to return a clear error message if a Step ID (rather than a Job ID) is provided, or document that the TaskScheduler methods already enforce this restriction.


🏁 Script executed:

#!/bin/bash
# Verify if TaskScheduler methods validate Job-only operations
# Check for step_id validation in the suspend/resume implementations

rg -n -A5 -B5 "SuspendRunningTasks|ResumeSuspendedTasks" --type=cpp -g "TaskScheduler.cpp"

Length of output: 1489



---

🏁 Script executed:

```shell
#!/bin/bash
# Get the full implementation of SuspendRunningTasks
sed -n '1230,1376p' src/CraneCtld/TaskScheduler.cpp

Length of output: 5181


🏁 Script executed:

#!/bin/bash
# Get the full implementation of ResumeSuspendedTasks
sed -n '1377,1450p' src/CraneCtld/TaskScheduler.cpp

Length of output: 2781


🏁 Script executed:

#!/bin/bash
# Search for task_id type definition and any step_id checks
rg -n "task_id_t" src/CraneCtld/TaskScheduler.cpp | head -20

Length of output: 1165


🏁 Script executed:

#!/bin/bash
# Check if there's any validation in the implementations that distinguishes Job from Step
rg -n -A2 "GetTask\|GetJob\|step_id\|job_id" src/CraneCtld/TaskScheduler.cpp | grep -A2 -B2 "1230\|1377\|1240\|1380\|1390\|1400"

Length of output: 0



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for task_id_t and job_id_t type definitions
rg -n "using task_id_t|using job_id_t|typedef.*task_id_t|typedef.*job_id_t" --type=cpp --type=h

Length of output: 172


🏁 Script executed:

#!/bin/bash
# Search for TaskInCtld class definition to see if it has Job/Step distinction
rg -n "class TaskInCtld" -A 15 --type=h

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if there's a Task class hierarchy with Job and Step types
rg -n "class Task|class Job|class Step" --type=h | head -30

Length of output: 226


🏁 Script executed:

#!/bin/bash
# Search for documentation or comments about Job-only operations
rg -n "Job.*only|Step.*not.*support|suspend.*job" -i --type=cpp src/CraneCtld/RpcService/CtldGrpcServer.cpp

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check the RPC proto file to see if there's documentation about Job-only operations
fd -e proto -x grep -l "Suspend\|Resume" {} \;

Length of output: 72



---

🏁 Script executed:

```shell
#!/bin/bash
# Check the proto files for Suspend/Resume RPC method documentation
cat protos/Crane.proto | grep -A 10 "ModifyTaskRequest\|Suspend\|Resume" | head -50

Length of output: 1257


🏁 Script executed:

#!/bin/bash
# Search for where m_running_task_map_ is populated to see if Steps can be added
rg -n "m_running_task_map_\[|m_running_task_map_\.insert\|m_running_task_map_\.emplace" --type=cpp

Length of output: 0


🏁 Script executed:

#!/bin/bash
# Check if there's any concept of Step IDs vs Job IDs in the RPC/public interfaces
rg -n "step_id|StepId" --type=proto

Length of output: 29


🏁 Script executed:

#!/bin/bash
# Look at the complete RPC handler method signature and context
sed -n '550,650p' src/CraneCtld/RpcService/CtldGrpcServer.cpp

Length of output: 4542



---

🏁 Script executed:

```shell
#!/bin/bash
# Search for where tasks are added to m_running_task_map_ to see if Steps can be there
rg -n "m_running_task_map_" --type=cpp src/CraneCtld/TaskScheduler.cpp | head -40

Length of output: 2269


🏁 Script executed:

#!/bin/bash
# Check if Jobs and Steps have separate ID spaces or if both use the same numeric IDs
rg -n "class.*Job|struct.*Job" --type=h -A 5 | head -50

Length of output: 1713


🏁 Script executed:

#!/bin/bash
# Look for any comments or documentation in the RPC handler about Job-only restriction
sed -n '500,610p' src/CraneCtld/RpcService/CtldGrpcServer.cpp | head -80

Length of output: 3389


🏁 Script executed:

#!/bin/bash
# Check if there's any step_id validation elsewhere or if Steps bypass the RPC path
rg -n "TaskInCtld\|step.*suspend\|step.*resume" -i --type=cpp

Length of output: 0


Add explicit Job-only validation for suspend/resume operations.

The proto documentation explicitly specifies "Suspend operations are applied on job granularity" and "Resume operations are applied on job granularity," but the RPC handler and TaskScheduler methods lack explicit validation to enforce this requirement. While the current architecture prevents Steps from being suspended (only jobs exist in m_running_task_map_), this constraint is implicit rather than explicit code validation.

Add a check at the RPC handler level (lines 599 and 622) or in the TaskScheduler methods to validate that the provided task_ids correspond to Jobs, and return a specific error if a Step ID is provided. This makes the Job-only requirement explicit in the code and protects against future architectural changes that might introduce Steps into the running tasks map.

} else {
for (auto task_id : request->task_ids()) {
response->add_not_modified_tasks(task_id);
Expand Down
Loading
Loading