1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
|
package agent
import (
"context"
"errors"
"fmt"
"sync"
"time"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modagent"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/starboard_vulnerability/agent/resources"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/pkg/agentcfg"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
)
// Package-level constants for the Trivy-based vulnerability scan job.
const (
// maxParallel is the capacity of the semaphore channel created in scanJob.scan,
// i.e. the maximum number of namespace scans that run at the same time.
maxParallel int = 10 // Trivy scanner Pods batch size
// trivyK8sWrapper is a pinned container image reference.
// NOTE(review): presumably the image run by the scanning Pod; it is not referenced in this part of the file — confirm.
trivyK8sWrapper = "registry.gitlab.com/security-products/trivy-k8s-wrapper:0.2.14"
// Kubernetes resource kind names.
// NOTE(review): not referenced in this part of the file; presumably used when mapping scan results to workloads — confirm.
kindPod = "Pod"
kindReplicaSet = "ReplicaSet"
kindReplicationController = "ReplicationController"
kindStatefulSet = "StatefulSet"
kindDaemonSet = "DaemonSet"
kindCronJob = "CronJob"
kindJob = "Job"
)
// scanJob holds everything needed to run one vulnerability scan pass over a
// set of target namespaces (see Run and scan).
type scanJob struct {
// log is the base logger; per-namespace loggers are derived from it.
log *zap.Logger
// api is passed to NewReporter to build the reporter used for transmitting
// and resolving vulnerabilities.
api modagent.API
// kubeClientset is handed to the scanningManager that talks to the cluster.
kubeClientset kubernetes.Interface
// gitlabAgentNamespace is handed to the scanningManager.
// NOTE(review): presumably the namespace the agent itself runs in — confirm.
gitlabAgentNamespace string
// gitlabAgentServiceAccount is not referenced in this part of the file.
gitlabAgentServiceAccount string
// agentID is handed to the scanningManager.
agentID int64
// targetNamespaces lists the namespaces to scan; one scan is started per entry.
targetNamespaces []string
// resourceRequirements is passed to resources.Manager as its Requirements.
resourceRequirements *agentcfg.ResourceRequirements
// ocsServiceAccountName is passed to getScanningPodSpecs when building the
// scanning Pod spec.
ocsServiceAccountName string
}
// Run executes a single scan pass. A failure is logged rather than returned,
// because the method has no error result.
func (s *scanJob) Run(ctx context.Context) {
	err := s.scan(ctx)
	if err == nil {
		return
	}
	s.log.Error("Error running vulnerability scan", logz.Error(err))
}
// uuidCollection is a mutex-guarded accumulator of UUID strings, used so that
// namespace scans running in parallel can contribute to one shared list.
// The zero value is ready to use. It contains a sync.Mutex and therefore must
// not be copied after first use; all methods take a pointer receiver.
type uuidCollection struct {
	uuids []string
	mux   sync.Mutex
}

// Append adds uuids to the collection. It is safe for concurrent use.
func (u *uuidCollection) Append(uuids []string) {
	u.mux.Lock()
	defer u.mux.Unlock()
	u.uuids = append(u.uuids, uuids...)
}

