Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,9 @@ else ()
message(FATAL_ERROR "LibAIO was not found.")
endif ()

find_package(HWLOC REQUIRED)


if (CRANE_ENABLE_BPF)
find_package(BPF 1.4.6 REQUIRED)
endif ()
Expand Down
42 changes: 42 additions & 0 deletions CMakeModule/FindHWLOC.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# ~~~
# - Try to find hwloc
#
# Once done this will define
#
# HWLOC_FOUND - System has hwloc
# HWLOC_INCLUDE_DIRS - The hwloc include directories
# HWLOC_LIBRARIES - The libraries needed to use hwloc
# ~~~

find_package(PkgConfig QUIET)
pkg_check_modules(PC_HWLOC hwloc)

message(STATUS "PkgConfig: HWLOC INCLUDE_DIR:${PC_HWLOC_INCLUDE_DIRS}, LIB_DIR:${PC_HWLOC_LIBRARY_DIRS}, Ver: ${PC_HWLOC_VERSION}")

find_path(
HWLOC_INCLUDE_DIRS
NAMES hwloc.h
HINTS ${PC_HWLOC_INCLUDE_DIRS}
)

find_library(
HWLOC_LIBRARIES
NAMES hwloc
HINTS ${PC_HWLOC_LIBRARY_DIRS}
)

include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(
HWLOC
REQUIRED_VARS HWLOC_LIBRARIES HWLOC_INCLUDE_DIRS
VERSION_VAR PC_HWLOC_VERSION
FAIL_MESSAGE "Error occurred when searching for hwloc"
)

if (HWLOC_FOUND AND NOT TARGET HWLOC::hwloc)
add_library(HWLOC::hwloc INTERFACE IMPORTED)
set_target_properties(HWLOC::hwloc PROPERTIES
INTERFACE_LINK_LIBRARIES "${HWLOC_LIBRARIES}"
INTERFACE_INCLUDE_DIRECTORIES "${HWLOC_INCLUDE_DIRS}"
)
endif ()
3 changes: 3 additions & 0 deletions etc/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ Nodes:
- name: "cn[15-16]"
cpu: 2
memory: 2G
Sockets: 2
CoresPerSocket: 8
ThreadsPerCore: 2

- name: "cn[17-18]"
cpu: 2
Expand Down
9 changes: 9 additions & 0 deletions protos/PublicDefs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ message TaskToCtld {

string reservation = 36;
bool exclusive = 37;
uint32 cores_per_socket = 38;
}

