File: GB_jit_AxB_phase2.cuh

//------------------------------------------------------------------------------
// templates/GB_AxB_cuda_dot3_phase2: fill the global buckets
//------------------------------------------------------------------------------

// Phase2 of the dot3 method on the GPU: sums the per-threadblock bucket
// counts produced by phase1 into global bucket offsets, and converts each
// bucket's per-threadblock counts into cumulative offsets.
#pragma once

#define GB_CUDA_KERNEL

#include "GB_cuda_buckets.h"
#include "GB_cuda_kernel.h"
#include <cooperative_groups.h>
#include <cub/block/block_scan.cuh>

using namespace cooperative_groups;

// A stateful callback functor that maintains a running prefix to be applied
// during consecutive scan operations.
struct BlockPrefixCallbackOp
{
   // Running prefix
   int64_t running_total;
   // Constructor
   __device__ BlockPrefixCallbackOp(int64_t running_total) : running_total(running_total) {}

   // Callback operator to be entered by the first warp of threads in the block.
   // Thread-0 is responsible for returning a value for seeding the block-wide scan.
   __device__ int64_t operator()(int64_t block_aggregate)
   {
     int64_t old_prefix = running_total;
     running_total += block_aggregate;
     return old_prefix;
   }
};
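
// Worked example (illustrative values only): scanning the tile-local items
// {3, 1, 4} with running_total = 10 yields exclusive prefix sums
// {10, 13, 14}; the callback then advances running_total to 18 (10 plus the
// block aggregate 8), which seeds the scan of the next tile of items.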

__inline__
__device__ void blockBucketExclusiveSum(int bucketId, int64_t *d_data, int nblocks)
{
   #define blocksize  32

   // Specialize BlockScan for a 1D block of `blocksize` (= 32) threads
   typedef cub::BlockScan<int64_t, blocksize, cub::BLOCK_SCAN_WARP_SCANS> BlockScan;

   // Allocate shared memory for BlockScan
   __shared__ typename BlockScan::TempStorage temp_storage;

   // Initialize running total
   BlockPrefixCallbackOp prefix_op(0);

   // Have the block iterate over segments of items
   int64_t data=0;

   int64_t *blockbucket= d_data;

   for (int block_id = 0; block_id < nblocks; block_id += blocksize)
   {
      // Load a segment of consecutive items that are blocked across threads
      int loc = block_id + threadIdx.x;
      if (loc < nblocks)
      {
         data = blockbucket [bucketId*nblocks + loc] ;
      }
      this_thread_block().sync();

      // Collectively compute the block-wide exclusive prefix sum
      BlockScan(temp_storage).ExclusiveSum( data, data, prefix_op);
      this_thread_block().sync();

      // Write the scanned offsets back in place
      if (loc < nblocks)
      {
         blockbucket [bucketId*nblocks + loc] = data ;
      }

      // Threads past the end of the array contribute zero to the next segment
      data = 0;
   }
}
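
// Worked example (illustrative values only): if bucket b has per-threadblock
// counts {5, 2, 0, 7} with nblocks = 4, the in-place exclusive sum rewrites
// them as offsets {0, 5, 7, 7}, so threadblock t can find where its entries
// for bucket b begin.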


template< typename T, int tile_sz>
__inline__ __device__ T warp_ReduceSumPlus( thread_block_tile<tile_sz> tile, T val)
{
    // Tree reduction: each iteration halves the number of lanes holding live
    // partial sums; the lane at position l adds in the value from lane l+i.
    for (int i = tile.size() / 2; i > 0; i /= 2) {
        val += tile.shfl_down( val, i);
    }
    return val; // note: only lane 0 of the tile holds the full sum
}
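
// Worked example (illustrative values only): with tile_sz = 8 and lane values
// {1,2,3,4,5,6,7,8}, the shuffle offsets 4, 2, 1 leave the low lanes holding
// {6,8,10,12}, then {16,20}, and finally 36 in lane 0, the sum of 1 through 8.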

template<typename T, int warpSize>
__inline__ __device__ T block_ReduceSum(thread_block g, T val)
{
  static __shared__ T shared[warpSize]; // Shared mem for 32 partial sums
  int lane = threadIdx.x % warpSize;
  int wid = threadIdx.x / warpSize;
  thread_block_tile<warpSize> tile = tiled_partition<warpSize>( g );

  // Each warp performs partial reduction
  val = warp_ReduceSumPlus<T, warpSize>( tile, val);

  // Lane 0 of each warp writes its partial sum to shared memory
  if (lane == 0) {
     shared[wid] = val;
  }
  this_thread_block().sync(); // Wait for all partial reductions

  // Only warp 0 performs the final reduction
  if (wid > 0) return val ;

  // Read from shared memory only if that warp existed
  val = (threadIdx.x < (blockDim.x / warpSize)) ? shared[lane] : 0;

  // Final reduction within the first warp
  val = warp_ReduceSumPlus<T, warpSize>( tile, val) ;

  return val;
}
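
// For example, with blockDim.x = 128 and warpSize = 32, the four warps each
// reduce 32 values; their partial sums land in shared[0..3], and warp 0 then
// reduces those four values to the block total, returned by thread 0.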

// GB_AxB_cuda_dot3_phase2 is a CUDA kernel that takes as input the
// blockbucket array computed by the first phase kernel,
// GB_AxB_cuda_dot3_phase1.  The launch geometry of this kernel must match
// that of GB_AxB_cuda_dot3_phase1 (same number of threads and threadblocks).

__global__ void AxB_phase2
(
    // input, not modified:
    int64_t *__restrict__ blockbucket,    // global bucket count, of size NBUCKETS*nblocks
    // output:
    int64_t *__restrict__ offset,         // global offsets, for each bucket
    // input, not modified:
    const int nblocks        // number of blocks to reduce across, i.e., the
                             // size of the vector for a single bucket
)
{

    //--------------------------------------------------------------------------
    // sum up the bucket counts of prior threadblocks
    //--------------------------------------------------------------------------

    // blockbucket is an array of size NBUCKETS-by-nblocks, stored by row.
    // The entry blockbucket [bucket * nblocks + t] holds the number of
    // entries that threadblock t placed in the given bucket, where bucket
    // is in the range 0 to NBUCKETS-1.
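
    // For example, with nblocks = 4, the count that threadblock 3 found for
    // bucket 2 is held in blockbucket [2*4 + 3] = blockbucket [11].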

    //__shared__ uint64_t offset [NBUCKETS] ;
    uint64_t s[NBUCKETS];

    #pragma unroll
    for(int b = 0; b < NBUCKETS; ++b){
        s[b] = 0;
    }

    thread_block_tile<32> tile = tiled_partition<32>(this_thread_block() );
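
    // NOTE: only this 32-thread tile takes part in the reduction below, and
    // only threadIdx.x == 0 publishes the result, so the code appears to
    // assume a launch with a single warp per threadblock (blockDim.x == 32);
    // partial sums held by any additional warps would be dropped.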

    //printf("block %d,dim %d entering sum %d nblocks\n",blockIdx.x, blockDim.x, nblocks);
    int64_t tid = threadIdx.x  + blockIdx.x * blockDim.x;


    #pragma unroll
    for (int b = 0; b < NBUCKETS; ++b)
    {
        // Grid-stride accumulation of bucket b's per-threadblock counts
        for (tid = threadIdx.x + blockIdx.x * blockDim.x;
             tid < nblocks;
             tid += blockDim.x * gridDim.x)
        {
            s[b] += blockbucket [b * nblocks + tid] ;
        }
        this_thread_block().sync();

        // Reduce the per-thread partial sums within the 32-thread tile
        s[b] = warp_ReduceSumPlus<uint64_t, 32>( tile, s[b]);
    }

    // Thread 0 atomically adds this block's bucket sums to the global offsets
    if (threadIdx.x == 0)
    {
        #pragma unroll
        for (int b = 0; b < NBUCKETS; ++b) {
            atomicAdd( (unsigned long long int *) &(offset [b]), s[b]);
        }
    }
    this_thread_block().sync(); 

    if (gridDim.x >= NBUCKETS)
    {
        // Cumulative sum across blocks for each bucket: one block per bucket
        if (blockIdx.x < NBUCKETS) {
            blockBucketExclusiveSum( blockIdx.x, blockbucket, nblocks ) ;
        }
    }
    else
    {
        // Fewer blocks than buckets: block 0 scans all the buckets serially
        if (blockIdx.x == 0)
        {
            #pragma unroll
            for (int b = 0; b < NBUCKETS; ++b) {
                blockBucketExclusiveSum( b, blockbucket, nblocks ) ;
            }
        }
    }
} // phase2
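
//------------------------------------------------------------------------------
// host-side launch sketch (illustrative; not part of this kernel file)
//------------------------------------------------------------------------------

// A minimal, hypothetical launch of AxB_phase2, assuming `blockbucket` and
// `offset` are device arrays of size NBUCKETS*nblocks and NBUCKETS, that
// `offset` has been cleared to zero, and that phase1 ran with `nblocks`
// threadblocks of 32 threads.  The names and geometry below are assumptions,
// not the actual GraphBLAS launch code:
//
//      dim3 grid (nblocks) ;
//      dim3 block (32) ;
//      AxB_phase2 <<<grid, block, 0, stream>>> (blockbucket, offset, nblocks) ;
//      cudaStreamSynchronize (stream) ;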