File: archivereader.go

package info (click to toggle)
gitlab-workhorse 7.6.0%2Bdebian-1
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 7,440 kB
  • sloc: makefile: 233; sh: 161; python: 15
file content (124 lines) | stat: -rw-r--r-- 2,732 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package git

import (
	"context"
	"fmt"
	"io"
	"os/exec"
	"syscall"

	pb "gitlab.com/gitlab-org/gitaly-proto/go"

	"gitlab.com/gitlab-org/gitlab-workhorse/internal/helper"
)

// parseArchiveFormat maps a Gitaly archive format onto the pieces needed to
// produce it. It returns an optional compressor command (nil when the output
// of `git archive` is used as-is) and the value to pass to `--format=`.
//
// Formats that are not recognised yield a nil command together with the
// placeholder string "invalid format".
func parseArchiveFormat(format pb.GetArchiveRequest_Format) (*exec.Cmd, string) {
	var (
		compressor *exec.Cmd
		formatArg  = "invalid format"
	)

	switch format {
	case pb.GetArchiveRequest_ZIP:
		formatArg = "zip"
	case pb.GetArchiveRequest_TAR:
		formatArg = "tar"
	case pb.GetArchiveRequest_TAR_GZ:
		// -n keeps the original file name and timestamp out of the gzip header.
		compressor, formatArg = exec.Command("gzip", "-c", "-n"), "tar"
	case pb.GetArchiveRequest_TAR_BZ2:
		compressor, formatArg = exec.Command("bzip2", "-c"), "tar"
	}

	return compressor, formatArg
}

// archiveReader is an io.Reader over the output of an archive pipeline. When
// the underlying stream reports io.EOF it waits for every command in waitCmds,
// so that a failing command surfaces as a Read error rather than as a
// seemingly complete archive.
type archiveReader struct {
	waitCmds []*exec.Cmd // commands to Wait() on once stdout is exhausted
	stdout   io.Reader   // stream the archive bytes are read from

	// finalErr is the terminal result of the reader. It is recorded the first
	// time stdout reports io.EOF (io.EOF itself on success, the Wait error
	// otherwise) and is returned by every later Read.
	finalErr error
}

// Read implements io.Reader.
//
// Data and non-EOF errors from the underlying stream are passed through
// unchanged. On the first io.EOF the commands are waited on: if any of them
// failed, that error is returned instead of io.EOF. The outcome is remembered,
// so further calls keep returning (0, <same error>) instead of calling Wait()
// a second time on commands that have already been reaped.
func (a *archiveReader) Read(p []byte) (int, error) {
	if a.finalErr != nil {
		return 0, a.finalErr
	}

	n, err := a.stdout.Read(p)
	if err != io.EOF {
		return n, err
	}

	if err = a.wait(); err == nil {
		err = io.EOF
	}
	a.finalErr = err

	return n, err
}

// wait calls Wait() on every command in waitCmds and returns the first
// non-nil error in slice order, or nil when all of them succeeded.
func (a *archiveReader) wait() error {
	var firstErr error

	// Must call Wait() on _all_ commands, even after one of them has failed.
	for _, cmd := range a.waitCmds {
		if err := cmd.Wait(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	return firstErr
}

// newArchiveReader starts `git archive` for commitId in the repository at
// repoPath and returns a reader over the resulting archive stream.
//
// parseArchiveFormat decides the `--format=` argument and whether a compressor
// is needed. When it returns a compressor command, the stdout of `git archive`
// becomes the compressor's stdin and the returned reader reads the
// compressor's stdout; otherwise the reader reads `git archive` directly.
// archivePrefix, followed by "/", is passed as `--prefix=`.
//
// Every started command is appended to the reader's waitCmds and gets a
// ctxKill goroutine that acts once ctx is done. On failure a nil reader and a
// "SendArchive: ..." error are returned, and the pipes obtained so far are
// closed by the deferred functions below.
func newArchiveReader(ctx context.Context, repoPath string, format pb.GetArchiveRequest_Format, archivePrefix string, commitId string) (a *archiveReader, err error) {
	a = &archiveReader{}

	compressCmd, formatArg := parseArchiveFormat(format)
	archiveCmd := gitCommand("git", "--git-dir="+repoPath, "archive", "--format="+formatArg, "--prefix="+archivePrefix+"/", commitId)

	var archiveStdout io.ReadCloser
	archiveStdout, err = archiveCmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("SendArchive: archive stdout: %v", err)
	}
	// Runs on every return; it only closes the pipe when the named result err
	// is non-nil, i.e. when setup failed further down.
	defer func() {
		if err != nil {
			archiveStdout.Close()
		}
	}()

	// Default: the caller reads the uncompressed output of `git archive`.
	a.stdout = archiveStdout

	if compressCmd != nil {
		// Put the compressor in its own process group.
		// NOTE(review): presumably so helper.CleanUpProcessGroup can signal the
		// whole group — confirm against the helper package.
		compressCmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
		compressCmd.Stdin = archiveStdout

		var compressStdout io.ReadCloser
		compressStdout, err = compressCmd.StdoutPipe()
		if err != nil {
			return nil, fmt.Errorf("SendArchive: compress stdout: %v", err)
		}
		// Same pattern as above: close the compressor pipe only on failure.
		defer func() {
			if err != nil {
				compressStdout.Close()
			}
		}()

		// The err declared here shadows the named result, but the return
		// statement assigns the wrapped error to the named result before the
		// deferred functions run, so they still observe the failure.
		if err := compressCmd.Start(); err != nil {
			return nil, fmt.Errorf("SendArchive: start %v: %v", compressCmd.Args, err)
		}

		go ctxKill(ctx, compressCmd)
		a.waitCmds = append(a.waitCmds, compressCmd)

		// From here on the caller reads the compressed stream. The compressor
		// was started with archiveStdout as its stdin, so this process closes
		// its own handle on that pipe.
		a.stdout = compressStdout
		archiveStdout.Close()
	}

	// `git archive` is started last, after the compressor (if any) is running.
	if err := archiveCmd.Start(); err != nil {
		return nil, fmt.Errorf("SendArchive: start %v: %v", archiveCmd.Args, err)
	}

	go ctxKill(ctx, archiveCmd)
	a.waitCmds = append(a.waitCmds, archiveCmd)

	return a, nil
}

// ctxKill blocks until ctx is done, then hands cmd to
// helper.CleanUpProcessGroup and calls cmd.Wait(). It is meant to run in its
// own goroutine, one per started command.
func ctxKill(ctx context.Context, cmd *exec.Cmd) {
	done := ctx.Done()
	<-done

	helper.CleanUpProcessGroup(cmd)

	// The result of Wait is deliberately discarded, as nothing is reported
	// from this goroutine.
	_ = cmd.Wait()
}