message TaskInEmbeddedDb {
Expand Down Expand Up @@ -239,6 +240,7 @@ message TaskToD {
double cpus_per_task = 23;

bool get_user_env = 24;
uint32 cores_per_socket = 25;
}

message BatchTaskAdditionalMeta {
Expand Down Expand Up @@ -579,6 +581,12 @@ message NetworkInterface {
repeated string ipv6_addresses = 4;
}

message TopologyInfo {
uint32 socket_count = 1;
uint32 cores_per_socket = 2;
uint32 threads_per_core = 3;
}

message CranedRemoteMeta {
DedicatedResourceInNode dres_in_node = 1;
SystemRelInfo sys_rel_info = 2;
Expand All @@ -588,4 +596,5 @@ message CranedRemoteMeta {
repeated uint32 lost_tasks = 6;
repeated uint32 lost_jobs = 7;
repeated NetworkInterface network_interfaces = 8;
TopologyInfo topology_info = 9;
}
32 changes: 32 additions & 0 deletions src/CraneCtld/CraneCtld.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,38 @@ void ParseConfig(int argc, char** argv) {
}
}

if (node["Sockets"]) {
auto sockets = std::stoul(node["Sockets"].as<std::string>());
if (sockets == 0) {
CRANE_ERROR("Sockets must be greater than 0 for node {}",
node["name"].Scalar());
std::exit(1);
}
node_ptr->topology_info.socket_count = sockets;
}

if (node["CoresPerSocket"]) {
auto cores_per_socket =
std::stoul(node["CoresPerSocket"].as<std::string>());
if (cores_per_socket == 0) {
CRANE_ERROR("CoresPerSocket must be greater than 0 for node {}",
node["name"].Scalar());
std::exit(1);
}
node_ptr->topology_info.cores_per_socket = cores_per_socket;
}

if (node["ThreadsPerCore"]) {
auto threads_per_core =
std::stoul(node["ThreadsPerCore"].as<std::string>());
if (threads_per_core == 0) {
CRANE_ERROR("ThreadsPerCore must be greater than 0 for node {}",
node["name"].Scalar());
std::exit(1);
}
node_ptr->topology_info.threads_per_core = threads_per_core;
}

for (auto&& node_id : node_id_list) {
g_config.Nodes[node_id] = node_ptr;
g_config.Nodes[node_id]->dedicated_resource = resourceInNode;
Expand Down
43 changes: 43 additions & 0 deletions src/CraneCtld/CranedMetaContainer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include "CranedMetaContainer.h"

#include <yaml-cpp/node/detail/node.h>

#include "RpcService/CranedKeeper.h"
#include "crane/PluginClient.h"
#include "protos/PublicDefs.pb.h"
Expand Down Expand Up @@ -61,6 +63,40 @@ void CranedMetaContainer::CranedUp(
part_global_meta.alive_craned_cnt++;
}

uint32_t hw_sockets = node_meta->remote_meta.topology_info.socket_count;
uint32_t hw_cps = node_meta->remote_meta.topology_info.cores_per_socket;
uint32_t hw_threads = node_meta->remote_meta.topology_info.threads_per_core;
if (hw_sockets == 0 || hw_cps == 0 || hw_threads == 0) {
CRANE_ERROR("Failed to detect Craned {} cpu hardware.", craned_id);
}

// When the configuration file is not set, the actual hardware resources are
// used as the basis.
if (node_meta->static_meta.topology_info.socket_count == 0)
node_meta->static_meta.topology_info.socket_count = hw_sockets;

if (node_meta->static_meta.topology_info.cores_per_socket == 0)
node_meta->static_meta.topology_info.cores_per_socket = hw_cps;

if (node_meta->static_meta.topology_info.threads_per_core == 0)
node_meta->static_meta.topology_info.threads_per_core = hw_threads;

if (node_meta->static_meta.topology_info.cores_per_socket > hw_cps ||
node_meta->static_meta.topology_info.socket_count > hw_sockets ||
node_meta->static_meta.topology_info.threads_per_core > hw_threads)
CRANE_WARN(
"Configured cpu hardware (sockets: {}, cores_per_socket: {}, "
"threads_per_core: {}) is "
"greater than the actual hardware "
"(sockets: {}, cores_per_socket: {}, threads_per_core: {}) on Craned "
"{}. "
"Please check your configuration or hardware. Using the configured "
"value may cause resource allocation errors.",
node_meta->static_meta.topology_info.socket_count,
node_meta->static_meta.topology_info.cores_per_socket,
node_meta->static_meta.topology_info.threads_per_core, hw_sockets,
hw_cps, hw_threads, craned_id);

CRANE_INFO("Craned {} is up now.", craned_id);
}

Expand Down Expand Up @@ -294,6 +330,13 @@ void CranedMetaContainer::InitFromConfig(const Config& config) {
static_meta.hostname = craned_name;
static_meta.port = std::strtoul(
g_config.CranedListenConf.CranedListenPort.c_str(), nullptr, 10);
static_meta.topology_info.socket_count =
node_ptr->topology_info.socket_count =
node_ptr->topology_info.socket_count;
static_meta.topology_info.cores_per_socket =
node_ptr->topology_info.cores_per_socket;
static_meta.topology_info.threads_per_core =
node_ptr->topology_info.threads_per_core;

craned_meta.res_total += static_meta.res;
craned_meta.res_avail += static_meta.res;
Expand Down
15 changes: 15 additions & 0 deletions src/CraneCtld/CtldPublicDefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ struct Config {
uint32_t cpu;
uint64_t memory_bytes;
DedicatedResourceInNode dedicated_resource;
TopologyInfo topology_info;
};

struct Partition {
Expand Down Expand Up @@ -192,6 +193,8 @@ struct CranedStaticMeta {
std::list<std::string> partition_ids; // Partitions to which
// this craned belongs to
ResourceInNode res;

TopologyInfo topology_info;
};

struct CranedRemoteMeta {
Expand All @@ -203,6 +206,8 @@ struct CranedRemoteMeta {

std::vector<crane::grpc::NetworkInterface> network_interfaces;

TopologyInfo topology_info;

CranedRemoteMeta() = default;

explicit CranedRemoteMeta(const crane::grpc::CranedRemoteMeta& grpc_meta)
Expand All @@ -219,6 +224,12 @@ struct CranedRemoteMeta {
for (const auto& interface : grpc_meta.network_interfaces()) {
this->network_interfaces.emplace_back(interface);
}

this->topology_info.socket_count = grpc_meta.topology_info().socket_count();
this->topology_info.cores_per_socket =
grpc_meta.topology_info().cores_per_socket();
this->topology_info.threads_per_core =
grpc_meta.topology_info().threads_per_core();
}
};

Expand Down Expand Up @@ -395,6 +406,8 @@ struct TaskInCtld {

std::string reservation;

uint32_t cores_per_socket;

private:
/* ------------- [2] -------------
* Fields that won't change after this task is accepted.
Expand Down Expand Up @@ -612,6 +625,8 @@ struct TaskInCtld {
extra_attr = val.extra_attr();

reservation = val.reservation();

cores_per_socket = val.cores_per_socket();
}

void SetFieldsByRuntimeAttr(crane::grpc::RuntimeAttrOfTask const& val) {
Expand Down
21 changes: 11 additions & 10 deletions src/CraneCtld/DbClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -939,10 +939,10 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_(
// 20 script state timelimit time_submit work_dir
// 25 submit_line exit_code username qos get_user_env
// 30 type extra_attr reservation exclusive cpus_alloc
// 35 mem_alloc device_map
// 35 mem_alloc device_map cores_per_socket

// clang-format off
std::array<std::string, 37> fields{
std::array<std::string, 38> fields{
// 0 - 4
"task_id", "task_db_id", "mod_time", "deleted", "account",
// 5 - 9
Expand All @@ -958,7 +958,7 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_(
// 30 - 34
"type", "extra_attr", "reservation", "exclusive", "cpus_alloc",
// 35 - 39
"mem_alloc", "device_map"
"mem_alloc", "device_map", "cores_per_socket"
};
// clang-format on

Expand All @@ -969,7 +969,7 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_(
std::string, int32_t, int64_t, int64_t, std::string, /*20-24*/
std::string, int32_t, std::string, std::string, bool, /*25-29*/
int32_t, std::string, std::string, bool, double, /*30-34*/
int64_t, std::string> /*35-39*/
int64_t, std::string, int64_t> /*35-39*/
values{ // 0-4
static_cast<int32_t>(runtime_attr.task_id()),
runtime_attr.task_db_id(), absl::ToUnixSeconds(absl::Now()), false,
Expand Down Expand Up @@ -1003,7 +1003,8 @@ MongodbClient::document MongodbClient::TaskInEmbeddedDbToDocument_(
allocated_res_view.CpuCount(),
// 35-39
static_cast<int64_t>(allocated_res_view.MemoryBytes()),
device_map_str};
device_map_str,
static_cast<int64_t>(task_to_ctld.cores_per_socket())};

return DocumentConstructor_(fields, values);
}
Expand All @@ -1029,10 +1030,10 @@ MongodbClient::document MongodbClient::TaskInCtldToDocument_(TaskInCtld* task) {
// 20 script state timelimit time_submit work_dir
// 25 submit_line exit_code username qos get_user_env
// 30 type extra_attr reservation exclusive cpus_alloc
// 35 mem_alloc device_map
// 35 mem_alloc device_map cores_per_socket

// clang-format off
std::array<std::string, 37> fields{
std::array<std::string, 38> fields{
// 0 - 4
"task_id", "task_db_id", "mod_time", "deleted", "account",
// 5 - 9
Expand All @@ -1048,7 +1049,7 @@ MongodbClient::document MongodbClient::TaskInCtldToDocument_(TaskInCtld* task) {
// 30 - 34
"type", "extra_attr", "reservation", "exclusive", "cpus_alloc",
// 35 - 39
"mem_alloc", "device_map"
"mem_alloc", "device_map", "cores_per_socket"
};
// clang-format on

Expand All @@ -1059,7 +1060,7 @@ MongodbClient::document MongodbClient::TaskInCtldToDocument_(TaskInCtld* task) {
std::string, int32_t, int64_t, int64_t, std::string, /*20-24*/
std::string, int32_t, std::string, std::string, bool, /*25-29*/
int32_t, std::string, std::string, bool, double, /*30-34*/
int64_t, std::string> /*35-39*/
int64_t, std::string, int64_t> /*35-39*/
values{ // 0-4
static_cast<int32_t>(task->TaskId()), task->TaskDbId(),
absl::ToUnixSeconds(absl::Now()), false, task->account,
Expand All @@ -1085,7 +1086,7 @@ MongodbClient::document MongodbClient::TaskInCtldToDocument_(TaskInCtld* task) {
task->allocated_res_view.CpuCount(),
// 35-39
static_cast<int64_t>(task->allocated_res_view.MemoryBytes()),
device_map_str};
device_map_str, static_cast<int64_t>(task->cores_per_socket)};
return DocumentConstructor_(fields, values);
}

Expand Down
1 change: 1 addition & 0 deletions src/CraneCtld/RpcService/CranedKeeper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ crane::grpc::ExecuteTasksRequest CranedStub::NewExecuteTasksRequests(
auto *mutable_meta = mutable_task->mutable_interactive_meta();
mutable_meta->CopyFrom(task->TaskToCtld().interactive_meta());
}
mutable_task->set_cores_per_socket(task->TaskToCtld().cores_per_socket());
}

return request;
Expand Down
Loading