diff --git a/cache/async_cache.go b/cache/async_cache.go index ee55e44c..29dc003b 100644 --- a/cache/async_cache.go +++ b/cache/async_cache.go @@ -20,8 +20,9 @@ type AsyncCache struct { graceTime time.Duration - MaxPayloadSize config.ByteSize - SharedWithAllUsers bool + MaxPayloadSize config.ByteSize + SharedWithAllUsers bool + CheckGrantsForSharedCache bool } func (c *AsyncCache) Close() error { @@ -109,10 +110,11 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach maxPayloadSize := cfg.MaxPayloadSize return &AsyncCache{ - Cache: cache, - TransactionRegistry: transaction, - graceTime: graceTime, - MaxPayloadSize: maxPayloadSize, - SharedWithAllUsers: cfg.SharedWithAllUsers, + Cache: cache, + TransactionRegistry: transaction, + graceTime: graceTime, + MaxPayloadSize: maxPayloadSize, + SharedWithAllUsers: cfg.SharedWithAllUsers, + CheckGrantsForSharedCache: cfg.CheckGrantsForSharedCache, }, nil } diff --git a/config/README.md b/config/README.md index bb2f27b2..07e45ddb 100644 --- a/config/README.md +++ b/config/README.md @@ -100,6 +100,10 @@ max_payload_size: # Whether a query cached by a user can be used by another user shared_with_all_users: | default = false [optional] + +# Whether `shared_with_all_users` option is enabled +# check permissions for cached query used by another user +check_grants_for_shared_cache: | default = false [optional] ``` ### diff --git a/config/config.go b/config/config.go index 652cfe9b..6a42281c 100644 --- a/config/config.go +++ b/config/config.go @@ -938,6 +938,10 @@ type Cache struct { // Whether a query cached by a user could be used by another user SharedWithAllUsers bool `yaml:"shared_with_all_users,omitempty"` + + // Whether `shared_with_all_users` option is enabled + // check permissions for cached query used by another user + CheckGrantsForSharedCache bool `yaml:"check_grants_for_shared_cache,omitempty"` } func (c *Cache) setDefaults() { diff --git a/io.go b/io.go index 6598d8c7..b24b2d31 100644 --- a/io.go +++ b/io.go @@ -177,3 +177,49 @@ func (crc *cachedReadCloser) String() string { crc.bLock.Unlock() return s } + +var _ ResponseWriterWithCode = &checkGrantsResponseWriter{} + +type checkGrantsResponseWriter struct { + http.ResponseWriter + + statusCode int +} + +func (rw *checkGrantsResponseWriter) SetStatusCode(code int) { + rw.statusCode = code + rw.ResponseWriter.WriteHeader(rw.statusCode) +} + +func (rw *checkGrantsResponseWriter) StatusCode() int { + if rw.statusCode == 0 { + return http.StatusOK + } + + return rw.statusCode +} + +func (rw *checkGrantsResponseWriter) WriteHeader(statusCode int) { + // cache statusCode to keep the opportunity to change it in further + rw.statusCode = statusCode + rw.SetStatusCode(statusCode) +} + +func (rw *checkGrantsResponseWriter) Write(b []byte) (int, error) { + if rw.statusCode == http.StatusOK { + return 0, nil + } + + n, err := rw.ResponseWriter.Write(b) + return n, err +} + +// CloseNotify implements http.CloseNotifier +func (rw *checkGrantsResponseWriter) CloseNotify() <-chan bool { + // The rw.ResponseWriter must implement http.CloseNotifier + rwc, ok := rw.ResponseWriter.(http.CloseNotifier) + if !ok { + panic("BUG: the wrapped ResponseWriter must implement http.CloseNotifier") + } + return rwc.CloseNotify() +} diff --git a/proxy.go b/proxy.go index 24940762..629f2789 100644 --- a/proxy.go +++ b/proxy.go @@ -146,6 +146,25 @@ func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { } if shouldReturnFromCache { + // if cache shared between all users + // try to check if cached query is allowed for current user + if s.user.cache != nil && s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache { + checkReq, checkQuery, _ := s.createCheckGrantsRequest(req) + + srwCheck := &checkGrantsResponseWriter{ + ResponseWriter: srw.ResponseWriter, + } + + rp.proxyRequest(s, srwCheck, srw, checkReq) + + if srwCheck.statusCode == http.StatusOK { + log.Debugf("%s: check grants for shared cached query request success; query: %q; Method: %s; URL: %q", s, checkQuery, checkReq.Method, checkReq.URL.String()) + } else { + log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, checkQuery, checkReq.Method, checkReq.URL.String()) + return + } + } + rp.serveFromCache(s, srw, req, origParams, q) } else { rp.proxyRequest(s, srw, srw, req) @@ -925,3 +944,40 @@ func (rp *reverseProxy) getScope(req *http.Request) (*scope, int, error) { s.requestPacketSize = len(q) return s, 0, nil } + +// create a new request based on proxied one +// with query wrapped to fetch result types like: +// 'DESC ({original_query})' +// along with query parsed and analyzed for return types (which is fast) +// ClickHouse check permissions to execute this query for the user +func (s *scope) createCheckGrantsRequest(originalReq *http.Request) (*http.Request, string, error) { + originalQuery := originalReq.URL.Query().Get("query") + checkQuery := fmt.Sprintf("DESC (%s);", strings.TrimRight(originalQuery, ";")) + + newURL := *originalReq.URL + + queryParams, err := url.ParseQuery(newURL.RawQuery) + if err != nil { + return nil, checkQuery, err + } + + queryParams.Set("query", checkQuery) + + newURL.RawQuery = queryParams.Encode() + + req := &http.Request{ + Method: originalReq.Method, + URL: &newURL, + Proto: originalReq.Proto, + ProtoMajor: originalReq.ProtoMajor, + ProtoMinor: originalReq.ProtoMinor, + Header: originalReq.Header.Clone(), + Body: originalReq.Body, + Host: originalReq.Host, + ContentLength: originalReq.ContentLength, + Close: originalReq.Close, + TLS: originalReq.TLS, + } + + return req, checkQuery, nil +}