-
Notifications
You must be signed in to change notification settings - Fork 2
banditcallback: emit per-device first-seen to lantern-cloud #668
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,192 @@ | ||
| // Package banditcallback emits a best-effort per-device first-seen | ||
| // callback to the lantern-cloud bandit API so http-proxy arms get a | ||
| // reward signal without the client having to make the callback itself. | ||
| // | ||
| // The emitter holds a TTL-bounded map of device-ids it has already | ||
| // reported within the window. A device's first connection within the | ||
| // window triggers an async GET to the configured callback URL with the | ||
| // per-arm token (plumbed at provisioning) and the device-id; subsequent | ||
| // connections from the same device within TTL are suppressed. After the | ||
| // window, the next connection re-fires the callback. This matches the | ||
| // API-side dedup TTL so legitimate "device returns after a gap" events | ||
| // land, while a misbehaving client hammering the proxy can't pump the | ||
| // arm's reward signal. | ||
| package banditcallback | ||
|
|
||
| import ( | ||
| "context" | ||
| "net/http" | ||
| "net/url" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/getlantern/golog" | ||
| "github.com/getlantern/proxy/v3/filters" | ||
| ) | ||
|
|
||
| var log = golog.LoggerFor("banditcallback") | ||
|
|
||
| // Emitter fires per-device first-seen callbacks to the lantern-cloud | ||
| // API. Disabled when CallbackURL or Token is empty — the right | ||
| // default for non-bandit-eligible tracks. | ||
| type Emitter struct { | ||
| token string | ||
| callbackURL string | ||
| ttl time.Duration | ||
| client *http.Client | ||
|
|
||
| mu sync.Mutex | ||
| seen map[string]time.Time | ||
| lastSweep time.Time | ||
|
|
||
| // Counters exposed for the proxy's metrics layer; readable via | ||
| // the accessor methods. Bumped under mu since the proxy reads | ||
| // them rarely (periodic flush) and bumps them on a hot path | ||
| // (every device-id request); atomic.AddUint64 would add a CPU | ||
| // barrier on every Apply for no observable win. | ||
| emitted uint64 | ||
| suppressed uint64 | ||
| } | ||
|
|
||
| // New returns an emitter. token and callbackURL come from the daemon's | ||
| // INI (banditcallbacktoken / banditcallbackurl). ttl is the per-device | ||
| // dedup window — typically matches the API-side ProbeTTLForPollInterval | ||
| // at the daemon's expected poll. Zero ttl uses 10m as a sensible default. | ||
| func New(token, callbackURL string, ttl time.Duration) *Emitter { | ||
| if ttl <= 0 { | ||
| ttl = 10 * time.Minute | ||
| } | ||
| return &Emitter{ | ||
| token: token, | ||
| callbackURL: callbackURL, | ||
| ttl: ttl, | ||
| client: &http.Client{ | ||
| Timeout: 5 * time.Second, | ||
| }, | ||
| seen: make(map[string]time.Time), | ||
| } | ||
| } | ||
|
|
||
| // Enabled reports whether the emitter will actually fire callbacks. | ||
| // Callers can use this to skip computing the device-id when the daemon | ||
| // wasn't provisioned with a token. | ||
| func (e *Emitter) Enabled() bool { | ||
| return e != nil && e.token != "" && e.callbackURL != "" | ||
| } | ||
|
|
||
| // EmitIfFirstSeen records the device-id and, if it hasn't been seen | ||
| // within the TTL window, fires an async best-effort GET to the | ||
| // configured callback URL. Returns immediately — the HTTP request runs | ||
| // in its own goroutine and any failure is logged at debug only (the | ||
| // callback is a hint to the bandit, not a correctness signal). | ||
| func (e *Emitter) EmitIfFirstSeen(ctx context.Context, deviceID string) { | ||
| if !e.Enabled() || deviceID == "" { | ||
| return | ||
| } | ||
|
|
||
| now := time.Now() | ||
| first := e.checkAndRecord(deviceID, now) | ||
| if !first { | ||
| return | ||
| } | ||
|
|
||
| go e.fire(ctx, deviceID) | ||
| } | ||
|
|
||
| // checkAndRecord returns true if the deviceID is first-seen within the | ||
| // TTL window. The map is swept opportunistically whenever the gap | ||
| // since the last sweep exceeds the TTL — bounding worst-case map size | ||
| // at ~2× the unique device-ids seen in one TTL window without a | ||
| // dedicated reaper goroutine. | ||
| func (e *Emitter) checkAndRecord(deviceID string, now time.Time) bool { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
|
|
||
| if now.Sub(e.lastSweep) > e.ttl { | ||
| for k, t := range e.seen { | ||
| if now.Sub(t) > e.ttl { | ||
| delete(e.seen, k) | ||
| } | ||
| } | ||
| e.lastSweep = now | ||
| } | ||
|
|
||
| if prev, ok := e.seen[deviceID]; ok && now.Sub(prev) <= e.ttl { | ||
| e.suppressed++ | ||
| return false | ||
| } | ||
| e.seen[deviceID] = now | ||
| e.emitted++ | ||
| return true | ||
| } | ||
|
|
||
| func (e *Emitter) fire(ctx context.Context, deviceID string) { | ||
| // Detach from the request context so a closing client connection | ||
| // doesn't cancel the outbound HTTP request. The callback is for | ||
| // the bandit's benefit, not the client's; we want it to complete | ||
| // even if the client just hung up. | ||
| fireCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) | ||
| defer cancel() | ||
|
|
||
| u, err := url.Parse(e.callbackURL) | ||
| if err != nil { | ||
| log.Debugf("banditcallback: invalid callback URL %q: %v", e.callbackURL, err) | ||
| return | ||
| } | ||
| q := u.Query() | ||
| q.Set("token", e.token) | ||
| q.Set("did", deviceID) | ||
| u.RawQuery = q.Encode() | ||
|
|
||
| req, err := http.NewRequestWithContext(fireCtx, http.MethodGet, u.String(), nil) | ||
| if err != nil { | ||
|
reflog marked this conversation as resolved.
|
||
| log.Debugf("banditcallback: build request: %v", err) | ||
| return | ||
| } | ||
|
|
||
| resp, err := e.client.Do(req) | ||
| if err != nil { | ||
| log.Debugf("banditcallback: request failed: %v", err) | ||
| return | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| if resp.StatusCode >= 400 { | ||
| log.Debugf("banditcallback: API returned %d for did %s", resp.StatusCode, deviceID) | ||
| } | ||
| } | ||
|
|
||
| // Stats returns (emitted, suppressed) counter values. Cheap; takes the | ||
| // same mu the hot path uses but the hot path holds it for microseconds. | ||
| func (e *Emitter) Stats() (emitted, suppressed uint64) { | ||
| e.mu.Lock() | ||
| defer e.mu.Unlock() | ||
| return e.emitted, e.suppressed | ||
| } | ||
|
|
||
| // Filter is a proxy filter that calls EmitIfFirstSeen on the device-id | ||
| // header. Wraps Emitter so the proxy can install it in its filter chain | ||
| // independently of devicefilter (which only runs for non-pro tracks). | ||
| // The filter is a near-noop hot path: under TTL, just a map read + | ||
| // counter bump under a brief mutex; misses kick off an async goroutine. | ||
| type Filter struct { | ||
| deviceIDHeader string | ||
| emitter *Emitter | ||
| } | ||
|
|
||
| // NewFilter returns a proxy filter that drives the emitter from the | ||
| // request's device-id header. headerName is typically common.DeviceIdHeader | ||
| // so the filter package itself doesn't depend on the http-proxy-lantern | ||
| // common package. | ||
| func NewFilter(headerName string, emitter *Emitter) *Filter { | ||
| return &Filter{deviceIDHeader: headerName, emitter: emitter} | ||
| } | ||
|
|
||
| // Apply implements filters.Filter. Forwards unconditionally (the | ||
| // emitter is a side-effect; failures are non-fatal). | ||
| func (f *Filter) Apply(cs *filters.ConnectionState, req *http.Request, next filters.Next) (*http.Response, *filters.ConnectionState, error) { | ||
| if f.emitter != nil && f.emitter.Enabled() { | ||
| f.emitter.EmitIfFirstSeen(req.Context(), req.Header.Get(f.deviceIDHeader)) | ||
| } | ||
| return next(cs, req) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| package banditcallback | ||
|
|
||
| import ( | ||
| "context" | ||
| "net/http" | ||
| "net/http/httptest" | ||
| "sync" | ||
| "sync/atomic" | ||
| "testing" | ||
| "time" | ||
| ) | ||
|
|
||
| func TestEmitter_DisabledWhenUnconfigured(t *testing.T) { | ||
| // Empty token disables emission entirely so non-bandit-eligible | ||
| // tracks can carry the same daemon binary without firing | ||
| // callbacks. Verify Enabled and that EmitIfFirstSeen is a no-op. | ||
| cases := []struct { | ||
| name string | ||
| token string | ||
| callbackURL string | ||
| }{ | ||
| {"empty token", "", "https://api.example/v1/bandit/callback"}, | ||
| {"empty url", "arm-xyz", ""}, | ||
| {"both empty", "", ""}, | ||
| } | ||
| for _, tc := range cases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| e := New(tc.token, tc.callbackURL, time.Minute) | ||
| if e.Enabled() { | ||
| t.Fatal("expected disabled") | ||
| } | ||
| e.EmitIfFirstSeen(context.Background(), "did-1") | ||
| emitted, _ := e.Stats() | ||
| if emitted != 0 { | ||
| t.Fatalf("expected 0 emits, got %d", emitted) | ||
| } | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestEmitter_FirstSeenFires(t *testing.T) { | ||
| var hits int32 | ||
| srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| atomic.AddInt32(&hits, 1) | ||
| if got := r.URL.Query().Get("token"); got != "arm-test" { | ||
| t.Errorf("token mismatch: %q", got) | ||
| } | ||
| if got := r.URL.Query().Get("did"); got == "" { | ||
| t.Error("missing did") | ||
| } | ||
| w.WriteHeader(http.StatusNoContent) | ||
| })) | ||
| defer srv.Close() | ||
|
|
||
| e := New("arm-test", srv.URL, time.Minute) | ||
| e.EmitIfFirstSeen(context.Background(), "device-a") | ||
|
|
||
| // Emission is async; wait for the goroutine. 1s ceiling is generous. | ||
| deadline := time.Now().Add(time.Second) | ||
| for time.Now().Before(deadline) { | ||
| if atomic.LoadInt32(&hits) == 1 { | ||
| break | ||
| } | ||
| time.Sleep(5 * time.Millisecond) | ||
| } | ||
| if got := atomic.LoadInt32(&hits); got != 1 { | ||
| t.Fatalf("expected 1 callback hit, got %d", got) | ||
| } | ||
|
|
||
| emitted, suppressed := e.Stats() | ||
| if emitted != 1 || suppressed != 0 { | ||
| t.Fatalf("counters: emitted=%d suppressed=%d", emitted, suppressed) | ||
| } | ||
| } | ||
|
|
||
| func TestEmitter_DedupSuppressesRepeat(t *testing.T) { | ||
| var hits int32 | ||
| srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| atomic.AddInt32(&hits, 1) | ||
| w.WriteHeader(http.StatusNoContent) | ||
| })) | ||
| defer srv.Close() | ||
|
|
||
| e := New("arm-test", srv.URL, time.Minute) | ||
| for i := 0; i < 10; i++ { | ||
| e.EmitIfFirstSeen(context.Background(), "device-a") | ||
| } | ||
|
|
||
| // One emit, nine suppressed. Wait for async emit to complete. | ||
| deadline := time.Now().Add(time.Second) | ||
| for time.Now().Before(deadline) { | ||
| emitted, suppressed := e.Stats() | ||
| if emitted == 1 && suppressed == 9 { | ||
| break | ||
| } | ||
| time.Sleep(5 * time.Millisecond) | ||
| } | ||
| emitted, suppressed := e.Stats() | ||
| if emitted != 1 || suppressed != 9 { | ||
| t.Fatalf("counters: emitted=%d suppressed=%d", emitted, suppressed) | ||
| } | ||
| // Confirm the server only received one hit. | ||
| time.Sleep(50 * time.Millisecond) | ||
| if got := atomic.LoadInt32(&hits); got != 1 { | ||
| t.Fatalf("expected 1 callback hit, got %d", got) | ||
| } | ||
| } | ||
|
|
||
| func TestEmitter_ConcurrentFirstSeenIsSingleFire(t *testing.T) { | ||
| // 100 goroutines racing on the same device-id must yield exactly | ||
| // one outbound call. This is the contention case the mu lock is | ||
| // designed for; a TOCTOU bug here would explode reward signal. | ||
| var hits int32 | ||
| srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| atomic.AddInt32(&hits, 1) | ||
| w.WriteHeader(http.StatusNoContent) | ||
| })) | ||
| defer srv.Close() | ||
|
|
||
| e := New("arm-test", srv.URL, time.Minute) | ||
| var wg sync.WaitGroup | ||
| for i := 0; i < 100; i++ { | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| e.EmitIfFirstSeen(context.Background(), "race-device") | ||
| }() | ||
| } | ||
| wg.Wait() | ||
|
|
||
| // Wait for async send. | ||
| time.Sleep(200 * time.Millisecond) | ||
| if got := atomic.LoadInt32(&hits); got != 1 { | ||
| t.Fatalf("expected exactly 1 hit under contention, got %d", got) | ||
| } | ||
| } | ||
|
|
||
| func TestEmitter_ReEmitsAfterTTL(t *testing.T) { | ||
| // After the TTL window, a returning device should re-emit. Use a | ||
| // tiny TTL to keep the test fast. | ||
| var hits int32 | ||
| srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
| atomic.AddInt32(&hits, 1) | ||
| w.WriteHeader(http.StatusNoContent) | ||
| })) | ||
| defer srv.Close() | ||
|
|
||
| e := New("arm-test", srv.URL, 50*time.Millisecond) | ||
| e.EmitIfFirstSeen(context.Background(), "ttl-device") | ||
| time.Sleep(20 * time.Millisecond) // within TTL — suppressed | ||
| e.EmitIfFirstSeen(context.Background(), "ttl-device") | ||
| time.Sleep(80 * time.Millisecond) // past TTL — fires again | ||
| e.EmitIfFirstSeen(context.Background(), "ttl-device") | ||
|
|
||
| time.Sleep(100 * time.Millisecond) | ||
| if got := atomic.LoadInt32(&hits); got != 2 { | ||
| t.Fatalf("expected 2 hits across TTL window, got %d", got) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.