Skip to content
8 changes: 4 additions & 4 deletions api/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,28 +80,28 @@ func handleAdminGetBalance(node *core.DeltaNode) func(c echo.Context) error {
"message": "invalid address",
})
}
bigIntBalance, err := node.LotusApi.WalletBalance(context.Background(), address)
bigIntBalance, err := node.LotusApiNode.WalletBalance(context.Background(), address)
if err != nil {
return c.JSON(500, map[string]interface{}{
"message": "failed to get balance",
})
}

act, err := node.LotusApi.StateGetActor(context.Background(), address, types.EmptyTSK)
act, err := node.LotusApiNode.StateGetActor(context.Background(), address, types.EmptyTSK)
if err != nil {
return c.JSON(500, map[string]interface{}{
"message": "failed to get actor",
})
}

market, err := node.LotusApi.StateMarketBalance(context.Background(), address, types.EmptyTSK)
market, err := node.LotusApiNode.StateMarketBalance(context.Background(), address, types.EmptyTSK)
if err != nil {
return c.JSON(500, map[string]interface{}{
"message": "failed to get market balance",
})
}

vcstatus, err := node.LotusApi.StateVerifiedClientStatus(context.Background(), address, types.EmptyTSK)
vcstatus, err := node.LotusApiNode.StateVerifiedClientStatus(context.Background(), address, types.EmptyTSK)

