File: stream_decompressor.go

package info (click to toggle)
kitty 0.42.1-1
  • links: PTS, VCS
  • area: main
  • in suites: experimental
  • size: 28,564 kB
  • sloc: ansic: 82,787; python: 55,191; objc: 5,122; sh: 1,295; xml: 364; makefile: 143; javascript: 78
file content (124 lines) | stat: -rw-r--r-- 2,827 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// License: GPLv3 Copyright: 2023, Kovid Goyal, <kovid at kovidgoyal.net>

package utils

import (
	"errors"
	"fmt"
	"io"
)

var _ = fmt.Print

type StreamDecompressor = func(chunk []byte, is_last bool) error

type pipe_reader struct {
	pr *io.PipeReader
}

func (self *pipe_reader) Read(b []byte) (n int, err error) {
	// ensure the decompressor code never gets a zero byte read with no error
	for len(b) > 0 {
		n, err = self.pr.Read(b)
		if err != nil || n > 0 {
			return
		}
	}
	return
}

// Wrap Go's awful decompressor routines to allow feeding them
// data in chunks. For example:
// sd := NewStreamDecompressor(zlib.NewReader, output)
// sd(chunk, false)
// ...
// sd(last_chunk, true)
// after this call, calling sd() further will just return io.EOF.
// To close the decompressor at any time, call sd(nil, true).
// Note: output.Write() may be called from a different thread, but only while the main thread is in sd()
func NewStreamDecompressor(constructor func(io.Reader) (io.ReadCloser, error), output io.Writer) StreamDecompressor {
	if constructor == nil { // identity decompressor
		var err error
		return func(chunk []byte, is_last bool) error {
			if err != nil {
				return err
			}
			if len(chunk) > 0 {
				_, err = output.Write(chunk)
			}
			if is_last {
				if err == nil {
					err = io.EOF
					return nil
				}
			}
			return err
		}
	}
	pipe_r, pipe_w := io.Pipe()
	pr := pipe_reader{pr: pipe_r}
	finished := make(chan error, 1)
	finished_err := errors.New("finished")
	go func() {
		var err error
		defer func() {
			finished <- err
		}()
		var impl io.ReadCloser
		impl, err = constructor(&pr)
		if err != nil {
			pipe_r.CloseWithError(err)
			return
		}
		_, err = io.Copy(output, impl)
		cerr := impl.Close()
		if err == nil {
			err = cerr
		}
		if err == nil {
			err = finished_err
		}
		pipe_r.CloseWithError(err)
	}()

	var iter_err error
	return func(chunk []byte, is_last bool) error {
		if iter_err != nil {
			if iter_err == finished_err {
				iter_err = io.EOF
			}
			return iter_err
		}
		if len(chunk) > 0 {
			var n int
			n, iter_err = pipe_w.Write(chunk)
			if iter_err != nil && iter_err != finished_err {
				return iter_err
			}
			if n < len(chunk) {
				iter_err = io.ErrShortWrite
				return iter_err
			}
			// wait for output to finish
			if iter_err == nil {
				// after a zero byte read, pipe_reader.Read() calls pipe_r.Read() again so
				// we know it is either blocked waiting for a write to pipe_w or has finished
				_, iter_err = pipe_w.Write(nil)
				if iter_err != nil && iter_err != finished_err {
					return iter_err
				}
			}
		}
		if is_last {
			pipe_w.CloseWithError(io.EOF)
			err := <-finished
			if err != nil && err != io.EOF && err != finished_err {
				iter_err = err
				return err
			}
			iter_err = io.EOF
			return nil
		}
		return nil
	}
}