1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
|
package contractmanager
import (
"bytes"
"os"
"path/filepath"
"sync"
"testing"
"github.com/NebulousLabs/Sia/build"
"github.com/NebulousLabs/Sia/modules"
"github.com/NebulousLabs/fastrand"
)
// TestParallelFileAccess uses a single file handle with ReadAt and WriteAt to
// write to multiple locations of a file in parallel, verifying that doing so
// is safe.
//
// Writer goroutine i stores datas[i*j] at byte offset i*j*dataSize for every
// j in [0, writesPerThread). The payload is therefore a function of the offset
// alone: goroutines whose indices multiply to the same value write identical
// bytes to the same location, so the expected content of every written record
// is well defined regardless of scheduling.
func TestParallelFileAccess(t *testing.T) {
	if testing.Short() {
		t.SkipNow()
	}
	t.Parallel()

	// Create the file that will be used in parallel.
	testdir := build.TempDir(modules.ContractManagerDir, "TestParallelFileAccess")
	err := os.MkdirAll(testdir, 0700)
	if err != nil {
		t.Fatal(err)
	}
	f, err := os.Create(filepath.Join(testdir, "parallelFile"))
	if err != nil {
		t.Fatal(err)
	}
	defer f.Close()

	// Create the data that will be written to the file, such that it can be
	// verified later.
	writesPerThread := 200
	numThreads := 500
	dataSize := 163 // Intentionally overlaps sector boundaries.
	datas := make([][]byte, numThreads*writesPerThread)
	for i := range datas {
		datas[i] = make([]byte, dataSize)
		fastrand.Read(datas[i])
	}

	// verifyThread reads back every record that writer goroutine i wrote and
	// checks it against the expected payload. It only uses t.Error, which may
	// be called from goroutines other than the one running the test.
	verifyThread := func(i int) {
		// One buffer per goroutine is enough: ReadAt either fills it
		// completely or returns an error, in which case the comparison is
		// skipped.
		data := make([]byte, dataSize)
		for j := 0; j < writesPerThread; j++ {
			if _, err := f.ReadAt(data, int64(i*j*dataSize)); err != nil {
				t.Error(err)
				continue
			}
			if !bytes.Equal(data, datas[i*j]) {
				t.Error("data mismatch for value", i, j)
			}
		}
	}

	// Spin up threads to make concurrent writes to the file in different
	// locations. Have some reads + writes that are trying to overlap.
	//
	// Writers whose index is a multiple of threadingModifier are tracked by
	// wg1 so that their results can be verified while the remaining writers
	// (tracked by wg2) are potentially still running.
	threadingModifier := 71
	var wg1 sync.WaitGroup
	var wg2 sync.WaitGroup
	for i := 0; i < numThreads; i++ {
		wg := &wg2
		if i%threadingModifier == 0 {
			wg = &wg1
		}
		wg.Add(1)
		go func(i int, wg *sync.WaitGroup) {
			defer wg.Done()
			for j := 0; j < writesPerThread; j++ {
				_, err := f.WriteAt(datas[i*j], int64(i*dataSize*j))
				if err != nil {
					t.Error(err)
				}
			}
		}(i, wg)
	}

	// Wait for the smaller set of first writes to complete.
	wg1.Wait()

	// Verify the results for the smaller set of writes. wg1 is reused here;
	// that is fine because the previous Wait has already returned.
	for i := 0; i < numThreads; i += threadingModifier {
		wg1.Add(1)
		go func(i int) {
			defer wg1.Done()
			verifyThread(i)
		}(i)
	}
	wg1.Wait()
	wg2.Wait()

	// Verify the results for all of the writes.
	for i := 0; i < numThreads; i++ {
		wg1.Add(1)
		go func(i int) {
			defer wg1.Done()
			verifyThread(i)
		}(i)
	}
	wg1.Wait()
}
|