File: dagctx.go

package info (click to toggle)
golang-github-jbenet-go-context 0.0~git20150711.d14ea06-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 84 kB
  • sloc: makefile: 2
file content (119 lines) | stat: -rw-r--r-- 2,528 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package ctxext

import (
	"sync"
	"time"

	context "golang.org/x/net/context"
)

// WithParents returns a Context that listens to all given
// parents. It effectively transforms the Context Tree into
// a Directed Acyclic Graph. This is useful when a context
// may be cancelled for more than one reason. For example,
// consider a database with the following Get function:
//
//	func (db *DB) Get(ctx context.Context, ...) {}
//
// DB.Get may have to stop for two different contexts:
//   - the caller's context (caller might cancel)
//   - the database's context (might be shut down mid-request)
//
// WithParents saves the day by allowing us to "merge" contexts
// and continue on our merry contextual way:
//
//	ctx = ctxext.WithParents(ctx, db.ctx)
//
// Passing related (mutually derived) contexts to WithParents is
// actually ok. The child is cancelled when any of its parents is
// cancelled, so if any of its parents are also related, the cancel
// propagation will reach the child via the shortest path.
//
// The returned context is cancelled with the Err of whichever parent
// finishes first, and reports the earliest deadline among the parents.
// If a parent is already done when WithParents is called, the returned
// context is cancelled before WithParents returns; when several parents
// are already done, the first one in argument order supplies the error.
// Values stored in the parents are NOT visible through the returned
// context: its Value method always returns nil.
//
// WithParents panics if called with no contexts or if any of them is nil.
//
// One watcher goroutine is started per cancellable parent. Each watcher
// exits as soon as either its parent or the returned context is done, so
// the watchers live until at least one parent is cancelled.
func WithParents(ctxts ...context.Context) context.Context {
	if len(ctxts) < 1 {
		panic("no contexts provided")
	}

	// earliestDeadline also validates the parents (it panics on a nil
	// context), so it must run before any parent method is called below.
	ctx := &errCtx{
		done: make(chan struct{}),
		dead: earliestDeadline(ctxts),
	}

	// Fast path: a parent that is already done cancels the child
	// synchronously, so callers never observe the child as alive and
	// no goroutines need to be started at all.
	for _, pctx := range ctxts {
		select {
		case <-pctx.Done():
			ctx.cancel(pctx.Err())
			return ctx
		default:
		}
	}

	// listen to all remaining contexts and use the first to finish.
	for _, pctx := range ctxts {
		if pctx.Done() == nil {
			// A nil Done channel means this parent can never be
			// cancelled; a watcher for it could only ever wait on the
			// child, so starting one would just park a goroutine.
			continue
		}

		go func(pctx context.Context) {
			select {
			case <-ctx.Done(): // cancelled by another parent
				return
			case <-pctx.Done(): // this parent cancelled
				// race: two parents may have cancelled at the same time.
				// break tie with mutex (inside ctx.cancel)
				ctx.cancel(pctx.Err())
			}
		}(pctx)
	}

	return ctx
}

// earliestDeadline scans the given contexts and returns a pointer to the
// soonest deadline any of them carries, or nil when none of them has a
// deadline (including when the slice is empty).
//
// It panics with "given nil context.Context" on the first nil entry.
func earliestDeadline(ctxts []context.Context) *time.Time {
	var earliest *time.Time

	for i := range ctxts {
		parent := ctxts[i]
		if parent == nil {
			panic("given nil context.Context")
		}

		deadline, has := parent.Deadline()
		if has && (earliest == nil || deadline.Before(*earliest)) {
			// Take the address of a fresh copy so the result never
			// aliases a variable that a later iteration could reuse.
			candidate := deadline
			earliest = &candidate
		}
	}

	return earliest
}

// errCtx is the Context implementation handed out by WithParents. It is
// cancelled at most once, through cancel, and remembers the error it was
// cancelled with.
type errCtx struct {
	// dead is the deadline reported by Deadline; nil means "no deadline".
	dead *time.Time
	// done is closed by cancel to signal cancellation.
	done chan struct{}
	// err is the cancellation reason; guarded by mu.
	err error
	// mu serializes cancel and protects err.
	mu sync.RWMutex
}

// cancel marks the context as done with the given error. Only the first
// call has any effect: later calls see the closed done channel and return
// without touching err, so the first error wins and done is closed once.
func (c *errCtx) cancel(err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	select {
	case <-c.Done():
		return // already cancelled
	default:
	}

	// Set err before closing done so that anyone woken by the close
	// finds the error in place once they acquire the lock in Err.
	c.err = err
	close(c.done) // signal done to all
}

// Done returns the channel that is closed when the context is cancelled.
func (c *errCtx) Done() <-chan struct{} {
	return c.done
}

// Err returns the error passed to the first cancel call, or nil if the
// context has not been cancelled yet. It only reads state, so it takes
// the shared (read) side of the lock and concurrent callers do not block
// one another.
func (c *errCtx) Err() error {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.err
}

// Value always returns nil: this context carries no values of its own
// and does not consult any other context.
func (c *errCtx) Value(key interface{}) interface{} {
	return nil
}

// Deadline returns the stored deadline and true, or the zero time and
// false when no deadline was set.
func (c *errCtx) Deadline() (deadline time.Time, ok bool) {
	if c.dead == nil {
		return time.Time{}, false
	}

	return *c.dead, true
}