Skip to content

Commit 39e039b

Browse files
author
George Malamidis
committed
Can set log file for listeners
1 parent 13598f4 commit 39e039b

7 files changed

+81
-46
lines changed

client.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"encoding/json"
66
"fmt"
77
"github.com/nutrun/lentil"
8-
"log"
8+
"os"
99
"strings"
1010
)
1111

@@ -28,7 +28,7 @@ func NewClient(verbose bool) (*Client, error) {
2828
func (this *Client) put(msg *Message) error {
2929
message, e := json.Marshal(msg)
3030
if this.verbose {
31-
log.Printf("QUEUEING UP: %s\n", message)
31+
fmt.Fprintf(os.Stderr, "QUEUEING UP: %s\n", message)
3232
}
3333
if e != nil {
3434
return e
@@ -98,7 +98,7 @@ func (this *Client) drain(tubes string) error {
9898
}
9999
}
100100
drainedJobs = append(drainedJobs, []byte("\n]")...)
101-
log.Printf("%s", string(drainedJobs))
101+
fmt.Fprintf(os.Stderr, "%s", string(drainedJobs))
102102
return nil
103103
}
104104

@@ -108,7 +108,7 @@ func (this *Client) pause(tubes string, delay int) error {
108108
if e != nil {
109109
return e
110110
}
111-
log.Printf("Paused %s for %d seconds", tubes, delay)
111+
fmt.Fprintf(os.Stderr, "Paused %s for %d seconds", tubes, delay)
112112
}
113113
return nil
114114
}

integration_test.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from select import select
88

99
HERE = os.path.dirname(os.path.abspath(__file__))
10+
debug = False
1011

1112
class TestGlowIntegration(unittest.TestCase):
1213

@@ -131,7 +132,7 @@ def test_append_to_output_file_if_exists(self):
131132
self.assertEqual('job1\njob2\n', outfile.read())
132133
self.listener.interrupt()
133134

134-
debug = False
135+
135136

