Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions config/local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,9 @@ checks:
postage-depth: 21
postage-label: test-label
request-timeout: 5m
upload-r-level: 3
download-r-level: 4
upload-r-level: 2
download-r-level: 2
cache: false
timeout: 5m
type: soc
ci-content-availability:
Expand Down
3 changes: 3 additions & 0 deletions config/public-testnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ checks:
postage-depth: 22
postage-label: test-label
request-timeout: 5m
upload-r-level: 2
download-r-level: 3
cache: true
timeout: 15m
type: soc
pt-pushsync-chunks:
Expand Down
63 changes: 63 additions & 0 deletions pkg/check/soc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# SOC Check Documentation

## Overview

The SOC (Single Owner Chunk) check validates the upload and retrieval of SOC chunks in the Swarm network, with configurable redundancy levels and cache behavior.

## What it does

1. **SOC Chunk Creation**: Creates a SOC chunk with test payload "Hello Swarm :)" using cryptographic signing
2. **Upload**: Uploads the SOC chunk to a bee node with specified redundancy level
3. **Original Retrieval**: Downloads and validates the original chunk from the upload node
4. **Replica Testing**: Tests retrieval of replica chunks created by the redundancy system

## Key Features

### Configurable Redundancy

- **Upload Redundancy Level**: Controls how many replica chunks are created during upload
- **Download Redundancy Level**: Controls how many replica chunks are tested during download
- Supports levels: NONE, MEDIUM, STRONG, INSANE, PARANOID

### Cache Behavior Control

- **Cache = true**: Downloads replicas from local node storage (cache)
- **Cache = false**: Downloads replicas from network peers with retry logic

### Multi-Node Support

- **Cache = true**: Uses single node for upload and download
- **Cache = false**: Uses separate nodes (upload to node1, download replicas from node2)

### Retry Logic

When `Cache = false`, implements retry mechanism:

- Up to 5 retry attempts per replica download
- 1-second delay between retries
- Allows time for chunks to propagate through the network

## Validation Logic

The check validates that:

- Original chunk data matches after upload/download
- Expected number of replica retrievals succeed/fail based on redundancy levels
- Failed replica retrievals return expected error types (HTTP 500 "read chunk failed")
- Replica chunk data matches the original chunk data

## Use Cases

- **Network Dispersal Testing**: Verify chunks are properly distributed across peer nodes
- **Cache vs Network Performance**: Compare local cache vs network retrieval speeds
- **Redundancy Validation**: Ensure redundancy levels work as expected
- **Error Handling**: Test proper error responses when replicas are unavailable

## Configuration

```yaml
cache: true/false # Local cache vs network retrieval
upload-r-level: 3 # Upload redundancy level
download-r-level: 4 # Download redundancy level
request-timeout: 5m # Timeout for operations
```
106 changes: 82 additions & 24 deletions pkg/check/soc/soc.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Options struct {
RequestTimeout time.Duration
UploadRLevel redundancy.Level
DownloadRLevel redundancy.Level
Cache bool // if true fetches from bee local storage, if false fetches from network (peers)
}

// NewDefaultOptions returns new default options
Expand All @@ -45,6 +46,7 @@ func NewDefaultOptions() Options {
RequestTimeout: 5 * time.Minute,
UploadRLevel: redundancy.PARANOID,
DownloadRLevel: redundancy.PARANOID,
Cache: true,
}
}

Expand Down Expand Up @@ -113,25 +115,30 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any
return fmt.Errorf("shuffled full node clients: %w", err)
}

