Skip to content

Commit

Permalink
BIG WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
hacdias committed Aug 2, 2023
1 parent 2ea39b8 commit ba63f59
Show file tree
Hide file tree
Showing 15 changed files with 210 additions and 194 deletions.
12 changes: 6 additions & 6 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (c *measuringIter[T]) Close() error {
return c.Iter.Close()
}

func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.ProviderResponse], err error) {
func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.Record], err error) {
// TODO test measurements
m := newMeasurement("FindProviders")

Expand Down Expand Up @@ -185,7 +185,7 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res
if resp.StatusCode == http.StatusNotFound {
resp.Body.Close()
m.record(ctx)
return iter.FromSlice[iter.Result[types.ProviderResponse]](nil), nil
return iter.FromSlice[iter.Result[types.Record]](nil), nil
}

if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -213,22 +213,22 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res
}
}()

var it iter.ResultIter[types.ProviderResponse]
var it iter.ResultIter[types.Record]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.ProvidersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
var sliceIt iter.Iter[types.ProviderResponse] = iter.FromSlice(parsedResp.Providers)
var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Providers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewReadProvidersResponseIter(resp.Body)
it = ndjson.NewProvidersResponseIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")
}

return &measuringIter[iter.Result[types.ProviderResponse]]{Iter: it, ctx: ctx, m: m}, nil
return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
}

func (c *client) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
Expand Down
24 changes: 12 additions & 12 deletions routing/http/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ import (

type mockContentRouter struct{ mock.Mock }

func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) {
func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
args := m.Called(ctx, key, limit)
return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1)
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
}

func (m *mockContentRouter) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
Expand Down Expand Up @@ -139,13 +139,13 @@ func addrsToDRAddrs(addrs []multiaddr.Multiaddr) (drmas []types.Multiaddr) {
return
}

func makeBSReadProviderResp() types.ReadBitswapProviderRecord {
func makeBSReadProviderResp() types.PeerRecord {
peerID, addrs, _ := makeProviderAndIdentity()
return types.ReadBitswapProviderRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
ID: &peerID,
Addrs: addrsToDRAddrs(addrs),
return types.PeerRecord{
Schema: types.SchemaPeer,
ID: &peerID,
Protocols: []string{"transport-bitswap"},
Addrs: addrsToDRAddrs(addrs),
}
}

Expand Down Expand Up @@ -190,21 +190,21 @@ func (e *osErrContains) errContains(t *testing.T, err error) {

func TestClient_FindProviders(t *testing.T) {
bsReadProvResp := makeBSReadProviderResp()
bitswapProvs := []iter.Result[types.ProviderResponse]{
bitswapProvs := []iter.Result[types.Record]{
{Val: &bsReadProvResp},
}

cases := []struct {
name string
httpStatusCode int
stopServer bool
routerProvs []iter.Result[types.ProviderResponse]
routerProvs []iter.Result[types.Record]
routerErr error
clientRequiresStreaming bool
serverStreamingDisabled bool

expErrContains osErrContains
expProvs []iter.Result[types.ProviderResponse]
expProvs []iter.Result[types.Record]
expStreamingResponse bool
expJSONResponse bool
}{
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestClient_FindProviders(t *testing.T) {

c.expErrContains.errContains(t, err)

provs := iter.ReadAll[iter.Result[types.ProviderResponse]](provsIter)
provs := iter.ReadAll[iter.Result[types.Record]](provsIter)
assert.Equal(t, c.expProvs, provs)
})
}
Expand Down
8 changes: 4 additions & 4 deletions routing/http/contentrouter/contentrouter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var logger = logging.Logger("service/contentrouting")
const ttl = 24 * time.Hour

type Client interface {
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.ProviderResponse], error)
FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error)
}

