From 7531b39dec0df717c7ff0b48bae9fe25f8c1ce14 Mon Sep 17 00:00:00 2001 From: lordforever Date: Sun, 15 Dec 2024 16:15:59 +0530 Subject: [PATCH 1/3] signature implemented --- contract-tools/xchain/go.mod | 10 +- contract-tools/xchain/go.sum | 10 + contract-tools/xchain/xchain.go | 294 ++++++++++++++++++++++----- contract-tools/xchain/xchain_test.go | 22 +- src/Prover-Axelar.sol | 103 +++++++++- 5 files changed, 379 insertions(+), 60 deletions(-) diff --git a/contract-tools/xchain/go.mod b/contract-tools/xchain/go.mod index 83dd59e..e445f23 100644 --- a/contract-tools/xchain/go.mod +++ b/contract-tools/xchain/go.mod @@ -16,7 +16,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.12.4 github.com/urfave/cli/v2 v2.27.2 - golang.org/x/sync v0.7.0 + golang.org/x/sync v0.10.0 ) @@ -55,7 +55,7 @@ require ( github.com/filecoin-project/go-amt-ipld/v4 v4.3.0 // indirect github.com/filecoin-project/go-bitfield v0.2.4 // indirect github.com/filecoin-project/go-cbor-util v0.0.1 // indirect - github.com/filecoin-project/go-crypto v0.0.2-0.20240424000926-1808e310bbac // indirect + github.com/filecoin-project/go-crypto v0.1.0 // indirect github.com/filecoin-project/go-data-transfer v1.15.4-boost // indirect github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc8 // indirect github.com/filecoin-project/go-fil-commcid v0.1.0 // indirect @@ -216,12 +216,12 @@ require ( go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.24.0 // indirect + golang.org/x/crypto v0.31.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.21.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/protobuf v1.34.1 // indirect diff --git a/contract-tools/xchain/go.sum b/contract-tools/xchain/go.sum index 27ea9b3..42ed3a9 100644 --- a/contract-tools/xchain/go.sum +++ b/contract-tools/xchain/go.sum @@ -168,6 +168,8 @@ github.com/filecoin-project/go-commp-utils v0.1.4/go.mod h1:Sekocu5q9b4ECAUFu853 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.2-0.20240424000926-1808e310bbac h1:SwaxonCdMJz7n1PvSXj7FUK5xdC6E4dJuQBVbOkv3wE= github.com/filecoin-project/go-crypto v0.0.2-0.20240424000926-1808e310bbac/go.mod h1:K9UFXvvoyAVvB+0Le7oGlKiT9mgA5FHOJdYQXEE8IhI= +github.com/filecoin-project/go-crypto v0.1.0 h1:Pob2MphoipMbe/ksxZOMcQvmBHAd3sI/WEqcbpIsGI0= +github.com/filecoin-project/go-crypto v0.1.0/go.mod h1:K9UFXvvoyAVvB+0Le7oGlKiT9mgA5FHOJdYQXEE8IhI= github.com/filecoin-project/go-data-segment v0.0.1 h1:1wmDxOG4ubWQm3ZC1XI5nCon5qgSq7Ra3Rb6Dbu10Gs= github.com/filecoin-project/go-data-segment v0.0.1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4= github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0= @@ -1068,6 +1070,8 @@ golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDf golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= @@ -1136,6 +1140,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1179,6 +1185,8 @@ golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1202,6 +1210,8 @@ golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= diff --git a/contract-tools/xchain/xchain.go b/contract-tools/xchain/xchain.go index 182b390..73a2e26 100644 --- a/contract-tools/xchain/xchain.go +++ b/contract-tools/xchain/xchain.go @@ -9,6 +9,8 @@ import ( "io" "log" "math/big" + "math/bits" + "net/http" "os" "os/signal" @@ -26,31 +28,32 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + ethcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" boosttypes "github.com/filecoin-project/boost/storagemarket/types" boosttypes2 "github.com/filecoin-project/boost/transport/types" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" + gocrypto "github.com/filecoin-project/go-crypto" "github.com/filecoin-project/go-data-segment/datasegment" "github.com/filecoin-project/go-data-segment/merkletree" "github.com/filecoin-project/go-jsonrpc" - inet "github.com/libp2p/go-libp2p/core/network" - filabi "github.com/filecoin-project/go-state-types/abi" fbig "github.com/filecoin-project/go-state-types/big" builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/builtin/v9/market" - "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api/v0api" lotustypes "github.com/filecoin-project/lotus/chain/types" "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" + inet "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" + "golang.org/x/crypto/blake2b" ) func main() { @@ -238,7 +241,9 @@ type Config struct { TransferPort int ProviderAddr string LotusAPI string - TargetAggSize int + LighthouseApiKey string + AuthToken string + TargetAggSize int } // Mirror OnRamp.sol's `Offer` struct @@ -279,6 +284,8 @@ type aggregator struct { transferID int // ID of the next transfer transferAddr string // address to listen for transfer requests targetDealSize uint64 // how big aggregates should be + minDealSize uint64 // minimum deal size + lighthouseApiKey string // API key for lighthouse host host.Host // libp2p host for deal protocol to boost spDealAddr *peer.AddrInfo // address to reach boost (or other) deal v 1.2 provider spActorAddr address.Address // address of the storage provider actor @@ -351,7 +358,7 @@ func NewAggregator(ctx context.Context, cfg *Config) (*aggregator, error) { return nil, err } - lAPI, closer, err := NewLotusDaemonAPIClientV0(ctx, cfg.LotusAPI, 1, "") + lAPI, closer, err := NewLotusDaemonAPIClientV0(ctx, cfg.LotusAPI, 1, cfg.AuthToken) if err != nil { return nil, err } @@ -401,6 +408,7 @@ func NewAggregator(ctx context.Context, cfg *Config) (*aggregator, error) { spDealAddr: psPeerInfo, spActorAddr: providerAddr, lotusAPI: lAPI, + lighthouseApiKey: cfg.LighthouseApiKey, cleanup: func() { closer() fmt.Printf("done with lotus api closer\n") @@ -424,7 +432,10 @@ func (a *aggregator) run(ctx context.Context) error { } err := a.SubscribeQuery(ctx, query) - for err == nil || strings.Contains(err.Error(), "read tcp") { + // if err != nil { + // return err + // } + for err == nil || strings.Contains(err.Error(), "read udp") { if err != nil { log.Printf("ignoring mystery error: %s", err) } @@ -483,7 +494,7 @@ const ( // libp2p identifier for latest deal protocol DealProtocolv120 = "/fil/storage/mk/1.2.0" // Delay to start deal at. For 2k devnet 4 second block time this is 13.3 minutes TODO Config - dealDelayEpochs = 200 + dealDelayEpochs = 10000 // Storage deal duration, TODO figure out what to do about this, either comes from offer or config dealDuration = 518400 // 6 months (on mainnet) ) @@ -491,12 +502,13 @@ const ( func (a *aggregator) runAggregate(ctx context.Context) error { // pieces being aggregated, flushed upon commitment // Invariant: the pieces in the pending queue can always make a valid aggregate w.r.t a.targetDealSize - pending := make([]DataReadyEvent, 0, 256) + // pending := make([]DataReadyEvent, 0, 256) + var pending []DataReadyEvent total := uint64(0) - prefixPiece := filabi.PieceInfo{ - Size: filabi.PaddedPieceSize(prefixCARSizePadded), - PieceCID: cid.MustParse(prefixCARCid), - } + // prefixPiece := filabi.PieceInfo{ + // Size: filabi.PaddedPieceSize(prefixCARSizePadded), + // PieceCID: cid.MustParse(prefixCARCid), + // } for { select { @@ -504,15 +516,17 @@ func (a *aggregator) runAggregate(ctx context.Context) error { fmt.Printf("ctx done shutting down aggregation") return nil case latestEvent := <-a.ch: - // Check if the offer is too big to fit in a valid aggregate on its own + if len(pending) >= 0 { + // Check if the offer is too big to fit in a valid aggregate on its own // TODO: as referenced below there must be a better way when we introspect on the gory details of NewAggregate latestPiece, err := latestEvent.Offer.Piece() + pending = append(pending, latestEvent) if err != nil { log.Printf("skipping offer %d, size %d not valid padded piece size ", latestEvent.OfferID, latestEvent.Offer.Size) continue } _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), []filabi.PieceInfo{ - prefixPiece, + // prefixPiece, latestPiece, }) if err != nil { @@ -528,7 +542,7 @@ func (a *aggregator) runAggregate(ctx context.Context) error { // all the gory edge cases in NewAggregate // Turn offers into datasegment pieces - pieces := make([]filabi.PieceInfo, len(pending)+1) + pieces := make([]filabi.PieceInfo, len(pending)) for i, event := range pending { piece, err := event.Offer.Piece() if err != nil { @@ -537,18 +551,31 @@ func (a *aggregator) runAggregate(ctx context.Context) error { pieces[i] = piece } - pieces[len(pending)] = latestPiece + // pieces[len(pending)] = latestPiece // aggregate - aggregatePieces := append([]filabi.PieceInfo{ - prefixPiece, - }, pieces...) - _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), aggregatePieces) - if err != nil { // we've overshot, lets commit to just pieces in pending + // aggregatePieces := append([]filabi.PieceInfo{ + // prefixPiece, + // }, pieces...) + aggregatePieces := pieces + _, size, err := datasegment.ComputeDealPlacement(aggregatePieces) + if err != nil { + panic(err) + } + overallSize := filabi.PaddedPieceSize(size) + next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256))) + if next < int(a.minDealSize) { + next = int(a.minDealSize) + } + dealSize := filabi.PaddedPieceSize(next) + a.targetDealSize = uint64(dealSize) + // _, err = datasegment.NewAggregate(dealSize, aggregatePieces) + // if err != nil { // we've overshot, lets commit to just pieces in pending total = 0 + // Remove the latest offer which took us over - pieces = pieces[:len(pieces)-1] - aggregatePieces = aggregatePieces[:len(aggregatePieces)-1] - agg, err := datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), aggregatePieces) + // pieces = pieces[:len(pieces)-1] + // aggregatePieces = aggregatePieces[:len(aggregatePieces)-1] + agg, err := datasegment.NewAggregate(dealSize, aggregatePieces) if err != nil { return fmt.Errorf("failed to create aggregate from pending, should not be reachable: %w", err) } @@ -593,17 +620,44 @@ func (a *aggregator) runAggregate(ctx context.Context) error { a.transferID++ a.transferLk.Unlock() log.Printf("Transfer ID %d scheduled for aggregate %s", transferID, aggCommp.String()) - - err = a.sendDeal(ctx, aggCommp, transferID) + aggLocation := `~/` + aggCommp.String() + err = a.saveAggregateToFile(transferID, aggLocation) + if err != nil { + log.Fatalf("failed to save aggregate to file: %s", err) + } + // send file to lighthouse + lhResp, err := UploadToLighthouse(aggLocation, a.lighthouseApiKey) + if err != nil { + log.Fatalf("failed to upload to lighthouse: %s", err) + } + retrievalURL := fmt.Sprintf("https://gateway.lighthouse.storage/ipfs/%s", lhResp.Hash) + err = a.sendDeal(ctx, aggCommp, retrievalURL); if err != nil { log.Printf("[ERROR] failed to send deal: %s", err) } - + // Delete the file at aggLocation + err = os.Remove(aggLocation) + if err != nil { + log.Printf("[ERROR] failed to delete file at %s: %s", aggLocation, err) + } // Reset queue to empty, add the event that triggered aggregation pending = pending[:0] - pending = append(pending, latestEvent) + // pending = append(pending, latestEvent) } else { + latestPiece, err := latestEvent.Offer.Piece() + if err != nil { + log.Printf("skipping offer %d, size %d not valid padded piece size ", latestEvent.OfferID, latestEvent.Offer.Size) + continue + } + _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), []filabi.PieceInfo{ + // prefixPiece, + latestPiece, + }) + if err != nil { + log.Printf("skipping offer %d, size %d exceeds max PODSI packable size", latestEvent.OfferID, latestEvent.Offer.Size) + continue + } total += latestEvent.Offer.Size pending = append(pending, latestEvent) log.Printf("Offer %d added. %d offers pending aggregation with total size=%d\n", latestEvent.OfferID, len(pending), total) @@ -615,7 +669,7 @@ func (a *aggregator) runAggregate(ctx context.Context) error { // Send deal data to the configured SP deal making address (boost node) // The deal is made with the configured prover client contract // Heavily inspired by boost client -func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID int) error { +func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, url string) error { if err := a.host.Connect(ctx, *a.spDealAddr); err != nil { return fmt.Errorf("failed to connect to peer %s: %w", a.spDealAddr.ID, err) } @@ -631,7 +685,8 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID dealUuid := uuid.New() log.Printf("making deal for commp %s, UUID=%s\n", aggCommp.String(), dealUuid) transferParams := boosttypes2.HttpRequest{ - URL: fmt.Sprintf("http://%s/?id=%d", a.transferAddr, transferID), + + URL: url, } paramsBytes, err := json.Marshal(transferParams) if err != nil { @@ -639,7 +694,7 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID } transfer := boosttypes.Transfer{ Type: "http", - ClientID: fmt.Sprintf("%d", transferID), + // ClientID: fmt.Sprintf("%d", transferID), Params: paramsBytes, Size: a.targetDealSize - a.targetDealSize/128, // aggregate for transfer is not fr32 encoded } @@ -666,37 +721,88 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID return fmt.Errorf("failed to get chain ID: %w", err) } // Encode the chainID as uint256 - encodedChainID, err := encodeChainID(chainID) + // intEncodedChainID, err := encodeChainID(chainID) + // if err != nil { + // return fmt.Errorf("failed to encode chainID: %w", err) + // } + + clientAddr, err := a.lotusAPI.WalletDefaultAddress(ctx) + if err != nil { + return err + } + dummyBuf, err := cborutil.Dump("dummy") + if err != nil { + return err + } + dummySig, err := a.lotusAPI.WalletSign(ctx, clientAddr, dummyBuf) if err != nil { - return fmt.Errorf("failed to encode chainID: %w", err) + return fmt.Errorf("wallet sign failed: %w", err) } - dealLabel, err := market.NewLabelFromBytes(encodedChainID) + b2sum := blake2b.Sum256(dummyBuf) + pubk, err := gocrypto.EcRecover(b2sum[:], dummySig.Data) + if err != nil { + return err + } + pubkHash := ethcrypto.Keccak256(pubk[1:]) // Skip the first byte (0x04) which indicates uncompressed public key + ethAddress := common.BytesToAddress(pubkHash[12:]) // Take the last 20 bytes + encodedLabel, err := encodeChainIDAndAddress(chainID, ethAddress) + if err != nil { + return fmt.Errorf("failed to encode chainID and address: %w", err) + } + fmt.Println(ethAddress.Hex()) + fmt.Println(hex.EncodeToString(encodedLabel)) + dealLabel, err := market.NewLabelFromBytes(encodedLabel) if err != nil { return fmt.Errorf("failed to create deal label: %w", err) } - proposal := market.ClientDealProposal{ - Proposal: market.DealProposal{ - PieceCID: aggCommp, - PieceSize: filabi.PaddedPieceSize(a.targetDealSize), - VerifiedDeal: false, - Client: filClient, - Provider: a.spActorAddr, - StartEpoch: dealStart, - EndEpoch: dealEnd, - StoragePricePerEpoch: fbig.NewInt(0), - ProviderCollateral: providerCollateral, - Label: dealLabel, - }, + // encodedChainID, err := encodeChainIDAsString(chainID) + // if err != nil { + // return fmt.Errorf("failed to encode chainID: %w", err) + // } + // fmt.Println(encodedChainID) + // dealLabel, err := market.NewLabelFromString(encodedChainID) + // if err != nil { + // return fmt.Errorf("failed to create deal label: %w", err) + // } + proposal := market.DealProposal{ + PieceCID: aggCommp, + PieceSize: filabi.PaddedPieceSize(a.targetDealSize), + VerifiedDeal: false, + Client: filClient, + Provider: a.spActorAddr, + StartEpoch: dealStart, + EndEpoch: dealEnd, + StoragePricePerEpoch: fbig.NewInt(0), + ProviderCollateral: providerCollateral, + Label: dealLabel, + } + + + buf, err := cborutil.Dump(&proposal) + if err != nil { + return err + } +// fmt.Println(hex.EncodeToString(buf)) + log.Printf("about to sign with clientAddr: %s", clientAddr) + sig, err := a.lotusAPI.WalletSign(ctx, clientAddr, buf) + if err != nil { + return fmt.Errorf("wallet sign failed: %w", err) + } +// fmt.Println(hex.EncodeToString(sig.Data)) + + clientProposal := market.ClientDealProposal{ + Proposal: proposal, // Signature is unchecked since client is smart contract - ClientSignature: crypto.Signature{ - Type: crypto.SigTypeBLS, - Data: []byte{0xc0, 0xff, 0xee}, - }, + ClientSignature: *sig, + // crypto.Signature{ + // Type: crypto.SigTypeBLS, + // Data: []byte{0xc0, 0xff, 0xee}, + // }, } dealParams := boosttypes.DealParams{ DealUUID: dealUuid, - ClientDealProposal: proposal, + ClientDealProposal: clientProposal, DealDataRoot: aggCommp, IsOffline: false, Transfer: transfer, @@ -828,6 +934,49 @@ func (a *aggregator) transferHandler(w http.ResponseWriter, r *http.Request) { } } +func (a *aggregator) saveAggregateToFile(id int, location string) error { + a.transferLk.RLock() + transfer, ok := a.transfers[id] + a.transferLk.RUnlock() + if !ok { + return fmt.Errorf("no data found for ID %d", id) + } + + // First write the CAR prefix to the file + // prefixCARBytes, err := hex.DecodeString(prefixCAR) + // if err != nil { + // return fmt.Errorf("failed to decode CAR prefix: %w", err) + // } + + readers := []io.Reader{ + // bytes.NewReader(prefixCARBytes) + } + // Fetch each sub piece from its buffer location and add to readers + for _, url := range transfer.locations { + lazyReader := &lazyHTTPReader{url: url} + readers = append(readers, lazyReader) + defer lazyReader.Close() + } + aggReader, err := transfer.agg.AggregateObjectReader(readers) + if err != nil { + return fmt.Errorf("failed to create aggregate reader: %w", err) + } + + // Create the file at the specified location + file, err := os.Create(location) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + + // Copy the aggregated data to the file + _, err = io.Copy(file, aggReader) + if err != nil { + return fmt.Errorf("failed to write aggregate stream to file: %w", err) + } + + return nil +} type AggregateTransfer struct { locations []string agg *datasegment.Aggregate @@ -838,6 +987,7 @@ func (a *aggregator) SubscribeQuery(ctx context.Context, query ethereum.FilterQu log.Printf("Listening for data ready events on %s\n", a.onrampAddr.Hex()) sub, err := a.client.SubscribeFilterLogs(ctx, query, logs) if err != nil { + fmt.Println(err) return err } defer sub.Unsubscribe() @@ -932,7 +1082,7 @@ func MakeOffer(cidStr string, sizeStr string, location string, token string, amo Amount: amountBig, Size: uint64(size), } - + fmt.Println(hex.EncodeToString(commP.Bytes())) return &offer, nil } @@ -1038,4 +1188,42 @@ func encodeChainID(chainID *big.Int) ([]byte, error) { return data, nil } +// encodeChainIDAndAddress encodes the chain ID and Ethereum address into bytes +func encodeChainIDAndAddress(chainID *big.Int, ethAddress common.Address) ([]byte, error) { + uint256Type, err := abi.NewType("uint256", "", nil) + if err != nil { + return nil, fmt.Errorf("failed to create uint256 type: %w", err) + } + + addressType, err := abi.NewType("address", "", nil) + if err != nil { + return nil, fmt.Errorf("failed to create address type: %w", err) + } + + + // Define the ABI encoding + arguments := abi.Arguments{ + {Type: uint256Type}, + {Type: addressType}, + } + + // Encode the chain ID and Ethereum address + encodedBytes, err := arguments.Pack(chainID, ethAddress) + if err != nil { + return nil, fmt.Errorf("error encoding arguments: %w", err) + } + + return encodedBytes, nil +} +// encodeChainIDAsString converts a *big.Int chain ID to its string representation +func encodeChainIDAsString(chainID *big.Int) (string, error) { + if chainID == nil { + return "", fmt.Errorf("chainID cannot be nil") + } + + // Convert the *big.Int to a string + chainIDStr := chainID.String() + + return chainIDStr, nil +} diff --git a/contract-tools/xchain/xchain_test.go b/contract-tools/xchain/xchain_test.go index 4ba280c..499e0c7 100644 --- a/contract-tools/xchain/xchain_test.go +++ b/contract-tools/xchain/xchain_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/ethclient" + "github.com/filecoin-project/go-address" "github.com/stretchr/testify/assert" ) @@ -79,4 +80,23 @@ func decodeChainID(data []byte) (*big.Int, error) { } return chainID, nil -} \ No newline at end of file +} + +func TestAddrToBytes(t *testing.T) { + // Example address (replace with actual address) + addr, err := address.NewFromString("t0116147") + if err != nil { + fmt.Println("Error creating address:", err) + return + } + + // Get the byte representation of the address + addrBytes := addr.Bytes() + + // Encode the byte representation to a hexadecimal string + addrHex := hex.EncodeToString(addrBytes) + + fmt.Println("Address in bytes: ", addrBytes) + fmt.Println("Address in hex: ", addrHex) + assert.Equal(t, addrHex, "00b38b07", "Address does not match expected value") + } \ No newline at end of file diff --git a/src/Prover-Axelar.sol b/src/Prover-Axelar.sol index 3d4d746..4b03b36 100644 --- a/src/Prover-Axelar.sol +++ b/src/Prover-Axelar.sol @@ -51,6 +51,15 @@ contract DealClient is AxelarExecutable { mapping(bytes => Status) public pieceStatus; mapping(bytes => uint256) public providerGasFunds; // Funds set aside for calling oracle by provider mapping(uint256 => DestinationChain) public chainIdToDestinationChain; + event DealNotify( + uint64 dealId, + bytes commP, + bytes data, + bytes chainId, + bytes provider, + bytes payload + ); + event ReceivedDataCap(string received); constructor( address _gateway, @@ -87,6 +96,38 @@ contract DealClient is AxelarExecutable { providerGasFunds[providerAddrData] += msg.value; } + function receiveDataCap(bytes memory) internal { + require( + msg.sender == DATACAP_ACTOR_ETH_ADDRESS, + "msg.sender needs to be datacap actor f07" + ); + emit ReceivedDataCap("DataCap Received!"); + // Add get datacap balance api and store datacap amount + } + + // authenticateMessage is the callback from the market actor into the contract + // as part of PublishStorageDeals. This message holds the deal proposal from the + // miner, which needs to be validated by the contract in accordance with the + // deal requests made and the contract's own policies + // @params - cbor byte array of AccountTypes.AuthenticateMessageParams + function authenticateMessage(bytes memory params) internal view { + require( + msg.sender == MARKET_ACTOR_ETH_ADDRESS, + "msg.sender needs to be market actor f05" + ); + + AccountTypes.AuthenticateMessageParams memory amp = params + .deserializeAuthenticateMessageParams(); + MarketTypes.DealProposal memory proposal = MarketCBOR + .deserializeDealProposal(amp.message); + (, address filAddress) = abi.decode( + proposal.label.data, + (uint256, address) + ); + address recovered = recovers(bytes32(amp.message), amp.signature); + require(recovered == filAddress, "Invalid signature"); + } + // dealNotify is the callback from the market actor into the contract at the end // of PublishStorageDeals. This message holds the previously approved deal proposal // and the associated dealID. The dealID is stored as part of the contract state @@ -109,7 +150,13 @@ contract DealClient is AxelarExecutable { int64 duration = CommonTypes.ChainEpoch.unwrap(proposal.end_epoch) - CommonTypes.ChainEpoch.unwrap(proposal.start_epoch); // Expects deal label to be chainId encoded in bytes - uint256 chainId = abi.decode(proposal.label.data, (uint256)); + // string memory chainIdStr = abi.decode(proposal.label.data, (string)); + (uint256 chainId, ) = abi.decode( + proposal.label.data, + (uint256, address) + ); + + // uint256 chainId = asciiBytesToUint(proposal.label.data); DataAttestation memory attest = DataAttestation( proposal.piece_cid.data, duration, @@ -117,6 +164,15 @@ contract DealClient is AxelarExecutable { uint256(Status.DealPublished) ); bytes memory payload = abi.encode(attest); + + emit DealNotify( + mdnp.dealId, + proposal.piece_cid.data, + params, + proposal.label.data, + proposal.provider.data, + payload + ); if (chainId == block.chainid) { IBridgeContract( chainIdToDestinationChain[chainId].destinationAddress @@ -225,4 +281,49 @@ contract DealClient is AxelarExecutable { ) internal pure returns (string memory) { return Strings.toHexString(uint256(uint160(_addr)), 20); } + + function asciiBytesToUint( + bytes memory asciiBytes + ) public pure returns (uint256) { + uint256 result = 0; + for (uint256 i = 0; i < asciiBytes.length; i++) { + uint256 digit = uint256(uint8(asciiBytes[i])) - 48; // Convert ASCII to digit + require(digit <= 9, "Invalid ASCII byte"); + result = result * 10 + digit; + } + return result; + } + + function recovers( + bytes32 hash, + bytes memory signature + ) public pure returns (address) { + bytes32 r; + bytes32 s; + uint8 v; + + // Check the signature length + if (signature.length != 65) { + return (address(0)); + } + + // Divide the signature in r, s and v variables + assembly { + r := mload(add(signature, 32)) + s := mload(add(signature, 64)) + v := byte(0, mload(add(signature, 96))) + } + + // Version of signature should be 27 or 28, but 0 and 1 are also possible versions + if (v < 27) { + v += 27; + } + // address check = ECDSA.recover(hash, signature); + // If the version is correct return the signer address + if (v != 27 && v != 28) { + return (address(0)); + } else { + return ecrecover(hash, v, r, s); + } + } } From c5bd09e2ce42c1de57010f4a63809bbc4e652d99 Mon Sep 17 00:00:00 2001 From: lordforever Date: Tue, 17 Dec 2024 23:30:01 +0530 Subject: [PATCH 2/3] sig with blake2b --- contract-tools/xchain/xchain.go | 19 +-- foundry.toml | 1 + src/Prover-Axelar.sol | 42 ++++-- src/blake2lib.sol | 246 ++++++++++++++++++++++++++++++++ 4 files changed, 285 insertions(+), 23 deletions(-) create mode 100644 src/blake2lib.sol diff --git a/contract-tools/xchain/xchain.go b/contract-tools/xchain/xchain.go index 73a2e26..c257436 100644 --- a/contract-tools/xchain/xchain.go +++ b/contract-tools/xchain/xchain.go @@ -749,21 +749,13 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, url string) if err != nil { return fmt.Errorf("failed to encode chainID and address: %w", err) } - fmt.Println(ethAddress.Hex()) - fmt.Println(hex.EncodeToString(encodedLabel)) - dealLabel, err := market.NewLabelFromBytes(encodedLabel) + labelString := hex.EncodeToString(encodedLabel) + + dealLabel, err := market.NewLabelFromString(labelString) if err != nil { return fmt.Errorf("failed to create deal label: %w", err) } - // encodedChainID, err := encodeChainIDAsString(chainID) - // if err != nil { - // return fmt.Errorf("failed to encode chainID: %w", err) - // } - // fmt.Println(encodedChainID) - // dealLabel, err := market.NewLabelFromString(encodedChainID) - // if err != nil { - // return fmt.Errorf("failed to create deal label: %w", err) - // } + proposal := market.DealProposal{ PieceCID: aggCommp, PieceSize: filabi.PaddedPieceSize(a.targetDealSize), @@ -782,13 +774,12 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, url string) if err != nil { return err } -// fmt.Println(hex.EncodeToString(buf)) + log.Printf("about to sign with clientAddr: %s", clientAddr) sig, err := a.lotusAPI.WalletSign(ctx, clientAddr, buf) if err != nil { return fmt.Errorf("wallet sign failed: %w", err) } -// fmt.Println(hex.EncodeToString(sig.Data)) clientProposal := market.ClientDealProposal{ Proposal: proposal, diff --git a/foundry.toml b/foundry.toml index e6810b2..77b36c4 100644 --- a/foundry.toml +++ b/foundry.toml @@ -2,5 +2,6 @@ src = 'src' out = 'out' libs = ['lib'] +evm_version = 'cancun' # See more config options https://github.com/foundry-rs/foundry/tree/master/config \ No newline at end of file diff --git a/src/Prover-Axelar.sol b/src/Prover-Axelar.sol index 4b03b36..b3b5c2c 100644 --- a/src/Prover-Axelar.sol +++ b/src/Prover-Axelar.sol @@ -18,6 +18,7 @@ import {Strings} from "lib/openzeppelin-contracts/contracts/utils/Strings.sol"; import {AxelarExecutable} from "lib/axelar-gmp-sdk-solidity/contracts/executable/AxelarExecutable.sol"; import {IAxelarGateway} from "lib/axelar-gmp-sdk-solidity/contracts/interfaces/IAxelarGateway.sol"; import {IAxelarGasService} from "lib/axelar-gmp-sdk-solidity/contracts/interfaces/IAxelarGasService.sol"; +import {BLAKE2b} from "./blake2lib.sol"; using CBOR for CBOR.CBORBuffer; @@ -120,11 +121,12 @@ contract DealClient is AxelarExecutable { .deserializeAuthenticateMessageParams(); MarketTypes.DealProposal memory proposal = MarketCBOR .deserializeDealProposal(amp.message); - (, address filAddress) = abi.decode( - proposal.label.data, - (uint256, address) + bytes memory encodedData = convertAsciiHexToBytes(proposal.label.data); + (, address filAddress) = abi.decode(encodedData, (uint256, address)); + address recovered = recovers( + bytes32(BLAKE2b.hash(amp.message, "", "", "", 32)), + amp.signature ); - address recovered = recovers(bytes32(amp.message), amp.signature); require(recovered == filAddress, "Invalid signature"); } @@ -151,10 +153,8 @@ contract DealClient is AxelarExecutable { CommonTypes.ChainEpoch.unwrap(proposal.start_epoch); // Expects deal label to be chainId encoded in bytes // string memory chainIdStr = abi.decode(proposal.label.data, (string)); - (uint256 chainId, ) = abi.decode( - proposal.label.data, - (uint256, address) - ); + bytes memory encodedData = convertAsciiHexToBytes(proposal.label.data); + (uint256 chainId, ) = abi.decode(encodedData, (uint256, address)); // uint256 chainId = asciiBytesToUint(proposal.label.data); DataAttestation memory attest = DataAttestation( @@ -262,14 +262,16 @@ contract DealClient is AxelarExecutable { uint64 codec; // dispatch methods if (method == AUTHENTICATE_MESSAGE_METHOD_NUM) { + authenticateMessage(params); // If we haven't reverted, we should return a CBOR true to indicate that verification passed. - // Always authenticate message CBOR.CBORBuffer memory buf = CBOR.create(1); buf.writeBool(true); ret = buf.data(); codec = Misc.CBOR_CODEC; } else if (method == MARKET_NOTIFY_DEAL_METHOD_NUM) { dealNotify(params); + } else if (method == DATACAP_RECEIVER_HOOK_METHOD_NUM) { + receiveDataCap(params); } else { revert("the filecoin method that was called is not handled"); } @@ -294,6 +296,28 @@ contract DealClient is AxelarExecutable { return result; } + function convertAsciiHexToBytes( + bytes memory asciiHex + ) public pure returns (bytes memory) { + require(asciiHex.length % 2 == 0, "Invalid ASCII hex string length"); + + bytes memory result = new bytes(asciiHex.length / 2); + for (uint256 i = 0; i < asciiHex.length / 2; i++) { + result[i] = byteFromHexChar(asciiHex[2 * i], asciiHex[2 * i + 1]); + } + + return result; + } + + function byteFromHexChar( + bytes1 char1, + bytes1 char2 + ) internal pure returns (bytes1) { + uint8 nibble1 = uint8(char1) - (uint8(char1) < 58 ? 48 : 87); + uint8 nibble2 = uint8(char2) - (uint8(char2) < 58 ? 48 : 87); + return bytes1(nibble1 * 16 + nibble2); + } + function recovers( bytes32 hash, bytes memory signature diff --git a/src/blake2lib.sol b/src/blake2lib.sol new file mode 100644 index 0000000..90bda8d --- /dev/null +++ b/src/blake2lib.sol @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: Copyright 2024 David Leung + +pragma solidity ^0.8.17; + +error OutputLengthCannotBeZero(); +error OutputLengthExceeded(); +error KeyLengthExceeded(); +error InputLengthExceeded(); + +library BLAKE2b { + // Initial state vectors + // + // IV 0-3 as numerical values + // 0x6A09E667F3BCC908 0xbb67ae8584caa73b 0x3c6ef372fe94f82b 0xa54ff53a5f1d36f1 + // IV 0-3 in little-endian encoding + // 08c9bcf367e6096a 3ba7ca8485ae67bb 2bf894fe72f36e3c f1361d5f3af54fa5 + // IV 0-3 XOR with parameter block set to sequential mode: + // 0000010100000000 0000000000000000 0000000000000000 0000000000000000 + // XOR Result: + // 08c9bdf267e6096a 3ba7ca8485ae67bb 2bf894fe72f36e3c f1361d5f3af54fa5 + // + // IV 4-7 as numerical values + // 0x510e527fade682d1 0x9b05688c2b3e6c1f 0x1f83d9abfb41bd6b 0x5be0cd19137e2179 + // IV 4-7 as little-endian encoded bytes + // d182e6ad7f520e51 1f6c3e2b8c68059b 6bbd41fbabd9831f 79217e1319cde05b + bytes32 private constant IS0 = + bytes32( + hex"08c9bdf267e6096a3ba7ca8485ae67bb2bf894fe72f36e3cf1361d5f3af54fa5" + ); + bytes32 private constant IS1 = + bytes32( + hex"d182e6ad7f520e511f6c3e2b8c68059b6bbd41fbabd9831f79217e1319cde05b" + ); + + uint256 private constant BLOCK_SIZE_BYTES = 128; + + function hash( + bytes memory input, + bytes memory key, + bytes memory salt, + bytes memory personalization, + uint256 digestLen + ) internal view returns (bytes memory digest) { + if (digestLen == 0) { + revert OutputLengthCannotBeZero(); + } + + if (digestLen > 64) { + revert OutputLengthExceeded(); + } + + if (key.length > 64) { + revert KeyLengthExceeded(); + } + + //////////////////////////////////////////// + // INITIALIZATION + //////////////////////////////////////////// + + // See https://eips.ethereum.org/EIPS/eip-152#specification + // We refer to the collective inputs to the F function as the context. + bytes memory context = new bytes(213); + + bytes32[2] memory h = [IS0 ^ bytes32(digestLen << 248), IS1]; + + if (key.length > 0) { + h[0] ^= bytes32(key.length << 240); + } + + if (salt.length > 0) { + h[1] ^= bytes32(salt); + } + + if (personalization.length > 0) { + h[1] ^= bytes32(personalization) >> 128; + } + + assembly { + // Set the round count (12 for BLAKE2b) in the context + mstore8(add(context, 35), 12) + // Copy the initial hash state to the context + mcopy(add(context, 36), h, 64) + } + + uint256 bytesProcessed = 0; + uint256 bufferUsed = 0; + + // If key is present, copy it to the context, and compress it as a full block + if (key.length > 0) { + assembly { + // key length := mload(key) + // pointer to key := add(key, 32) + // pointer to state := add(context, 100) + mcopy(add(context, 100), add(key, 32), mload(key)) + } + + bufferUsed = BLOCK_SIZE_BYTES; + } + + //////////////////////////////////////////// + // INPUT PROCESSING + //////////////////////////////////////////// + + uint256 readInputOffset = 0; + + // Read full block chunks + while (readInputOffset + BLOCK_SIZE_BYTES <= input.length) { + if (bufferUsed == BLOCK_SIZE_BYTES) { + unchecked { + bytesProcessed += BLOCK_SIZE_BYTES; + } + + bytes8[1] memory tt = [ + bytes8(reverseByteOrder(uint64(bytesProcessed))) + ]; + + assembly { + mcopy(add(context, 228), tt, 8) + if iszero( + staticcall( + not(0), + 0x09, + add(context, 32), + 0xd5, + add(context, 36), + 0x40 + ) + ) { + revert(0, 0) + } + } + + bufferUsed = 0; + } + + assembly { + mcopy( + add(add(context, 100), bufferUsed), + add(input, add(32, readInputOffset)), + BLOCK_SIZE_BYTES + ) + } + + unchecked { + bufferUsed = BLOCK_SIZE_BYTES; + readInputOffset += BLOCK_SIZE_BYTES; + } + } + + // Handle partial block + if (readInputOffset < input.length) { + if (bufferUsed == BLOCK_SIZE_BYTES) { + unchecked { + bytesProcessed += BLOCK_SIZE_BYTES; + } + + bytes8[1] memory tt = [ + bytes8(reverseByteOrder(uint64(bytesProcessed))) + ]; + + assembly { + mcopy(add(context, 228), tt, 8) + if iszero( + staticcall( + not(0), + 0x09, + add(context, 32), + 0xd5, + add(context, 36), + 0x40 + ) + ) { + revert(0, 0) + } + } + + bufferUsed = 0; + + // Reset the message buffer, as we are going to process a partial block + assembly { + mstore(add(context, 100), 0) + mstore(add(context, 132), 0) + mstore(add(context, 164), 0) + mstore(add(context, 196), 0) + } + } + + assembly { + // left = input.length - inputOffset. Safe casting, because left is always less than 128 + let left := sub(mload(input), readInputOffset) + mcopy( + add(add(context, 100), bufferUsed), + add(input, add(32, readInputOffset)), + left + ) + bufferUsed := add(bufferUsed, left) + } + } + + //////////////////////////////////////////// + // FINAL + //////////////////////////////////////////// + + unchecked { + bytesProcessed += bufferUsed; + } + + bytes8[1] memory tt = [ + bytes8(reverseByteOrder(uint64(bytesProcessed))) + ]; + + assembly { + // Set final block flag + mstore8(add(context, 244), 1) + mcopy(add(context, 228), tt, 8) + if iszero( + staticcall( + not(0), + 0x09, + add(context, 32), + 0xd5, + add(context, 36), + 0x40 + ) + ) { + revert(0, 0) + } + + // digest = new bytes(digestLen) + digest := mload(0x40) + mstore(0x40, add(digest, add(digestLen, 0x20))) + mstore(digest, digestLen) + + // copy final hash state to digest + mcopy(add(digest, 32), add(context, 36), digestLen) + } + } + + function reverseByteOrder(uint64 input) internal pure returns (uint64 v) { + v = input; + v = ((v & 0xFF00FF00FF00FF00) >> 8) | ((v & 0x00FF00FF00FF00FF) << 8); + v = ((v & 0xFFFF0000FFFF0000) >> 16) | ((v & 0x0000FFFF0000FFFF) << 16); + v = (v >> 32) | (v << 32); + } +} From fc11a90247d40490f9b32462cafa91fa934f190e Mon Sep 17 00:00:00 2001 From: lordforever Date: Fri, 20 Dec 2024 04:59:59 +0530 Subject: [PATCH 3/3] buffer and nits --- contract-tools/xchain/xchain.go | 176 +++++++++++++------------------- 1 file changed, 73 insertions(+), 103 deletions(-) diff --git a/contract-tools/xchain/xchain.go b/contract-tools/xchain/xchain.go index c257436..3ea5df4 100644 --- a/contract-tools/xchain/xchain.go +++ b/contract-tools/xchain/xchain.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "encoding/hex" "encoding/json" @@ -20,6 +19,9 @@ import ( "sync" "time" + ethcrypto "github.com/ethereum/go-ethereum/crypto" + gocrypto "github.com/filecoin-project/go-crypto" + "golang.org/x/crypto/blake2b" "golang.org/x/sync/errgroup" "github.com/ethereum/go-ethereum" @@ -28,16 +30,16 @@ import ( "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" - ethcrypto "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" boosttypes "github.com/filecoin-project/boost/storagemarket/types" boosttypes2 "github.com/filecoin-project/boost/transport/types" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" - gocrypto "github.com/filecoin-project/go-crypto" "github.com/filecoin-project/go-data-segment/datasegment" "github.com/filecoin-project/go-data-segment/merkletree" "github.com/filecoin-project/go-jsonrpc" + inet "github.com/libp2p/go-libp2p/core/network" + filabi "github.com/filecoin-project/go-state-types/abi" fbig "github.com/filecoin-project/go-state-types/big" builtintypes "github.com/filecoin-project/go-state-types/builtin" @@ -48,12 +50,10 @@ import ( "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" - inet "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/mitchellh/go-homedir" "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" - "golang.org/x/crypto/blake2b" ) func main() { @@ -73,11 +73,6 @@ func main() { Name: "daemon", Usage: "Start the xchain adapter daemon", Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "buffer-service", - Usage: "Run a buffer server", - Value: false, - }, &cli.BoolFlag{ Name: "aggregation-service", Usage: "Run an aggregation server", @@ -85,52 +80,53 @@ func main() { }, }, Action: func(cctx *cli.Context) error { - isBuffer := cctx.Bool("buffer-service") - isAgg := cctx.Bool("aggregation-service") - if !isBuffer && !isAgg { // default to running aggregator - isAgg = true + isAgg := cctx.Bool("aggregation-service") + if !isAgg { + isAgg = true // default to running aggregator } - + cfg, err := LoadConfig(cctx.String("config")) if err != nil { log.Fatal(err) } - + g, ctx := errgroup.WithContext(cctx.Context) + + // Start buffer service if using local buffer g.Go(func() error { - if !isBuffer { - return nil - } - path, err := homedir.Expand(cfg.BufferPath) - if err != nil { - return err - } - if err := os.MkdirAll(path, os.ModePerm); err != nil { - return err - } - - srv, err := NewBufferHTTPService(cfg.BufferPath) - if err != nil { - return &http.MaxBytesError{} - } - http.HandleFunc("/put", srv.PutHandler) - http.HandleFunc("/get", srv.GetHandler) - - fmt.Printf("Server starting on port %d\n", cfg.BufferPort) - server := &http.Server{ - Addr: fmt.Sprintf("0.0.0.0:%d", cfg.BufferPort), - Handler: nil, // http.DefaultServeMux - } - go func() { - if err := server.ListenAndServe(); err != http.ErrServerClosed { - log.Fatalf("Buffer HTTP server ListenAndServe: %v", err) + if cfg.BufferType == "local" { + path, err := homedir.Expand(cfg.BufferPath) + if err != nil { + return err } - }() - <-ctx.Done() - - // Context is cancelled, shut down the server - return server.Shutdown(context.Background()) + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return err + } + + srv, err := NewBufferHTTPService(cfg.BufferPath) + if err != nil { + return err + } + http.HandleFunc("/put", srv.PutHandler) + http.HandleFunc("/get", srv.GetHandler) + + fmt.Printf("Local buffer server starting on port %d\n", cfg.BufferPort) + server := &http.Server{ + Addr: fmt.Sprintf("0.0.0.0:%d", cfg.BufferPort), + Handler: nil, + } + go func() { + if err := server.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalf("Buffer HTTP server ListenAndServe: %v", err) + } + }() + <-ctx.Done() + return server.Shutdown(context.Background()) + } + return nil }) + + // Start aggregator service g.Go(func() error { if !isAgg { return nil @@ -244,6 +240,7 @@ type Config struct { LighthouseApiKey string AuthToken string TargetAggSize int + BufferType string // "lighthouse" or "local" } // Mirror OnRamp.sol's `Offer` struct @@ -291,6 +288,7 @@ type aggregator struct { spActorAddr address.Address // address of the storage provider actor lotusAPI v0api.FullNode // Lotus API for determining deal start epoch and collateral bounds cleanup func() // cleanup function to call on shutdown + bufferType string // Type of buffer to use } // Thank you @ribasushi @@ -413,6 +411,7 @@ func NewAggregator(ctx context.Context, cfg *Config) (*aggregator, error) { closer() fmt.Printf("done with lotus api closer\n") }, + bufferType: cfg.BufferType, }, nil } @@ -477,18 +476,6 @@ func (a *aggregator) run(ctx context.Context) error { } const ( - // PODSI aggregation uses 64 extra bytes per piece - // pieceOverhead = uint64(64) TODO uncomment this when we are smarter about determining threshold crossing - // Piece CID of small valid car (below) that must be prepended to the aggregation for deal acceptance - prefixCARCid = "baga6ea4seaqiklhpuei4wz7x3wwpvnul3sscfyrz2dpi722vgpwlolfky2dmwey" - // Hex of the prefix car file - prefixCAR = "3aa265726f6f747381d82a58250001701220b9ecb605f194801ee8a8355014e7e6e62966f94ccb6081" + - "631e82217872209dae6776657273696f6e014101551220704a26a32a76cf3ab66ffe41eb27adefefe9c93206960bb0" + - "147b9ed5e1e948b0576861744966487567684576657265747449494957617352696768743f5601701220b9ecb605f1" + - "94801ee8a8355014e7e6e62966f94ccb6081631e82217872209dae122c0a2401551220704a26a32a76cf3ab66ffe41" + - "eb27adefefe9c93206960bb0147b9ed5e1e948b012026576181d0a020801" - // Size of the padded prefix car in bytes - prefixCARSizePadded = uint64(256) // Data transfer port transferPort = 1728 // libp2p identifier for latest deal protocol @@ -505,10 +492,6 @@ func (a *aggregator) runAggregate(ctx context.Context) error { // pending := make([]DataReadyEvent, 0, 256) var pending []DataReadyEvent total := uint64(0) - // prefixPiece := filabi.PieceInfo{ - // Size: filabi.PaddedPieceSize(prefixCARSizePadded), - // PieceCID: cid.MustParse(prefixCARCid), - // } for { select { @@ -531,16 +514,9 @@ func (a *aggregator) runAggregate(ctx context.Context) error { }) if err != nil { log.Printf("error creating aggregate: %s", err) - log.Printf("skipping offer %d, size %d exceeds max PODSI packable size %d", latestEvent.OfferID, latestEvent.Offer.Size, a.targetDealSize) continue } // TODO: in production we'll maybe want to move data from buffer before we commit to storing it. - - // TODO: Unsorted greedy is a very naive knapsack strategy, production will want something better - // TODO: doing all the work of creating an aggregate for every new offer is quite wasteful - // there must be a cheaper way to do this, but for now it is the most expediant without learning - // all the gory edge cases in NewAggregate - // Turn offers into datasegment pieces pieces := make([]filabi.PieceInfo, len(pending)) for i, event := range pending { @@ -551,11 +527,6 @@ func (a *aggregator) runAggregate(ctx context.Context) error { pieces[i] = piece } - // pieces[len(pending)] = latestPiece - // aggregate - // aggregatePieces := append([]filabi.PieceInfo{ - // prefixPiece, - // }, pieces...) aggregatePieces := pieces _, size, err := datasegment.ComputeDealPlacement(aggregatePieces) if err != nil { @@ -620,26 +591,14 @@ func (a *aggregator) runAggregate(ctx context.Context) error { a.transferID++ a.transferLk.Unlock() log.Printf("Transfer ID %d scheduled for aggregate %s", transferID, aggCommp.String()) - aggLocation := `~/` + aggCommp.String() - err = a.saveAggregateToFile(transferID, aggLocation) - if err != nil { - log.Fatalf("failed to save aggregate to file: %s", err) - } - // send file to lighthouse - lhResp, err := UploadToLighthouse(aggLocation, a.lighthouseApiKey) - if err != nil { - log.Fatalf("failed to upload to lighthouse: %s", err) - } - retrievalURL := fmt.Sprintf("https://gateway.lighthouse.storage/ipfs/%s", lhResp.Hash) - err = a.sendDeal(ctx, aggCommp, retrievalURL); + + + + err = a.sendDeal(ctx, aggCommp, transferID); if err != nil { log.Printf("[ERROR] failed to send deal: %s", err) } - // Delete the file at aggLocation - err = os.Remove(aggLocation) - if err != nil { - log.Printf("[ERROR] failed to delete file at %s: %s", aggLocation, err) - } + // Reset queue to empty, add the event that triggered aggregation pending = pending[:0] // pending = append(pending, latestEvent) @@ -669,7 +628,7 @@ func (a *aggregator) runAggregate(ctx context.Context) error { // Send deal data to the configured SP deal making address (boost node) // The deal is made with the configured prover client contract // Heavily inspired by boost client -func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, url string) error { +func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID int) error { if err := a.host.Connect(ctx, *a.spDealAddr); err != nil { return fmt.Errorf("failed to connect to peer %s: %w", a.spDealAddr.ID, err) } @@ -680,12 +639,30 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, url string) if len(x) == 0 { return fmt.Errorf("cannot make a deal with storage provider %s because it does not support protocol version 1.2.0", a.spDealAddr.ID) } + // make serving url as per buffer + var url string + if a.bufferType == "lighthouse" { + // Upload to Lighthouse + aggLocation := `~/` + aggCommp.String() + err = a.saveAggregateToFile(transferID, aggLocation) + if err != nil { + log.Fatalf("failed to save aggregate to file: %s", err) + } + + lhResp, err := UploadToLighthouse(aggLocation, a.lighthouseApiKey) + if err != nil { + log.Fatalf("failed to upload to lighthouse: %s", err) + } + url = fmt.Sprintf("https://gateway.lighthouse.storage/ipfs/%s", lhResp.Hash) + } else { + // Use local buffer + url = fmt.Sprintf("http://%s/?id=%d", a.transferAddr, transferID) + } // Construct deal dealUuid := uuid.New() log.Printf("making deal for commp %s, UUID=%s\n", aggCommp.String(), dealUuid) transferParams := boosttypes2.HttpRequest{ - URL: url, } paramsBytes, err := json.Marshal(transferParams) @@ -694,7 +671,7 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, url string) } transfer := boosttypes.Transfer{ Type: "http", - // ClientID: fmt.Sprintf("%d", transferID), + ClientID: fmt.Sprintf("%d", transferID), Params: paramsBytes, Size: a.targetDealSize - a.targetDealSize/128, // aggregate for transfer is not fr32 encoded } @@ -755,7 +732,6 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, url string) if err != nil { return fmt.Errorf("failed to create deal label: %w", err) } - proposal := market.DealProposal{ PieceCID: aggCommp, PieceSize: filabi.PaddedPieceSize(a.targetDealSize), @@ -900,14 +876,8 @@ func (a *aggregator) transferHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "No data found", http.StatusNotFound) return } - // First write the CAR prefix to the response - prefixCARBytes, err := hex.DecodeString(prefixCAR) - if err != nil { - http.Error(w, "Failed to decode CAR prefix", http.StatusInternalServerError) - return - } - readers := []io.Reader{bytes.NewReader(prefixCARBytes)} + readers := []io.Reader{} // Fetch each sub piece from its buffer location and write to response for _, url := range transfer.locations { lazyReader := &lazyHTTPReader{url: url}