Skip to content

Commit b18d47d

Browse files
Added WithDisableCircuitBreaker and WithDisableBusyLoopBreaker options (#16)
Added `WithDisableCircuitBreaker` and `WithDisableBusyLoopBreaker` options. These are variants of the now deprecated `DisableCircuitBreaker` and `DisableBusyLoopBreaker` options. They provide a booling parameter which is more convenient for usage with code generation and for shimming with configuration.
1 parent ef8e6bc commit b18d47d

File tree

4 files changed

+33
-12
lines changed

4 files changed

+33
-12
lines changed

changelog.md

+6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
44

55
This project adheres to Semantic Versioning.
66

7+
## 1.3.0 (Sep 25, 2024)
8+
9+
1. Added `WithDisableCircuitBreaker` and `WithDisableBusyLoopBreaker` options. These are variants of the now deprecated `DisableCircuitBreaker`
10+
and `DisableBusyLoopBreaker` options. They provide a booling parameter which is more convenient for usage with
11+
code generation and for shimming with configuration.
12+
713
## 1.2.0 (Sep 23, 2024)
814

915
1. Update to allow subject name specification (not just TopicNameStrategy)

test/worker_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ func TestWork_Run_CircuitBreaksOnProcessError(t *testing.T) {
203203
zkafka.ConsumerTopicConfig{Topic: topicName},
204204
kproc,
205205
zkafka.CircuitBreakAfter(1), // Circuit breaks after 1 error.
206+
zkafka.WithDisableCircuitBreaker(false),
206207
zkafka.CircuitBreakFor(50*time.Millisecond),
207208
zkafka.WithOnDone(func(ctx context.Context, message *zkafka.Message, err error) {
208209
cnt.Add(1)
@@ -443,7 +444,7 @@ func TestWork_Run_DisabledCircuitBreakerContinueReadError(t *testing.T) {
443444
w := kwf.Create(
444445
zkafka.ConsumerTopicConfig{Topic: topicName},
445446
&fakeProcessor{},
446-
zkafka.DisableCircuitBreaker(),
447+
zkafka.WithDisableCircuitBreaker(true),
447448
zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) {
448449
cnt.Add(1)
449450
}}),
@@ -966,7 +967,7 @@ func TestWork_WithDeadLetterTopic_FailedToGetWriterDoesntPauseProcessing(t *test
966967
},
967968
},
968969
&processor,
969-
zkafka.DisableCircuitBreaker(),
970+
zkafka.WithDisableCircuitBreaker(true),
970971
)
971972

972973
ctx, cancel := context.WithCancel(context.Background())
@@ -1564,7 +1565,6 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen
15641565
kcp := zkafka_mocks.NewMockClientProvider(ctrl)
15651566
kcp.EXPECT().Reader(gomock.Any(), gomock.Any()).Return(r, nil).AnyTimes()
15661567

1567-
//l := zkafka.NoopLogger{}
15681568
l := stdLogger{includeDebug: true}
15691569
kwf := zkafka.NewWorkFactory(kcp, zkafka.WithLogger(l))
15701570

@@ -1578,7 +1578,7 @@ func TestWork_CircuitBreaker_WithoutBusyLoopBreaker_DoesNotWaitsForCircuitToOpen
15781578
return errors.New("an error occurred during processing")
15791579
},
15801580
},
1581-
zkafka.DisableBusyLoopBreaker(),
1581+
zkafka.WithDisableBusyLoopBreaker(true),
15821582
zkafka.WithLifecycleHooks(zkafka.LifecycleHooks{PostFanout: func(ctx context.Context) {
15831583
fanOutCount.Add(1)
15841584
}}),
@@ -2132,7 +2132,7 @@ func BenchmarkWork_Run_CircuitBreaker_DisableBusyLoopBreaker(b *testing.B) {
21322132
zkafka.Speedup(10),
21332133
zkafka.CircuitBreakAfter(100),
21342134
zkafka.CircuitBreakFor(30*time.Millisecond),
2135-
zkafka.DisableBusyLoopBreaker(),
2135+
zkafka.WithDisableCircuitBreaker(true),
21362136
)
21372137

21382138
ctx, cancel := context.WithTimeout(ctx, time.Second)

work.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type Work struct {
8383
// Duration for which a circuit is open. Use CircuitBreakFor to control
8484
cbFor *time.Duration
8585

86-
// Disable circuit breaking. Use DisableCircuitBreaker to control
86+
// Disable circuit breaking. Use WithDisableCircuitBreaker to control
8787
disableCb bool
8888

8989
// Busy loop breaker. When circuit breaker circuit is open, instead of consuming cpu in a busy loop

workoption.go

+21-6
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,26 @@ func CircuitBreakFor(duration time.Duration) WorkOption {
2626
return circuitBreakForOption{duration: duration}
2727
}
2828

29-
// DisableCircuitBreaker disables the circuit breaker so that it never breaks
29+
// Deprecated: DisableCircuitBreaker disables the circuit breaker so that it never breaks
3030
func DisableCircuitBreaker() WorkOption {
31-
return disableCbOption{}
31+
return WithDisableCircuitBreaker(true)
3232
}
3333

34-
// DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
34+
// WithDisableCircuitBreaker allows the user to control whether circuit breaker is disabled or not
35+
func WithDisableCircuitBreaker(isDisabled bool) WorkOption {
36+
return disableCbOption{disabled: isDisabled}
37+
}
38+
39+
// Deprecated: DisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
3540
// Without blb we see increased cpu usage when circuit is open
3641
func DisableBusyLoopBreaker() WorkOption {
37-
return disableBlbOption{}
42+
return WithDisableBusyLoopBreaker(true)
43+
}
44+
45+
// WithDisableBusyLoopBreaker disables the busy loop breaker which would block subsequent read calls till the circuit re-closes.
46+
// Without blb we see increased cpu usage when circuit is open
47+
func WithDisableBusyLoopBreaker(isDisabled bool) WorkOption {
48+
return disableBlbOption{disabled: isDisabled}
3849
}
3950

4051
// WithOnDone allows you to specify a callback function executed after processing of a kafka message
@@ -75,7 +86,9 @@ func (c circuitBreakForOption) apply(w *Work) {
7586
}
7687
}
7788

78-
type disableCbOption struct{}
89+
type disableCbOption struct {
90+
disabled bool
91+
}
7992

8093
func (d disableCbOption) apply(w *Work) {
8194
w.disableCb = true
@@ -99,7 +112,9 @@ func (o lifeCycleOption) apply(w *Work) {
99112
w.lifecycle = o.lh
100113
}
101114

102-
type disableBlbOption struct{}
115+
type disableBlbOption struct {
116+
disabled bool
117+
}
103118

104119
func (d disableBlbOption) apply(w *Work) {
105120
w.blb.disabled = true

0 commit comments

Comments
 (0)