if len(nodes) < 1 {
return fmt.Errorf("soc test requires at least 1 full node")
minNodesRequired := 1
if !o.Cache {
minNodesRequired = 2 // Need at least 2 nodes when cache is disabled
}

node := nodes[0]
nodeName := node.Name()
c.logger.Infof("using node %s for soc test", nodeName)
if len(nodes) < minNodesRequired {
return fmt.Errorf("soc test requires at least %d full node(s) (cache=%t)", minNodesRequired, o.Cache)
}

uploadNode := nodes[0]
uploadNodeName := uploadNode.Name()
c.logger.Infof("using node %s for soc upload", uploadNodeName)

owner := hex.EncodeToString(ownerBytes)
id := hex.EncodeToString(idBytes)
sig := hex.EncodeToString(signatureBytes)

batchID, err := node.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
batchID, err := uploadNode.GetOrCreateMutableBatch(ctx, o.PostageTTL, o.PostageDepth, o.PostageLabel)
if err != nil {
return fmt.Errorf("node %s: batch id %w", nodeName, err)
return fmt.Errorf("node %s: batch id %w", uploadNodeName, err)
}

c.logger.Infof("node %s: batch id %s", nodeName, batchID)
c.logger.Infof("soc: submitting soc chunk %s to node %s", sch.Address().String(), nodeName)
c.logger.Infof("node %s: batch id %s", uploadNodeName, batchID)
c.logger.Infof("soc: submitting soc chunk %s to node %s", sch.Address().String(), uploadNodeName)
c.logger.Infof("soc: owner %s", owner)
c.logger.Infof("soc: id %s", id)
c.logger.Infof("soc: sig %s", sig)
Expand All @@ -140,33 +147,77 @@ func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any
RLevel: &o.UploadRLevel,
}

ref, err := node.UploadSOC(ctx, owner, id, sig, ch.Data(), batchID, socOptions)
ref, err := uploadNode.UploadSOC(ctx, owner, id, sig, ch.Data(), batchID, socOptions)
if err != nil {
return fmt.Errorf("node %s: upload soc chunk %w", nodeName, err)
return fmt.Errorf("node %s: upload soc chunk %w", uploadNodeName, err)
}

c.logger.Infof("soc: chunk uploaded to node %s, reference %s", nodeName, ref.String())
c.logger.Infof("soc: chunk uploaded to node %s, reference %s", uploadNodeName, ref.String())

retrieved, err := node.DownloadChunk(ctx, ref, "", nil)
retrieved, err := uploadNode.DownloadChunk(ctx, ref, "", nil)
if err != nil {
return fmt.Errorf("node %s: download soc chunk %w", nodeName, err)
return fmt.Errorf("node %s: download soc chunk %w", uploadNodeName, err)
}

c.logger.Infof("soc: original chunk retrieved from node %s", nodeName)
c.logger.Infof("soc: original chunk retrieved from node %s", uploadNodeName)

if !bytes.Equal(retrieved, chunkData) {
return errors.New("soc: retrieved chunk data does NOT match soc chunk")
}

replicaErrs := c.testReplicaRetrieval(ctx, node, nodeName, ref, chunkData, o)
// Select node for replica downloads
downloadNode := uploadNode
downloadNodeName := uploadNodeName
if !o.Cache && len(nodes) > 1 {
downloadNode = nodes[1] // Use different node for replica downloads when cache is disabled
downloadNodeName = downloadNode.Name()
c.logger.Infof("using node %s for replica downloads (different from upload node)", downloadNodeName)
}

replicaErrs := c.testReplicaRetrieval(ctx, downloadNode, downloadNodeName, ref, chunkData, o)

return replicaErrs
}

