-
Notifications
You must be signed in to change notification settings - Fork 34
feat: add suspend and resume #632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
2aec76c
bc24c3c
224d2c7
a893d51
f87a16b
378dc1e
fb9eb76
6aea2cf
37ed254
f2bb40e
5e7fd5c
9fac2d9
fca04fd
724924b
3ba38e5
04c7ed6
9ab002e
31b7f0f
260a480
4ecc292
36f86d2
76c9f1b
3a53c87
cb1f358
f0f44df
c9019ac
b9cd9de
fa2180c
26ea975
0f3ca65
6ffd32e
29201ec
5f32f58
4cb0364
b52209d
ab7ab1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -119,6 +119,20 @@ message TerminateTaskReply { | |
| string reason = 2; | ||
| } | ||
|
|
||
| /* Request for the Supervisor SuspendTask RPC; declares no fields. */ message SuspendTaskRequest {} | ||
|
||
|
|
||
| /* Reply for the Supervisor SuspendTask RPC. */ message SuspendTaskReply { | ||
| /* NOTE(review): presumably true when the suspend succeeded; confirm against the supervisor implementation. */ bool ok = 1; | ||
| /* NOTE(review): presumably a human-readable failure explanation when ok is false; confirm. */ string reason = 2; | ||
| } | ||
|
|
||
| /* Request for the Supervisor ResumeTask RPC; declares no fields. */ message ResumeTaskRequest {} | ||
|
||
|
|
||
| /* Reply for the Supervisor ResumeTask RPC. */ message ResumeTaskReply { | ||
| /* NOTE(review): presumably true when the resume succeeded; confirm against the supervisor implementation. */ bool ok = 1; | ||
| /* NOTE(review): presumably a human-readable failure explanation when ok is false; confirm. */ string reason = 2; | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| /* Request for the Supervisor ShutdownSupervisor RPC; declares no fields. */ message ShutdownSupervisorRequest {} | ||
|
|
||
| /* Reply for the Supervisor ShutdownSupervisor RPC; declares no fields. */ message ShutdownSupervisorReply {} | ||
|
|
@@ -129,5 +143,7 @@ service Supervisor { | |
| rpc CheckStatus(CheckStatusRequest) returns (CheckStatusReply); | ||
| rpc ChangeTaskTimeLimit(ChangeTaskTimeLimitRequest) returns (ChangeTaskTimeLimitReply); | ||
| rpc TerminateTask(TerminateTaskRequest) returns (TerminateTaskReply); | ||
| rpc SuspendTask(SuspendTaskRequest) returns (SuspendTaskReply); | ||
| rpc ResumeTask(ResumeTaskRequest) returns (ResumeTaskReply); | ||
| rpc ShutdownSupervisor(ShutdownSupervisorRequest) returns (ShutdownSupervisorReply); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -238,6 +238,64 @@ CraneErrCode CranedStub::ReleaseCgroupForJobs( | |
| return CraneErrCode::SUCCESS; | ||
| } | ||
|
|
||
| CraneErrCode CranedStub::SuspendSteps(const std::vector<task_id_t> &task_ids) { | ||
|
||
| using crane::grpc::SuspendStepsReply; | ||
| using crane::grpc::SuspendStepsRequest; | ||
|
|
||
| ClientContext context; | ||
| Status status; | ||
| SuspendStepsRequest request; | ||
| SuspendStepsReply reply; | ||
| context.set_deadline(std::chrono::system_clock::now() + | ||
| std::chrono::seconds(kCtldRpcTimeoutSeconds)); | ||
|
|
||
| request.mutable_task_id_list()->Assign(task_ids.begin(), task_ids.end()); | ||
|
|
||
| status = m_stub_->SuspendSteps(&context, request, &reply); | ||
| if (!status.ok()) { | ||
| CRANE_DEBUG("SuspendSteps RPC for Node {} failed: {}", m_craned_id_, | ||
| status.error_message()); | ||
| HandleGrpcErrorCode_(status.error_code()); | ||
| return CraneErrCode::ERR_RPC_FAILURE; | ||
| } | ||
| UpdateLastActiveTime(); | ||
|
|
||
| if (reply.ok()) return CraneErrCode::SUCCESS; | ||
|
|
||
| CRANE_WARN("SuspendSteps RPC for Node {} declined: {}", m_craned_id_, | ||
| reply.reason()); | ||
| return CraneErrCode::ERR_GENERIC_FAILURE; | ||
| } | ||
|
|
||
| CraneErrCode CranedStub::ResumeSteps(const std::vector<task_id_t> &task_ids) { | ||
|
||
| using crane::grpc::ResumeStepsReply; | ||
| using crane::grpc::ResumeStepsRequest; | ||
|
|
||
| ClientContext context; | ||
| Status status; | ||
| ResumeStepsRequest request; | ||
| ResumeStepsReply reply; | ||
| context.set_deadline(std::chrono::system_clock::now() + | ||
| std::chrono::seconds(kCtldRpcTimeoutSeconds)); | ||
|
|
||
| request.mutable_task_id_list()->Assign(task_ids.begin(), task_ids.end()); | ||
|
|
||
| status = m_stub_->ResumeSteps(&context, request, &reply); | ||
| if (!status.ok()) { | ||
| CRANE_DEBUG("ResumeSteps RPC for Node {} failed: {}", m_craned_id_, | ||
| status.error_message()); | ||
| HandleGrpcErrorCode_(status.error_code()); | ||
| return CraneErrCode::ERR_RPC_FAILURE; | ||
| } | ||
| UpdateLastActiveTime(); | ||
|
|
||
| if (reply.ok()) return CraneErrCode::SUCCESS; | ||
|
|
||
| CRANE_WARN("ResumeSteps RPC for Node {} declined: {}", m_craned_id_, | ||
| reply.reason()); | ||
| return CraneErrCode::ERR_GENERIC_FAILURE; | ||
| } | ||
|
|
||
| CraneErrCode CranedStub::ChangeJobTimeLimit(uint32_t task_id, | ||
| uint64_t seconds) { | ||
| using crane::grpc::ChangeJobTimeLimitReply; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -585,6 +585,44 @@ grpc::Status CraneCtldServiceImpl::ModifyTask( | |
| fmt::format("Failed to hold/release job: {}.", CraneErrStr(err))); | ||
| } | ||
| } | ||
| } else if (request->attribute() == ModifyTaskRequest::Suspend) { | ||
| for (auto task_id : request->task_ids()) { | ||
| err = g_task_scheduler->SuspendRunningTask(task_id); | ||
| if (err == CraneErrCode::SUCCESS) { | ||
| response->add_modified_tasks(task_id); | ||
| } else { | ||
| response->add_not_modified_tasks(task_id); | ||
| if (err == CraneErrCode::ERR_NON_EXISTENT) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} was not found in running queue.", task_id)); | ||
| } else if (err == CraneErrCode::ERR_INVALID_PARAM) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} is not running.", task_id)); | ||
| } else { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Failed to suspend task: {}.", CraneErrStr(err))); | ||
| } | ||
| } | ||
| } | ||
| } else if (request->attribute() == ModifyTaskRequest::Resume) { | ||
| for (auto task_id : request->task_ids()) { | ||
| err = g_task_scheduler->ResumeSuspendedTask(task_id); | ||
| if (err == CraneErrCode::SUCCESS) { | ||
| response->add_modified_tasks(task_id); | ||
| } else { | ||
| response->add_not_modified_tasks(task_id); | ||
| if (err == CraneErrCode::ERR_NON_EXISTENT) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} was not found in running queue.", task_id)); | ||
| } else if (err == CraneErrCode::ERR_INVALID_PARAM) { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Task #{} is not suspended.", task_id)); | ||
| } else { | ||
| response->add_not_modified_reasons( | ||
| fmt::format("Failed to resume task: {}.", CraneErrStr(err))); | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+640
to
+662
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ensure consistent Job-only validation for resume operations. Similar to the suspend operation, this resume implementation should validate that only Job objects can be resumed, not individual Steps, as indicated in the PR requirements.
Comment on lines
+617
to
+662
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion | 🟠 Major 🧩 Analysis chainVerify Job-only validation for suspend/resume operations. Per the PR requirements, suspend and resume operations should only support Job objects, not individual Steps. The current implementation delegates task_id validation to Consider adding explicit validation at this layer to return a clear error message if a Step ID (rather than a Job ID) is provided, or document that the TaskScheduler methods already enforce this restriction. 🏁 Script executed: #!/bin/bash
# Verify if TaskScheduler methods validate Job-only operations
# Check for step_id validation in the suspend/resume implementations
rg -n -A5 -B5 "SuspendRunningTasks|ResumeSuspendedTasks" --type=cpp -g "TaskScheduler.cpp"Length of output: 1489 Length of output: 5181 🏁 Script executed: #!/bin/bash
# Get the full implementation of ResumeSuspendedTasks
sed -n '1377,1450p' src/CraneCtld/TaskScheduler.cppLength of output: 2781 🏁 Script executed: #!/bin/bash
# Search for task_id type definition and any step_id checks
rg -n "task_id_t" src/CraneCtld/TaskScheduler.cpp | head -20Length of output: 1165 🏁 Script executed: #!/bin/bash
# Check if there's any validation in the implementations that distinguishes Job from Step
rg -n -A2 "GetTask\|GetJob\|step_id\|job_id" src/CraneCtld/TaskScheduler.cpp | grep -A2 -B2 "1230\|1377\|1240\|1380\|1390\|1400"Length of output: 0 Length of output: 172 🏁 Script executed: #!/bin/bash
# Search for TaskInCtld class definition to see if it has Job/Step distinction
rg -n "class TaskInCtld" -A 15 --type=hLength of output: 0 🏁 Script executed: #!/bin/bash
# Check if there's a Task class hierarchy with Job and Step types
rg -n "class Task|class Job|class Step" --type=h | head -30Length of output: 226 🏁 Script executed: #!/bin/bash
# Search for documentation or comments about Job-only operations
rg -n "Job.*only|Step.*not.*support|suspend.*job" -i --type=cpp src/CraneCtld/RpcService/CtldGrpcServer.cppLength of output: 0 🏁 Script executed: #!/bin/bash
# Check the RPC proto file to see if there's documentation about Job-only operations
fd -e proto -x grep -l "Suspend\|Resume" {} \;Length of output: 72 Length of output: 1257 🏁 Script executed: #!/bin/bash
# Search for where m_running_task_map_ is populated to see if Steps can be added
rg -n "m_running_task_map_\[|m_running_task_map_\.insert\|m_running_task_map_\.emplace" --type=cppLength of output: 0 🏁 Script executed: #!/bin/bash
# Check if there's any concept of Step IDs vs Job IDs in the RPC/public interfaces
rg -n "step_id|StepId" --type=protoLength of output: 29 🏁 Script executed: #!/bin/bash
# Look at the complete RPC handler method signature and context
sed -n '550,650p' src/CraneCtld/RpcService/CtldGrpcServer.cppLength of output: 4542 Length of output: 2269 🏁 Script executed: #!/bin/bash
# Check if Jobs and Steps have separate ID spaces or if both use the same numeric IDs
rg -n "class.*Job|struct.*Job" --type=h -A 5 | head -50Length of output: 1713 🏁 Script executed: #!/bin/bash
# Look for any comments or documentation in the RPC handler about Job-only restriction
sed -n '500,610p' src/CraneCtld/RpcService/CtldGrpcServer.cpp | head -80Length of output: 3389 🏁 Script executed: #!/bin/bash
# Check if there's any step_id validation elsewhere or if Steps bypass the RPC path
rg -n "TaskInCtld\|step.*suspend\|step.*resume" -i --type=cppLength of output: 0 Add explicit Job-only validation for suspend/resume operations. The proto documentation explicitly specifies "Suspend operations are applied on job granularity" and "Resume operations are applied on job granularity," but the RPC handler and TaskScheduler methods lack explicit validation to enforce this requirement. While the current architecture prevents Steps from being suspended (only jobs exist in Add a check at the RPC handler level (lines 599 and 622) or in the TaskScheduler methods to validate that the provided task_ids correspond to Jobs, and return a specific error if a Step ID is provided. This makes the Job-only requirement explicit in the code and protects against future architectural changes that might introduce Steps into the running tasks map. |
||
| } else { | ||
| for (auto task_id : request->task_ids()) { | ||
| response->add_not_modified_tasks(task_id); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -996,6 +996,193 @@ std::future<CraneErrCode> TaskScheduler::HoldReleaseTaskAsync(task_id_t task_id, | |
| return std::move(future); | ||
| } | ||
|
|
||
| CraneErrCode TaskScheduler::SuspendRunningTask(task_id_t task_id) { | ||
|
||
| std::vector<CranedId> executing_nodes; | ||
|
|
||
| { | ||
| LockGuard running_guard(&m_running_task_map_mtx_); | ||
| auto iter = m_running_task_map_.find(task_id); | ||
| if (iter == m_running_task_map_.end()) { | ||
| CRANE_TRACE("Task #{} not in Rn queue for suspend", task_id); | ||
| return CraneErrCode::ERR_NON_EXISTENT; | ||
| } | ||
|
|
||
| TaskInCtld* task = iter->second.get(); | ||
| if (task->Status() == crane::grpc::Suspended) { | ||
| CRANE_TRACE("Task #{} already suspended", task_id); | ||
| return CraneErrCode::ERR_INVALID_PARAM; | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if (task->Status() != crane::grpc::Running) { | ||
| CRANE_TRACE("Task #{} is not running (status {}) for suspend", task_id, | ||
| static_cast<int>(task->Status())); | ||
| return CraneErrCode::ERR_INVALID_PARAM; | ||
| } | ||
|
|
||
| executing_nodes = task->executing_craned_ids; | ||
| } | ||
|
|
||
| if (executing_nodes.empty()) { | ||
| CRANE_WARN("Task #{} has no executing craned when suspending", task_id); | ||
| return CraneErrCode::ERR_INVALID_PARAM; | ||
| } | ||
|
|
||
| std::vector<task_id_t> job_ids{task_id}; | ||
| std::vector<CranedId> suspended_nodes; | ||
| CraneErrCode failure_code = CraneErrCode::SUCCESS; | ||
| bool has_failure = false; | ||
| for (const auto& craned_id : executing_nodes) { | ||
|
||
| auto stub = g_craned_keeper->GetCranedStub(craned_id); | ||
| if (!stub || stub->Invalid()) { | ||
| CRANE_WARN("SuspendSteps stub for {} unavailable", craned_id); | ||
| if (failure_code == CraneErrCode::SUCCESS) | ||
| failure_code = CraneErrCode::ERR_RPC_FAILURE; | ||
| has_failure = true; | ||
| continue; | ||
| } | ||
|
|
||
| CraneErrCode err = stub->SuspendSteps(job_ids); | ||
| if (err != CraneErrCode::SUCCESS) { | ||
| CRANE_ERROR("Failed to suspend task #{} on craned {}: {}", task_id, | ||
| craned_id, CraneErrStr(err)); | ||
| if (failure_code == CraneErrCode::SUCCESS) failure_code = err; | ||
| has_failure = true; | ||
| continue; | ||
| } | ||
L-Xiafeng marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| suspended_nodes.push_back(craned_id); | ||
| } | ||
|
|
||
| if (has_failure) { | ||
| for (const auto& craned_id : suspended_nodes) { | ||
| auto stub = g_craned_keeper->GetCranedStub(craned_id); | ||
| if (!stub || stub->Invalid()) { | ||
| CRANE_ERROR( | ||
| "Failed to rollback suspended state on craned {}: stub unavailable", | ||
| craned_id); | ||
| continue; | ||
| } | ||
|
|
||
| CraneErrCode err = stub->ResumeSteps(job_ids); | ||
| if (err != CraneErrCode::SUCCESS) { | ||
| CRANE_ERROR("Rollback ResumeSteps for task #{} on craned {} failed: {}", | ||
| task_id, craned_id, CraneErrStr(err)); | ||
| } | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| return failure_code; | ||
| } | ||
|
|
||
| { | ||
| LockGuard running_guard(&m_running_task_map_mtx_); | ||
| auto iter = m_running_task_map_.find(task_id); | ||
| if (iter == m_running_task_map_.end()) | ||
| return CraneErrCode::ERR_NON_EXISTENT; | ||
|
|
||
| TaskInCtld* task = iter->second.get(); | ||
| if (task->Status() != crane::grpc::Suspended) { | ||
| task->SetStatus(crane::grpc::Suspended); | ||
| if (!g_embedded_db_client->UpdateRuntimeAttrOfTaskIfExists( | ||
| 0, task->TaskDbId(), task->RuntimeAttr())) { | ||
| CRANE_ERROR("Failed to persist suspended status for task #{}", task_id); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return CraneErrCode::SUCCESS; | ||
| } | ||
|
|
||
| CraneErrCode TaskScheduler::ResumeSuspendedTask(task_id_t task_id) { | ||
| std::vector<CranedId> executing_nodes; | ||
|
|
||
| { | ||
| LockGuard running_guard(&m_running_task_map_mtx_); | ||
| auto iter = m_running_task_map_.find(task_id); | ||
| if (iter == m_running_task_map_.end()) { | ||
| CRANE_TRACE("Task #{} not in Rn queue for resume", task_id); | ||
| return CraneErrCode::ERR_NON_EXISTENT; | ||
| } | ||
|
|
||
| TaskInCtld* task = iter->second.get(); | ||
| if (task->Status() != crane::grpc::Suspended) { | ||
| CRANE_TRACE("Task #{} is not suspended (status {}) for resume", task_id, | ||
| static_cast<int>(task->Status())); | ||
| return CraneErrCode::ERR_INVALID_PARAM; | ||
| } | ||
|
|
||
| executing_nodes = task->executing_craned_ids; | ||
| } | ||
|
|
||
| if (executing_nodes.empty()) { | ||
| CRANE_WARN("Task #{} has no executing craned when resuming", task_id); | ||
| return CraneErrCode::ERR_INVALID_PARAM; | ||
| } | ||
|
|
||
| std::vector<task_id_t> job_ids{task_id}; | ||
| std::vector<CranedId> resumed_nodes; | ||
| CraneErrCode failure_code = CraneErrCode::SUCCESS; | ||
| bool has_failure = false; | ||
| for (const auto& craned_id : executing_nodes) { | ||
| auto stub = g_craned_keeper->GetCranedStub(craned_id); | ||
| if (!stub || stub->Invalid()) { | ||
| CRANE_WARN("ResumeSteps stub for {} unavailable", craned_id); | ||
| if (failure_code == CraneErrCode::SUCCESS) | ||
| failure_code = CraneErrCode::ERR_RPC_FAILURE; | ||
| has_failure = true; | ||
| continue; | ||
| } | ||
|
|
||
| CraneErrCode err = stub->ResumeSteps(job_ids); | ||
| if (err != CraneErrCode::SUCCESS) { | ||
| CRANE_ERROR("Failed to resume task #{} on craned {}: {}", task_id, | ||
| craned_id, CraneErrStr(err)); | ||
| if (failure_code == CraneErrCode::SUCCESS) failure_code = err; | ||
| has_failure = true; | ||
| continue; | ||
| } | ||
|
|
||
| resumed_nodes.push_back(craned_id); | ||
| } | ||
|
|
||
| if (has_failure) { | ||
| for (const auto& craned_id : resumed_nodes) { | ||
| auto stub = g_craned_keeper->GetCranedStub(craned_id); | ||
| if (!stub || stub->Invalid()) { | ||
| CRANE_ERROR( | ||
| "Failed to rollback resumed state on craned {}: stub unavailable", | ||
| craned_id); | ||
| continue; | ||
| } | ||
|
|
||
| CraneErrCode err = stub->SuspendSteps(job_ids); | ||
| if (err != CraneErrCode::SUCCESS) { | ||
| CRANE_ERROR( | ||
| "Rollback SuspendSteps for task #{} on craned {} failed: {}", | ||
| task_id, craned_id, CraneErrStr(err)); | ||
| } | ||
| } | ||
|
|
||
| return failure_code; | ||
coderabbitai[bot] marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| { | ||
| LockGuard running_guard(&m_running_task_map_mtx_); | ||
| auto iter = m_running_task_map_.find(task_id); | ||
| if (iter == m_running_task_map_.end()) | ||
| return CraneErrCode::ERR_NON_EXISTENT; | ||
|
|
||
| TaskInCtld* task = iter->second.get(); | ||
| if (task->Status() != crane::grpc::Running) { | ||
| task->SetStatus(crane::grpc::Running); | ||
| if (!g_embedded_db_client->UpdateRuntimeAttrOfTaskIfExists( | ||
| 0, task->TaskDbId(), task->RuntimeAttr())) { | ||
| CRANE_ERROR("Failed to persist resumed status for task #{}", task_id); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return CraneErrCode::SUCCESS; | ||
| } | ||
|
|
||
| CraneErrCode TaskScheduler::ChangeTaskTimeLimit(task_id_t task_id, | ||
| int64_t secs) { | ||
| if (!CheckIfTimeLimitSecIsValid(secs)) return CraneErrCode::ERR_INVALID_PARAM; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suspend应该只支持Job级别的 (Suspend should only be supported at the Job level.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok