Skip to content

Commit

Permalink
Log request information as transaction metadata
Browse files Browse the repository at this point in the history
Signed-off-by: Stefano Scafiti <[email protected]>
  • Loading branch information
ostafen committed May 30, 2024
1 parent 8de95c5 commit a741ec9
Show file tree
Hide file tree
Showing 23 changed files with 466 additions and 66 deletions.
1 change: 1 addition & 0 deletions cmd/immudb/command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (cl *Commandline) setupFlags(cmd *cobra.Command, options *server.Options) {
cmd.Flags().MarkHidden("sessions-guard-check-interval")
cmd.Flags().Bool("grpc-reflection", options.GRPCReflectionServerEnabled, "GRPC reflection server enabled")
cmd.Flags().Bool("swaggerui", options.SwaggerUIEnabled, "Swagger UI enabled")
cmd.Flags().Bool("log-request-metadata", options.LogRequestMetadata, "log request information in transaction metadata")

flagNameMapping := map[string]string{
"replication-enabled": "replication-is-replica",
Expand Down
4 changes: 3 additions & 1 deletion cmd/immudb/command/parse_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func parseOptions() (options *server.Options, err error) {

grpcReflectionServerEnabled := viper.GetBool("grpc-reflection")
swaggerUIEnabled := viper.GetBool("swaggerui")
logRequestMetadata := viper.GetBool("log-request-metadata")

s3Storage := viper.GetBool("s3-storage")
s3RoleEnabled := viper.GetBool("s3-role-enabled")
Expand Down Expand Up @@ -153,7 +154,8 @@ func parseOptions() (options *server.Options, err error) {
WithPProf(pprof).
WithLogFormat(logFormat).
WithSwaggerUIEnabled(swaggerUIEnabled).
WithGRPCReflectionServerEnabled(grpcReflectionServerEnabled)
WithGRPCReflectionServerEnabled(grpcReflectionServerEnabled).
WithLogRequestMetadata(logRequestMetadata)

return options, nil
}
3 changes: 2 additions & 1 deletion embedded/store/immustore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,8 @@ func (s *ImmuStore) precommit(ctx context.Context, otx *OngoingTx, hdr *TxHeader
return nil, fmt.Errorf("%w: transaction does not validate against header", err)
}

if len(otx.entries) == 0 && otx.metadata.IsEmpty() {
// extra metadata are specified by the client and thus they are only allowed when entries is non empty
if len(otx.entries) == 0 && (otx.metadata.IsEmpty() || otx.metadata.HasExtraOnly()) {
return nil, ErrNoEntriesProvided
}

Expand Down
2 changes: 1 addition & 1 deletion embedded/store/ongoing_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (tx *OngoingTx) IsReadOnly() bool {

func (tx *OngoingTx) WithMetadata(md *TxMetadata) *OngoingTx {
tx.metadata = md
return nil
return tx
}

func (tx *OngoingTx) Timestamp() time.Time {
Expand Down
4 changes: 4 additions & 0 deletions embedded/store/tx_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func (md *TxMetadata) IsEmpty() bool {
return md == nil || len(md.attributes) == 0
}

func (md *TxMetadata) HasExtraOnly() bool {
return len(md.attributes) == 1 && md.Extra() != nil
}

func (md *TxMetadata) Equal(amd *TxMetadata) bool {
if amd == nil || md == nil {
return false
Expand Down
101 changes: 101 additions & 0 deletions pkg/api/schema/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package schema

import (
"context"
"errors"
)

const maxMetadataLen = 256

var (
ErrEmptyMetadataKey = errors.New("metadata key cannot be empty")
ErrEmptyMetadataValue = errors.New("metadata value cannot be empty")
ErrMetadataTooLarge = errors.New("metadata exceeds maximum size")
ErrCorruptedMetadata = errors.New("corrupted metadata")
)

const (
UserRequestMetadataKey = "usr"
IpRequestMetadataKey = "ip"
)

type Metadata map[string]string

func (m Metadata) Marshal() ([]byte, error) {
if err := m.validate(); err != nil {
return nil, err
}

var data [maxMetadataLen]byte

off := 0
for k, v := range m {
data[off] = byte(len(k) - 1)
data[off+1] = byte(len(v) - 1)

off += 2
copy(data[off:], []byte(k))
off += len(k)

copy(data[off:], []byte(v))
off += len(v)
}
return data[:off], nil
}

func (m Metadata) validate() error {
size := 0
for k, v := range m {
if len(k) == 0 {
return ErrEmptyMetadataKey
}

if len(v) == 0 {
return ErrEmptyMetadataValue
}

size += len(k) + len(v) + 2

if size > maxMetadataLen {
return ErrMetadataTooLarge
}
}
return nil
}

func (m Metadata) Unmarshal(data []byte) error {
off := 0
for off <= len(data)-2 {
keySize := int(data[off]) + 1
valueSize := int(data[off+1]) + 1

off += 2

if off+keySize+valueSize > len(data) {
return ErrCorruptedMetadata
}

m[string(data[off:off+keySize])] = string(data[off+keySize : off+keySize+valueSize])

off += keySize + valueSize
}

if off != len(data) {
return ErrCorruptedMetadata
}
return nil
}

type metadataKey struct{}

func ContextWithMetadata(ctx context.Context, md Metadata) context.Context {
return context.WithValue(ctx, metadataKey{}, md)
}

func MetadataFromContext(ctx context.Context) Metadata {
md, ok := ctx.Value(metadataKey{}).(Metadata)
if !ok {
return nil
}
return md
}
58 changes: 58 additions & 0 deletions pkg/api/schema/metadata_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package schema

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMetadataMarshalUnmarshal(t *testing.T) {
meta := Metadata{
"user": "default",
"ip": "127.0.0.1:8080",
}

data, err := meta.Marshal()
require.NoError(t, err)

t.Run("valid metadata", func(t *testing.T) {
unmarshalled := Metadata{}
err := unmarshalled.Unmarshal(data)
require.NoError(t, err)
require.Equal(t, meta, unmarshalled)
})

t.Run("corrupted metadata", func(t *testing.T) {
unmarshalled := Metadata{}
err := unmarshalled.Unmarshal(data[:len(data)/2])
require.ErrorIs(t, err, ErrCorruptedMetadata)
})

t.Run("empty metadata", func(t *testing.T) {
m := Metadata{}
data, err := m.Marshal()
require.NoError(t, err)
require.Empty(t, data)

unmarshalled := Metadata{}
err = unmarshalled.Unmarshal([]byte{})
require.NoError(t, err)
require.Empty(t, unmarshalled)
})

t.Run("invalid metadata", func(t *testing.T) {
x := make([]byte, 256)

m := Metadata{"x": string(x)}
_, err := m.Marshal()
require.ErrorIs(t, err, ErrMetadataTooLarge)

m = Metadata{"": "v"}
_, err = m.Marshal()
require.ErrorIs(t, err, ErrEmptyMetadataKey)

m = Metadata{"k": ""}
_, err = m.Marshal()
require.ErrorIs(t, err, ErrEmptyMetadataValue)
})
}
6 changes: 4 additions & 2 deletions pkg/auth/passwords.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ func ComparePasswords(hashedPassword []byte, plainPassword []byte) error {
return bcrypt.CompareHashAndPassword(hashedPassword, plainPassword)
}

const minPasswordLen = 8
const maxPasswordLen = 32
const (
minPasswordLen = 8
maxPasswordLen = 32
)

// PasswordRequirementsMsg message used to inform the user about password strength requirements
var PasswordRequirementsMsg = fmt.Sprintf(
Expand Down
6 changes: 3 additions & 3 deletions pkg/auth/serverinterceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func ServerUnaryInterceptor(ctx context.Context, req interface{}, info *grpc.Una
}

var localAddress = map[string]struct{}{
"127.0.0.1": struct{}{},
"localhost": struct{}{},
"bufconn": struct{}{},
"127.0.0.1": {},
"localhost": {},
"bufconn": {},
}

func isLocalClient(ctx context.Context) bool {
Expand Down
30 changes: 19 additions & 11 deletions pkg/auth/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ type User struct {
CreatedAt time.Time `json:"createdat"` //time in which this user is created/updated
}

// SysAdminUsername the system admin username
var SysAdminUsername = "immudb"
var (
// SysAdminUsername the system admin username
SysAdminUsername = "immudb"

// SysAdminPassword the admin password (can be default or from command flags, config or env var)
var SysAdminPassword = SysAdminUsername
// SysAdminPassword the admin password (can be default or from command flags, config or env var)
SysAdminPassword = SysAdminUsername
)

// SetPassword Hashes and salts the password and assigns it to hashedPassword of User
func (u *User) SetPassword(plainPassword []byte) ([]byte, error) {
Expand All @@ -63,10 +65,16 @@ func (u *User) ComparePasswords(plainPassword []byte) error {
return ComparePasswords(u.HashedPassword, plainPassword)
}

// IsValidUsername is a regexp function used to check username requirements
var IsValidUsername = regexp.MustCompile(`^[a-zA-Z0-9_]+$`).MatchString
const maxUsernameLen = 63

var usernameRegex = regexp.MustCompile(`^[a-zA-Z0-9_]+$`)

// IsValidUsername is a function used to check username requirements
func IsValidUsername(user string) bool {
return len(user) <= maxUsernameLen && usernameRegex.MatchString(user)
}

//HasPermission checks if user has such permission for this database
// HasPermission checks if user has such permission for this database
func (u *User) HasPermission(database string, permission uint32) bool {
for _, val := range u.Permissions {
if (val.Database == database) &&
Expand All @@ -77,7 +85,7 @@ func (u *User) HasPermission(database string, permission uint32) bool {
return false
}

//HasAtLeastOnePermission checks if user has this permission for at least one database
// HasAtLeastOnePermission checks if user has this permission for at least one database
func (u *User) HasAtLeastOnePermission(permission uint32) bool {
for _, val := range u.Permissions {
if val.Permission == permission {
Expand All @@ -87,7 +95,7 @@ func (u *User) HasAtLeastOnePermission(permission uint32) bool {
return false
}

//WhichPermission returns the permission that this user has on this database
// WhichPermission returns the permission that this user has on this database
func (u *User) WhichPermission(database string) uint32 {
if u.IsSysAdmin {
return PermissionSysAdmin
Expand All @@ -100,7 +108,7 @@ func (u *User) WhichPermission(database string) uint32 {
return PermissionNone
}

//RevokePermission revoke database permission from user
// RevokePermission revoke database permission from user
func (u *User) RevokePermission(database string) bool {
for i, val := range u.Permissions {
if val.Database == database {
Expand All @@ -112,7 +120,7 @@ func (u *User) RevokePermission(database string) bool {
return false
}

//GrantPermission add permission to database
// GrantPermission add permission to database
func (u *User) GrantPermission(database string, permission uint32) bool {
//first remove any previous permission for this db
u.RevokePermission(database)
Expand Down
1 change: 0 additions & 1 deletion pkg/database/all_ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ func (d *db) ExecAll(ctx context.Context, req *schema.ExecAllRequest) (*schema.T
kmap := make(map[[sha256.Size]byte]bool)

for i, op := range req.Operations {

e := &store.EntrySpec{}

switch x := op.Operation.(type) {
Expand Down
Loading

0 comments on commit a741ec9

Please sign in to comment.