Skip to content

Commit

Permalink
issue #22: prevent fluky tests and fix critical bug related to Err() …
Browse files Browse the repository at this point in the history
…guarantees
  • Loading branch information
kamilsk committed Jan 26, 2021
1 parent 573f176 commit da0975e
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 77 deletions.
96 changes: 42 additions & 54 deletions breaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -37,7 +36,7 @@ func New() Interface {
// background.Job().Do(interrupter)
//
func BreakByChannel(signal <-chan struct{}) Interface {
return (&channelBreaker{newBreaker(), signal}).trigger()
return (&channelBreaker{newBreaker(), make(chan struct{}), signal}).trigger()
}

// BreakByContext returns a new breaker based on the Context.
Expand Down Expand Up @@ -129,17 +128,13 @@ func newBreaker() *breaker {
}

type breaker struct {
closer sync.Once
signal chan struct{}
released int32
closer sync.Once
signal chan struct{}
}

// Close closes the Done channel and releases resources associated with it.
func (br *breaker) Close() {
br.closer.Do(func() {
close(br.signal)
atomic.StoreInt32(&br.released, 1)
})
br.closer.Do(func() { close(br.signal) })
}

// Done returns a channel that's closed when a cancellation signal occurred.
Expand All @@ -150,15 +145,19 @@ func (br *breaker) Done() <-chan struct{} {
// Err returns a non-nil error if the Done channel is closed and nil otherwise.
// After Err returns a non-nil error, successive calls to Err return the same error.
func (br *breaker) Err() error {
if atomic.LoadInt32(&br.released) == 1 {
select {
case <-br.signal:
return Interrupted
default:
return nil
}
return nil
}

// IsReleased returns true if resources associated with the breaker were released.
//
// Deprecated: see the extended interface.
func (br *breaker) IsReleased() bool {
return atomic.LoadInt32(&br.released) == 1
return br.Err() != nil
}

func (br *breaker) trigger() Interface {
Expand All @@ -167,27 +166,24 @@ func (br *breaker) trigger() Interface {

type channelBreaker struct {
*breaker
relay <-chan struct{}
internal chan struct{}
external <-chan struct{}
}

// Close closes the Done channel and releases resources associated with it.
func (br *channelBreaker) Close() {
br.closer.Do(func() {
close(br.signal)
})
br.closer.Do(func() { close(br.internal) })
}

// trigger starts listening to the internal signal to close the Done channel.
func (br *channelBreaker) trigger() Interface {
go func() {
select {
case <-br.relay:
case <-br.signal:
case <-br.external:
br.Close()
case <-br.internal:
}
br.Close()

// the goroutine is done
atomic.StoreInt32(&br.released, 1)
close(br.signal)
}()
return br
}
Expand All @@ -203,81 +199,73 @@ func (br *contextBreaker) Close() {
}

// IsReleased returns true if resources associated with the breaker were released.
//
// Deprecated: see the extended interface.
func (br *contextBreaker) IsReleased() bool {
select {
case <-br.Done():
return true
default:
return false
}
return br.Err() != nil
}

func (br *contextBreaker) trigger() Interface {
return br
}

func newSignalBreaker(signals []os.Signal) *signalBreaker {
return &signalBreaker{newBreaker(), make(chan os.Signal, len(signals)), signals}
return &signalBreaker{newBreaker(), make(chan struct{}), make(chan os.Signal, len(signals)), signals}
}

type signalBreaker struct {
*breaker
relay chan os.Signal
signals []os.Signal
internal chan struct{}
external chan os.Signal
signals []os.Signal
}

// Close closes the Done channel and releases resources associated with it.
func (br *signalBreaker) Close() {
br.closer.Do(func() {
signal.Stop(br.relay)
close(br.signal)
})
br.closer.Do(func() { close(br.internal) })
}

// trigger starts listening to the required signals to close the Done channel.
func (br *signalBreaker) trigger() Interface {
go func() {
signal.Notify(br.relay, br.signals...)
signal.Notify(br.external, br.signals...)
select {
case <-br.relay:
case <-br.signal:
case <-br.external:
br.Close()
case <-br.internal:
}
br.Close()

// the goroutine is done
atomic.StoreInt32(&br.released, 1)
signal.Stop(br.external)
close(br.external)
close(br.signal)
}()
return br
}

func newTimeoutBreaker(timeout time.Duration) *timeoutBreaker {
return &timeoutBreaker{newBreaker(), time.NewTimer(timeout)}
return &timeoutBreaker{newBreaker(), make(chan struct{}), time.NewTimer(timeout)}
}

type timeoutBreaker struct {
*breaker
*time.Timer
internal chan struct{}
external *time.Timer
}

// Close closes the Done channel and releases resources associated with it.
func (br *timeoutBreaker) Close() {
br.closer.Do(func() {
stop(br.Timer)
close(br.signal)
})
br.closer.Do(func() { close(br.internal) })
}

// trigger starts listening to the internal timer to close the Done channel.
func (br *timeoutBreaker) trigger() Interface {
go func() {
select {
case <-br.Timer.C:
case <-br.signal:
case <-br.external.C:
br.Close()
case <-br.internal:
}
br.Close()

// the goroutine is done
atomic.StoreInt32(&br.released, 1)
stop(br.external)
close(br.signal)
}()
return br
}
Expand Down
6 changes: 2 additions & 4 deletions breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,7 @@ func TestToContext(t *testing.T) {
}

br.Close()
if time.Sleep(delta); ctx.Err() == nil {
t.Error("invalid behavior")
}
<-ctx.Done()
}

// helpers
Expand All @@ -319,7 +317,7 @@ const (
func checkBreakerIsReleased(tb testing.TB, br Interface) {
tb.Helper()

time.Sleep(delta)
<-br.Done()
checkBreakerIsReleasedFast(tb, br)
}

Expand Down
4 changes: 4 additions & 0 deletions interface_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import "testing"

type extended interface {
Interface
// IsReleased returns true if resources associated with the breaker were released.
//
// Deprecated: the original implementation contained a bug,
// and there is no longer any need for this method. It will be removed at v2.
IsReleased() bool
}

Expand Down
37 changes: 18 additions & 19 deletions multiplexer.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package breaker

import (
"reflect"
"sync/atomic"
)
import "reflect"

// Multiplex combines multiple breakers into one.
//
Expand All @@ -27,45 +24,47 @@ func Multiplex(breakers ...Interface) Interface {
}

func newMultiplexedBreaker(breakers []Interface) *multiplexedBreaker {
return &multiplexedBreaker{newBreaker(), breakers}
return &multiplexedBreaker{newBreaker(), make(chan struct{}), breakers}
}

type multiplexedBreaker struct {
*breaker
breakers []Interface
internal chan struct{}
external []Interface
}

// Close closes the Done channel and releases resources associated with it.
func (br *multiplexedBreaker) Close() {
br.closer.Do(func() {
each(br.breakers).Close()
close(br.signal)
})
br.closer.Do(func() { close(br.internal) })
}

// trigger starts listening to the all Done channels of multiplexed breakers.
func (br *multiplexedBreaker) trigger() Interface {
go func() {
if len(br.breakers) == 3 {
if len(br.external) == 3 {
select {
case <-br.breakers[0].Done():
case <-br.breakers[1].Done():
case <-br.breakers[2].Done():
case <-br.external[0].Done():
case <-br.external[1].Done():
case <-br.external[2].Done():
case <-br.internal:
}
} else {
brs := make([]reflect.SelectCase, 0, len(br.breakers))
for _, br := range br.breakers {
brs := make([]reflect.SelectCase, 0, len(br.external)+1)
brs = append(brs, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(br.internal),
})
for _, br := range br.external {
brs = append(brs, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(br.Done()),
})
}
reflect.Select(brs)
}
each(br.external).Close()
br.Close()

// the goroutine is done
atomic.StoreInt32(&br.released, 1)
close(br.signal)
}()
return br
}
Expand Down

0 comments on commit da0975e

Please sign in to comment.