File: iocopy.go

package info (click to toggle)
golang-github-coreos-pkg 3-1~bpo8%2B1
  • links: PTS, VCS
  • area: main
  • in suites: jessie-backports
  • size: 320 kB
  • sloc: sh: 30; makefile: 3
file content (189 lines) | stat: -rw-r--r-- 4,708 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright 2016 CoreOS Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package progressutil

import (
	"errors"
	"fmt"
	"io"
	"sync"
	"time"
)

var (
	ErrAlreadyStarted = errors.New("cannot add copies after PrintAndWait has been called")
)

type copyReader struct {
	reader  io.Reader
	current int64
	total   int64
	pb      *ProgressBar
}

func (cr *copyReader) Read(p []byte) (int, error) {
	n, err := cr.reader.Read(p)
	cr.current += int64(n)
	err1 := cr.updateProgressBar()
	if err == nil {
		err = err1
	}
	return n, err
}

func (cr *copyReader) updateProgressBar() error {
	cr.pb.SetPrintAfter(cr.formattedProgress())

	progress := float64(cr.current) / float64(cr.total)
	if progress > 1 {
		progress = 1
	}
	return cr.pb.SetCurrentProgress(progress)
}

// NewCopyProgressPrinter returns a new CopyProgressPrinter
func NewCopyProgressPrinter() *CopyProgressPrinter {
	return &CopyProgressPrinter{
		results: make(chan error),
		cancel:  make(chan struct{}),
		pbp:     &ProgressBarPrinter{PadToBeEven: true},
	}
}

// CopyProgressPrinter will perform an arbitrary number of io.Copy calls, while
// continually printing the progress of each copy.
type CopyProgressPrinter struct {
	results chan error
	cancel  chan struct{}

	// `lock` mutex protects all fields below it in CopyProgressPrinter struct
	lock    sync.Mutex
	readers []*copyReader
	started bool
	pbp     *ProgressBarPrinter
}

// AddCopy adds a copy for this CopyProgressPrinter to perform. An io.Copy call
// will be made to copy bytes from reader to dest, and name and size will be
// used to label the progress bar and display how much progress has been made.
// If size is 0, the total size of the reader is assumed to be unknown.
// AddCopy can only be called before PrintAndWait; otherwise, ErrAlreadyStarted
// will be returned.
func (cpp *CopyProgressPrinter) AddCopy(reader io.Reader, name string, size int64, dest io.Writer) error {
	cpp.lock.Lock()
	defer cpp.lock.Unlock()

	if cpp.started {
		return ErrAlreadyStarted
	}

	cr := &copyReader{
		reader:  reader,
		current: 0,
		total:   size,
		pb:      cpp.pbp.AddProgressBar(),
	}
	cr.pb.SetPrintBefore(name)
	cr.pb.SetPrintAfter(cr.formattedProgress())

	cpp.readers = append(cpp.readers, cr)

	go func() {
		_, err := io.Copy(dest, cr)
		select {
		case <-cpp.cancel:
			return
		case cpp.results <- err:
			return
		}
	}()
	return nil
}

// PrintAndWait will print the progress for each copy operation added with
// AddCopy to printTo every printInterval. This will continue until every added
// copy is finished, or until cancel is written to.
// PrintAndWait may only be called once; any subsequent calls will immediately
// return ErrAlreadyStarted.  After PrintAndWait has been called, no more
// copies may be added to the CopyProgressPrinter.
func (cpp *CopyProgressPrinter) PrintAndWait(printTo io.Writer, printInterval time.Duration, cancel chan struct{}) error {
	cpp.lock.Lock()
	if cpp.started {
		cpp.lock.Unlock()
		return ErrAlreadyStarted
	}
	cpp.started = true
	cpp.lock.Unlock()

	n := len(cpp.readers)
	if n == 0 {
		// Nothing to do.
		return nil
	}

	defer close(cpp.cancel)
	t := time.NewTicker(printInterval)
	allDone := false
	for i := 0; i < n; {
		select {
		case <-cancel:
			return nil
		case <-t.C:
			_, err := cpp.pbp.Print(printTo)
			if err != nil {
				return err
			}
		case err := <-cpp.results:
			i++
			// Once completion is signaled, further on this just drains
			// (unlikely) errors from the channel.
			if err == nil && !allDone {
				allDone, err = cpp.pbp.Print(printTo)
			}
			if err != nil {
				return err
			}
		}
	}
	return nil
}

func (cr *copyReader) formattedProgress() string {
	var totalStr string
	if cr.total == 0 {
		totalStr = "?"
	} else {
		totalStr = ByteUnitStr(cr.total)
	}
	return fmt.Sprintf("%s / %s", ByteUnitStr(cr.current), totalStr)
}

var byteUnits = []string{"B", "KB", "MB", "GB", "TB", "PB"}

// ByteUnitStr pretty prints a number of bytes.
func ByteUnitStr(n int64) string {
	var unit string
	size := float64(n)
	for i := 1; i < len(byteUnits); i++ {
		if size < 1000 {
			unit = byteUnits[i-1]
			break
		}

		size = size / 1000
	}

	return fmt.Sprintf("%.3g %s", size, unit)
}