diff --git a/contract-tools/xchain/go.mod b/contract-tools/xchain/go.mod index 83dd59e..e445f23 100644 --- a/contract-tools/xchain/go.mod +++ b/contract-tools/xchain/go.mod @@ -16,7 +16,7 @@ require ( github.com/mitchellh/go-homedir v1.1.0 github.com/multiformats/go-multiaddr v0.12.4 github.com/urfave/cli/v2 v2.27.2 - golang.org/x/sync v0.7.0 + golang.org/x/sync v0.10.0 ) @@ -55,7 +55,7 @@ require ( github.com/filecoin-project/go-amt-ipld/v4 v4.3.0 // indirect github.com/filecoin-project/go-bitfield v0.2.4 // indirect github.com/filecoin-project/go-cbor-util v0.0.1 // indirect - github.com/filecoin-project/go-crypto v0.0.2-0.20240424000926-1808e310bbac // indirect + github.com/filecoin-project/go-crypto v0.1.0 // indirect github.com/filecoin-project/go-data-transfer v1.15.4-boost // indirect github.com/filecoin-project/go-data-transfer/v2 v2.0.0-rc8 // indirect github.com/filecoin-project/go-fil-commcid v0.1.0 // indirect @@ -216,12 +216,12 @@ require ( go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.24.0 // indirect + golang.org/x/crypto v0.31.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.25.0 // indirect - golang.org/x/sys v0.21.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/protobuf v1.34.1 // indirect diff --git a/contract-tools/xchain/go.sum b/contract-tools/xchain/go.sum index 27ea9b3..42ed3a9 100644 --- a/contract-tools/xchain/go.sum +++ b/contract-tools/xchain/go.sum @@ -168,6 +168,8 @@ github.com/filecoin-project/go-commp-utils v0.1.4/go.mod h1:Sekocu5q9b4ECAUFu853 github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ= github.com/filecoin-project/go-crypto v0.0.2-0.20240424000926-1808e310bbac h1:SwaxonCdMJz7n1PvSXj7FUK5xdC6E4dJuQBVbOkv3wE= github.com/filecoin-project/go-crypto v0.0.2-0.20240424000926-1808e310bbac/go.mod h1:K9UFXvvoyAVvB+0Le7oGlKiT9mgA5FHOJdYQXEE8IhI= +github.com/filecoin-project/go-crypto v0.1.0 h1:Pob2MphoipMbe/ksxZOMcQvmBHAd3sI/WEqcbpIsGI0= +github.com/filecoin-project/go-crypto v0.1.0/go.mod h1:K9UFXvvoyAVvB+0Le7oGlKiT9mgA5FHOJdYQXEE8IhI= github.com/filecoin-project/go-data-segment v0.0.1 h1:1wmDxOG4ubWQm3ZC1XI5nCon5qgSq7Ra3Rb6Dbu10Gs= github.com/filecoin-project/go-data-segment v0.0.1/go.mod h1:H0/NKbsRxmRFBcLibmABv+yFNHdmtl5AyplYLnb0Zv4= github.com/filecoin-project/go-data-transfer v1.15.4-boost h1:rGsPDeDk0nbzLOPn/9iCIrhLNy69Vkr9tRBcetM4kd0= @@ -1068,6 +1070,8 @@ golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDf golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= @@ -1136,6 +1140,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1179,6 +1185,8 @@ golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1202,6 +1210,8 @@ golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= diff --git a/contract-tools/xchain/xchain.go b/contract-tools/xchain/xchain.go index 182b390..3ea5df4 100644 --- a/contract-tools/xchain/xchain.go +++ b/contract-tools/xchain/xchain.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "encoding/hex" "encoding/json" @@ -9,6 +8,8 @@ import ( "io" "log" "math/big" + "math/bits" + "net/http" "os" "os/signal" @@ -18,6 +19,9 @@ import ( "sync" "time" + ethcrypto "github.com/ethereum/go-ethereum/crypto" + gocrypto "github.com/filecoin-project/go-crypto" + "golang.org/x/crypto/blake2b" "golang.org/x/sync/errgroup" "github.com/ethereum/go-ethereum" @@ -40,7 +44,6 @@ import ( fbig "github.com/filecoin-project/go-state-types/big" builtintypes "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/builtin/v9/market" - "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/lotus/api/v0api" lotustypes "github.com/filecoin-project/lotus/chain/types" "github.com/google/uuid" @@ -70,11 +73,6 @@ func main() { Name: "daemon", Usage: "Start the xchain adapter daemon", Flags: []cli.Flag{ - &cli.BoolFlag{ - Name: "buffer-service", - Usage: "Run a buffer server", - Value: false, - }, &cli.BoolFlag{ Name: "aggregation-service", Usage: "Run an aggregation server", @@ -82,52 +80,53 @@ func main() { }, }, Action: func(cctx *cli.Context) error { - isBuffer := cctx.Bool("buffer-service") - isAgg := cctx.Bool("aggregation-service") - if !isBuffer && !isAgg { // default to running aggregator - isAgg = true + isAgg := cctx.Bool("aggregation-service") + if !isAgg { + isAgg = true // default to running aggregator } - + cfg, err := LoadConfig(cctx.String("config")) if err != nil { log.Fatal(err) } - + g, ctx := errgroup.WithContext(cctx.Context) + + // Start buffer service if using local buffer g.Go(func() error { - if !isBuffer { - return nil - } - path, err := homedir.Expand(cfg.BufferPath) - if err != nil { - return err - } - if err := os.MkdirAll(path, os.ModePerm); err != nil { - return err - } - - srv, err := NewBufferHTTPService(cfg.BufferPath) - if err != nil { - return &http.MaxBytesError{} - } - http.HandleFunc("/put", srv.PutHandler) - http.HandleFunc("/get", srv.GetHandler) - - fmt.Printf("Server starting on port %d\n", cfg.BufferPort) - server := &http.Server{ - Addr: fmt.Sprintf("0.0.0.0:%d", cfg.BufferPort), - Handler: nil, // http.DefaultServeMux - } - go func() { - if err := server.ListenAndServe(); err != http.ErrServerClosed { - log.Fatalf("Buffer HTTP server ListenAndServe: %v", err) + if cfg.BufferType == "local" { + path, err := homedir.Expand(cfg.BufferPath) + if err != nil { + return err } - }() - <-ctx.Done() - - // Context is cancelled, shut down the server - return server.Shutdown(context.Background()) + if err := os.MkdirAll(path, os.ModePerm); err != nil { + return err + } + + srv, err := NewBufferHTTPService(cfg.BufferPath) + if err != nil { + return err + } + http.HandleFunc("/put", srv.PutHandler) + http.HandleFunc("/get", srv.GetHandler) + + fmt.Printf("Local buffer server starting on port %d\n", cfg.BufferPort) + server := &http.Server{ + Addr: fmt.Sprintf("0.0.0.0:%d", cfg.BufferPort), + Handler: nil, + } + go func() { + if err := server.ListenAndServe(); err != http.ErrServerClosed { + log.Fatalf("Buffer HTTP server ListenAndServe: %v", err) + } + }() + <-ctx.Done() + return server.Shutdown(context.Background()) + } + return nil }) + + // Start aggregator service g.Go(func() error { if !isAgg { return nil @@ -238,7 +237,10 @@ type Config struct { TransferPort int ProviderAddr string LotusAPI string - TargetAggSize int + LighthouseApiKey string + AuthToken string + TargetAggSize int + BufferType string // "lighthouse" or "local" } // Mirror OnRamp.sol's `Offer` struct @@ -279,11 +281,14 @@ type aggregator struct { transferID int // ID of the next transfer transferAddr string // address to listen for transfer requests targetDealSize uint64 // how big aggregates should be + minDealSize uint64 // minimum deal size + lighthouseApiKey string // API key for lighthouse host host.Host // libp2p host for deal protocol to boost spDealAddr *peer.AddrInfo // address to reach boost (or other) deal v 1.2 provider spActorAddr address.Address // address of the storage provider actor lotusAPI v0api.FullNode // Lotus API for determining deal start epoch and collateral bounds cleanup func() // cleanup function to call on shutdown + bufferType string // Type of buffer to use } // Thank you @ribasushi @@ -351,7 +356,7 @@ func NewAggregator(ctx context.Context, cfg *Config) (*aggregator, error) { return nil, err } - lAPI, closer, err := NewLotusDaemonAPIClientV0(ctx, cfg.LotusAPI, 1, "") + lAPI, closer, err := NewLotusDaemonAPIClientV0(ctx, cfg.LotusAPI, 1, cfg.AuthToken) if err != nil { return nil, err } @@ -401,10 +406,12 @@ func NewAggregator(ctx context.Context, cfg *Config) (*aggregator, error) { spDealAddr: psPeerInfo, spActorAddr: providerAddr, lotusAPI: lAPI, + lighthouseApiKey: cfg.LighthouseApiKey, cleanup: func() { closer() fmt.Printf("done with lotus api closer\n") }, + bufferType: cfg.BufferType, }, nil } @@ -424,7 +431,10 @@ func (a *aggregator) run(ctx context.Context) error { } err := a.SubscribeQuery(ctx, query) - for err == nil || strings.Contains(err.Error(), "read tcp") { + // if err != nil { + // return err + // } + for err == nil || strings.Contains(err.Error(), "read udp") { if err != nil { log.Printf("ignoring mystery error: %s", err) } @@ -466,24 +476,12 @@ func (a *aggregator) run(ctx context.Context) error { } const ( - // PODSI aggregation uses 64 extra bytes per piece - // pieceOverhead = uint64(64) TODO uncomment this when we are smarter about determining threshold crossing - // Piece CID of small valid car (below) that must be prepended to the aggregation for deal acceptance - prefixCARCid = "baga6ea4seaqiklhpuei4wz7x3wwpvnul3sscfyrz2dpi722vgpwlolfky2dmwey" - // Hex of the prefix car file - prefixCAR = "3aa265726f6f747381d82a58250001701220b9ecb605f194801ee8a8355014e7e6e62966f94ccb6081" + - "631e82217872209dae6776657273696f6e014101551220704a26a32a76cf3ab66ffe41eb27adefefe9c93206960bb0" + - "147b9ed5e1e948b0576861744966487567684576657265747449494957617352696768743f5601701220b9ecb605f1" + - "94801ee8a8355014e7e6e62966f94ccb6081631e82217872209dae122c0a2401551220704a26a32a76cf3ab66ffe41" + - "eb27adefefe9c93206960bb0147b9ed5e1e948b012026576181d0a020801" - // Size of the padded prefix car in bytes - prefixCARSizePadded = uint64(256) // Data transfer port transferPort = 1728 // libp2p identifier for latest deal protocol DealProtocolv120 = "/fil/storage/mk/1.2.0" // Delay to start deal at. For 2k devnet 4 second block time this is 13.3 minutes TODO Config - dealDelayEpochs = 200 + dealDelayEpochs = 10000 // Storage deal duration, TODO figure out what to do about this, either comes from offer or config dealDuration = 518400 // 6 months (on mainnet) ) @@ -491,12 +489,9 @@ const ( func (a *aggregator) runAggregate(ctx context.Context) error { // pieces being aggregated, flushed upon commitment // Invariant: the pieces in the pending queue can always make a valid aggregate w.r.t a.targetDealSize - pending := make([]DataReadyEvent, 0, 256) + // pending := make([]DataReadyEvent, 0, 256) + var pending []DataReadyEvent total := uint64(0) - prefixPiece := filabi.PieceInfo{ - Size: filabi.PaddedPieceSize(prefixCARSizePadded), - PieceCID: cid.MustParse(prefixCARCid), - } for { select { @@ -504,31 +499,26 @@ func (a *aggregator) runAggregate(ctx context.Context) error { fmt.Printf("ctx done shutting down aggregation") return nil case latestEvent := <-a.ch: - // Check if the offer is too big to fit in a valid aggregate on its own + if len(pending) >= 0 { + // Check if the offer is too big to fit in a valid aggregate on its own // TODO: as referenced below there must be a better way when we introspect on the gory details of NewAggregate latestPiece, err := latestEvent.Offer.Piece() + pending = append(pending, latestEvent) if err != nil { log.Printf("skipping offer %d, size %d not valid padded piece size ", latestEvent.OfferID, latestEvent.Offer.Size) continue } _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), []filabi.PieceInfo{ - prefixPiece, + // prefixPiece, latestPiece, }) if err != nil { log.Printf("error creating aggregate: %s", err) - log.Printf("skipping offer %d, size %d exceeds max PODSI packable size %d", latestEvent.OfferID, latestEvent.Offer.Size, a.targetDealSize) continue } // TODO: in production we'll maybe want to move data from buffer before we commit to storing it. - - // TODO: Unsorted greedy is a very naive knapsack strategy, production will want something better - // TODO: doing all the work of creating an aggregate for every new offer is quite wasteful - // there must be a cheaper way to do this, but for now it is the most expediant without learning - // all the gory edge cases in NewAggregate - // Turn offers into datasegment pieces - pieces := make([]filabi.PieceInfo, len(pending)+1) + pieces := make([]filabi.PieceInfo, len(pending)) for i, event := range pending { piece, err := event.Offer.Piece() if err != nil { @@ -537,18 +527,26 @@ func (a *aggregator) runAggregate(ctx context.Context) error { pieces[i] = piece } - pieces[len(pending)] = latestPiece - // aggregate - aggregatePieces := append([]filabi.PieceInfo{ - prefixPiece, - }, pieces...) - _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), aggregatePieces) - if err != nil { // we've overshot, lets commit to just pieces in pending + aggregatePieces := pieces + _, size, err := datasegment.ComputeDealPlacement(aggregatePieces) + if err != nil { + panic(err) + } + overallSize := filabi.PaddedPieceSize(size) + next := 1 << (64 - bits.LeadingZeros64(uint64(overallSize+256))) + if next < int(a.minDealSize) { + next = int(a.minDealSize) + } + dealSize := filabi.PaddedPieceSize(next) + a.targetDealSize = uint64(dealSize) + // _, err = datasegment.NewAggregate(dealSize, aggregatePieces) + // if err != nil { // we've overshot, lets commit to just pieces in pending total = 0 + // Remove the latest offer which took us over - pieces = pieces[:len(pieces)-1] - aggregatePieces = aggregatePieces[:len(aggregatePieces)-1] - agg, err := datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), aggregatePieces) + // pieces = pieces[:len(pieces)-1] + // aggregatePieces = aggregatePieces[:len(aggregatePieces)-1] + agg, err := datasegment.NewAggregate(dealSize, aggregatePieces) if err != nil { return fmt.Errorf("failed to create aggregate from pending, should not be reachable: %w", err) } @@ -594,16 +592,31 @@ func (a *aggregator) runAggregate(ctx context.Context) error { a.transferLk.Unlock() log.Printf("Transfer ID %d scheduled for aggregate %s", transferID, aggCommp.String()) - err = a.sendDeal(ctx, aggCommp, transferID) + + + err = a.sendDeal(ctx, aggCommp, transferID); if err != nil { log.Printf("[ERROR] failed to send deal: %s", err) } // Reset queue to empty, add the event that triggered aggregation pending = pending[:0] - pending = append(pending, latestEvent) + // pending = append(pending, latestEvent) } else { + latestPiece, err := latestEvent.Offer.Piece() + if err != nil { + log.Printf("skipping offer %d, size %d not valid padded piece size ", latestEvent.OfferID, latestEvent.Offer.Size) + continue + } + _, err = datasegment.NewAggregate(filabi.PaddedPieceSize(a.targetDealSize), []filabi.PieceInfo{ + // prefixPiece, + latestPiece, + }) + if err != nil { + log.Printf("skipping offer %d, size %d exceeds max PODSI packable size", latestEvent.OfferID, latestEvent.Offer.Size) + continue + } total += latestEvent.Offer.Size pending = append(pending, latestEvent) log.Printf("Offer %d added. %d offers pending aggregation with total size=%d\n", latestEvent.OfferID, len(pending), total) @@ -626,12 +639,31 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID if len(x) == 0 { return fmt.Errorf("cannot make a deal with storage provider %s because it does not support protocol version 1.2.0", a.spDealAddr.ID) } + // make serving url as per buffer + var url string + if a.bufferType == "lighthouse" { + // Upload to Lighthouse + aggLocation := `~/` + aggCommp.String() + err = a.saveAggregateToFile(transferID, aggLocation) + if err != nil { + log.Fatalf("failed to save aggregate to file: %s", err) + } + + lhResp, err := UploadToLighthouse(aggLocation, a.lighthouseApiKey) + if err != nil { + log.Fatalf("failed to upload to lighthouse: %s", err) + } + url = fmt.Sprintf("https://gateway.lighthouse.storage/ipfs/%s", lhResp.Hash) + } else { + // Use local buffer + url = fmt.Sprintf("http://%s/?id=%d", a.transferAddr, transferID) + } // Construct deal dealUuid := uuid.New() log.Printf("making deal for commp %s, UUID=%s\n", aggCommp.String(), dealUuid) transferParams := boosttypes2.HttpRequest{ - URL: fmt.Sprintf("http://%s/?id=%d", a.transferAddr, transferID), + URL: url, } paramsBytes, err := json.Marshal(transferParams) if err != nil { @@ -666,37 +698,78 @@ func (a *aggregator) sendDeal(ctx context.Context, aggCommp cid.Cid, transferID return fmt.Errorf("failed to get chain ID: %w", err) } // Encode the chainID as uint256 - encodedChainID, err := encodeChainID(chainID) + // intEncodedChainID, err := encodeChainID(chainID) + // if err != nil { + // return fmt.Errorf("failed to encode chainID: %w", err) + // } + + clientAddr, err := a.lotusAPI.WalletDefaultAddress(ctx) + if err != nil { + return err + } + dummyBuf, err := cborutil.Dump("dummy") + if err != nil { + return err + } + dummySig, err := a.lotusAPI.WalletSign(ctx, clientAddr, dummyBuf) + if err != nil { + return fmt.Errorf("wallet sign failed: %w", err) + } + b2sum := blake2b.Sum256(dummyBuf) + pubk, err := gocrypto.EcRecover(b2sum[:], dummySig.Data) + if err != nil { + return err + } + pubkHash := ethcrypto.Keccak256(pubk[1:]) // Skip the first byte (0x04) which indicates uncompressed public key + ethAddress := common.BytesToAddress(pubkHash[12:]) // Take the last 20 bytes + encodedLabel, err := encodeChainIDAndAddress(chainID, ethAddress) if err != nil { - return fmt.Errorf("failed to encode chainID: %w", err) + return fmt.Errorf("failed to encode chainID and address: %w", err) } - dealLabel, err := market.NewLabelFromBytes(encodedChainID) + labelString := hex.EncodeToString(encodedLabel) + + dealLabel, err := market.NewLabelFromString(labelString) if err != nil { return fmt.Errorf("failed to create deal label: %w", err) } - proposal := market.ClientDealProposal{ - Proposal: market.DealProposal{ - PieceCID: aggCommp, - PieceSize: filabi.PaddedPieceSize(a.targetDealSize), - VerifiedDeal: false, - Client: filClient, - Provider: a.spActorAddr, - StartEpoch: dealStart, - EndEpoch: dealEnd, - StoragePricePerEpoch: fbig.NewInt(0), - ProviderCollateral: providerCollateral, - Label: dealLabel, - }, + proposal := market.DealProposal{ + PieceCID: aggCommp, + PieceSize: filabi.PaddedPieceSize(a.targetDealSize), + VerifiedDeal: false, + Client: filClient, + Provider: a.spActorAddr, + StartEpoch: dealStart, + EndEpoch: dealEnd, + StoragePricePerEpoch: fbig.NewInt(0), + ProviderCollateral: providerCollateral, + Label: dealLabel, + } + + + buf, err := cborutil.Dump(&proposal) + if err != nil { + return err + } + + log.Printf("about to sign with clientAddr: %s", clientAddr) + sig, err := a.lotusAPI.WalletSign(ctx, clientAddr, buf) + if err != nil { + return fmt.Errorf("wallet sign failed: %w", err) + } + + clientProposal := market.ClientDealProposal{ + Proposal: proposal, // Signature is unchecked since client is smart contract - ClientSignature: crypto.Signature{ - Type: crypto.SigTypeBLS, - Data: []byte{0xc0, 0xff, 0xee}, - }, + ClientSignature: *sig, + // crypto.Signature{ + // Type: crypto.SigTypeBLS, + // Data: []byte{0xc0, 0xff, 0xee}, + // }, } dealParams := boosttypes.DealParams{ DealUUID: dealUuid, - ClientDealProposal: proposal, + ClientDealProposal: clientProposal, DealDataRoot: aggCommp, IsOffline: false, Transfer: transfer, @@ -803,14 +876,8 @@ func (a *aggregator) transferHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "No data found", http.StatusNotFound) return } - // First write the CAR prefix to the response - prefixCARBytes, err := hex.DecodeString(prefixCAR) - if err != nil { - http.Error(w, "Failed to decode CAR prefix", http.StatusInternalServerError) - return - } - readers := []io.Reader{bytes.NewReader(prefixCARBytes)} + readers := []io.Reader{} // Fetch each sub piece from its buffer location and write to response for _, url := range transfer.locations { lazyReader := &lazyHTTPReader{url: url} @@ -828,6 +895,49 @@ func (a *aggregator) transferHandler(w http.ResponseWriter, r *http.Request) { } } +func (a *aggregator) saveAggregateToFile(id int, location string) error { + a.transferLk.RLock() + transfer, ok := a.transfers[id] + a.transferLk.RUnlock() + if !ok { + return fmt.Errorf("no data found for ID %d", id) + } + + // First write the CAR prefix to the file + // prefixCARBytes, err := hex.DecodeString(prefixCAR) + // if err != nil { + // return fmt.Errorf("failed to decode CAR prefix: %w", err) + // } + + readers := []io.Reader{ + // bytes.NewReader(prefixCARBytes) + } + // Fetch each sub piece from its buffer location and add to readers + for _, url := range transfer.locations { + lazyReader := &lazyHTTPReader{url: url} + readers = append(readers, lazyReader) + defer lazyReader.Close() + } + aggReader, err := transfer.agg.AggregateObjectReader(readers) + if err != nil { + return fmt.Errorf("failed to create aggregate reader: %w", err) + } + + // Create the file at the specified location + file, err := os.Create(location) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer file.Close() + + // Copy the aggregated data to the file + _, err = io.Copy(file, aggReader) + if err != nil { + return fmt.Errorf("failed to write aggregate stream to file: %w", err) + } + + return nil +} type AggregateTransfer struct { locations []string agg *datasegment.Aggregate @@ -838,6 +948,7 @@ func (a *aggregator) SubscribeQuery(ctx context.Context, query ethereum.FilterQu log.Printf("Listening for data ready events on %s\n", a.onrampAddr.Hex()) sub, err := a.client.SubscribeFilterLogs(ctx, query, logs) if err != nil { + fmt.Println(err) return err } defer sub.Unsubscribe() @@ -932,7 +1043,7 @@ func MakeOffer(cidStr string, sizeStr string, location string, token string, amo Amount: amountBig, Size: uint64(size), } - + fmt.Println(hex.EncodeToString(commP.Bytes())) return &offer, nil } @@ -1038,4 +1149,42 @@ func encodeChainID(chainID *big.Int) ([]byte, error) { return data, nil } +// encodeChainIDAndAddress encodes the chain ID and Ethereum address into bytes +func encodeChainIDAndAddress(chainID *big.Int, ethAddress common.Address) ([]byte, error) { + uint256Type, err := abi.NewType("uint256", "", nil) + if err != nil { + return nil, fmt.Errorf("failed to create uint256 type: %w", err) + } + + addressType, err := abi.NewType("address", "", nil) + if err != nil { + return nil, fmt.Errorf("failed to create address type: %w", err) + } + + + // Define the ABI encoding + arguments := abi.Arguments{ + {Type: uint256Type}, + {Type: addressType}, + } + + // Encode the chain ID and Ethereum address + encodedBytes, err := arguments.Pack(chainID, ethAddress) + if err != nil { + return nil, fmt.Errorf("error encoding arguments: %w", err) + } + + return encodedBytes, nil +} +// encodeChainIDAsString converts a *big.Int chain ID to its string representation +func encodeChainIDAsString(chainID *big.Int) (string, error) { + if chainID == nil { + return "", fmt.Errorf("chainID cannot be nil") + } + + // Convert the *big.Int to a string + chainIDStr := chainID.String() + + return chainIDStr, nil +} diff --git a/contract-tools/xchain/xchain_test.go b/contract-tools/xchain/xchain_test.go index 4ba280c..499e0c7 100644 --- a/contract-tools/xchain/xchain_test.go +++ b/contract-tools/xchain/xchain_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/ethclient" + "github.com/filecoin-project/go-address" "github.com/stretchr/testify/assert" ) @@ -79,4 +80,23 @@ func decodeChainID(data []byte) (*big.Int, error) { } return chainID, nil -} \ No newline at end of file +} + +func TestAddrToBytes(t *testing.T) { + // Example address (replace with actual address) + addr, err := address.NewFromString("t0116147") + if err != nil { + fmt.Println("Error creating address:", err) + return + } + + // Get the byte representation of the address + addrBytes := addr.Bytes() + + // Encode the byte representation to a hexadecimal string + addrHex := hex.EncodeToString(addrBytes) + + fmt.Println("Address in bytes: ", addrBytes) + fmt.Println("Address in hex: ", addrHex) + assert.Equal(t, addrHex, "00b38b07", "Address does not match expected value") + } \ No newline at end of file diff --git a/foundry.toml b/foundry.toml index e6810b2..77b36c4 100644 --- a/foundry.toml +++ b/foundry.toml @@ -2,5 +2,6 @@ src = 'src' out = 'out' libs = ['lib'] +evm_version = 'cancun' # See more config options https://github.com/foundry-rs/foundry/tree/master/config \ No newline at end of file diff --git a/src/Prover-Axelar.sol b/src/Prover-Axelar.sol index 3d4d746..b3b5c2c 100644 --- a/src/Prover-Axelar.sol +++ b/src/Prover-Axelar.sol @@ -18,6 +18,7 @@ import {Strings} from "lib/openzeppelin-contracts/contracts/utils/Strings.sol"; import {AxelarExecutable} from "lib/axelar-gmp-sdk-solidity/contracts/executable/AxelarExecutable.sol"; import {IAxelarGateway} from "lib/axelar-gmp-sdk-solidity/contracts/interfaces/IAxelarGateway.sol"; import {IAxelarGasService} from "lib/axelar-gmp-sdk-solidity/contracts/interfaces/IAxelarGasService.sol"; +import {BLAKE2b} from "./blake2lib.sol"; using CBOR for CBOR.CBORBuffer; @@ -51,6 +52,15 @@ contract DealClient is AxelarExecutable { mapping(bytes => Status) public pieceStatus; mapping(bytes => uint256) public providerGasFunds; // Funds set aside for calling oracle by provider mapping(uint256 => DestinationChain) public chainIdToDestinationChain; + event DealNotify( + uint64 dealId, + bytes commP, + bytes data, + bytes chainId, + bytes provider, + bytes payload + ); + event ReceivedDataCap(string received); constructor( address _gateway, @@ -87,6 +97,39 @@ contract DealClient is AxelarExecutable { providerGasFunds[providerAddrData] += msg.value; } + function receiveDataCap(bytes memory) internal { + require( + msg.sender == DATACAP_ACTOR_ETH_ADDRESS, + "msg.sender needs to be datacap actor f07" + ); + emit ReceivedDataCap("DataCap Received!"); + // Add get datacap balance api and store datacap amount + } + + // authenticateMessage is the callback from the market actor into the contract + // as part of PublishStorageDeals. This message holds the deal proposal from the + // miner, which needs to be validated by the contract in accordance with the + // deal requests made and the contract's own policies + // @params - cbor byte array of AccountTypes.AuthenticateMessageParams + function authenticateMessage(bytes memory params) internal view { + require( + msg.sender == MARKET_ACTOR_ETH_ADDRESS, + "msg.sender needs to be market actor f05" + ); + + AccountTypes.AuthenticateMessageParams memory amp = params + .deserializeAuthenticateMessageParams(); + MarketTypes.DealProposal memory proposal = MarketCBOR + .deserializeDealProposal(amp.message); + bytes memory encodedData = convertAsciiHexToBytes(proposal.label.data); + (, address filAddress) = abi.decode(encodedData, (uint256, address)); + address recovered = recovers( + bytes32(BLAKE2b.hash(amp.message, "", "", "", 32)), + amp.signature + ); + require(recovered == filAddress, "Invalid signature"); + } + // dealNotify is the callback from the market actor into the contract at the end // of PublishStorageDeals. This message holds the previously approved deal proposal // and the associated dealID. The dealID is stored as part of the contract state @@ -109,7 +152,11 @@ contract DealClient is AxelarExecutable { int64 duration = CommonTypes.ChainEpoch.unwrap(proposal.end_epoch) - CommonTypes.ChainEpoch.unwrap(proposal.start_epoch); // Expects deal label to be chainId encoded in bytes - uint256 chainId = abi.decode(proposal.label.data, (uint256)); + // string memory chainIdStr = abi.decode(proposal.label.data, (string)); + bytes memory encodedData = convertAsciiHexToBytes(proposal.label.data); + (uint256 chainId, ) = abi.decode(encodedData, (uint256, address)); + + // uint256 chainId = asciiBytesToUint(proposal.label.data); DataAttestation memory attest = DataAttestation( proposal.piece_cid.data, duration, @@ -117,6 +164,15 @@ contract DealClient is AxelarExecutable { uint256(Status.DealPublished) ); bytes memory payload = abi.encode(attest); + + emit DealNotify( + mdnp.dealId, + proposal.piece_cid.data, + params, + proposal.label.data, + proposal.provider.data, + payload + ); if (chainId == block.chainid) { IBridgeContract( chainIdToDestinationChain[chainId].destinationAddress @@ -206,14 +262,16 @@ contract DealClient is AxelarExecutable { uint64 codec; // dispatch methods if (method == AUTHENTICATE_MESSAGE_METHOD_NUM) { + authenticateMessage(params); // If we haven't reverted, we should return a CBOR true to indicate that verification passed. - // Always authenticate message CBOR.CBORBuffer memory buf = CBOR.create(1); buf.writeBool(true); ret = buf.data(); codec = Misc.CBOR_CODEC; } else if (method == MARKET_NOTIFY_DEAL_METHOD_NUM) { dealNotify(params); + } else if (method == DATACAP_RECEIVER_HOOK_METHOD_NUM) { + receiveDataCap(params); } else { revert("the filecoin method that was called is not handled"); } @@ -225,4 +283,71 @@ contract DealClient is AxelarExecutable { ) internal pure returns (string memory) { return Strings.toHexString(uint256(uint160(_addr)), 20); } + + function asciiBytesToUint( + bytes memory asciiBytes + ) public pure returns (uint256) { + uint256 result = 0; + for (uint256 i = 0; i < asciiBytes.length; i++) { + uint256 digit = uint256(uint8(asciiBytes[i])) - 48; // Convert ASCII to digit + require(digit <= 9, "Invalid ASCII byte"); + result = result * 10 + digit; + } + return result; + } + + function convertAsciiHexToBytes( + bytes memory asciiHex + ) public pure returns (bytes memory) { + require(asciiHex.length % 2 == 0, "Invalid ASCII hex string length"); + + bytes memory result = new bytes(asciiHex.length / 2); + for (uint256 i = 0; i < asciiHex.length / 2; i++) { + result[i] = byteFromHexChar(asciiHex[2 * i], asciiHex[2 * i + 1]); + } + + return result; + } + + function byteFromHexChar( + bytes1 char1, + bytes1 char2 + ) internal pure returns (bytes1) { + uint8 nibble1 = uint8(char1) - (uint8(char1) < 58 ? 48 : 87); + uint8 nibble2 = uint8(char2) - (uint8(char2) < 58 ? 48 : 87); + return bytes1(nibble1 * 16 + nibble2); + } + + function recovers( + bytes32 hash, + bytes memory signature + ) public pure returns (address) { + bytes32 r; + bytes32 s; + uint8 v; + + // Check the signature length + if (signature.length != 65) { + return (address(0)); + } + + // Divide the signature in r, s and v variables + assembly { + r := mload(add(signature, 32)) + s := mload(add(signature, 64)) + v := byte(0, mload(add(signature, 96))) + } + + // Version of signature should be 27 or 28, but 0 and 1 are also possible versions + if (v < 27) { + v += 27; + } + // address check = ECDSA.recover(hash, signature); + // If the version is correct return the signer address + if (v != 27 && v != 28) { + return (address(0)); + } else { + return ecrecover(hash, v, r, s); + } + } } diff --git a/src/blake2lib.sol b/src/blake2lib.sol new file mode 100644 index 0000000..90bda8d --- /dev/null +++ b/src/blake2lib.sol @@ -0,0 +1,246 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: Copyright 2024 David Leung + +pragma solidity ^0.8.17; + +error OutputLengthCannotBeZero(); +error OutputLengthExceeded(); +error KeyLengthExceeded(); +error InputLengthExceeded(); + +library BLAKE2b { + // Initial state vectors + // + // IV 0-3 as numerical values + // 0x6A09E667F3BCC908 0xbb67ae8584caa73b 0x3c6ef372fe94f82b 0xa54ff53a5f1d36f1 + // IV 0-3 in little-endian encoding + // 08c9bcf367e6096a 3ba7ca8485ae67bb 2bf894fe72f36e3c f1361d5f3af54fa5 + // IV 0-3 XOR with parameter block set to sequential mode: + // 0000010100000000 0000000000000000 0000000000000000 0000000000000000 + // XOR Result: + // 08c9bdf267e6096a 3ba7ca8485ae67bb 2bf894fe72f36e3c f1361d5f3af54fa5 + // + // IV 4-7 as numerical values + // 0x510e527fade682d1 0x9b05688c2b3e6c1f 0x1f83d9abfb41bd6b 0x5be0cd19137e2179 + // IV 4-7 as little-endian encoded bytes + // d182e6ad7f520e51 1f6c3e2b8c68059b 6bbd41fbabd9831f 79217e1319cde05b + bytes32 private constant IS0 = + bytes32( + hex"08c9bdf267e6096a3ba7ca8485ae67bb2bf894fe72f36e3cf1361d5f3af54fa5" + ); + bytes32 private constant IS1 = + bytes32( + hex"d182e6ad7f520e511f6c3e2b8c68059b6bbd41fbabd9831f79217e1319cde05b" + ); + + uint256 private constant BLOCK_SIZE_BYTES = 128; + + function hash( + bytes memory input, + bytes memory key, + bytes memory salt, + bytes memory personalization, + uint256 digestLen + ) internal view returns (bytes memory digest) { + if (digestLen == 0) { + revert OutputLengthCannotBeZero(); + } + + if (digestLen > 64) { + revert OutputLengthExceeded(); + } + + if (key.length > 64) { + revert KeyLengthExceeded(); + } + + //////////////////////////////////////////// + // INITIALIZATION + //////////////////////////////////////////// + + // See https://eips.ethereum.org/EIPS/eip-152#specification + // We refer to the collective inputs to the F function as the context. + bytes memory context = new bytes(213); + + bytes32[2] memory h = [IS0 ^ bytes32(digestLen << 248), IS1]; + + if (key.length > 0) { + h[0] ^= bytes32(key.length << 240); + } + + if (salt.length > 0) { + h[1] ^= bytes32(salt); + } + + if (personalization.length > 0) { + h[1] ^= bytes32(personalization) >> 128; + } + + assembly { + // Set the round count (12 for BLAKE2b) in the context + mstore8(add(context, 35), 12) + // Copy the initial hash state to the context + mcopy(add(context, 36), h, 64) + } + + uint256 bytesProcessed = 0; + uint256 bufferUsed = 0; + + // If key is present, copy it to the context, and compress it as a full block + if (key.length > 0) { + assembly { + // key length := mload(key) + // pointer to key := add(key, 32) + // pointer to state := add(context, 100) + mcopy(add(context, 100), add(key, 32), mload(key)) + } + + bufferUsed = BLOCK_SIZE_BYTES; + } + + //////////////////////////////////////////// + // INPUT PROCESSING + //////////////////////////////////////////// + + uint256 readInputOffset = 0; + + // Read full block chunks + while (readInputOffset + BLOCK_SIZE_BYTES <= input.length) { + if (bufferUsed == BLOCK_SIZE_BYTES) { + unchecked { + bytesProcessed += BLOCK_SIZE_BYTES; + } + + bytes8[1] memory tt = [ + bytes8(reverseByteOrder(uint64(bytesProcessed))) + ]; + + assembly { + mcopy(add(context, 228), tt, 8) + if iszero( + staticcall( + not(0), + 0x09, + add(context, 32), + 0xd5, + add(context, 36), + 0x40 + ) + ) { + revert(0, 0) + } + } + + bufferUsed = 0; + } + + assembly { + mcopy( + add(add(context, 100), bufferUsed), + add(input, add(32, readInputOffset)), + BLOCK_SIZE_BYTES + ) + } + + unchecked { + bufferUsed = BLOCK_SIZE_BYTES; + readInputOffset += BLOCK_SIZE_BYTES; + } + } + + // Handle partial block + if (readInputOffset < input.length) { + if (bufferUsed == BLOCK_SIZE_BYTES) { + unchecked { + bytesProcessed += BLOCK_SIZE_BYTES; + } + + bytes8[1] memory tt = [ + bytes8(reverseByteOrder(uint64(bytesProcessed))) + ]; + + assembly { + mcopy(add(context, 228), tt, 8) + if iszero( + staticcall( + not(0), + 0x09, + add(context, 32), + 0xd5, + add(context, 36), + 0x40 + ) + ) { + revert(0, 0) + } + } + + bufferUsed = 0; + + // Reset the message buffer, as we are going to process a partial block + assembly { + mstore(add(context, 100), 0) + mstore(add(context, 132), 0) + mstore(add(context, 164), 0) + mstore(add(context, 196), 0) + } + } + + assembly { + // left = input.length - inputOffset. Safe casting, because left is always less than 128 + let left := sub(mload(input), readInputOffset) + mcopy( + add(add(context, 100), bufferUsed), + add(input, add(32, readInputOffset)), + left + ) + bufferUsed := add(bufferUsed, left) + } + } + + //////////////////////////////////////////// + // FINAL + //////////////////////////////////////////// + + unchecked { + bytesProcessed += bufferUsed; + } + + bytes8[1] memory tt = [ + bytes8(reverseByteOrder(uint64(bytesProcessed))) + ]; + + assembly { + // Set final block flag + mstore8(add(context, 244), 1) + mcopy(add(context, 228), tt, 8) + if iszero( + staticcall( + not(0), + 0x09, + add(context, 32), + 0xd5, + add(context, 36), + 0x40 + ) + ) { + revert(0, 0) + } + + // digest = new bytes(digestLen) + digest := mload(0x40) + mstore(0x40, add(digest, add(digestLen, 0x20))) + mstore(digest, digestLen) + + // copy final hash state to digest + mcopy(add(digest, 32), add(context, 36), digestLen) + } + } + + function reverseByteOrder(uint64 input) internal pure returns (uint64 v) { + v = input; + v = ((v & 0xFF00FF00FF00FF00) >> 8) | ((v & 0x00FF00FF00FF00FF) << 8); + v = ((v & 0xFFFF0000FFFF0000) >> 16) | ((v & 0x0000FFFF0000FFFF) << 16); + v = (v >> 32) | (v << 32); + } +}