From fa61997bd814c1acf65001cdef3838833be2258d Mon Sep 17 00:00:00 2001 From: LexLuthr <88259624+LexLuthr@users.noreply.github.com> Date: Wed, 6 Mar 2024 14:51:14 +0400 Subject: [PATCH] feat: LID clean up (#1881) * LID clean up * add test and fix lint errs * add minerID param for removing deal * Revert "add minerID param for removing deal" This reverts commit fb5997af42989a71a5a18990a765c55c6050f7cb. * add clean up interval config * fix test * account for FIP-45, change config type * fix test * fix TODOs * account sector mismatch * code cleanup * fix lint error --- api/api.go | 3 +- api/proxy_gen.go | 13 + build/openrpc/boost.json.gz | Bin 3064 -> 3112 bytes cmd/boostd/piecedir.go | 53 +++++ documentation/en/api-v1-methods.md | 18 ++ lib/pdcleaner/pdcleaner.go | 365 +++++++++++++++++++++++++++++ lib/pdcleaner/pdcleaner_test.go | 192 +++++++++++++++ node/builder.go | 2 + node/config/def.go | 1 + node/config/doc_gen.go | 8 + node/config/types.go | 4 + node/impl/boost.go | 8 + piecedirectory/piecedirectory.go | 6 + 13 files changed, 672 insertions(+), 1 deletion(-) create mode 100644 lib/pdcleaner/pdcleaner.go create mode 100644 lib/pdcleaner/pdcleaner_test.go diff --git a/api/api.go b/api/api.go index 5bdda5c09..327fb1866 100644 --- a/api/api.go +++ b/api/api.go @@ -46,7 +46,8 @@ type Boost interface { BlockstoreGetSize(ctx context.Context, c cid.Cid) (int, error) //perm:read // MethodGroup: PieceDirectory - PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Cid) error //perm:admin + PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Cid) error //perm:admin + PdRemoveDealForPiece(ctx context.Context, piececid cid.Cid, dealID string) error //perm:admin // MethodGroup: Misc OnlineBackup(context.Context, string) error //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 019692161..372c6ee3b 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -67,6 +67,8 @@ type BoostStruct struct { OnlineBackup func(p0 context.Context, p1 string) error `perm:"admin"` PdBuildIndexForPieceCid func(p0 context.Context, p1 cid.Cid) error `perm:"admin"` + + PdRemoveDealForPiece func(p0 context.Context, p1 cid.Cid, p2 string) error `perm:"admin"` } } @@ -393,6 +395,17 @@ func (s *BoostStub) PdBuildIndexForPieceCid(p0 context.Context, p1 cid.Cid) erro return ErrNotSupported } +func (s *BoostStruct) PdRemoveDealForPiece(p0 context.Context, p1 cid.Cid, p2 string) error { + if s.Internal.PdRemoveDealForPiece == nil { + return ErrNotSupported + } + return s.Internal.PdRemoveDealForPiece(p0, p1, p2) +} + +func (s *BoostStub) PdRemoveDealForPiece(p0 context.Context, p1 cid.Cid, p2 string) error { + return ErrNotSupported +} + func (s *ChainIOStruct) ChainHasObj(p0 context.Context, p1 cid.Cid) (bool, error) { if s.Internal.ChainHasObj == nil { return false, ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index c44c6cf828ae689293a3b1f9aaceabd3fe320d12..f354288ca1dddec99ca1f3482664c380628e63c7 100644 GIT binary patch delta 3106 zcmV+-4Bhki7pNG2ABzY8000000RQY=dsExG68|b1-G88CLLQX1ndu(^+LCih!wU!q*@I${B9sX01_wYvQqmJ&;501i1a>7Bk|h zS?{R}Hm9t1;BxyakX*nAkl4Zw1h$~Qj7fa^_U)2?WrL8}Xe&T3qQ%an2yMBIW~OfW zb!2lU;YQXDH;_%8+97qe(bkuX{z!s_KnO0j(d}3ERnh?pYF#ElFyhYc1-*TX z7uPV?U)lI8`}XY{ zwr~)79uXtl#^*y2fQSIZxI`n6h_Df|x#t0s0UB_Dyg$O$24%lex`;gj0M43Zv zB0;U;g|4KwdUeMq_7ymjRpiKrZA6%Z2>Pf-4M{Lme1jX}`7Xq)*;RZagHf~pgV;b? z-veiVxIb!<*1qhwf-CU+fB4P6emLTu|4-i=z-Z9g8_-rb+7Cn6YhLf+@r8xeVz~VU z2U1XWi7&<$7VvLK1vt1ZMF@&nz5umt2e${r4M5Ek5QMH&j85(-ma!afCyxuRDRUpB)Ts_RA3d`}Qqfb$^;?ihHqWA8|W>P%F>w^8=N`u#W_ITmo__yTTR@g^SyG zD5W2?cXuzNAvK$OyMqJ`c6`DA0bB0&m7eeRUEbgINFYJ%5})q+g5Qk(cL}U^$^UcG ztk>~)Y%PJ5Gx{UX#qzMCrZ0yU%4BI=Q55yqQePQRl3f;qib08|i4v6MZD#yz$px{0 z(Cce1*lmBS*~5 zmCz&JqCt-5h8#-ja-2;bF83|eN}1iLc|h3XMuKLu014FOBOrNpEk=QKS`h+-A$4k> zKX;F|(bkK=Q@jjnJ;AT3142zalVnMMypk@VM+HXRN|=z@@SWqqy=_PVUkL*^)u z?GdTi7h5!RP25n)^0^=^7^r(nIWj*YK3;URxmS7#0_9m};aFn!nu_;-LqeH4o}%$` zPsva(?7K7=MuN&h*R}8&*pf#Kd}HC?27>@(DsU&jF6A)&?QjV8mCu#;-EL%mU8Feg zgvf9I6lA2tB$OOm_|w5iLU0N;&})SwZ!G+t1X7Kna6Ogv4r4rIHk|Wr?^8E^s^Eq= zP6W-da)OXlK|x}6{mi3(+Z>781mY$Jex7R@oK0pVzmI1bQ*C@FnxQ;3qIl>K2|JQ{ zFrAgSLE}03*X~=3Q5Be@QPm2w(~WuH(CCLgWkihLF`GN;2ui5}*pHMWceq#qmNRcU zm-m(96XE1Kr(HTJw_G0Rs#2KZZ5e4vu)FFaS$niF^vK;s#_p7V+27RORNkV-mQ`SDm%Qd1cItsZMZb&DX6)<)@BGS(KPJX4S5K>O0-0p5}a@qW3G8@_o6N znK+r_#AayNF=rHX7?`uV8ZsHrLPsJsdS#?(-g1?hlbYC4x|OlkRqrw}vKj_VEq<9j zWc`%N@LC|)ssFNnqO~XwP7}K_OJae3en}ra>QKMcS`z9XjFg=RCpDgN>S~qfUqfkm zN^s3|fM(6jW(f3$GD*bemP9m(3zN7oi3^jsFo_G3xG;$elejR63zN7oi3^jsFo}yh zN?h#iElONa0rtw33X{o2>4`Len~sdHdmC-N$Od(%pw2^oIqR|kU!Jd+P}^XNB72rQ z&q@^WrAl1;If@2K#Hp)-&m zx}LhoRQpcB-?FfR>NzZ zO|0gzF0{Ura|b7Xo&pJCFv{v&RYjQ))iE|*qS)v@4HNtFs_a;@7l{AUr`%MhrO}h3TvaxsUzL@l%N&lJj-y=%@Z6H?RXsLXBq`YV7YX2sOIQfg2F1>U3OJ-C)&^A}*kRqzOZ|-4Sjr$b3`Is+xn5jQXgp;Wk9<^w`dR1ODSB!@>REi?w;^Lugw3SqaEe(e? z>n-T+SFH>`YKajW$LR;g2g0Qo^XQ2)K{7MDpOze^s-vh zkN235{XCd|kNrH*{KV_m)$$W@#(I?x@f8zW{yDxiQWMD<_mrWSp2&QN?=zsP|68C{a+4GpF?I~-rf!s%OQ4bVyTZ=kdC$Mrt136(T ztWjn*N2tvA@_3WWoJ=eWy>qKA{L&uHGApBS{*(rSpR#7_zoJJYQ5p|kKdOv7fAIb7 z+c`IXe)Ap9=YxP1q{Y`zSgVB1e{t759M?W{37?|DGHEX+>Y`C)YEn(1=p7Y`POHB*+BTx&IPXK|1QF4=OfoISm_CVw zVD+-o$%U*qKdy;x?==$vNUBV+g+GEM4v}Pk`sBg|5Q#BLo4t)Rdw|+&F~* zL{#SzUF0bYqDxBZL-aMQn=PuHHfKmKB`f*OJqw?b@6lI6Wr|=s*Mz#H@4|a7atEF* zj*aTAC}-z1!i9p8*7U%KL&6xi-AL}bj7mx*Uqc{HA@DhK){!Z7l-8ueC-$78dfKFa zF6yvzy6y*_vYJt~B&pykii=tGRdJ8AOm$YcR%yplwqmP9o!p#0SKUokoHREyff4O% zS#Z(~h+(y;HA`{h7E=an%77nw8F0>Dsdy(DC!~5k=%Au%tlkQ#qU_9C?NT#VDsP^3 zRZ3Ka&Rfz_qLuNr%86p+$5koCo$)+>DqLt;1CnuOG(em2Q=VPp!q&@0jSG{yas>x} zP&tfTjp`{F4V9%|y=Wo7iQ-GJkl#WIg4S!gq}j#h+AryksE(u#qE2AGwDB&!v~eo9 zely>us(gXQTz$r>?=nqy)A~tY;cA*vJk_QYt^I=jcfT zzzX|MWQnZXZ(?pk#hL57>oKvg@7;X^ifMRp(t1_Ui>2orgw%DSw|(DpG4bsyX+Qyy zrSA?EYC{3Rbrqv{pASt(%$q?E?9HI(h8M338szkx__I{Xvq@E`Dp=$xEJ2D)7_$&G w@`Tq%4DDvqki@(p{8TaEVTD8)29Zu$Z(if^_+tG300030|K>94xwMY}0PXk~`2YX_ delta 3055 zcmVKnESwlZ{s#@)S9(J zY+=d<9JlYWq|yc4#z&k75?!1hqt1C3Tlf}4Ksm#0+^jW!YE3-0um^I;odCB#-(yBR zHR~O9!RD0Jj$Cfv1dMYLE?iqMw3XlCk; zUq?1)67FQ}IR4*4egoOmsU1^i7wvq$>W?H?2!!Bb7u|nhUnCu%pw?v)1S9V3UeKF2 zh*38l_eBbSkfC7c1%q!1rCkfX72+e8z0mi$kl?71WA#rmxQO(QUsrqE{ldmy*w?RL zv4w-s^N1MXHohE!07L{J#w8koM1+ly%{>p84A6iJB!@IWF;i>U!afnii>+q0d)*OlnM7TG2biP-3L;1Am^w&+FCd6^Oi%#s zU@soir|deuCngbb905T94x$rkHPn@eM(6avLH!XT=(kv;EJV;TcfdwF--gs4BFY?U z6A5Y!FLWif)vJ3xv2VbctRhD~>>|P(M9>E{YDj{i;v3u%&vzka&A#Fr8H}3!@5Bbu z`W85U!^2UFv<_vz72JU5|IP3I@xu}K{J;C&07iq>!GN~H(P0?ELG$(ikFPAO7Q^k& zIFN#}YkW1fuz-I;D!{>QDMC=p@&%}EJGea{ZUAbYfFN|GVsvsxv5e(-J9%7iO_?*{ zKbc$qt9LZC+0R4x@0&Mx)%|InDelFheZuX3K&?Ex&-YXg!#)z=aS6z!>;_vn6fSP# zp_G2m-rv8DhSY5C?GF+(*z*Pd8*I7XS9-qRcX@x`BY_05Pkg%X3w}5H-!-t>HUH09 z(2a4!G`01?uq(& z6HsWy6eI7x-Ke*k7XA%{LU<=q)r9}?#&8=C>WyaW09!caTlU!*>4O_{ttW|)Z+&hL z71>6;Vd1yTnSIccT##!x7u1Gx5XpT4n;7_cu4!;KnUVZHo@Gq6@ttUf^3;gpp+h9> zN$SCLR^kSY=j30zZ!JbuV2(yrE6h$e=7B?_ANrILF?!2v?x-Uur3&CMQjXl=Vg*>v zyy;xtSB_7Flk1#z>7?9pd7!IGVT!k9q$R=bs*7ap(ZbLpcNZCdyHjR=Q+rc+iyB)} zl}6T_mD#P{v{hYo))MEHF)yY%!JRc<_ZpR-IwoaNV&0flyZWi`bf0>f^MQ)quUyLa zzHiqC7ZF?8+>O1^W3pee|S5{Z?a1sDCt4b{?G6c*d!#Rib|l zrR6EXHPZo_H8-0f&>zYq5r4EKqDfqs#Dz&*n8bxiT$sd#NnDu3g-KkP#Dz&*n8bxi zTx=MlTkoriMPWdpuEUooM!!4yUI zEL+b?6!E1>T>B}C21>@Y9oOYHi8y?M+x65Zh~a%RQBJJo9mkmoH_ODB#evZYxFDf3 zkRZCAy2w=fF2Lir@QDRLTyT!LKT2Qe-B4dS^r?HlT5K@Gu2PPC=JZuvi0O6yo2FL7 zYn@H3=CLl5ZwD!VUH}PVFv{v&RYjQ))iE|*qS)v@4-@;cwaCPVo;O<6!lrYV8rV&l z)^x!QTGq@N9?r0C-=u1?k|r_yu!-UAq-!4_8@pxq#iajC`p=~Qo>2O41F5=yJ$_h{ zOWUl=ZZc6O)c7Go4JCYqp+=WEa0en)osR3O8?5?K#08XpG-1eY98wfU6a|#@@kPbZ zqIj1N-6uh(aUVlBAM+&-GxbM_a5DA6lNQZaFUyPOit(_9N>OB7Ts*Xkc9N>FrQxt< zy#+l!(qg>IA+r8hTMw2^eU%BLO?{QAuQK&jroPG~SWJTD87EjweU+)NGWAu@FRL~E zc#rwm&!hQ&*v}))PyBRPEk6-wtXKIEUoo-epW|C2HIc0GKpBeZiOh%ip0hR^*nJ`w z^+;j&^{eu*d#0|TYD?!rI@)(DRt4CYKF=n9jIU-JBF+nQM!?r?U}%8I08v>W zY7{kG*6A8t6MNNq>q1&vi`%A4JOmD9&tsyt=d8&Fa-YaWJyOVRE$VEYz{(j7QyK_<&YG?NiXM$bX*_uSs50*S(YH5m z=G^#y&38DT4+2(<3mz%<9V}|~H}#>8M(efuE1r{uo;&U4O`_b@ED^fXD7mX@YlWGb@w^S%Pt^V3*+lY?iybGBVL`35<$+Q$>`Xmy9 z)yqyN7qa5~xF))N&`bm%sWQbD{s59VM3U)$lb%b0;aq4A@jgfVctk=%6|m6S-nhCp0E;B)4zBU9=qtx1JX>^VjCv`Jlm z)N$ux-48rvHKS@tQo&Ue7qjZC;+|%i>a1|B(vGEU#a4+rxjB8Vx|^&xX>Mo&Bih%p z;G`Q6!)j4$mg2@OrVQAW0YCOK;GDlw@lG;MNcDQqK}FSAy%kbL*_pN4rDm*D-aPB7 zl&A`wx1^;+E8}aG6UE4nt5S+P<9SqnxX`i&B;(9zfHvc&{BVs6TQ3(iE==mm6&(Cd ziZ8)JehVoGTCeGnW*3`lzob8+I+8kwI)VAp#%+9Q<5Y0{ zX1+~T`2vl(`ixcIWt#4$^)G#et7%H{T$@s~4h#C<{mK;d91;5_tXLM7omtps`z|Q~ zE9^g!C9-b6iMb6GXRf!`V`5?7yN3o8)9~V~^{Sv3%eiwDQrC&z_I=02#J8`c0R=>s xzB^Q?4Fv?(RgB_&J~kaOZw5WGH-mmOy!h!Q9*?ia{|^8F|NkUP%2XnY0035M1Z)5R diff --git a/cmd/boostd/piecedir.go b/cmd/boostd/piecedir.go index 08854b6aa..f508af37d 100644 --- a/cmd/boostd/piecedir.go +++ b/cmd/boostd/piecedir.go @@ -7,6 +7,7 @@ import ( bcli "github.com/filecoin-project/boost/cli" lcli "github.com/filecoin-project/lotus/cli" + "github.com/google/uuid" "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" ) @@ -17,6 +18,7 @@ var pieceDirCmd = &cli.Command{ Subcommands: []*cli.Command{ pdIndexGenerate, recoverCmd, + removeDealCmd, }, } @@ -56,3 +58,54 @@ var pdIndexGenerate = &cli.Command{ return nil }, } + +var removeDealCmd = &cli.Command{ + Name: "remove-deal", + Usage: "Removes a deal from piece metadata in LID. If the specified deal is the only one in piece metadata, index and metadata are also removed", + ArgsUsage: " ", + Action: func(cctx *cli.Context) error { + + ctx := lcli.ReqContext(cctx) + + if cctx.Args().Len() > 2 { + return fmt.Errorf("must specify piece CID and deal UUID/Proposal CID") + } + + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + // parse piececid + piececid, err := cid.Decode(cctx.Args().Get(0)) + if err != nil { + return fmt.Errorf("parsing piece CID: %w", err) + } + + id := cctx.Args().Get(1) + + // Parse to avoid sending garbage data to API + dealUuid, err := uuid.Parse(id) + if err != nil { + propCid, err := cid.Decode(id) + if err != nil { + return fmt.Errorf("could not parse '%s' as deal uuid or proposal cid", id) + } + err = napi.PdRemoveDealForPiece(ctx, piececid, propCid.String()) + if err != nil { + return err + } + fmt.Printf("Deal %s removed for piece %s\n", propCid, piececid) + return nil + } + + err = napi.PdRemoveDealForPiece(ctx, piececid, dealUuid.String()) + if err != nil { + return err + } + fmt.Printf("Deal %s removed for piece %s\n", dealUuid, piececid) + return nil + + }, +} diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 033e7d633..065025d50 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -55,6 +55,7 @@ * [OnlineBackup](#onlinebackup) * [Pd](#pd) * [PdBuildIndexForPieceCid](#pdbuildindexforpiececid) + * [PdRemoveDealForPiece](#pdremovedealforpiece) ## @@ -1244,3 +1245,20 @@ Inputs: Response: `{}` +### PdRemoveDealForPiece + + +Perms: admin + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + "string value" +] +``` + +Response: `{}` + diff --git a/lib/pdcleaner/pdcleaner.go b/lib/pdcleaner/pdcleaner.go new file mode 100644 index 000000000..4efca4d56 --- /dev/null +++ b/lib/pdcleaner/pdcleaner.go @@ -0,0 +1,365 @@ +package pdcleaner + +import ( + "fmt" + "strconv" + "sync" + "time" + + "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/lib/legacy" + "github.com/filecoin-project/boost/node/config" + "github.com/filecoin-project/boost/piecedirectory" + "github.com/filecoin-project/boost/storagemarket/types" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" + verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" + "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/blockstore" + "github.com/filecoin-project/lotus/chain/actors/adt" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/builtin/verifreg" + cbor "github.com/ipfs/go-ipld-cbor" + logging "github.com/ipfs/go-log/v2" + "go.uber.org/fx" + "golang.org/x/net/context" +) + +var log = logging.Logger("pdcleaner") + +type PieceDirectoryCleanup interface { + Start(ctx context.Context) + CleanOnce() error +} + +type pdcleaner struct { + ctx context.Context + miner address.Address + dealsDB *db.DealsDB + directDealsDB *db.DirectDealsDB + legacyDeals legacy.LegacyDealManager + pd *piecedirectory.PieceDirectory + full v1api.FullNode + startOnce sync.Once + lk sync.Mutex + cleanupInterval time.Duration +} + +func NewPieceDirectoryCleaner(cfg *config.Boost) func(lc fx.Lifecycle, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) PieceDirectoryCleanup { + return func(lc fx.Lifecycle, dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode) PieceDirectoryCleanup { + + // Don't start cleanup loop if duration is '0s' + if time.Duration(cfg.LocalIndexDirectory.LidCleanupInterval).Seconds() == 0 { + return nil + } + + pdc := newPDC(dealsDB, directDealsDB, legacyDeals, pd, full, time.Duration(cfg.LocalIndexDirectory.LidCleanupInterval)) + + cctx, cancel := context.WithCancel(context.Background()) + + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + mid, err := address.NewFromString(cfg.Wallets.Miner) + if err != nil { + return fmt.Errorf("failed to parse the miner ID %s: %w", cfg.Wallets.Miner, err) + } + pdc.miner = mid + go pdc.Start(cctx) + return nil + }, + OnStop: func(ctx context.Context) error { + cancel() + return nil + }, + }) + + return pdc + + } +} + +func newPDC(dealsDB *db.DealsDB, directDealsDB *db.DirectDealsDB, legacyDeals legacy.LegacyDealManager, pd *piecedirectory.PieceDirectory, full v1api.FullNode, cleanupInterval time.Duration) *pdcleaner { + return &pdcleaner{ + dealsDB: dealsDB, + directDealsDB: directDealsDB, + legacyDeals: legacyDeals, + pd: pd, + full: full, + cleanupInterval: cleanupInterval, + } +} + +func (p *pdcleaner) Start(ctx context.Context) { + p.startOnce.Do(func() { + p.ctx = ctx + go p.clean() + }) + +} + +func (p *pdcleaner) clean() { + // Create a ticker with an hour tick + ticker := time.NewTicker(p.cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + log.Infof("Starting LID clean up") + err := p.CleanOnce() + if err != nil { + log.Errorf("Failed to cleanup LID: %s", err) + continue + } + log.Debugf("Finished cleaning up LID") + case <-p.ctx.Done(): + return + } + } +} + +// CleanOnce generates a list of all Expired-Boost, Legacy and Direct deals. It then attempts to clean up these deals. +// It also generated a list of all pieces in LID and tries to find any pieceMetadata with no deals in Boost, Direct or Legacy DB. +// If such a deal is found, it is cleaned up as well +func (p *pdcleaner) CleanOnce() error { + p.lk.Lock() + defer p.lk.Unlock() + + head, err := p.full.ChainHead(p.ctx) + if err != nil { + return fmt.Errorf("getting chain head: %w", err) + } + tskey := head.Key() + deals, err := p.full.StateMarketDeals(p.ctx, tskey) + if err != nil { + return fmt.Errorf("getting market deals: %w", err) + } + + boostCompleteDeals, err := p.dealsDB.ListCompleted(p.ctx) + if err != nil { + return fmt.Errorf("getting complete boost deals: %w", err) + } + boostActiveDeals, err := p.dealsDB.ListActive(p.ctx) + if err != nil { + return fmt.Errorf("getting active boost deals: %w", err) + } + + boostDeals := make([]*types.ProviderDealState, 0, len(boostActiveDeals)+len(boostCompleteDeals)) + + boostDeals = append(boostDeals, boostCompleteDeals...) + boostDeals = append(boostDeals, boostActiveDeals...) + + legacyDeals, err := p.legacyDeals.ListDeals() + if err != nil { + return fmt.Errorf("getting legacy deals: %w", err) + } + completeDirectDeals, err := p.directDealsDB.ListCompleted(p.ctx) + if err != nil { + return fmt.Errorf("getting complete direct deals: %w", err) + } + + // Clean up completed/slashed Boost deals + for _, d := range boostDeals { + // Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up + if d.ChainDealID > abi.DealID(0) { + // If deal exists online + md, ok := deals[strconv.FormatInt(int64(d.ChainDealID), 10)] + if ok { + // If deal is slashed or end epoch has passed. No other reason for deal to reach termination + // Same is true for verified deals. We rely on EndEpoch/SlashEpoch for verified deals created by f05 + toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch) + if toCheck < head.Height() { + err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.DealUuid.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up boost deal %s for piece %s: %s", d.DealUuid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) + } + } + } + } + } + + // Clean up completed/slashed legacy deals + for _, d := range legacyDeals { + // Confirm deal did not reach termination before Publishing. Otherwise, no need to clean up + if d.DealID > abi.DealID(0) { + // If deal exists online + md, ok := deals[strconv.FormatInt(int64(d.DealID), 10)] + if ok { + // If deal is slashed or end epoch has passed. No other reason for deal to reach termination + toCheck := termOrSlash(md.Proposal.EndEpoch, md.State.SlashEpoch) + if toCheck < head.Height() { + err = p.pd.RemoveDealForPiece(p.ctx, d.ClientDealProposal.Proposal.PieceCID, d.ProposalCid.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up legacy deal %s for piece %s: %s", d.ProposalCid.String(), d.ClientDealProposal.Proposal.PieceCID.String(), err.Error()) + } + } + } + } + } + + // Clean up direct deals if there are any otherwise skip this step + if len(completeDirectDeals) > 0 { + claims, err := p.full.StateGetClaims(p.ctx, p.miner, tskey) + if err != nil { + return fmt.Errorf("getting claims for the miner %s: %w", p.miner, err) + } + + // Loading miner actor locally is preferred to avoid getting unnecessary data from full.StateMinerActiveSectors() + mActor, err := p.full.StateGetActor(p.ctx, p.miner, tskey) + if err != nil { + return fmt.Errorf("getting actor for the miner %s: %w", p.miner, err) + } + store := adt.WrapStore(p.ctx, cbor.NewCborStore(blockstore.NewAPIBlockstore(p.full))) + mas, err := miner.Load(store, mActor) + if err != nil { + return fmt.Errorf("loading miner actor state %s: %w", p.miner, err) + } + activeSectors, err := miner.AllPartSectors(mas, miner.Partition.ActiveSectors) + if err != nil { + return fmt.Errorf("getting active sector sets for miner %s: %w", p.miner, err) + } + unProvenSectors, err := miner.AllPartSectors(mas, miner.Partition.UnprovenSectors) + if err != nil { + return fmt.Errorf("getting unproven sector sets for miner %s: %w", p.miner, err) + } + finalSectors, err := bitfield.MergeBitFields(activeSectors, unProvenSectors) + if err != nil { + return fmt.Errorf("merging bitfields to generate all deal sectors on miner %s: %w", p.miner, err) + } + + // Load verifreg actor locally + verifregActor, err := p.full.StateGetActor(p.ctx, verifreg.Address, tskey) + if err != nil { + return fmt.Errorf("getting verified registry actor state: %w", err) + } + verifregState, err := verifreg.Load(store, verifregActor) + if err != nil { + return fmt.Errorf("loading verified registry actor state: %w", err) + } + + for _, d := range completeDirectDeals { + cID := verifregtypes.ClaimId(d.AllocationID) + c, ok := claims[cID] + // If claim found + if ok { + // Claim Sector number and Deal Sector number should match(regardless of how DDO works) + // If they don't match and older sector is removed, then we can't use the metadata + // This check can be removed once Curio has resealing enabled, and it can provide + // new replacement sector details to Boost before deal reached "Complete" state. + if c.Sector != d.SectorID { + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + continue + } + present, err := finalSectors.IsSet(uint64(c.Sector)) + if err != nil { + return fmt.Errorf("checking if bitfield is set: %w", err) + } + // Each claim is created with ProveCommit message. So, a sector in claim cannot be unproven. + // it must be either Active(Proving, Faulty, Recovering) or terminated. If bitfield is not set + // then sector must have been terminated. This method will also account for future change in sector numbers + // of a claim. Even if the sector is changed then it must be Active as this change will require a + // ProveCommit message. + if !present { + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + } + continue + } + + // If no claim found + alloc, ok, err := verifregState.GetAllocation(d.Client, d.AllocationID) + if err != nil { + return fmt.Errorf("getting allocation %d for client %s: %w", d.AllocationID, d.Client, err) + } + if !ok || alloc.Expiration < head.Height() { + // If allocation is expired, clean up the deal. If the allocation does not exist anymore. + // Either it was claimed and then claim was cleaned up after TermMax + // or allocation expired before it could be claimed and was cleaned up + // Deal should be cleaned up in either case + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + continue + } + + if alloc.Expiration < head.Height() { + err = p.pd.RemoveDealForPiece(p.ctx, d.PieceCID, d.ID.String()) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up direct deal %s for piece %s: %s", d.ID.String(), d.PieceCID, err.Error()) + } + continue + } + } + } + + // Clean up dangling LID deals with no Boost, Direct or Legacy deals attached to them + plist, err := p.pd.ListPieces(p.ctx) + if err != nil { + return fmt.Errorf("getting piece list from LID: %w", err) + } + + for _, piece := range plist { + pdeals, err := p.pd.GetPieceDeals(p.ctx, piece) + if err != nil { + return fmt.Errorf("getting piece deals from LID: %w", err) + } + for _, deal := range pdeals { + // Remove only if the miner ID matches to avoid removing for other miners in case of shared LID + if deal.MinerAddr == p.miner { + + bd, err := p.dealsDB.ByPieceCID(p.ctx, piece) + if err != nil { + return err + } + if len(bd) > 0 { + continue + } + + ld, err := p.legacyDeals.ByPieceCid(p.ctx, piece) + if err != nil { + return err + } + if len(ld) > 0 { + continue + } + + dd, err := p.directDealsDB.ByPieceCID(p.ctx, piece) + if err != nil { + return err + } + if len(dd) > 0 { + continue + } + + err = p.pd.RemoveDealForPiece(p.ctx, piece, deal.DealUuid) + if err != nil { + // Don't return if cleaning up a deal results in error. Try them all. + log.Errorf("cleaning up dangling deal %s for piece %s: %s", deal.DealUuid, piece, err.Error()) + } + } + } + } + + return nil +} + +func termOrSlash(term, slash abi.ChainEpoch) abi.ChainEpoch { + if term > slash && slash > 0 { + return slash + } + + return term +} diff --git a/lib/pdcleaner/pdcleaner_test.go b/lib/pdcleaner/pdcleaner_test.go new file mode 100644 index 000000000..bad5d52df --- /dev/null +++ b/lib/pdcleaner/pdcleaner_test.go @@ -0,0 +1,192 @@ +package pdcleaner + +import ( + "context" + "fmt" + "os" + "strconv" + "testing" + + bdb "github.com/filecoin-project/boost/db" + "github.com/filecoin-project/boost/db/migrations" + "github.com/filecoin-project/boost/extern/boostd-data/client" + "github.com/filecoin-project/boost/extern/boostd-data/model" + "github.com/filecoin-project/boost/extern/boostd-data/svc" + mocks_legacy "github.com/filecoin-project/boost/lib/legacy/mocks" + "github.com/filecoin-project/boost/piecedirectory" + "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + lotusmocks "github.com/filecoin-project/lotus/api/mocks" + test "github.com/filecoin-project/lotus/chain/events/state/mock" + chaintypes "github.com/filecoin-project/lotus/chain/types" + "github.com/golang/mock/gomock" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + "github.com/ipld/go-car/v2" + "github.com/stretchr/testify/require" +) + +func TestPieceDirectoryCleaner(t *testing.T) { + ctx := context.Background() + + sqldb := bdb.CreateTestTmpDB(t) + require.NoError(t, bdb.CreateAllBoostTables(ctx, sqldb, sqldb)) + require.NoError(t, migrations.Migrate(sqldb)) + + dealsDB := bdb.NewDealsDB(sqldb) + directDB := bdb.NewDirectDealsDB(sqldb) + + bdsvc, err := svc.NewLevelDB("") + require.NoError(t, err) + ln, err := bdsvc.Start(ctx, "localhost:0") + require.NoError(t, err) + + cl := client.NewStore() + err = cl.Dial(ctx, fmt.Sprintf("ws://%s", ln)) + require.NoError(t, err) + defer cl.Close(ctx) + + pieceCount := 5 + readers := make(map[abi.SectorNumber]car.SectionReader) + for i := 0; i < pieceCount; i++ { + // Create a random CAR file + _, carFilePath := piecedirectory.CreateCarFile(t, i+1) + carFile, err := os.Open(carFilePath) + require.NoError(t, err) + defer carFile.Close() + + carReader, err := car.OpenReader(carFilePath) + require.NoError(t, err) + defer carReader.Close() + carv1Reader, err := carReader.DataReader() + require.NoError(t, err) + + readers[abi.SectorNumber(i+1)] = carv1Reader + } + + // Any calls to get a reader over data should return a reader over the random CAR file + pr := piecedirectory.CreateMockPieceReaders(t, readers) + + pm := piecedirectory.NewPieceDirectory(cl, pr, 1) + pm.Start(ctx) + + type dealData struct { + sector abi.SectorNumber + chainDealID abi.DealID + piece cid.Cid + used bool + } + + deals, err := bdb.GenerateDeals() + require.NoError(t, err) + + // Create and update a map to keep track of chainDealID and UUID bindings + dealMap := make(map[uuid.UUID]*dealData) + for _, deal := range deals { + dealMap[deal.DealUuid] = &dealData{chainDealID: deal.ChainDealID, used: false} + } + + // Add deals to LID and note down details to update SQL DB + for sectorNumber, reader := range readers { + pieceCid := piecedirectory.CalculateCommp(t, reader).PieceCID + + var uid uuid.UUID + var cdid abi.DealID + + for id, data := range dealMap { + // If this value from deals list has not be used + if !data.used { + uid = id // Use the UUID from deals list + cdid = data.chainDealID + data.used = true + data.sector = sectorNumber // Use the sector number from deals list + data.piece = pieceCid + break + } + } + + // Add deal info for each piece + di := model.DealInfo{ + DealUuid: uid.String(), + ChainDealID: cdid, + SectorID: sectorNumber, + PieceOffset: 0, + PieceLength: 0, + } + err := pm.AddDealForPiece(ctx, pieceCid, di) + require.NoError(t, err) + } + + // Setup Full node, legacy manager + ctrl := gomock.NewController(t) + fn := lotusmocks.NewMockFullNode(ctrl) + legacyProv := mocks_legacy.NewMockLegacyDealManager(ctrl) + provAddr, err := address.NewIDAddress(1523) + require.NoError(t, err) + + // Start a new PieceDirectoryCleaner + pdc := newPDC(dealsDB, directDB, legacyProv, pm, fn, 1) + pdc.ctx = ctx + + chainHead, err := test.MockTipset(provAddr, 1) + require.NoError(t, err) + chainHeadFn := func(ctx context.Context) (*chaintypes.TipSet, error) { + return chainHead, nil + } + + // Add deals to SQL DB + cDealMap := make(map[string]*api.MarketDeal) + for i, deal := range deals { + data, ok := dealMap[deal.DealUuid] + require.True(t, ok) + deal.SectorID = data.sector + deal.ClientDealProposal.Proposal.PieceCID = data.piece + deal.ClientDealProposal.Proposal.EndEpoch = 3 // because chain head is always 5 + deal.Checkpoint = dealcheckpoints.Complete + p, err := deal.SignedProposalCid() + require.NoError(t, err) + t.Logf("signed p %s", p.String()) + // Test a slashed deal + if i == 0 { + deal.Checkpoint = dealcheckpoints.Accepted + deal.ClientDealProposal.Proposal.EndEpoch = 6 + cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{ + Proposal: deal.ClientDealProposal.Proposal, + State: api.MarketDealState{ + SlashEpoch: 3, // Slash this deal + }, + } + err = dealsDB.Insert(ctx, &deal) + require.NoError(t, err) + continue + } + cDealMap[strconv.FormatInt(int64(deal.ChainDealID), 10)] = &api.MarketDeal{ + Proposal: deal.ClientDealProposal.Proposal, + State: api.MarketDealState{ + SlashEpoch: -1, + }, + } + err = dealsDB.Insert(ctx, &deal) + require.NoError(t, err) + } + + // Confirm we have 5 pieces in LID + pl, err := pm.ListPieces(ctx) + require.NoError(t, err) + require.Len(t, pl, 5) + + fn.EXPECT().ChainHead(gomock.Any()).DoAndReturn(chainHeadFn).AnyTimes() + fn.EXPECT().StateMarketDeals(gomock.Any(), gomock.Any()).Return(cDealMap, nil).AnyTimes() + legacyProv.EXPECT().ListDeals().Return(nil, nil).AnyTimes() + legacyProv.EXPECT().ByPieceCid(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes() + + err = pdc.CleanOnce() + require.NoError(t, err) + + // Confirm we have 0 pieces in LID after clean up + pl, err = pm.ListPieces(ctx) + require.NoError(t, err) + require.Len(t, pl, 0) +} diff --git a/node/builder.go b/node/builder.go index 4ae61cde6..152511d1d 100644 --- a/node/builder.go +++ b/node/builder.go @@ -18,6 +18,7 @@ import ( "github.com/filecoin-project/boost/indexprovider" "github.com/filecoin-project/boost/lib/legacy" "github.com/filecoin-project/boost/lib/mpoolmonitor" + "github.com/filecoin-project/boost/lib/pdcleaner" "github.com/filecoin-project/boost/markets/idxprov" "github.com/filecoin-project/boost/markets/storageadapter" "github.com/filecoin-project/boost/node/config" @@ -555,6 +556,7 @@ func ConfigBoost(cfg *config.Boost) Option { Override(new(sealer.StorageAuth), lotus_modules.StorageAuthWithURL(cfg.SectorIndexApiInfo)), Override(new(*backupmgr.BackupMgr), modules.NewOnlineBackupMgr(cfg)), + Override(new(pdcleaner.PieceDirectoryCleanup), pdcleaner.NewPieceDirectoryCleaner(cfg)), // Dynamic Boost configs Override(new(dtypes.ConsiderOnlineStorageDealsConfigFunc), modules.NewConsiderOnlineStorageDealsConfigFunc), diff --git a/node/config/def.go b/node/config/def.go index 817e7411f..7348db15c 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -87,6 +87,7 @@ func DefaultBoost() *Boost { ServiceApiInfo: "", ServiceRPCTimeout: Duration(15 * time.Minute), EnablePieceDoctor: true, + LidCleanupInterval: Duration(6 * time.Hour), }, ContractDeals: ContractDealsConfig{ diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index b846e018a..73861abee 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -662,6 +662,14 @@ Set this value to "" if the local index directory data service is embedded.`, Comment: `PieceDoctor runs a continuous background process to check each piece in LID for retrievability`, }, + { + Name: "LidCleanupInterval", + Type: "Duration", + + Comment: `Interval at which LID clean up job should rerun. The cleanup entails removing indices and metadata +for the expired/slashed deals. Disabled if set to '0s'. Please DO NOT set a value lower than 6 hours +as this task consumes considerable resources and time`, + }, }, "LocalIndexDirectoryLeveldbConfig": []DocField{ { diff --git a/node/config/types.go b/node/config/types.go index d6717ce39..872f4f90f 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -289,6 +289,10 @@ type LocalIndexDirectoryConfig struct { ServiceRPCTimeout Duration // PieceDoctor runs a continuous background process to check each piece in LID for retrievability EnablePieceDoctor bool + // Interval at which LID clean up job should rerun. The cleanup entails removing indices and metadata + // for the expired/slashed deals. Disabled if set to '0s'. Please DO NOT set a value lower than 6 hours + // as this task consumes considerable resources and time + LidCleanupInterval Duration } type LocalIndexDirectoryLeveldbConfig struct { diff --git a/node/impl/boost.go b/node/impl/boost.go index 0e23c3215..305b833de 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -213,3 +213,11 @@ func (sm *BoostAPI) PdBuildIndexForPieceCid(ctx context.Context, piececid cid.Ci func (sm *BoostAPI) OnlineBackup(ctx context.Context, dstDir string) error { return sm.Bkp.Backup(ctx, dstDir) } + +func (sm *BoostAPI) PdRemoveDealForPiece(ctx context.Context, piececid cid.Cid, dealID string) error { + ctx, span := tracing.Tracer.Start(ctx, "Boost.PdRemoveDealForPiece") + span.SetAttributes(attribute.String("piececid", piececid.String())) + defer span.End() + + return sm.Pd.RemoveDealForPiece(ctx, piececid, dealID) +} diff --git a/piecedirectory/piecedirectory.go b/piecedirectory/piecedirectory.go index ed7e758f5..947f252a4 100644 --- a/piecedirectory/piecedirectory.go +++ b/piecedirectory/piecedirectory.go @@ -156,6 +156,12 @@ func (ps *PieceDirectory) PiecesCount(ctx context.Context, maddr address.Address return ps.store.PiecesCount(ctx, maddr) } +func (ps *PieceDirectory) ListPieces(ctx context.Context) ([]cid.Cid, error) { + defer func(start time.Time) { log.Debugw("piece directory ; PiecesList span", "took", time.Since(start)) }(time.Now()) + + return ps.store.ListPieces(ctx) +} + func (ps *PieceDirectory) ScanProgress(ctx context.Context, maddr address.Address) (*bdtypes.ScanProgress, error) { defer func(start time.Time) { log.Debugw("piece directory ; ScanProgress span", "took", time.Since(start)) }(time.Now())