Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/handlers/httphandlers/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ func (p *HTTPHandler) mapContract(ctx context.Context, item resources.Contract)
ApplicationStatus: item.State().String(), // rw mutex canceable
BlockchainStatus: item.BlockchainState().String(), // readonly
Comment on lines 323 to 324
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ApplicationStatus is derived from item.State().String(), which is noted as rw mutex cancelable. This implies potential read-write conflicts or race conditions if the state is accessed concurrently without proper synchronization. To ensure thread safety and data consistency, consider implementing locking mechanisms (such as mutexes) around accesses to the state, or ensure that the state is accessed in a thread-safe manner if not already handled within State() method.

Error: errString(item.Error()), // atomic
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling in Error: errString(item.Error()) converts the error to a string, which is a common practice. However, it's important to ensure that this conversion does not omit necessary details about the error, which are crucial for effective debugging and logging. Consider enhancing the errString function to include more detailed error information or context, especially if item.Error() can contain multiple error states or complex error information.

IsUndeliverable: item.IsUndeliverable(), // atomic
Dest: item.Dest(), // readonly
PoolDest: item.PoolDest(), // readonly
// Miners: p.allocator.GetMinersFulfillingContract(item.ID(), p.cycleDuration),
Expand Down
1 change: 1 addition & 0 deletions internal/handlers/httphandlers/schemas.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ type Contract struct {
ApplicationStatus string
BlockchainStatus string
Error string
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Error field is currently a string, which may not be the most effective way to handle errors, especially when the system needs to make decisions based on these errors. Consider using a more structured error handling approach, such as custom error types or error codes, which can be easily checked and handled programmatically.

Suggested Change:

type ContractError struct {
    Code    int
    Message string
}

Error ContractError

This change allows easier error handling and integration with automated systems, improving maintainability and reliability.

IsUndeliverable bool
Dest string
PoolDest string
Comment on lines 106 to 107
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fields Dest and PoolDest are used to store destination addresses and are both of type string. This could lead to potential data integrity issues if the values are not validated or sanitized properly. It's recommended to add validation checks to ensure these fields contain valid data before they are processed further in the system.

Suggested Change:
Implement validation functions that check the format and correctness of the data in Dest and PoolDest fields. This could prevent errors and security issues related to incorrect data handling.

Miners []*allocator.MinerItemJobScheduled
Expand Down
4 changes: 4 additions & 0 deletions internal/resources/hashrate/contract/contract_buyer.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,7 @@ func (p *ContractWatcherBuyer) StarvingGHS() int {
func (p *ContractWatcherBuyer) Error() error {
return p.contractErr.Load()
Comment thread
shev-titan marked this conversation as resolved.
}

func (p *ContractWatcherBuyer) IsUndeliverable() bool {
return false
Comment on lines +343 to +344
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The IsUndeliverable() function always returns false, which might not accurately reflect the actual deliverability status of the contract. This could lead to misleading behavior in other parts of the application that depend on this function. It is important to implement this function to check real conditions that would make the contract undeliverable.

Suggested Improvement:
Implement logic that checks specific conditions under which the contract would be considered undeliverable. For example, checking if there are any critical errors or if certain thresholds are not met:

func (p *ContractWatcherBuyer) IsUndeliverable() bool {
    return p.contractErr.Load() != nil || p.starvingGHS.Load() > someThreshold
}

}
133 changes: 102 additions & 31 deletions internal/resources/hashrate/contract/contract_seller_v2.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package contract

import (
"errors"
"fmt"
"math"
"net/url"
Expand All @@ -14,6 +15,7 @@ import (
hashrateContract "github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate"
"github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate/allocator"
hr "github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate/hashrate"
"github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate/proxy"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
)
Comment on lines 15 to 21
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The import aliases for the hashrate package are potentially confusing due to their inconsistency (hashrateContract, hr). Consider standardizing the import aliases across the file to improve readability and maintainability.

Recommendation: Use a consistent alias for the hashrate package throughout the file, or avoid aliasing if not necessary, to reduce confusion and potential errors in package usage.

Expand All @@ -22,10 +24,17 @@ var (
ErrNotRunningBlockchain = fmt.Errorf("contract is not running in blockchain")
ErrStopped = fmt.Errorf("contract is stopped")
ErrAlreadyRunning = fmt.Errorf("contract is already running")
ErrUndeliverable = fmt.Errorf("contract is undeliverable")
)

var (
AdjustmentThresholdGHS = 100.0
// max number of cycles without ability to connect to destination to treat contract as undeliverable
MaxConsequentCyclesWoDestConn int64 = 2
FulfillmentDelay time.Duration = 10 * time.Second

FullMinerThresholdGHS float64 = 1000.0 // min hashrate required for next cycle to allocate full miners (miners that stay for the entire contract duration)
PartialMinerThresholdGHS float64 = 100.0 // min hashrate required for next cycle to allocate partial miners (miners allocated for just one cycle)
)

type ContractWatcherSellerV2 struct {
Expand All @@ -40,15 +49,19 @@ type ContractWatcherSellerV2 struct {
cycleEndsAt time.Time
minerDisconnectCh *lib.ChanRecvStop[allocator.MinerItem]
deliveryLog *DeliveryLog
iter int64

firstCycleWithoutDestConnection atomic.Int64

// shared state
fulfillmentStartedAt atomic.Value // time.Time
starvingGHS atomic.Uint64
err *atomic.Error

isRunning bool
isRunningMutex sync.RWMutex
contractErr atomic.Error // keeps the last error that happened in the contract that prevents it from fulfilling correctly, like invalid destination
isRunning bool
isRunningMutex sync.RWMutex
contractErr atomic.Error // keeps the last error that happened in the contract that prevents it from fulfilling correctly, like invalid destination
isUndeliverable atomic.Bool

// deps
*hashrate.Terms
Expand All @@ -63,16 +76,17 @@ func NewContractWatcherSellerV2(terms *hashrateContract.Terms, cycleDuration tim
stats: &stats{
actualHRGHS: hashrateFactory(),
},
isRunning: false,
startCh: make(chan struct{}),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
err: atomic.NewError(nil),
deliveryLog: NewDeliveryLog(),
Terms: terms,
allocator: allocator,
hrFactory: hashrateFactory,
log: log,
isRunning: false,
startCh: make(chan struct{}),
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
err: atomic.NewError(nil),
deliveryLog: NewDeliveryLog(),
Terms: terms,
allocator: allocator,
hrFactory: hashrateFactory,
log: log,
firstCycleWithoutDestConnection: *atomic.NewInt64(math.MaxInt64),
}
}

Expand Down Expand Up @@ -141,6 +155,8 @@ func (p *ContractWatcherSellerV2) Reset() {
p.startCh = make(chan struct{})
p.stopCh = make(chan struct{})
p.doneCh = make(chan struct{})
p.iter = 0
p.firstCycleWithoutDestConnection.Store(math.MaxInt64)
p.err = atomic.NewError(nil)
}

Comment on lines 155 to 162
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reinitializing channels (startCh, stopCh, doneCh) in the Reset method without ensuring that any previous goroutines using these channels have been properly terminated can lead to goroutine leaks and unpredictable behavior.

Recommendation: Ensure all goroutines are properly terminated before reinitializing the channels, or consider using a different approach to manage the lifecycle of these channels to prevent resource leaks.

Expand All @@ -151,7 +167,7 @@ func (p *ContractWatcherSellerV2) run() error {
select {
case <-p.stopCh:
return ErrStopped
case <-time.After(10 * time.Second):
case <-time.After(FulfillmentDelay):
}

fullMiners := lib.NewSet()
Expand All @@ -177,7 +193,7 @@ func (p *ContractWatcherSellerV2) run() error {
p.stats.deliveryTargetGHS = p.HashrateGHS()

CONTRACT_CYCLE:
for {
for p.iter = 0; ; p.iter++ {
p.log.Debugf("new contract cycle started")
if !p.isRunningBlockchain() {
return ErrNotRunningBlockchain
Expand All @@ -186,6 +202,11 @@ CONTRACT_CYCLE:
return nil
}

if p.IsUndeliverable() {
p.log.Warnf("contract is undeliverable, exiting contract cycle")
return ErrUndeliverable
}

p.stats.partialMiners = p.stats.partialMiners[:0]
p.stats.jobFullMiners.Store(0)
p.stats.jobPartialMiners.Store(0)
Expand Down Expand Up @@ -280,26 +301,24 @@ func (p *ContractWatcherSellerV2) onCycleEnd(cycleDuration time.Duration) {
func (p *ContractWatcherSellerV2) adjustHashrate(hashrateGHS float64) (adjustedGHS float64) {
// TODO: move this function to allocator, optimize to make only one snapshot of miners
expectedAdjustmentGHS := hashrateGHS
fullMinerThresholdGHS := 1000.0
partialMinersThresholdGHS := 100.0

adjustmentRequired := math.Abs(hashrateGHS) > AdjustmentThresholdGHS
if !adjustmentRequired {
p.starvingGHS.Store(0)
return 0
}

if hashrateGHS < -fullMinerThresholdGHS {
if hashrateGHS < -FullMinerThresholdGHS {
hashrateGHS += p.removeFullMiners(hashrateGHS)
}

if hashrateGHS > fullMinerThresholdGHS {
if hashrateGHS > FullMinerThresholdGHS {
hashrateGHS -= p.addFullMiners(hashrateGHS)
}

remainingCycleDuration := p.remainingCycleDuration()

if hashrateGHS > partialMinersThresholdGHS {
if hashrateGHS > PartialMinerThresholdGHS {
job := hr.GHSToJobSubmittedV2(hashrateGHS, remainingCycleDuration)
addedJob := p.addPartialMiners(job, remainingCycleDuration)
addedGHS := hr.JobSubmittedToGHSV2(addedJob, remainingCycleDuration)
Expand Down Expand Up @@ -337,8 +356,9 @@ func (p *ContractWatcherSellerV2) addFullMiners(hashrateGHS float64) (addedGHS f
})
},
func(ID string, HrGHS, remainingJob float64, err error) {
p.log.Warnf("full miner ended, id %s, hr %.f, remaining job %d, error %s", ID, HrGHS, remainingJob, err)
p.log.Warnf("full miner ended, id %s, hr %.f, remaining job %.f, error %s", ID, HrGHS, remainingJob, err)
p.stats.fullMiners.Remove(ID)
p.checkErrorForUndeliverable(err)
},
)
if len(fullMiners) > 0 {
Expand All @@ -350,27 +370,37 @@ func (p *ContractWatcherSellerV2) addFullMiners(hashrateGHS float64) (addedGHS f
}

// removeFullMiners removes full miners, cause they persist for the duration of the contract
//
// Expects hrGHS to be a negative number
func (p *ContractWatcherSellerV2) removeFullMiners(hrGHS float64) (removedGHS float64) {
items := p.getFullMinersSorted()

if len(items) == 0 {
p.log.Warnf("no miners found to be removed")
if hrGHS >= 0 {
p.log.Warnf("removeFullMiners expects hrGHS to be a negative number, got %.f", hrGHS)
return 0
}

hrToRemove := -hrGHS

items := p.getFullMinersSorted()

for _, item := range items {
minerToRemove := item.ID
miner, ok := p.allocator.GetMiners().Load(minerToRemove)
if ok {
miner.RemoveTasksByID(p.ID())
removedGHS = +miner.HashrateGHS()
removedGHS += miner.HashrateGHS()
}
_ = p.stats.removeFullMiner(minerToRemove)
if hrGHS-removedGHS < 0 {
if removedGHS >= hrToRemove {
break
}
}

p.log.Debugf("removed %d full miners, removedGHS %.f", len(items)-p.stats.fullMiners.Len(), removedGHS)
if len(items) == 0 {
p.log.Warnf("no miners found to be removed")
} else {
p.log.Debugf("removed %d full miners, removedGHS %.f", len(items)-p.stats.fullMiners.Len(), removedGHS)
}

p.log.Debugf("full miners: %v", p.stats.fullMiners.ToSlice())
return removedGHS
}
Expand Down Expand Up @@ -403,6 +433,7 @@ func (p *ContractWatcherSellerV2) addPartialMiners(job float64, cycleEndTimeout
},
func(ID string, HrGHS, remainingJob float64, err error) {
p.log.Warnf("partial miner ended, id %s, hr %.f, remaining job %.f, error %s", ID, HrGHS, remainingJob, err)
p.checkErrorForUndeliverable(err)
p.stats.fullMiners.Remove(ID)
_ = p.stats.removePartialMiner(ID)
},
Expand Down Expand Up @@ -434,7 +465,6 @@ func (p *ContractWatcherSellerV2) removeAllFullMiners() {
miner.RemoveTasksByID(p.ID())
p.log.Debugf("full miner %s was removed from this contract", miner.ID())
}
return
}

func (p *ContractWatcherSellerV2) removeAllPartialMiners() {
Expand Down Expand Up @@ -473,6 +503,31 @@ func (p *ContractWatcherSellerV2) reportTotalStats() {
int(undeliveredJob), undeliveredFraction)
}

// checks if error is a destination error and if so, checks if conditions for undeliverable contract are met
func (p *ContractWatcherSellerV2) checkErrorForUndeliverable(err error) {
if err == nil {
p.firstCycleWithoutDestConnection.Store(math.MaxInt64)
return
}

p.SetError(err)

if errors.Is(err, proxy.ErrDest) || errors.Is(err, proxy.ErrConnectDest) {
p.log.Warnf("contract cycle %d is undeliverable, destination error: %s", p.iter, err)

swapped := p.firstCycleWithoutDestConnection.CompareAndSwap(math.MaxInt64, p.iter)
if swapped {
return
}

if p.iter-p.firstCycleWithoutDestConnection.Load() >= MaxConsequentCyclesWoDestConn {
p.log.Warnf("contract is undeliverable, max cycles without destination connection reached")
p.isUndeliverable.Store(true)
}
}

}

func (p *ContractWatcherSellerV2) isRunningBlockchain() bool {
return p.BlockchainState() == hashrateContract.BlockchainStateRunning
}
Expand Down Expand Up @@ -576,7 +631,11 @@ func (p *ContractWatcherSellerV2) ValidationStage() hashrateContract.ValidationS

// state getters
func (p *ContractWatcherSellerV2) FulfillmentStartTime() time.Time {
return p.fulfillmentStartedAt.Load().(time.Time)
startedAt, ok := p.fulfillmentStartedAt.Load().(time.Time)
if !ok {
return time.Time{}
}
return startedAt
}
func (p *ContractWatcherSellerV2) ResourceEstimatesActual() map[string]float64 {
return p.stats.actualHRGHS.GetHashrateAvgGHSAll()
Expand All @@ -598,6 +657,11 @@ func (p *ContractWatcherSellerV2) IsRunning() bool {
defer p.isRunningMutex.RUnlock()
return p.isRunning
}

func (p *ContractWatcherSellerV2) IsUndeliverable() bool {
return p.isUndeliverable.Load()
}

func (p *ContractWatcherSellerV2) StarvingGHS() int {
return int(p.starvingGHS.Load())
}
Expand Down Expand Up @@ -632,12 +696,19 @@ func (p *ContractWatcherSellerV2) SetTerms(terms *hashrate.Terms) {
}

p.Terms = terms
var poolDestStr string
if terms.PoolDest() != nil {
poolDestStr = terms.PoolDest().String()
}

p.log.Infof(
"contract terms updated: price %.f LMR, hashrate %.f GHS, duration %s, state %s",
"contract terms updated: price %s LMR, hashrate %.f GHS, duration %s, state %s, dest %s, poolDest %s",
terms.Price().String(),
terms.HashrateGHS(),
terms.Duration().Round(time.Second),
terms.BlockchainState().String(),
terms.Dest().String(),
poolDestStr,
)
}

Expand Down
4 changes: 4 additions & 0 deletions internal/resources/hashrate/contract/controller_seller.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,12 @@ func (c *ControllerSeller) handleDestinationUpdated(ctx context.Context, _ *impl
newDest := terms.Dest().String()

if currentDest == newDest {
c.log.Debugf("destination not changed, %s", newDest)
return nil
}

c.log.Infof("destination changed,\n OLD: %s \n NEW: %s", currentDest, newDest)

if c.IsRunning() {
c.ContractWatcherSellerV2.StopFulfilling()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error handling for StopFulfilling() is not addressed. If StopFulfilling() encounters an error or fails to execute as expected, the error is not captured or logged, which could lead to unmanaged states or issues within the application. It is crucial to handle such errors to maintain the integrity and reliability of the application.

Suggested Improvement:

if err := c.ContractWatcherSellerV2.StopFulfilling(); err != nil {
    c.log.Errorf("Failed to stop fulfilling: %s", err)
    return err
}

<-c.ContractWatcherSellerV2.Done()
Comment on lines 191 to 192
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method StopFulfilling() is called within a conditional block, and the completion is awaited using <-c.ContractWatcherSellerV2.Done(). This introduces a potential blocking operation which could degrade performance if the Done() channel does not receive a signal promptly. Consider implementing a timeout mechanism or handling potential delays/errors more robustly to prevent the application from hanging.

Suggested Improvement:

select {
case <-c.ContractWatcherSellerV2.Done():
    // Proceed after successful stop
case <-time.After(timeoutDuration):
    // Handle timeout scenario
}

Expand Down
1 change: 1 addition & 0 deletions internal/resources/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Contract interface {
BlockchainState() hashrate.BlockchainState // the state of the contract in blockchain (pending or running)
ValidationStage() hashrate.ValidationStage // the stage of the contract validation (only buyer)
Error() error // the error that prevents contract from being fulfilled (only seller)
Comment on lines 22 to 24
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The methods BlockchainState(), ValidationStage(), and Error() expose internal state details directly, which could lead to security risks if not properly managed. Consider encapsulating these details more thoroughly and providing methods that offer decision-relevant information instead of direct state exposure. This approach can enhance security by limiting the accessibility of sensitive contract details and reducing the risk of misuse in broader application contexts.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Error() method currently returns only the last error encountered, which might not be sufficient for robust error handling in complex systems. Consider implementing an error logging or aggregation mechanism that can provide a more comprehensive view of errors over the lifecycle of a contract. This enhancement will allow for better diagnosis and resolution of issues, contributing to more reliable contract management.

IsUndeliverable() bool // whether the contract is undeliverable (only seller)

ID() string // ID is the unique identifier of the contract, for smart contract data source this is the smart contract address
Seller() string // ID of the seller (address of the seller for smart contract data source)
Expand Down
Loading