1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268
|
package tasklog
import (
"fmt"
"io"
"os"
"strings"
"sync"
"time"
"github.com/git-lfs/git-lfs/v3/tools"
isatty "github.com/mattn/go-isatty"
"github.com/olekukonko/ts"
)
const (
// DefaultLoggingThrottle is the throttle interval that NewLogger assigns
// to every *Logger it creates: the minimum time between two logged
// updates of a throttled task.
DefaultLoggingThrottle = 200 * time.Millisecond
)
// Logger logs a series of tasks to an io.Writer, processing each task in order
// until completion.
type Logger struct {
// sink is the writer to write to.
sink io.Writer
// widthFn is a function that returns the width of the terminal that
// this logger is running within.
widthFn func() int
// tty is true if sink is connected to a terminal. It is computed once,
// by NewLogger.
tty bool
// forceProgress forces progress status even when stdout is not a tty.
forceProgress bool
// throttle is the minimum amount of time that must pass between each
// instant data is logged. A value of zero disables throttling.
throttle time.Duration
// queue is the incoming, unbuffered queue of tasks to enqueue. Enqueue
// sends to it and Close closes it.
queue chan Task
// tasks is the set of tasks to process. consume forwards every task
// received on queue to this channel, from which tasks are logged one at
// a time.
tasks chan Task
// wg is a WaitGroup that is incremented when new tasks are enqueued,
// and decremented when tasks finish.
wg *sync.WaitGroup
}
// Option is the type for functional options that configure a *Logger. Options
// are passed to NewLogger, which applies them in the order given.
type Option func(*Logger)
// ForceProgress returns an Option that sets whether the logger should report
// progress status even when stdout is not a terminal.
func ForceProgress(v bool) Option {
	return Option(func(logger *Logger) {
		logger.forceProgress = v
	})
}
// NewLogger builds a *Logger that writes to "sink" (or discards its output
// when "sink" is nil), pads lines to the current terminal width, and throttles
// updates to DefaultLoggingThrottle. Each of the given options is applied, in
// order, before the logger starts consuming tasks in the background.
//
// Progress status is logged if stdout is a terminal or if progress was forced
// with ForceProgress.
func NewLogger(sink io.Writer, options ...Option) *Logger {
	if sink == nil {
		sink = io.Discard
	}

	// terminalWidth reports the current terminal width in columns, assuming
	// 80 columns when the size cannot be determined.
	terminalWidth := func() int {
		if size, err := ts.GetSize(); err == nil {
			return size.Col()
		}
		return 80
	}

	logger := &Logger{
		sink:     sink,
		throttle: DefaultLoggingThrottle,
		widthFn:  terminalWidth,
		queue:    make(chan Task),
		tasks:    make(chan Task),
		wg:       new(sync.WaitGroup),
	}
	for i := range options {
		options[i](logger)
	}
	logger.tty = tty(sink)

	go logger.consume()

	return logger
}
// hasFd is implemented by writers that expose a file descriptor (for example,
// *os.File). tty uses it to decide whether a writer can be checked for being a
// terminal.
type hasFd interface {
// Fd returns the writer's file descriptor.
Fd() uintptr
}
// tty reports whether "writer" is connected to a terminal (including a Cygwin
// terminal). Writers that do not expose a file descriptor are never considered
// terminals.
func tty(writer io.Writer) bool {
	file, ok := writer.(hasFd)
	if !ok {
		return false
	}
	return isatty.IsTerminal(file.Fd()) || isatty.IsCygwinTerminal(file.Fd())
}
// Close closes the incoming queue, so that no further Tasks may be
// `Enqueue()`'d, and then blocks until the logger's WaitGroup reports that all
// enqueued Tasks have finished. Calling Close on a nil *Logger does nothing.
func (l *Logger) Close() {
	if l != nil {
		close(l.queue)
		l.wg.Wait()
	}
}
// Waiter constructs a *WaitingTask with the given message, enqueues it on the
// logger, and returns it to the caller.
func (l *Logger) Waiter(msg string) *WaitingTask {
	task := NewWaitingTask(msg)
	l.Enqueue(task)

	return task
}
// Percentage constructs a *PercentageTask with the given message and total,
// enqueues it on the logger, and returns it to the caller.
func (l *Logger) Percentage(msg string, total uint64) *PercentageTask {
	task := NewPercentageTask(msg, total)
	l.Enqueue(task)

	return task
}
// List constructs a *ListTask with the given message, enqueues it on the
// logger, and returns it to the caller.
func (l *Logger) List(msg string) *ListTask {
	task := NewListTask(msg)
	l.Enqueue(task)

	return task
}
// Simple constructs a *SimpleTask, enqueues it on the logger, and returns it
// to the caller.
func (l *Logger) Simple() *SimpleTask {
	task := NewSimpleTask()
	l.Enqueue(task)

	return task
}
// Enqueue enqueues the given Tasks "ts" to be logged in order. Nil Tasks are
// ignored, since they could never be completed.
//
// If the receiver is nil, nothing is logged: instead, the updates of each Task
// are drained and discarded in a dedicated goroutine until the Task closes its
// Updates() channel.
//
// Enqueue blocks until every Task has been handed off to the logger's
// consumer. It must not be called after Close, because Close closes the queue
// that Enqueue sends on.
func (l *Logger) Enqueue(ts ...Task) {
	if l == nil {
		for _, t := range ts {
			if t == nil {
				// NOTE: Do not allow nil tasks which are unable
				// to be completed.
				continue
			}
			go func(t Task) {
				for range t.Updates() {
					// Discard all updates.
				}
			}(t)
		}
		return
	}

	// Only count the Tasks that will actually be queued. Nil Tasks are
	// skipped below and therefore never marked as done; counting them would
	// leave the WaitGroup permanently non-zero and make Close block forever.
	pending := 0
	for _, t := range ts {
		if t != nil {
			pending++
		}
	}
	l.wg.Add(pending)

	for _, t := range ts {
		if t == nil {
			// NOTE: See above.
			continue
		}
		l.queue <- t
	}
}
// consume shuttles tasks from the incoming queue to the channel of tasks to
// work on, and starts the goroutine which logs those tasks one at a time, in
// the order in which they were received.
//
// It returns once the incoming queue has been closed, closing l.tasks on the
// way out so that the logging goroutine terminates as well.
func (l *Logger) consume() {
	defer close(l.tasks)

	go func() {
		// Log each task to completion before picking up the next one.
		for next := range l.tasks {
			l.logTask(next)
		}
	}()

	// Forward every newly submitted task until l.queue is closed, at which
	// point no more new tasks may be added.
	for incoming := range l.queue {
		l.tasks <- incoming
	}
}
// logTask writes the updates of the given task to the sink, followed by a
// final "done." message, and then marks the task as done on the logger's
// WaitGroup.
//
// Intermediate updates are only written when stdout is a terminal or progress
// has been forced. They are rate-limited to one per `l.throttle`, unless that
// duration is 0 or the task reports that it is not throttled, in which case
// every update is written.
//
// The "done." message repeats the last update received, and is omitted when
// the task sent no updates at all. If the task has an OnComplete() method, it
// is called last, before the next task is processed.
func (l *Logger) logTask(task Task) {
	defer l.wg.Done()

	unthrottled := !task.Throttled()

	var (
		// latest is the most recent update received, or nil if there
		// has been none yet.
		latest *Update
		// written is the time of the last update that was logged.
		written time.Time
	)

	for latest = range task.Updates() {
		if tty(os.Stdout) || l.forceProgress {
			due := unthrottled || l.throttle == 0 || !latest.Throttled(written.Add(l.throttle))
			if due {
				l.logLine(latest.S)
				written = latest.At
			}
		}
	}

	if latest != nil {
		// Only announce completion when there was at least (1) update.
		l.log(fmt.Sprintf("%s, done.\n", latest.S))
	}

	// completer is implemented by tasks that want to be notified after
	// they are closed, but before new tasks are accepted.
	type completer interface {
		OnComplete()
	}
	if c, ok := task.(completer); ok {
		// Block on the callback before accepting new tasks.
		c.OnComplete()
	}
}
// logLine writes "str" padded with spaces up to the width of the terminal,
// followed by a carriage return, so that the cursor moves back to the
// beginning of the line and the next write overwrites it completely.
//
// Strings that are already as wide as the terminal (or wider) are written
// without padding.
//
// It returns the number of bytes "n" written to the sink and the error "err",
// if one was encountered.
func (l *Logger) logLine(str string) (n int, err error) {
	// Measure the string in characters rather than bytes: a multi-byte UTF-8
	// character occupies several bytes but not several columns, so padding by
	// byte length would fall short and leave remnants of a previous, longer
	// line on screen.
	chars := len([]rune(str))

	padding := strings.Repeat(" ", tools.MaxInt(0, l.widthFn()-chars))
	return l.log(str + padding + "\r")
}
// log writes "str" to the sink exactly as given, without adding any padding
// or line terminator.
//
// It returns the number of bytes "n" written to the sink and the error "err",
// if one was encountered.
func (l *Logger) log(str string) (n int, err error) {
	n, err = fmt.Fprint(l.sink, str)
	return n, err
}
|