1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
|
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package net
import "sync/atomic"
// fdMutex is a specialized synchronization primitive
// that manages lifetime of an fd and serializes access
// to Read and Write methods on netFD.
type fdMutex struct {
state uint64
rsema uint32
wsema uint32
}
// fdMutex.state is organized as follows:
// 1 bit - whether netFD is closed, if set all subsequent lock operations will fail.
// 1 bit - lock for read operations.
// 1 bit - lock for write operations.
// 20 bits - total number of references (read+write+misc).
// 20 bits - number of outstanding read waiters.
// 20 bits - number of outstanding write waiters.
const (
mutexClosed = 1 << 0
mutexRLock = 1 << 1
mutexWLock = 1 << 2
mutexRef = 1 << 3
mutexRefMask = (1<<20 - 1) << 3
mutexRWait = 1 << 23
mutexRMask = (1<<20 - 1) << 23
mutexWWait = 1 << 43
mutexWMask = (1<<20 - 1) << 43
)
// Read operations must do RWLock(true)/RWUnlock(true).
// Write operations must do RWLock(false)/RWUnlock(false).
// Misc operations must do Incref/Decref. Misc operations include functions like
// setsockopt and setDeadline. They need to use Incref/Decref to ensure that
// they operate on the correct fd in presence of a concurrent Close call
// (otherwise fd can be closed under their feet).
// Close operation must do IncrefAndClose/Decref.
// RWLock/Incref return whether fd is open.
// RWUnlock/Decref return whether fd is closed and there are no remaining references.
func (mu *fdMutex) Incref() bool {
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
new := old + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
return true
}
}
}
func (mu *fdMutex) IncrefAndClose() bool {
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
// Mark as closed and acquire a reference.
new := (old | mutexClosed) + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
// Remove all read and write waiters.
new &^= mutexRMask | mutexWMask
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
// Wake all read and write waiters,
// they will observe closed flag after wakeup.
for old&mutexRMask != 0 {
old -= mutexRWait
runtime_Semrelease(&mu.rsema)
}
for old&mutexWMask != 0 {
old -= mutexWWait
runtime_Semrelease(&mu.wsema)
}
return true
}
}
}
func (mu *fdMutex) Decref() bool {
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
new := old - mutexRef
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
return new&(mutexClosed|mutexRefMask) == mutexClosed
}
}
}
func (mu *fdMutex) RWLock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexClosed != 0 {
return false
}
var new uint64
if old&mutexBit == 0 {
// Lock is free, acquire it.
new = (old | mutexBit) + mutexRef
if new&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
} else {
// Wait for lock.
new = old + mutexWait
if new&mutexMask == 0 {
panic("net: inconsistent fdMutex")
}
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
if old&mutexBit == 0 {
return true
}
runtime_Semacquire(mutexSema)
// The signaller has subtracted mutexWait.
}
}
}
func (mu *fdMutex) RWUnlock(read bool) bool {
var mutexBit, mutexWait, mutexMask uint64
var mutexSema *uint32
if read {
mutexBit = mutexRLock
mutexWait = mutexRWait
mutexMask = mutexRMask
mutexSema = &mu.rsema
} else {
mutexBit = mutexWLock
mutexWait = mutexWWait
mutexMask = mutexWMask
mutexSema = &mu.wsema
}
for {
old := atomic.LoadUint64(&mu.state)
if old&mutexBit == 0 || old&mutexRefMask == 0 {
panic("net: inconsistent fdMutex")
}
// Drop lock, drop reference and wake read waiter if present.
new := (old &^ mutexBit) - mutexRef
if old&mutexMask != 0 {
new -= mutexWait
}
if atomic.CompareAndSwapUint64(&mu.state, old, new) {
if old&mutexMask != 0 {
runtime_Semrelease(mutexSema)
}
return new&(mutexClosed|mutexRefMask) == mutexClosed
}
}
}
// Implemented in runtime package.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
|