1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159
|
package toxiproxy
import (
"io"
"github.com/Shopify/toxiproxy/stream"
"github.com/Shopify/toxiproxy/toxics"
"github.com/Sirupsen/logrus"
)
// ToxicLink is a single-direction pipeline that connects an input and an output
// via a chain of toxics. The chain always starts with a NoopToxic, and toxics are
// added and removed as they are enabled/disabled. New toxics are always added to
// the end of the chain.
//
//         NoopToxic  LatencyToxic
//             v           v
// Input > ToxicStub > ToxicStub > Output
//
type ToxicLink struct {
	// stubs holds one stub per toxic in the chain; stubs[i] runs the toxic at
	// toxics.chain[direction][i].
	stubs []*toxics.ToxicStub
	// proxy is the owning proxy; its Name is used in log output and the
	// connection is removed from it when the link terminates.
	proxy *Proxy
	// toxics is the collection whose chain for this direction the link mirrors.
	toxics *ToxicCollection
	// input writes into the channel feeding the first stub.
	input *stream.ChanWriter
	// output reads from the channel fed by the last stub.
	output *stream.ChanReader
	// direction selects which chain of the collection this link uses.
	direction stream.Direction
}
// NewToxicLink builds a link for the given proxy and direction, creating one
// ToxicStub per toxic currently in the collection's chain for that direction.
// The stubs are wired head to tail with channels; the link's input writes to
// the head channel and its output reads from the tail channel. Nothing is
// started here; call Start to run the link.
func NewToxicLink(proxy *Proxy, collection *ToxicCollection, direction stream.Direction) *ToxicLink {
	chain := collection.chain[direction]
	link := &ToxicLink{
		stubs:     make([]*toxics.ToxicStub, len(chain), cap(chain)),
		proxy:     proxy,
		toxics:    collection,
		direction: direction,
	}

	// The head channel is unbuffered: the first toxic is always a noop.
	upstream := make(chan *stream.StreamChunk)
	link.input = stream.NewChanWriter(upstream)

	tail := len(link.stubs) - 1
	for idx := range link.stubs {
		var downstream chan *stream.StreamChunk
		if idx < tail {
			// A stub's output is the next stub's input, so it is sized by the
			// next toxic's buffer requirement.
			downstream = make(chan *stream.StreamChunk, link.toxics.chain[direction][idx+1].BufferSize)
		} else {
			// The final channel feeds the link output and is unbuffered.
			downstream = make(chan *stream.StreamChunk)
		}
		link.stubs[idx] = toxics.NewToxicStub(upstream, downstream)
		upstream = downstream
	}

	link.output = stream.NewChanReader(upstream)
	return link
}
// Start launches the goroutines that drive the link: one copying source into
// the link input, one per toxic stub in the chain, and one copying the link
// output into dest. When the output copy finishes, dest is closed and the link
// is removed, by name, from both the toxic collection and the proxy.
func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) {
	// Feed the head of the chain from the source, then close the input.
	go func() {
		copied, copyErr := io.Copy(link.input, source)
		if copyErr != nil {
			fields := logrus.Fields{
				"name":  link.proxy.Name,
				"bytes": copied,
				"err":   copyErr,
			}
			logrus.WithFields(fields).Warn("Source terminated")
		}
		link.input.Close()
	}()

	// Run every toxic on its matching stub.
	chain := link.toxics.chain[link.direction]
	for idx := range chain {
		go link.stubs[idx].Run(chain[idx])
	}

	// Drain the tail of the chain into the destination, then tear down.
	go func() {
		copied, copyErr := io.Copy(dest, link.output)
		if copyErr != nil {
			fields := logrus.Fields{
				"name":  link.proxy.Name,
				"bytes": copied,
				"err":   copyErr,
			}
			logrus.WithFields(fields).Warn("Destination terminated")
		}
		dest.Close()
		link.toxics.RemoveLink(name)
		link.proxy.RemoveConnection(name)
	}()
}
// AddToxic appends a toxic to the end of the chain. A new stub is created that
// takes over the current last stub's output, and the last stub is re-pointed
// at a fresh channel feeding the new stub.
func (link *ToxicLink) AddToxic(toxic *toxics.ToxicWrapper) {
	tail := len(link.stubs) - 1
	prev := link.stubs[tail]

	bridge := make(chan *stream.StreamChunk, toxic.BufferSize)
	added := toxics.NewToxicStub(bridge, prev.Output)
	link.stubs = append(link.stubs, added)

	// Interrupt the last toxic so that we don't have a race when moving channels.
	if !prev.InterruptToxic() {
		// This link is already closed, make sure the new toxic matches.
		// The real output is already closed, so close the new channel instead.
		added.Output = bridge
		added.Close()
		return
	}

	prev.Output = bridge
	go added.Run(toxic)
	go prev.Run(link.toxics.chain[link.direction][tail])
}
// UpdateToxic restarts the stub at toxic.Index with the given toxic. If the
// stub cannot be interrupted, nothing is restarted.
func (link *ToxicLink) UpdateToxic(toxic *toxics.ToxicWrapper) {
	stub := link.stubs[toxic.Index]
	if !stub.InterruptToxic() {
		return
	}
	go stub.Run(toxic)
}
// RemoveToxic removes the toxic at toxic.Index from the chain. The stub being
// removed is interrupted first; if that fails, nothing is done. Otherwise the
// previous stub is interrupted as well, any chunks still queued for the removed
// stub are forwarded to its output, the previous stub is re-pointed at that
// output, the removed stub is dropped from link.stubs, and the previous toxic
// is restarted.
//
// If a nil chunk is read while draining (which is also what a closed channel
// yields), the removed stub is closed and the method returns without rewiring.
func (link *ToxicLink) RemoveToxic(toxic *toxics.ToxicWrapper) {
	i := toxic.Index
	if link.stubs[i].InterruptToxic() {
		// Unbuffered: the goroutine below blocks until its single result is received.
		stop := make(chan bool)
		// Interrupt the previous toxic to update its output
		go func() {
			stop <- link.stubs[i-1].InterruptToxic()
		}()
		// Unblock the previous toxic if it is trying to flush
		// If the previous toxic is closed, continue flushing until we reach the end.
		interrupted := false
		// stopped records that the result on stop has already been received, so
		// the early-return path below does not wait for a second send.
		stopped := false
		for !interrupted {
			select {
			case interrupted = <-stop:
				stopped = true
			case tmp := <-link.stubs[i].Input:
				if tmp == nil {
					link.stubs[i].Close()
					if !stopped {
						// Receive the pending result so the interrupting goroutine can exit.
						<-stop
					}
					return
				}
				// Forward the chunk past the toxic being removed.
				link.stubs[i].Output <- tmp
			}
		}
		// Empty the toxic's buffer if necessary
		for len(link.stubs[i].Input) > 0 {
			tmp := <-link.stubs[i].Input
			if tmp == nil {
				link.stubs[i].Close()
				return
			}
			link.stubs[i].Output <- tmp
		}
		// Splice the removed stub out: the previous stub now writes directly to
		// the removed stub's output.
		link.stubs[i-1].Output = link.stubs[i].Output
		link.stubs = append(link.stubs[:i], link.stubs[i+1:]...)
		// Restart the previous toxic, which was interrupted above.
		go link.stubs[i-1].Run(link.toxics.chain[link.direction][i-1])
	}
}
|