Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ type Block struct {

// EncodeRequest represents the request to encode a file.
type EncodeRequest struct {
TaskID string
Data []byte
TaskID string
Path string
DataSize int
}

// RaptorQ contains methods for request services from RaptorQ service.
Expand Down
49 changes: 18 additions & 31 deletions pkg/codec/raptorq.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,61 +24,48 @@ func NewRaptorQCodec(dir string) Codec {

func (rq *raptorQ) Encode(ctx context.Context, req EncodeRequest) (EncodeResponse, error) {
/* ---------- 1. initialise RaptorQ processor ---------- */
fields := logtrace.Fields{
logtrace.FieldMethod: "Encode",
logtrace.FieldModule: "rq",
logtrace.FieldTaskID: req.TaskID,
"path": req.Path,
"data-size": req.DataSize,
}

processor, err := raptorq.NewDefaultRaptorQProcessor()
if err != nil {
return EncodeResponse{}, fmt.Errorf("create RaptorQ processor: %w", err)
}
defer processor.Free()
logtrace.Info(ctx, "RaptorQ processor created", fields)

logtrace.Info(ctx, "RaptorQ processor created", logtrace.Fields{
"data-size": len(req.Data)})
/* ---------- 2. persist req.Data to a temp file ---------- */

tmp, err := os.CreateTemp("", "rq-encode-*")
if err != nil {
return EncodeResponse{}, fmt.Errorf("create temp file: %w", err)
}
tmpPath := tmp.Name()
if _, err := tmp.Write(req.Data); err != nil {
tmp.Close()
os.Remove(tmpPath)
return EncodeResponse{}, fmt.Errorf("write temp file: %w", err)
}
if err := tmp.Close(); err != nil { // sync to disk
os.Remove(tmpPath)
return EncodeResponse{}, fmt.Errorf("close temp file: %w", err)
}

/* ---------- 3. run the encoder ---------- */

blockSize := processor.GetRecommendedBlockSize(uint64(len(req.Data)))
/* ---------- 1. run the encoder ---------- */
blockSize := processor.GetRecommendedBlockSize(uint64(req.DataSize))

symbolsDir := filepath.Join(rq.symbolsBaseDir, req.TaskID)
if err := os.MkdirAll(symbolsDir, 0o755); err != nil {
os.Remove(tmpPath)
fields[logtrace.FieldError] = err.Error()
os.Remove(req.Path)
return EncodeResponse{}, fmt.Errorf("mkdir %s: %w", symbolsDir, err)
}
logtrace.Info(ctx, "RaptorQ processor encoding", fields)

logtrace.Info(ctx, "RaptorQ processor encoding", logtrace.Fields{
"symbols-dir": symbolsDir,
"temp-file": tmpPath})

resp, err := processor.EncodeFile(tmpPath, symbolsDir, blockSize)
resp, err := processor.EncodeFile(req.Path, symbolsDir, blockSize)
if err != nil {
os.Remove(tmpPath)
fields[logtrace.FieldError] = err.Error()
os.Remove(req.Path)
return EncodeResponse{}, fmt.Errorf("raptorq encode: %w", err)
}

/* we no longer need the temp file */
// _ = os.Remove(tmpPath)

/* ---------- 4. read the layout JSON ---------- */
/* ---------- 2. read the layout JSON ---------- */
layoutData, err := os.ReadFile(resp.LayoutFilePath)

logtrace.Info(ctx, "RaptorQ processor layout file", logtrace.Fields{
"layout-file": resp.LayoutFilePath})
if err != nil {
fields[logtrace.FieldError] = err.Error()
return EncodeResponse{}, fmt.Errorf("read layout %s: %w", resp.LayoutFilePath, err)
}

Expand Down
1 change: 1 addition & 0 deletions pkg/logtrace/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ const (
FieldTxHash = "tx_hash"
FieldTaskID = "task_id"
FieldActionID = "action_id"
FieldHashHex = "hash_hex"
)
98 changes: 88 additions & 10 deletions supernode/node/action/server/cascade/cascade_action_server.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package cascade

import (
"encoding/hex"
"fmt"
"github.com/LumeraProtocol/supernode/pkg/errors"
"google.golang.org/grpc"
"io"
"lukechampine.com/blake3"
"os"
"path/filepath"

pb "github.com/LumeraProtocol/supernode/gen/supernode/action/cascade"
"github.com/LumeraProtocol/supernode/pkg/logtrace"
cascadeService "github.com/LumeraProtocol/supernode/supernode/services/cascade"

"google.golang.org/grpc"
)

type ActionServer struct {
Expand All @@ -34,9 +38,24 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
ctx := stream.Context()
logtrace.Info(ctx, "client streaming request to upload cascade input data received", fields)

// Collect data chunks
var allData []byte
var metadata *pb.Metadata
var (
metadata *pb.Metadata
totalSize int
)

hasher, tempFile, tempFilePath, err := initializeHasherAndTempFile()
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "failed to initialize hasher and temp file", fields)
return fmt.Errorf("initializing hasher and temp file: %w", err)
}
defer func(tempFile *os.File) {
err := tempFile.Close()
if err != nil && !errors.Is(err, os.ErrClosed) {
fields[logtrace.FieldError] = err.Error()
logtrace.Warn(ctx, "error closing temp file", fields)
}
}(tempFile)

// Process incoming stream
for {
Expand All @@ -55,11 +74,27 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
switch x := req.RequestType.(type) {
case *pb.RegisterRequest_Chunk:
if x.Chunk != nil {
// Add data chunk to our collection
allData = append(allData, x.Chunk.Data...)

// hash the chunks
_, err := hasher.Write(x.Chunk.Data)
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "failed to write chunk to hasher", fields)
return fmt.Errorf("hashing error: %w", err)
}

// write chunks to the file
_, err = tempFile.Write(x.Chunk.Data)
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "failed to write chunk to file", fields)
return fmt.Errorf("file write error: %w", err)
}
totalSize += len(x.Chunk.Data)

logtrace.Info(ctx, "received data chunk", logtrace.Fields{
"chunk_size": len(x.Chunk.Data),
"total_size_so_far": len(allData),
"total_size_so_far": totalSize,
})
}
case *pb.RegisterRequest_Metadata:
Expand All @@ -81,12 +116,26 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
fields[logtrace.FieldActionID] = metadata.GetActionId()
logtrace.Info(ctx, "metadata received from action-sdk", fields)

