1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
package phony
import (
"runtime"
"sync"
"sync/atomic"
)
var stops = sync.Pool{New: func() interface{} { return make(chan struct{}, 1) }}
var elems = sync.Pool{New: func() interface{} { return new(queueElem) }}
// A message in the queue
type queueElem struct {
msg func()
next atomic.Pointer[queueElem] // *queueElem, accessed atomically
}
// Inbox is an ordered queue of messages which an Actor will process sequentially.
// Messages are meant to be in the form of non-blocking functions of 0 arguments, often closures.
// The intent is for the Inbox struct to be embedded in other structs, causing them to satisfy the Actor interface, and then the Actor is used to access any protected fields of the struct.
// It is up to the user to ensure that memory is used safely, and that messages do not contain blocking operations.
// An Inbox must not be copied after first use.
type Inbox struct {
noCopy noCopy
head *queueElem // Used carefully to avoid needing atomics
tail atomic.Pointer[queueElem] // *queueElem, accessed atomically
busy atomic.Bool // accessed atomically, 1 if sends should apply backpressure
}
// Actor is the interface for Actors, based on their ability to receive a message from another Actor.
// It's meant so that structs which embed an Inbox can satisfy a mutually compatible interface for message passing.
type Actor interface {
Act(Actor, func())
enqueue(func())
restart()
advance() bool
}
// enqueue puts a message into the Inbox and returns true if backpressure should be applied.
// If the inbox was empty, then the actor was not already running, so enqueue starts it.
func (a *Inbox) enqueue(msg func()) {
q := elems.Get().(*queueElem)
*q = queueElem{msg: msg}
tail := a.tail.Swap(q)
if tail != nil {
//An old tail exists, so update its next pointer to reference q
tail.next.Store(q)
} else {
// No old tail existed, so no worker is currently running
// Update the head to point to q, then start the worker
a.head = q
a.restart()
}
}
// Act adds a message to an Inbox, which will be executed by the inbox's Actor at some point in the future.
// When one Actor sends a message to another, the sender is meant to provide itself as the first argument to this function.
// If the sender argument is non-nil and the receiving Inbox has been flooded, then backpressure is applied to the sender.
// This backpressue cause the sender stop processing messages at some point in the future until the receiver has caught up with the sent message.
// A nil first argument is valid, but should only be used in cases where backpressure is known to be unnecessary, such as when an Actor sends a message to itself or sends a response to a request (where it's the request sender's fault if they're flooded by responses).
func (a *Inbox) Act(from Actor, action func()) {
if action == nil {
panic("tried to send nil action")
}
a.enqueue(action)
if from != nil && a.busy.Load() {
done := stops.Get().(chan struct{})
a.enqueue(func() { done <- struct{}{} })
from.enqueue(func() {
<-done
stops.Put(done)
})
}
}
// Block adds a message to an Actor's Inbox, which will be executed at some point in the future.
// It then blocks until the Actor has finished running the provided function.
// Block meant exclusively as a convenience function for non-Actor code to send messages and wait for responses.
// If an Actor calls Block, then it may cause a deadlock, so Act should always be used instead.
func Block(actor Actor, action func()) {
if actor == nil {
panic("tried to send to nil actor")
} else if action == nil {
panic("tried to send nil action")
}
done := stops.Get().(chan struct{})
actor.enqueue(action)
actor.enqueue(func() { done <- struct{}{} })
<-done
stops.Put(done)
}
// run is executed when a message is placed in an empty Inbox, and launches a worker goroutine.
// The worker goroutine processes messages from the Inbox until empty, and then exits.
func (a *Inbox) run() {
a.busy.Store(true)
for running := true; running; running = a.advance() {
a.head.msg()
}
}
// returns true if we still have more work to do
func (a *Inbox) advance() (more bool) {
head := a.head
a.head = head.next.Load()
if a.head == nil {
// We loaded the last message
// Unset busy and CAS the tail to nil to shut down
a.busy.Store(false)
if !a.tail.CompareAndSwap(head, nil) {
// Someone pushed to the list before we could CAS the tail to shut down
// This means we're effectively restarting at this point
// Set busy and load the next message
a.busy.Store(true)
for a.head == nil {
// Busy loop until the message is successfully loaded
// Gosched to avoid blocking the thread in the mean time
runtime.Gosched()
a.head = head.next.Load()
}
more = true
}
} else {
more = true
}
*head = queueElem{}
elems.Put(head)
return
}
func (a *Inbox) restart() {
go a.run()
}
// noCopy implements the sync.Locker interface, so go vet can catch unsafe copying
type noCopy struct{}
func (n *noCopy) Lock() {}
func (n *noCopy) Unlock() {}
|