1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
package scheduler
import (
log "github.com/golang/glog"
mesos "github.com/mesos/mesos-go/api/v0/mesosproto"
"github.com/mesos/mesos-go/api/v0/upid"
"sync"
)
// cachedOffer pairs an offer with a slave UPID so that both can be kept
// as a single value in schedCache.savedOffers.
type cachedOffer struct {
// offer is the offer pointer as it was handed to the cache.
offer *mesos.Offer
// slavePid is the UPID that was stored together with the offer.
slavePid *upid.UPID
}
// newCachedOffer builds a cachedOffer holding the given offer and slave
// UPID. The pointers are stored as passed; nothing is validated or copied.
func newCachedOffer(offer *mesos.Offer, slavePid *upid.UPID) *cachedOffer {
	entry := new(cachedOffer)
	entry.offer = offer
	entry.slavePid = slavePid
	return entry
}
// schedCache is a managed cache with backing maps that store offers
// and tasked slaves.
type schedCache struct {
lock sync.RWMutex // intended to guard both maps below
savedOffers map[string]*cachedOffer // current offers, keyed by the OfferID value
savedSlavePids map[string]*upid.UPID // currently saved slave UPIDs, keyed by the SlaveID value
}
// newSchedCache returns a schedCache whose two backing maps are already
// allocated (and empty), so entries can be stored right away.
func newSchedCache() *schedCache {
	cache := new(schedCache)
	cache.savedOffers = map[string]*cachedOffer{}
	cache.savedSlavePids = map[string]*upid.UPID{}
	return cache
}
// putOffer caches offer together with pid under the offer's ID value,
// replacing any entry already stored under that key. When either argument
// is nil nothing is stored and a verbose-level message is logged instead.
func (cache *schedCache) putOffer(offer *mesos.Offer, pid *upid.UPID) {
	if missingArg := offer == nil || pid == nil; missingArg {
		log.V(3).Infoln("WARN: Offer not cached. The offer or pid cannot be nil")
		return
	}

	// Resolve the key once; it is used for both the log line and the map.
	offerKey := offer.Id.GetValue()
	log.V(3).Infoln("Caching offer ", offerKey, " with slavePID ", pid.String())

	entry := &cachedOffer{offer: offer, slavePid: pid}
	cache.lock.Lock()
	cache.savedOffers[offerKey] = entry
	cache.lock.Unlock()
}
// getOffer looks up the cached entry stored under offerId's value.
// It returns nil when offerId is nil (after logging at verbosity 3) or
// when no entry exists for that key.
func (cache *schedCache) getOffer(offerId *mesos.OfferID) *cachedOffer {
	if offerId == nil {
		log.V(3).Infoln("WARN: OfferId == nil, returning nil")
		return nil
	}

	key := offerId.GetValue()

	cache.lock.RLock()
	entry := cache.savedOffers[key]
	cache.lock.RUnlock()

	return entry
}
// containsOffer reports whether the cache currently holds an entry keyed
// by offerId's value. offerId is not nil-checked here.
func (cache *schedCache) containsOffer(offerId *mesos.OfferID) bool {
	key := offerId.GetValue()

	cache.lock.RLock()
	_, found := cache.savedOffers[key]
	cache.lock.RUnlock()

	return found
}
// removeOffer drops the entry keyed by offerId's value from the offer
// map, under the write lock. Removing a key that is not present is a no-op.
func (cache *schedCache) removeOffer(offerId *mesos.OfferID) {
	key := offerId.GetValue()

	cache.lock.Lock()
	defer cache.lock.Unlock()
	delete(cache.savedOffers, key)
}
// putSlavePid stores pid under slaveId's value, overwriting any UPID
// previously saved for that key. Neither argument is nil-checked, so a nil
// pid is stored as-is.
func (cache *schedCache) putSlavePid(slaveId *mesos.SlaveID, pid *upid.UPID) {
	key := slaveId.GetValue()

	cache.lock.Lock()
	cache.savedSlavePids[key] = pid
	cache.lock.Unlock()
}
// getSlavePid returns the UPID saved under slaveId's value.
// It returns nil when slaveId is nil (after logging at verbosity 3; note
// the log text says "empty UPID" but nil is what is returned) or when no
// UPID is saved for that key.
func (cache *schedCache) getSlavePid(slaveId *mesos.SlaveID) *upid.UPID {
	if slaveId == nil {
		log.V(3).Infoln("SlaveId == nil, returning empty UPID")
		return nil
	}

	// The map is written under cache.lock by putSlavePid/removeSlavePid, so
	// the read must hold the read lock too; an unguarded map read racing a
	// write is a data race and can crash the program.
	cache.lock.RLock()
	defer cache.lock.RUnlock()
	return cache.savedSlavePids[slaveId.GetValue()]
}
// containsSlavePid reports whether a UPID is saved under slaveId's value.
// The lookup runs under the read lock; slaveId is not nil-checked here.
func (cache *schedCache) containsSlavePid(slaveId *mesos.SlaveID) bool {
	key := slaveId.GetValue()

	cache.lock.RLock()
	_, found := cache.savedSlavePids[key]
	cache.lock.RUnlock()

	return found
}
// removeSlavePid deletes the UPID saved under slaveId's value, holding the
// write lock while it does so. Deleting an absent key is a no-op.
func (cache *schedCache) removeSlavePid(slaveId *mesos.SlaveID) {
	key := slaveId.GetValue()

	cache.lock.Lock()
	defer cache.lock.Unlock()
	delete(cache.savedSlavePids, key)
}
|