-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.go
139 lines (129 loc) · 3.14 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package main
import (
"fmt"
"github.com/fsnotify/fsnotify"
"io"
"log"
"os"
"strings"
)
// Monitor watches a single file for changes. New lines read from the file are
// sent over lineChannel.
type Monitor struct {
	// filename is the path of the watched file.
	filename string
	// file is the open handle the new data is read from.
	file *os.File
	// watcher delivers the change notifications for the file.
	watcher *fsnotify.Watcher
	// readBufferSize is the size in bytes of the chunk to read from the
	// watched file.
	readBufferSize uint32
	// lineChannel carries the lines that were read; each element is one batch
	// of lines.
	lineChannel chan []string
}
// NewMonitor constructs a Monitor for the file at pathToFile.
//
// It creates an fsnotify watcher, opens the file for reading and moves the
// read offset to the end of the file via seekToEnd. The file is not registered
// with the watcher here; that happens in StartWatching.
//
// On any failure everything acquired so far (watcher, file) is released again
// and a nil monitor is returned together with the error.
func NewMonitor(pathToFile string) (*Monitor, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}

	file, err := os.Open(pathToFile)
	if err != nil {
		// Best-effort cleanup: the open error is the one worth reporting.
		_ = watcher.Close()
		return nil, err
	}

	m := &Monitor{
		filename:       pathToFile,
		file:           file,
		watcher:        watcher,
		readBufferSize: 64 * 1024,
		lineChannel:    make(chan []string, 1024),
	}
	if err := m.seekToEnd(); err != nil {
		// Do not hand out a half-initialised monitor, and do not leak the
		// descriptors it holds. Close errors are secondary to the seek error.
		_ = file.Close()
		_ = watcher.Close()
		return nil, err
	}
	return m, nil
}
// seekToEnd moves the read offset of the monitored file to its current end and
// logs the resulting offset.
//
// It returns the error reported by Seek, if any; in that case nothing is
// logged.
func (m *Monitor) seekToEnd() error {
	// Seeking relative to the end needs no separate Stat call, so the offset
	// cannot go stale between measuring the size and seeking to it.
	endPosition, err := m.file.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	log.Println("seeking", m.filename, "to", endPosition)
	return nil
}
// handle is the monitor's event loop. It blocks, so it is meant to run in its
// own goroutine.
//
// For every watcher event whose Op includes a write it reads whatever can be
// read from the file and sends the complete lines over lineChannel. A watcher
// error terminates the program via log.Fatal. The loop returns when either of
// the watcher's channels is closed.
func (m *Monitor) handle() {
	size := m.readBufferSize
	if size == 0 {
		// A zero-length buffer would make Read return (0, nil) forever.
		size = 64 * 1024
	}
	readBuffer := make([]byte, size)
	var pending []byte // bytes read after the last newline seen so far

	for {
		select {
		case ev, ok := <-m.watcher.Events:
			if !ok {
				// Channel closed: stop instead of spinning on zero-value events.
				return
			}
			// Op is a bit mask; a write may be reported together with other
			// operations, so test the bit rather than comparing for equality.
			if ev.Op&fsnotify.Write == 0 {
				continue
			}
			pending = m.readAppended(readBuffer, pending)
		case err, ok := <-m.watcher.Errors:
			if !ok {
				// Channel closed: there is no error to report.
				return
			}
			log.Fatal("error", err)
		}
	}
}

// readAppended reads from the file's current offset until EOF or a read error,
// sends each batch of complete lines over lineChannel and returns the trailing
// bytes that are not yet terminated by a newline. pending holds such bytes
// from the previous call; readBuffer is scratch space for the reads.
func (m *Monitor) readAppended(readBuffer, pending []byte) []byte {
	for {
		n, err := m.file.Read(readBuffer)
		if n > 0 {
			pending = append(pending, readBuffer[:n]...)
			text := string(pending)
			// Anything after the last newline is assumed to be an unfinished
			// line and is kept until the rest of it arrives.
			if end := strings.LastIndexByte(text, '\n'); end >= 0 {
				m.lineChannel <- strings.Split(text[:end], "\n")
				pending = append(pending[:0], text[end+1:]...)
			}
		}
		if err != nil {
			if err != io.EOF {
				// Give up on this event; retrying immediately would spin on a
				// persistent error.
				log.Println("error reading", m.filename, err)
			}
			return pending
		}
	}
}
// StartWatching registers the monitored file with the watcher and then starts
// the event loop (handle) in its own goroutine. Should only be called once.
//
// If the file cannot be added to the watcher the error is logged and no
// goroutine is started, so a failed start does not leave an idle handler
// behind.
// TODO: probably should raise (or return) a runtime error if it is already watching
func (m *Monitor) StartWatching() {
	if err := m.watcher.Add(m.filename); err != nil {
		log.Println("error watching", err)
		return
	}
	go m.handle()
}
// Drain collects the output of several monitors and merges it into one
// unified channel.
type Drain struct {
	// monitors maps the path of a file to the monitor piped in for it.
	monitors map[string]*Monitor
	// merged is the unified channel all lines end up on.
	merged chan string
}
// NewDrain builds a Drain with an empty monitor map and a merged channel
// buffered for 10000 lines, then pipes in every monitor that was passed, in
// argument order. Calling it without arguments yields an empty drain.
func NewDrain(monitors ...*Monitor) *Drain {
	d := new(Drain)
	d.monitors = map[string]*Monitor{}
	d.merged = make(chan string, 10000)

	// Hook up the initial monitors, if any.
	for i := range monitors {
		d.Pipe(monitors[i])
	}
	return d
}
// Consume reads lines off the unified merged channel until that channel is
// closed. Currently every line is simply printed to stdout.
// TODO: other output options
// Blocks the calling goroutine.
func (drain *Drain) Consume() {
	merged := drain.merged
	for {
		line, open := <-merged
		if !open {
			return
		}
		// TODO:
		fmt.Println(line)
	}
}
// Pipe connects a monitor to the drain. The monitor is recorded under its
// filename, and a goroutine is started that forwards every line of every batch
// arriving on the monitor's lineChannel to the merged channel. That goroutine
// ends once lineChannel is closed.
func (drain *Drain) Pipe(monitor *Monitor) {
	drain.monitors[monitor.filename] = monitor
	go func() {
		batches := monitor.lineChannel
		for {
			batch, open := <-batches
			if !open {
				return
			}
			for i := 0; i < len(batch); i++ {
				drain.merged <- batch[i]
			}
		}
	}()
}