Skip to content

Commit d2cbc4e

Browse files
authored
refactor PDP to use SP-side pull (#678)
1 parent 713c670 commit d2cbc4e

17 files changed

Lines changed: 710 additions & 881 deletions

cmd/deal/send-manual-pdp.go

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ import (
1818

1919
var SendManualPDPCmd = &cli.Command{
2020
Name: "send-manual-pdp",
21-
Usage: "Send a manual PDP deal on-chain",
22-
Description: `Create/reuse a proof set and add a piece to it on-chain via PDPVerifier.
23-
Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1`,
21+
Usage: "Send a manual PDP deal via the FWSS-pull flow",
22+
Description: `Push a single piece to an SP via Curio's /pdp/piece/pull, then trigger the
23+
SP's on-chain commit (createDataSet+addPieces if no assembling set yet, or addPieces
24+
into the existing one). Useful for e2e/diagnostic testing of the FWSS pull path.
25+
Example: singularity deal send-manual-pdp --client f1xxx --provider t410fxxx --piece-cid bagaxxxx --piece-size 1048576 --eth-rpc http://localhost:5700/rpc/v1 --source-url-base https://static.example.org`,
2426
Flags: []cli.Flag{
2527
&cli.StringFlag{
2628
Name: "client",
@@ -34,12 +36,12 @@ var SendManualPDPCmd = &cli.Command{
3436
},
3537
&cli.StringFlag{
3638
Name: "piece-cid",
37-
Usage: "Piece CID (commp)",
39+
Usage: "Piece CID (commp v1)",
3840
Required: true,
3941
},
4042
&cli.Int64Flag{
4143
Name: "piece-size",
42-
Usage: "Piece size in bytes",
44+
Usage: "Padded piece size in bytes",
4345
Required: true,
4446
},
4547
&cli.StringFlag{
@@ -48,10 +50,21 @@ var SendManualPDPCmd = &cli.Command{
4850
EnvVars: []string{"ETH_RPC_URL"},
4951
Required: true,
5052
},
51-
&cli.Uint64Flag{
52-
Name: "confirmation-depth",
53-
Usage: "Blocks to wait for tx confirmation",
54-
Value: 5,
53+
&cli.StringFlag{
54+
Name: "source-url-base",
55+
Usage: "HTTPS base where Curio fetches the piece (sourceUrl = <base>/piece/<pieceCidV2>)",
56+
EnvVars: []string{"PDP_SOURCE_URL_BASE"},
57+
Required: true,
58+
},
59+
&cli.StringFlag{
60+
Name: "record-keeper",
61+
Usage: "FWSS contract address. Defaults to network FWSS from go-synapse.",
62+
EnvVars: []string{"PDP_RECORD_KEEPER"},
63+
},
64+
&cli.DurationFlag{
65+
Name: "pull-timeout",
66+
Usage: "How long to wait for Curio to finish each phase",
67+
Value: 5 * time.Minute,
5568
},
5669
},
5770
Action: func(c *cli.Context) error {
@@ -61,13 +74,17 @@ var SendManualPDPCmd = &cli.Command{
6174
}
6275
defer closer.Close()
6376

64-
pdp, err := dealpusher.NewOnChainPDP(c.Context, db, c.String("eth-rpc"))
77+
pdp, err := dealpusher.NewOnChainPDP(c.Context, dealpusher.OnChainPDPConfig{
78+
DB: db,
79+
RPCURL: c.String("eth-rpc"),
80+
SourceURLBase: c.String("source-url-base"),
81+
RecordKeeper: c.String("record-keeper"),
82+
})
6583
if err != nil {
6684
return errors.WithStack(err)
6785
}
6886
defer pdp.Close()
6987

70-
// load wallet
7188
var walletObj model.Wallet
7289
err = db.WithContext(c.Context).Where("address = ?", c.String("client")).First(&walletObj).Error
7390
if errors.Is(err, gorm.ErrRecordNotFound) {
@@ -86,56 +103,40 @@ var SendManualPDPCmd = &cli.Command{
86103
return errors.WithStack(err)
87104
}
88105

89-
provider := c.String("provider")
90-
91-
cfg := dealpusher.PDPSchedulingConfig{
92-
BatchSize: 1,
93-
MaxPiecesPerProofSet: 1024,
94-
ConfirmationDepth: c.Uint64("confirmation-depth"),
95-
PollingInterval: 5 * time.Second,
96-
}
97-
98-
// ensure proof set exists (or create one)
99-
fmt.Println("ensuring proof set...")
100-
proofSetID, err := pdp.EnsureProofSet(c.Context, evmSigner, provider, cfg)
101-
if err != nil {
102-
return errors.Wrap(err, "failed to ensure proof set")
103-
}
104-
fmt.Printf("proof set ID: %d\n", proofSetID)
105-
106-
// parse piece cid
107106
pieceCID, err := cid.Parse(c.String("piece-cid"))
108107
if err != nil {
109108
return errors.Wrap(err, "invalid piece CID")
110109
}
111-
112-
// add piece to proof set
113-
fmt.Println("submitting add-roots tx...")
114110
pieceSize := c.Int64("piece-size")
115-
queuedTx, err := pdp.QueueAddRoots(c.Context, evmSigner, proofSetID, []cid.Cid{pieceCID}, []int64{pieceSize}, cfg)
116-
if err != nil {
117-
return errors.Wrap(err, "failed to add roots")
111+
112+
cfg := dealpusher.PDPSchedulingConfig{
113+
BatchSize: 1,
114+
MaxPiecesPerProofSet: 1024,
115+
PullTimeout: c.Duration("pull-timeout"),
118116
}
119-
fmt.Printf("tx: %s\n", queuedTx.Hash)
120117

121-
// wait for confirmation
122-
fmt.Println("waiting for confirmation...")
123-
receipt, err := pdp.WaitForConfirmations(c.Context, queuedTx.Hash, cfg.ConfirmationDepth, cfg.PollingInterval)
118+
fmt.Println("pushing piece to SP via /pdp/piece/pull + on-chain commit...")
119+
result, err := pdp.PullPiecesToFWSS(
120+
c.Context,
121+
evmSigner,
122+
c.String("provider"),
123+
[]dealpusher.PDPPieceInput{{PieceCID: pieceCID, PieceSize: pieceSize}},
124+
cfg,
125+
)
124126
if err != nil {
125-
return errors.Wrap(err, "tx failed")
127+
return errors.Wrap(err, "FWSS pull push failed")
126128
}
127-
fmt.Printf("confirmed at block %d (gas: %d)\n", receipt.BlockNumber, receipt.GasUsed)
129+
fmt.Printf("data set ID: %d\n", result.DataSetID)
128130

129-
// save deal record
130-
proofSetIDCopy := proofSetID
131+
dataSetIDCopy := result.DataSetID
131132
dealModel := &model.Deal{
132133
State: model.DealProposed,
133134
DealType: model.DealTypePDP,
134-
Provider: provider,
135+
Provider: c.String("provider"),
135136
PieceCID: model.CID(pieceCID),
136137
PieceSize: pieceSize,
137138
WalletID: &walletObj.ID,
138-
ProofSetID: &proofSetIDCopy,
139+
ProofSetID: &dataSetIDCopy,
139140
}
140141
if err := db.WithContext(c.Context).Create(dealModel).Error; err != nil {
141142
return errors.Wrap(err, "failed to save deal")

cmd/run/dealpusher.go

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,28 @@ var DealPusherCmd = &cli.Command{
2929
},
3030
&cli.IntFlag{
3131
Name: "pdp-batch-size",
32-
Usage: "Number of roots to include in each PDP add-roots transaction",
32+
Usage: "Number of pieces to include in each /pdp/piece/pull request",
3333
Value: 128,
3434
},
3535
&cli.IntFlag{
3636
Name: "pdp-max-pieces-per-proofset",
37-
Usage: "Maximum pieces per proof set before handing off to the storage provider",
37+
Usage: "Maximum pieces per proof set before starting a new one",
3838
Value: 1024,
3939
},
40-
&cli.Uint64Flag{
41-
Name: "pdp-confirmation-depth",
42-
Usage: "Number of block confirmations required for PDP transactions",
43-
Value: 5,
44-
},
4540
&cli.DurationFlag{
46-
Name: "pdp-poll-interval",
47-
Usage: "Polling interval for PDP transaction confirmation checks",
48-
Value: 30 * time.Second,
41+
Name: "pdp-pull-timeout",
42+
Usage: "How long to wait for Curio to finish pulling a batch (per request)",
43+
Value: 5 * time.Minute,
44+
},
45+
&cli.StringFlag{
46+
Name: "pdp-source-url-base",
47+
Usage: "HTTPS base URL where Curio fetches pieces from; sourceUrl is built as <base>/piece/<pieceCid>",
48+
EnvVars: []string{"PDP_SOURCE_URL_BASE"},
49+
},
50+
&cli.StringFlag{
51+
Name: "pdp-record-keeper",
52+
Usage: "FWSS contract address (recordKeeper). Defaults to the network default from go-synapse.",
53+
EnvVars: []string{"PDP_RECORD_KEEPER"},
4954
},
5055
&cli.StringFlag{
5156
Name: "eth-rpc",
@@ -114,8 +119,7 @@ var DealPusherCmd = &cli.Command{
114119
pdpCfg := dealpusher.PDPSchedulingConfig{
115120
BatchSize: c.Int("pdp-batch-size"),
116121
MaxPiecesPerProofSet: c.Int("pdp-max-pieces-per-proofset"),
117-
ConfirmationDepth: c.Uint64("pdp-confirmation-depth"),
118-
PollingInterval: c.Duration("pdp-poll-interval"),
122+
PullTimeout: c.Duration("pdp-pull-timeout"),
119123
}
120124
if err := pdpCfg.Validate(); err != nil {
121125
return errors.WithStack(err)
@@ -125,16 +129,19 @@ var DealPusherCmd = &cli.Command{
125129
dealpusher.WithPDPSchedulingConfig(pdpCfg),
126130
}
127131
if rpcURL := c.String("eth-rpc"); rpcURL != "" {
128-
pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, db, rpcURL)
132+
adapterCfg := dealpusher.OnChainPDPConfig{
133+
DB: db,
134+
RPCURL: rpcURL,
135+
SourceURLBase: c.String("pdp-source-url-base"),
136+
RecordKeeper: c.String("pdp-record-keeper"),
137+
}
138+
pdpAdapter, err := dealpusher.NewOnChainPDP(c.Context, adapterCfg)
129139
if err != nil {
130140
return errors.Wrap(err, "failed to initialize PDP on-chain adapter")
131141
}
132142
defer pdpAdapter.Close()
133143

134-
opts = append(opts,
135-
dealpusher.WithPDPProofSetManager(pdpAdapter),
136-
dealpusher.WithPDPTransactionConfirmer(pdpAdapter),
137-
)
144+
opts = append(opts, dealpusher.WithPDPProofSetManager(pdpAdapter))
138145
}
139146

140147
if ddoContract := c.String("ddo-contract"); ddoContract != "" {

docs/en/cli-reference/deal/README.md

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/en/cli-reference/deal/send-manual-pdp.md

Lines changed: 15 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docs/en/cli-reference/run/deal-pusher.md

Lines changed: 5 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/bcicen/jstream v1.0.1
99
github.com/brianvoe/gofakeit/v6 v6.23.2
1010
github.com/cockroachdb/errors v1.11.3
11-
github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17
11+
github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1
1212
github.com/data-preservation-programs/table v0.0.3
1313
github.com/dustin/go-humanize v1.0.1
1414
github.com/ethereum/go-ethereum v1.14.12

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,8 @@ github.com/cronokirby/saferith v0.33.0 h1:TgoQlfsD4LIwx71+ChfRcIpjkw+RPOapDEVxa+
275275
github.com/cronokirby/saferith v0.33.0/go.mod h1:QKJhjoqUtBsXCAVEjw38mFqoi7DebT7kthcD7UzbnoA=
276276
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
277277
github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis=
278-
github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17 h1:pTOIdr5W7pHrxFaQdONxHWiCtIOgdc6zfKL82SS4xhI=
279-
github.com/data-preservation-programs/go-synapse v0.0.0-20260326143334-8c86c7fc3b17/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc=
278+
github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1 h1:SGQs5b7eyjycfxKDRHmzZW613BjjRkDT0XITafKK50A=
279+
github.com/data-preservation-programs/go-synapse v0.0.0-20260503115913-40ae25e74ba1/go.mod h1:qgzPsiGWjTPT/oACA6Uj1+WsASwsYFW/iJ8AWacJdjc=
280280
github.com/data-preservation-programs/table v0.0.3 h1:hboeauxPXybE8KlMA+RjDXz/J4xaG5CAFCcxyOm8yWo=
281281
github.com/data-preservation-programs/table v0.0.3/go.mod h1:sRGP/IuuqFc/y9QfmDyb5h6Q2wrnhhnBofEOj9aDRJg=
282282
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

model/replication.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -219,14 +219,22 @@ const (
219219
// This is a materialized view built from Shovel-indexed events, replacing
220220
// the per-cycle RPC scans of GetProofSets/GetProofSetsForClient.
221221
type PDPProofSet struct {
222-
SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"`
223-
ClientAddress string `gorm:"not null;index" json:"clientAddress"`
224-
Provider string `gorm:"not null" json:"provider"`
225-
IsLive bool `gorm:"default:false" json:"isLive"`
226-
ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"`
227-
CreatedBlock int64 `gorm:"not null" json:"createdBlock"`
228-
Deleted bool `gorm:"default:false" json:"deleted"`
229-
HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"`
230-
ProposedProviderEVM string ` json:"proposedProviderEVM,omitempty"`
231-
PieceCount int `gorm:"default:0" json:"pieceCount"`
222+
SetID uint64 `gorm:"primaryKey;autoIncrement:false" json:"setId"`
223+
ClientAddress string `gorm:"not null;index" json:"clientAddress"`
224+
Provider string `gorm:"not null" json:"provider"`
225+
IsLive bool `gorm:"default:false" json:"isLive"`
226+
ChallengeEpoch *int64 ` json:"challengeEpoch,omitempty"`
227+
CreatedBlock int64 `gorm:"not null" json:"createdBlock"`
228+
Deleted bool `gorm:"default:false" json:"deleted"`
229+
HandoffState ProofSetHandoffState `gorm:"default:'assembling'" json:"handoffState"`
230+
PieceCount int `gorm:"default:0" json:"pieceCount"`
231+
// ClientDataSetID is the per-(payer, set) nonce the client signs into
232+
// CreateDataSet/AddPieces extraData. FWSS rejects reused IDs via its
233+
// clientNonces[payer][id] check, so we persist before signing to make
234+
// retries idempotent. Stored decimal because uint256 doesn't fit any
235+
// native int.
236+
ClientDataSetID string ` json:"clientDataSetId,omitempty"`
237+
// ServiceURL caches the SP's PDP HTTP endpoint, fetched from
238+
// ServiceProviderRegistry the first time we push to this provider.
239+
ServiceURL string ` json:"serviceUrl,omitempty"`
232240
}

service/dealpusher/dealpusher.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,14 @@ var waitPendingInterval = time.Minute
4040

4141
// DealPusher represents a struct that encapsulates the data and functionality related to pushing deals in a replication process.
4242
type DealPusher struct {
43-
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
44-
keyStore keystore.KeyStore // Keystore for loading private keys
45-
lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries
46-
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
47-
pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
48-
pdpTxConfirmer PDPTransactionConfirmer // Optional PDP transaction confirmer.
49-
pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for root batching and tx confirmation.
50-
ddoDealManager DDODealManager // Optional DDO deal lifecycle manager.
51-
ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation.
43+
dbNoContext *gorm.DB // Pointer to a gorm.DB object representing a database connection.
44+
keyStore keystore.KeyStore // Keystore for loading private keys
45+
lotusClient jsonrpc.RPCClient // Lotus JSON-RPC client for chain queries
46+
dealMaker replication.DealMaker // Object responsible for making a deal in replication.
47+
pdpProofSetManager PDPProofSetManager // Optional PDP proof set lifecycle manager.
48+
pdpSchedulingConfig PDPSchedulingConfig // PDP scheduling config for batch sizing and pull timeouts.
49+
ddoDealManager DDODealManager // Optional DDO deal lifecycle manager.
50+
ddoSchedulingConfig DDOSchedulingConfig // DDO scheduling config for allocation batching and tx confirmation.
5251
// Resolver is injected so tests and future wiring can switch deal type behavior without coupling DealPusher to config storage.
5352
scheduleDealTypeResolver func(schedule *model.Schedule) model.DealType
5453
workerID uuid.UUID // UUID identifying the associated worker.

service/dealpusher/options.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,6 @@ func WithPDPProofSetManager(manager PDPProofSetManager) Option {
1111
}
1212
}
1313

14-
func WithPDPTransactionConfirmer(confirmer PDPTransactionConfirmer) Option {
15-
return func(d *DealPusher) {
16-
d.pdpTxConfirmer = confirmer
17-
}
18-
}
19-
2014
func WithPDPSchedulingConfig(cfg PDPSchedulingConfig) Option {
2115
return func(d *DealPusher) {
2216
d.pdpSchedulingConfig = cfg

0 commit comments

Comments
 (0)