
Commit 434f3dc

Tomislavj Janjusic committed:

Adding the XCCL DPU team, and DPU daemon

Signed-off-by: Tomislavj Janjusic <tomislavj@nvidia.com>
Co-authored-by: Artem Polyakov <artpol84@gmail.com>
Co-authored-by: Sergey Lebedev <sergeyle@nvidia.com>
1 parent 7d88537 commit 434f3dc

File tree

13 files changed: +1969 -2 lines


configure.ac

Lines changed: 8 additions & 0 deletions
@@ -117,6 +117,13 @@ AC_ARG_WITH([cuda],
 AM_CONDITIONAL([HAVE_CUDA], [test "x$cuda_happy" != xno])
 AC_MSG_RESULT([CUDA support: $cuda_happy; $CUDA_CPPFLAGS $CUDA_LDFLAGS])
 
+AC_ARG_WITH([dpu],
+            AC_HELP_STRING([--with-dpu=yes/no], [Enable/Disable dpu team]),
+            [AS_IF([test "x$with_dpu" != "xno"], dpu_happy="yes", dpu_happy="no")],
+            [dpu_happy="no"])
+AM_CONDITIONAL([HAVE_DPU], [test "x$dpu_happy" != xno])
+AC_MSG_RESULT([DPU support: $dpu_happy])
+
 AM_CONDITIONAL([HAVE_NCCL], [false])
 if test "x$cuda_happy" != xno; then
     m4_include([m4/nccl.m4])
@@ -136,6 +143,7 @@ AC_CONFIG_FILES([
     src/team_lib/hier/Makefile
     src/team_lib/multirail/Makefile
     src/team_lib/nccl/Makefile
+    src/team_lib/dpu/Makefile
     src/utils/cuda/Makefile
     src/utils/cuda/kernels/Makefile
     test/Makefile
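
With this change the DPU team is off by default: passing --with-dpu (any value other than no) sets dpu_happy=yes and enables the HAVE_DPU automake conditional. A minimal sketch of enabling it at configure time, assuming the repository's usual autotools flow (not part of the commit):

    ./configure --with-dpu=yes
    make && make install

Whether src/team_lib/dpu is actually compiled is presumably gated on HAVE_DPU inside its Makefile.am, which is not shown in this excerpt.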

contrib/dpu_daemon/Makefile

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
#
# Copyright (c) 2020 Mellanox Technologies. All rights reserved.
# $COPYRIGHT$
#
# Additional copyrights may follow
#
# $HEADER$
#

CFLAGS = -I$(XCCL_PATH)/include -I$(UCX_PATH)/include
LDFLAGS = -L$(XCCL_PATH)/lib $(XCCL_PATH)/lib/libxccl.so $(UCX_PATH)/lib/libucs.so $(UCX_PATH)/lib/libucp.so -Wl,-rpath -Wl,$(XCCL_PATH)/lib -Wl,-rpath -Wl,$(XCCL_PATH)/lib

rel:
	mpicc -O3 -DNDEBUG -std=c11 $(CFLAGS) -o dpu_server dpu_server.c host_channel.c server_xccl.c $(LDFLAGS)

dbg:
	mpicc -O0 -g -std=c11 $(CFLAGS) -o dpu_server dpu_server.c host_channel.c server_xccl.c $(LDFLAGS)

clean:
	rm -f dpu_server
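
The daemon is built directly with mpicc rather than through the project's automake tree, and the Makefile expects XCCL_PATH and UCX_PATH to point at existing XCCL and UCX installations. A sketch of an invocation; the paths below are placeholders, not values from the commit:

    make rel XCCL_PATH=$HOME/xccl/install UCX_PATH=$HOME/ucx/install    # optimized build
    make dbg XCCL_PATH=$HOME/xccl/install UCX_PATH=$HOME/ucx/install    # -O0 -g build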

contrib/dpu_daemon/dpu_server.c

Lines changed: 171 additions & 0 deletions
@@ -0,0 +1,171 @@
/*
 * Copyright (C) Mellanox Technologies Ltd. 2020. ALL RIGHTS RESERVED.
 * See file LICENSE for terms.
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>   /* memset */
#include <pthread.h>
#include <unistd.h>

#include "server_xccl.h"
#include "host_channel.h"

#define MAX_THREADS 128
typedef struct {
    pthread_t       id;
    int             idx, nthreads;
    dpu_xccl_comm_t comm;
    dpu_hc_t       *hc;
    unsigned int    itt;
} thread_ctx_t;

/* thread accessible data - split reader/writer */
typedef struct {
    volatile unsigned long g_itt;   /* first cache line */
    volatile unsigned long pad[3];  /* pad to 64 bytes */
    volatile unsigned long l_itt;   /* second cache line */
    volatile unsigned long pad2[3]; /* pad to 64 bytes */
} thread_sync_t;

static thread_sync_t *thread_sync = NULL;

void *dpu_worker(void *arg)
{
    int i = 0;
    thread_ctx_t *ctx = (thread_ctx_t*)arg;
    xccl_coll_req_h request;

    while (1) {
        ctx->itt++;
        if (ctx->idx > 0) {
            /* wait until thread 0 announces the next iteration */
            while (thread_sync[ctx->idx].g_itt < ctx->itt) {
                /* busy wait */
            }
        }
        else {
            /* thread 0 waits for the host, then releases the other threads */
            dpu_hc_wait(ctx->hc, ctx->itt);
            for (i = 0; i < ctx->nthreads; i++) {
                thread_sync[i].g_itt++;
            }
        }

        int offset, block;
        int count = dpu_hc_get_count(ctx->hc);
        int ready = 0;

        /* split count elements across nthreads; the first
         * count % nthreads threads take one extra element */
        block  = count / ctx->nthreads;
        offset = block * ctx->idx;
        if (ctx->idx < (count % ctx->nthreads)) {
            offset += ctx->idx;
            block++;
        } else {
            offset += (count % ctx->nthreads);
        }

        xccl_coll_op_args_t coll = {
            .field_mask = 0,
            .coll_type  = XCCL_ALLREDUCE,
            .buffer_info = {
                .src_buffer = ctx->hc->mem_segs.put.base + offset * sizeof(int),
                .dst_buffer = ctx->hc->mem_segs.get.base + offset * sizeof(int),
                .len        = block * xccl_dt_size(dpu_hc_get_dtype(ctx->hc)),
            },
            .reduce_info = {
                .dt    = dpu_hc_get_dtype(ctx->hc),
                .op    = dpu_hc_get_op(ctx->hc),
                .count = block,
            },
            .alg.set_by_user = 0,
            .tag = 123, //todo
        };

        /* an unsupported op is the host's signal to shut down */
        if (coll.reduce_info.op == XCCL_OP_UNSUPPORTED) {
            break;
        }

        XCCL_CHECK(xccl_collective_init(&coll, &request, ctx->comm.team));
        XCCL_CHECK(xccl_collective_post(request));
        while (XCCL_OK != xccl_collective_test(request)) {
            xccl_context_progress(ctx->comm.ctx);
        }
        XCCL_CHECK(xccl_collective_finalize(request));

        thread_sync[ctx->idx].l_itt++;

        if (ctx->idx == 0) {
            /* thread 0 waits for all threads to finish this iteration,
             * then replies to the host */
            while (ready != ctx->nthreads) {
                ready = 0;
                for (i = 0; i < ctx->nthreads; i++) {
                    if (thread_sync[i].l_itt == ctx->itt) {
                        ready++;
                    }
                    else {
                        break;
                    }
                }
            }

            dpu_hc_reply(ctx->hc, ctx->itt);
        }
    }

    return NULL;
}

int main(int argc, char **argv)
{
    int nthreads = 0, i;
    thread_ctx_t *tctx_pool = NULL;
    dpu_xccl_global_t xccl_glob;
    dpu_hc_t hc_b, *hc = &hc_b;

    if (argc < 2) {
        printf("Need thread # as an argument\n");
        return 1;
    }
    nthreads = atoi(argv[1]);
    if (MAX_THREADS < nthreads || 0 >= nthreads) {
        printf("ERROR: bad thread #: %d\n", nthreads);
        return 1;
    }
    printf("DPU daemon: Running with %d threads\n", nthreads);
    tctx_pool = calloc(nthreads, sizeof(*tctx_pool));
    XCCL_CHECK(dpu_xccl_init(argc, argv, &xccl_glob));

    // thread_sync = calloc(nthreads, sizeof(*thread_sync));
    thread_sync = aligned_alloc(64, nthreads * sizeof(*thread_sync));
    memset(thread_sync, 0, nthreads * sizeof(*thread_sync));

    dpu_hc_init(hc);
    dpu_hc_accept(hc);

    for (i = 0; i < nthreads; i++) {
        // printf("Thread %d spawned!\n", i);
        XCCL_CHECK(dpu_xccl_alloc_team(&xccl_glob, &tctx_pool[i].comm));

        tctx_pool[i].idx      = i;
        tctx_pool[i].nthreads = nthreads;
        tctx_pool[i].hc       = hc;
        tctx_pool[i].itt      = 0;

        if (i < nthreads - 1) {
            pthread_create(&tctx_pool[i].id, NULL, dpu_worker,
                           (void*)&tctx_pool[i]);
        }
    }

    /* The final DPU worker is executed in this context */
    dpu_worker((void*)&tctx_pool[i - 1]);

    for (i = 0; i < nthreads; i++) {
        if (i < nthreads - 1) {
            pthread_join(tctx_pool[i].id, NULL);
        }
        dpu_xccl_free_team(&xccl_glob, &tctx_pool[i].comm);
        // printf("Thread %d joined!\n", i);
    }

    dpu_xccl_finalize(&xccl_glob);
    return 0;
}
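
Each worker thread reduces a contiguous slice of the host buffer: count elements are split across nthreads, and the first count % nthreads threads take one extra element. The standalone sketch below (not part of the commit; the helper name partition is made up for illustration) reproduces that arithmetic so the slicing is easy to check:

    /* Standalone sketch of the per-thread slicing rule used in dpu_worker(). */
    #include <stdio.h>

    static void partition(int count, int nthreads, int idx, int *offset, int *block)
    {
        *block  = count / nthreads;          /* base block size */
        *offset = *block * idx;
        if (idx < count % nthreads) {        /* first remainder threads get one extra */
            *offset += idx;
            (*block)++;
        } else {
            *offset += count % nthreads;
        }
    }

    int main(void)
    {
        /* count = 10 over 4 threads -> blocks 3,3,2,2 at offsets 0,3,6,8 */
        for (int idx = 0; idx < 4; idx++) {
            int offset, block;
            partition(10, 4, idx, &offset, &block);
            printf("thread %d: offset=%d block=%d\n", idx, offset, block);
        }
        return 0;
    }

The daemon itself takes the thread count as its only command-line argument (for example, ./dpu_server 8); how it is launched on the DPU and how dpu_xccl_init and the host channel bootstrap are defined in files not shown in this excerpt.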
