feat: Add Traffic Limit Enforcer #25
base: dev
Conversation
- Add limit_enforcer.go with real-time limit checking
- Add traffic_limits.go for limit data structures
- Update service.proto with limit enforcer config params
- Update controller to pass limit config on start
- Add tests for limit enforcer functionality
Walkthrough
A traffic limit enforcement system is introduced, featuring a caching layer for per-user, per-node traffic limits fetched from a panel API, monitoring and enforcement logic integrated into the controller, and REST/RPC integration for configuration and metrics exposure.
Changes
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant API as REST/RPC API
    participant Ctrl as Controller
    participant Enf as LimitEnforcer
    participant Cache as TrafficLimitsCache
    participant PanelAPI as Panel API
    participant BE as Backend
    API->>Ctrl: StartLimitEnforcer(params)
    Ctrl->>Enf: NewLimitEnforcer + Start()
    Enf->>Cache: NewTrafficLimitsCache(panelURL)
    Enf->>Cache: StartAutoRefresh(interval)
    activate Cache
    Cache->>Cache: Periodic refresh goroutine
    Cache->>PanelAPI: GET /limits (with ETag)
    PanelAPI-->>Cache: 200 OK + Limits JSON
    Cache->>Cache: Swap atomic limits map
    deactivate Cache
    loop Periodic Check (every checkInterval)
        Enf->>Enf: Scan traffic stats from Backend
        Enf->>Cache: GetLimit(userID)
        Cache-->>Enf: Limit value
        Enf->>Enf: Compare traffic vs limit
        alt Traffic Exceeds Limit
            Enf->>BE: SyncUser(userID, empty inbounds)
            BE->>BE: Remove user connection
        end
        Enf->>Enf: Record metrics
    end
    Enf->>Enf: Background cleanup (inactive users)
    Enf->>Enf: Backoff on API errors
```
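To make the "Periodic Check" branch of the diagram concrete, here is a minimal in-package sketch of one check pass. Only GetLimit, SyncUser, common.User, Inbounds and the le.controller/le.limitsCache fields are names from this PR; checkOnce, trafficStat and the stats shape are illustrative assumptions, not the actual implementation.

```go
// Illustrative sketch of one periodic check pass. checkOnce and trafficStat
// are assumed names; GetLimit, SyncUser and common.User come from the PR.
type trafficStat struct {
	UserID int
	Email  string
	Bytes  int64
}

func (le *LimitEnforcer) checkOnce(ctx context.Context, stats []trafficStat) {
	b := le.controller.Backend()
	if b == nil {
		return
	}
	for _, s := range stats {
		limit := le.limitsCache.GetLimit(s.UserID)
		if limit <= 0 || s.Bytes <= limit {
			continue // 0 means unlimited; otherwise still under the limit
		}
		// Over the limit: sync with empty inbounds so the backend removes the user.
		removed := &common.User{Email: s.Email, Inbounds: []string{}}
		if err := b.SyncUser(ctx, removed); err != nil {
			log.Printf("limit check: failed to remove user %s: %v", s.Email, err)
		}
	}
}
```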
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 7
🤖 Fix all issues with AI agents
In `@backend/backend.go`:
- Around line 29-33: The LimitEnforcer interface currently declares Start(ctx
context.Context, refreshInterval interface{}), which mismatches implementations
and callers; change the Start method signature on the LimitEnforcer interface to
use time.Duration (i.e., Start(ctx context.Context, refreshInterval
time.Duration)) and import the time package if not already imported so the
interface matches controller/limit_enforcer.go and controller/controller.go.
In `@common/traffic_limits_test.go`:
- Around line 98-131: The test TestTrafficLimitsCache_UpdateFromPush_Incremental
currently expects tlc.UpdateFromPush(...{UserID:2, DataLimit:0}...) to remove
the entry, but the struct doc semantics treat DataLimit==0 as "unlimited" and
should be retained; update the assertions instead of the implementation: after
calling UpdateFromPush with DataLimit:0 assert that tlc.HasLimit(2) is true and
tlc.GetLimit(2) == 0 (or otherwise reflect "unlimited"), or if you prefer
removal semantics change the push contract to use an explicit delete signal;
adjust the test assertions around UpdateFromPush, NodeUserLimit.DataLimit,
HasLimit and GetLimit accordingly.
In `@common/traffic_limits.go`:
- Around line 387-390: The Stop method on TrafficLimitsCache can panic if called
multiple times because it closes tlc.stopChan unguarded; make Stop idempotent by
adding a sync.Once field (e.g., stopOnce) to the TrafficLimitsCache struct and
replace the direct close call in Stop() with tlc.stopOnce.Do(func(){ if
tlc.stopChan != nil { close(tlc.stopChan) } }), ensuring the channel is only
closed once and safe if Stop() is invoked repeatedly.
- Around line 261-296: UpdateFromPush currently treats
NodeUserLimit.DataLimit==0 differently between fullSync and incremental paths
(fullSync stores 0 but incremental deletes the entry), which loses the explicit
"0 = unlimited" state; to fix, stop deleting entries on DataLimit==0 in the
incremental branch and instead store the 0 value into newLimits (i.e., always
set newLimits[limit.UserID]=limit.DataLimit), and reserve actual removals for
fullSync or when an explicit tombstone is provided in NodeUserLimit (add a
tombstone flag or use a negative sentinel) so the semantics of 0 (unlimited) are
preserved; update the logic around UpdateFromPush, the loop that processes
limits, and any consumers relying on tlc.limitsPtr/userLimitsData to expect 0 as
"unlimited" rather than absence.
In `@controller/controller.go`:
- Around line 66-106: StartLimitEnforcer currently treats zero as default but
allows negative intervals from LimitEnforcerParams (LimitCheckInterval,
LimitRefreshInterval) which can cause time.NewTicker panics inside
LimitEnforcer; update StartLimitEnforcer to treat any non‑positive values (<= 0)
as the defaults (30s for CheckInterval, 60s for RefreshInterval) before
constructing the LimitEnforcer config and calling NewLimitEnforcer/Start so the
enforcer always receives safe, positive durations.
In `@controller/limit_enforcer.go`:
- Around line 108-113: Make LimitEnforcer.Stop idempotent to avoid double-close
panics by introducing a sync.Once (e.g., a stopOnce field) on the LimitEnforcer
struct and moving the close(le.stopChan), le.limitsCache.Stop(), and
log.Println("Limit enforcer stopped") calls into stopOnce.Do(...). Update the
Stop method to only call stopOnce.Do with that closure so repeated Stop calls
are safe; ensure the LimitEnforcer struct declares the stopOnce sync.Once and
existing code references (Stop method, le.stopChan, le.limitsCache.Stop) are
used inside the closure.
- Around line 294-316: removeOverLimitUsers currently calls b.SyncUser(ctx, ...)
with the long-lived ctx (originating from Start), so a hung SyncUser can block
enforcement and Stop; wrap each SyncUser invocation in a bounded context: create
a child context with a short timeout (e.g. context.WithTimeout(ctx,
<reasonableDuration>)) and defer its cancel before calling b.SyncUser, use that
timed ctx for SyncUser so hung backend calls are aborted and metrics/logging
still occur; update removeOverLimitUsers to use the new timed context for each
user and ensure you reference removeOverLimitUsers and the backend method
SyncUser when making the change.
🧹 Nitpick comments (2)
common/traffic_limits.go (2)
114-117: Consider guarding against a nil logger. If SetLogger(nil) is called, later logging will panic. A small guard avoids that.
🔧 Suggested guard
```diff
 func (tlc *TrafficLimitsCache) SetLogger(logger Logger) {
-	tlc.logger = logger
+	if logger == nil {
+		tlc.logger = &defaultLogger{}
+		return
+	}
+	tlc.logger = logger
 }
```
298-342: Optional: seed jitter to avoid synchronized backoff.
math/rand defaults to a deterministic seed; across many nodes this reduces jitter's effectiveness. Consider seeding once in the constructor or using a per-instance rand.Rand.
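For illustration, a small sketch of the per-instance option, seeded once at construction; the type and function names here are hypothetical and not code from the PR.

```go
package common

import (
	"math/rand"
	"time"
)

// backoffJitter is a hypothetical per-instance jitter source, seeded once at
// construction so many nodes do not back off in lockstep. Note that a
// *rand.Rand is not safe for concurrent use without external locking.
type backoffJitter struct {
	rng *rand.Rand
}

func newBackoffJitter() *backoffJitter {
	return &backoffJitter{rng: rand.New(rand.NewSource(time.Now().UnixNano()))}
}

// jittered scales d by a random factor in [0.5, 1.5).
func (j *backoffJitter) jittered(d time.Duration) time.Duration {
	return time.Duration(float64(d) * (0.5 + j.rng.Float64()))
}
```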
```go
// LimitEnforcer interface for traffic limit enforcement
type LimitEnforcer interface {
	Start(ctx context.Context, refreshInterval interface{})
	Stop()
	ResetUserTraffic(userID int)
```
🧩 Analysis chain
Repository: PasarGuard/node
🏁 Script executed:
```shell
# First, check the actual file structure and locate backend.go
find . -name "backend.go" -type f | head -20
```
Length of output: 79
🏁 Script executed:
```shell
# Search for LimitEnforcer interface definition
rg -n "type LimitEnforcer interface" -A 10
```
Length of output: 337
🏁 Script executed:
```shell
# Find all implementers of LimitEnforcer
rg -n "func.*Start.*context\.Context" --type go | head -30
```
Length of output: 1247
🏁 Script executed:
```shell
# Find call sites where Start is called
rg -n "\.Start\(" --type go -B 2 -A 2 | head -50
```
Length of output: 2343
🏁 Script executed:
```shell
# Check the specific call site in controller.go to see what type refreshInterval is
rg -n "refreshInterval" --type go -B 3 -A 3
```
Length of output: 1954
🏁 Script executed:
```shell
# Get more context around the LimitEnforcer implementation
cat -n controller/limit_enforcer.go | sed -n '90,100p'
```
Length of output: 465
🏁 Script executed:
```shell
# Check what type refreshInterval is in the controller
cat -n controller/controller.go | sed -n '95,110p'
```
Length of output: 764
🏁 Script executed:
```shell
# Check if there are other implementations of LimitEnforcer interface
rg -n "func.*LimitEnforcer.*Start" --type go
```
Length of output: 175
🏁 Script executed:
```shell
# Check the imports in backend/backend.go to see what's currently imported
cat -n backend/backend.go | head -30
```
Length of output: 1126
🏁 Script executed:
```shell
# Verify the complete LimitEnforcer interface definition
cat -n backend/backend.go | sed -n '29,40p'
```
Length of output: 311
Update the LimitEnforcer interface signature to use time.Duration instead of interface{}.
The interface declaration mismatches the implementation and call site. The actual implementation in controller/limit_enforcer.go:95 already correctly uses time.Duration, and the caller in controller/controller.go:104 already passes time.Duration. Update the interface definition in backend/backend.go to match.
♻️ Required changes
```diff
 import (
 	"context"
+	"time"

 	"github.com/pasarguard/node/common"
 )

 // LimitEnforcer interface for traffic limit enforcement
 type LimitEnforcer interface {
-	Start(ctx context.Context, refreshInterval interface{})
+	Start(ctx context.Context, refreshInterval time.Duration)
 	Stop()
 	ResetUserTraffic(userID int)
 	ResetAllTraffic()
 }
```
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
import (
	"context"
	"time"

	"github.com/pasarguard/node/common"
)

// LimitEnforcer interface for traffic limit enforcement
type LimitEnforcer interface {
	Start(ctx context.Context, refreshInterval time.Duration)
	Stop()
	ResetUserTraffic(userID int)
	ResetAllTraffic()
}
```
```go
func TestTrafficLimitsCache_UpdateFromPush_Incremental(t *testing.T) {
	tlc := NewTrafficLimitsCache(1, "http://localhost", "test-key")

	// Initial limits
	tlc.UpdateFromPush([]NodeUserLimit{
		{UserID: 1, DataLimit: 1000},
		{UserID: 2, DataLimit: 2000},
	}, true)

	// Incremental update - should update user 1 and add user 3
	tlc.UpdateFromPush([]NodeUserLimit{
		{UserID: 1, DataLimit: 1500},
		{UserID: 3, DataLimit: 3000},
	}, false)

	if tlc.GetLimit(1) != 1500 {
		t.Errorf("User 1 limit = %d, want 1500", tlc.GetLimit(1))
	}
	if tlc.GetLimit(2) != 2000 {
		t.Errorf("User 2 limit = %d, want 2000 (unchanged)", tlc.GetLimit(2))
	}
	if tlc.GetLimit(3) != 3000 {
		t.Errorf("User 3 limit = %d, want 3000", tlc.GetLimit(3))
	}

	// Incremental update with 0 - should remove the limit
	tlc.UpdateFromPush([]NodeUserLimit{
		{UserID: 2, DataLimit: 0},
	}, false)

	if tlc.HasLimit(2) {
		t.Error("User 2 should not have limit after removal")
	}
}
```
Align incremental test with “0 = unlimited” semantics.
If 0 means unlimited (per struct doc), incremental updates should keep the entry rather than remove it. Update expectations or use an explicit delete signal.
🧪 Possible adjustment (if 0 should be retained)
```diff
-	// Incremental update with 0 - should remove the limit
+	// Incremental update with 0 - should keep explicit unlimited
 	tlc.UpdateFromPush([]NodeUserLimit{
 		{UserID: 2, DataLimit: 0},
 	}, false)

-	if tlc.HasLimit(2) {
-		t.Error("User 2 should not have limit after removal")
-	}
+	if !tlc.HasLimit(2) {
+		t.Error("User 2 should keep explicit unlimited limit")
+	}
+	if tlc.GetLimit(2) != 0 {
+		t.Errorf("User 2 limit = %d, want 0", tlc.GetLimit(2))
+	}
```
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
func TestTrafficLimitsCache_UpdateFromPush_Incremental(t *testing.T) {
	tlc := NewTrafficLimitsCache(1, "http://localhost", "test-key")

	// Initial limits
	tlc.UpdateFromPush([]NodeUserLimit{
		{UserID: 1, DataLimit: 1000},
		{UserID: 2, DataLimit: 2000},
	}, true)

	// Incremental update - should update user 1 and add user 3
	tlc.UpdateFromPush([]NodeUserLimit{
		{UserID: 1, DataLimit: 1500},
		{UserID: 3, DataLimit: 3000},
	}, false)

	if tlc.GetLimit(1) != 1500 {
		t.Errorf("User 1 limit = %d, want 1500", tlc.GetLimit(1))
	}
	if tlc.GetLimit(2) != 2000 {
		t.Errorf("User 2 limit = %d, want 2000 (unchanged)", tlc.GetLimit(2))
	}
	if tlc.GetLimit(3) != 3000 {
		t.Errorf("User 3 limit = %d, want 3000", tlc.GetLimit(3))
	}

	// Incremental update with 0 - should keep explicit unlimited
	tlc.UpdateFromPush([]NodeUserLimit{
		{UserID: 2, DataLimit: 0},
	}, false)

	if !tlc.HasLimit(2) {
		t.Error("User 2 should keep explicit unlimited limit")
	}
	if tlc.GetLimit(2) != 0 {
		t.Errorf("User 2 limit = %d, want 0", tlc.GetLimit(2))
	}
}
```
```go
// UpdateFromPush updates the cache from a gRPC push (for future use)
// fullSync: true = replace all limits, false = incremental update
func (tlc *TrafficLimitsCache) UpdateFromPush(limits []NodeUserLimit, fullSync bool) {
	tlc.refreshMu.Lock()
	defer tlc.refreshMu.Unlock()

	var newLimits map[int]int64

	if fullSync {
		// Full replacement
		newLimits = make(map[int]int64, len(limits))
	} else {
		// Incremental: copy existing and update
		data := tlc.limitsPtr.Load()
		if data != nil && data.limits != nil {
			newLimits = make(map[int]int64, len(data.limits)+len(limits))
			for k, v := range data.limits {
				newLimits[k] = v
			}
		} else {
			newLimits = make(map[int]int64, len(limits))
		}
	}

	for _, limit := range limits {
		if limit.DataLimit == 0 && !fullSync {
			// Remove limit in incremental mode
			delete(newLimits, limit.UserID)
		} else {
			newLimits[limit.UserID] = limit.DataLimit
		}
	}

	tlc.limitsPtr.Store(&userLimitsData{limits: newLimits})
	tlc.lastUpdate = time.Now()
}
```
DataLimit==0 handling is inconsistent with “0 = unlimited”.
Full sync keeps 0 values (so HasLimit is true), but incremental updates delete entries on 0, flipping HasLimit to false and losing explicit “unlimited” state. This makes behavior depend on update mode. Consider always storing 0 and reserving removals for full sync or an explicit tombstone.
🐛 Proposed fix (preserve explicit unlimited)
```diff
-	for _, limit := range limits {
-		if limit.DataLimit == 0 && !fullSync {
-			// Remove limit in incremental mode
-			delete(newLimits, limit.UserID)
-		} else {
-			newLimits[limit.UserID] = limit.DataLimit
-		}
-	}
+	for _, limit := range limits {
+		newLimits[limit.UserID] = limit.DataLimit
+	}
```
```go
// Stop gracefully stops the auto-refresh goroutine
func (tlc *TrafficLimitsCache) Stop() {
	close(tlc.stopChan)
}
```
Make Stop() idempotent to avoid panics.
Calling Stop() twice will panic due to closing a closed channel. A sync.Once guard fixes this.
🔒 Suggested fix
```diff
 type TrafficLimitsCache struct {
 	nodeID      int
 	panelAPIURL string
 	apiKey      string
 	httpClient  *http.Client
 	stopChan    chan struct{}
 	logger      Logger
+	stopOnce    sync.Once
 	// ...
 }

 func (tlc *TrafficLimitsCache) Stop() {
-	close(tlc.stopChan)
+	tlc.stopOnce.Do(func() {
+		close(tlc.stopChan)
+	})
 }
```
```go
// LimitEnforcerParams contains parameters for starting the limit enforcer
// These are passed from the panel via the Backend message
type LimitEnforcerParams struct {
	NodeID               int32
	PanelAPIURL          string
	LimitCheckInterval   int32 // seconds
	LimitRefreshInterval int32 // seconds
}

// StartLimitEnforcer starts the limit enforcer with config from panel
// Should be called after Connect() with params from Backend message
func (c *Controller) StartLimitEnforcer(params LimitEnforcerParams) {
	if params.PanelAPIURL == "" || params.NodeID <= 0 {
		return // Limit enforcer not configured by panel
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	// Use defaults if not specified
	checkInterval := time.Duration(params.LimitCheckInterval) * time.Second
	if checkInterval == 0 {
		checkInterval = 30 * time.Second
	}
	refreshInterval := time.Duration(params.LimitRefreshInterval) * time.Second
	if refreshInterval == 0 {
		refreshInterval = 60 * time.Second
	}

	// Create context that will be cancelled when controller disconnects
	ctx := context.Background()

	c.limitEnforcer = NewLimitEnforcer(c, LimitEnforcerConfig{
		NodeID:        int(params.NodeID),
		PanelAPIURL:   params.PanelAPIURL,
		APIKey:        c.cfg.ApiKey.String(),
		CheckInterval: checkInterval,
	})
	c.limitEnforcer.Start(ctx, refreshInterval)
	log.Printf("Limit enforcer started for node %d (panel: %s)", params.NodeID, params.PanelAPIURL)
}
```
Guard against non‑positive intervals from panel.
Negative values will flow into time.NewTicker inside the enforcer and panic. Treat <= 0 as default.
🛠️ Proposed fix
```diff
 	// Use defaults if not specified
 	checkInterval := time.Duration(params.LimitCheckInterval) * time.Second
-	if checkInterval == 0 {
+	if checkInterval <= 0 {
 		checkInterval = 30 * time.Second
 	}
 	refreshInterval := time.Duration(params.LimitRefreshInterval) * time.Second
-	if refreshInterval == 0 {
+	if refreshInterval <= 0 {
 		refreshInterval = 60 * time.Second
 	}
```
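For context, a rough sketch of how a handler for the panel's Backend message might feed values into the call above. The handler and its arguments are assumptions; only LimitEnforcerParams and StartLimitEnforcer come from the PR.

```go
// Hypothetical wiring: applyPanelLimitConfig does not exist in the PR; it only
// illustrates passing panel-provided values through LimitEnforcerParams.
func applyPanelLimitConfig(c *Controller, nodeID int32, panelURL string, checkSec, refreshSec int32) {
	c.StartLimitEnforcer(LimitEnforcerParams{
		NodeID:               nodeID,
		PanelAPIURL:          panelURL,
		LimitCheckInterval:   checkSec,   // seconds; non-positive should fall back to 30s
		LimitRefreshInterval: refreshSec, // seconds; non-positive should fall back to 60s
	})
}
```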
```go
// Stop gracefully stops the limit enforcer
func (le *LimitEnforcer) Stop() {
	close(le.stopChan)
	le.limitsCache.Stop()
	log.Println("Limit enforcer stopped")
}
```
Make Stop idempotent to avoid double-close panics.
close(le.stopChan) will panic if Stop is called twice (possible with concurrent Disconnect paths).
🛠️ Proposed fix
```diff
 type LimitEnforcer struct {
 	controller    *Controller
 	limitsCache   *common.TrafficLimitsCache
 	checkInterval time.Duration
 	stopChan      chan struct{}
+	stopOnce      sync.Once
@@
 func (le *LimitEnforcer) Stop() {
-	close(le.stopChan)
-	le.limitsCache.Stop()
-	log.Println("Limit enforcer stopped")
+	le.stopOnce.Do(func() {
+		close(le.stopChan)
+		le.limitsCache.Stop()
+		log.Println("Limit enforcer stopped")
+	})
 }
```
```go
// removeOverLimitUsers removes users from all xray inbounds
func (le *LimitEnforcer) removeOverLimitUsers(ctx context.Context, users map[string]int64) {
	b := le.controller.Backend()
	if b == nil {
		return
	}

	// Create empty user entries to trigger removal
	for email, traffic := range users {
		// Create a user with empty inbounds to remove from all inbounds
		emptyUser := &common.User{
			Email:    email,
			Inbounds: []string{}, // Empty inbounds = remove from all
		}

		if err := b.SyncUser(ctx, emptyUser); err != nil {
			log.Printf("Limit enforcer: failed to remove user %s: %v", email, err)
		} else {
			le.Metrics.UsersRemoved.Add(1)
			log.Printf("Limit enforcer: removed user %s (exceeded limit by %d bytes)", email, traffic)
		}
	}
}
```
Bound SyncUser removals with a timeout.
ctx is currently context.Background() from Start; a hung SyncUser will block enforcement and ignore Stop.
🛠️ Proposed fix
```diff
 	// Create empty user entries to trigger removal
 	for email, traffic := range users {
+		syncCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
 		// Create a user with empty inbounds to remove from all inbounds
 		emptyUser := &common.User{
 			Email:    email,
 			Inbounds: []string{}, // Empty inbounds = remove from all
 		}

-		if err := b.SyncUser(ctx, emptyUser); err != nil {
+		if err := b.SyncUser(syncCtx, emptyUser); err != nil {
 			log.Printf("Limit enforcer: failed to remove user %s: %v", email, err)
 		} else {
 			le.Metrics.UsersRemoved.Add(1)
 			log.Printf("Limit enforcer: removed user %s (exceeded limit by %d bytes)", email, traffic)
 		}
+		cancel()
 	}
```
📝 Committable suggestion
‼️ IMPORTANT: Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```go
// removeOverLimitUsers removes users from all xray inbounds
func (le *LimitEnforcer) removeOverLimitUsers(ctx context.Context, users map[string]int64) {
	b := le.controller.Backend()
	if b == nil {
		return
	}

	// Create empty user entries to trigger removal
	for email, traffic := range users {
		syncCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
		// Create a user with empty inbounds to remove from all inbounds
		emptyUser := &common.User{
			Email:    email,
			Inbounds: []string{}, // Empty inbounds = remove from all
		}

		if err := b.SyncUser(syncCtx, emptyUser); err != nil {
			log.Printf("Limit enforcer: failed to remove user %s: %v", email, err)
		} else {
			le.Metrics.UsersRemoved.Add(1)
			log.Printf("Limit enforcer: removed user %s (exceeded limit by %d bytes)", email, traffic)
		}
		cancel()
	}
}
```
Summary
Implements the LimitEnforcer component to enforce per-user traffic limits on the node side in real-time.
Changes
- limit_enforcer.go: Monitors traffic usage against configured limits.
Related PRs
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes