-
Notifications
You must be signed in to change notification settings - Fork 2
feat: handling undeliverable contracts on seller side #51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -323,6 +323,7 @@ func (p *HTTPHandler) mapContract(ctx context.Context, item resources.Contract) | |
| ApplicationStatus: item.State().String(), // rw mutex canceable | ||
| BlockchainStatus: item.BlockchainState().String(), // readonly | ||
| Error: errString(item.Error()), // atomic | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error handling in |
||
| IsUndeliverable: item.IsUndeliverable(), // atomic | ||
| Dest: item.Dest(), // readonly | ||
| PoolDest: item.PoolDest(), // readonly | ||
| // Miners: p.allocator.GetMinersFulfillingContract(item.ID(), p.cycleDuration), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,6 +102,7 @@ type Contract struct { | |
| ApplicationStatus string | ||
| BlockchainStatus string | ||
| Error string | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Suggested Change: type ContractError struct {
Code int
Message string
}
Error ContractErrorThis change allows easier error handling and integration with automated systems, improving maintainability and reliability. |
||
| IsUndeliverable bool | ||
| Dest string | ||
| PoolDest string | ||
|
Comment on lines
106
to
107
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The fields Suggested Change: |
||
| Miners []*allocator.MinerItemJobScheduled | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -339,3 +339,7 @@ func (p *ContractWatcherBuyer) StarvingGHS() int { | |
| func (p *ContractWatcherBuyer) Error() error { | ||
| return p.contractErr.Load() | ||
|
shev-titan marked this conversation as resolved.
|
||
| } | ||
|
|
||
| func (p *ContractWatcherBuyer) IsUndeliverable() bool { | ||
| return false | ||
|
Comment on lines
+343
to
+344
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Suggested Improvement: func (p *ContractWatcherBuyer) IsUndeliverable() bool {
return p.contractErr.Load() != nil || p.starvingGHS.Load() > someThreshold
} |
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| package contract | ||
|
|
||
| import ( | ||
| "errors" | ||
| "fmt" | ||
| "math" | ||
| "net/url" | ||
|
|
@@ -14,6 +15,7 @@ import ( | |
| hashrateContract "github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate" | ||
| "github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate/allocator" | ||
| hr "github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate/hashrate" | ||
| "github.com/Lumerin-protocol/proxy-router/internal/resources/hashrate/proxy" | ||
| "go.uber.org/atomic" | ||
| "golang.org/x/exp/slices" | ||
| ) | ||
|
Comment on lines
15
to
21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The import aliases for the Recommendation: Use a consistent alias for the |
||
|
|
@@ -22,10 +24,17 @@ var ( | |
| ErrNotRunningBlockchain = fmt.Errorf("contract is not running in blockchain") | ||
| ErrStopped = fmt.Errorf("contract is stopped") | ||
| ErrAlreadyRunning = fmt.Errorf("contract is already running") | ||
| ErrUndeliverable = fmt.Errorf("contract is undeliverable") | ||
| ) | ||
|
|
||
| var ( | ||
| AdjustmentThresholdGHS = 100.0 | ||
| // max number of cycles without ability to connect to destination to treat contract as undeliverable | ||
| MaxConsequentCyclesWoDestConn int64 = 2 | ||
| FulfillmentDelay time.Duration = 10 * time.Second | ||
|
|
||
| FullMinerThresholdGHS float64 = 1000.0 // min hashrate required for next cycle to allocate full miners (miners that stay for the entire contract duration) | ||
| PartialMinerThresholdGHS float64 = 100.0 // min hashrate required for next cycle to allocate partial miners (miners allocated for just one cycle) | ||
| ) | ||
|
|
||
| type ContractWatcherSellerV2 struct { | ||
|
|
@@ -40,15 +49,19 @@ type ContractWatcherSellerV2 struct { | |
| cycleEndsAt time.Time | ||
| minerDisconnectCh *lib.ChanRecvStop[allocator.MinerItem] | ||
| deliveryLog *DeliveryLog | ||
| iter int64 | ||
|
|
||
| firstCycleWithoutDestConnection atomic.Int64 | ||
|
|
||
| // shared state | ||
| fulfillmentStartedAt atomic.Value // time.Time | ||
| starvingGHS atomic.Uint64 | ||
| err *atomic.Error | ||
|
|
||
| isRunning bool | ||
| isRunningMutex sync.RWMutex | ||
| contractErr atomic.Error // keeps the last error that happened in the contract that prevents it from fulfilling correctly, like invalid destination | ||
| isRunning bool | ||
| isRunningMutex sync.RWMutex | ||
| contractErr atomic.Error // keeps the last error that happened in the contract that prevents it from fulfilling correctly, like invalid destination | ||
| isUndeliverable atomic.Bool | ||
|
|
||
| // deps | ||
| *hashrate.Terms | ||
|
|
@@ -63,16 +76,17 @@ func NewContractWatcherSellerV2(terms *hashrateContract.Terms, cycleDuration tim | |
| stats: &stats{ | ||
| actualHRGHS: hashrateFactory(), | ||
| }, | ||
| isRunning: false, | ||
| startCh: make(chan struct{}), | ||
| stopCh: make(chan struct{}), | ||
| doneCh: make(chan struct{}), | ||
| err: atomic.NewError(nil), | ||
| deliveryLog: NewDeliveryLog(), | ||
| Terms: terms, | ||
| allocator: allocator, | ||
| hrFactory: hashrateFactory, | ||
| log: log, | ||
| isRunning: false, | ||
| startCh: make(chan struct{}), | ||
| stopCh: make(chan struct{}), | ||
| doneCh: make(chan struct{}), | ||
| err: atomic.NewError(nil), | ||
| deliveryLog: NewDeliveryLog(), | ||
| Terms: terms, | ||
| allocator: allocator, | ||
| hrFactory: hashrateFactory, | ||
| log: log, | ||
| firstCycleWithoutDestConnection: *atomic.NewInt64(math.MaxInt64), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -141,6 +155,8 @@ func (p *ContractWatcherSellerV2) Reset() { | |
| p.startCh = make(chan struct{}) | ||
| p.stopCh = make(chan struct{}) | ||
| p.doneCh = make(chan struct{}) | ||
| p.iter = 0 | ||
| p.firstCycleWithoutDestConnection.Store(math.MaxInt64) | ||
| p.err = atomic.NewError(nil) | ||
| } | ||
|
|
||
|
Comment on lines
155
to
162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reinitializing channels ( Recommendation: Ensure all goroutines are properly terminated before reinitializing the channels, or consider using a different approach to manage the lifecycle of these channels to prevent resource leaks. |
||
|
|
@@ -151,7 +167,7 @@ func (p *ContractWatcherSellerV2) run() error { | |
| select { | ||
| case <-p.stopCh: | ||
| return ErrStopped | ||
| case <-time.After(10 * time.Second): | ||
| case <-time.After(FulfillmentDelay): | ||
| } | ||
|
|
||
| fullMiners := lib.NewSet() | ||
|
|
@@ -177,7 +193,7 @@ func (p *ContractWatcherSellerV2) run() error { | |
| p.stats.deliveryTargetGHS = p.HashrateGHS() | ||
|
|
||
| CONTRACT_CYCLE: | ||
| for { | ||
| for p.iter = 0; ; p.iter++ { | ||
| p.log.Debugf("new contract cycle started") | ||
| if !p.isRunningBlockchain() { | ||
| return ErrNotRunningBlockchain | ||
|
|
@@ -186,6 +202,11 @@ CONTRACT_CYCLE: | |
| return nil | ||
| } | ||
|
|
||
| if p.IsUndeliverable() { | ||
| p.log.Warnf("contract is undeliverable, exiting contract cycle") | ||
| return ErrUndeliverable | ||
| } | ||
|
|
||
| p.stats.partialMiners = p.stats.partialMiners[:0] | ||
| p.stats.jobFullMiners.Store(0) | ||
| p.stats.jobPartialMiners.Store(0) | ||
|
|
@@ -280,26 +301,24 @@ func (p *ContractWatcherSellerV2) onCycleEnd(cycleDuration time.Duration) { | |
| func (p *ContractWatcherSellerV2) adjustHashrate(hashrateGHS float64) (adjustedGHS float64) { | ||
| // TODO: move this function to allocator, optimize to make only one snapshot of miners | ||
| expectedAdjustmentGHS := hashrateGHS | ||
| fullMinerThresholdGHS := 1000.0 | ||
| partialMinersThresholdGHS := 100.0 | ||
|
|
||
| adjustmentRequired := math.Abs(hashrateGHS) > AdjustmentThresholdGHS | ||
| if !adjustmentRequired { | ||
| p.starvingGHS.Store(0) | ||
| return 0 | ||
| } | ||
|
|
||
| if hashrateGHS < -fullMinerThresholdGHS { | ||
| if hashrateGHS < -FullMinerThresholdGHS { | ||
| hashrateGHS += p.removeFullMiners(hashrateGHS) | ||
| } | ||
|
|
||
| if hashrateGHS > fullMinerThresholdGHS { | ||
| if hashrateGHS > FullMinerThresholdGHS { | ||
| hashrateGHS -= p.addFullMiners(hashrateGHS) | ||
| } | ||
|
|
||
| remainingCycleDuration := p.remainingCycleDuration() | ||
|
|
||
| if hashrateGHS > partialMinersThresholdGHS { | ||
| if hashrateGHS > PartialMinerThresholdGHS { | ||
| job := hr.GHSToJobSubmittedV2(hashrateGHS, remainingCycleDuration) | ||
| addedJob := p.addPartialMiners(job, remainingCycleDuration) | ||
| addedGHS := hr.JobSubmittedToGHSV2(addedJob, remainingCycleDuration) | ||
|
|
@@ -337,8 +356,9 @@ func (p *ContractWatcherSellerV2) addFullMiners(hashrateGHS float64) (addedGHS f | |
| }) | ||
| }, | ||
| func(ID string, HrGHS, remainingJob float64, err error) { | ||
| p.log.Warnf("full miner ended, id %s, hr %.f, remaining job %d, error %s", ID, HrGHS, remainingJob, err) | ||
| p.log.Warnf("full miner ended, id %s, hr %.f, remaining job %.f, error %s", ID, HrGHS, remainingJob, err) | ||
| p.stats.fullMiners.Remove(ID) | ||
| p.checkErrorForUndeliverable(err) | ||
| }, | ||
| ) | ||
| if len(fullMiners) > 0 { | ||
|
|
@@ -350,27 +370,37 @@ func (p *ContractWatcherSellerV2) addFullMiners(hashrateGHS float64) (addedGHS f | |
| } | ||
|
|
||
| // removeFullMiners removes full miners, cause they persist for the duration of the contract | ||
| // | ||
| // Expects hrGHS to be a negative number | ||
| func (p *ContractWatcherSellerV2) removeFullMiners(hrGHS float64) (removedGHS float64) { | ||
| items := p.getFullMinersSorted() | ||
|
|
||
| if len(items) == 0 { | ||
| p.log.Warnf("no miners found to be removed") | ||
| if hrGHS >= 0 { | ||
| p.log.Warnf("removeFullMiners expects hrGHS to be a negative number, got %.f", hrGHS) | ||
| return 0 | ||
| } | ||
|
|
||
| hrToRemove := -hrGHS | ||
|
|
||
| items := p.getFullMinersSorted() | ||
|
|
||
| for _, item := range items { | ||
| minerToRemove := item.ID | ||
| miner, ok := p.allocator.GetMiners().Load(minerToRemove) | ||
| if ok { | ||
| miner.RemoveTasksByID(p.ID()) | ||
| removedGHS = +miner.HashrateGHS() | ||
| removedGHS += miner.HashrateGHS() | ||
| } | ||
| _ = p.stats.removeFullMiner(minerToRemove) | ||
| if hrGHS-removedGHS < 0 { | ||
| if removedGHS >= hrToRemove { | ||
| break | ||
| } | ||
| } | ||
|
|
||
| p.log.Debugf("removed %d full miners, removedGHS %.f", len(items)-p.stats.fullMiners.Len(), removedGHS) | ||
| if len(items) == 0 { | ||
| p.log.Warnf("no miners found to be removed") | ||
| } else { | ||
| p.log.Debugf("removed %d full miners, removedGHS %.f", len(items)-p.stats.fullMiners.Len(), removedGHS) | ||
| } | ||
|
|
||
| p.log.Debugf("full miners: %v", p.stats.fullMiners.ToSlice()) | ||
| return removedGHS | ||
| } | ||
|
|
@@ -403,6 +433,7 @@ func (p *ContractWatcherSellerV2) addPartialMiners(job float64, cycleEndTimeout | |
| }, | ||
| func(ID string, HrGHS, remainingJob float64, err error) { | ||
| p.log.Warnf("partial miner ended, id %s, hr %.f, remaining job %.f, error %s", ID, HrGHS, remainingJob, err) | ||
| p.checkErrorForUndeliverable(err) | ||
| p.stats.fullMiners.Remove(ID) | ||
| _ = p.stats.removePartialMiner(ID) | ||
| }, | ||
|
|
@@ -434,7 +465,6 @@ func (p *ContractWatcherSellerV2) removeAllFullMiners() { | |
| miner.RemoveTasksByID(p.ID()) | ||
| p.log.Debugf("full miner %s was removed from this contract", miner.ID()) | ||
| } | ||
| return | ||
| } | ||
|
|
||
| func (p *ContractWatcherSellerV2) removeAllPartialMiners() { | ||
|
|
@@ -473,6 +503,31 @@ func (p *ContractWatcherSellerV2) reportTotalStats() { | |
| int(undeliveredJob), undeliveredFraction) | ||
| } | ||
|
|
||
| // checks if error is a destination error and if so, checks if conditions for undeliverable contract are met | ||
| func (p *ContractWatcherSellerV2) checkErrorForUndeliverable(err error) { | ||
| if err == nil { | ||
| p.firstCycleWithoutDestConnection.Store(math.MaxInt64) | ||
| return | ||
| } | ||
|
|
||
| p.SetError(err) | ||
|
|
||
| if errors.Is(err, proxy.ErrDest) || errors.Is(err, proxy.ErrConnectDest) { | ||
| p.log.Warnf("contract cycle %d is undeliverable, destination error: %s", p.iter, err) | ||
|
|
||
| swapped := p.firstCycleWithoutDestConnection.CompareAndSwap(math.MaxInt64, p.iter) | ||
| if swapped { | ||
| return | ||
| } | ||
|
|
||
| if p.iter-p.firstCycleWithoutDestConnection.Load() >= MaxConsequentCyclesWoDestConn { | ||
| p.log.Warnf("contract is undeliverable, max cycles without destination connection reached") | ||
| p.isUndeliverable.Store(true) | ||
| } | ||
| } | ||
|
|
||
| } | ||
|
|
||
| func (p *ContractWatcherSellerV2) isRunningBlockchain() bool { | ||
| return p.BlockchainState() == hashrateContract.BlockchainStateRunning | ||
| } | ||
|
|
@@ -576,7 +631,11 @@ func (p *ContractWatcherSellerV2) ValidationStage() hashrateContract.ValidationS | |
|
|
||
| // state getters | ||
| func (p *ContractWatcherSellerV2) FulfillmentStartTime() time.Time { | ||
| return p.fulfillmentStartedAt.Load().(time.Time) | ||
| startedAt, ok := p.fulfillmentStartedAt.Load().(time.Time) | ||
| if !ok { | ||
| return time.Time{} | ||
| } | ||
| return startedAt | ||
| } | ||
| func (p *ContractWatcherSellerV2) ResourceEstimatesActual() map[string]float64 { | ||
| return p.stats.actualHRGHS.GetHashrateAvgGHSAll() | ||
|
|
@@ -598,6 +657,11 @@ func (p *ContractWatcherSellerV2) IsRunning() bool { | |
| defer p.isRunningMutex.RUnlock() | ||
| return p.isRunning | ||
| } | ||
|
|
||
| func (p *ContractWatcherSellerV2) IsUndeliverable() bool { | ||
| return p.isUndeliverable.Load() | ||
| } | ||
|
|
||
| func (p *ContractWatcherSellerV2) StarvingGHS() int { | ||
| return int(p.starvingGHS.Load()) | ||
| } | ||
|
|
@@ -632,12 +696,19 @@ func (p *ContractWatcherSellerV2) SetTerms(terms *hashrate.Terms) { | |
| } | ||
|
|
||
| p.Terms = terms | ||
| var poolDestStr string | ||
| if terms.PoolDest() != nil { | ||
| poolDestStr = terms.PoolDest().String() | ||
| } | ||
|
|
||
| p.log.Infof( | ||
| "contract terms updated: price %.f LMR, hashrate %.f GHS, duration %s, state %s", | ||
| "contract terms updated: price %s LMR, hashrate %.f GHS, duration %s, state %s, dest %s, poolDest %s", | ||
| terms.Price().String(), | ||
| terms.HashrateGHS(), | ||
| terms.Duration().Round(time.Second), | ||
| terms.BlockchainState().String(), | ||
| terms.Dest().String(), | ||
| poolDestStr, | ||
| ) | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -181,8 +181,12 @@ func (c *ControllerSeller) handleDestinationUpdated(ctx context.Context, _ *impl | |
| newDest := terms.Dest().String() | ||
|
|
||
| if currentDest == newDest { | ||
| c.log.Debugf("destination not changed, %s", newDest) | ||
| return nil | ||
| } | ||
|
|
||
| c.log.Infof("destination changed,\n OLD: %s \n NEW: %s", currentDest, newDest) | ||
|
|
||
| if c.IsRunning() { | ||
| c.ContractWatcherSellerV2.StopFulfilling() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The error handling for Suggested Improvement: if err := c.ContractWatcherSellerV2.StopFulfilling(); err != nil {
c.log.Errorf("Failed to stop fulfilling: %s", err)
return err
} |
||
| <-c.ContractWatcherSellerV2.Done() | ||
|
Comment on lines
191
to
192
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The method Suggested Improvement: select {
case <-c.ContractWatcherSellerV2.Done():
// Proceed after successful stop
case <-time.After(timeoutDuration):
// Handle timeout scenario
} |
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ type Contract interface { | |
| BlockchainState() hashrate.BlockchainState // the state of the contract in blockchain (pending or running) | ||
| ValidationStage() hashrate.ValidationStage // the stage of the contract validation (only buyer) | ||
| Error() error // the error that prevents contract from being fulfilled (only seller) | ||
|
Comment on lines
22
to
24
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The methods There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| IsUndeliverable() bool // whether the contract is undeliverable (only seller) | ||
|
|
||
| ID() string // ID is the unique identifier of the contract, for smart contract data source this is the smart contract address | ||
| Seller() string // ID of the seller (address of the seller for smart contract data source) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
ApplicationStatusis derived fromitem.State().String(), which is noted asrw mutex cancelable. This implies potential read-write conflicts or race conditions if the state is accessed concurrently without proper synchronization. To ensure thread safety and data consistency, consider implementing locking mechanisms (such as mutexes) around accesses to the state, or ensure that the state is accessed in a thread-safe manner if not already handled withinState()method.