-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathstage.go
123 lines (100 loc) · 2.17 KB
/
stage.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package stream
import (
"github.com/youthlin/stream/types"
)
// stage records a single operation.
//
// Begin is called when the operation starts; its argument is the number of
// elements, or unknownSize when that number is not known.
// Accept receives each element.
// CanFinish reports whether processing may finish early.
// End performs the wrap-up work.
type stage interface {
	Begin(size int64)
	Accept(elem types.T)
	CanFinish() bool
	End()
}
// region baseStage
// baseStage implements stage by forwarding every method to the matching
// function field. A field must be non-nil before its method is called,
// because calling a nil func value panics.
type baseStage struct {
	begin     func(size int64) // invoked by Begin with the element count
	action    types.Consumer   // invoked by Accept with each element
	canFinish func() bool      // invoked by CanFinish; its result is returned as-is
	end       func()           // invoked by End
}
// Begin passes size to the stage's begin callback.
func (b *baseStage) Begin(size int64) {
	onBegin := b.begin
	onBegin(size)
}
// Accept hands the element t to the stage's action callback.
func (b *baseStage) Accept(t types.T) {
	consume := b.action
	consume(t)
}
// CanFinish returns whatever the stage's canFinish callback reports.
func (b *baseStage) CanFinish() bool {
	finished := b.canFinish()
	return finished
}
// End runs the stage's end callback.
func (b *baseStage) End() {
	onEnd := b.end
	onEnd()
}
// option mutates a baseStage; the stage constructors apply options in the
// order they are given to override the default callbacks.
type option func(*baseStage)
// begin returns an option that replaces the stage's begin callback with
// onBegin.
func begin(onBegin func(int64)) option {
	apply := func(target *baseStage) { target.begin = onBegin }
	return apply
}
// canFinish returns an option that replaces the stage's canFinish callback
// with judge.
func canFinish(judge func() bool) option {
	apply := func(target *baseStage) { target.canFinish = judge }
	return apply
}
// action returns an option that replaces the stage's action callback with
// onAction.
func action(onAction types.Consumer) option {
	apply := func(target *baseStage) { target.action = onAction }
	return apply
}
// end returns an option that replaces the stage's end callback with onEnd.
func end(onEnd func()) option {
	apply := func(target *baseStage) { target.end = onEnd }
	return apply
}
// endregion baseStage
// region chainedStage
// chainedStage chains on to the next operation.
// The next ("down") stage is not stored as a field here; defaultChainedStage
// wires the embedded baseStage's callbacks to that stage's methods.
type chainedStage struct{ *baseStage }
// defaultChainedStage builds a chainedStage whose four callbacks all forward
// to the corresponding method of down, so that by default the stage is a
// pure pass-through to the next operation.
func defaultChainedStage(down stage) *chainedStage {
	base := new(baseStage)
	base.begin = down.Begin
	base.action = down.Accept
	base.canFinish = down.CanFinish
	base.end = down.End
	return &chainedStage{baseStage: base}
}
// newChainedStage creates a pass-through chainedStage for down and then
// applies each option in opt, in order, so later options win over earlier
// ones and over the pass-through defaults.
func newChainedStage(down stage, opt ...option) *chainedStage {
	chained := defaultChainedStage(down)
	for i := range opt {
		opt[i](chained.baseStage)
	}
	return chained
}
// endregion chainedStage
// region terminalStage
// terminalStage is a stage built on an embedded baseStage; unlike
// chainedStage, its constructors take a consumer rather than a downstream
// stage.
type terminalStage struct{ *baseStage }
// defaultTerminal builds a terminalStage that feeds every element to action.
// Its begin and end callbacks do nothing, and canFinish always reports false.
func defaultTerminal(action types.Consumer) *terminalStage {
	noopBegin := func(int64) {}
	neverFinish := func() bool { return false }
	noopEnd := func() {}

	base := &baseStage{
		begin:     noopBegin,
		action:    action,
		canFinish: neverFinish,
		end:       noopEnd,
	}
	return &terminalStage{baseStage: base}
}
// newTerminalStage creates the default terminalStage for action and then
// applies each option in opt, in order, so later options win over earlier
// ones and over the defaults.
func newTerminalStage(action types.Consumer, opt ...option) *terminalStage {
	terminal := defaultTerminal(action)
	for i := range opt {
		opt[i](terminal.baseStage)
	}
	return terminal
}
// endregion terminalStage