// Items returns a snapshot of the collected UUIDs. It is safe for concurrent use.
//
// The result is a copy: handing out the internal slice would let the caller
// read or modify the backing array after the mutex has been released, racing
// with concurrent Append calls. An empty collection yields nil.
func (u *uuidCollection) Items() []string {
	u.mux.Lock()
	defer u.mux.Unlock()
	if len(u.uuids) == 0 {
		return nil
	}
	snapshot := make([]string, len(u.uuids))
	copy(snapshot, u.uuids)
	return snapshot
}
// scan runs a Trivy scan for every target namespace and then asks GitLab to
// resolve vulnerabilities based on the UUIDs the scans reported.
//
// Namespaces are scanned concurrently, with at most maxParallel scans in
// flight. A failure in one namespace is logged and does not stop the others.
// If no scan produced any UUID, the resolve step is skipped.
//
// The only error returned is a failure of the resolve step.
func (s *scanJob) scan(ctx context.Context) error {
	s.log.Info("Start Trivy k8s scan")

	var allUUIDs uuidCollection
	reporter := NewReporter(s.log, s.api)

	var wg wait.Group
	// Buffered channel used as a counting semaphore.
	limit := make(chan struct{}, maxParallel)
	for i := range s.targetNamespaces {
		targetNamespace := s.targetNamespaces[i]
		wg.Start(func() {
			limit <- struct{}{}
			defer func() { <-limit }()

			scanLogger := s.log.With(logz.TargetNamespace(targetNamespace))
			uuids, err := s.startPodScanForNamespace(ctx, scanLogger, targetNamespace, reporter)
			// Failures are logged through scanLogger so each message carries
			// the namespace it belongs to.
			switch {
			case err == nil:
				allUUIDs.Append(uuids)
			case errors.Is(err, context.Canceled):
				// Not logging errors for context canceled since this is part of normal
				// operation that can be triggered when agent configuration changes.
			case errors.Is(err, context.DeadlineExceeded):
				scanLogger.Error("Error running Trivy scan due to context timeout")
			default:
				scanLogger.Error("Error running Trivy scan", logz.Error(err))
			}
		})
	}
	wg.Wait()

	uuids := allUUIDs.Items()
	if len(uuids) == 0 {
		return nil
	}

	s.log.Info("Resolving no longer detected vulnerabilities in GitLab")
	if err := reporter.ResolveVulnerabilities(ctx, uuids); err != nil {
		return fmt.Errorf("error resolving vulnerabilities: %w", err)
	}
	// Only report success once the call has actually succeeded.
	s.log.Info("Resolved no longer detected vulnerabilities in GitLab")
	return nil
}
// startPodScanForNamespace scans a single namespace with a dedicated scanning
// Pod and returns the UUIDs that reporter.Transmit produced for its report.
//
// Steps, in order:
//  1. delete chained ConfigMaps for the namespace;
//  2. build and deploy the scanning Pod (deleted again on return);
//  3. watch the Pod until it reaches the Failed or Succeeded phase;
//  4. on success, read and parse the chained ConfigMaps, transmit the report,
//     and delete the ConfigMaps; on failure, delete the ConfigMaps and return
//     an error derived from the container exit code when one is available.
//
// The whole operation is bounded by a 10 minute timeout derived from ctx. When
// that context ends first, ctx.Err() is returned.
//
// NOTE(review): ConfigMap cleanup runs with the same timeout-bound ctx, so it
// may not succeed once the context has ended; on the failed-Pod path a cleanup
// error is returned instead of the Pod's exit-code error.
func (s *scanJob) startPodScanForNamespace(ctx context.Context, scanLogger *zap.Logger, targetNamespace string, reporter *Reporter) ([]string, error) {
	// The default timeout for a Trivy scan is 5 minutes; context.WithTimeout ensures this function cannot get stuck forever.
	// A further 5 minutes are allowed after the scan for reading the chained ConfigMaps and transmitting the vulnerability report.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	podName := "trivy-scan-" + targetNamespace
	scanLogger = scanLogger.With(logz.PodName(podName))

	manager := scanningManager{
		kubeClientset:        s.kubeClientset,
		resourcesManager:     &resources.Manager{Requirements: s.resourceRequirements},
		agentID:              s.agentID,
		gitlabAgentNamespace: s.gitlabAgentNamespace,
		scanLogger:           scanLogger,
	}

	// Delete the namespace's chained ConfigMaps before deploying the Pod.
	if cleanupErr := manager.deleteChainedConfigmaps(ctx, targetNamespace); cleanupErr != nil {
		return nil, fmt.Errorf("could not delete configmaps, before deploying scanning pod: %w", cleanupErr)
	}

	podSpec, specErr := manager.getScanningPodSpecs(podName, targetNamespace, s.ocsServiceAccountName)
	if specErr != nil {
		return nil, fmt.Errorf("could not get specs for OCS Scanning Pod: %w", specErr)
	}

	scanLogger.Debug("Deploying OCS Scanning Pod")
	if deployErr := manager.deployScanningPod(ctx, podSpec, podName); deployErr != nil {
		return nil, fmt.Errorf("could not deploy scanning pod: %w", deployErr)
	}
	// From here on the Pod exists, so always try to remove it on the way out.
	defer func() { //nolint:contextcheck
		scanLogger.Debug("Deleting OCS Scanning Pod")
		if deleteErr := manager.deleteScanningPod(podName); deleteErr != nil {
			scanLogger.Error("Error deleting Pod", logz.Error(deleteErr))
		}
	}()

	scanLogger.Debug("Start watcher for OCS Scanning Pod")
	watcher, err := manager.watchScanningPod(ctx, podName)
	if err != nil {
		return nil, fmt.Errorf("could not start watcher for OCS Scanning Pod: %w", err)
	}
	defer watcher.Stop()

	for {
		select {
		case <-ctx.Done():
			scanLogger.Debug("Stopping watcher as context canceled")
			return nil, ctx.Err()

		case event, open := <-watcher.ResultChan():
			if !open {
				return nil, errors.New("channel closed unexpectedly")
			}
			pod, isPod := event.Object.(*corev1.Pod)
			if !isPod {
				return nil, errors.New("watcher received unexpected object that is not a Pod")
			}
			scanLogger.Debug("pod status", logz.PodStatus(string(pod.Status.Phase)))

			// Only the Failed and Succeeded phases end the wait; any other
			// phase keeps the loop waiting for the next event.
			switch pod.Status.Phase {
			case corev1.PodFailed:
				scanLogger.Info("OCS Scanning Pod failed")
				scanLogger.Info("Deleting remaining chained configmaps")
				if cleanupErr := manager.deleteChainedConfigmaps(ctx, targetNamespace); cleanupErr != nil {
					return nil, fmt.Errorf("could not delete configmaps: %w", cleanupErr)
				}
				statuses := pod.Status.ContainerStatuses
				if len(statuses) != 1 {
					return nil, errors.New("OCS Scanning pod should have only one container")
				}
				if terminated := statuses[0].State.Terminated; terminated != nil {
					return nil, manager.extractExitCodeError(terminated.ExitCode, terminated.Reason)
				}
				return nil, errors.New("OCS Scanning pod exited with an error. Could not retrieve an exit code.")

			case corev1.PodSucceeded:
				scanLogger.Info("OCS Scanning Pod Succeeded")
				scanLogger.Info("Reading chained configmaps")
				// Every path below returns, so this cleanup runs right after
				// the report has been handled (or has failed to be handled).
				defer func() {
					scanLogger.Info("Deleting chained configmaps")
					if cleanupErr := manager.deleteChainedConfigmaps(ctx, targetNamespace); cleanupErr != nil {
						scanLogger.Error("Could not delete ConfigMaps", logz.Error(cleanupErr))
					}
				}()

				payload, trivyVersion, err := manager.readChainedConfigmaps(ctx, targetNamespace)
				if err != nil {
					return nil, fmt.Errorf("could not read chained ConfigMaps: %w", err)
				}
				payloads, err := manager.parseScaningPodPayload(payload, trivyVersion)
				if err != nil {
					return nil, fmt.Errorf("could not parse OCS scanning pod report: %w", err)
				}
				uuids, err := reporter.Transmit(ctx, payloads)
				if err != nil {
					return nil, fmt.Errorf("error transmitting vulnerability reports: %w", err)
				}
				scanLogger.Info("Transmitted vulnerabilities to Gitlab")
				return uuids, nil
			}
		}
	}
}
|