1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
|
// Copyright 2018 The gVisor Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipe
// Tx is the transmit side of the shared memory ring buffer.
type Tx struct {
	// p is the underlying pipe; its buffer holds the slots.
	p pipe

	// maxPayloadSize is the largest payload Push accepts. Init sets it to
	// the buffer length minus two slot headers.
	maxPayloadSize uint64

	// head is the limit up to which slots may be written. reclaim advances
	// it past slots whose header has the slotFree bit set.
	head uint64

	// tail is the position of the oldest slot that has been pushed but not
	// yet flushed. Flush publishes its header and then moves tail to next.
	tail uint64

	// next is the position at which the next Push places its slot. It
	// equals tail when no pushed message is pending.
	next uint64

	// tailHeader is the header value for the slot at tail. Push records it
	// here rather than writing it to the buffer, and Flush writes it with
	// writeAtomic.
	tailHeader uint64
}
// Init sets up the transmit end of the pipe to operate on b. Once it
// returns, no message is pending (next equals tail), the next slot to be
// written is the very first one, and the transmitter has the whole ring
// buffer available to it: head sits one jump ahead of tail.
func (t *Tx) Init(b []byte) {
	t.p.init(b)

	// Position bookkeeping: next starts at tail, head one jump beyond it.
	t.tail = 0xfffffffe * jump
	t.next = t.tail
	t.head = t.tail + jump

	// The payload limit leaves room for two slot headers: the one that
	// belongs to the payload itself and the one of a wrapping message.
	bufLen := uint64(len(t.p.buffer))
	t.maxPayloadSize = bufLen - 2*sizeOfSlotHeader

	// Mark the first slot as free.
	t.p.write(t.tail, slotFree)
}
// Capacity reports how many records of recordSize bytes can be written to
// the pipe before it fills up. One slot header's worth of space is set
// aside from the buffer length, and the remainder is divided by the slot
// size that a payload of recordSize bytes occupies.
func (t *Tx) Capacity(recordSize uint64) uint64 {
	slotLen := payloadToSlotSize(recordSize)
	usable := uint64(len(t.p.buffer)) - sizeOfSlotHeader
	return usable / slotLen
}
// Push reserves "payloadSize" bytes for transmission in the pipe. The caller
// fills the returned slice with the data to be transferred and eventually
// calls Flush() to make the data visible to the reader, or Abort() to make
// the pipe forget all Push() calls since the last Flush().
//
// Push returns nil when payloadSize exceeds the maximum payload size or when
// not enough slots can be reclaimed to hold the request.
//
// The returned slice is available until Flush() or Abort() is next called.
// After that, it must not be touched.
func (t *Tx) Push(payloadSize uint64) []byte {
	// A request larger than the maximum payload can never be satisfied.
	if payloadSize > t.maxPayloadSize {
		return nil
	}

	// pending is true when an earlier Push() has not been Flush()'ed yet.
	pending := t.next != t.tail

	start := t.next
	end := start + payloadToSlotSize(payloadSize)
	wrapAt := (start & revolutionMask) | uint64(len(t.p.buffer))

	if int64(end-wrapAt) < 0 {
		// The slot fits before the end of the buffer; make sure the
		// space it needs has been reclaimed.
		if !t.reclaim(end) {
			return nil
		}

		// The header of the slot at tail is held back in tailHeader
		// until Flush(); any later slot gets its header written now.
		if pending {
			t.p.write(start, payloadSize)
		} else {
			t.tailHeader = payloadSize
		}

		// Take the payload slice before moving t.next forward.
		buf := t.p.data(start, payloadSize)
		t.next = end
		return buf
	}

	// The slot would reach or cross the end of the buffer. Emit a wrapping
	// slot that runs up to the start of the next revolution, then retry
	// the request at the front of the pipe.
	end = (end & revolutionMask) + jump
	if !t.reclaim(end) {
		return nil
	}

	wrapPayload := slotToPayloadSize(end - start)
	t.next = end
	if pending {
		t.p.write(start, wrapPayload)
	} else {
		// Nothing else is pending, so the wrapping slot is published
		// right away.
		t.tailHeader = wrapPayload
		t.Flush()
	}

	return t.Push(payloadSize)
}
// reclaim tries to move the head forward until it is at or beyond newNext.
// When the head is already there, it returns true without touching anything.
// Otherwise it walks over slots that the receive end has consumed (their
// header carries the slotFree bit) and reports whether enough of them could
// be reclaimed. Progress made before a failure is kept in t.head.
func (t *Tx) reclaim(newNext uint64) bool {
	for {
		// Done once the head has caught up with the requested position.
		if int64(newNext-t.head) <= 0 {
			return true
		}

		// A slot that is not marked free cannot be reclaimed.
		hdr := t.p.readAtomic(t.head)
		if hdr&slotFree == 0 {
			return false
		}

		candidate := t.head + payloadToSlotSize(hdr&slotSizeMask)

		// Reject a head that would end up more than one jump ahead of
		// the tail.
		if int64(candidate-t.tail) > int64(jump) {
			return false
		}
		// Reject a head whose offset lies outside the buffer.
		if candidate&offsetMask >= uint64(len(t.p.buffer)) {
			return false
		}

		t.head = candidate
	}
}
// Abort causes all Push() calls since the last Flush() to be forgotten and
// therefore they will not be made visible to the receiver. It does so by
// resetting next back to tail; the buffer contents are not modified.
//
// Note that a wrapping slot which Push() flushed on its own (when no message
// was pending at the time) has already moved tail and is not undone here.
func (t *Tx) Abort() {
	t.next = t.tail
}
// Flush publishes every buffer pushed since the most recent Flush() or
// Abort(), making them visible to the receiver. It is a no-op when nothing
// has been pushed.
func (t *Tx) Flush() {
	end := t.next
	if end == t.tail {
		// No pushed buffers, nothing to publish.
		return
	}

	// The receiver will spin on the slot at the new tail, so that slot
	// must carry the slotFree bit. The write is skipped when the position
	// coincides with the head.
	if end != t.head {
		t.p.write(end, slotFree)
	}

	// Writing the held-back header of the tail slot is what exposes the
	// pushed buffers; only then does the tail move forward.
	t.p.writeAtomic(t.tail, t.tailHeader)
	t.tail = end
}
// Bytes returns the byte slice on which the pipe operates. The slice is
// returned as is, without copying, so it shares storage with the pipe.
func (t *Tx) Bytes() []byte {
	return t.p.buffer
}
|