Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Shared cache grants check option #466

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cache/async_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ type AsyncCache struct {

graceTime time.Duration

MaxPayloadSize config.ByteSize
SharedWithAllUsers bool
MaxPayloadSize config.ByteSize
SharedWithAllUsers bool
CheckGrantsForSharedCache bool
}

func (c *AsyncCache) Close() error {
Expand Down Expand Up @@ -109,10 +110,11 @@ func NewAsyncCache(cfg config.Cache, maxExecutionTime time.Duration) (*AsyncCach
maxPayloadSize := cfg.MaxPayloadSize

return &AsyncCache{
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
SharedWithAllUsers: cfg.SharedWithAllUsers,
Cache: cache,
TransactionRegistry: transaction,
graceTime: graceTime,
MaxPayloadSize: maxPayloadSize,
SharedWithAllUsers: cfg.SharedWithAllUsers,
CheckGrantsForSharedCache: cfg.CheckGrantsForSharedCache,
}, nil
}
4 changes: 4 additions & 0 deletions config/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ max_payload_size: <byte_size>

# Whether a query cached by a user can be used by another user
shared_with_all_users: <bool> | default = false [optional]

# Whether `shared_with_all_users` option is enabled
# check permissions for cached query used by another user
check_grants_for_shared_cache: <bool> | default = false [optional]
```

### <distributed_cache_config>
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,10 @@ type Cache struct {

// Whether a query cached by a user could be used by another user
SharedWithAllUsers bool `yaml:"shared_with_all_users,omitempty"`

// Whether `shared_with_all_users` option is enabled
// check permissions for cached query used by another user
CheckGrantsForSharedCache bool `yaml:"check_grants_for_shared_cache,omitempty"`
}

func (c *Cache) setDefaults() {
Expand Down
46 changes: 46 additions & 0 deletions io.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,49 @@ func (crc *cachedReadCloser) String() string {
crc.bLock.Unlock()
return s
}

var _ ResponseWriterWithCode = &checkGrantsResponseWriter{}

type checkGrantsResponseWriter struct {
http.ResponseWriter

statusCode int
}

func (rw *checkGrantsResponseWriter) SetStatusCode(code int) {
rw.statusCode = code
rw.ResponseWriter.WriteHeader(rw.statusCode)
}

func (rw *checkGrantsResponseWriter) StatusCode() int {
if rw.statusCode == 0 {
return http.StatusOK
}

return rw.statusCode
}

func (rw *checkGrantsResponseWriter) WriteHeader(statusCode int) {
// cache statusCode to keep the opportunity to change it in further
rw.statusCode = statusCode
rw.SetStatusCode(statusCode)
}

func (rw *checkGrantsResponseWriter) Write(b []byte) (int, error) {
if rw.statusCode == http.StatusOK {
return 0, nil
}

n, err := rw.ResponseWriter.Write(b)
return n, err
}

// CloseNotify implements http.CloseNotifier
func (rw *checkGrantsResponseWriter) CloseNotify() <-chan bool {
// The rw.ResponseWriter must implement http.CloseNotifier
rwc, ok := rw.ResponseWriter.(http.CloseNotifier)
if !ok {
panic("BUG: the wrapped ResponseWriter must implement http.CloseNotifier")
}
return rwc.CloseNotify()
}
56 changes: 56 additions & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
}
}

func (rp *reverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {

Check failure on line 88 in proxy.go

View workflow job for this annotation

GitHub Actions / lint

calculated cyclomatic complexity for function ServeHTTP is 13, max is 10 (cyclop)
startTime := time.Now()
s, status, err := rp.getScope(req)
if err != nil {
Expand Down Expand Up @@ -146,6 +146,25 @@
}

if shouldReturnFromCache {
// if cache shared between all users
// try to check if cached query is allowed for current user
if s.user.cache != nil && s.user.cache.SharedWithAllUsers && s.user.cache.CheckGrantsForSharedCache {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure to properly understand why we need to check s.user.cache here ? what is the goal of this.

Isn't Checking if a cache is shared && that we should check grants enough ?

checkReq, checkQuery, _ := s.createCheckGrantsRequest(req)

srwCheck := &checkGrantsResponseWriter{
ResponseWriter: srw.ResponseWriter,
}

rp.proxyRequest(s, srwCheck, srw, checkReq)

if srwCheck.statusCode == http.StatusOK {
log.Debugf("%s: check grants for shared cached query request success; query: %q; Method: %s; URL: %q", s, checkQuery, checkReq.Method, checkReq.URL.String())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe adding the user would be nice in the debug logs, wdyt ?

} else {
log.Debugf("%s: check grants for shared cached query request failure: non-200 status code %d; query: %q; Method: %s; URL: %q", s, srwCheck.statusCode, checkQuery, checkReq.Method, checkReq.URL.String())
return
}
}

rp.serveFromCache(s, srw, req, origParams, q)
} else {
rp.proxyRequest(s, srw, srw, req)
Expand Down Expand Up @@ -925,3 +944,40 @@
s.requestPacketSize = len(q)
return s, 0, nil
}

// create a new request based on proxied one
// with query wrapped to fetch result types like:
// 'DESC ({original_query})'
// along with query parsed and analyzed for return types (which is fast)
// ClickHouse check permissions to execute this query for the user
func (s *scope) createCheckGrantsRequest(originalReq *http.Request) (*http.Request, string, error) {
originalQuery := originalReq.URL.Query().Get("query")
checkQuery := fmt.Sprintf("DESC (%s);", strings.TrimRight(originalQuery, ";"))

newURL := *originalReq.URL

queryParams, err := url.ParseQuery(newURL.RawQuery)
if err != nil {
return nil, checkQuery, err
}

queryParams.Set("query", checkQuery)

newURL.RawQuery = queryParams.Encode()

req := &http.Request{
Method: originalReq.Method,
URL: &newURL,
Proto: originalReq.Proto,
ProtoMajor: originalReq.ProtoMajor,
ProtoMinor: originalReq.ProtoMinor,
Header: originalReq.Header.Clone(),
Body: originalReq.Body,
Host: originalReq.Host,
ContentLength: originalReq.ContentLength,
Close: originalReq.Close,
TLS: originalReq.TLS,
}

return req, checkQuery, nil
}
Loading