package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"runtime"
	"time"

	log "github.com/sirupsen/logrus"
	"gitlab.com/gitlab-org/gitaly/v16/client"
	"gitlab.com/gitlab-org/gitaly/v16/internal/backup"
	"gitlab.com/gitlab-org/gitaly/v16/internal/gitaly/storage"
	internalclient "gitlab.com/gitlab-org/gitaly/v16/internal/grpc/client"
	"gitlab.com/gitlab-org/gitaly/v16/proto/go/gitalypb"
)

// serverRepository is a single JSON document read from stdin. It identifies
// one repository and, via the embedded storage.ServerInfo, the Gitaly server
// that hosts it.
type serverRepository struct {
	storage.ServerInfo
	StorageName   string `json:"storage_name"`
	RelativePath  string `json:"relative_path"`
	GlProjectPath string `json:"gl_project_path"`
}
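
// An input document might look like the following (a sketch with
// illustrative values; the embedded storage.ServerInfo contributes the
// address and token fields):
//
//	{
//		"address": "tcp://gitaly.internal:8075",
//		"token": "secret",
//		"storage_name": "default",
//		"relative_path": "@hashed/path/to/repo.git",
//		"gl_project_path": "group/project"
//	}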

// createSubcommand implements the "create" subcommand of gitaly-backup. It
// holds the flag values that control how backups are created.
type createSubcommand struct {
	backupPath      string
	parallel        int
	parallelStorage int
	layout          string
	incremental     bool
	backupID        string
}

// Flags registers the subcommand's flags on the given FlagSet.
func (cmd *createSubcommand) Flags(fs *flag.FlagSet) {
	fs.StringVar(&cmd.backupPath, "path", "", "repository backup path")
	fs.IntVar(&cmd.parallel, "parallel", runtime.NumCPU(), "maximum number of parallel backups")
	fs.IntVar(&cmd.parallelStorage, "parallel-storage", 2, "maximum number of parallel backups per storage. Note: actual parallelism when combined with `-parallel` depends on the order the repositories are received.")
	fs.StringVar(&cmd.layout, "layout", "pointer", "how backup files are located. Either pointer or legacy.")
	fs.BoolVar(&cmd.incremental, "incremental", false, "creates an incremental backup if possible.")
	fs.StringVar(&cmd.backupID, "id", time.Now().UTC().Format("20060102150405"), "the backup ID used when creating a full backup.")
}
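
// An invocation might look like the following, with repository documents
// piped in on stdin (flag values are illustrative):
//
//	gitaly-backup create -path /var/backups/gitaly -parallel 8 -layout pointer -incremental < repos.json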

// Run reads serverRepository documents from stdin and creates a backup for
// each repository, writing the results to the configured sink.
func (cmd *createSubcommand) Run(ctx context.Context, stdin io.Reader, stdout io.Writer) error {
	sink, err := backup.ResolveSink(ctx, cmd.backupPath)
	if err != nil {
		return fmt.Errorf("create: resolve sink: %w", err)
	}

	locator, err := backup.ResolveLocator(cmd.layout, sink)
	if err != nil {
		return fmt.Errorf("create: resolve locator: %w", err)
	}

	// The connection pool is shared by all backup jobs so that repositories
	// hosted on the same Gitaly server reuse a single gRPC connection.
	pool := client.NewPool(internalclient.UnaryInterceptor(), internalclient.StreamInterceptor())
	defer pool.Close()

	manager := backup.NewManager(sink, locator, pool, cmd.backupID)

	// Every backup is routed through the logging pipeline; when parallelism
	// is requested, it is additionally wrapped in a parallel pipeline.
	var pipeline backup.Pipeline
	pipeline = backup.NewLoggingPipeline(log.StandardLogger())
	if cmd.parallel > 0 || cmd.parallelStorage > 0 {
		pipeline = backup.NewParallelPipeline(pipeline, cmd.parallel, cmd.parallelStorage)
	}

	// Repositories are streamed in as JSON documents, one per repository,
	// until stdin is exhausted.
	decoder := json.NewDecoder(stdin)
	for {
		var sr serverRepository
		if err := decoder.Decode(&sr); err == io.EOF {
			break
		} else if err != nil {
			return fmt.Errorf("create: %w", err)
		}

		repo := gitalypb.Repository{
			StorageName:   sr.StorageName,
			RelativePath:  sr.RelativePath,
			GlProjectPath: sr.GlProjectPath,
		}
		pipeline.Handle(ctx, backup.NewCreateCommand(manager, sr.ServerInfo, &repo, cmd.incremental))
	}

	// Done blocks until all queued backups have finished and reports any
	// accumulated errors.
	if err := pipeline.Done(); err != nil {
		return fmt.Errorf("create: %w", err)
	}
	return nil
}
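
// A producer could generate the stdin stream with encoding/json. A minimal
// sketch, assuming storage.ServerInfo exposes Address and Token fields
// (values are illustrative):
//
//	enc := json.NewEncoder(os.Stdout)
//	_ = enc.Encode(serverRepository{
//		ServerInfo:    storage.ServerInfo{Address: "tcp://gitaly.internal:8075", Token: "secret"},
//		StorageName:   "default",
//		RelativePath:  "@hashed/path/to/repo.git",
//		GlProjectPath: "group/project",
//	})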