if err != nil {
return c.JSON(500, map[string]interface{}{
Expand Down
57 changes: 31 additions & 26 deletions api/deal.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func handleExistingContentsAdd(c echo.Context, node *core.DeltaNode) error {
errTxn := node.DB.Transaction(func(tx *gorm.DB) error {
var dealResponses []DealResponse
for _, dealRequest := range dealRequests {
err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)
if err != nil {
// return the error from the validation
return err
Expand Down Expand Up @@ -452,7 +452,7 @@ func handleExistingContentAdd(c echo.Context, node *core.DeltaNode) error {
authorizationString := c.Request().Header.Get("Authorization")
authParts := strings.Split(authorizationString, " ")
err := c.Bind(&dealRequest)
err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)
if err != nil {
// return the error from the validation
return err
Expand Down Expand Up @@ -668,7 +668,12 @@ func handleEndToEndDeal(c echo.Context, node *core.DeltaNode) error {
// fail safe
dealRequest.ConnectionMode = "e2e"

err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)

// validate the file if it's more than 1mb
if file.Size < 1000000 && dealRequest.DealVerifyState == utils.DEAL_VERIFIED {
return errors.New("File size is too small")
}

if err != nil {
// return the error from the validation
Expand Down Expand Up @@ -732,7 +737,7 @@ func handleEndToEndDeal(c echo.Context, node *core.DeltaNode) error {

// assign a miner
if dealRequest.Miner == "" {
minerAssignService := core.NewMinerAssignmentService()
minerAssignService := core.NewMinerAssignmentService(*node)
provider, errOnPv := minerAssignService.GetSPWithGivenBytes(file.Size)
if errOnPv != nil {
return errOnPv
Expand Down Expand Up @@ -882,7 +887,7 @@ func handleEndToEndDeal(c echo.Context, node *core.DeltaNode) error {
}

// TODO: Improve this, this is a hack to make sure the replication is done before the deal is made
contents := ReplicateContent(dealReplication, dealRequest, tx)
contents := ReplicateContent(node, dealReplication, dealRequest, tx)
var dispatchJobs core.IProcessor
for _, contentRep := range contents {
dispatchJobs = jobs.NewPieceCommpProcessor(node, contentRep.Content) // straight to pieceCommp
Expand Down Expand Up @@ -940,13 +945,13 @@ func handleOnlineImportDeal(c echo.Context, node *core.DeltaNode) error {
}

dealRequest.ConnectionMode = "e2e"
err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)

if err != nil {
return err
}

err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment)
err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment, node)
if err != nil {
return err
}
Expand Down Expand Up @@ -995,7 +1000,7 @@ func handleOnlineImportDeal(c echo.Context, node *core.DeltaNode) error {

// assign a miner
if dealRequest.Miner == "" {
minerAssignService := core.NewMinerAssignmentService()
minerAssignService := core.NewMinerAssignmentService(*node)
provider, errOnPv := minerAssignService.GetSPWithGivenBytes(dealRequest.Size)
if errOnPv != nil {
return errOnPv
Expand Down Expand Up @@ -1172,13 +1177,13 @@ func handleImportDeal(c echo.Context, node *core.DeltaNode) error {
}

dealRequest.ConnectionMode = "import"
err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)

if err != nil {
return err
}

err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment)
err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment, node)
if err != nil {
return err
}
Expand Down Expand Up @@ -1227,7 +1232,7 @@ func handleImportDeal(c echo.Context, node *core.DeltaNode) error {

// assign a miner
if dealRequest.Miner == "" {
minerAssignService := core.NewMinerAssignmentService()
minerAssignService := core.NewMinerAssignmentService(*node)
provider, errOnPv := minerAssignService.GetSPWithGivenBytes(dealRequest.Size)
if errOnPv != nil {
return errOnPv
Expand Down Expand Up @@ -1414,13 +1419,13 @@ func handleMultipleOnlineImportDeals(c echo.Context, node *core.DeltaNode) error
return errors.New("Connection mode import is not supported on this online endpoint")
}
dealRequest.ConnectionMode = "e2e"
err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)
if err != nil {
tx.Rollback()
return err
}

err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment)
err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment, node)
if err != nil {
tx.Rollback()
return err
Expand Down Expand Up @@ -1469,7 +1474,7 @@ func handleMultipleOnlineImportDeals(c echo.Context, node *core.DeltaNode) error

// assign a miner
if dealRequest.Miner == "" {
minerAssignService := core.NewMinerAssignmentService()
minerAssignService := core.NewMinerAssignmentService(*node)
provider, errOnPv := minerAssignService.GetSPWithGivenBytes(dealRequest.Size)
if errOnPv != nil {
return errOnPv
Expand Down Expand Up @@ -1665,13 +1670,13 @@ func handleMultipleBatchImportDeals(c echo.Context, node *core.DeltaNode) error
return errors.New("Connection mode e2e is not supported on this import endpoint")
}
dealRequest.ConnectionMode = "import"
err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)
if err != nil {
fmt.Println("Error validating the meta", err)
return err
}

err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment)
err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment, node)
if err != nil {
fmt.Println("Error validating the piece commitment meta", err)
return err
Expand Down Expand Up @@ -1730,7 +1735,7 @@ func handleMultipleBatchImportDeals(c echo.Context, node *core.DeltaNode) error

// assign a miner
if dealRequest.Miner == "" {
minerAssignService := core.NewMinerAssignmentService()
minerAssignService := core.NewMinerAssignmentService(*node)
provider, errOnPv := minerAssignService.GetSPWithGivenBytes(dealRequest.Size)
if errOnPv != nil {
return errOnPv
Expand Down Expand Up @@ -1923,13 +1928,13 @@ func handleMultipleImportDeals(c echo.Context, node *core.DeltaNode) error {
return errors.New("Connection mode e2e is not supported on this import endpoint")
}
dealRequest.ConnectionMode = "import"
err = ValidateMeta(dealRequest)
err = ValidateMeta(dealRequest, node)
if err != nil {
tx.Rollback()
return err
}

err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment)
err = ValidatePieceCommitmentMeta(dealRequest.PieceCommitment, node)
if err != nil {
tx.Rollback()
return err
Expand Down Expand Up @@ -1978,7 +1983,7 @@ func handleMultipleImportDeals(c echo.Context, node *core.DeltaNode) error {

// assign a miner
if dealRequest.Miner == "" {
minerAssignService := core.NewMinerAssignmentService()
minerAssignService := core.NewMinerAssignmentService(*node)
provider, errOnPv := minerAssignService.GetSPWithGivenBytes(dealRequest.Size)
if errOnPv != nil {
return errOnPv
Expand Down Expand Up @@ -2191,7 +2196,7 @@ type ValidateMetaResult struct {
}

// ValidatePieceCommitmentMeta `ValidateMeta` validates the `DealRequest` struct and returns an error if the request is invalid
func ValidatePieceCommitmentMeta(pieceCommitmentRequest PieceCommitmentRequest) error {
func ValidatePieceCommitmentMeta(pieceCommitmentRequest PieceCommitmentRequest, node *core.DeltaNode) error {
if (PieceCommitmentRequest{} == pieceCommitmentRequest) {
return errors.New("invalid piece_commitment request. piece_commitment is required")
}
Expand All @@ -2200,7 +2205,7 @@ func ValidatePieceCommitmentMeta(pieceCommitmentRequest PieceCommitmentRequest)
}

// It validates the deal request and returns an error if the request is invalid
func ValidateMeta(dealRequest DealRequest) error {
func ValidateMeta(dealRequest DealRequest, node *core.DeltaNode) error {

if (DealRequest{} == dealRequest) {
return errors.New("invalid deal request")
Expand Down Expand Up @@ -2234,8 +2239,8 @@ func ValidateMeta(dealRequest DealRequest) error {
return errors.New("start_epoch_in_days is required when duration_in_days is set")
}

if (DealRequest{} != dealRequest && dealRequest.Replication > 6) {
return errors.New("replication factor can only be up to 6")
if (DealRequest{} != dealRequest && dealRequest.Replication > node.Config.Common.MaxReplicationFactor) {
return errors.New("replication factor can only be up to " + strconv.Itoa(node.Config.Common.MaxReplicationFactor))
}

if (DealRequest{} != dealRequest && dealRequest.StartEpochInDays > 0 && dealRequest.DurationInDays == 0) {
Expand Down Expand Up @@ -2320,7 +2325,7 @@ type ReplicatedContent struct {
DealResponse DealResponse
}

func ReplicateContent(contentSource DealReplication, dealRequest DealRequest, txn *gorm.DB) []ReplicatedContent {
func ReplicateContent(node *core.DeltaNode, contentSource DealReplication, dealRequest DealRequest, txn *gorm.DB) []ReplicatedContent {
var replicatedContents []ReplicatedContent
for i := 0; i < dealRequest.Replication; i++ {
var replicatedContent ReplicatedContent
Expand All @@ -2346,7 +2351,7 @@ func ReplicateContent(contentSource DealReplication, dealRequest DealRequest, tx
return nil
}
// assign a miner
minerAssignService := core.NewMinerAssignmentService()
minerAssignService := core.NewMinerAssignmentService(*node)
provider, errOnPv := minerAssignService.GetSPWithGivenBytes(newContent.Size)
if errOnPv != nil {
fmt.Println(errOnPv)
Expand Down
8 changes: 4 additions & 4 deletions api/open_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,28 @@ func handleOpenGetBalance(c echo.Context, node *core.DeltaNode) error {
"message": "invalid address",
})
}
bigIntBalance, err := node.LotusApi.WalletBalance(context.Background(), address)
bigIntBalance, err := node.LotusApiNode.WalletBalance(context.Background(), address)
if err != nil {
return c.JSON(500, map[string]interface{}{
"message": "failed to get balance",
})
}

act, err := node.LotusApi.StateGetActor(context.Background(), address, types.EmptyTSK)
act, err := node.LotusApiNode.StateGetActor(context.Background(), address, types.EmptyTSK)
if err != nil {
return c.JSON(500, map[string]interface{}{
"message": "failed to get actor",
})
}

market, err := node.LotusApi.StateMarketBalance(context.Background(), address, types.EmptyTSK)
market, err := node.LotusApiNode.StateMarketBalance(context.Background(), address, types.EmptyTSK)
if err != nil {
return c.JSON(500, map[string]interface{}{
"message": "failed to get market balance",
})
}

vcstatus, err := node.LotusApi.StateVerifiedClientStatus(context.Background(), address, types.EmptyTSK)
vcstatus, err := node.LotusApiNode.StateVerifiedClientStatus(context.Background(), address, types.EmptyTSK)
if err != nil {
return c.JSON(500, map[string]interface{}{
"message": "failed to get verified client status",
Expand Down
32 changes: 24 additions & 8 deletions api/open_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,30 +65,46 @@ func handleOpenGetDealsWithPaging(c echo.Context, node *core.DeltaNode) error {

// get page number
page, err := strconv.Atoi(c.QueryParam("page"))
if err != nil {
if err != nil || page < 1 {
page = 1
}

// total
var total int64

node.DB.Model(&model.ContentDeal{}).Count(&total)

// get page size
pageSize, err := strconv.Atoi(c.QueryParam("page_size"))
if err != nil {
if err != nil || pageSize < 1 {
pageSize = 10
}

// get deals
var deals []model.ContentDeal
node.DB.Raw("SELECT c1.* FROM content_deals c1 JOIN ( SELECT MAX(id) AS max_id, content FROM content_deals GROUP BY content ) c2 ON c1.id = c2.max_id ORDER BY c1.id DESC").Limit(pageSize).Offset((page - 1) * pageSize).Find(&deals)

// Subquery to find the latest deal for each content
subQuery := node.DB.Table("content_deals").
Select("MAX(id) AS max_id, content").
Group("content")

// Execute main query with LIMIT and OFFSET clauses for paging
err = node.DB.Table("content_deals c1").
Select("c1.*").
Joins("JOIN (?) c2 ON c1.id = c2.max_id", subQuery).
Order("c1.id DESC").
Limit(pageSize).
Offset((page - 1) * pageSize).
Find(&deals).Error

if err != nil {
return err
}

return c.JSON(200, map[string]interface{}{
"total": total,
"page": page,
"content_deals": deals,
"page": page,
"total": total,
"deals": deals,
})

}

// It gets the content deal, content, content deal proposal, and piece commitment from the database and returns them as
Expand Down
4 changes: 2 additions & 2 deletions api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func InitializeEchoRouterConfig(ln *core.DeltaNode, config config.DeltaConfig) {
e.Use(middleware.CORSWithConfig(middleware.CORSConfig{
AllowOrigins: []string{"*"},
}))

e.Pre(middleware.RemoveTrailingSlash())
e.HTTPErrorHandler = ErrorHandler
apiGroup := e.Group("/api/v1")
Expand Down Expand Up @@ -247,7 +247,7 @@ func Authenticate(config config.DeltaConfig) func(next echo.HandlerFunc) echo.Ha
}
// if everything is good. we can check the token against estuary-auth.
response, err := http.Post(
utils.API_AUTH,
config.ExternalApis.AuthSvcApi+"/check-api-key",
"application/json",
strings.NewReader(fmt.Sprintf(`{"token": "%s"}`, authParts[1])),
)
Expand Down
Loading