type contentRouter struct {
Expand Down Expand Up @@ -66,7 +66,7 @@ func (c *contentRouter) Ready() bool {
}

// readProviderResponses reads bitswap records from the iterator into the given channel, dropping non-bitswap records.
func readProviderResponses(iter iter.ResultIter[types.ProviderResponse], ch chan<- peer.AddrInfo) {
func readProviderResponses(iter iter.ResultIter[types.Record], ch chan<- peer.AddrInfo) {
defer close(ch)
defer iter.Close()
for iter.Next() {
Expand All @@ -76,8 +76,8 @@ func readProviderResponses(iter iter.ResultIter[types.ProviderResponse], ch chan
continue
}
v := res.Val
if v.GetSchema() == types.SchemaBitswap {
result, ok := v.(*types.ReadBitswapProviderRecord)
if v.GetSchema() == types.SchemaPeer {
result, ok := v.(*types.PeerRecord)
if !ok {
logger.Errorw(
"problem casting find providers result",
Expand Down
30 changes: 15 additions & 15 deletions routing/http/contentrouter/contentrouter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (

type mockClient struct{ mock.Mock }

func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.ProviderResponse], error) {
func (m *mockClient) FindProviders(ctx context.Context, key cid.Cid) (iter.ResultIter[types.Record], error) {
args := m.Called(ctx, key)
return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1)
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
}
func (m *mockClient) Ready(ctx context.Context) (bool, error) {
args := m.Called(ctx)
Expand Down Expand Up @@ -46,22 +46,22 @@ func TestFindProvidersAsync(t *testing.T) {

p1 := peer.ID("peer1")
p2 := peer.ID("peer2")
ais := []types.ProviderResponse{
&types.ReadBitswapProviderRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
ID: &p1,
ais := []types.Record{
&types.PeerRecord{
Schema: types.SchemaPeer,
ID: &p1,
Protocols: []string{"transport-bitswap"},
},
&types.ReadBitswapProviderRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
ID: &p2,
},
&types.UnknownProviderRecord{
Protocol: "UNKNOWN",
&types.PeerRecord{
Schema: types.SchemaPeer,
ID: &p2,
Protocols: []string{"transport-bitswap"},
},
// &types.UnknownRecord{
// Protocol: "UNKNOWN",
// },
}
aisIter := iter.ToResultIter[types.ProviderResponse](iter.FromSlice(ais))
aisIter := iter.ToResultIter[types.Record](iter.FromSlice(ais))

client.On("FindProviders", ctx, key).Return(aisIter, nil)

Expand Down
12 changes: 6 additions & 6 deletions routing/http/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ const (
)

type FindProvidersAsyncResponse struct {
ProviderResponse types.ProviderResponse
ProviderResponse types.Record
Error error
}

type ContentRouter interface {
// FindProviders searches for peers who are able to provide a given key. Limit
// indicates the maximum amount of results to return. 0 means unbounded.
FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error)
FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error)

// FindIPNSRecord searches for an [ipns.Record] for the given [ipns.Name].
FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error)
Expand Down Expand Up @@ -119,7 +119,7 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) {
return
}

var handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse])
var handlerFunc func(w http.ResponseWriter, provIter iter.ResultIter[types.Record])

var supportsNDJSON bool
var supportsJSON bool
Expand Down Expand Up @@ -167,11 +167,11 @@ func (s *server) findProviders(w http.ResponseWriter, httpReq *http.Request) {
handlerFunc(w, provIter)
}

func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) {
func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) {
defer provIter.Close()

var (
providers []types.ProviderResponse
providers []types.Record
i int
)

Expand All @@ -188,7 +188,7 @@ func (s *server) findProvidersJSON(w http.ResponseWriter, provIter iter.ResultIt
writeJSONResult(w, "FindProviders", response)
}

func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.ProviderResponse]) {
func (s *server) findProvidersNDJSON(w http.ResponseWriter, provIter iter.ResultIter[types.Record]) {
defer provIter.Close()

w.Header().Set("Content-Type", mediaTypeNDJSON)
Expand Down
40 changes: 20 additions & 20 deletions routing/http/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ func TestHeaders(t *testing.T) {
t.Cleanup(server.Close)
serverAddr := "http://" + server.Listener.Addr().String()

results := iter.FromSlice([]iter.Result[types.ProviderResponse]{
{Val: &types.ReadBitswapProviderRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
results := iter.FromSlice([]iter.Result[types.Record]{
{Val: &types.PeerRecord{
Schema: types.SchemaPeer,
Protocols: []string{"transport-bitswap"},
}}},
)

Expand Down Expand Up @@ -72,18 +72,18 @@ func TestResponse(t *testing.T) {
runTest := func(t *testing.T, contentType string, expectedStream bool, expectedBody string) {
t.Parallel()

results := iter.FromSlice([]iter.Result[types.ProviderResponse]{
{Val: &types.ReadBitswapProviderRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
ID: &pid,
Addrs: []types.Multiaddr{},
results := iter.FromSlice([]iter.Result[types.Record]{
{Val: &types.PeerRecord{
Schema: types.SchemaPeer,
ID: &pid,
Protocols: []string{"transport-bitswap"},
Addrs: []types.Multiaddr{},
}},
{Val: &types.ReadBitswapProviderRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
ID: &pid2,
Addrs: []types.Multiaddr{},
{Val: &types.PeerRecord{
Schema: types.SchemaPeer,
ID: &pid2,
Protocols: []string{"transport-bitswap"},
Addrs: []types.Multiaddr{},
}}},
)

Expand Down Expand Up @@ -111,15 +111,15 @@ func TestResponse(t *testing.T) {
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

require.Equal(t, string(body), expectedBody)
require.Equal(t, expectedBody, string(body))
}

t.Run("JSON Response", func(t *testing.T) {
runTest(t, mediaTypeJSON, false, `{"Providers":[{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Addrs":[]},{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Addrs":[]}]}`)
runTest(t, mediaTypeJSON, false, `{"Providers":[{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"},{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-bitswap"],"Schema":"peer"}]}`)
})

t.Run("NDJSON Response", func(t *testing.T) {
runTest(t, mediaTypeNDJSON, true, `{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Addrs":[]}`+"\n"+`{"Protocol":"transport-bitswap","Schema":"bitswap","ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Addrs":[]}`+"\n")
runTest(t, mediaTypeNDJSON, true, `{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vn","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n"+`{"Addrs":[],"ID":"12D3KooWM8sovaEGU1bmiWGWAzvs47DEcXKZZTuJnpQyVTkRs2Vz","Protocols":["transport-bitswap"],"Schema":"peer"}`+"\n")
})
}

Expand Down Expand Up @@ -257,9 +257,9 @@ func TestIPNS(t *testing.T) {

type mockContentRouter struct{ mock.Mock }

func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.ProviderResponse], error) {
func (m *mockContentRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) {
args := m.Called(ctx, key, limit)
return args.Get(0).(iter.ResultIter[types.ProviderResponse]), args.Error(1)
return args.Get(0).(iter.ResultIter[types.Record]), args.Error(1)
}

func (m *mockContentRouter) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// ProvidersResponse is the result of a GET Providers request.
type ProvidersResponse struct {
Providers []types.ProviderResponse
Providers []types.Record
}

func (r *ProvidersResponse) UnmarshalJSON(b []byte) error {
Expand All @@ -19,15 +19,15 @@ func (r *ProvidersResponse) UnmarshalJSON(b []byte) error {
}

for _, provBytes := range tempFPR.Providers {
var readProv types.UnknownProviderRecord
var readProv types.UnknownRecord
err := json.Unmarshal(provBytes, &readProv)
if err != nil {
return err
}

switch readProv.Schema {
case types.SchemaBitswap:
var prov types.ReadBitswapProviderRecord
case types.SchemaPeer:
var prov types.PeerRecord
err := json.Unmarshal(readProv.Bytes, &prov)
if err != nil {
return err
Expand Down
36 changes: 0 additions & 36 deletions routing/http/types/ndjson/provider.go

This file was deleted.

Loading

0 comments on commit ba63f59

Please sign in to comment.