Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type AsyncCache struct {

graceTime time.Duration

MaxPayloadSize config.ByteSize
SharedWithAllUsers bool
MaxPayloadSize config.ByteSize
SharedWithAllUsers bool
CheckGrantsForSharedCache bool
}

func (c *AsyncCache) Close() error {
Expand Down Expand Up @@ -109,10 +110,11 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
maxPayloadSize := cfg.MaxPayloadSize

return &AsyncCache{
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
SharedWithAllUsers: cfg.SharedWithAllUsers,
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
SharedWithAllUsers: cfg.SharedWithAllUsers,
CheckGrantsForSharedCache: cfg.CheckGrantsForSharedCache,
}, nil
}
4 changes: 4 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ max_payload_size: <byte_size>

# Whether a query cached by a user can be used by another user
shared_with_all_users: <bool> | default = false [optional]

# Whether `shared_with_all_users` option is enabled
# check permissions for cached query used by another user
check_grants_for_shared_cache: <bool> | default = false [optional]
```

### <distributed_cache_config>
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,10 @@ type Cache struct {

// Whether a query cached by a user could be used by another user
SharedWithAllUsers bool `yaml:"shared_with_all_users,omitempty"`

// Whether `shared_with_all_users` option is enabled
// check permissions for cached query used by another user
CheckGrantsForSharedCache bool `yaml:"check_grants_for_shared_cache,omitempty"`
}

func (c *Cache) setDefaults() {
Expand Down
46 changes: 46 additions & 0 deletions io.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,49 @@ func (crc *cachedReadCloser) String() string {
crc.bLock.Unlock()
return s
}

var _ ResponseWriterWithCode = &checkGrantsResponseWriter{}

type checkGrantsResponseWriter struct {
http.ResponseWriter

statusCode int
}

func (rw *checkGrantsResponseWriter) SetStatusCode(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(rw.statusCode)
}

func (rw *checkGrantsResponseWriter) StatusCode() int {
if rw.statusCode == 0 {
return http.StatusOK
}

return rw.statusCode
}

func (rw *checkGrantsResponseWriter) WriteHeader(statusCode int) {
// cache statusCode to keep the opportunity to change it in further
rw.statusCode = statusCode
rw.SetStatusCode(statusCode)
}

func (rw *checkGrantsResponseWriter) Write(b []byte) (int, error) {
if rw.statusCode == http.StatusOK {
return 0, nil
}

n, err := rw.ResponseWriter.Write(b)
return n, err
}

// CloseNotify implements http.CloseNotifier
func (rw *checkGrantsResponseWriter) CloseNotify() <-chan bool {
// The rw.ResponseWriter must implement http.CloseNotifier
rwc, ok := rw.ResponseWriter.(http.CloseNotifier)
if !ok {
panic("BUG: the wrapped ResponseWriter must implement http.CloseNotifier")
}
return rwc.CloseNotify()
}
56 changes: 56 additions & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
maxIdleConns: cfgCp.MaxIdleConnsPerHost,
maxIdleConnsPerHost: cfgCp.MaxIdleConnsPerHost,
}
}

Check failure on line 86 in proxy.go

View workflow job for this annotation

GitHub Actions / lint

calculated cyclomatic complexity for function ServeHTTP is 12, max is 10 (cyclop)

func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
startTime := time.Now()
Expand Down Expand Up @@ -146,6 +146,25 @@
}

if shouldReturnFromCache {
// if cache shared between all users
// try to check if cached query is allowed for current user
if s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache {
checkReq, checkQuery, _ := s.createCheckGrantsRequest(req)

srwCheck := &checkGrantsResponseWriter{
ResponseWriter: srw.ResponseWriter,
}

rp.proxyRequest(s, srwCheck, srw, checkReq)

if srwCheck.statusCode == http.StatusOK {
log.Debugf("%s: check grants for shared cached query request success; user: %s; query: %q; Method: %s; URL: %q", s, s.user.name, checkQuery, checkReq.Method, checkReq.URL.String())
} else {
log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; user: %s; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, s.user.name, checkQuery, checkReq.Method, checkReq.URL.String())
return
}
}

rp.serveFromCache(s, srw, req, origParams, q)
} else {
rp.proxyRequest(s, srw, srw, req)
Expand Down Expand Up @@ -925,3 +944,40 @@
s.requestPacketSize = len(q)
return s, 0, nil
}

// create a new request based on proxied one
// with query wrapped to fetch result types like:
// 'DESC ({original_query})'
// along with query parsed and analyzed for return types (which is fast)
// ClickHouse check permissions to execute this query for the user
func (s *scope) createCheckGrantsRequest(originalReq *http.Request) (*http.Request, string, error) {
originalQuery := originalReq.URL.Query().Get("query")
checkQuery := fmt.Sprintf("DESC (%s);", strings.TrimRight(originalQuery, ";"))

newURL := *originalReq.URL

queryParams, err := url.ParseQuery(newURL.RawQuery)
if err != nil {
return nil, checkQuery, err
}

queryParams.Set("query", checkQuery)

newURL.RawQuery = queryParams.Encode()

req := &http.Request{
Method: originalReq.Method,
URL: &newURL,
Proto: originalReq.Proto,
ProtoMajor: originalReq.ProtoMajor,
ProtoMinor: originalReq.ProtoMinor,
Header: originalReq.Header.Clone(),
Body: originalReq.Body,
Host: originalReq.Host,
ContentLength: originalReq.ContentLength,
Close: originalReq.Close,
TLS: originalReq.TLS,
}

return req, checkQuery, nil
}
Loading