Skip to content
This repository was archived by the owner on Jul 12, 2020. It is now read-only.

Commit 003912b

Browse files
committed
WIP: add initial cases
0 parents  commit 003912b

5 files changed

Lines changed: 334 additions & 0 deletions

File tree

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
vendor
2+
coverage.out
3+
.idea/

Gopkg.lock

Lines changed: 38 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Gopkg.toml example
2+
#
3+
# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md
4+
# for detailed Gopkg.toml documentation.
5+
#
6+
# required = ["github.com/user/thing/cmd/thing"]
7+
# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"]
8+
#
9+
# [[constraint]]
10+
# name = "github.com/user/project"
11+
# version = "1.0.0"
12+
#
13+
# [[constraint]]
14+
# name = "github.com/user/project2"
15+
# branch = "dev"
16+
# source = "github.com/myfork/project2"
17+
#
18+
# [[override]]
19+
# name = "github.com/x/y"
20+
# version = "2.4.0"
21+
22+
23+
[[constraint]]
24+
name = "github.com/stretchr/testify"
25+
version = "1.2.0"

job.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package job
2+
3+
import (
4+
"time"
5+
"errors"
6+
"sync"
7+
)
8+
9+
type Config struct {
10+
ResultProcessor ResultProcessor
11+
RuntimeProcessor RuntimeProcessor
12+
RuntimeProcessingFrequency time.Duration
13+
LockProvider LockProvider
14+
Worker Worker
15+
}
16+
17+
type ResultProcessor interface{}
18+
type RuntimeProcessor func (tick time.Time, message string, startTime time.Time) error
19+
type Worker func(Channel chan string, WaitGroup *sync.WaitGroup)
20+
21+
type LockProvider interface {
22+
Acquire() error
23+
Release() error
24+
}
25+
26+
func New() Config {
27+
return Config{
28+
LockProvider: nil,
29+
RuntimeProcessor: nil,
30+
ResultProcessor: nil,
31+
RuntimeProcessingFrequency: 200 * time.Millisecond,
32+
}
33+
}
34+
35+
func (config *Config) WithLockProvider(lockProvider LockProvider) {
36+
config.LockProvider = lockProvider
37+
}
38+
39+
func (config *Config) WithResultProcessor(processor ResultProcessor) {
40+
config.ResultProcessor = processor
41+
}
42+
43+
func (config *Config) WithRuntimeProcessingFrequency(frequency time.Duration) {
44+
config.RuntimeProcessingFrequency = frequency
45+
}
46+
47+
func (config *Config) WithRuntimeProcessor(processor RuntimeProcessor) {
48+
config.RuntimeProcessor = processor
49+
}
50+
51+
func (config *Config) SetWorker(worker Worker) {
52+
config.Worker = worker
53+
}
54+
55+
func (config *Config) Run() {
56+
err := config.ensureLock()
57+
if err != nil {
58+
panic(err)
59+
}
60+
err = config.runWorker()
61+
if err != nil {
62+
panic(err)
63+
}
64+
}
65+
66+
func (config *Config) runWorker() error {
67+
if config.Worker == nil {
68+
return errors.New("worker not set")
69+
}
70+
startTime := time.Now()
71+
ticker := time.NewTicker(config.RuntimeProcessingFrequency)
72+
defer ticker.Stop()
73+
commChan := make(chan string)
74+
defer close(commChan)
75+
wg := sync.WaitGroup{}
76+
wg.Add(1)
77+
go config.Worker(commChan, &wg)
78+
go config.invokeRuntimeProcessor(ticker, commChan, startTime)
79+
80+
wg.Wait()
81+
return nil
82+
}
83+
84+
func (config *Config) invokeRuntimeProcessor(ticker *time.Ticker, commChan chan string, startTime time.Time) {
85+
for t := range ticker.C {
86+
message := getMessage(commChan)
87+
if config.RuntimeProcessor == nil {
88+
return
89+
}
90+
err := config.RuntimeProcessor(t, message, startTime)
91+
if err != nil {
92+
panic(err)
93+
}
94+
}
95+
}
96+
97+
func (config *Config) ensureLock() error {
98+
if config.LockProvider == nil {
99+
return nil
100+
}
101+
return config.LockProvider.Acquire()
102+
}
103+
104+
func getMessage(ch chan string) string {
105+
select {
106+
case msg := <-ch:
107+
return msg
108+
default:
109+
return ""
110+
}
111+
}