// downloadReplicaWithRetry downloads a replica chunk, retrying when cache is
// disabled to allow time for the chunk to propagate to peers.
//
// With opts.Cache == true a single attempt is made (the chunk is expected in
// the node's local storage); with opts.Cache == false up to 5 attempts are
// made with a 1-second delay between them. It returns the chunk data from the
// first successful attempt, or the error from the last attempt.
func (c *Check) downloadReplicaWithRetry(ctx context.Context, node *bee.Client, nodeName string, addr swarm.Address, chunkNumber int, opts Options) ([]byte, error) {
	// Single attempt when reading from local cache; retry when fetching from
	// the network so pushed chunks have time to reach peers.
	maxRetries := 1
	retryDelay := 0 * time.Second
	if !opts.Cache {
		maxRetries = 5
		retryDelay = 1 * time.Second
	}

	var retrievedReplica []byte
	var err error

	for attempt := 0; attempt < maxRetries; attempt++ {
		if attempt > 0 {
			c.logger.Infof("node %s: retrying download of replica chunk %d (attempt %d/%d) after %v",
				nodeName, chunkNumber, attempt+1, maxRetries, retryDelay)
			// Wait between attempts while honoring context cancellation,
			// instead of blocking unconditionally in time.Sleep.
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(retryDelay):
			}
		}

		retrievedReplica, err = node.DownloadChunk(ctx, addr, "", &api.DownloadOptions{
			Cache: &opts.Cache,
		})
		if err == nil {
			break // success, stop retrying
		}

		// Log intermediate failures only; the caller logs the final failure.
		if attempt < maxRetries-1 && !opts.Cache {
			c.logger.Infof("node %s: download soc replica chunk %d failed (attempt %d/%d): %v (address: %s)",
				nodeName, chunkNumber, attempt+1, maxRetries, err, addr.String())
		}
	}

	return retrievedReplica, err
}

// testReplicaRetrieval tests replica retrieval with configurable redundancy levels and automatic error handling
func (c *Check) testReplicaRetrieval(ctx context.Context, node *bee.Client, nodeName string, ref swarm.Address, originalChunkData []byte, opts Options) error {
countTotal, countGood, countBad := 0, 0, 0
getFromCache := true

replicaIter := combinator.IterateReplicaAddresses(ref, int(opts.DownloadRLevel))
var replicaErrors []error
Expand All @@ -177,9 +228,13 @@ func (c *Check) testReplicaRetrieval(ctx context.Context, node *bee.Client, node
expectedSuccesses := min(uploadExpected, downloadExpected)
expectedFailures := downloadExpected - expectedSuccesses

c.logger.Infof("soc: testing replica retrieval with upload level %s (%d replicas) and download level %s (%d replicas to check)",
cacheMode := "from cache"
if !opts.Cache {
cacheMode = "from network"
}
c.logger.Infof("soc: testing replica retrieval with upload level %s (%d replicas) and download level %s (%d replicas to check) %s",
redundancyLevelName(opts.UploadRLevel), uploadExpected,
redundancyLevelName(opts.DownloadRLevel), downloadExpected)
redundancyLevelName(opts.DownloadRLevel), downloadExpected, cacheMode)
c.logger.Infof("soc: expecting %d successful retrievals and %d failures", expectedSuccesses, expectedFailures)

for addr := range replicaIter {
Expand All @@ -189,13 +244,16 @@ func (c *Check) testReplicaRetrieval(ctx context.Context, node *bee.Client, node
continue
}

retrievedReplica, err := node.DownloadChunk(ctx, addr, "", &api.DownloadOptions{
Cache: &getFromCache,
})
retrievedReplica, err := c.downloadReplicaWithRetry(ctx, node, nodeName, addr, countTotal, opts)

if err != nil {
countBad++
c.logger.Infof("node %s: download soc replica chunk %d failed: %v (address: %s)",
nodeName, countTotal, err, addr.String())
retries := 1
if !opts.Cache {
retries = 5
}
c.logger.Infof("node %s: download soc replica chunk %d failed after %d attempts: %v (address: %s)",
nodeName, countTotal, retries, err, addr.String())
replicaErrors = append(replicaErrors, err)
} else {
countGood++
Expand Down
1 change: 1 addition & 0 deletions pkg/config/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ var Checks = map[string]CheckType{
RequestTimeout *time.Duration `yaml:"request-timeout"`
UploadRLevel *fredundancy.Level `yaml:"upload-r-level"`
DownloadRLevel *fredundancy.Level `yaml:"download-r-level"`
Cache *bool `yaml:"cache"`
})
if err := check.Options.Decode(checkOpts); err != nil {
return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
Expand Down