-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkmeans_lloyd.cu
More file actions
480 lines (458 loc) · 18.6 KB
/
kmeans_lloyd.cu
File metadata and controls
480 lines (458 loc) · 18.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
#include <stdlib.h>
#include <stdio.h>
#include <float.h>
#include "kmeans.h"
#define BLOCK_SZ_CNT_ASS 256
#define BLOCK_SZ_CNT_ADJ 256
//Device-global counter of how many samples changed cluster membership in the
//current iteration; incremented (warp-aggregated) by nearest_cluster_assign
//and read back + reset by check_change_ratio on the host.
__device__ uint32_t mem_change_ctr;
//Problem-size parameters fixed for the whole run; written once from the host
//in initTasks via cudaMemcpyToSymbolAsync.
__constant__ uint32_t num_features;   //feature dimensionality of each sample
__constant__ uint32_t num_samples;    //total number of data points
__constant__ uint32_t num_clusters;   //K, the number of centroids
__constant__ uint32_t shmem_size;     //shared memory bytes available per block
__constant__ int d_debug;             //device-side copy of the host _debug flag
// https://devblogs.nvidia.com/parallelforall/cuda-pro-tip-optimized-filtering-warp-aggregated-atomics/
// Warp-aggregated atomic increment: performs a single atomicAdd per warp
// instead of one per lane, then hands each lane the value it would have seen
// from its own atomicAdd(ctr, 1).
// Fixes vs. original: uses the *_sync warp intrinsics (the mask-less
// __ballot/__shfl are removed on Volta+/SM70+, where implicit warp synchrony
// no longer holds), and uses an unsigned shift so lane 31 does not hit
// signed-overflow UB in (1 << 31).
// https://devblogs.nvidia.com/parallelforall/cuda-pro-tip-optimized-filtering-warp-aggregated-atomics/
__device__ __forceinline__ uint32_t atomicAggInc(uint32_t *ctr) {
    //Mask of the lanes that entered this call together
    unsigned int active = __activemask();
    int lane = threadIdx.x % 32;
    //Lowest-numbered active lane acts as the leader
    int leader = __ffs(active) - 1;
    uint32_t res;
    if (lane == leader) {
        //One atomic covering every active lane in the warp
        res = atomicAdd(ctr, __popc(active));
    }
    //Broadcast the leader's base offset to all participating lanes
    res = __shfl_sync(active, res, leader);
    //Each lane's slot = base + number of active lanes below it
    return res + __popc(active & ((1u << lane) - 1u));
}
__device__ float distance( float *sample1, float * sample2, uint32_t
incr1, uint32_t incr2);
// Currently handled as an array, contingent upon caller to coalesce access.
// sample1 - memory location from which to read the data for point 1. Typically
// in our case going to be the dataset we are looking at.
// sample2 - memory location from which to read the data for point 2. Typically
// in our case going to be the centroids we are looking at.
// incr1 - memory jumps to access next feature of point 1. Typically in our case
// is going to be the dataset size.
// incr2 - memory jumps to access next feature of point 2. Typically in our case
// is going to be the number of centroids K.
// num_features - assumed to be in constant memory indicates feature dimension
// space
//Squared Euclidean distance between two points stored with arbitrary feature
//strides (incr1/incr2), so the same routine works on the column-major dataset
//(stride = num_samples) and the centroid table (stride = num_clusters or the
//shared-memory batch width). Feature count comes from constant memory.
__device__ float distance( float *sample1, float *sample2, uint32_t incr1,
        uint32_t incr2)
{
    float acc = 0.0f;
    #pragma unroll 4
    for(uint32_t f = 0; f < num_features; f++)
    {
        float diff = sample1[f*incr1] - sample2[f*incr2];
        acc += diff * diff;
    }
    return acc;
}
//Assignment step of Lloyd's algorithm: each thread owns one sample and finds
//its nearest centroid, staging centroids through shared memory in batches.
//Records the previous membership into membership_old and bumps the global
//change counter when the assignment changed.
//Launch: 1-D grid over samples, dynamic shared memory = shmem_size bytes.
//Fix vs. original: out-of-range threads used to early-return before
//__syncthreads(), leaving the tail block's barriers divergent (UB); all
//threads now stay in the batching loop and only the per-sample work is
//guarded. A second barrier is added before the next batch overwrites
//shared_centroids, closing a read/write race between batches.
__global__ void nearest_cluster_assign( float *samples, float *centroids,
        uint32_t *membership, uint32_t *membership_old)
{
    uint32_t sample_idx = blockIdx.x * blockDim.x + threadIdx.x;
    //Invalid threads still participate in cooperative loads and barriers
    bool valid = (sample_idx < num_samples);
    float min_dist = FLT_MAX;
    uint32_t nearest_cluster = num_clusters - 1;
    //Pointer to this thread's sample (never dereferenced when !valid)
    float *my_sample = samples + sample_idx;
    //Shared staging buffer for a batch of centroids, laid out feature-major
    //with batch width max_shared_centroids
    extern __shared__ float shared_centroids[];
    uint32_t max_shared_centroids =
            shmem_size/(num_features*sizeof(float));
    //All blockDim.x threads cooperate in the load, so a plain ceil-div gives
    //how many centroids each thread copies per batch
    uint32_t thread_num_shared_process =
            (max_shared_centroids + blockDim.x - 1) / blockDim.x;
    for(uint32_t centroids_batch = 0; centroids_batch < num_clusters;
            centroids_batch += max_shared_centroids)
    {
        //Stage the next batch of centroids into shared memory
        for(uint32_t i = 0; i < thread_num_shared_process; i++)
        {
            uint32_t local_offset = i * blockDim.x + threadIdx.x;
            uint32_t global_offset = local_offset + centroids_batch;
            if(global_offset < num_clusters &&
                    local_offset < max_shared_centroids)
            {
                #pragma unroll 4
                for(uint32_t feature_idx = 0; feature_idx < num_features;
                        feature_idx++)
                {
                    shared_centroids[feature_idx*max_shared_centroids + local_offset] =
                            centroids[feature_idx*num_clusters + global_offset];
                }
            }
        }
        __syncthreads();  //batch fully staged before anyone reads it
        if(valid)
        {
            //Scan the staged batch for a closer centroid
            for(uint32_t cluster = 0; cluster < max_shared_centroids &&
                    cluster < num_clusters - centroids_batch; cluster++)
            {
                float dist = distance(my_sample, shared_centroids + cluster,
                        num_samples, max_shared_centroids);
                if(dist < min_dist)
                {
                    min_dist = dist;
                    nearest_cluster = cluster + centroids_batch;
                }
            }
        }
        __syncthreads();  //all reads done before the next batch overwrites
    }
    if(!valid)
        return;
    uint32_t mem_old = membership[sample_idx];
    membership_old[sample_idx] = mem_old;
    if(mem_old != nearest_cluster)
    {
        membership[sample_idx] = nearest_cluster;
        atomicAggInc(&mem_change_ctr);
    }
}
//Update step of Lloyd's algorithm: each thread owns one centroid and updates
//it incrementally. Rather than recomputing the mean from scratch, the stored
//centroid is first scaled back to a per-cluster feature sum (multiply by its
//old count); then every sample whose membership flipped toward or away from
//this centroid adds or subtracts its coordinates and adjusts the count; and
//finally the sum is divided by the new count to re-average.
//Data layout is feature-major: samples is [num_features x num_samples] and
//centroids is [num_features x num_clusters].
//Launch: 1-D grid over centroids, no dynamic shared memory.
//NOTE: if a cluster's count drops to 0, the final division produces NaN and
//no sample will ever rejoin it — accepted behavior for now; keeping the old
//centroid alive at count 0 would need extra per-cluster state.
__global__ void adjust_centroids( float *samples, float *centroids, uint32_t
        *membership, uint32_t *membership_old, uint32_t *cluster_counts)
{
    uint32_t centroid_idx = blockIdx.x * blockDim.x + threadIdx.x;
    if(centroid_idx >= num_clusters)
        return;
    //Point at this thread's centroid column; features are num_clusters apart
    centroids += centroid_idx;
    uint32_t cluster_count = cluster_counts[centroid_idx];
    //Undo the previous averaging: centroid becomes a running feature sum.
    //Necessary evil of keeping the averaged form in global memory.
    for(uint32_t f = 0; f < num_features; f++)
        centroids[f*num_clusters] *= cluster_count;
    //Scan every sample and fold in only the membership changes that concern
    //this centroid.
    for(uint32_t s = 0; s < num_samples; s++)
    {
        uint32_t mem_now = membership[s];
        uint32_t mem_prev = membership_old[s];
        int sign;
        if(mem_prev == centroid_idx && mem_now != centroid_idx)
        {
            //Sample left this cluster
            sign = -1;
            cluster_count--;
        }
        else if(mem_prev != centroid_idx && mem_now == centroid_idx)
        {
            //Sample joined this cluster
            sign = 1;
            cluster_count++;
        }
        else
        {
            sign = 0;
        }
        if(sign)
        {
            #pragma unroll 4
            for(uint32_t f = 0; f < num_features; f++)
                centroids[f*num_clusters] += sign *
                        samples[s + f*num_samples];
        }
    }
    //Re-average the accumulated sum (cluster_count == 0 yields NaN, see NOTE
    //above).
    #pragma unroll 4
    for(uint32_t f = 0; f < num_features; f++)
        centroids[f*num_clusters] /= cluster_count;
    //Persist the updated count for the next iteration
    cluster_counts[centroid_idx] = cluster_count;
}
//Empty kernel; launched once by kmeans_cuda before the main loop — presumably
//to absorb one-time CUDA context/setup cost (the call site itself marks this
//with a "setup init?" comment; confirm intent before removing).
__global__ void dummy_kernel()
{
}
//Debugging functions
__global__ void verify_counts(uint32_t *cluster_counts)
{
uint32_t sum = 0;
for(uint32_t i = 0; i < num_clusters; i++)
{
sum += cluster_counts[i];
}
if(sum != num_samples)
printf("sum of counts (%u) doesn't add up to required (%u)\n", sum,
num_samples);
}
//Debug kernel (intended launch <<<1,1>>>): checks every sample's membership
//index is in range and tallies per-cluster counts into cc (caller zeroes cc
//beforehand).
//Fix vs. original: memberships[i] is uint32_t, so the old "< 0" half of the
//range check was always false and has been dropped; the printf also gained
//its missing trailing newline.
__global__ void verify_memberships(uint32_t *memberships, uint32_t *cc)
{
    for(uint32_t i = 0; i < num_samples; i++)
    {
        if(memberships[i] >= num_clusters)
            printf("membership for %u wrongly assigned to %u\n", i,
                    memberships[i]);
        cc[memberships[i]]++;
    }
}
//------------------------Host Functions--------------------------------
//Host-side one-time setup: selects the device, uploads the run-constant
//problem sizes (num_samples/num_clusters/num_features), the per-block shared
//memory size, the debug flag, and zeroes the device change counter.
//Returns the device's sharedMemPerBlock in bytes (used later as the dynamic
//shared memory size for nearest_cluster_assign).
uint32_t initTasks(uint32_t n_samples, uint32_t n_clusters, uint32_t
        n_features, int dev_num=0)
{
    cudaDeviceProp props;
    gpuErrchk(cudaSetDevice(dev_num));
    gpuErrchk(cudaGetDeviceProperties(&props, dev_num));
    uint32_t smem_size = props.sharedMemPerBlock;
    if(_debug)
        printf("gpu %d has %u bytes of shared memory\n", dev_num, smem_size);
    //Not sure if this helps as <64kb transfers are supposed to exit async
    //anyways
    //When optimLevel!=1, stream stays 0 (the default stream) and the copies
    //below effectively synchronize with everything.
    cudaStream_t stream = 0;
#if optimLevel==1
    gpuErrchk(cudaStreamCreate(&stream));
#endif
    gpuErrchk(cudaMemcpyToSymbolAsync(num_samples, &n_samples,
                sizeof(n_samples), 0, cudaMemcpyHostToDevice, stream));
    gpuErrchk(cudaMemcpyToSymbolAsync(num_clusters, &n_clusters,
                sizeof(n_clusters), 0, cudaMemcpyHostToDevice, stream));
    gpuErrchk(cudaMemcpyToSymbolAsync(num_features, &n_features,
                sizeof(n_features), 0, cudaMemcpyHostToDevice, stream));
    gpuErrchk(cudaMemcpyToSymbolAsync(shmem_size, &smem_size, sizeof(smem_size),
                0, cudaMemcpyHostToDevice, stream));
    //Reset the membership-change counter for the first iteration
    uint32_t zero = 0;
    gpuErrchk(cudaMemcpyToSymbolAsync(mem_change_ctr, &zero, sizeof(zero), 0,
                cudaMemcpyHostToDevice, stream));
    gpuErrchk(cudaMemcpyToSymbolAsync(d_debug, &_debug, sizeof(_debug), 0,
                cudaMemcpyHostToDevice, stream));
#if optimLevel==1
    //Safe to do as it will only remove once all work is done but return
    //immediately
    gpuErrchk(cudaStreamDestroy(stream));
#endif
    return smem_size;
}
//Host-side convergence test: reads the device counter of membership changes
//from the last assignment pass, resets it to zero for the next iteration, and
//returns -1 if the change fraction is within tolerance (converged), else 0.
//Blocks on the stream before returning, so callers see a consistent counter.
int check_change_ratio(float tolerance, uint32_t n_samples)
{
    uint32_t num_changes = 0;
    cudaStream_t stream = 0;
#if optimLevel==1
    gpuErrchk(cudaStreamCreate(&stream));
#endif
    gpuErrchk(cudaMemcpyFromSymbolAsync(&num_changes, mem_change_ctr,
                sizeof(num_changes), 0, cudaMemcpyDeviceToHost, stream));
    //Reset the counter on-device for the next iteration; same stream, so it
    //is ordered after the read above
    uint32_t zero = 0;
    gpuErrchk(cudaMemcpyToSymbolAsync(mem_change_ctr, &zero, sizeof(zero), 0,
                cudaMemcpyHostToDevice, stream));
    gpuErrchk(cudaStreamSynchronize(stream));
#if optimLevel==1
    gpuErrchk(cudaStreamDestroy(stream));
#endif
    if(_debug)
        printf("num changes = %u\n",num_changes);
    if(num_changes <= tolerance * n_samples)
        return -1;
    return 0;
}
//Runs Lloyd's k-means on device-resident data until fewer than
//tolerance*n_samples memberships change in an iteration, or 500 iterations.
//samples/centroids are feature-major device arrays; memberships holds the
//initial assignment and is updated in place. iterations, when non-NULL,
//receives the number of iterations executed.
//Fixes vs. original: (1) cc_verification is NULL-initialized and freed
//unconditionally (cudaFree(NULL) is a no-op) — previously it was freed when
//_debug was set but only ever allocated when _debug > 1, freeing an
//uninitialized pointer at _debug==1; (2) the early-return path now guards
//cudaStreamDestroy with optimLevel like the fall-through path, instead of
//destroying the default stream (stream1/stream2 == 0) when optimLevel!=1;
//(3) the max-iteration exit NULL-checks iterations before writing, matching
//the convergence exit and the parameter's NULL default.
cudaError_t kmeans_cuda( InitMethod init, float tolerance, uint32_t n_samples,
        uint32_t n_features, uint32_t n_clusters, uint32_t seed, float
        *samples, float *centroids, uint32_t *memberships, int *iterations =
        NULL)
{
    uint32_t smem_size = initTasks(n_samples, n_clusters, n_features);
    dim3 sample_block(BLOCK_SZ_CNT_ASS);
    dim3 centroid_block(BLOCK_SZ_CNT_ADJ);
    dim3 sample_grid(ceil(1.0 * n_samples/sample_block.x));
    dim3 centroid_grid(ceil(1.0 * n_clusters/centroid_block.x));
    uint32_t *memberships_old, *cluster_counts;
    //NULL so the cleanup paths can free unconditionally
    uint32_t *cc_verification = NULL;
    cudaStream_t stream1 = 0, stream2 = 0;
#if optimLevel==1
    gpuErrchk(cudaStreamCreate(&stream1));
    gpuErrchk(cudaStreamCreate(&stream2));
#endif
    gpuErrchk(cudaMalloc((void **) &memberships_old,
                n_samples*sizeof(uint32_t)));
    gpuErrchk(cudaMalloc((void **) &cluster_counts,
                n_clusters*sizeof(uint32_t)));
    if(_debug > 1)
    {
#if optimLevel==1
        //Put this to sync all transfers as we want to copy membership -- really
        //pointles but put in for debug
        gpuErrchk(cudaDeviceSynchronize());
#endif
        gpuErrchk(cudaMalloc((void **) &cc_verification,
                    n_clusters*sizeof(uint32_t)));
        gpuErrchk(cudaMemcpy( memberships_old, memberships, n_samples *
                    sizeof(uint32_t), cudaMemcpyDeviceToDevice));
    }
    gpuErrchk(cudaMemsetAsync(cluster_counts, 0, n_clusters*sizeof(uint32_t),
                stream1));
    //Call dummy kernel to setup init?
    dummy_kernel<<<1,1,0,stream2>>>();
    gpuErrchk( cudaPeekAtLastError() );
#if optimLevel==1
    gpuErrchk(cudaDeviceSynchronize());
#endif
    //arbitrary - set maxiter to 500
    for(int i = 0; i < 500; i++)
    {
        if(_debug)
        {
            printf("In iteration %d\n",i);
            printf("grid size is %dx%d, block size is %dx%d and shared mem needed is %u\n"
                    , sample_grid.x, sample_grid.y, sample_block.x,
                    sample_block.y, smem_size);
        }
        nearest_cluster_assign<<<sample_grid,sample_block,smem_size,stream1>>>( samples,
                centroids, memberships, memberships_old);
        gpuErrchk( cudaPeekAtLastError() );
        //Syncronize here irrespective of optimLevel as it is confusing with all
        //these streams in non optim mode. We need it synchronized here anyways.
        gpuErrchk(cudaDeviceSynchronize());
        adjust_centroids<<<centroid_grid,centroid_block,0,stream1>>>( samples,
                centroids, memberships, memberships_old, cluster_counts);
        gpuErrchk( cudaPeekAtLastError() );
        //Below function although executing in a separate stream, waits for that
        //stream to complete before returning, so should be safe.
        int change_ratio_good = check_change_ratio(tolerance, n_samples);
        if(_debug)
        {
            printf("change ratio is %d\n",change_ratio_good);
            if(_debug > 1)
            {
                gpuErrchk(cudaMemsetAsync(cc_verification, 0,
                            n_clusters*sizeof(uint32_t), stream2));
                verify_memberships<<<1,1,0,stream2>>>(memberships, cc_verification);
                verify_counts<<<1,1,0,stream2>>>(cc_verification);
            }
        }
        //Syncronize here irrespective of optimLevel as it is confusing with all
        //these streams in non optim mode. We need it synchronized here anyways.
        gpuErrchk(cudaDeviceSynchronize());
        if(change_ratio_good<0)
        {
            //Converged: report iteration count and release resources
            if(iterations)
                *iterations = i;
            cudaFree(memberships_old);
            cudaFree(cluster_counts);
            cudaFree(cc_verification);  //no-op when never allocated
#if optimLevel==1
            cudaStreamDestroy(stream1);
            cudaStreamDestroy(stream2);
#endif
            return cudaSuccess;
        }
        //Debuging to catch if total counts has messed up with respect to total
        //number of samples
        if(_debug > 1)
        {
            verify_counts<<<1,1>>>(cluster_counts);
            gpuErrchk(cudaDeviceSynchronize());
        }
    }
    //Hit the iteration cap without converging
    if(iterations)
        *iterations = 500;
    cudaFree(memberships_old);
    cudaFree(cluster_counts);
    cudaFree(cc_verification);  //no-op when never allocated
#if optimLevel==1
    cudaStreamDestroy(stream1);
    cudaStreamDestroy(stream2);
#endif
    return cudaSuccess;
}