Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,18 @@ func (s *Service) newStampedPutter(ctx context.Context, opts putterOptions, stam
return nil, errInvalidPostageBatch
}

return s.newStampedPutterWithBatch(ctx, opts, stamp, storedBatch)
}

// newStampedPutterWithBatch creates a stamped putter using a pre-fetched batch.
// It avoids the batch-store lookup when the batch information is already cached.
func (s *Service) newStampedPutterWithBatch(ctx context.Context, opts putterOptions, stamp *postage.Stamp, storedBatch *postage.Batch) (storer.PutterSession, error) {
if !opts.Deferred && s.beeMode == DevMode {
return nil, errUnsupportedDevNodeOperation
}

var session storer.PutterSession
var err error
if opts.Deferred || opts.Pin {
session, err = s.storer.Upload(ctx, opts.Pin, opts.TagID)
if err != nil {
Expand Down
178 changes: 149 additions & 29 deletions pkg/api/chunk_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
logger := s.logger.WithName("chunks_stream").Build()

headers := struct {
BatchID []byte `map:"Swarm-Postage-Batch-Id" validate:"required"`
BatchID []byte `map:"Swarm-Postage-Batch-Id"` // Optional: can be omitted for per-chunk stamping
SwarmTag uint64 `map:"Swarm-Tag"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
Expand All @@ -55,29 +55,34 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
}
}

// if tag not specified use direct upload
// Using context.Background here because the putter's lifetime extends beyond that of the HTTP request.
putter, err := s.newStamperPutter(context.Background(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: tag != 0,
})
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
case errors.Is(err, postage.ErrNotFound):
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
// Create connection-level putter only if BatchID is provided
// If BatchID is not provided, per-chunk stamps must be used
var putter storer.PutterSession
if len(headers.BatchID) > 0 {
// if tag not specified use direct upload
// Using context.Background here because the putter's lifetime extends beyond that of the HTTP request.
putter, err = s.newStamperPutter(context.Background(), putterOptions{
BatchID: headers.BatchID,
TagID: tag,
Deferred: tag != 0,
})
if err != nil {
logger.Debug("get putter failed", "error", err)
logger.Error(nil, "get putter failed")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
jsonhttp.UnprocessableEntity(w, "batch not usable yet or does not exist")
case errors.Is(err, postage.ErrNotFound):
jsonhttp.NotFound(w, "batch with id not found")
case errors.Is(err, errInvalidPostageBatch):
jsonhttp.BadRequest(w, "invalid batch id")
case errors.Is(err, errUnsupportedDevNodeOperation):
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
default:
jsonhttp.BadRequest(w, nil)
}
return
}
return
}

upgrader := websocket.Upgrader{
Expand All @@ -95,13 +100,14 @@ func (s *Service) chunkUploadStreamHandler(w http.ResponseWriter, r *http.Reques
}

s.wsWg.Add(1)
go s.handleUploadStream(logger, wsConn, putter)
go s.handleUploadStream(logger, wsConn, putter, tag)
}

func (s *Service) handleUploadStream(
logger log.Logger,
conn *websocket.Conn,
putter storer.PutterSession,
tag uint64,
) {
defer s.wsWg.Done()

Expand All @@ -111,11 +117,23 @@ func (s *Service) handleUploadStream(
gone = make(chan struct{})
err error
)

	// Cache for batch validation to avoid a database lookup for every chunk.
	// Key: the raw batch-ID bytes converted to a string (not hex-encoded),
	// Value: the stored batch info. This skips the expensive batchStore.Get()
	// call on repeat chunks for the same batch.
batchCache := make(map[string]*postage.Batch)

defer func() {
cancel()
_ = conn.Close()
if err = putter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: syncing chunks failed")

// No cleanup needed for batch cache - it's just metadata

// Only call Done on connection-level putter if it exists
if putter != nil {
if err = putter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: syncing chunks failed")
}
}
}()

Expand Down Expand Up @@ -190,14 +208,109 @@ func (s *Service) handleUploadStream(
return
}

chunk, err := cac.NewWithDataSpan(msg)
logger.Debug("chunk upload stream",
"message_size", len(msg),
"stamp_size", postage.StampSize,
"first_8_bytes", msg[:8])

// Check if this message contains a per-chunk stamp prepended to the chunk data
// Format: stamp[113 bytes] + chunk[data]
var (
chunk swarm.Chunk
chunkPutter = putter // default to connection-level putter
chunkData = msg
)

// If message is large enough to contain a stamp + chunk data, try to extract the stamp
if len(msg) >= postage.StampSize+swarm.SpanSize {
potentialStamp := msg[:postage.StampSize]
potentialChunkData := msg[postage.StampSize:]

logger.Debug("chunk upload stream: attempting to extract stamp",
"message_size", len(msg),
"stamp_size", postage.StampSize,
"potential_stamp_len", len(potentialStamp),
"potential_chunk_len", len(potentialChunkData),
"first_8_bytes", msg[:8])

// Try to unmarshal as a stamp
stamp := postage.Stamp{}
if err := stamp.UnmarshalBinary(potentialStamp); err == nil {
// Valid stamp found - validate using cached batch info
batchID := stamp.BatchID()
batchIDHex := string(batchID) // Use batch ID bytes as map key
logger.Debug("chunk upload stream: per-chunk stamp detected", "batch_id", batchID, "chunk_size", len(potentialChunkData))

// Check if we already have cached batch info
storedBatch, exists := batchCache[batchIDHex]
if !exists {
// Fetch and cache batch info
storedBatch, err = s.batchStore.Get(stamp.BatchID())
if err != nil {
logger.Debug("chunk upload stream: batch validation failed", "error", err)
logger.Error(nil, "chunk upload stream: batch validation failed")
if errors.Is(err, storage.ErrNotFound) {
sendErrorClose(websocket.CloseInternalServerErr, "batch not found")
} else {
sendErrorClose(websocket.CloseInternalServerErr, "batch validation failed")
}
return
}
batchCache[batchIDHex] = storedBatch
logger.Debug("chunk upload stream: cached batch info", "batch_id", batchIDHex)
}

// Create a stamped putter using cached batch info
// This skips the expensive database lookup
chunkPutter, err = s.newStampedPutterWithBatch(ctx, putterOptions{
BatchID: stamp.BatchID(),
TagID: tag,
Deferred: tag != 0,
}, &stamp, storedBatch)
if err != nil {
logger.Debug("chunk upload stream: failed to create stamped putter", "error", err)
logger.Error(nil, "chunk upload stream: failed to create stamped putter")
switch {
case errors.Is(err, errBatchUnusable) || errors.Is(err, postage.ErrNotUsable):
sendErrorClose(websocket.CloseInternalServerErr, "batch not usable")
case errors.Is(err, postage.ErrNotFound):
sendErrorClose(websocket.CloseInternalServerErr, "batch not found")
default:
sendErrorClose(websocket.CloseInternalServerErr, "stamped putter creation failed")
}
return
}

// Use the chunk data without the stamp
chunkData = potentialChunkData
} else {
// Stamp unmarshal failed - log for debugging
logger.Debug("chunk upload stream: stamp unmarshal failed, treating message as unstamped chunk",
"error", err,
"message_size", len(msg),
"stamp_size_expected", postage.StampSize,
"potential_stamp_len", len(potentialStamp))
}
// If unmarshal failed, fall through to use the whole message as chunk data
}

// If we don't have a putter at this point, the client must provide per-chunk stamps
if chunkPutter == nil {
logger.Debug("chunk upload stream: no stamp provided")
logger.Error(nil, "chunk upload stream: no batch ID in headers and no per-chunk stamp in message")
sendErrorClose(websocket.CloseInternalServerErr, "batch ID or per-chunk stamp required")
return
}

chunk, err = cac.NewWithDataSpan(chunkData)
if err != nil {
logger.Debug("chunk upload stream: create chunk failed", "error", err)
logger.Debug("chunk upload stream: create chunk failed", "error", err, "chunk_size", len(chunkData))
logger.Error(nil, "chunk upload stream: create chunk failed")
sendErrorClose(websocket.CloseInternalServerErr, "invalid chunk data")
return
}

err = putter.Put(ctx, chunk)
err = chunkPutter.Put(ctx, chunk)
if err != nil {
logger.Debug("chunk upload stream: write chunk failed", "address", chunk.Address(), "error", err)
logger.Error(nil, "chunk upload stream: write chunk failed")
Expand All @@ -210,6 +323,13 @@ func (s *Service) handleUploadStream(
return
}

// Clean up per-chunk putter
if chunkPutter != putter {
if err := chunkPutter.Done(swarm.ZeroAddress); err != nil {
logger.Error(err, "chunk upload stream: failed to finalize per-chunk putter")
}
}

err = sendMsg(websocket.BinaryMessage, successWsMsg)
if err != nil {
s.logger.Debug("chunk upload stream: sending success message failed", "error", err)
Expand Down