Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(routing/http)!: delegated peer routing server and client, IPIP 417 #422

Merged
merged 17 commits into from
Aug 25, 2023
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,26 @@ The following emojis are used to highlight certain changes:

### Added

* ✨ The `routing/http` not supports Delegated Peer Routing as per [IPIP-417](https://github.com/ipfs/specs/pull/417).
lidel marked this conversation as resolved.
Show resolved Hide resolved

### Changed

* 🛠 The `routing/http` package has suffered the following modifications:
lidel marked this conversation as resolved.
Show resolved Hide resolved
* Client `GetIPNSRecord` and `PutIPNSRecord` have been renamed to `GetIPNS` and
`PutIPNS`, respectively. Similarly, the required function names in the server
`ContentRouter` have also been updated.
* `ReadBitswapProviderRecord` has been renamed to `BitswapRecord` and marked as deprecated.
From now on, please use the protocol-agnostic `PeerRecord` for most use cases. The new
Peer Schema has been introduced in [IPIP-417](https://github.com/ipfs/specs/pull/417).

### Removed

* 🛠 The `routing/http` package has suffered the following removals:
* Server and client no longer support the generic `Provide` method for content routing.
`ProvideBitswap` is still usable, but marked as deprecated. A protocol-agnostic
provide mechanism is being worked on in [IPIP-378](https://github.com/ipfs/specs/pull/378).
lidel marked this conversation as resolved.
Show resolved Hide resolved
* Server no longer exports `FindProvidersPath` and `ProvidePath`.

### Fixed

### Security
Expand All @@ -32,7 +48,7 @@ The following emojis are used to highlight certain changes:
as per [IPIP-379](https://specs.ipfs.tech/ipips/ipip-0379/).
* 🛠 The `verifycid` package has been updated with the new Allowlist interface as part of
reducing globals efforts.
* The `blockservice` and `provider` packages has been updated to accommodate for
* The `blockservice` and `provider` packages has been updated to accommodate for
changes in `verifycid`.

### Changed
Expand Down
23 changes: 4 additions & 19 deletions routing/http/README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
go-delegated-routing
Routing V1 Server and Client
=======================

> Delegated routing Client and Server over Reframe RPC

This package provides delegated routing implementation in Go:
- Client (for IPFS nodes like [Kubo](https://github.com/ipfs/kubo/blob/master/docs/config.md#routingrouters-parameters)),
- Server (for public indexers such as https://cid.contact)
> Delegated Routing V1 Server and Client over HTTP API.

## Documentation

- Go docs: https://pkg.go.dev/github.com/ipfs/boxo/routing/http/

## Lead Maintainer

🦗🎶

## Contributing

Contributions are welcome! This repository is part of the IPFS project and therefore governed by our [contributing guidelines](https://github.com/ipfs/community/blob/master/CONTRIBUTING.md).

## License

[SPDX-License-Identifier: Apache-2.0 OR MIT](LICENSE.md)
- Go Documentation: https://pkg.go.dev/github.com/ipfs/boxo/routing/http
- Routing V1 Specification: https://specs.ipfs.tech/routing/http-routing-v1/
130 changes: 104 additions & 26 deletions routing/http/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,20 @@ import (
ipns "github.com/ipfs/boxo/ipns"
"github.com/ipfs/boxo/routing/http/contentrouter"
"github.com/ipfs/boxo/routing/http/internal/drjson"
"github.com/ipfs/boxo/routing/http/server"
"github.com/ipfs/boxo/routing/http/types"
"github.com/ipfs/boxo/routing/http/types/iter"
jsontypes "github.com/ipfs/boxo/routing/http/types/json"
"github.com/ipfs/boxo/routing/http/types/ndjson"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)

var (
_ contentrouter.Client = &client{}
logger = logging.Logger("service/delegatedrouting")
logger = logging.Logger("routing/http/client")
defaultHTTPClient = &http.Client{
Transport: &ResponseBodyLimitedTransport{
RoundTripper: http.DefaultTransport,
Expand All @@ -50,18 +48,17 @@ const (
type client struct {
baseURL string
httpClient httpClient
validator record.Validator
clock clock.Clock

accepts string
accepts string

peerID peer.ID
addrs []types.Multiaddr
identity crypto.PrivKey

// called immeidately after signing a provide req
// used for testing, e.g. testing the server with a mangled signature
afterSignCallback func(req *types.WriteBitswapProviderRecord)
// Called immediately after signing a provide request. It is used
// for testing, e.g., testing the server with a mangled signature.
//lint:ignore SA1019 // ignore staticcheck
afterSignCallback func(req *types.WriteBitswapRecord)
}

// defaultUserAgent is used as a fallback to inform HTTP server which library
Expand Down Expand Up @@ -121,12 +118,11 @@ func WithStreamResultsRequired() Option {
}

// New creates a content routing API client.
// The Provider and identity parameters are option. If they are nil, the `Provide` method will not function.
// The Provider and identity parameters are option. If they are nil, the [client.ProvideBitswap] method will not function.
func New(baseURL string, opts ...Option) (*client, error) {
client := &client{
baseURL: baseURL,
httpClient: defaultHTTPClient,
validator: ipns.Validator{},
clock: clock.New(),
accepts: strings.Join([]string{mediaTypeNDJSON, mediaTypeJSON}, ","),
}
Expand Down Expand Up @@ -164,11 +160,11 @@ func (c *measuringIter[T]) Close() error {
return c.Iter.Close()
}

func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.ResultIter[types.ProviderResponse], err error) {
func (c *client) FindProviders(ctx context.Context, key cid.Cid) (providers iter.ResultIter[types.Record], err error) {
// TODO test measurements
m := newMeasurement("FindProviders")

url := c.baseURL + server.ProvidePath + key.String()
url := c.baseURL + "/routing/v1/providers/" + key.String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
Expand All @@ -192,7 +188,7 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res
if resp.StatusCode == http.StatusNotFound {
resp.Body.Close()
m.record(ctx)
return iter.FromSlice[iter.Result[types.ProviderResponse]](nil), nil
return iter.FromSlice[iter.Result[types.Record]](nil), nil
}

if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -220,24 +216,27 @@ func (c *client) FindProviders(ctx context.Context, key cid.Cid) (provs iter.Res
}
}()

var it iter.ResultIter[types.ProviderResponse]
var it iter.ResultIter[types.Record]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.ReadProvidersResponse{}
parsedResp := &jsontypes.ProvidersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
var sliceIt iter.Iter[types.ProviderResponse] = iter.FromSlice(parsedResp.Providers)
var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Providers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewReadProvidersResponseIter(resp.Body)
it = ndjson.NewRecordsIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")
}

return &measuringIter[iter.Result[types.ProviderResponse]]{Iter: it, ctx: ctx, m: m}, nil
return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
}

// Deprecated: protocol-agnostic provide is being worked on in [IPIP-378]:
//
// [IPIP-378]: https://github.com/ipfs/specs/pull/378
func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Duration) (time.Duration, error) {
if c.identity == nil {
return 0, errors.New("cannot provide Bitswap records without an identity")
Expand All @@ -253,7 +252,7 @@ func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Du

now := c.clock.Now()

req := types.WriteBitswapProviderRecord{
req := types.WriteBitswapRecord{
Protocol: "transport-bitswap",
Schema: types.SchemaBitswap,
Payload: types.BitswapPayload{
Expand Down Expand Up @@ -282,10 +281,13 @@ func (c *client) ProvideBitswap(ctx context.Context, keys []cid.Cid, ttl time.Du
}

// ProvideAsync makes a provide request to a delegated router
func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapProviderRecord) (time.Duration, error) {
req := jsontypes.WriteProvidersRequest{Providers: []types.WriteProviderRecord{bswp}}
//
//lint:ignore SA1019 // ignore staticcheck
func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.WriteBitswapRecord) (time.Duration, error) {
//lint:ignore SA1019 // ignore staticcheck
req := jsontypes.WriteProvidersRequest{Providers: []types.Record{bswp}}

url := c.baseURL + server.ProvidePath
url := c.baseURL + "/routing/v1/providers/"

b, err := drjson.MarshalJSONBytes(req)
if err != nil {
Expand All @@ -306,6 +308,8 @@ func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri
if resp.StatusCode != http.StatusOK {
return 0, httpError(resp.StatusCode, resp.Body)
}

//lint:ignore SA1019 // ignore staticcheck
var provideResult jsontypes.WriteProvidersResponse
err = json.NewDecoder(resp.Body).Decode(&provideResult)
if err != nil {
Expand All @@ -315,7 +319,8 @@ func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri
return 0, fmt.Errorf("expected 1 result but got %d", len(provideResult.ProvideResults))
}

v, ok := provideResult.ProvideResults[0].(*types.WriteBitswapProviderRecordResponse)
//lint:ignore SA1019 // ignore staticcheck
v, ok := provideResult.ProvideResults[0].(*types.WriteBitswapRecordResponse)
if !ok {
return 0, fmt.Errorf("expected AdvisoryTTL field")
}
Expand All @@ -327,7 +332,80 @@ func (c *client) provideSignedBitswapRecord(ctx context.Context, bswp *types.Wri
return 0, nil
}

func (c *client) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
func (c *client) FindPeers(ctx context.Context, pid peer.ID) (peers iter.ResultIter[types.Record], err error) {
m := newMeasurement("FindPeers")

url := c.baseURL + "/routing/v1/peers/" + peer.ToCid(pid).String()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Accept", c.accepts)

m.host = req.Host

start := c.clock.Now()
resp, err := c.httpClient.Do(req)

m.err = err
m.latency = c.clock.Since(start)

if err != nil {
m.record(ctx)
return nil, err
}

m.statusCode = resp.StatusCode
if resp.StatusCode == http.StatusNotFound {
resp.Body.Close()
m.record(ctx)
return iter.FromSlice[iter.Result[types.Record]](nil), nil
}

if resp.StatusCode != http.StatusOK {
err := httpError(resp.StatusCode, resp.Body)
resp.Body.Close()
m.record(ctx)
return nil, err
}

respContentType := resp.Header.Get("Content-Type")
mediaType, _, err := mime.ParseMediaType(respContentType)
if err != nil {
resp.Body.Close()
m.err = err
m.record(ctx)
return nil, fmt.Errorf("parsing Content-Type: %w", err)
}

m.mediaType = mediaType

var skipBodyClose bool
defer func() {
if !skipBodyClose {
resp.Body.Close()
}
}()

var it iter.ResultIter[types.Record]
switch mediaType {
case mediaTypeJSON:
parsedResp := &jsontypes.PeersResponse{}
err = json.NewDecoder(resp.Body).Decode(parsedResp)
var sliceIt iter.Iter[types.Record] = iter.FromSlice(parsedResp.Peers)
it = iter.ToResultIter(sliceIt)
case mediaTypeNDJSON:
skipBodyClose = true
it = ndjson.NewRecordsIter(resp.Body)
default:
logger.Errorw("unknown media type", "MediaType", mediaType, "ContentType", respContentType)
return nil, errors.New("unknown content type")
}

return &measuringIter[iter.Result[types.Record]]{Iter: it, ctx: ctx, m: m}, nil
}

func (c *client) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) {
url := c.baseURL + "/routing/v1/ipns/" + name.String()

httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
Expand Down Expand Up @@ -365,7 +443,7 @@ func (c *client) FindIPNSRecord(ctx context.Context, name ipns.Name) (*ipns.Reco
return record, nil
}

func (c *client) ProvideIPNSRecord(ctx context.Context, name ipns.Name, record *ipns.Record) error {
func (c *client) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error {
url := c.baseURL + "/routing/v1/ipns/" + name.String()

rawRecord, err := ipns.MarshalRecord(record)
Expand Down
Loading