136137
class Listener:
137138
def __init__(self):
@@ -171,7 +172,8 @@ def _wait_for_job_update(self, job_desc, status, seconds, max_num_non_matching_e
171172
fds, _, _ = select([self.process.stderr], [], [], seconds)
172173
if fds != [self.process.stderr]:
173174
raise Exception('timed out waiting for {0} {1}'.format(status, job_desc))
174-
line = self.process.stderr.readline()
175+
line = self.process.stderr.readline().split(' ')[2:] # get rid of log date/time
176+
line = " ".join(line)
175177
if debug: print line
176178
if line.startswith(status):
177179
job = cjson.decode(line[len(status):])

listener.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,17 @@ type Listener struct {
1818
sig os.Signal
1919
}
2020

21-
func NewListener(verbose, inclusive bool, filter []string) (*Listener, error) {
21+
func NewListener(verbose, inclusive bool, filter []string, logpath string) (*Listener, error) {
2222
this := new(Listener)
23+
logfile, err := os.OpenFile(logpath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
24+
if err != nil {
25+
return nil, err
26+
}
27+
this.logger = log.New(logfile, "", log.LstdFlags)
2328
q, err := lentil.Dial(Config.QueueAddr)
24-
2529
if err != nil {
2630
return nil, err
2731
}
28-
2932
this.q = q
3033
this.verbose = verbose
3134
this.jobqueue = NewJobQueue(this.q, inclusive, filter)
@@ -42,18 +45,18 @@ listenerloop:
4245
}
4346
e := Config.LoadDeps()
4447
if e != nil {
45-
log.Fatalf("Error loading dependency config: %s\n", e)
48+
this.logger.Fatalf("Error loading dependency config: %s\n", e)
4649
}
4750
job, e := this.jobqueue.Next()
4851
if e != nil {
4952
if strings.Contains(e.Error(), "TIMED_OUT") {
5053
time.Sleep(time.Second)
5154
goto listenerloop
5255
}
53-
log.Fatal(e)
56+
this.logger.Fatal(e)
5457
}
5558
if this.verbose {
56-
log.Printf("RUNNING: %s", job.Body)
59+
this.logger.Printf("RUNNING: %s", job.Body)
5760
}
5861
msg := new(Message)
5962
e = json.Unmarshal([]byte(job.Body), &msg)
@@ -63,10 +66,10 @@ listenerloop:
6366
e = this.execute(msg)
6467
if e == nil {
6568
if this.verbose {
66-
log.Printf("COMPLETE: %s", job.Body)
69+
this.logger.Printf("COMPLETE: %s", job.Body)
6770
}
6871
} else {
69-
log.Printf("FAILED: %s", job.Body)
72+
this.logger.Printf("FAILED: %s", job.Body)
7073
}
7174
e = this.jobqueue.Delete(job.Id)
7275
if e != nil {
@@ -81,12 +84,12 @@ func (this *Listener) trap() {
8184
signal.Notify(receivedSignal, syscall.SIGTERM, syscall.SIGINT)
8285
this.sig = <-receivedSignal
8386
if this.sig.String() == syscall.SIGTERM.String() {
84-
log.Printf("Got signal %d. Killing current job.", this.sig)
87+
this.logger.Printf("Got signal %d. Killing current job.", this.sig)
8588
if this.proc != nil {
8689
this.proc.Kill()
8790
}
8891
os.Exit(1)
8992
}
9093
go this.trap()
91-
log.Printf("Got signal %d. Waiting for current job to complete.\n", this.sig)
94+
this.logger.Printf("Got signal %d. Waiting for current job to complete.\n", this.sig)
9295
}

listener_test.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
package main
22

33
import (
4-
"bytes"
54
"encoding/json"
65
"io/ioutil"
7-
"log"
86
"os"
97
"strings"
108
"testing"
@@ -36,11 +34,10 @@ func TestOutput(t *testing.T) {
3634
}
3735

3836
func TestPutErrorOnBeanstalk(t *testing.T) {
39-
listener, err := NewListener(false, false, []string{})
37+
listener, err := NewListener(false, false, []string{}, "/dev/null")
4038
if err != nil {
4139
t.Fatal(err)
4240
}
43-
log.SetOutput(bytes.NewBufferString(""))
4441
msg, e := createTestMessage("lsdonmybrain", "test.out", ".")
4542
if e != nil {
4643
t.Fatal(e)

main.go

+33-15
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"bufio"
55
"flag"
6+
"fmt"
67
"log"
78
"os"
89
"strings"
@@ -27,11 +28,11 @@ var pausedelay *int = flag.Int("pause-delay", 0, "How many seconds to pause tube
2728
var mailfrom *string = flag.String("mail-from", "[email protected]", "Email 'from' field [listen]")
2829
var smtpserver *string = flag.String("smtp-server", "", "Server to use for sending emails [listen]")
2930
var deps *string = flag.String("deps", "", "Path to tube dependency config file [listen]")
31+
var logpath *string = flag.String("log", "/dev/stderr", "Path to log file [listen]")
3032

3133
var Config *Configuration
3234

3335
func main() {
34-
log.SetFlags(0)
3536
flag.Parse()
3637
Config = NewConfig(*deps, *smtpserver, *mailfrom)
3738
if *listener {
@@ -40,9 +41,10 @@ func main() {
4041
if *exclude != "" {
4142
filter = strings.Split(*exclude, ",")
4243
}
43-
l, e := NewListener(*verbose, include, filter)
44+
l, e := NewListener(*verbose, include, filter, *logpath)
4445
if e != nil {
45-
log.Fatalf("ERROR: %s", e)
46+
fmt.Fprintln(os.Stderr, e)
47+
os.Exit(1)
4648
}
4749
l.run()
4850
return
@@ -56,38 +58,50 @@ func main() {
5658
// hack: local doesn't need tube, defaulting it to respect the Message API
5759
msg, e := NewMessage(executable, arguments, *mailto, *workdir, *stdout, *stderr, "localignore", 0, 0)
5860
if e != nil {
59-
log.Fatal(e)
61+
fmt.Fprintln(os.Stderr, e)
62+
os.Exit(1)
6063
}
61-
runner, e := NewRunner(*verbose)
64+
logfile, e := os.OpenFile(*logpath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
6265
if e != nil {
63-
log.Fatal(e)
66+
fmt.Fprintln(os.Stderr, e)
67+
os.Exit(1)
68+
}
69+
runner, e := NewRunner(*verbose, log.New(logfile, "", log.LstdFlags))
70+
if e != nil {
71+
fmt.Fprintln(os.Stderr, e)
72+
os.Exit(1)
6473
}
6574
e = runner.execute(msg)
6675
if e != nil {
67-
log.Fatalf("ERROR: %s", e)
76+
fmt.Fprintln(os.Stderr, e)
77+
os.Exit(1)
6878
}
6979
return
7080
}
7181

7282
c, e := NewClient(*verbose)
7383
if e != nil {
74-
log.Fatalf("ERROR: %s", e)
84+
fmt.Fprintln(os.Stderr, e)
85+
os.Exit(1)
7586
}
7687

7788
if *drain != "" {
7889
e = c.drain(*drain)
7990
if e != nil {
80-
log.Fatalf("ERROR: %s", e)
91+
fmt.Fprintln(os.Stderr, e)
92+
os.Exit(1)
8193
}
8294
} else if *pause != "" {
8395
if *pausedelay == 0 {
84-
log.Fatal("Usage: glow -pause=<tube1,tube2,...> -pause-delay=<seconds>")
96+
fmt.Fprintln(os.Stderr, "Usage: glow -pause=<tube1,tube2,...> -pause-delay=<seconds>")
97+
os.Exit(1)
8598
}
8699
e = c.pause(*pause, *pausedelay)
87100
} else if *stats {
88101
e = c.stats()
89102
if e != nil {
90-
log.Fatalf("ERROR: %s", e)
103+
fmt.Fprintln(os.Stderr, e)
104+
os.Exit(1)
91105
}
92106
} else if len(flag.Args()) == 0 { // Queue up many jobs from STDIN
93107
in := bufio.NewReaderSize(os.Stdin, 1024*1024)
@@ -98,23 +112,27 @@ func main() {
98112
if e.Error() == "EOF" {
99113
break
100114
}
101-
log.Fatalf("ERROR: %s", e)
115+
fmt.Fprintln(os.Stderr, e)
116+
os.Exit(1)
102117
}
103118
input = append(input, line...)
104119
}
105120
e = c.putMany(input)
106121
if e != nil {
107-
log.Fatalf("ERROR: %s", e)
122+
fmt.Fprintln(os.Stderr, e)
123+
os.Exit(1)
108124
}
109125
} else { // Queue up one job
110126
executable, arguments := parseCommand()
111127
msg, e := NewMessage(executable, arguments, *mailto, *workdir, *stdout, *stderr, *tube, *priority, *delay)
112128
if e != nil {
113-
log.Fatalf("ERROR: %s", e)
129+
fmt.Fprintln(os.Stderr, e)
130+
os.Exit(1)
114131
}
115132
e = c.put(msg)
116133
if e != nil {
117-
log.Fatalf("ERROR: %s", e)
134+
fmt.Fprintln(os.Stderr, e)
135+
os.Exit(1)
118136
}
119137
}
120138
}

runner.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ type Runner struct {
1616
q *lentil.Beanstalkd
1717
proc *os.Process
1818
verbose bool
19+
logger *log.Logger
1920
}
2021

21-
func NewRunner(verbose bool) (*Runner, error) {
22+
func NewRunner(verbose bool, logger *log.Logger) (*Runner, error) {
2223
this := new(Runner)
24+
this.logger = logger
2325
q, err := lentil.Dial(Config.QueueAddr)
2426

2527
if err != nil {
@@ -60,9 +62,9 @@ func (this *Runner) execute(msg *Message) error {
6062
cmd.Stdout = stdoutF
6163
cmd.Stderr = stderrF
6264
if this.verbose {
63-
log.Printf("INFO: Running command '%s %s'\n", msg.Executable, strings.Join(msg.Arguments, " "))
64-
log.Printf("INFO: STDOUT to %s\n", msg.Stdout)
65-
log.Printf("INFO: STDERR to %s\n", msg.Stderr)
65+
this.logger.Printf("INFO: Running command '%s %s'\n", msg.Executable, strings.Join(msg.Arguments, " "))
66+
this.logger.Printf("INFO: STDOUT to %s\n", msg.Stdout)
67+
this.logger.Printf("INFO: STDERR to %s\n", msg.Stderr)
6668
}
6769
e = cmd.Start()
6870
if e != nil {
@@ -80,25 +82,25 @@ func (this *Runner) execute(msg *Message) error {
8082

8183
// Log and email errors
8284
func (this *Runner) catch(msg *Message, e error) {
83-
log.Printf("ERROR: %s\n", e)
85+
this.logger.Printf("ERROR: %s\n", e)
8486
this.mail(msg, e)
8587
this.publishError(msg, e)
8688
}
8789

8890
func (this *Runner) publishError(msg *Message, e error) {
8991
err := this.q.Use(Config.errorQueue)
9092
if err != nil {
91-
log.Printf("ERROR: %s\n", err)
93+
this.logger.Printf("ERROR: %s\n", err)
9294
return
9395
}
9496
payload, err := json.Marshal(NewErrMessage(msg, e))
9597
if err != nil {
96-
log.Printf("ERROR: %s\n", err)
98+
this.logger.Printf("ERROR: %s\n", err)
9799
return
98100
}
99101
_, err = this.q.Put(0, 0, 60*60, payload)
100102
if err != nil {
101-
log.Printf("ERROR: %s\n", err)
103+
this.logger.Printf("ERROR: %s\n", err)
102104
}
103105
}
104106

@@ -115,6 +117,6 @@ func (this *Runner) mail(msg *Message, e error) {
115117
mail := fmt.Sprintf("%s%s", subject, fmt.Sprintf("Ran on [%s]\n%s\n%s\n%s", hostname, subject, e, msg.readOut()))
116118
e = smtp.SendMail(Config.SmtpServer, nil, Config.MailFrom, to, []byte(mail))
117119
if e != nil {
118-
log.Printf("ERROR: %s\n", e)
120+
this.logger.Printf("ERROR: %s\n", e)
119121
}
120122
}

runner_test.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,15 @@ import (
1010
)
1111

1212
func TestRunnerOutput(t *testing.T) {
13-
runner := new(Runner)
13+
devnull, e := os.OpenFile(os.DevNull, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
14+
if e != nil {
15+
t.Fatal(e)
16+
}
17+
l := log.New(devnull, "", log.LstdFlags)
18+
runner, e := NewRunner(false, l)
19+
if e != nil {
20+
t.Fatal(e)
21+
}
1422
msg, e := createTestMessage("echo you suck", "test.out", ".")
1523
if e != nil {
1624
t.Fatal(e)
@@ -30,7 +38,12 @@ func TestRunnerOutput(t *testing.T) {
3038
}
3139

3240
func TestRunnerShouldPutErrorOnBeanstalk(t *testing.T) {
33-
runner, err := NewRunner(false)
41+
devnull, err := os.OpenFile(os.DevNull, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0666)
42+
if err != nil {
43+
t.Fatal(err)
44+
}
45+
l := log.New(devnull, "", log.LstdFlags)
46+
runner, err := NewRunner(false, l)
3447
if err != nil {
3548
t.Fatal(err)
3649
}

0 commit comments

Comments
 (0)