1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308
|
package agent
//goland:noinspection GoSnakeCaseUsage
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/modagent"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/module/remote_development/agent/k8s"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/errz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/httpz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/logz"
"gitlab.com/gitlab-org/cluster-integration/gitlab-agent/v16/internal/tool/retry"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
)
type MessageType string
type WorkspaceUpdateType string
type TerminationProgress string
const (
WorkspaceUpdateTypePartial WorkspaceUpdateType = "partial"
WorkspaceUpdateTypeFull WorkspaceUpdateType = "full"
WorkspaceStateTerminated string = "Terminated"
TerminationProgressTerminating TerminationProgress = "Terminating"
TerminationProgressTerminated TerminationProgress = "Terminated"
)
// reconciler is equipped to and responsible for carrying
// one cycle of reconciliation when reconciler.Run() is invoked
type reconciler struct {
log *zap.Logger
agentId int64
api modagent.Api
pollConfig retry.PollConfigFactory
pollFunction func(ctx context.Context, cfg retry.PollConfig, f retry.PollWithBackoffCtxFunc) error
stateTracker *persistedStateTracker
informer informer
k8sClient k8s.Client
// This is used to determine whether the reconciliation cycle corresponds to
// a full or partial sync. When a reconciler runs for the first time, this will be false
// indicating a full sync. After the full sync successfully completes, this will be
// indicating partial sync for subsequent cycles for the same reconciler
hasFullSyncRunBefore bool
// terminatingTracker tracks all workspaces for whom termination has been initiated but
// still exist in the cluster and therefore considered "Terminating". These workspaces are
// removed from the tracker once they are removed from the cluster and their Terminated
// status is persisted in Rails
terminatingTracker persistedTerminatingWorkspacesTracker
}
// TODO: revisit all request and response types, and make more strongly typed if possible
// issue: https://gitlab.com/gitlab-org/gitlab/-/issues/396882
type WorkspaceAgentInfo struct {
Name string `json:"name"`
Namespace string `json:"namespace,omitempty"`
LatestK8sDeploymentInfo map[string]interface{} `json:"latest_k8s_deployment_info,omitempty"`
TerminationProgress TerminationProgress `json:"termination_progress,omitempty"`
}
type RequestPayload struct {
UpdateType WorkspaceUpdateType `json:"update_type"`
WorkspaceAgentInfos []WorkspaceAgentInfo `json:"workspace_agent_infos"`
}
type WorkspaceRailsInfo struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
DeploymentResourceVersion string `json:"deployment_resource_version,omitempty"`
ActualState string `json:"actual_state,omitempty"`
DesiredState string `json:"desired_state,omitempty"`
ConfigToApply string `json:"config_to_apply,omitempty"`
}
type ResponsePayload struct {
WorkspaceRailsInfos []*WorkspaceRailsInfo `json:"workspace_rails_infos"`
}
func (r *reconciler) Run(ctx context.Context) error {
r.log.Debug("Running reconciliation loop")
defer r.log.Debug("Reconciliation loop ended")
// Load and the info on the workspaces from the informer. Skip it if the persisted state in
// rails is already up-to-date for the workspace
workspaceAgentInfos := r.generateWorkspaceAgentInfos()
updateType := WorkspaceUpdateTypePartial
if !r.hasFullSyncRunBefore {
updateType = WorkspaceUpdateTypeFull
}
// Submit the workspace update request to the Rails API. Sends the latest AgentInfos in the request,
// and receives the latest RailsInfos in the response.
workspaceRailsInfos, err := r.performWorkspaceUpdateRequestToRailsApi(ctx, updateType, workspaceAgentInfos)
if err != nil {
return err
}
// Workspace update request was completed successfully, now process any RailsInfos received in the response
for _, workspaceRailsInfo := range workspaceRailsInfos {
// TODO: discuss whether/how to deal with errors returned by rails when processing individual workspaces
// for ex. there may be a case where no workspace is found for a given combination of workspace name &
// workspace
// issue: https://gitlab.com/gitlab-org/gitlab/-/issues/409807
err = r.applyWorkspaceChanges(ctx, workspaceRailsInfo)
if err != nil {
// TODO: how to report this back to rails?
// issue: https://gitlab.com/gitlab-org/gitlab/-/issues/397001
r.api.HandleProcessingError(ctx, r.log, r.agentId, "Error when applying workspace info", err)
}
}
r.hasFullSyncRunBefore = true
return nil
}
func (r *reconciler) applyWorkspaceChanges(ctx context.Context, workspaceRailsInfo *WorkspaceRailsInfo) error {
r.stateTracker.recordVersion(workspaceRailsInfo)
name := workspaceRailsInfo.Name
namespace := workspaceRailsInfo.Namespace
// When desired state is Terminated, trigger workspace deletion and exit early
// to avoid processing/applying the workspace config
if workspaceRailsInfo.DesiredState == WorkspaceStateTerminated {
// Handle Terminated state (delete the namespace and workspace) and return
err := r.handleDesiredStateIsTerminated(ctx, name, namespace, workspaceRailsInfo.ActualState)
if err != nil {
return fmt.Errorf("error when handling terminated state for workspace %s: %w", name, err)
}
// we don't want to continue by creating namespace if we just terminated the workspace
return nil
}
// Desired state is not Terminated, so continue to handle workspace creation and config apply if needed
// Create namespace if needed
namespaceExists := r.k8sClient.NamespaceExists(ctx, namespace)
if !namespaceExists {
err := r.k8sClient.CreateNamespace(ctx, namespace)
if err != nil && !errors.IsAlreadyExists(err) {
return fmt.Errorf("error creating namespace %s: %w", namespace, err)
}
}
// Apply workspace config if one was provided in the workspaceRailsInfo
if workspaceRailsInfo.ConfigToApply != "" {
err := r.k8sClient.Apply(ctx, workspaceRailsInfo.Namespace, workspaceRailsInfo.ConfigToApply)
if err != nil {
return fmt.Errorf("error applying workspace config (namespace %s, workspace name %s): %w", namespace, name, err)
}
}
return nil
}
func (r *reconciler) generateWorkspaceAgentInfos() []WorkspaceAgentInfo {
parsedWorkspaces := r.informer.List()
// "workspaceAgentInfos" is constructed by looping over "parsedWorkspaces" and "terminatingTracker".
// It can remain a nil slice because GitLab already has the latest version of all workspaces,
// However, we want it to be an empty(0-length) slice. Hence, initializing it.
// TODO: add a test case - https://gitlab.com/gitlab-org/gitlab/-/issues/407554
workspaceAgentInfos := make([]WorkspaceAgentInfo, 0)
// nonTerminatedWorkspaces is a set of all workspaces in the cluster returned by the informer
// it is compared with the workspaces in the terminatingTracker (considered "Terminating") to determine
// which works have been removed from the cluster and can be deemed "Terminated"
nonTerminatedWorkspaces := make(map[string]struct{})
for _, workspace := range parsedWorkspaces {
// any workspace returned by the informer is deemed as having not been terminated irrespective of its status
// workspaces that have been terminated completely will be absent from this set
nonTerminatedWorkspaces[workspace.Name] = struct{}{}
// if Rails already knows about the latest version of the resource, don't send the info again
if r.stateTracker.isPersisted(workspace.Name, workspace.ResourceVersion) {
r.log.Debug("Skipping sending workspace info. GitLab already has the latest version", logz.WorkspaceName(workspace.Name))
continue
}
workspaceAgentInfos = append(workspaceAgentInfos, WorkspaceAgentInfo{
Name: workspace.Name,
Namespace: workspace.Namespace,
LatestK8sDeploymentInfo: workspace.K8sDeploymentInfo,
})
}
// For each workspace that has been scheduled for termination, check if it exists in the cluster
// If it is missing from the cluster, it can be considered as Terminated otherwise Terminating
//
// TODO this implementation will repeatedly send workspaces that have to be completely removed from the
// cluster and are still considered Terminating. This may need to be optimized in case it becomes an issue
// issue: https://gitlab.com/gitlab-org/gitlab/-/issues/408844
for entry := range r.terminatingTracker {
_, existsInCluster := nonTerminatedWorkspaces[entry.name]
terminationProgress := TerminationProgressTerminating
if !existsInCluster {
terminationProgress = TerminationProgressTerminated
}
r.log.Debug("Sending termination progress workspace info workspace",
logz.WorkspaceName(entry.name),
logz.WorkspaceTerminationProgress(string(terminationProgress)),
)
workspaceAgentInfos = append(workspaceAgentInfos, WorkspaceAgentInfo{
Name: entry.name,
Namespace: entry.namespace,
TerminationProgress: terminationProgress,
})
}
return workspaceAgentInfos
}
func (r *reconciler) performWorkspaceUpdateRequestToRailsApi(
ctx context.Context,
updateType WorkspaceUpdateType,
workspaceAgentInfos []WorkspaceAgentInfo,
) (workspaceRailsInfos []*WorkspaceRailsInfo, retError error) {
// Do the POST request to the Rails API
r.log.Debug("Making GitLab request")
startTime := time.Now()
var requestPayload = RequestPayload{
UpdateType: updateType,
WorkspaceAgentInfos: workspaceAgentInfos,
}
// below code is from internal/module/starboard_vulnerability/agent/reporter.go
resp, err := r.api.MakeGitLabRequest(ctx, "/reconcile",
modagent.WithRequestMethod(http.MethodPost),
modagent.WithJsonRequestBody(requestPayload),
) // nolint: govet
if err != nil {
return nil, fmt.Errorf("error making api request: %w", err)
}
r.log.Debug("Made request to the Rails API",
logz.StatusCode(resp.StatusCode),
logz.RequestId(resp.Header.Get(httpz.RequestIdHeader)),
logz.DurationInMilliseconds(time.Since(startTime)),
)
defer errz.SafeClose(resp.Body, &retError)
if resp.StatusCode != http.StatusCreated {
return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("error reading response body: %w", err)
}
var responsePayload ResponsePayload
err = json.Unmarshal(body, &responsePayload)
if err != nil {
return nil, fmt.Errorf("error parsing response body: %w", err)
}
r.log.Debug(
"Read body from the Rails API",
logz.PayloadSizeInBytes(len(body)),
logz.WorkspaceDataCount(len(responsePayload.WorkspaceRailsInfos)),
)
return responsePayload.WorkspaceRailsInfos, nil
}
func (r *reconciler) handleDesiredStateIsTerminated(ctx context.Context, name string, namespace string, actualState string) error {
if r.terminatingTracker.isTerminating(name, namespace) && actualState == WorkspaceStateTerminated {
r.log.Debug("ActualState=Terminated, so deleting it from persistedTerminatingWorkspacesTracker", logz.WorkspaceNamespace(namespace))
r.terminatingTracker.delete(name, namespace)
r.stateTracker.delete(name)
return nil
}
if !r.k8sClient.NamespaceExists(ctx, namespace) {
// nothing to as the workspace has already absent from the cluster
return nil
}
r.log.Debug("Namespace for terminated workspace still exists, so deleting the namespace", logz.WorkspaceNamespace(namespace))
err := r.k8sClient.DeleteNamespace(ctx, namespace)
if err != nil {
return fmt.Errorf("failed to terminate workspace by deleting namespace: %w", err)
}
r.terminatingTracker.add(name, namespace)
return nil
}
func (r *reconciler) Stop() {
// this will be invoked at the end of each full cycle with the outgoing reconciler and its informer being stopped
// and new reconcilers (with new informers) created from scratch. The underlying principle is to prevent a re-use of
// reconciler state when a full sync occurs to prevent issues due to corruption of internal state
// However, this decision can be revisited in the future in of any issues
// issue: https://gitlab.com/gitlab-org/gitlab/-/issues/404748
r.informer.Stop()
}
|