1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
|
//go:build !no_workceptor
// +build !no_workceptor
package workceptor
import (
"context"
"fmt"
"os"
"path"
"strconv"
"sync"
"testing"
"time"
)
// TestStatusFileLock exercises concurrent access to a single status file.
//
// A set of writer goroutines each call UpdateFullStatus with a callback that
// sleeps for an increasing multiple of baseWaitTime and then sets State,
// StdoutSize and Detail to the same writer index. A set of reader goroutines
// repeatedly Load the file and verify that those three fields agree, i.e. that
// no partially applied update is ever observed. Once the writers are done the
// test also expects the elapsed time to be at least the sum of all callback
// sleeps; a shorter run is reported as the file lock not serializing updates.
func TestStatusFileLock(t *testing.T) {
	numWriterThreads := 8
	numReaderThreads := 8
	baseWaitTime := 200 * time.Millisecond

	tmpdir, err := os.MkdirTemp(os.TempDir(), "receptor-test-*")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpdir)
	statusFilename := path.Join(tmpdir, "status")

	startTime := time.Now()
	var totalWaitTime time.Duration
	wg := sync.WaitGroup{}
	wg.Add(numWriterThreads)
	for i := 0; i < numWriterThreads; i++ {
		waitTime := time.Duration(i) * baseWaitTime
		totalWaitTime += waitTime
		go func(iter int, waitTime time.Duration) {
			// Deferred so the main goroutine is released even if the update panics.
			defer wg.Done()
			sfd := StatusFileData{}
			sfd.UpdateFullStatus(statusFilename, func(status *StatusFileData) {
				time.Sleep(waitTime)
				// All three fields carry the same value so readers can detect
				// a torn (partially written) update.
				status.State = iter
				status.StdoutSize = int64(iter)
				status.Detail = fmt.Sprintf("%d", iter)
			})
		}(i, waitTime)
	}

	ctx, cancel := context.WithCancel(context.Background())
	wg2 := sync.WaitGroup{}
	wg2.Add(numReaderThreads)
	for i := 0; i < numReaderThreads; i++ {
		go func() {
			// Done must run on every exit path; otherwise wg2.Wait below would
			// block forever after a reader reports a failure.
			defer wg2.Done()
			sfd := StatusFileData{}
			fileHasExisted := false
			for {
				if ctx.Err() != nil {
					return
				}
				err := sfd.Load(statusFilename)
				if os.IsNotExist(err) && !fileHasExisted {
					// No writer has created the file yet; keep polling.
					continue
				}
				fileHasExisted = true
				// t.Fatal/t.FailNow may only be called from the goroutine
				// running the test, so failures here are recorded with
				// t.Errorf and the reader simply stops.
				if err != nil {
					t.Errorf("Error loading status file: %s", err)
					return
				}
				detailIter, err := strconv.Atoi(sfd.Detail)
				if err != nil {
					t.Errorf("Error converting status detail to int: %s", err)
					return
				}
				if detailIter >= 0 {
					if int64(sfd.State) != sfd.StdoutSize || sfd.State != detailIter {
						t.Error("Mismatched data in struct")
						return
					}
				}
			}
		}()
	}

	wg.Wait()
	totalTime := time.Since(startTime)

	// Stop and join the readers before the final assertion so that no reader
	// is still touching the file when the test returns and tmpdir is removed.
	cancel()
	wg2.Wait()

	if totalTime < totalWaitTime {
		t.Fatal("File locks apparently not locking")
	}
}
|