-
Notifications
You must be signed in to change notification settings - Fork 380
feat(gsoc): subscribe #4727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
feat(gsoc): subscribe #4727
Changes from all commits
Commits
Show all changes
58 commits
Select commit
Hold shift + click to select a range
08e4572
feat: gsoc
nugaon de66d61
feat: add gsoc listener to pull and pushsync
nugaon 4e5d93c
feat: gsoc subscribe api
nugaon 340c233
fix: gsoc address path parsing
nugaon 9d5b641
test: gsoc as param for testServer
nugaon 84831ac
test: gsoc api
nugaon bbe5d00
test: add empty function for so clisten in pushsync t
nugaon d800e9a
refactor: remove unused pusher
nugaon 0f36ac7
docs: gsoc openapi
nugaon 1b77027
test: unit
nugaon d53a776
docs: fix yaml indentation
nugaon 66e44b8
feat: add new error handling
nugaon e5e79c0
feat: logger in gsoc listener
nugaon 5c5bb7f
refactor: handle instead of handler
nugaon 4f60796
docs: copypastes
nugaon f868608
refactor: rename register to subscribe
nugaon 29af22d
refactor: unnecessary go call on gsoc handler
nugaon f429c01
feat: identity address in pull sync
nugaon 1d5af44
test: multiple payload push
nugaon 385a529
test: gsoc listener
nugaon 6e28bb2
fix: param mismatch after rebasing
nugaon 83c9309
fix: idAddress in pushsync where it is needed
nugaon 615955b
test: working signature for pushsync
nugaon eefbfd7
refactor: log id_address on push failiure
nugaon 18702c6
feat: id address usage on pusher and its inflight handling
nugaon 7d42efd
fix: remove unnecessary stamp higher condition
nugaon b69dcc7
fix: reserve put
nugaon 4c40e9a
fix: important reserve changes same ps index
nugaon e6e98f7
fix: lock by batch id and stamp
nugaon a3faffd
fix: new multex to lock reserve put
nugaon 450d973
fix: remove ChunkTypeUnspecified check
nugaon 7134e02
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon 68f97ce
feat: postage stamping for gsoc
nugaon a50e693
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon 074602b
fix: use resenje multex (#4883)
acha-bill cbec217
refactor: remove waitgroup in hook function calls
nugaon 8abb1ad
refactor: add closer function for gsoc sub
nugaon ec01cf9
refactor: wrong parameter name ordering
nugaon d7e49e7
refactor: remove duplicated api route def
nugaon 8e4cb2e
docs: added comments
nugaon 160f258
test: add reserve case (#4886)
acha-bill c9098fd
test: correcting fata prompt message
nugaon 4b27d65
refactor: remove shouldDecrReserveSize
nugaon 691a43d
docs: verbosing
nugaon 7432ed3
fix: eviction locking
nugaon 0ce6407
test: identity address unit
nugaon d54dff5
refactor: gsoc handler param soc as reference
nugaon c03ddeb
docs: update put desc
nugaon 55e7af1
refactor: export handler
nugaon a39e9d6
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon 9e78194
feat: remove pinning and uploadstore usage
nugaon a9ba53b
feat: always save newer payload of soc
nugaon 4893749
test: always save newer payload of soc
nugaon b0d6912
test: move testcase away
nugaon 230c071
Merge remote-tracking branch 'origin/master' into feat/gsoc-subscribe
nugaon 4d8661f
fix: replace gsoc payload
nugaon 515b519
revert: remove pinning and uploadstore usage
nugaon 8c416af
fix: replace gsoc payload
nugaon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,119 @@ | ||
| // Copyright 2024 The Swarm Authors. All rights reserved. | ||
| // Use of this source code is governed by a BSD-style | ||
| // license that can be found in the LICENSE file. | ||
|
|
||
| package api | ||
|
|
||
| import ( | ||
| "net/http" | ||
| "time" | ||
|
|
||
| "github.com/ethersphere/bee/v2/pkg/jsonhttp" | ||
| "github.com/ethersphere/bee/v2/pkg/swarm" | ||
| "github.com/gorilla/mux" | ||
| "github.com/gorilla/websocket" | ||
| ) | ||
|
|
||
| func (s *Service) gsocWsHandler(w http.ResponseWriter, r *http.Request) { | ||
| logger := s.logger.WithName("gsoc_subscribe").Build() | ||
|
|
||
| paths := struct { | ||
| Address []byte `map:"address" validate:"required"` | ||
| }{} | ||
| if response := s.mapStructure(mux.Vars(r), &paths); response != nil { | ||
| response("invalid path params", logger, w) | ||
| return | ||
| } | ||
|
|
||
| upgrader := websocket.Upgrader{ | ||
| ReadBufferSize: swarm.ChunkSize, | ||
| WriteBufferSize: swarm.ChunkSize, | ||
| CheckOrigin: s.checkOrigin, | ||
| } | ||
|
|
||
| conn, err := upgrader.Upgrade(w, r, nil) | ||
| if err != nil { | ||
| logger.Debug("upgrade failed", "error", err) | ||
| logger.Error(nil, "upgrade failed") | ||
| jsonhttp.InternalServerError(w, "upgrade failed") | ||
| return | ||
| } | ||
|
|
||
| s.wsWg.Add(1) | ||
| go s.gsocListeningWs(conn, paths.Address) | ||
| } | ||
|
|
||
| func (s *Service) gsocListeningWs(conn *websocket.Conn, socAddress []byte) { | ||
| defer s.wsWg.Done() | ||
|
|
||
| var ( | ||
| dataC = make(chan []byte) | ||
| gone = make(chan struct{}) | ||
| ticker = time.NewTicker(s.WsPingPeriod) | ||
| err error | ||
| ) | ||
| defer func() { | ||
| ticker.Stop() | ||
| _ = conn.Close() | ||
| }() | ||
| cleanup := s.gsoc.Subscribe([32]byte(socAddress), func(m []byte) { | ||
| select { | ||
| case dataC <- m: | ||
| case <-gone: | ||
| return | ||
| case <-s.quit: | ||
| return | ||
| } | ||
| }) | ||
|
|
||
| defer cleanup() | ||
|
|
||
| conn.SetCloseHandler(func(code int, text string) error { | ||
| s.logger.Debug("gsoc ws: client gone", "code", code, "message", text) | ||
| close(gone) | ||
| return nil | ||
| }) | ||
|
|
||
| for { | ||
| select { | ||
| case b := <-dataC: | ||
| err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) | ||
| if err != nil { | ||
| s.logger.Debug("gsoc ws: set write deadline failed", "error", err) | ||
| return | ||
| } | ||
|
|
||
| err = conn.WriteMessage(websocket.BinaryMessage, b) | ||
| if err != nil { | ||
| s.logger.Debug("gsoc ws: write message failed", "error", err) | ||
| return | ||
| } | ||
|
|
||
| case <-s.quit: | ||
| // shutdown | ||
| err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) | ||
| if err != nil { | ||
| s.logger.Debug("gsoc ws: set write deadline failed", "error", err) | ||
| return | ||
| } | ||
| err = conn.WriteMessage(websocket.CloseMessage, []byte{}) | ||
| if err != nil { | ||
| s.logger.Debug("gsoc ws: write close message failed", "error", err) | ||
| } | ||
| return | ||
| case <-gone: | ||
| // client gone | ||
| return | ||
| case <-ticker.C: | ||
| err = conn.SetWriteDeadline(time.Now().Add(writeDeadline)) | ||
| if err != nil { | ||
| s.logger.Debug("gsoc ws: set write deadline failed", "error", err) | ||
| return | ||
| } | ||
| if err = conn.WriteMessage(websocket.PingMessage, nil); err != nil { | ||
| // error encountered while pinging client. client probably gone | ||
| return | ||
| } | ||
| } | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.