Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 39 additions & 9 deletions pdp/abi.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
package pdp

import (
"encoding/hex"
"fmt"
"math/big"
"strings"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
)



var (
addressType, _ = abi.NewType("address", "", nil)
uint256Type, _ = abi.NewType("uint256", "", nil)
stringArrayType, _ = abi.NewType("string[]", "", nil)
addressType, _ = abi.NewType("address", "", nil)
uint256Type, _ = abi.NewType("uint256", "", nil)
stringArrayType, _ = abi.NewType("string[]", "", nil)
stringArray2DType, _ = abi.NewType("string[][]", "", nil)
bytesType, _ = abi.NewType("bytes", "", nil)
bytesType, _ = abi.NewType("bytes", "", nil)
)


func EncodeDataSetCreateData(payer common.Address, clientDataSetID *big.Int, metadata []MetadataEntry, signature []byte) (string, error) {
keys := make([]string, len(metadata))
values := make([]string, len(metadata))
Expand All @@ -43,7 +42,6 @@ func EncodeDataSetCreateData(payer common.Address, clientDataSetID *big.Int, met
return "0x" + common.Bytes2Hex(encoded), nil
}


func EncodeAddPiecesExtraData(nonce *big.Int, metadata [][]MetadataEntry, signature []byte) (string, error) {
keys := make([][]string, len(metadata))
values := make([][]string, len(metadata))
Expand Down Expand Up @@ -71,7 +69,6 @@ func EncodeAddPiecesExtraData(nonce *big.Int, metadata [][]MetadataEntry, signat
return "0x" + common.Bytes2Hex(encoded), nil
}


func EncodeScheduleRemovalsExtraData(signature []byte) (string, error) {
args := abi.Arguments{
{Type: bytesType},
Expand All @@ -84,3 +81,36 @@ func EncodeScheduleRemovalsExtraData(signature []byte) (string, error) {

return "0x" + common.Bytes2Hex(encoded), nil
}

// EncodeCreateDataSetAndAddPiecesExtraData wraps the two extraData blobs
// (from EncodeDataSetCreateData and EncodeAddPiecesExtraData) into the
// combined abi.encode(bytes,bytes) form Curio's /pdp/piece/pull expects
// when atomically creating a new data set and adding pieces in one shot.
// Inputs are hex strings (with or without 0x prefix), as produced by the
// sibling encoders in this file.
func EncodeCreateDataSetAndAddPiecesExtraData(createDataSetExtraHex, addPiecesExtraHex string) (string, error) {
createBytes, err := decodeHex(createDataSetExtraHex)
if err != nil {
return "", fmt.Errorf("invalid createDataSet extra data: %w", err)
}
addPiecesBytes, err := decodeHex(addPiecesExtraHex)
if err != nil {
return "", fmt.Errorf("invalid addPieces extra data: %w", err)
}

args := abi.Arguments{
{Type: bytesType},
{Type: bytesType},
}

encoded, err := args.Pack(createBytes, addPiecesBytes)
if err != nil {
return "", fmt.Errorf("failed to encode create-and-add extra data: %w", err)
}

return "0x" + common.Bytes2Hex(encoded), nil
}

func decodeHex(s string) ([]byte, error) {
return hex.DecodeString(strings.TrimPrefix(s, "0x"))
}
134 changes: 134 additions & 0 deletions pdp/abi_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package pdp

import (
"encoding/hex"
"math/big"
"strings"
"testing"

"github.com/ethereum/go-ethereum/accounts/abi"
)

func TestEncodeCreateDataSetAndAddPiecesExtraData(t *testing.T) {
t.Run("round-trips through abi.Unpack", func(t *testing.T) {
create := []byte{0xde, 0xad, 0xbe, 0xef}
add := []byte{0xfe, 0xed, 0xfa, 0xce, 0xca, 0xfe}

out, err := EncodeCreateDataSetAndAddPiecesExtraData(
"0x"+hex.EncodeToString(create),
"0x"+hex.EncodeToString(add),
)
if err != nil {
t.Fatalf("encode failed: %v", err)
}
if !strings.HasPrefix(out, "0x") {
t.Fatalf("output missing 0x prefix: %s", out)
}

raw, err := hex.DecodeString(out[2:])
if err != nil {
t.Fatalf("decode output: %v", err)
}

args := abi.Arguments{
{Type: bytesType},
{Type: bytesType},
}
unpacked, err := args.Unpack(raw)
if err != nil {
t.Fatalf("unpack: %v", err)
}
if len(unpacked) != 2 {
t.Fatalf("expected 2 fields, got %d", len(unpacked))
}
gotCreate, ok := unpacked[0].([]byte)
if !ok {
t.Fatalf("first field not []byte: %T", unpacked[0])
}
gotAdd, ok := unpacked[1].([]byte)
if !ok {
t.Fatalf("second field not []byte: %T", unpacked[1])
}
if string(gotCreate) != string(create) {
t.Errorf("createDataSet round-trip mismatch: got %x, want %x", gotCreate, create)
}
if string(gotAdd) != string(add) {
t.Errorf("addPieces round-trip mismatch: got %x, want %x", gotAdd, add)
}
})

t.Run("accepts inputs without 0x prefix", func(t *testing.T) {
_, err := EncodeCreateDataSetAndAddPiecesExtraData("deadbeef", "feedface")
if err != nil {
t.Fatalf("expected no-prefix inputs to work, got %v", err)
}
})

t.Run("rejects non-hex input", func(t *testing.T) {
_, err := EncodeCreateDataSetAndAddPiecesExtraData("0xnothex!", "0xdeadbeef")
if err == nil {
t.Error("expected error on non-hex createDataSet input")
}
_, err = EncodeCreateDataSetAndAddPiecesExtraData("0xdeadbeef", "0xnothex!")
if err == nil {
t.Error("expected error on non-hex addPieces input")
}
})

t.Run("round-trips real CreateDataSet+AddPieces extras", func(t *testing.T) {
// produce a CreateDataSet extraData and an AddPieces extraData via
// the sibling encoders, then wrap. Verifies that the canonical caller
// path (sign -> encode each -> wrap combined) round-trips cleanly.
auth := testAuthHelper(t)
clientDataSetID := big.NewInt(1)
payee := auth.Address()

createSig, err := auth.SignCreateDataSet(clientDataSetID, payee, nil)
if err != nil {
t.Fatalf("sign create: %v", err)
}
createExtra, err := EncodeDataSetCreateData(payee, clientDataSetID, nil, createSig.Signature)
if err != nil {
t.Fatalf("encode create extra: %v", err)
}

nonce := big.NewInt(42)
addSig, err := auth.SignAddPieces(clientDataSetID, nonce, nil, nil)
if err != nil {
t.Fatalf("sign add: %v", err)
}
addExtra, err := EncodeAddPiecesExtraData(nonce, nil, addSig.Signature)
if err != nil {
t.Fatalf("encode add extra: %v", err)
}

combined, err := EncodeCreateDataSetAndAddPiecesExtraData(createExtra, addExtra)
if err != nil {
t.Fatalf("combine: %v", err)
}
if !strings.HasPrefix(combined, "0x") {
t.Fatalf("combined missing 0x prefix: %s", combined)
}

raw, err := hex.DecodeString(combined[2:])
if err != nil {
t.Fatalf("decode combined: %v", err)
}
args := abi.Arguments{
{Type: bytesType},
{Type: bytesType},
}
unpacked, err := args.Unpack(raw)
if err != nil {
t.Fatalf("unpack combined: %v", err)
}
gotCreateHex := "0x" + hex.EncodeToString(unpacked[0].([]byte))
gotAddHex := "0x" + hex.EncodeToString(unpacked[1].([]byte))
if gotCreateHex != createExtra {
t.Errorf("createDataSet extra mismatch")
}
if gotAddHex != addExtra {
t.Errorf("addPieces extra mismatch")
}
})
}
86 changes: 73 additions & 13 deletions pdp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type Server struct {
uploadClientVal *http.Client
}


func NewServer(baseURL string, authHelper *AuthHelper) *Server {
baseURL = strings.TrimSuffix(baseURL, "/")

Expand All @@ -51,12 +50,10 @@ func (s *Server) uploadClient() *http.Client {
return s.uploadClientVal
}


func (s *Server) BaseURL() string {
return s.baseURL
}


func (s *Server) CreateDataSet(ctx context.Context, recordKeeper string, extraData string) (*CreateDataSetResponse, error) {
reqBody := map[string]string{
"recordKeeper": recordKeeper,
Expand Down Expand Up @@ -104,7 +101,6 @@ func (s *Server) CreateDataSet(ctx context.Context, recordKeeper string, extraDa
}, nil
}


func (s *Server) GetDataSetCreationStatus(ctx context.Context, txHash string) (*DataSetCreationStatus, error) {
req, err := http.NewRequestWithContext(ctx, "GET", s.baseURL+"/pdp/data-sets/created/"+txHash, nil)
if err != nil {
Expand Down Expand Up @@ -135,7 +131,6 @@ func (s *Server) GetDataSetCreationStatus(ctx context.Context, txHash string) (*
return &status, nil
}


func (s *Server) WaitForDataSetCreation(ctx context.Context, txHash string, timeout time.Duration) (*DataSetCreationStatus, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
Expand All @@ -155,7 +150,6 @@ func (s *Server) WaitForDataSetCreation(ctx context.Context, txHash string, time
return status, nil
}


func (s *Server) AddPieces(ctx context.Context, dataSetID int, pieceCIDs []cid.Cid, extraData string) (*AddPiecesResponse, error) {
pieces := make([]PieceData, len(pieceCIDs))
for i, c := range pieceCIDs {
Expand Down Expand Up @@ -213,7 +207,6 @@ func (s *Server) AddPieces(ctx context.Context, dataSetID int, pieceCIDs []cid.C
}, nil
}


func (s *Server) GetPieceAdditionStatus(ctx context.Context, dataSetID int, txHash string) (*PieceAdditionStatus, error) {
url := fmt.Sprintf("%s/pdp/data-sets/%d/pieces/added/%s", s.baseURL, dataSetID, txHash)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
Expand Down Expand Up @@ -245,7 +238,6 @@ func (s *Server) GetPieceAdditionStatus(ctx context.Context, dataSetID int, txHa
return &status, nil
}


func (s *Server) WaitForPieceAddition(ctx context.Context, dataSetID int, txHash string, timeout time.Duration) (*PieceAdditionStatus, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
Expand All @@ -265,7 +257,6 @@ func (s *Server) WaitForPieceAddition(ctx context.Context, dataSetID int, txHash
return status, nil
}


func (s *Server) UploadPiece(ctx context.Context, data io.Reader, size int64, pieceCID cid.Cid) (*UploadPieceResponse, error) {
createReq, err := http.NewRequestWithContext(ctx, "POST", s.baseURL+"/pdp/piece/uploads", nil)
if err != nil {
Expand Down Expand Up @@ -345,7 +336,6 @@ func (s *Server) UploadPiece(ctx context.Context, data io.Reader, size int64, pi
}, nil
}


func (s *Server) FindPiece(ctx context.Context, pieceCID cid.Cid) error {
params := url.Values{}
params.Set("pieceCid", pieceCID.String())
Expand Down Expand Up @@ -374,7 +364,6 @@ func (s *Server) FindPiece(ctx context.Context, pieceCID cid.Cid) error {
return nil
}


func (s *Server) WaitForPiece(ctx context.Context, pieceCID cid.Cid, timeout time.Duration) error {
return retry.Poll(ctx, 5*time.Second, timeout, func() (bool, error) {
err := s.FindPiece(ctx, pieceCID)
Expand All @@ -388,7 +377,6 @@ func (s *Server) WaitForPiece(ctx context.Context, pieceCID cid.Cid, timeout tim
})
}


func (s *Server) DownloadPiece(ctx context.Context, pieceCID cid.Cid) ([]byte, error) {
reqURL := fmt.Sprintf("%s/pdp/piece/%s", s.baseURL, pieceCID.String())
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
Expand All @@ -414,7 +402,6 @@ func (s *Server) DownloadPiece(ctx context.Context, pieceCID cid.Cid) ([]byte, e
return io.ReadAll(resp.Body)
}


func (s *Server) GetDataSet(ctx context.Context, dataSetID int) (*DataSetData, error) {
reqURL := fmt.Sprintf("%s/pdp/data-sets/%d", s.baseURL, dataSetID)
req, err := http.NewRequestWithContext(ctx, "GET", reqURL, nil)
Expand Down Expand Up @@ -445,6 +432,79 @@ func (s *Server) GetDataSet(ctx context.Context, dataSetID int) (*DataSetData, e
return &data, nil
}

// PullPieces issues POST /pdp/piece/pull. The endpoint is idempotent on
// (service, sha256(extraData), dataSetId, recordKeeper); calling with the
// same arguments returns the current status of an existing pull rather
// than starting a new one. Authorization is the EIP-712-signed extraData,
// which Curio verifies via eth_call against PDPVerifier.addPieces() before
// any state change. Pass DataSetID == 0 to atomically create a new data
// set and add the pieces in one shot.
func (s *Server) PullPieces(ctx context.Context, opts PullPiecesOptions) (*PullPiecesResponse, error) {
reqBody := PullPiecesRequest{
ExtraData: opts.ExtraData,
RecordKeeper: opts.RecordKeeper,
Pieces: opts.Pieces,
}
if opts.DataSetID > 0 {
id := opts.DataSetID
reqBody.DataSetID = &id
}

body, err := json.Marshal(reqBody)
if err != nil {
return nil, fmt.Errorf("failed to marshal pull pieces request: %w", err)
}

req, err := http.NewRequestWithContext(ctx, "POST", s.baseURL+"/pdp/piece/pull", bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("failed to create pull pieces request: %w", err)
}
req.Header.Set("Content-Type", "application/json")

resp, err := s.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("pull pieces request failed: %w", err)
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated && resp.StatusCode != http.StatusAccepted {
respBody, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(respBody))
}

var pullResp PullPiecesResponse
if err := json.NewDecoder(resp.Body).Decode(&pullResp); err != nil {
return nil, fmt.Errorf("failed to decode pull pieces response: %w", err)
}

return &pullResp, nil
}

// WaitForPullPieces re-POSTs the same pull request (idempotent) until the
// aggregate status is complete or failed, or the timeout elapses.
func (s *Server) WaitForPullPieces(ctx context.Context, opts PullPiecesOptions, timeout time.Duration) (*PullPiecesResponse, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

var last *PullPiecesResponse
err := retry.Poll(ctx, 4*time.Second, timeout, func() (bool, error) {
resp, err := s.PullPieces(ctx, opts)
if err != nil {
return false, err
}
last = resp
switch resp.Status {
case PullStatusComplete, PullStatusFailed:
return true, nil
default:
return false, nil
}
})
if err != nil {
return last, err
}
return last, nil
}

func (s *Server) Ping(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", s.baseURL+"/pdp/ping", nil)
Expand Down
Loading
Loading