hash := hasher.Sum(nil)
hashHex := hex.EncodeToString(hash)
fields[logtrace.FieldHashHex] = hashHex
logtrace.Info(ctx, "final BLAKE3 hash generated", fields)

targetPath, err := replaceTempDirWithTaskDir(metadata.GetTaskId(), tempFilePath, tempFile)
if err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "failed to replace temp dir with task dir", fields)
return fmt.Errorf("failed to replace temp dir with task dir: %w", err)
}

// Process the complete data
task := server.factory.NewCascadeRegistrationTask()
err := task.Register(ctx, &cascadeService.RegisterRequest{
err = task.Register(ctx, &cascadeService.RegisterRequest{
TaskID: metadata.TaskId,
ActionID: metadata.ActionId,
Data: allData,
DataHash: hash,
DataSize: totalSize,
FilePath: targetPath,
}, func(resp *cascadeService.RegisterResponse) error {
grpcResp := &pb.RegisterResponse{
EventType: pb.SupernodeEventType(resp.EventType),
Expand All @@ -112,3 +161,32 @@ func (server *ActionServer) Register(stream pb.CascadeService_RegisterServer) er
logtrace.Info(ctx, "cascade registration completed successfully", fields)
return nil
}

// initializeHasherAndTempFile creates a 32-byte BLAKE3 hasher together with a
// unique temporary file that buffers the uploaded chunks on disk.
//
// The file is created with os.CreateTemp so every concurrent upload in the
// same process gets its own file. The previous fixed name
// "cascade-upload-<pid>.tmp" was derived from os.Getpid() alone, so two
// simultaneous streams handled by one process would open the same file and
// silently interleave their data.
//
// The caller owns the returned *os.File and must close it; the file's path is
// returned so it can later be renamed into its task directory.
func initializeHasherAndTempFile() (*blake3.Hasher, *os.File, string, error) {
	hasher := blake3.New(32, nil)

	tempFile, err := os.CreateTemp("", "cascade-upload-*.tmp")
	if err != nil {
		return nil, nil, "", fmt.Errorf("could not create temp file: %w", err)
	}

	return hasher, tempFile, tempFile.Name(), nil
}

func replaceTempDirWithTaskDir(taskID, tempFilePath string, tempFile *os.File) (targetPath string, err error) {
if err := tempFile.Close(); err != nil && !errors.Is(err, os.ErrClosed) {
return "", fmt.Errorf("failed to close temp file: %w", err)
}

targetDir := filepath.Join(os.TempDir(), taskID)
if err := os.MkdirAll(targetDir, 0755); err != nil {
return "", fmt.Errorf("could not create task directory: %w", err)
}
targetPath = filepath.Join(targetDir, fmt.Sprintf("uploaded-%s.dat", taskID))
if err := os.Rename(tempFilePath, targetPath); err != nil {
return "", fmt.Errorf("could not move file to final location: %w", err)
}

return targetPath, nil
}
9 changes: 5 additions & 4 deletions supernode/services/cascade/adaptors/rq.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
//
//go:generate mockgen -destination=mocks/rq_mock.go -package=cascadeadaptormocks -source=rq.go
type CodecService interface {
EncodeInput(ctx context.Context, taskID string, data []byte) (EncodeResult, error)
EncodeInput(ctx context.Context, taskID string, path string, dataSize int) (EncodeResult, error)
}

// EncodeResult represents the outcome of encoding the input data.
Expand All @@ -30,10 +30,11 @@ func NewCodecService(codec codec.Codec) CodecService {
}

// EncodeInput encodes the provided data and returns symbols and metadata.
func (c *codecImpl) EncodeInput(ctx context.Context, taskID string, data []byte) (EncodeResult, error) {
func (c *codecImpl) EncodeInput(ctx context.Context, taskID string, path string, dataSize int) (EncodeResult, error) {
resp, err := c.codec.Encode(ctx, codec.EncodeRequest{
TaskID: taskID,
Data: data,
TaskID: taskID,
Path: path,
DataSize: dataSize,
})
if err != nil {
return EncodeResult{}, err
Expand Down
14 changes: 6 additions & 8 deletions supernode/services/cascade/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ func (task *CascadeRegistrationTask) decodeCascadeMetadata(ctx context.Context,
return meta, nil
}

func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, data []byte, expected string, f logtrace.Fields) error {
dh, _ := utils.Blake3Hash(data)
func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, dh []byte, expected string, f logtrace.Fields) error {
b64 := utils.B64Encode(dh)
if string(b64) != expected {
return task.wrapErr(ctx, "data hash doesn't match", errors.New(""), f)
Expand All @@ -79,8 +78,8 @@ func (task *CascadeRegistrationTask) verifyDataHash(ctx context.Context, data []
return nil
}

func (task *CascadeRegistrationTask) encodeInput(ctx context.Context, data []byte, f logtrace.Fields) (*adaptors.EncodeResult, error) {
resp, err := task.rq.EncodeInput(ctx, task.ID(), data)
func (task *CascadeRegistrationTask) encodeInput(ctx context.Context, path string, dataSize int, f logtrace.Fields) (*adaptors.EncodeResult, error) {
resp, err := task.rq.EncodeInput(ctx, task.ID(), path, dataSize)
if err != nil {
return nil, task.wrapErr(ctx, "failed to encode data", err, f)
}
Expand Down Expand Up @@ -201,7 +200,7 @@ func verifyIDs(ticketMetadata, metadata codec.Layout) error {

// verifyActionFee checks if the action fee is sufficient for the given data size
// It fetches action parameters, calculates the required fee, and compares it with the action price
func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action *actiontypes.Action, data []byte, fields logtrace.Fields) error {
func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action *actiontypes.Action, dataSize int, fields logtrace.Fields) error {
// Fetch action parameters
params, err := task.lumeraClient.GetActionParams(ctx)
if err != nil {
Expand All @@ -213,16 +212,15 @@ func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action
feePerByte := params.Params.FeePerByte.Amount

// Calculate per-byte fee based on data size
dataBytes := len(data)
perByteFee := sdk.NewCoin(baseFee.Denom, feePerByte.Mul(math.NewInt(int64(dataBytes))))
perByteFee := sdk.NewCoin(baseFee.Denom, feePerByte.Mul(math.NewInt(int64(dataSize))))

// Calculate total fee
requiredFee := baseFee.Add(perByteFee)

// Log the calculated fee
logtrace.Info(ctx, "calculated required fee", logtrace.Fields{
"fee": requiredFee.String(),
"dataBytes": dataBytes,
"dataBytes": dataSize,
})
// Check if action price is less than required fee
if action.Price.IsLT(requiredFee) {
Expand Down
16 changes: 12 additions & 4 deletions supernode/services/cascade/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cascade

import (
"context"
"os"

"github.com/LumeraProtocol/supernode/pkg/logtrace"
)
Expand All @@ -10,7 +11,9 @@ import (
type RegisterRequest struct {
TaskID string
ActionID string
Data []byte
DataHash []byte
DataSize int
FilePath string
}

// RegisterResponse contains the result of upload
Expand Down Expand Up @@ -57,7 +60,7 @@ func (task *CascadeRegistrationTask) Register(
task.streamEvent(SupernodeEventTypeActionRetrieved, "action has been retrieved", "", send)

/* 2. Verify action fee -------------------------------------------------------- */
if err := task.verifyActionFee(ctx, action, req.Data, fields); err != nil {
if err := task.verifyActionFee(ctx, action, req.DataSize, fields); err != nil {
return err
}
logtrace.Info(ctx, "action fee has been validated", fields)
Expand All @@ -80,14 +83,14 @@ func (task *CascadeRegistrationTask) Register(
task.streamEvent(SupernodeEventTypeMetadataDecoded, "cascade metadata has been decoded", "", send)

/* 5. Verify data hash --------------------------------------------------------- */
if err := task.verifyDataHash(ctx, req.Data, cascadeMeta.DataHash, fields); err != nil {
if err := task.verifyDataHash(ctx, req.DataHash, cascadeMeta.DataHash, fields); err != nil {
return err
}
logtrace.Info(ctx, "data-hash has been verified", fields)
task.streamEvent(SupernodeEventTypeDataHashVerified, "data-hash has been verified", "", send)

/* 6. Encode the raw data ------------------------------------------------------ */
encResp, err := task.encodeInput(ctx, req.Data, fields)
encResp, err := task.encodeInput(ctx, req.FilePath, req.DataSize, fields)
if err != nil {
return err
}
Expand Down Expand Up @@ -136,5 +139,10 @@ func (task *CascadeRegistrationTask) Register(
logtrace.Info(ctx, "action has been finalized", fields)
task.streamEvent(SupernodeEventTypeActionFinalized, "action has been finalized", resp.TxHash, send)

err = os.RemoveAll(req.FilePath)
if err != nil {
logtrace.Warn(ctx, "error removing file", fields)
}

return nil
}
5 changes: 3 additions & 2 deletions tests/system/e2e_cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,9 @@ func TestCascadeE2E(t *testing.T) {

ctx := context.Background()
encodeRes, err := rqCodec.Encode(ctx, codec.EncodeRequest{
Data: data,
TaskID: "1",
Path: testFileFullpath,
DataSize: int(fileInfo.Size()),
TaskID: "1",
})

require.NoError(t, err, "Failed to encode data with RaptorQ")
Expand Down