Skip to content

Commit

Permalink
Merge pull request #24 from c4dt/fix_failing_test
Browse files Browse the repository at this point in the history
Fixing race condition and adding fuzzy test
  • Loading branch information
ineiti authored Oct 7, 2024
2 parents 5a00dc6 + 1097ad3 commit fe6f6b8
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 27 deletions.
19 changes: 12 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,27 @@ vet: tidy
tests:
while make test; do echo "Testing again at $$(date)"; done; echo "Failed testing"

FLAKY_TESTS := (TestService_Scenario_Basic|TestService_Scenario_ViewChange|TestService_Scenario_FinalizeFailure)
FLAKY_TESTS_PBFT := (TestService_Scenario_Basic|TestService_Scenario_ViewChange|TestService_Scenario_FinalizeFailure)
FLAKY_TESTS_MINOWS := (Test_session_Recv_SessionEnded)

# test runs all tests in DELA without coverage
# It first runs all the tests in "short" mode, so the flaky tests don't run.
# Then the flaky tests get run separately for at most 3 times, and hopefully it all works out.
test: tidy
go test ./... -short -count=1 || exit 1
@for count in $$( seq 3 ); do \
echo "Running $$count/3"; \
if go test -count=1 ./core/ordering/cosipbft -run="${FLAKY_TESTS}"; then \
break; \
fi; \
if [[ $$count == 3 ]]; then \
@for count in $$( seq 4 ); do \
if [ "$$count" -eq 4 ]; then \
echo "Couldn't run all flaky tests in 3 tries"; \
exit 1; \
fi; \
echo "Running $$count/3"; \
if ! go test -count=1 ./mino/minows -run="${FLAKY_TESTS_MINOWS}"; then \
continue; \
fi; \
if ! go test -count=1 ./core/ordering/cosipbft -run="${FLAKY_TESTS_PBFT}"; then \
continue; \
fi; \
break; \
done

# test runs all tests in DELA and generate a coverage output (to be used by sonarcloud)
Expand Down
20 changes: 18 additions & 2 deletions mino/minows/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go.dedis.ch/dela/mino"
"go.dedis.ch/dela/serde"
"go.dedis.ch/dela/testing/fake"
"sync"
"testing"
)

Expand Down Expand Up @@ -244,8 +245,14 @@ func Test_rpc_Stream_ContextCancelled(t *testing.T) {
// echos back the same message
// - implements mino.Handler
type echoHandler struct {
from []mino.Address
messages []serde.Message
from []mino.Address
messages []serde.Message
mutex sync.Mutex
msgCounter chan struct{}
}

func newEchoHandler() *echoHandler {
return &echoHandler{msgCounter: make(chan struct{}, 100)}
}

func (h *echoHandler) Process(req mino.Request) (resp serde.Message,
Expand All @@ -261,15 +268,24 @@ func (h *echoHandler) Stream(out mino.Sender, in mino.Receiver) error {
if err != nil {
return err
}
h.mutex.Lock()
h.from = append(h.from, from)
h.messages = append(h.messages, msg)
err = <-out.Send(msg, from)
h.msgCounter <- struct{}{}
h.mutex.Unlock()
if err != nil {
return err
}
}
}

func (h *echoHandler) wait(count int) {
for i := 0; i < count; i++ {
<-h.msgCounter
}
}

func mustCreateRPC(t *testing.T, m mino.Mino, name string,
h mino.Handler) mino.RPC {
r, err := m.CreateRPC(name, h, fake.MessageFactory{})
Expand Down
3 changes: 1 addition & 2 deletions mino/minows/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package minows
import (
"context"
"encoding/gob"
"fmt"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -113,7 +112,7 @@ func (m messageHandler) passMessages(ctx context.Context, stream network.Stream,
}
select {
case <-ctx.Done():
fmt.Println("messageHandler done:", m.isOrchestrator())
m.logger.Trace().Msgf("messageHandler done: %v", m.isOrchestrator())
return
case m.in <- pkt:
}
Expand Down
30 changes: 14 additions & 16 deletions mino/minows/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

func Test_session_Send(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand Down Expand Up @@ -45,15 +45,15 @@ func Test_session_Send(t *testing.T) {
require.NoError(t, err)
require.False(t, open)

wait()
handler.wait(3)
require.Equal(t, []mino.Address{s.(*messageHandler).myAddr,
s.(*messageHandler).myAddr, s.(*messageHandler).myAddr}, handler.from)
require.Equal(t, []serde.Message{fake.Message{},
fake.Message{}, fake.Message{}}, handler.messages)
}

func Test_session_Send_ToSelf(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand All @@ -77,7 +77,7 @@ func Test_session_Send_ToSelf(t *testing.T) {
require.NoError(t, err)
require.False(t, open)

wait()
handler.wait(3)
require.Equal(t, []mino.Address{s.(*messageHandler).myAddr,
s.(*messageHandler).myAddr, s.(*messageHandler).myAddr}, handler.from)
require.Equal(t, []serde.Message{fake.Message{},
Expand All @@ -86,7 +86,7 @@ func Test_session_Send_ToSelf(t *testing.T) {
}

func Test_session_Send_WrongAddressType(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand All @@ -105,7 +105,7 @@ func Test_session_Send_WrongAddressType(t *testing.T) {
}

func Test_session_Send_AddressNotPlayer(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand All @@ -126,7 +126,7 @@ func Test_session_Send_AddressNotPlayer(t *testing.T) {
}

func Test_session_Send_SessionEnded(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand All @@ -139,7 +139,6 @@ func Test_session_Send_SessionEnded(t *testing.T) {

s, _, stop := mustStream(t, rpc, initiator, player)
stop()
wait()

errs := s.Send(fake.Message{}, initiator.GetAddress(), player.GetAddress())
for i := 0; i < 2; i++ {
Expand All @@ -154,7 +153,7 @@ func Test_session_Send_SessionEnded(t *testing.T) {
}

func Test_session_Recv(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand Down Expand Up @@ -203,7 +202,7 @@ func Test_session_Recv(t *testing.T) {
}

func Test_session_Recv_FromSelf(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand Down Expand Up @@ -250,7 +249,10 @@ func Test_session_Recv_FromSelf(t *testing.T) {
}

func Test_session_Recv_SessionEnded(t *testing.T) {
handler := &echoHandler{}
if testing.Short() {
t.Skip("See issue https://github.com/dedis/dela/issues/291")
}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand All @@ -276,7 +278,7 @@ func Test_session_Recv_SessionEnded(t *testing.T) {
}

func Test_session_Recv_ContextCancelled(t *testing.T) {
handler := &echoHandler{}
handler := newEchoHandler()
const addrInitiator = "/ip4/127.0.0.1/tcp/6001/ws"
initiator, stop := mustCreateMinows(t, addrInitiator, addrInitiator)
defer stop()
Expand All @@ -301,10 +303,6 @@ func setTimeout() (context.Context, context.CancelFunc) {
return ctx, cancel
}

func wait() {
time.Sleep(2 * time.Second)
}

func mustStream(t *testing.T, rpc mino.RPC,
minos ...mino.Mino) (mino.Sender, mino.Receiver, func()) {
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit fe6f6b8

Please sign in to comment.