File: restore.go

package info (click to toggle)
gitlab-shell 14.35.0%2Bds1-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 23,652 kB
  • sloc: ruby: 1,129; makefile: 583; sql: 391; sh: 384
file content (104 lines) | stat: -rw-r--r-- 3,387 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main

import (
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"runtime"
	"strings"
	"time"

	log "github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v16/client"
	"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)

// restoreRequest is a single restore instruction. Run decodes a stream of
// these from stdin as JSON values, using the keys given in the struct tags.
//
// The embedded storage.ServerInfo and AlwaysCreate are forwarded unchanged to
// backup.NewRestoreCommand. StorageName, RelativePath and GlProjectPath are
// copied into the gitalypb.Repository that identifies the repository to
// restore.
//
// NOTE(review): AlwaysCreate presumably asks for the repository to be created
// even when there is nothing to restore; confirm against
// backup.NewRestoreCommand.
type restoreRequest struct {
	storage.ServerInfo
	StorageName   string `json:"storage_name"`
	RelativePath  string `json:"relative_path"`
	GlProjectPath string `json:"gl_project_path"`
	AlwaysCreate  bool   `json:"always_create"`
}

// restoreSubcommand holds the settings of the restore subcommand. Flags binds
// every field to a command-line flag and Run consumes them:
//
//   - backupPath: value of -path, passed to backup.ResolveSink.
//   - parallel: value of -parallel, defaulting to runtime.NumCPU().
//   - parallelStorage: value of -parallel-storage, defaulting to 2.
//   - layout: value of -layout, defaulting to "pointer", passed to
//     backup.ResolveLocator.
//   - removeAllRepositories: storage names parsed from the comma-separated
//     -remove-all-repositories value; Run asks for all repositories to be
//     removed from each of them before restoring.
type restoreSubcommand struct {
	backupPath            string
	parallel              int
	parallelStorage       int
	layout                string
	removeAllRepositories []string
}

// Flags registers the restore subcommand's command-line flags on fs, binding
// each of them to the corresponding field of cmd.
//
// The -remove-all-repositories value is a comma-separated list of storage
// names. Whitespace around each name is trimmed and empty entries (an empty
// value, a trailing comma, or doubled commas) are dropped, so Run never issues
// a removal for a blank storage name. Each occurrence of the flag replaces the
// previously parsed list.
func (cmd *restoreSubcommand) Flags(fs *flag.FlagSet) {
	fs.StringVar(&cmd.backupPath, "path", "", "repository backup path")
	fs.IntVar(&cmd.parallel, "parallel", runtime.NumCPU(), "maximum number of parallel restores")
	fs.IntVar(&cmd.parallelStorage, "parallel-storage", 2, "maximum number of parallel restores per storage. Note: actual parallelism when combined with `-parallel` depends on the order the repositories are received.")
	fs.StringVar(&cmd.layout, "layout", "pointer", "how backup files are located. Either pointer or legacy.")
	fs.Func("remove-all-repositories", "comma-separated list of storage names to have all repositories removed from before restoring.", func(removeAll string) error {
		// strings.Split keeps empty pieces: "" yields [""] and "a," yields
		// ["a", ""]. Filter those out instead of storing blank storage names.
		parts := strings.Split(removeAll, ",")
		storageNames := make([]string, 0, len(parts))
		for _, part := range parts {
			if name := strings.TrimSpace(part); name != "" {
				storageNames = append(storageNames, name)
			}
		}
		cmd.removeAllRepositories = storageNames
		return nil
	})
}

// Run executes the restore subcommand.
//
// It resolves the backup sink from cmd.backupPath and the locator from
// cmd.layout, then asks the backup manager to remove all repositories from
// every storage listed in cmd.removeAllRepositories. A failed removal is only
// logged as a warning and does not abort the run.
//
// Afterwards it decodes JSON restoreRequest values from stdin until EOF and
// hands one restore command per request to the pipeline. The pipeline is the
// one built by backup.NewLoggingPipeline, wrapped by
// backup.NewParallelPipeline when cmd.parallel or cmd.parallelStorage is
// positive.
//
// The returned error is prefixed with "restore:" and wraps the failure from
// resolving the sink or locator, decoding a request, or pipeline.Done. stdout
// is not used.
func (cmd *restoreSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error {
	backupSink, err := backup.ResolveSink(ctx, cmd.backupPath)
	if err != nil {
		return fmt.Errorf("restore: resolve sink: %w", err)
	}

	backupLocator, err := backup.ResolveLocator(cmd.layout, backupSink)
	if err != nil {
		return fmt.Errorf("restore: resolve locator: %w", err)
	}

	connPool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor())
	defer connPool.Close()

	// Current UTC time rendered as YYYYMMDDhhmmss.
	timestamp := time.Now().UTC().Format("20060102150405")
	manager := backup.NewManager(backupSink, backupLocator, connPool, timestamp)
	logger := log.StandardLogger()

	for _, name := range cmd.removeAllRepositories {
		removeReq := &backup.RemoveAllRepositoriesRequest{StorageName: name}
		if removeErr := manager.RemoveAllRepositories(ctx, removeReq); removeErr != nil {
			// Treat RemoveAll failures as soft failures until we can determine
			// how often it fails.
			logger.WithError(removeErr).Warnf("failed to remove all repositories from %s", name)
		}
	}

	var pipeline backup.Pipeline = backup.NewLoggingPipeline(logger)
	if runParallel := cmd.parallel > 0 || cmd.parallelStorage > 0; runParallel {
		pipeline = backup.NewParallelPipeline(pipeline, cmd.parallel, cmd.parallelStorage)
	}

	requests := json.NewDecoder(stdin)
	for {
		var req restoreRequest

		decodeErr := requests.Decode(&req)
		if errors.Is(decodeErr, io.EOF) {
			// End of input: every request has been handed to the pipeline.
			break
		}
		if decodeErr != nil {
			return fmt.Errorf("restore: %w", decodeErr)
		}

		repo := &gitalypb.Repository{
			StorageName:   req.StorageName,
			RelativePath:  req.RelativePath,
			GlProjectPath: req.GlProjectPath,
		}
		pipeline.Handle(ctx, backup.NewRestoreCommand(manager, req.ServerInfo, repo, req.AlwaysCreate))
	}

	if doneErr := pipeline.Done(); doneErr != nil {
		return fmt.Errorf("restore: %w", doneErr)
	}

	return nil
}