job_test.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package job_test
2+
3+
import (
4+
"testing"
5+
"github.com/cyberhck/jobs"
6+
"time"
7+
Assert "github.com/stretchr/testify/assert"
8+
"errors"
9+
"github.com/stretchr/testify/mock"
10+
"sync"
11+
)
12+
13+
type LockProviderMock struct {
14+
mock.Mock
15+
}
16+
17+
func (m *LockProviderMock) Acquire() error {
18+
m.Called()
19+
return nil
20+
}
21+
22+
func (m *LockProviderMock) Release() error {
23+
args := m.Called(nil)
24+
return args.Error(1)
25+
}
26+
27+
func TestNew(t *testing.T) {
28+
assert := Assert.New(t)
29+
30+
testJob := job.New()
31+
32+
assert.Equal(testJob.RuntimeProcessingFrequency, 200*time.Millisecond)
33+
assert.Nil(testJob.ResultProcessor)
34+
assert.Nil(testJob.RuntimeProcessor)
35+
assert.Nil(testJob.LockProvider)
36+
}
37+
38+
type LockProvider struct {
39+
acquire error
40+
release error
41+
}
42+
43+
func (r LockProvider) Acquire() error { return r.acquire }
44+
func (r LockProvider) Release() error { return r.release }
45+
46+
func getMockedLockProvider(acquire error, release error) job.LockProvider {
47+
return LockProvider{
48+
acquire: acquire,
49+
release: release,
50+
}
51+
}
52+
func TestWithLockProvider(t *testing.T) {
53+
testJob := job.New()
54+
testJob.WithLockProvider(getMockedLockProvider(nil, nil))
55+
Assert.NotNil(t, testJob.LockProvider)
56+
}
57+
58+
func TestWithResultProcessor(t *testing.T) {
59+
testJob := job.New()
60+
testJob.WithResultProcessor(func() {})
61+
Assert.NotNil(t, testJob.ResultProcessor)
62+
}
63+
64+
func TestWithRuntimeProcessingFrequency(t *testing.T) {
65+
testJob := job.New()
66+
testJob.WithRuntimeProcessingFrequency(1 * time.Second)
67+
if testJob.RuntimeProcessingFrequency != 1*time.Second {
68+
t.Error("Should be able to change runtime processing frequency")
69+
}
70+
}
71+
72+
func TestWithRuntimeProcessor(t *testing.T) {
73+
testJob := job.New()
74+
testJob.WithRuntimeProcessor(func(tick time.Time, message string, startTime time.Time) error {
75+
return nil
76+
})
77+
if testJob.RuntimeProcessor == nil {
78+
t.Error("Should be able to set run time processor")
79+
}
80+
}
81+
82+
func TestRunPanicsIfLockNotAcquired(t *testing.T) {
83+
assert := Assert.New(t)
84+
testJob := job.New()
85+
testJob.WithLockProvider(getMockedLockProvider(errors.New("couldn't acquire lock"), nil))
86+
testJob.SetWorker(func(channel chan string, doneFunc *sync.WaitGroup) {})
87+
assert.Panics(testJob.Run)
88+
}
89+
90+
func TestRunDoesNotPanicIfLockAcquired(t *testing.T) {
91+
assert := Assert.New(t)
92+
testJob := job.New()
93+
testJob.WithLockProvider(getMockedLockProvider(nil, nil))
94+
testJob.SetWorker(func(channel chan string, doneFunc *sync.WaitGroup) {
95+
doneFunc.Done()
96+
})
97+
assert.NotPanics(testJob.Run)
98+
}
99+
100+
func TestRunPanicsIfNoWorkerIsDefined(t *testing.T) {
101+
testJob := job.New()
102+
mocked := new(LockProviderMock)
103+
mocked.On("Acquire").Return(nil)
104+
testJob.WithLockProvider(mocked)
105+
Assert.Panics(t, testJob.Run)
106+
}
107+
108+
func TestRunCallsWorker(t *testing.T) {
109+
testJob := job.New()
110+
mocked := new(LockProviderMock)
111+
mocked.On("Acquire").Return(nil)
112+
testJob.WithLockProvider(mocked)
113+
testJob.SetWorker(func(channel chan string, doneFunc *sync.WaitGroup) {
114+
doneFunc.Done()
115+
})
116+
testJob.Run()
117+
mocked.AssertExpectations(t)
118+
}
119+
120+
func TestRunWorksWithoutLockProvider(t *testing.T) {
121+
testJob := job.New()
122+
testJob.SetWorker(func(channel chan string, doneFunc *sync.WaitGroup) {
123+
doneFunc.Done()
124+
})
125+
Assert.NotPanics(t, testJob.Run)
126+
}
127+
128+
func TestDoesNotPanicWhenNoRuntimeProcessorPresent(t *testing.T) {
129+
testJob := job.New()
130+
testJob.SetWorker(func(channel chan string, doneFunc *sync.WaitGroup) {
131+
doneFunc.Done()
132+
})
133+
Assert.NotPanics(t, testJob.Run)
134+
}
135+
func TestRunTimeProcessorGetsCalled(t *testing.T) {
136+
testJob := job.New()
137+
runtimeProcessorCalled := false
138+
testJob.WithRuntimeProcessor(func(tick time.Time, message string, startTime time.Time) error {
139+
runtimeProcessorCalled = true
140+
return nil
141+
})
142+
testJob.SetWorker(func(channel chan string, doneFunc *sync.WaitGroup) {
143+
time.Sleep(1 * time.Second)
144+
doneFunc.Done()
145+
})
146+
testJob.Run()
147+
Assert.True(t, runtimeProcessorCalled)
148+
}
149+
func TestLongRunningProcessorWorksWithoutRuntimeProcessor(t *testing.T) {
150+
testJob := job.New()
151+
testJob.SetWorker(func(channel chan string, doneFunc *sync.WaitGroup) {
152+
time.Sleep(10 * time.Millisecond)
153+
channel <- "Done..."
154+
doneFunc.Done()
155+
})
156+
Assert.NotPanics(t, testJob.Run)
157+
}

0 commit comments

Comments
 (0)