Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions pkg/codec/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
raptorq "github.com/LumeraProtocol/rq-go"
"github.com/LumeraProtocol/supernode/v2/pkg/errors"
"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
"github.com/google/uuid"
)

type DecodeRequest struct {
Expand All @@ -29,7 +30,7 @@ type DecodeResponse struct {
// Workspace holds paths & reverse index for prepared decoding.
type Workspace struct {
ActionID string
SymbolsDir string // .../<base>/<actionID>
SymbolsDir string // .../<base>/downloads/<actionID>/<uuid>
BlockDirs []string // index = blockID (or 0 if single block)
symbolToBlock map[string]int
mu sync.RWMutex // protects symbolToBlock reads if you expand it later
Expand All @@ -51,8 +52,12 @@ func (rq *raptorQ) PrepareDecode(
}
logtrace.Info(ctx, "rq: prepare-decode start", fields)

// Create root symbols dir for this action
symbolsDir := filepath.Join(rq.symbolsBaseDir, actionID)
// Create per-request workspace under <base>/downloads/<actionID>/<uuid>
base := rq.symbolsBaseDir
if base == "" {
base = os.TempDir()
}
symbolsDir := filepath.Join(base, "downloads", actionID, uuid.NewString())
if err := os.MkdirAll(symbolsDir, 0o755); err != nil {
fields[logtrace.FieldError] = err.Error()
logtrace.Error(ctx, "mkdir symbols base dir failed", fields)
Expand Down
51 changes: 51 additions & 0 deletions pkg/codec/decode_workspace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package codec

import (
"context"
"os"
"testing"
)

// TestPrepareDecode_UniqueWorkspacePerCall verifies that two PrepareDecode calls
// for the same action ID receive distinct workspace directories, and that running
// the first call's cleanup removes only the first directory.
func TestPrepareDecode_UniqueWorkspacePerCall(t *testing.T) {
	const actionID = "actionA"
	ctx := context.Background()
	rqCodec := NewRaptorQCodec(t.TempDir())
	singleBlock := Layout{Blocks: []Block{{BlockID: 0, Symbols: []string{"s1"}}}}

	// First request for the action.
	_, _, releaseFirst, first, err := rqCodec.PrepareDecode(ctx, actionID, singleBlock)
	if err != nil {
		t.Fatalf("prepare decode 1: %v", err)
	}
	t.Cleanup(func() { _ = releaseFirst() })
	if first == nil || first.SymbolsDir == "" {
		t.Fatalf("prepare decode 1 returned empty workspace")
	}
	if _, statErr := os.Stat(first.SymbolsDir); statErr != nil {
		t.Fatalf("stat ws1: %v", statErr)
	}

	// Second request for the very same action.
	_, _, releaseSecond, second, err := rqCodec.PrepareDecode(ctx, actionID, singleBlock)
	if err != nil {
		t.Fatalf("prepare decode 2: %v", err)
	}
	t.Cleanup(func() { _ = releaseSecond() })
	if second == nil || second.SymbolsDir == "" {
		t.Fatalf("prepare decode 2 returned empty workspace")
	}
	if _, statErr := os.Stat(second.SymbolsDir); statErr != nil {
		t.Fatalf("stat ws2: %v", statErr)
	}

	// The two calls must not share a directory.
	if first.SymbolsDir == second.SymbolsDir {
		t.Fatalf("expected unique workspace per call; got same dir: %s", first.SymbolsDir)
	}

	// Releasing the first workspace must leave the second one on disk.
	if err := releaseFirst(); err != nil {
		t.Fatalf("cleanup 1: %v", err)
	}
	if _, statErr := os.Stat(first.SymbolsDir); !os.IsNotExist(statErr) {
		t.Fatalf("expected ws1 removed; stat err=%v", statErr)
	}
	if _, statErr := os.Stat(second.SymbolsDir); statErr != nil {
		t.Fatalf("expected ws2 still present after ws1 cleanup; stat err=%v", statErr)
	}
}
34 changes: 34 additions & 0 deletions pkg/task/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package task

import (
"context"
"errors"
"sync"
"time"

"github.com/LumeraProtocol/supernode/v2/pkg/logtrace"
)

// ErrAlreadyRunning is returned by StartUniqueWith when the tracker reports that
// the requested (service, id) pair is already being tracked.
var ErrAlreadyRunning = errors.New("task already running")

// Handle manages a running task with an optional watchdog.
// It ensures Start and End are paired, logs start/end, and auto-ends on timeout.
type Handle struct {
Expand Down Expand Up @@ -41,6 +44,37 @@ func StartWith(tr Tracker, ctx context.Context, service, id string, timeout time
return g
}

// StartUniqueWith starts tracking a task only if it's not already tracked for the same
// (service, id) pair. It returns ErrAlreadyRunning if the task is already in-flight.
//
// When tr is nil, or service or id is empty, nothing is tracked and an empty
// Handle is returned with a nil error.
//
// Uniqueness is enforced only when tr also implements
// TryStart(service, taskID string) bool; for any other Tracker the call falls
// back to tr.Start and always succeeds.
//
// If timeout is positive, a watchdog goroutine ends the handle via endWith
// once the timeout elapses, unless the handle's stop channel signals first.
func StartUniqueWith(tr Tracker, ctx context.Context, service, id string, timeout time.Duration) (*Handle, error) {
	if tr == nil || service == "" || id == "" {
		return &Handle{}, nil
	}

	if ts, ok := tr.(interface {
		TryStart(service, taskID string) bool
	}); ok {
		if !ts.TryStart(service, id) {
			return nil, ErrAlreadyRunning
		}
	} else { // fallback: can't enforce uniqueness with unknown Tracker implementations
		tr.Start(service, id)
	}

	logtrace.Info(ctx, "task: started", logtrace.Fields{"service": service, "task_id": id})
	g := &Handle{tr: tr, service: service, id: id, stop: make(chan struct{})}
	if timeout > 0 {
		go func() {
			// Use an explicit timer so it is released as soon as the handle
			// stops, instead of lingering until the timeout fires as a
			// time.After timer can on older Go toolchains.
			watchdog := time.NewTimer(timeout)
			defer watchdog.Stop()

			select {
			case <-watchdog.C:
				g.endWith(ctx, true)
			case <-g.stop:
			}
		}()
	}
	return g, nil
}

// End stops tracking the task. Safe to call multiple times.
func (g *Handle) End(ctx context.Context) {
g.endWith(ctx, false)
Expand Down
22 changes: 22 additions & 0 deletions pkg/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,28 @@ func New() *InMemoryTracker {
return &InMemoryTracker{data: make(map[string]map[string]struct{})}
}

// TryStart records (service, taskID) as running if, and only if, it is not
// already recorded. It reports true when this call added the entry, and false
// when the entry was already present or when either argument is empty.
// Callers can use it as an "only one in-flight task" guard.
func (t *InMemoryTracker) TryStart(service, taskID string) bool {
	if service == "" || taskID == "" {
		return false
	}

	t.mu.Lock()
	defer t.mu.Unlock()

	tasks, known := t.data[service]
	// Reading a missing key from a nil inner map is safe and reports "absent".
	if _, running := tasks[taskID]; running {
		return false
	}
	if !known {
		tasks = make(map[string]struct{})
		t.data[service] = tasks
	}
	tasks[taskID] = struct{}{}
	return true
}

// Start marks a task as running under a given service. Empty arguments
// are ignored. Calling Start with the same (service, taskID) pair is idempotent.
func (t *InMemoryTracker) Start(service, taskID string) {
Expand Down
25 changes: 25 additions & 0 deletions pkg/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package task

import (
"context"
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -155,3 +156,27 @@ func TestHandleIdempotentAndWatchdog(t *testing.T) {
}
}
}

// TestStartUniqueWith_PreventsDuplicates checks that a second StartUniqueWith
// for the same (service, id) fails with ErrAlreadyRunning while the first handle
// is live, and that the pair can be started again once the first handle ends.
func TestStartUniqueWith_PreventsDuplicates(t *testing.T) {
	const (
		service = "svc.unique"
		taskID  = "id-1"
	)
	tracker := New()
	ctx := context.Background()

	first, err := StartUniqueWith(tracker, ctx, service, taskID, 0)
	if err != nil {
		t.Fatalf("StartUniqueWith 1: %v", err)
	}
	t.Cleanup(func() { first.End(ctx) })

	// While the first handle is in flight, the same pair must be rejected.
	duplicate, err := StartUniqueWith(tracker, ctx, service, taskID, 0)
	if !errors.Is(err, ErrAlreadyRunning) {
		t.Fatalf("expected ErrAlreadyRunning, got handle=%v err=%v", duplicate, err)
	}

	// After ending, it should be startable again.
	first.End(ctx)
	restarted, err := StartUniqueWith(tracker, ctx, service, taskID, 0)
	if err != nil {
		t.Fatalf("StartUniqueWith 2: %v", err)
	}
	restarted.End(ctx)
}
2 changes: 1 addition & 1 deletion supernode/adaptors/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action
}
start = end
}
if err := p.rqStore.UpdateIsFirstBatchStored(actionID); err != nil {
if err := p.rqStore.UpdateIsFirstBatchStored(taskID); err != nil {
return totalSymbols, totalAvailable, fmt.Errorf("update first-batch flag: %w", err)
}
return totalSymbols, totalAvailable, nil
Expand Down
39 changes: 31 additions & 8 deletions supernode/cascade/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download
return task.wrapErr(ctx, "failed to get action", err, fields)
}
logtrace.Info(ctx, "download: action fetched", fields)
task.streamDownloadEvent(SupernodeEventTypeActionRetrieved, "Action retrieved", "", "", send)
if err := task.streamDownloadEvent(ctx, SupernodeEventTypeActionRetrieved, "Action retrieved", "", "", send); err != nil {
return err
}

if actionDetails.GetAction().State != actiontypes.ActionStateDone {
err = errors.New("action is not in a valid state")
Expand All @@ -64,7 +66,9 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download
return task.wrapErr(ctx, "error decoding cascade metadata", err, fields)
}
logtrace.Info(ctx, "download: metadata decoded", fields)
task.streamDownloadEvent(SupernodeEventTypeMetadataDecoded, "Cascade metadata decoded", "", "", send)
if err := task.streamDownloadEvent(ctx, SupernodeEventTypeMetadataDecoded, "Cascade metadata decoded", "", "", send); err != nil {
return err
}

if !metadata.Public {
if req.Signature == "" {
Expand All @@ -80,7 +84,9 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download
logtrace.Info(ctx, "download: public cascade (no signature)", fields)
}

task.streamDownloadEvent(SupernodeEventTypeNetworkRetrieveStarted, "Network retrieval started", "", "", send)
if err := task.streamDownloadEvent(ctx, SupernodeEventTypeNetworkRetrieveStarted, "Network retrieval started", "", "", send); err != nil {
return err
}

logtrace.Info(ctx, "download: network retrieval start", logtrace.Fields{logtrace.FieldActionID: actionDetails.GetAction().ActionID})
filePath, tmpDir, err := task.downloadArtifacts(ctx, actionDetails.GetAction().ActionID, metadata, fields, send)
Expand All @@ -91,10 +97,20 @@ func (task *CascadeRegistrationTask) Download(ctx context.Context, req *Download
logtrace.Warn(ctx, "cleanup of tmp dir after error failed", logtrace.Fields{"tmp_dir": tmpDir, logtrace.FieldError: cerr.Error()})
}
}
if ctx.Err() != nil {
return ctx.Err()
}
return task.wrapErr(ctx, "failed to download artifacts", err, fields)
}
logtrace.Debug(ctx, "File reconstructed and hash verified", fields)
task.streamDownloadEvent(SupernodeEventTypeDecodeCompleted, "Decode completed", filePath, tmpDir, send)
if err := task.streamDownloadEvent(ctx, SupernodeEventTypeDecodeCompleted, "Decode completed", filePath, tmpDir, send); err != nil {
if tmpDir != "" {
if cerr := task.CleanupDownload(ctx, tmpDir); cerr != nil {
logtrace.Warn(ctx, "cleanup of tmp dir after stream failure failed", logtrace.Fields{"tmp_dir": tmpDir, logtrace.FieldError: cerr.Error()})
}
}
return err
}

return nil
}
Expand Down Expand Up @@ -127,8 +143,11 @@ func (task *CascadeRegistrationTask) VerifyDownloadSignature(ctx context.Context
return nil
}

func (task *CascadeRegistrationTask) streamDownloadEvent(eventType SupernodeEventType, msg, filePath, dir string, send func(resp *DownloadResponse) error) {
_ = send(&DownloadResponse{EventType: eventType, Message: msg, FilePath: filePath, DownloadedDir: dir})
// streamDownloadEvent delivers a single download event through send.
// If ctx already carries an error, that error is returned and nothing is sent;
// otherwise the result of send is returned to the caller.
func (task *CascadeRegistrationTask) streamDownloadEvent(ctx context.Context, eventType SupernodeEventType, msg, filePath, dir string, send func(resp *DownloadResponse) error) error {
	if ctxErr := ctx.Err(); ctxErr != nil {
		return ctxErr
	}

	event := &DownloadResponse{
		EventType:     eventType,
		Message:       msg,
		FilePath:      filePath,
		DownloadedDir: dir,
	}
	return send(event)
}

func (task *CascadeRegistrationTask) downloadArtifacts(ctx context.Context, actionID string, metadata actiontypes.CascadeMetadata, fields logtrace.Fields, send func(resp *DownloadResponse) error) (string, string, error) {
Expand Down Expand Up @@ -244,7 +263,9 @@ func (task *CascadeRegistrationTask) restoreFileFromLayoutDeprecated(ctx context
// Emit minimal JSON payload (metrics system removed)
info := map[string]interface{}{"action_id": actionID, "found_symbols": len(symbols), "target_percent": targetRequiredPercent}
if b, err := json.Marshal(info); err == nil {
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), decodeInfo.FilePath, decodeInfo.DecodeTmpDir, send)
if err := task.streamDownloadEvent(ctx, SupernodeEventTypeArtefactsDownloaded, string(b), decodeInfo.FilePath, decodeInfo.DecodeTmpDir, send); err != nil {
return "", decodeInfo.DecodeTmpDir, err
}
}
return decodeInfo.FilePath, decodeInfo.DecodeTmpDir, nil
}
Expand Down Expand Up @@ -399,7 +420,9 @@ func (task *CascadeRegistrationTask) restoreFileFromLayout(
// Event
info := map[string]interface{}{"action_id": actionID, "found_symbols": int(atomic.LoadInt32(&written)), "target_percent": targetRequiredPercent}
if b, err := json.Marshal(info); err == nil {
task.streamDownloadEvent(SupernodeEventTypeArtefactsDownloaded, string(b), decodeInfo.FilePath, decodeInfo.DecodeTmpDir, send)
if err := task.streamDownloadEvent(ctx, SupernodeEventTypeArtefactsDownloaded, string(b), decodeInfo.FilePath, decodeInfo.DecodeTmpDir, send); err != nil {
return "", decodeInfo.DecodeTmpDir, err
}
}

success = true
Expand Down
4 changes: 2 additions & 2 deletions supernode/cascade/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ func (task *CascadeRegistrationTask) wrapErr(ctx context.Context, msg string, er
return status.Errorf(codes.Internal, "%s", msg)
}

func (task *CascadeRegistrationTask) emitArtefactsStored(ctx context.Context, fields logtrace.Fields, _ codec.Layout, send func(resp *RegisterResponse) error) {
// emitArtefactsStored logs that artefacts were stored and then streams the
// SupernodeEventTypeArtefactsStored event through send, returning whatever
// streamEvent returns. A nil fields value is replaced with an empty one before
// logging. The layout argument is accepted but not used.
func (task *CascadeRegistrationTask) emitArtefactsStored(ctx context.Context, fields logtrace.Fields, _ codec.Layout, send func(resp *RegisterResponse) error) error {
	if fields == nil {
		fields = logtrace.Fields{}
	}

	logtrace.Info(ctx, "register: artefacts stored", fields)

	const storedMsg = "Artefacts stored"
	return task.streamEvent(ctx, SupernodeEventTypeArtefactsStored, storedMsg, "", send)
}

func (task *CascadeRegistrationTask) verifyActionFee(ctx context.Context, action *actiontypes.Action, dataSize int, fields logtrace.Fields) error {
Expand Down
Loading
Loading