File: buckets.c

package info (click to toggle)
hpcc 1.5.0-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,752 kB
  • sloc: ansic: 27,044; makefile: 50; sh: 24
file content (131 lines) | stat: -rw-r--r-- 3,166 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
/* buckets.c
 *
 * Each process (PE) has a set of buckets, one for each possible
 * destination PE. Each set of buckets is implemented as an
 * array of objects, one for each destination PE, where each object
 * keeps the number of updates currently in the bucket and a pointer
 * to a list of updates.
 * The motivation for using lists (instead of fixed size buckets)
 * is to keep the memory requirements low as the number of processes
 * increases. To avoid the overhead of allocating memory dynamically,
 * a pool of memory is allocated in advance and objects are
 * allocated/returned from/to this pool (see pool.c for details).
 *
 * An auxiliary data structure keeps the local buckets ordered
 * according to the number of updates of each bucket (see heap.c).
 *
 */


#include <hpcc.h>
#include "RandomAccess.h"
#include "buckets.h"
#include "heap.h"
#include "pool.h"

/* Memory pool for updates.
 * Every Update_T list node handled in this file is obtained from and
 * returned to this pool. It is created in HPCC_InitBuckets and released
 * in HPCC_FreeBuckets. */
static POOL *Update_Pool;

/*
 * HPCC_InitBuckets
 *
 * Allocates the array of buckets (one per destination PE), marks every
 * bucket as empty, then initializes the memory pool for updates and the
 * heap of PEs with pending updates.
 *
 *   numPEs        - number of buckets to allocate
 *   maxNumUpdates - number of Update_T objects requested from the pool
 *
 * Returns the bucket array. If the array cannot be allocated, returns NULL
 * without initializing the pool or the heap; the caller must then not use
 * the other routines of this file (including HPCC_FreeBuckets).
 */
Bucket_Ptr HPCC_InitBuckets(int numPEs, int maxNumUpdates)
{
  Bucket_Ptr Buckets;
  int i;

  Buckets = (Bucket_Ptr) malloc (numPEs * sizeof(Bucket_T));
  if (Buckets == NULL && numPEs > 0) {
    /* allocation failed: the loop below would dereference NULL */
    return NULL;
  }

  /* every bucket starts with no pending updates */
  for (i=0; i<numPEs; i++) {
    Buckets[i].numUpdates = 0;
    Buckets[i].updateList = NULL_UPDATE_PTR;
  }

  /* initialize memory pool for updates */
  /* NOTE(review): the result of HPCC_PoolInit is not checked here because
   * its failure behavior is not visible from this file -- confirm in pool.c */
  Update_Pool = HPCC_PoolInit (maxNumUpdates, sizeof(Update_T));

  /* initialize heap of PE's with pending updates */
  HPCC_ra_Heap_Init(numPEs);

  return(Buckets);
}


/*
 * HPCC_InsertUpdate
 *
 * Adds the update value `ran` to the bucket of destination `pe`.
 * A list node is taken from the update pool, pushed on the front of the
 * bucket's list, and the heap of pending PEs is adjusted to reflect the
 * bucket's new update count.
 *
 *   ran     - update value to store
 *   pe      - index of the destination bucket in Buckets
 *   Buckets - bucket array returned by HPCC_InitBuckets
 */
void HPCC_InsertUpdate(u64Int ran, int pe, Bucket_Ptr Buckets)
{
  Bucket_Ptr target = &(Buckets[pe]);
  Update_Ptr node = (Update_T*) HPCC_PoolGetObj(Update_Pool);
  int count;

  /* link the new node at the head of the destination's list */
  node->value = ran;
  node->forward = target->updateList;
  target->updateList = node;
  target->numUpdates++;

  count = target->numUpdates;
  if (count != 1) {
    /* PE is already in the heap: only its key has to grow by one */
    HPCC_ra_Heap_IncrementKey(pe);
    return;
  }

  /* first update for this PE since its bucket was last emptied */
  HPCC_ra_Heap_Insert (pe, count);
}



/*
 * HPCC_GetUpdates
 *
 * Extracts from the heap the PE reported by HPCC_ra_Heap_ExtractMax and
 * copies the updates of its bucket into the caller's buffer, returning
 * each copied list node to the update pool.
 *
 *   Buckets    - bucket array returned by HPCC_InitBuckets
 *   bufferPtr  - destination buffer for the update values
 *   bufferSize - capacity of bufferPtr, in u64Int elements
 *   peUpdates  - out: number of updates written to bufferPtr
 *
 * Returns the index of the PE whose updates were copied.
 *
 * At most bufferSize values are written. If the bucket holds more updates
 * than fit, the remainder stays in the bucket and the PE is put back in the
 * heap with the remaining count, so a later call can pick it up. When
 * everything fits, the bucket is left empty.
 */
int HPCC_GetUpdates(Bucket_Ptr Buckets, u64Int *bufferPtr, int bufferSize, int *peUpdates)
{

  int pe;
  int copied;
  Bucket_Ptr bucket;
  Update_Ptr update, tmp;

  HPCC_ra_Heap_ExtractMax (&pe, peUpdates);
  bucket = Buckets + pe; /* bucket = &(Buckets[pe]); */

  /* copy updates to buffer, never writing past its capacity */
  copied = 0;
  update = bucket->updateList;
  while (update != NULL_UPDATE_PTR && copied < bufferSize) {
    bufferPtr[copied] = (u64Int)(update->value);
    copied ++;
    tmp = update;
    update = update->forward; /* read the link before the node is recycled */
    HPCC_PoolReturnObj(Update_Pool, tmp);
  }

  if (update == NULL_UPDATE_PTR) {
    /* whole list consumed: report the bucket's count and empty it */
    *peUpdates = bucket->numUpdates;
    bucket->numUpdates = 0;
    bucket->updateList = NULL_UPDATE_PTR;
  }
  else {
    /* buffer full: keep the uncopied tail pending for this PE */
    *peUpdates = copied;
    bucket->numUpdates -= copied;
    bucket->updateList = update;
    /* NOTE(review): assumes HPCC_ra_Heap_ExtractMax removed pe from the
     * heap, as HPCC_InsertUpdate's use of Heap_Insert suggests -- confirm
     * against heap.c */
    HPCC_ra_Heap_Insert (pe, bucket->numUpdates);
  }

  return(pe);

}


/*
 * HPCC_FreeBuckets
 *
 * Tears down everything set up by HPCC_InitBuckets: frees the heap,
 * returns every update still queued in any bucket to the update pool,
 * releases the pool and finally frees the bucket array.
 *
 *   Buckets - bucket array returned by HPCC_InitBuckets
 *   numPEs  - number of buckets in the array
 */
void HPCC_FreeBuckets (Bucket_Ptr Buckets, int numPEs)
{
  int pe;

  HPCC_ra_Heap_Free();

  /* hand every leftover list node back to the pool */
  for (pe = 0; pe < numPEs; pe++) {
    Update_Ptr node = Buckets[pe].updateList;
    while (node != NULL_UPDATE_PTR) {
      Update_Ptr next = node->forward; /* save the link before returning the node */
      HPCC_PoolReturnObj(Update_Pool, node);
      node = next;
    }
  }

  /* release the pool's storage, then the pool object and the array */
  HPCC_PoolFree(Update_Pool);
  free(Update_Pool);
  free(Buckets);
}