package agent
import (
"github.com/gogo/protobuf/proto"
"github.com/moby/swarmkit/v2/api"
bolt "go.etcd.io/bbolt"
)
// Layout:
//
// bucket(v1.tasks.<id>) ->
// data (task protobuf)
// status (task status protobuf)
// assigned (key present)
// Keys used to address buckets and values in the task datastore.
var (
// bucketKeyStorageVersion names the outermost bucket; every lookup and
// creation path in this file starts from it.
bucketKeyStorageVersion = []byte("v1")
// bucketKeyTasks names the bucket nested under the storage version bucket
// that holds one sub-bucket per task id.
bucketKeyTasks = []byte("tasks")
// bucketKeyAssigned is the key, inside a task bucket, whose presence marks
// the task as assigned.
bucketKeyAssigned = []byte("assigned")
// bucketKeyData is the key, inside a task bucket, holding the marshaled task.
bucketKeyData = []byte("data")
// bucketKeyStatus is the key, inside a task bucket, holding the marshaled
// task status.
bucketKeyStatus = []byte("status")
)
// InitDB makes sure the bucket hierarchy used for task storage is present
// in db.
//
// The storage version bucket and the tasks bucket nested inside it are
// created within one update transaction if they are missing. Any error from
// bucket creation or from the transaction is returned.
func InitDB(db *bolt.DB) error {
	ensureBuckets := func(tx *bolt.Tx) error {
		if _, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyTasks); err != nil {
			return err
		}
		return nil
	}
	return db.Update(ensureBuckets)
}
// GetTask loads the stored task with the given id.
//
// It returns errTaskUnknown when there is no bucket for id or when that
// bucket holds no task data, and the decoding error when the stored bytes
// cannot be unmarshaled.
func GetTask(tx *bolt.Tx, id string) (*api.Task, error) {
	taskBkt := getTaskBucket(tx, id)
	if taskBkt == nil {
		return nil, errTaskUnknown
	}

	raw := taskBkt.Get(bucketKeyData)
	if raw == nil {
		return nil, errTaskUnknown
	}

	task := new(api.Task)
	if err := proto.Unmarshal(raw, task); err != nil {
		return nil, err
	}
	return task, nil
}
// WalkTasks calls fn once for every task stored in the datastore.
//
// If the tasks bucket does not exist there is nothing to walk and nil is
// returned. An error from decoding a task or from fn is returned to the
// caller. Entries in the tasks bucket that are not per-task buckets are
// skipped.
func WalkTasks(tx *bolt.Tx, fn func(task *api.Task) error) error {
	bkt := getTasksBucket(tx)
	if bkt == nil {
		return nil
	}

	return bkt.ForEach(func(k, v []byte) error {
		tbkt := bkt.Bucket(k)
		if tbkt == nil {
			// k refers to a plain key/value entry rather than a nested task
			// bucket; reading from a nil bucket would dereference nil, so
			// skip the entry instead.
			return nil
		}

		var t api.Task
		if err := proto.Unmarshal(tbkt.Get(bucketKeyData), &t); err != nil {
			return err
		}
		return fn(&t)
	})
}
// TaskAssigned reports whether the task with id carries the assigned marker.
//
// It returns false when the task has no bucket or when the marker key is
// absent or empty.
func TaskAssigned(tx *bolt.Tx, id string) bool {
	if taskBkt := getTaskBucket(tx, id); taskBkt != nil {
		marker := taskBkt.Get(bucketKeyAssigned)
		return len(marker) > 0
	}
	return false
}
// GetTaskStatus loads the stored status of the task with the given id.
//
// It returns errTaskUnknown when there is no bucket for id or when that
// bucket holds no status, and the decoding error when the stored bytes
// cannot be unmarshaled.
func GetTaskStatus(tx *bolt.Tx, id string) (*api.TaskStatus, error) {
	taskBkt := getTaskBucket(tx, id)
	if taskBkt == nil {
		return nil, errTaskUnknown
	}

	raw := taskBkt.Get(bucketKeyStatus)
	if raw == nil {
		return nil, errTaskUnknown
	}

	status := new(api.TaskStatus)
	if err := proto.Unmarshal(raw, status); err != nil {
		return nil, err
	}
	return status, nil
}
// WalkTaskStatus calls fn with the id and stored status of every task in the
// datastore.
//
// If the tasks bucket does not exist there is nothing to walk and nil is
// returned. An error from decoding a status or from fn is returned to the
// caller. Entries in the tasks bucket that are not per-task buckets are
// skipped.
func WalkTaskStatus(tx *bolt.Tx, fn func(id string, status *api.TaskStatus) error) error {
	bkt := getTasksBucket(tx)
	if bkt == nil {
		return nil
	}

	return bkt.ForEach(func(k, v []byte) error {
		tbkt := bkt.Bucket(k)
		if tbkt == nil {
			// k refers to a plain key/value entry rather than a nested task
			// bucket; reading from a nil bucket would dereference nil, so
			// skip the entry instead.
			return nil
		}

		var ts api.TaskStatus
		if err := proto.Unmarshal(tbkt.Get(bucketKeyStatus), &ts); err != nil {
			return err
		}
		return fn(string(k), &ts)
	})
}
// PutTask stores task under its ID, creating the task's bucket if needed.
//
// The task is written with its Status field cleared; the caller's task is
// not modified because the clearing happens on a shallow copy. Errors from
// bucket creation, marshaling, or the write are returned.
func PutTask(tx *bolt.Tx, task *api.Task) error {
	taskBkt, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyTasks, []byte(task.ID))
	if err != nil {
		return err
	}

	// Persist the task without its status.
	stripped := *task
	stripped.Status = api.TaskStatus{}

	encoded, err := proto.Marshal(&stripped)
	if err != nil {
		return err
	}
	return taskBkt.Put(bucketKeyData, encoded)
}
// PutTaskStatus writes status as the stored status of the task with id.
//
// It returns errTaskUnknown when the task has no bucket, and otherwise any
// error from marshaling or from the write.
func PutTaskStatus(tx *bolt.Tx, id string, status *api.TaskStatus) error {
	// The bucket is deliberately not created here: this used to go through
	// withCreateTaskBucketIfNotExists, but that could lead to weird race
	// conditions, and was not necessary.
	taskBkt := getTaskBucket(tx, id)
	if taskBkt == nil {
		return errTaskUnknown
	}

	encoded, err := proto.Marshal(status)
	if err != nil {
		return err
	}
	return taskBkt.Put(bucketKeyStatus, encoded)
}
// DeleteTask removes the bucket of the task with id, and with it everything
// stored for that task.
//
// It returns nil when the tasks bucket itself does not exist; otherwise it
// returns whatever deleting the task's bucket reports.
func DeleteTask(tx *bolt.Tx, id string) error {
	if tasks := getTasksBucket(tx); tasks != nil {
		return tasks.DeleteBucket([]byte(id))
	}
	return nil
}
// SetTaskAssignment records whether the task with id is assigned.
//
// When assigned is true the marker key is written; when false it is deleted.
// It returns errTaskUnknown when the task has no bucket.
func SetTaskAssignment(tx *bolt.Tx, id string, assigned bool) error {
	taskBkt := getTaskBucket(tx, id)
	switch {
	case taskBkt == nil:
		return errTaskUnknown
	case assigned:
		return taskBkt.Put(bucketKeyAssigned, []byte{0xFF})
	default:
		return taskBkt.Delete(bucketKeyAssigned)
	}
}
// createBucketIfNotExists walks the bucket path given by keys, creating each
// level that is missing, and returns the innermost bucket.
//
// keys[0] is created at the top level of tx and every following key is
// nested inside the previous bucket. At least one key must be supplied. The
// first creation error aborts the walk and is returned.
func createBucketIfNotExists(tx *bolt.Tx, keys ...[]byte) (*bolt.Bucket, error) {
	root, nested := keys[0], keys[1:]

	current, err := tx.CreateBucketIfNotExists(root)
	if err != nil {
		return nil, err
	}

	for i := range nested {
		if current, err = current.CreateBucketIfNotExists(nested[i]); err != nil {
			return nil, err
		}
	}
	return current, nil
}
// withCreateTaskBucketIfNotExists runs fn against the bucket of the task with
// id, creating that bucket (and its parents) first when necessary.
//
// A bucket creation error is returned without calling fn; otherwise the
// result of fn is returned.
func withCreateTaskBucketIfNotExists(tx *bolt.Tx, id string, fn func(bkt *bolt.Bucket) error) error {
	taskBkt, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyTasks, []byte(id))
	if err == nil {
		err = fn(taskBkt)
	}
	return err
}
// withTaskBucket runs fn against the existing bucket of the task with id and
// returns its result. It returns errTaskUnknown, without calling fn, when the
// task has no bucket.
func withTaskBucket(tx *bolt.Tx, id string, fn func(bkt *bolt.Bucket) error) error {
	if taskBkt := getTaskBucket(tx, id); taskBkt != nil {
		return fn(taskBkt)
	}
	return errTaskUnknown
}
// getTaskBucket returns the bucket of the task with id, or nil when it or any
// of its parent buckets does not exist.
func getTaskBucket(tx *bolt.Tx, id string) *bolt.Bucket {
	tasks := getTasksBucket(tx)
	if tasks == nil {
		return nil
	}
	return tasks.Bucket([]byte(id))
}
// getTasksBucket returns the bucket that contains all per-task buckets, or
// nil when it or the storage version bucket does not exist.
func getTasksBucket(tx *bolt.Tx) *bolt.Bucket {
	path := [][]byte{bucketKeyStorageVersion, bucketKeyTasks}
	return getBucket(tx, path...)
}
// getBucket resolves the bucket path given by keys without creating anything.
//
// keys[0] is looked up at the top level of tx and every following key inside
// the previous bucket. At least one key must be supplied. It returns nil as
// soon as any level of the path is missing.
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
	current := tx.Bucket(keys[0])
	for i := 1; i < len(keys) && current != nil; i++ {
		current = current.Bucket(keys[i])
	}
	return current
}
|