-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream.go
148 lines (130 loc) · 4.83 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
This file is part of the uci package.
Copyright (C) 2018 David Hughes
uci is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
this program. If not, see <https://www.gnu.org/licenses/>.
*/
// This code is slightly modified from the cmd project at
// https://github.com/go-cmd/cmd
package uci
import (
"bytes"
"fmt"
)
const (
// default size of the per-line buffer for the output stream
defaultLineBufferSize = 16384
)
// ErrLineBufferOverflow is returned by OutputStream.Write when the internal
// line buffer is filled before a newline character is written to terminate a
// line. Increasing the line buffer size passed to NewEngineFromPath or in the
// engine config file can help eliminate this error
type ErrLineBufferOverflow struct {
Line string // Unterminated line that caused the error
BufferSize int // Internal line buffer size
BufferFree int // Free bytes in line buffer
}
func (e ErrLineBufferOverflow) Error() string {
return fmt.Sprintf("line does not contain newline and is %d bytes too long to buffer (buffer size: %d)",
len(e.Line)-e.BufferSize, e.BufferSize)
}
// OutputStream represents real time, line by line output from a running Cmd.
// Lines are terminated by a single newline preceded by an optional carriage
// return. Both newline and carriage return are stripped from the line when
// sent to a caller-provided channel.
//
// The caller must begin receiving before starting the Cmd. This is done by the
// internal engine output parsing routines. Write blocks on the channel; the
// caller must always read the channel. The channel is not closed by the
// OutputStream.
//
// While runnableCmd is running, lines are sent to the channel as soon as they
// are written and newline-terminated by the command. After the command finishes,
// the caller should wait for the last lines to be sent:
//
// for len(stdoutChan) > 0 {
// time.Sleep(10 * time.Millisecond)
// }
//
// Since the channel is not closed by the OutputStream, the two indications that
// all lines have been sent and received are the command finishing and the
// channel size being zero.
type OutputStream struct {
streamChan chan string
bufSize int
buf []byte
lastChar int
}
// NewOutputStream creates a new streaming output on the given channel. The
// caller must begin receiving on the channel before the command is started.
// The OutputStream never closes the channel.
func NewOutputStream(streamChan chan string, lineBufSize int) *OutputStream {
out := &OutputStream{
streamChan: streamChan,
bufSize: lineBufSize,
buf: make([]byte, lineBufSize),
lastChar: 0,
}
return out
}
// Write makes OutputStream implement the io.Writer interface. Do not call
// this function directly.
func (rw *OutputStream) Write(p []byte) (n int, err error) {
n = len(p) // end of buffer
firstChar := 0
LINES:
for {
// Find next newline in stream buffer. nextLine starts at 0, but buff
// can contain multiple lines, like "foo\nbar". So in that case nextLine
// will be 0 ("foo\nbar\n") then 4 ("bar\n") on next iteration. And i
// will be 3 and 7, respectively. So lines are [0:3] are [4:7].
newlineOffset := bytes.IndexByte(p[firstChar:], '\n')
if newlineOffset < 0 {
break LINES // no newline in stream, next line incomplete
}
// End of line offset is start (nextLine) + newline offset. Like bufio.Scanner,
// we allow \r\n but strip the \r too by decrementing the offset for that byte.
lastChar := firstChar + newlineOffset // "line\n"
if newlineOffset > 0 && p[newlineOffset-1] == '\r' {
lastChar-- // "line\r\n"
}
// Send the line, prepend line buffer if set
var line string
if rw.lastChar > 0 {
line = string(rw.buf[0:rw.lastChar])
rw.lastChar = 0 // reset buffer
}
line += string(p[firstChar:lastChar])
rw.streamChan <- line // blocks if chan full
// Next line offset is the first byte (+1) after the newline (i)
firstChar += newlineOffset + 1
}
if firstChar < n {
remain := len(p[firstChar:])
bufFree := len(rw.buf[rw.lastChar:])
if remain > bufFree {
var line string
if rw.lastChar > 0 {
line = string(rw.buf[0:rw.lastChar])
}
line += string(p[firstChar:])
err = ErrLineBufferOverflow{
Line: line,
BufferSize: rw.bufSize,
BufferFree: bufFree,
}
n = firstChar
return // implicit
}
copy(rw.buf[rw.lastChar:], p[firstChar:])
rw.lastChar += remain
}
return // implicit
}