-
Notifications
You must be signed in to change notification settings - Fork 34
feat: add suspend and resume #632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 7 commits
2aec76c
bc24c3c
224d2c7
a893d51
f87a16b
378dc1e
fb9eb76
6aea2cf
37ed254
f2bb40e
5e7fd5c
9fac2d9
fca04fd
724924b
3ba38e5
04c7ed6
9ab002e
31b7f0f
260a480
4ecc292
36f86d2
76c9f1b
3a53c87
cb1f358
f0f44df
c9019ac
b9cd9de
fa2180c
26ea975
0f3ca65
6ffd32e
29201ec
5f32f58
4cb0364
b52209d
ab7ab1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,20 @@ message TerminateTaskReply { | |
| string reason = 2; | ||
| } | ||
|
|
||
| message SuspendJobRequest {} | ||
|
|
||
| message SuspendJobReply { | ||
| bool ok = 1; | ||
| string reason = 2; | ||
| } | ||
|
|
||
| message ResumeTaskRequest {} | ||
|
||
|
|
||
| message ResumeTaskReply { | ||
| bool ok = 1; | ||
| string reason = 2; | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| message ShutdownSupervisorRequest {} | ||
|
|
||
| message ShutdownSupervisorReply {} | ||
|
|
@@ -129,5 +143,7 @@ service Supervisor { | |
| rpc CheckStatus(CheckStatusRequest) returns (CheckStatusReply); | ||
| rpc ChangeTaskTimeLimit(ChangeTaskTimeLimitRequest) returns (ChangeTaskTimeLimitReply); | ||
| rpc TerminateTask(TerminateTaskRequest) returns (TerminateTaskReply); | ||
| rpc SuspendJob(SuspendJobRequest) returns (SuspendJobReply); | ||
| rpc ResumeTask(ResumeTaskRequest) returns (ResumeTaskReply); | ||
| rpc ShutdownSupervisor(ShutdownSupervisorRequest) returns (ShutdownSupervisorReply); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,6 +86,10 @@ class CranedStub { | |
| CraneErrCode ReleaseCgroupForJobs( | ||
| const std::vector<std::pair<task_id_t, uid_t>> &task_uid_pairs); | ||
|
|
||
| CraneErrCode SuspendSteps(const std::vector<job_id_t> &job_ids); | ||
|
|
||
| CraneErrCode ResumeSteps(const std::vector<job_id_t> &job_ids); | ||
|
||
|
|
||
| CraneErrCode TerminateSteps(const std::vector<task_id_t> &task_ids); | ||
|
|
||
| CraneErrCode TerminateOrphanedSteps(const std::vector<task_id_t> &task_ids); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -585,6 +585,52 @@ grpc::Status CraneCtldServiceImpl::ModifyTask( | |
| fmt::format("Failed to hold/release job: {}.", CraneErrStr(err))); | ||
| } | ||
| } | ||
| } else if (request->attribute() == ModifyTaskRequest::Suspend) { | ||
| std::vector<task_id_t> task_ids(request->task_ids().begin(), | ||
| request->task_ids().end()); | ||
| auto results = g_task_scheduler->SuspendRunningTasks(task_ids); | ||
| for (size_t i = 0; i < task_ids.size(); ++i) { | ||
| task_id_t task_id = task_ids[i]; | ||
| err = results[i]; | ||
| if (err == CraneErrCode::SUCCESS) { | ||
| response->add_modified_tasks(task_id); | ||
| } else { | ||
| response->add_not_modified_tasks(task_id); | ||
| if (err == CraneErrCode::ERR_NON_EXISTENT) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} was not found in running queue.", task_id)); | ||
| } else if (err == CraneErrCode::ERR_INVALID_PARAM) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} is not running.", task_id)); | ||
| } else { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Failed to suspend task: {}.", CraneErrStr(err))); | ||
| } | ||
| } | ||
| } | ||
| } else if (request->attribute() == ModifyTaskRequest::Resume) { | ||
| std::vector<task_id_t> task_ids(request->task_ids().begin(), | ||
| request->task_ids().end()); | ||
| auto results = g_task_scheduler->ResumeSuspendedTasks(task_ids); | ||
| for (size_t i = 0; i < task_ids.size(); ++i) { | ||
| task_id_t task_id = task_ids[i]; | ||
| err = results[i]; | ||
| if (err == CraneErrCode::SUCCESS) { | ||
| response->add_modified_tasks(task_id); | ||
| } else { | ||
| response->add_not_modified_tasks(task_id); | ||
| if (err == CraneErrCode::ERR_NON_EXISTENT) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} was not found in running queue.", task_id)); | ||
| } else if (err == CraneErrCode::ERR_INVALID_PARAM) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} is not suspended.", task_id)); | ||
| } else { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Failed to resume task: {}.", CraneErrStr(err))); | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+640
to
+662
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ensure consistent Job-only validation for resume operations. Similar to the suspend operation, this resume implementation should validate that only Job objects can be resumed, not individual Steps, as indicated in the PR requirements.
Comment on lines
+617
to
+662
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major 🧩 Analysis chainVerify Job-only validation for suspend/resume operations. Per the PR requirements, suspend and resume operations should only support Job objects, not individual Steps. The current implementation delegates task_id validation to Consider adding explicit validation at this layer to return a clear error message if a Step ID (rather than a Job ID) is provided, or document that the TaskScheduler methods already enforce this restriction. 🏁 Script executed: #!/bin/bash
# Verify if TaskScheduler methods validate Job-only operations
# Check for step_id validation in the suspend/resume implementations
rg -n -A5 -B5 "SuspendRunningTasks|ResumeSuspendedTasks" --type=cpp -g "TaskScheduler.cpp"Length of output: 1489 Length of output: 5181 🏁 Script executed: #!/bin/bash
# Get the full implementation of ResumeSuspendedTasks
sed -n '1377,1450p' src/CraneCtld/TaskScheduler.cppLength of output: 2781 🏁 Script executed: #!/bin/bash
# Search for task_id type definition and any step_id checks
rg -n "task_id_t" src/CraneCtld/TaskScheduler.cpp | head -20Length of output: 1165 🏁 Script executed: #!/bin/bash
# Check if there's any validation in the implementations that distinguishes Job from Step
rg -n -A2 "GetTask\|GetJob\|step_id\|job_id" src/CraneCtld/TaskScheduler.cpp | grep -A2 -B2 "1230\|1377\|1240\|1380\|1390\|1400"Length of output: 0 Length of output: 172 🏁 Script executed: #!/bin/bash
# Search for TaskInCtld class definition to see if it has Job/Step distinction
rg -n "class TaskInCtld" -A 15 --type=hLength of output: 0 🏁 Script executed: #!/bin/bash
# Check if there's a Task class hierarchy with Job and Step types
rg -n "class Task|class Job|class Step" --type=h | head -30Length of output: 226 🏁 Script executed: #!/bin/bash
# Search for documentation or comments about Job-only operations
rg -n "Job.*only|Step.*not.*support|suspend.*job" -i --type=cpp src/CraneCtld/RpcService/CtldGrpcServer.cppLength of output: 0 🏁 Script executed: #!/bin/bash
# Check the RPC proto file to see if there's documentation about Job-only operations
fd -e proto -x grep -l "Suspend\|Resume" {} \;Length of output: 72 Length of output: 1257 🏁 Script executed: #!/bin/bash
# Search for where m_running_task_map_ is populated to see if Steps can be added
rg -n "m_running_task_map_\[|m_running_task_map_\.insert\|m_running_task_map_\.emplace" --type=cppLength of output: 0 🏁 Script executed: #!/bin/bash
# Check if there's any concept of Step IDs vs Job IDs in the RPC/public interfaces
rg -n "step_id|StepId" --type=protoLength of output: 29 🏁 Script executed: #!/bin/bash
# Look at the complete RPC handler method signature and context
sed -n '550,650p' src/CraneCtld/RpcService/CtldGrpcServer.cppLength of output: 4542 Length of output: 2269 🏁 Script executed: #!/bin/bash
# Check if Jobs and Steps have separate ID spaces or if both use the same numeric IDs
rg -n "class.*Job|struct.*Job" --type=h -A 5 | head -50Length of output: 1713 🏁 Script executed: #!/bin/bash
# Look for any comments or documentation in the RPC handler about Job-only restriction
sed -n '500,610p' src/CraneCtld/RpcService/CtldGrpcServer.cpp | head -80Length of output: 3389 🏁 Script executed: #!/bin/bash
# Check if there's any step_id validation elsewhere or if Steps bypass the RPC path
rg -n "TaskInCtld\|step.*suspend\|step.*resume" -i --type=cppLength of output: 0 Add explicit Job-only validation for suspend/resume operations. The proto documentation explicitly specifies "Suspend operations are applied on job granularity" and "Resume operations are applied on job granularity," but the RPC handler and TaskScheduler methods lack explicit validation to enforce this requirement. While the current architecture prevents Steps from being suspended (only jobs exist in Add a check at the RPC handler level (lines 599 and 622) or in the TaskScheduler methods to validate that the provided task_ids correspond to Jobs, and return a specific error if a Step ID is provided. This makes the Job-only requirement explicit in the code and protects against future architectural changes that might introduce Steps into the running tasks map. |
||
| } else { | ||
| for (auto task_id : request->task_ids()) { | ||
| response->add_not_modified_tasks(task_id); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suspend应该只支持Job级别的
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok