Skip to content

Commit

Permalink
Use sync.Pool for relation filter buffers (#118)
Browse files Browse the repository at this point in the history
  • Loading branch information
ronenh authored Dec 12, 2024
1 parent 02ca521 commit 398da65
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 42 deletions.
14 changes: 10 additions & 4 deletions pkg/directory/v3/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,16 @@ func (s *Reader) GetRelation(ctx context.Context, req *dsr3.GetRelationRequest)
return resp, err
}

path, filter, err := getRelation.PathAndFilter()
filter := ds.RelationIdentifierBuffer()
defer ds.ReturnRelationIdentifierBuffer(filter)

path, err := getRelation.PathAndFilter(filter)
if err != nil {
return resp, err
}

err = s.store.DB().View(func(tx *bolt.Tx) error {
relations, err := bdb.Scan[dsc3.Relation](ctx, tx, path, filter)
relations, err := bdb.Scan[dsc3.Relation](ctx, tx, path, filter.Bytes())
if err != nil {
return err
}
Expand Down Expand Up @@ -259,11 +262,14 @@ func (s *Reader) GetRelations(ctx context.Context, req *dsr3.GetRelationsRequest
return resp, err
}

path, keyFilter, valueFilter := getRelations.RelationValueFilter()
keyFilter := ds.RelationIdentifierBuffer()
defer ds.ReturnRelationIdentifierBuffer(keyFilter)

path, valueFilter := getRelations.RelationValueFilter(keyFilter)

opts := []bdb.ScanOption{
bdb.WithPageToken(req.Page.Token),
bdb.WithKeyFilter(keyFilter),
bdb.WithKeyFilter(keyFilter.Bytes()),
}

err := s.store.DB().View(func(tx *bolt.Tx) error {
Expand Down
7 changes: 5 additions & 2 deletions pkg/ds/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ func (i *check) Exec(ctx context.Context, tx *bolt.Tx, mc *cache.Cache) (*dsr3.C

func getRelations(ctx context.Context, tx *bolt.Tx) graph.RelationReader {
return func(r *dsc3.RelationIdentifier, pool graph.RelationPool, out *[]*dsc3.RelationIdentifier) error {
path, keyFilter, valueFilter := RelationIdentifier(r).Filter()
keyFilter := RelationIdentifierBuffer()
defer ReturnRelationIdentifierBuffer(keyFilter)

return bdb.ScanWithFilter(ctx, tx, path, keyFilter, valueFilter, pool, out)
path, valueFilter := RelationIdentifier(r).Filter(keyFilter)

return bdb.ScanWithFilter(ctx, tx, path, keyFilter.Bytes(), valueFilter, pool, out)
}
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/ds/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ds

import (
"bytes"

"github.com/aserto-dev/azm/mempool"
)

const maxRelationIdentifierSize = 384

var relidBufPool = mempool.NewPool[*bytes.Buffer](NewRelationIdentifierBuf)

func NewRelationIdentifierBuf() *bytes.Buffer {
return bytes.NewBuffer(make([]byte, 0, maxRelationIdentifierSize))
}

func RelationIdentifierBuffer() *bytes.Buffer {
return relidBufPool.Get()
}

func ReturnRelationIdentifierBuffer(b *bytes.Buffer) {
b.Reset()
relidBufPool.Put(b)
}
60 changes: 24 additions & 36 deletions pkg/ds/relation.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,103 +100,95 @@ func (i *relation) SubKey() []byte {
return buf.Bytes()
}

func (i *relation) PathAndFilter() ([]string, []byte, error) {
func (i *relation) PathAndFilter(filter *bytes.Buffer) ([]string, error) {
switch {
case ObjectSelector(i.Object()).IsComplete():
return bdb.RelationsObjPath, i.ObjFilter(), nil
i.ObjFilter(filter)
return bdb.RelationsObjPath, nil
case ObjectSelector(i.Subject()).IsComplete():
return bdb.RelationsSubPath, i.SubFilter(), nil
i.SubFilter(filter)
return bdb.RelationsSubPath, nil
default:
return []string{}, []byte{}, ErrNoCompleteObjectIdentifier
return []string{}, ErrNoCompleteObjectIdentifier
}
}

// ObjFilter
// format: obj_type : obj_id # relation @ sub_type : sub_id (# sub_relation).
// TODO: if subject relation exists add subject relation to filter clause.
func (i *relation) ObjFilter() []byte {
buf := newRelationBuffer()

func (i *relation) ObjFilter(buf *bytes.Buffer) {
buf.WriteString(i.GetObjectType())
buf.WriteByte(TypeIDSeparator)
buf.WriteString(i.GetObjectId())
buf.WriteByte(InstanceSeparator)

if IsNotSet(i.GetRelation()) {
return buf.Bytes()
return
}

buf.WriteString(i.GetRelation())
buf.WriteByte(InstanceSeparator)

if IsNotSet(i.GetSubjectType()) {
return buf.Bytes()
return
}

buf.WriteString(i.GetSubjectType())
buf.WriteByte(TypeIDSeparator)

if IsNotSet(i.GetSubjectId()) {
return buf.Bytes()
return
}

buf.WriteString(i.GetSubjectId())

return buf.Bytes()
}

// SubFilter
// format: sub_type : sub_id (# sub_relation) | obj_type : obj_id # relation.
// TODO: if subject relation exists add subject relation to filter clause.
func (i *relation) SubFilter() []byte {
buf := newRelationBuffer()

func (i *relation) SubFilter(buf *bytes.Buffer) {
buf.WriteString(i.GetSubjectType())
buf.WriteByte(TypeIDSeparator)
buf.WriteString(i.GetSubjectId())
buf.WriteByte(InstanceSeparator)

if IsNotSet(i.GetRelation()) {
return buf.Bytes()
return
}

buf.WriteString(i.GetRelation())
buf.WriteByte(InstanceSeparator)

if IsNotSet(i.GetObjectType()) {
return buf.Bytes()
return
}

buf.WriteString(i.GetObjectType())
buf.WriteByte(TypeIDSeparator)

if IsNotSet(i.GetObjectId()) {
return buf.Bytes()
return
}

buf.WriteString(i.GetObjectId())

return buf.Bytes()
}

// nolint: gocritic
func (i *relation) Filter() (path bdb.Path, keyFilter []byte, valueFilter func(*dsc3.RelationIdentifier) bool) {
func (i *relation) Filter(keyFilter *bytes.Buffer) (path bdb.Path, valueFilter func(*dsc3.RelationIdentifier) bool) {
// #1 determine if object identifier is complete (has type+id)
// set index path accordingly
// set keyFilter to match covering path
// when no complete object identifier, fallback to a full table scan
if ObjectIdentifier(i.Object()).IsComplete() {
path = bdb.RelationsObjPath
keyFilter = i.ObjFilter()
}
if ObjectIdentifier(i.Subject()).IsComplete() {
i.ObjFilter(keyFilter)
} else if ObjectIdentifier(i.Subject()).IsComplete() {
path = bdb.RelationsSubPath
keyFilter = i.SubFilter()
i.SubFilter(keyFilter)
}
if len(path) == 0 {
log.Debug().Msg("no covering index path, default to scan of relation object path")
path = bdb.RelationsObjPath
keyFilter = []byte{}
}

// #2 build valueFilter function
Expand Down Expand Up @@ -260,27 +252,25 @@ func (i *relation) Filter() (path bdb.Path, keyFilter []byte, valueFilter func(*
return true
}

return path, keyFilter, valueFilter
return path, valueFilter
}

// nolint: gocritic // commentedOutCode
func (i *relation) RelationValueFilter() (path bdb.Path, keyFilter []byte, valueFilter func(*dsc3.Relation) bool) {
func (i *relation) RelationValueFilter(keyFilter *bytes.Buffer) (path bdb.Path, valueFilter func(*dsc3.Relation) bool) {
// #1 determine if object identifier is complete (has type+id)
// set index path accordingly
// set keyFilter to match covering path
// when no complete object identifier, fallback to a full table scan
if ObjectIdentifier(i.Object()).IsComplete() {
path = bdb.RelationsObjPath
keyFilter = i.ObjFilter()
}
if ObjectIdentifier(i.Subject()).IsComplete() {
i.ObjFilter(keyFilter)
} else if ObjectIdentifier(i.Subject()).IsComplete() {
path = bdb.RelationsSubPath
keyFilter = i.SubFilter()
i.SubFilter(keyFilter)
}
if len(path) == 0 {
log.Debug().Msg("no covering index path, default to scan of relation object path")
path = bdb.RelationsObjPath
keyFilter = []byte{}
}

// #2 build valueFilter function
Expand Down Expand Up @@ -344,11 +334,9 @@ func (i *relation) RelationValueFilter() (path bdb.Path, keyFilter []byte, value
return true
}

return path, keyFilter, valueFilter
return path, valueFilter
}

const maxRelationIdentifierSize = 384

func newRelationBuffer() *bytes.Buffer {
return bytes.NewBuffer(make([]byte, 0, maxRelationIdentifierSize))
}

0 comments on commit 398da65

Please sign in to comment.