Skip to content

Commit 79a4f94

Browse files
feat: Refactor partial failures (#741)
1 parent cd2681f commit 79a4f94

File tree

7 files changed

+89
-92
lines changed

7 files changed

+89
-92
lines changed

oracle/market_mapper.go

+29-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"os"
7+
"strings"
78
"time"
89

910
"go.uber.org/zap"
@@ -42,8 +43,31 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) {
4243
continue
4344
}
4445

46+
if o.lastUpdated != 0 && o.lastUpdated == result.Value.LastUpdated {
47+
o.logger.Debug("skipping market map update on no lastUpdated change", zap.Uint64("lastUpdated", o.lastUpdated))
48+
continue
49+
}
50+
51+
validSubset, err := result.Value.MarketMap.GetValidSubset()
52+
if err != nil {
53+
o.logger.Error("failed to validate market map", zap.Error(err))
54+
continue
55+
}
56+
57+
// Detect removed markets and surface info about the removals
58+
var removedMarkets []string
59+
for t := range result.Value.MarketMap.Markets {
60+
if _, in := validSubset.Markets[t]; !in {
61+
removedMarkets = append(removedMarkets, t)
62+
}
63+
}
64+
if len(validSubset.Markets) == 0 || len(validSubset.Markets) != len(result.Value.MarketMap.Markets) {
65+
o.logger.Warn("invalid market map update has caused some markets to be removed")
66+
o.logger.Info("markets removed from invalid market map", zap.String("markets", strings.Join(removedMarkets, " ")))
67+
}
68+
4569
// Update the oracle with the latest market map iff the market map has changed.
46-
updated := result.Value.MarketMap
70+
updated := validSubset
4771
if o.marketMap.Equal(updated) {
4872
o.logger.Debug("market map has not changed")
4973
continue
@@ -55,12 +79,15 @@ func (o *OracleImpl) listenForMarketMapUpdates(ctx context.Context) {
5579
continue
5680
}
5781

82+
o.lastUpdated = result.Value.GetLastUpdated()
83+
5884
// Write the market map to the configured path.
5985
if err := o.WriteMarketMap(); err != nil {
6086
o.logger.Error("failed to write market map", zap.Error(err))
6187
}
6288

63-
o.logger.Info("updated oracle with new market map", zap.Any("market_map", updated))
89+
o.logger.Info("updated oracle with new market map")
90+
o.logger.Debug("updated oracle with new market map", zap.Any("market_map", updated))
6491
}
6592
}
6693
}

oracle/market_mapper_test.go

+42
Original file line numberDiff line numberDiff line change
@@ -296,4 +296,46 @@ func TestListenForMarketMapUpdates(t *testing.T) {
296296
// Clean up the file.
297297
require.NoError(t, os.Remove(path))
298298
})
299+
t.Run("can update providers with a new market map and handle partially invalid state", func(t *testing.T) {
300+
chains := []mmclienttypes.Chain{{ChainID: "dYdX"}}
301+
handler, factory := marketMapperFactory(t, chains)
302+
handler.On("CreateURL", mock.Anything).Return("", nil).Maybe()
303+
304+
resolved := make(mmclienttypes.ResolvedMarketMap)
305+
resp := mmtypes.MarketMapResponse{
306+
MarketMap: partialInvalidMarketMap,
307+
}
308+
resolved[chains[0]] = mmclienttypes.NewMarketMapResult(&resp, time.Now())
309+
handler.On("ParseResponse", mock.Anything, mock.Anything).Return(mmclienttypes.NewMarketMapResponse(resolved, nil)).Maybe()
310+
311+
o, err := oracle.New(
312+
oracleCfgWithMockMapper,
313+
noOpPriceAggregator{},
314+
oracle.WithLogger(logger),
315+
oracle.WithMarketMapperFactory(factory),
316+
oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory),
317+
oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory),
318+
)
319+
require.NoError(t, err)
320+
321+
ctx, cancel := context.WithCancel(context.Background())
322+
defer cancel()
323+
324+
go func() {
325+
err := o.Start(ctx)
326+
if !errors.Is(err, context.Canceled) {
327+
t.Errorf("Start() should have returned context.Canceled error")
328+
}
329+
}()
330+
331+
// Wait for the oracle to start.
332+
time.Sleep(5000 * time.Millisecond)
333+
334+
// The oracle should have been updated.
335+
require.Equal(t, validMarketMapSubset, o.GetMarketMap())
336+
337+
// Stop the oracle.
338+
cancel()
339+
o.Stop()
340+
})
299341
}

oracle/oracle.go

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type OracleImpl struct { //nolint:revive
5454
cfg config.OracleConfig
5555
// marketMap is the market map that the oracle is using.
5656
marketMap mmtypes.MarketMap
57+
// lastUpdated is the field in the marketmap module tracking the last block at which an update was posted
58+
lastUpdated uint64
5759
// writeTo is a path to write the market map to.
5860
writeTo string
5961

oracle/update.go

+3-8
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,14 @@ func (o *OracleImpl) UpdateMarketMap(marketMap mmtypes.MarketMap) error {
1919
o.mut.Lock()
2020
defer o.mut.Unlock()
2121

22-
validSubset, err := marketMap.GetValidSubset()
23-
if err != nil {
22+
if err := marketMap.ValidateBasic(); err != nil {
2423
o.logger.Error("failed to validate market map", zap.Error(err))
2524
return err
2625
}
2726

28-
if len(validSubset.Markets) == 0 {
29-
o.logger.Warn("market map update produced no valid markets to fetch")
30-
}
31-
3227
// Iterate over all existing price providers and update their market maps.
3328
for name, state := range o.priceProviders {
34-
providerTickers, err := types.ProviderTickersFromMarketMap(name, validSubset)
29+
providerTickers, err := types.ProviderTickersFromMarketMap(name, marketMap)
3530
if err != nil {
3631
o.logger.Error("failed to create provider market map", zap.String("provider", name), zap.Error(err))
3732
return err
@@ -47,7 +42,7 @@ func (o *OracleImpl) UpdateMarketMap(marketMap mmtypes.MarketMap) error {
4742
o.priceProviders[name] = updatedState
4843
}
4944

50-
o.marketMap = validSubset
45+
o.marketMap = marketMap
5146
if o.aggregator != nil {
5247
o.aggregator.UpdateMarketMap(o.marketMap)
5348
}

oracle/update_test.go

+2-58
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
)
1919

2020
func TestUpdateWithMarketMap(t *testing.T) {
21-
t.Run("bad market map is not rejected", func(t *testing.T) {
21+
t.Run("bad market map is rejected", func(t *testing.T) {
2222
orc, err := oracle.New(
2323
oracleCfg,
2424
noOpPriceAggregator{},
@@ -35,7 +35,7 @@ func TestUpdateWithMarketMap(t *testing.T) {
3535
"bad": {},
3636
},
3737
})
38-
require.NoError(t, err)
38+
require.Error(t, err)
3939

4040
o.Stop()
4141
})
@@ -626,60 +626,4 @@ func TestUpdateProviderState(t *testing.T) {
626626
500*time.Millisecond,
627627
)
628628
})
629-
630-
t.Run("can update the market map with partial failure on NormalizeBy", func(t *testing.T) {
631-
orc, err := oracle.New(
632-
oracleCfg,
633-
noOpPriceAggregator{},
634-
oracle.WithLogger(logger),
635-
oracle.WithPriceAPIQueryHandlerFactory(oraclefactory.APIQueryHandlerFactory),
636-
oracle.WithPriceWebSocketQueryHandlerFactory(oraclefactory.WebSocketQueryHandlerFactory),
637-
)
638-
require.NoError(t, err)
639-
o := orc.(*oracle.OracleImpl)
640-
require.NoError(t, o.Init(context.TODO()))
641-
642-
providers := o.GetProviderState()
643-
require.Len(t, providers, 3)
644-
645-
// Update the oracle's market map.
646-
require.NoError(t, o.UpdateMarketMap(partialInvalidMarketMap))
647-
648-
providers = o.GetProviderState()
649-
650-
cbTickers, err := types.ProviderTickersFromMarketMap(coinbase.Name, validMarketMapSubset)
651-
require.NoError(t, err)
652-
653-
// Check the state after the update.
654-
coinbaseState, ok := providers[coinbase.Name]
655-
require.True(t, ok)
656-
checkProviderState(
657-
t,
658-
cbTickers,
659-
coinbase.Name,
660-
providertypes.API,
661-
false,
662-
coinbaseState,
663-
)
664-
665-
okxTickers, err := types.ProviderTickersFromMarketMap(okx.Name, validMarketMapSubset)
666-
require.NoError(t, err)
667-
668-
okxState, ok := providers[okx.Name]
669-
require.True(t, ok)
670-
checkProviderState(
671-
t,
672-
okxTickers,
673-
okx.Name,
674-
providertypes.WebSockets,
675-
false,
676-
okxState,
677-
)
678-
679-
binanceState, ok := providers[binance.Name]
680-
require.True(t, ok)
681-
checkProviderState(t, nil, binance.Name, providertypes.API, false, binanceState)
682-
683-
o.Stop()
684-
})
685629
}

providers/apis/marketmap/fetcher.go

-19
Original file line numberDiff line numberDiff line change
@@ -122,25 +122,6 @@ func (f *MarketMapFetcher) Fetch(
122122
)
123123
}
124124

125-
// Validate the market map response.
126-
//
127-
// TODO: Add checks on the chain ID.
128-
if err := resp.MarketMap.ValidateBasic(); err != nil {
129-
f.logger.Info(
130-
"invalid market map response from module",
131-
zap.Any("market_map", resp.MarketMap),
132-
zap.Error(err),
133-
)
134-
135-
return types.NewMarketMapResponseWithErr(
136-
chains,
137-
providertypes.NewErrorWithCode(
138-
fmt.Errorf("invalid market map response: %w", err),
139-
providertypes.ErrorInvalidResponse,
140-
),
141-
)
142-
}
143-
144125
resolved := make(types.ResolvedMarketMap)
145126
resolved[chains[0]] = types.NewMarketMapResult(resp, time.Now())
146127

providers/apis/marketmap/fetcher_test.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -120,22 +120,28 @@ func TestFetch(t *testing.T) {
120120
},
121121
},
122122
{
123-
name: "errors when the market map response is invalid",
123+
name: "does not error when the market map response is invalid",
124124
chains: chains[:1],
125125
client: func() mmtypes.QueryClient {
126126
c := mocks.NewQueryClient(t)
127127
c.On("MarketMap", mock.Anything, mock.Anything).Return(
128128
&mmtypes.MarketMapResponse{
129-
MarketMap: badMarketMap,
129+
MarketMap: badMarketMap,
130+
ChainId: chains[0].ChainID,
131+
LastUpdated: 11,
130132
},
131133
nil,
132134
)
133135
return c
134136
},
135137
expected: types.MarketMapResponse{
136-
UnResolved: types.UnResolvedMarketMap{
137-
chains[0]: providertypes.UnresolvedResult{
138-
ErrorWithCode: providertypes.NewErrorWithCode(fmt.Errorf("invalid market map response"), providertypes.ErrorAPIGeneral),
138+
Resolved: types.ResolvedMarketMap{
139+
chains[0]: types.MarketMapResult{
140+
Value: &mmtypes.MarketMapResponse{
141+
MarketMap: badMarketMap,
142+
ChainId: chains[0].ChainID,
143+
LastUpdated: 11,
144+
},
139145
},
140146
},
141147
},

0 commit comments

Comments
 (0)