File: source.go

package info (click to toggle)
singularity-container 4.1.5%2Bds4-1
  • links: PTS, VCS
  • area: main
  • in suites: sid
  • size: 43,876 kB
  • sloc: asm: 14,840; sh: 3,190; ansic: 1,751; awk: 414; makefile: 413; python: 99
file content (126 lines) | stat: -rw-r--r-- 2,913 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package ops

import (
	"context"
	"strings"
	"sync"

	"github.com/moby/buildkit/session"
	"github.com/moby/buildkit/solver"
	"github.com/moby/buildkit/solver/llbsolver/ops/opsutils"
	"github.com/moby/buildkit/solver/pb"
	"github.com/moby/buildkit/source"
	"github.com/moby/buildkit/worker"
	digest "github.com/opencontainers/go-digest"
	"golang.org/x/sync/semaphore"
)

// sourceCacheType is the prefix that CacheMap joins to the source cache key
// (as sourceCacheType + ":" + key) before hashing it into the cache digest.
const sourceCacheType = "buildkit.source.v0"

// SourceOp is the solver op for a source vertex. It resolves a
// source.SourceInstance on first use and delegates cache-key computation and
// snapshotting to it.
//
// Fields, as used in this file:
//   - mu is held by instance while src and id are lazily populated.
//   - op and platform are passed to sm.Identifier to build the identifier.
//   - sm resolves the identifier into a source instance.
//   - src caches the resolved instance; nil until instance succeeds.
//   - sessM and vtx are forwarded to sm.Resolve.
//   - w is the worker attached to the result returned by Exec.
//   - parallelism is an optional semaphore used by Acquire; nil disables
//     the limit.
//   - pin is the pin value reported by CacheKey, stored by CacheMap while
//     the field is still empty.
//   - id is the identifier stored by instance and returned by Pin.
type SourceOp struct {
	mu          sync.Mutex
	op          *pb.Op_Source
	platform    *pb.Platform
	sm          *source.Manager
	src         source.SourceInstance
	sessM       *session.Manager
	w           worker.Worker
	vtx         solver.Vertex
	parallelism *semaphore.Weighted
	pin         string
	id          source.Identifier
}

// Compile-time check that *SourceOp implements solver.Op.
var _ solver.Op = (*SourceOp)(nil)

// NewSourceOp validates op and, if it is valid, returns a SourceOp wired to
// the given vertex, platform, source manager, session manager and worker.
// parallelism may be nil, in which case Acquire does not limit concurrency.
// The source instance itself is not resolved here; that happens lazily.
func NewSourceOp(vtx solver.Vertex, op *pb.Op_Source, platform *pb.Platform, sm *source.Manager, parallelism *semaphore.Weighted, sessM *session.Manager, w worker.Worker) (*SourceOp, error) {
	err := opsutils.Validate(&pb.Op{Op: op})
	if err != nil {
		return nil, err
	}

	sourceOp := &SourceOp{
		vtx:         vtx,
		op:          op,
		platform:    platform,
		sm:          sm,
		parallelism: parallelism,
		sessM:       sessM,
		w:           w,
	}
	return sourceOp, nil
}

// IsProvenanceProvider is an intentionally empty method.
// NOTE(review): presumably a marker that lets callers detect ops able to
// report provenance (see Pin) — confirm against the interface that declares it.
func (s *SourceOp) IsProvenanceProvider() {}

// Pin reports the source identifier stored by instance together with the pin
// string stored by CacheMap. Either value is its zero value if the
// corresponding method has not yet stored anything.
func (s *SourceOp) Pin() (source.Identifier, string) {
	ident, pinned := s.id, s.pin
	return ident, pinned
}

// instance returns the source instance for this op, resolving it on the first
// successful call and reusing the cached value afterwards. The whole lookup
// runs under s.mu. On failure nothing is cached, so a later call retries.
func (s *SourceOp) instance(ctx context.Context) (source.SourceInstance, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.src == nil {
		ident, err := s.sm.Identifier(s.op, s.platform)
		if err != nil {
			return nil, err
		}

		resolved, err := s.sm.Resolve(ctx, ident, s.sessM, s.vtx)
		if err != nil {
			return nil, err
		}

		// Record both values only once resolution has fully succeeded.
		s.src = resolved
		s.id = ident
	}

	return s.src, nil
}

// CacheMap asks the source instance for the cache key at the given index and
// turns it into a solver.CacheMap. The boolean result is the "done" flag
// reported by the instance's CacheKey call.
//
// The digest is the hash of sourceCacheType + ":" + key. When the key starts
// with "session:", the digest is replaced by "random:" followed by the encoded
// hash. NOTE(review): presumably this keeps session-scoped keys from matching
// stored cache records — confirm with the solver's handling of "random:".
func (s *SourceOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
	inst, err := s.instance(ctx)
	if err != nil {
		return nil, false, err
	}

	key, pin, opts, done, err := inst.CacheKey(ctx, g, index)
	if err != nil {
		return nil, false, err
	}

	// Keep the pin already stored, if any; only fill it while it is empty.
	if s.pin == "" {
		s.pin = pin
	}

	keyDigest := digest.FromBytes([]byte(sourceCacheType + ":" + key))
	if strings.HasPrefix(key, "session:") {
		keyDigest = digest.Digest("random:" + keyDigest.Encoded())
	}

	// TODO: add os/arch
	cacheMap := &solver.CacheMap{
		Digest: keyDigest,
		Opts:   opts,
	}
	return cacheMap, done, nil
}

// Exec snapshots the source and returns it as a single worker-ref result bound
// to this op's worker. The input results are ignored. Any error from resolving
// the instance or taking the snapshot is returned unchanged.
func (s *SourceOp) Exec(ctx context.Context, g session.Group, _ []solver.Result) (outputs []solver.Result, err error) {
	inst, err := s.instance(ctx)
	if err != nil {
		return nil, err
	}

	snapshot, err := inst.Snapshot(ctx, g)
	if err != nil {
		return nil, err
	}

	result := worker.NewWorkerRefResult(snapshot, s.w)
	return []solver.Result{result}, nil
}

// Acquire takes one unit from the parallelism semaphore and returns a function
// that gives it back. When no semaphore is configured it returns a no-op
// release function immediately. If acquiring fails (for example because ctx
// is done), the error is returned and no release function is provided.
func (s *SourceOp) Acquire(ctx context.Context) (solver.ReleaseFunc, error) {
	if s.parallelism == nil {
		noop := func() {}
		return noop, nil
	}

	if err := s.parallelism.Acquire(ctx, 1); err != nil {
		return nil, err
	}

	release := func() {
		s.parallelism.Release(1)
	}
	return release, nil
}