Skip to content

Commit

Permalink
fix: add a restriction for CAS and lock, only one at the time and avo…
Browse files Browse the repository at this point in the history
…id overwriting data when lock release
  • Loading branch information
Juanadelacuesta committed Jul 25, 2023
1 parent 0853c6d commit 847625b
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 67 deletions.
104 changes: 75 additions & 29 deletions command/agent/variable_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package agent

import (
"errors"
"fmt"
"net/http"
"net/url"
Expand All @@ -13,10 +14,11 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)

const (
renewLockQueryParam = "renew"
acquireLockQueryParam = "acquire"
releaseLockQueryParam = "release"
var (
renewLockQueryParam = "renew"

acquireLockQueryParam = string(structs.VarOpLockAcquire)
releaseLockQueryParam = string(structs.VarOpLockRelease)
)

func (s *HTTPServer) VariablesListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
Expand Down Expand Up @@ -52,67 +54,84 @@ func (s *HTTPServer) VariableSpecificRequest(resp http.ResponseWriter, req *http
case http.MethodGet:
return s.variableQuery(resp, req, path)
case http.MethodPut, http.MethodPost:
urlParams := req.URL.Query()
lockOperation, err := getLockOperation(urlParams)
if err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

queryParams := req.URL.Query()
_, renewLock := queryParams[renewLockQueryParam]
_, acquireLock := queryParams[acquireLockQueryParam]
_, releaseLock := queryParams[releaseLockQueryParam]
cq := req.URL.Query().Get("cas")

if cq != "" && lockOperation != "" {
return nil, CodedError(http.StatusBadRequest, "CAS can't be used with lock operations")
}

if renewLock || acquireLock || releaseLock {
if !isOneAndOnlyOneSet(renewLock, acquireLock, releaseLock) {
return nil, CodedError(http.StatusBadRequest, "multiple lock operations")
}
return s.variableLockOperation(resp, req, queryParams)
if lockOperation == "" {
return s.variableUpsert(resp, req, path)
}

return s.variableUpsert(resp, req, path)
if lockOperation == renewLockQueryParam {
return s.variableLockRenew(resp, req)
}

return s.variableLockOperation(resp, req, path, lockOperation)

case http.MethodDelete:
return s.variableDelete(resp, req, path)
default:
return nil, CodedError(http.StatusBadRequest, ErrInvalidMethod)
}
}

func (s *HTTPServer) variableLockOperation(resp http.ResponseWriter, req *http.Request,
operation url.Values) (interface{}, error) {
func (s *HTTPServer) variableLockRenew(resp http.ResponseWriter, req *http.Request) (interface{}, error) {

// Parse the Variable
var Variable structs.VariableDecrypted
if err := decodeBody(req, &Variable); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

if operation[renewLockQueryParam][0] == renewLockQueryParam {
args := structs.VariablesRenewLockRequest{
Path: Variable.Path,
args := structs.VariablesRenewLockRequest{
Path: Variable.Path,

LockID: Variable.Lock.ID,
}
LockID: Variable.Lock.ID,
}

s.parseWriteRequest(req, &args.WriteRequest)
s.parseWriteRequest(req, &args.WriteRequest)

var out structs.VariablesRenewLockResponse
if err := s.agent.RPC(structs.VariablesRenewLockRPCMethod, &args, &out); err != nil {
return nil, err
}
var out structs.VariablesRenewLockResponse
if err := s.agent.RPC(structs.VariablesRenewLockRPCMethod, &args, &out); err != nil {
return nil, err
}

return out.VarMeta, nil
}

func (s *HTTPServer) variableLockOperation(resp http.ResponseWriter, req *http.Request,
path, operation string) (interface{}, error) {

return out.VarMeta, nil
// Parse the Variable
var Variable structs.VariableDecrypted
if err := decodeBody(req, &Variable); err != nil {
return nil, CodedError(http.StatusBadRequest, err.Error())
}

// At this point, the operation can be either acquire or release, and they are
// both handled by the VariablesApplyRPCMethod.
args := structs.VariablesApplyRequest{
Op: structs.VarOpSet,
Op: structs.VarOp(operation),
Var: &Variable,
}

Variable.Path = path

s.parseWriteRequest(req, &args.WriteRequest)

var out structs.VariablesApplyResponse
err := s.agent.RPC(structs.VariablesApplyRPCMethod, &args, &out)
defer setIndex(resp, out.WriteMeta.Index)
if err != nil {
return nil, CodedError(http.StatusInternalServerError, err.Error())
return nil, err
}

if out.Conflict != nil {
Expand Down Expand Up @@ -263,3 +282,30 @@ func parseCAS(req *http.Request) (bool, uint64, error) {
func isOneAndOnlyOneSet(a, b, c bool) bool {
return (a || b || c) && !a != !b != !c != !(a && b && c)
}

// getLockOperation returns the lock operation to be performed in case there is
// one. It returns error if more than one is set.
func getLockOperation(queryParams url.Values) (string, error) {
_, renewLock := queryParams[renewLockQueryParam]
_, acquireLock := queryParams[acquireLockQueryParam]
_, releaseLock := queryParams[releaseLockQueryParam]

if !renewLock && !acquireLock && !releaseLock {
return "", nil
}

if !isOneAndOnlyOneSet(renewLock, acquireLock, releaseLock) {
return "", errors.New("multiple lock operations")
}

switch {
case renewLock:
return renewLockQueryParam, nil
case acquireLock:
return acquireLockQueryParam, nil
case releaseLock:
return releaseLockQueryParam, nil
default:
return "", errors.New("unspecified lock operation")
}
}
7 changes: 7 additions & 0 deletions nomad/state/state_store_variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (s *StateStore) varSetTxn(tx WriteTxn, idx uint64, req *structs.VarApplySta
if err != nil {
return req.ErrorResponse(idx, fmt.Errorf("failed sve lookup: %s", err))
}

existing, _ := existingRaw.(*structs.VariableEncrypted)

existingQuota, err := tx.First(TableVariablesQuotas, indexID, sv.Namespace)
Expand Down Expand Up @@ -609,6 +610,7 @@ func (s *StateStore) VarLockRelease(idx uint64,
VariableMetadata: structs.VariableMetadata{
Namespace: sv.Namespace,
Path: sv.Path,
Lock: &structs.VariableLock{},
},
}
return req.ConflictResponse(idx, zeroVal)
Expand All @@ -618,6 +620,11 @@ func (s *StateStore) VarLockRelease(idx uint64,
updated.Lock = nil
updated.ModifyIndex = idx

// Avoid overwriting the variable data when releasing the lock, to prevent
// a delay release to remove customer data.

updated.Data = sv.Data

err = s.updateVarsAndIndexTxn(tx, idx, &updated)
if err != nil {
req.ErrorResponse(idx, fmt.Errorf("failed lock release: %s", err))
Expand Down
79 changes: 78 additions & 1 deletion nomad/structs/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package structs

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -153,6 +154,77 @@ func (vl *VariableLock) Equal(vl2 *VariableLock) bool {
return true
}

// MarshalJSON implements the json.Marshaler interface and allows
// VariableLock.TTL and VariableLock.Delay to be marshaled correctly.
func (vl *VariableLock) MarshalJSON() ([]byte, error) {
type Alias VariableLock
exported := &struct {
TTL string
LockDelay string
*Alias
}{
TTL: vl.TTL.String(),
LockDelay: vl.LockDelay.String(),
Alias: (*Alias)(vl),
}

if vl.TTL == 0 {
exported.TTL = ""
}

if vl.LockDelay == 0 {
exported.LockDelay = ""
}
return json.Marshal(exported)
}

// UnmarshalJSON implements the json.Unmarshaler interface and allows
// VariableLock.TTL and VariableLock.Delay to be unmarshalled correctly.
func (vl *VariableLock) UnmarshalJSON(data []byte) (err error) {
type Alias VariableLock
aux := &struct {
TTL interface{}
LockDelay interface{}
*Alias
}{
Alias: (*Alias)(vl),
}

if err = json.Unmarshal(data, &aux); err != nil {
return err
}

if aux.TTL != nil {
switch v := aux.TTL.(type) {
case string:
if v != "" {
if vl.TTL, err = time.ParseDuration(v); err != nil {
return err
}
}
case float64:
vl.TTL = time.Duration(v)
}

}

if aux.LockDelay != nil {
switch v := aux.LockDelay.(type) {
case string:
if v != "" {
if vl.LockDelay, err = time.ParseDuration(v); err != nil {
return err
}
}
case float64:
vl.LockDelay = time.Duration(v)
}

}

return nil
}

// Copy creates a deep copy of the variable lock. This copy can then be safely
// modified. It handles nil objects.
func (vl *VariableLock) Copy() *VariableLock {
Expand Down Expand Up @@ -192,7 +264,7 @@ func (vl *VariableLock) Validate() error {
mErr = multierror.Append(mErr, errInvalidTTL)
}

return mErr
return mErr.ErrorOrNil()
}

func (vi VariableItems) Size() uint64 {
Expand Down Expand Up @@ -303,6 +375,7 @@ func (vd VariableDecrypted) Validate() error {
if len(vd.Items) == 0 {
return errors.New("empty variables are invalid")
}

if vd.Items.Size() > maxVariableSize {
return errors.New("variables are limited to 64KiB in total size")
}
Expand All @@ -311,6 +384,10 @@ func (vd VariableDecrypted) Validate() error {
return err
}

if vd.Lock != nil {
return vd.Lock.Validate()
}

return nil
}

Expand Down
Loading

0 comments on commit 847625b

Please sign in to comment.