File: mapreduce.go

package info (click to toggle)
golang-github-tideland-golib 4.24.2-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 1,144 kB
  • sloc: makefile: 4
file content (158 lines) | stat: -rw-r--r-- 3,545 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Tideland Go Library - Map/Reduce
//
// Copyright (C) 2009-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.

package mapreduce

//--------------------
// IMPORTS
//--------------------

import (
	"hash/adler32"
	"runtime"
)

//--------------------
// KEY/VALUE
//--------------------

// KeyValue is a pair of a string key and any data as value. It is the
// unit of data handed from stage to stage during MapReduce.
type KeyValue interface {
	// Key returns the key for the mapping. performReducing selects the
	// reduce channel from a checksum of this key, so pairs with equal
	// keys are delivered to the same Reduce call.
	Key() string

	// Value returns the payload value for processing.
	Value() interface{}
}

// KeyValueChan is a channel for the transfer of key/value pairs. It
// connects the input, map, reduce, and consume stages of MapReduce.
type KeyValueChan chan KeyValue

// Close closes the underlying channel, telling receivers that no more
// key/value pairs will follow. Like the built-in close it must be
// called only once per channel.
func (ch KeyValueChan) Close() {
	close(ch)
}

//--------------------
// MAP/REDUCE
//--------------------

// MapReducer has to be implemented to control the map/reducing.
// MapReduce calls Map and Reduce from several goroutines at the
// same time.
type MapReducer interface {
	// Input has to return the input channel for the
	// data to process. The mapping ranges over this channel,
	// so it has to be closed after the last pair to let the
	// processing end.
	Input() KeyValueChan

	// Map maps a key/value pair to another one and emits it.
	// It is called concurrently by multiple map goroutines which
	// all share the same emit channel.
	Map(in KeyValue, emit KeyValueChan)

	// Reduce reduces the values delivered via the input
	// channel to the emit channel. It is called once per reduce
	// goroutine, each call with its own input channel, which is
	// closed when no more data follows. The emit channel is closed
	// by the package after all Reduce calls have returned.
	Reduce(in, emit KeyValueChan)

	// Consume allows the MapReducer to consume the
	// processed data. The returned error is the result
	// of MapReduce.
	Consume(in KeyValueChan) error
}

// MapReduce runs the passed MapReducer: its input is mapped and reduced
// by goroutines working in parallel, and the reduced pairs are handed to
// Consume. The error returned by Consume is the result.
//
// Both internal channels are unbuffered, so the map and reduce
// goroutines only make progress while Consume keeps receiving.
func MapReduce(mr MapReducer) error {
	var (
		mapped  = make(KeyValueChan)
		reduced = make(KeyValueChan)
	)

	// Reducing reads what mapping emits; both run in the background
	// while the consumer runs in the calling goroutine.
	go performReducing(mr, mapped, reduced)
	go performMapping(mr, mapped)

	return mr.Consume(reduced)
}

//--------------------
// PRIVATE
//--------------------

// closerChan signals the closing of channels. Worker goroutines send
// one empty struct on it when they are done; newCloserChan counts
// these signals.
type closerChan chan struct{}

// newCloserChan returns a signal channel and starts a goroutine that
// closes kvc as soon as size signals have been received on it. After
// that the signal channel itself is closed, so exactly size signals
// have to be sent; any additional send would panic.
//
// With a size of zero or less there is nothing to wait for and kvc is
// closed right away, instead of leaving the counting goroutine blocked
// forever on a signal that can never complete the count.
func newCloserChan(kvc KeyValueChan, size int) closerChan {
	signals := make(closerChan)
	go func() {
		// Wait for every expected signal before closing.
		for received := 0; received < size; received++ {
			<-signals
		}
		kvc.Close()
		close(signals)
	}()
	return signals
}

// performReducing starts one reduce goroutine per CPU and distributes
// the pairs arriving on mapEmitChan among them. Once mapEmitChan is
// closed the reducer inputs are closed too; reduceEmitChan is closed
// after every Reduce call has returned.
func performReducing(mr MapReducer, mapEmitChan, reduceEmitChan KeyValueChan) {
	// The closer shuts the reduce emit channel after each reducer
	// has signalled that it is done.
	workers := runtime.NumCPU()
	done := newCloserChan(reduceEmitChan, workers)

	// Create the reducer inputs and start the reducers.
	inputs := make([]KeyValueChan, workers)
	for i := range inputs {
		input := make(KeyValueChan)
		inputs[i] = input
		go func() {
			mr.Reduce(input, reduceEmitChan)
			done <- struct{}{}
		}()
	}

	// Route each pair by the checksum of its key, so that equal
	// keys always reach the same reducer.
	buckets := uint32(workers)
	for kv := range mapEmitChan {
		sum := adler32.Checksum([]byte(kv.Key()))
		inputs[sum%buckets] <- kv
	}

	// No more mapped data, let the reducers finish.
	for i := range inputs {
		inputs[i].Close()
	}
}

// performMapping runs the mapping goroutines. It starts four map
// goroutines per CPU, distributes the pairs read from mr.Input() among
// them round-robin, and closes their input channels when the input is
// exhausted. mapEmitChan is closed after all map goroutines are done.
func performMapping(mr MapReducer, mapEmitChan KeyValueChan) {
	// Start a closer for the map emit chan.
	size := runtime.NumCPU() * 4
	signals := newCloserChan(mapEmitChan, size)

	// Start map goroutines.
	mapChans := make([]KeyValueChan, size)
	for i := range mapChans {
		mapChans[i] = make(KeyValueChan)
		go func(in KeyValueChan) {
			for kv := range in {
				mr.Map(kv, mapEmitChan)
			}
			signals <- struct{}{}
		}(mapChans[i])
	}

	// Dispatch input data to map channels round-robin. The index
	// wraps around at size instead of growing without bound: an
	// ever-increasing counter eventually overflows to a negative
	// value, and the negative remainder is no valid slice index.
	idx := 0
	for kv := range mr.Input() {
		mapChans[idx] <- kv
		idx = (idx + 1) % size
	}

	// Close map channels.
	for _, mapChan := range mapChans {
		mapChan.Close()
	}
}

// EOF