1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
|
// Package streamformatter provides helper functions to format a stream.
package streamformatter // import "github.com/docker/docker/pkg/streamformatter"
import (
"encoding/json"
"fmt"
"io"
"sync"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/progress"
)
// streamNewline is the line terminator appended by appendNewline and by the
// raw status formatter.
const streamNewline = "\r\n"
// jsonProgressFormatter formats status and progress messages as JSON-encoded
// jsonmessage.JSONMessage objects.
type jsonProgressFormatter struct{}
// appendNewline returns source with the stream line terminator appended.
// Like the built-in append, it may reuse source's spare capacity.
func appendNewline(source []byte) []byte {
	return append(source, streamNewline...)
}
// FormatStatus renders format and its arguments with fmt.Sprintf and returns
// the result as the Status of a JSON-encoded message carrying the given id,
// terminated by the stream newline. If encoding fails, the output of
// FormatError for the encoding error is returned instead.
func FormatStatus(id, format string, a ...interface{}) []byte {
	msg := jsonmessage.JSONMessage{
		ID:     id,
		Status: fmt.Sprintf(format, a...),
	}
	encoded, err := json.Marshal(&msg)
	if err != nil {
		return FormatError(err)
	}
	return appendNewline(encoded)
}
// FormatError encodes err as a JSON message terminated by the stream newline.
// An err that already is a *jsonmessage.JSONError is embedded as-is; any other
// error is wrapped in a JSONError holding only its message. Should encoding
// fail, a fixed `{"error":"format error"}` line is returned.
func FormatError(err error) []byte {
	jsonErr, isJSONErr := err.(*jsonmessage.JSONError)
	if !isJSONErr {
		jsonErr = &jsonmessage.JSONError{Message: err.Error()}
	}
	msg := jsonmessage.JSONMessage{Error: jsonErr, ErrorMessage: err.Error()}
	encoded, marshalErr := json.Marshal(&msg)
	if marshalErr != nil {
		return []byte(`{"error":"format error"}` + streamNewline)
	}
	return appendNewline(encoded)
}
// formatStatus formats a status message as JSON by delegating to FormatStatus.
func (sf *jsonProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
return FormatStatus(id, format, a...)
}
// formatProgress encodes a progress update for action as a JSON message
// terminated by the stream newline. A nil progress is treated as an empty
// JSONProgress. When aux is non-nil it is marshalled and attached as the
// message's Aux payload. It returns nil if any marshalling step fails.
func (sf *jsonProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
	if progress == nil {
		progress = &jsonmessage.JSONProgress{}
	}

	var auxPayload *json.RawMessage
	if aux != nil {
		encodedAux, err := json.Marshal(aux)
		if err != nil {
			return nil
		}
		raw := json.RawMessage(encodedAux)
		auxPayload = &raw
	}

	msg := jsonmessage.JSONMessage{
		ID:              id,
		Status:          action,
		Progress:        progress,
		ProgressMessage: progress.String(),
		Aux:             auxPayload,
	}
	encoded, err := json.Marshal(&msg)
	if err != nil {
		return nil
	}
	return appendNewline(encoded)
}
// rawProgressFormatter formats status and progress messages as plain text
// rather than JSON.
type rawProgressFormatter struct{}
// formatStatus renders format with its arguments and terminates the line with
// the stream newline. The id is not used in raw output.
func (sf *rawProgressFormatter) formatStatus(id, format string, a ...interface{}) []byte {
	text := fmt.Sprintf(format, a...)
	return []byte(text + streamNewline)
}
// formatProgress renders a progress update as plain text in the form
// "<action> <progress>". A nil progress is treated as an empty JSONProgress;
// id and aux are not used in raw output.
//
// The output ends with a bare "\r" when the progress has a non-empty textual
// form (presumably so the next update overwrites the same terminal line), and
// with "\r\n" when that form is empty.
func (sf *rawProgressFormatter) formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte {
	if progress == nil {
		progress = &jsonmessage.JSONProgress{}
	}
	// Render once so the emptiness check and the emitted text always agree,
	// and the progress string is not computed twice.
	rendered := progress.String()
	endl := "\r"
	if rendered == "" {
		endl += "\n"
	}
	return []byte(action + " " + rendered + endl)
}
// NewProgressOutput returns a progress.Output that writes plain-text progress
// to out and emits an extra newline after each update marked as last. The
// result can be passed to progress.NewProgressReader.
func NewProgressOutput(out io.Writer) progress.Output {
	po := &progressOutput{
		sf:       new(rawProgressFormatter),
		out:      out,
		newLines: true,
	}
	return po
}
// NewJSONProgressOutput returns a progress.Output that writes progress to out
// as JSON objects. newLines controls whether an empty status message is
// written after an update marked as last.
func NewJSONProgressOutput(out io.Writer, newLines bool) progress.Output {
	po := &progressOutput{
		sf:       new(jsonProgressFormatter),
		out:      out,
		newLines: newLines,
	}
	return po
}
// formatProgress is the formatter abstraction used by progressOutput. It is
// implemented by jsonProgressFormatter and rawProgressFormatter.
type formatProgress interface {
// formatStatus formats a status message built from format and its arguments.
formatStatus(id, format string, a ...interface{}) []byte
// formatProgress formats a progress update for action, with an optional aux value.
formatProgress(id, action string, progress *jsonmessage.JSONProgress, aux interface{}) []byte
}
// progressOutput writes progress updates to out, encoded by the formatter sf.
type progressOutput struct {
sf formatProgress // formatter used to encode status and progress messages
out io.Writer // destination for the formatted messages
newLines bool // when set, an empty status message is written after an update marked LastUpdate
mu sync.Mutex // held by WriteProgress while writing to out
}
// WriteProgress formats prog and writes it to the underlying writer. A
// non-empty Message is emitted as a status message; otherwise the counters
// are emitted as a progress update. When newLines is set and prog is marked
// as the last update, an empty status message is written afterwards. The
// writes happen while holding the mutex, and the first write error is
// returned.
func (out *progressOutput) WriteProgress(prog progress.Progress) error {
	var payload []byte
	if prog.Message == "" {
		counters := &jsonmessage.JSONProgress{
			Current:    prog.Current,
			Total:      prog.Total,
			HideCounts: prog.HideCounts,
			Units:      prog.Units,
		}
		payload = out.sf.formatProgress(prog.ID, prog.Action, counters, prog.Aux)
	} else {
		payload = out.sf.formatStatus(prog.ID, "%s", prog.Message)
	}

	out.mu.Lock()
	defer out.mu.Unlock()

	if _, err := out.out.Write(payload); err != nil {
		return err
	}
	if !out.newLines || !prog.LastUpdate {
		return nil
	}
	_, err := out.out.Write(out.sf.formatStatus("", ""))
	return err
}
// AuxFormatter writes aux progress messages, encoded as JSON, to the embedded
// io.Writer.
type AuxFormatter struct {
io.Writer
}
// Emit marshals aux to JSON, wraps it as the Aux payload of a message carrying
// id, and writes the newline-terminated message to the embedded writer.
//
// It returns any marshalling error, the writer's own error if the write
// fails, or io.ErrShortWrite if the writer reports fewer bytes written than
// requested without returning an error.
func (sf *AuxFormatter) Emit(id string, aux interface{}) error {
	auxJSONBytes, err := json.Marshal(aux)
	if err != nil {
		return err
	}
	auxJSON := json.RawMessage(auxJSONBytes)
	msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: id, Aux: &auxJSON})
	if err != nil {
		return err
	}
	msgJSON = appendNewline(msgJSON)
	n, err := sf.Writer.Write(msgJSON)
	if err != nil {
		// Report the underlying failure rather than masking it as a short
		// write: a failing Write normally also returns n < len(msgJSON).
		return err
	}
	if n != len(msgJSON) {
		return io.ErrShortWrite
	}
	return nil
}
|