Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions go/delay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"fmt"
"strconv"
"time"

"github.com/envoyproxy/dynamic-modules-examples/go/gosdk"
)

type (
// delayFilterConfig implements [gosdk.HttpFilterConfig].
delayFilterConfig struct{}
// delayFilter implements [gosdk.HttpFilter].
//
// This filter demostrates how to use the scheduler to delay the request processing,
// and how to use goroutines to perform the asynchronous operations.
delayFilter struct {
onRequestHeaders time.Time
delayLapsed time.Duration
}
)

// Destroy implements [gosdk.HttpFilterConfig].
func (p delayFilterConfig) Destroy() {}

// NewFilter implements [gosdk.HttpFilterConfig].
func (p delayFilterConfig) NewFilter() gosdk.HttpFilter { return &delayFilter{} }

// Destroy implements [gosdk.HttpFilter].
func (p *delayFilter) Destroy() {}

// RequestHeaders implements [gosdk.HttpFilter].
func (p *delayFilter) RequestHeaders(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.RequestHeadersStatus {
// Check if the headers contain the "do-delay" header to trigger the delay.
if _, ok := e.GetRequestHeader("do-delay"); !ok {
// If the header is not present, continue the request processing.
fmt.Println("gosdk: RequestHeaders, do-delay header not found, continuing request processing")
return gosdk.RequestHeadersStatusContinue
}

schduler := e.NewScheduler()
now := time.Now()
p.onRequestHeaders = now
go func() {
// Simulate some delay.
time.Sleep(2 * time.Second)
// Commit the event to continue the request processing.
schduler.Commit(0)
}()
fmt.Printf("gosdk: RequestHeaders, delaying request processing for 2 seconds at %s\n", now)
return gosdk.RequestHeadersStatusStopIteration
}

// Sheduled implements gosdk.HttpFilter.
func (p *delayFilter) Sheduled(e gosdk.EnvoyHttpFilter, eventID uint64) {
if eventID != 0 {
panic("unexpected eventID in Sheduled: " + strconv.Itoa(int(eventID)))
}
fmt.Println("gosdk: Sheduled, continuing request processing after delay")
p.delayLapsed = time.Since(p.onRequestHeaders)
// We can insert some headers at this phase.
e.SetRequestHeader("delay-filter-on-scheduled", []byte("yes"))
// Then continue the request processing.
e.ContinueRequest()
}

// RequestBody implements [gosdk.HttpFilter].
func (p *delayFilter) RequestBody(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.RequestBodyStatus {
return gosdk.RequestBodyStatusContinue
}

// ResponseHeaders implements [gosdk.HttpFilter].
func (p *delayFilter) ResponseHeaders(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.ResponseHeadersStatus {
// Add a response header to indicate the delay.
if p.delayLapsed > 0 {
e.SetResponseHeader("x-delay-filter-lapsed", []byte(p.delayLapsed.String()))
}
return gosdk.ResponseHeadersStatusContinue
}

// ResponseBody implements [gosdk.HttpFilter].
func (p *delayFilter) ResponseBody(e gosdk.EnvoyHttpFilter, endOfStream bool) gosdk.ResponseBodyStatus {
return gosdk.ResponseBodyStatusContinue
}
63 changes: 62 additions & 1 deletion go/gosdk/abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,31 @@ bool envoy_dynamic_module_callback_http_filter_get_attribute_string(
uintptr_t filter_envoy_ptr,
size_t attribute_id,
uintptr_t* result, size_t* result_length);

#cgo noescape envoy_dynamic_module_callback_http_filter_continue_decoding
#cgo nocallback envoy_dynamic_module_callback_http_filter_continue_decoding
void envoy_dynamic_module_callback_http_filter_continue_decoding(
uintptr_t filter_envoy_ptr);

#cgo noescape envoy_dynamic_module_callback_http_filter_continue_encoding
#cgo nocallback envoy_dynamic_module_callback_http_filter_continue_encoding
void envoy_dynamic_module_callback_http_filter_continue_encoding(
uintptr_t filter_envoy_ptr);

#cgo noescape envoy_dynamic_module_callback_http_filter_scheduler_new
#cgo nocallback envoy_dynamic_module_callback_http_filter_scheduler_new
uintptr_t envoy_dynamic_module_callback_http_filter_scheduler_new(
uintptr_t filter_envoy_ptr);

#cgo noescape envoy_dynamic_module_callback_http_filter_scheduler_delete
#cgo nocallback envoy_dynamic_module_callback_http_filter_scheduler_delete
void envoy_dynamic_module_callback_http_filter_scheduler_delete(
uintptr_t scheduler_ptr);

#cgo noescape envoy_dynamic_module_callback_http_filter_scheduler_commit
#cgo nocallback envoy_dynamic_module_callback_http_filter_scheduler_commit
void envoy_dynamic_module_callback_http_filter_scheduler_commit(
uintptr_t scheduler_ptr, uint64_t event_id);
*/
import "C"

Expand Down Expand Up @@ -274,7 +299,9 @@ func envoy_dynamic_module_on_http_filter_scheduled(
filterEnvoyPtr uintptr,
filterModulePtr uintptr,
eventID C.uint64_t) {
panic("TODO")
pinned := unwrapPinnedHttpFilter(uintptr(filterModulePtr))
// Call the Scheduled method of the filter.
pinned.obj.Sheduled(envoyFilter{raw: uintptr(filterEnvoyPtr)}, uint64(eventID))
}

// GetRequestHeader implements [EnvoyHttpFilter].
Expand Down Expand Up @@ -396,6 +423,40 @@ type envoySlice struct {
// envoyFilter implements [EnvoyHttpFilter].
type envoyFilter struct{ raw uintptr }

// ContinueRequest implements EnvoyHttpFilter.
func (e envoyFilter) ContinueRequest() {
C.envoy_dynamic_module_callback_http_filter_continue_decoding(C.uintptr_t(e.raw))
}

// ContinueResponse implements EnvoyHttpFilter.
func (e envoyFilter) ContinueResponse() {
C.envoy_dynamic_module_callback_http_filter_continue_encoding(C.uintptr_t(e.raw))
}

// NewScheduler implements EnvoyHttpFilter.
func (e envoyFilter) NewScheduler() Scheduler {
// Create a new scheduler for the filter.
schedulerPtr := C.envoy_dynamic_module_callback_http_filter_scheduler_new(C.uintptr_t(e.raw))
if schedulerPtr == 0 {
return nil
}
return &envoyFilterScheduler{raw: uintptr(schedulerPtr)}
}

type envoyFilterScheduler struct {
raw uintptr
}

// Close implements Scheduler.
func (e *envoyFilterScheduler) Close() {
C.envoy_dynamic_module_callback_http_filter_scheduler_delete(C.uintptr_t(e.raw))
}

// Commit implements Scheduler.
func (e *envoyFilterScheduler) Commit(eventID uint64) {
C.envoy_dynamic_module_callback_http_filter_scheduler_commit(C.uintptr_t(e.raw), C.uint64_t(eventID))
}

// GetRequestProtocol implements [EnvoyHttpFilter].
func (e envoyFilter) GetRequestProtocol() string {
// https://github.com/envoyproxy/envoy/blob/05223ee2cd143d70b32402783c2a866a9dd18bd1/source/extensions/dynamic_modules/abi.h#L237-L372
Expand Down
25 changes: 25 additions & 0 deletions go/gosdk/gosdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type HttpFilterConfig interface {
// This is passed to each event hook of the HttpFilter.
//
// **WARNING**: This must not outlive each event hook since there's no guarantee that the EnvoyHttpFilter will be valid after the event hook is returned.
// To perform the asynchronous operations, use [EnvoyHttpFilter.NewScheduler] to create a [Scheduler] and perform the operations in a separate Goroutine.
// Then, use the [Scheduler.Commit] method to commit the event to the Envoy filter on the correct worker thread to continue processing the request.
type EnvoyHttpFilter interface {
// GetRequestHeader gets the first value of the request header. Returns the value and true if the header is found.
GetRequestHeader(key string) (string, bool)
Expand Down Expand Up @@ -59,6 +61,26 @@ type EnvoyHttpFilter interface {
GetSourceAddress() string
// GetRequestProtocol gets the request protocol. This corresponds to `request.protocol` attribute https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/advanced/attributes.
GetRequestProtocol() string
// NewScheduler creates a new Scheduler that can be used to schedule events to the correct Envoy worker thread.
// Created schedulers must be closed when they are no longer needed.
//
// Returns nil if this is called from any other than normal event hooks such as RequestHeaders, RequestBody, ResponseHeaders, and ResponseBody.
NewScheduler() Scheduler
// ContinueRequest continues the request processing after the Stop variants are returned from the normal event hooks such as RequestHeaders, RequestBody, ResponseHeaders, and ResponseBody.
// Mainly this is intented to be used during the HttpFilter.Scheduled method being called.
ContinueRequest()
// ContinueResponse is the same as ContinueRequest but for the response processing.
ContinueResponse()
}

// Scheduler is an interface that can be used to schedule a generic event to the correct Envoy worker thread.
//
// This is created via [EnvoyHttpFilter.NewScheduler] and can be passed across Goroutines.
type Scheduler interface {
// Commit commits the event to the Envoy filter on the correct worker thread.
// The eventID is a unique identifier for the event, and it can be used to distinguish between different events.
Commit(eventID uint64)
Close()
}

// HttpFilter is an interface that represents each Http request.
Expand All @@ -77,6 +99,9 @@ type HttpFilter interface {
ResponseBody(e EnvoyHttpFilter, endOfStream bool) ResponseBodyStatus
// TODO: add ResponseTrailers support.

// Scheuled is called when the filter is scheduled to run.
Sheduled(e EnvoyHttpFilter, eventID uint64)

// Destroy is called when the stream is destroyed.
Destroy()
}
Expand Down
3 changes: 3 additions & 0 deletions go/header_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ func (p headerAuthFilterConfig) NewFilter() gosdk.HttpFilter {
return &headerAuthFilter{authHeaderName: p.authHeaderName}
}

// Sheduled implements gosdk.HttpFilter.
func (p headerAuthFilter) Sheduled(gosdk.EnvoyHttpFilter, uint64) {}

// Destroy implements [gosdk.HttpFilter].
func (p *headerAuthFilter) Destroy() {}

Expand Down
2 changes: 2 additions & 0 deletions go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func newHttpFilterConfig(name string, config []byte) gosdk.HttpFilterConfig {
return passthroughFilterConfig{}
case "header_auth":
return headerAuthFilterConfig{authHeaderName: string(config)}
case "delay":
return delayFilterConfig{}
default:
panic("unknown filter: " + name)
}
Expand Down
3 changes: 3 additions & 0 deletions go/passthrough.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ func (p passthroughFilterConfig) Destroy() {}
// NewFilter implements [gosdk.HttpFilterConfig].
func (p passthroughFilterConfig) NewFilter() gosdk.HttpFilter { return passthroughFilter{} }

// Sheduled implements gosdk.HttpFilter.
func (p passthroughFilter) Sheduled(gosdk.EnvoyHttpFilter, uint64) {}

// Destroy implements [gosdk.HttpFilter].
func (p passthroughFilter) Destroy() {}

Expand Down
8 changes: 8 additions & 0 deletions integration/envoy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ static_resources:
dynamic_module_config:
name: rust_module
filter_name: passthrough
- name: dynamic_modules/conditional_deply
typed_config:
# https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/dynamic_modules/v3/dynamic_modules.proto#envoy-v3-api-msg-extensions-dynamic-modules-v3-dynamicmoduleconfig
"@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_modules.v3.DynamicModuleFilter
dynamic_module_config:
name: go_module
do_not_close: true
filter_name: delay
- name: dynamic_modules/access_logger
typed_config:
# https://www.envoyproxy.io/docs/envoy/latest/api-v3/extensions/dynamic_modules/v3/dynamic_modules.proto#envoy-v3-api-msg-extensions-dynamic-modules-v3-dynamicmoduleconfig
Expand Down
38 changes: 38 additions & 0 deletions integration/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,44 @@ func TestIntegration(t *testing.T) {
}, 30*time.Second, 1*time.Second)
})

t.Run("delay", func(t *testing.T) {
require.Eventually(t, func() bool {
req, err := http.NewRequest("GET", "http://localhost:1062/headers", nil)
require.NoError(t, err)
req.Header.Set("do-delay", "true")

resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Logf("Envoy not ready yet: %v", err)
return false
}
defer func() {
require.NoError(t, resp.Body.Close())
}()
body, err := io.ReadAll(resp.Body)
if err != nil {
t.Logf("Envoy not ready yet: %v", err)
return false
}

t.Logf("response: headers=%v, body=%s", resp.Header, string(body))
require.Equal(t, 200, resp.StatusCode)

// Check the request header "delay-filter-on-scheduled: yes" added in the Sheduled phase.
type httpBinHeadersBody struct {
Headers map[string][]string `json:"headers"`
}
var headersBody httpBinHeadersBody
require.NoError(t, json.Unmarshal(body, &headersBody))
require.Contains(t, headersBody.Headers["Delay-Filter-On-Scheduled"], "yes")

// We also need to check that the response headers were added.
require.NotEmpty(t, resp.Header.Get("x-delay-filter-lapsed"), "x-delay-filter-lapsed header should be set")
require.Regexp(t, `^2\.\d+s$`, resp.Header.Get("x-delay-filter-lapsed"), "x-delay-filter-lapsed header should be around 2s")
return true
}, 30*time.Second, 200*time.Millisecond)
})

t.Run("http_header_mutation", func(t *testing.T) {
require.Eventually(t, func() bool {
req, err := http.NewRequest("GET", "http://localhost:1062/headers", nil)
Expand Down
Loading