Skip to content

Commit

Permalink
Use sync.Pool for relation filter buffers (#114)
Browse files Browse the repository at this point in the history
* azm with shared relations poo

* sync.Pool for relation filter buffers
  • Loading branch information
ronenh authored Dec 11, 2024
1 parent deeb4f0 commit 107dd7f
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 42 deletions.
7 changes: 2 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ go 1.22.10

toolchain go1.23.4

// replace github.com/aserto-dev/azm => ../azm
replace github.com/aserto-dev/azm => ../azm

// replace github.com/aserto-dev/go-directory => ../go-directory

require (
github.com/Masterminds/semver/v3 v3.3.1
github.com/aserto-dev/aserto-grpc v0.2.6
github.com/aserto-dev/azm v0.2.2-0.20241210213546-f97c77e8252b
github.com/aserto-dev/azm v0.2.2-0.20241210214915-de60dbb1c83b
github.com/aserto-dev/errors v0.0.11
github.com/aserto-dev/go-directory v0.33.2-0.20241210213355-3d02dab1a2bd
github.com/bufbuild/protovalidate-go v0.7.3
Expand All @@ -22,7 +22,6 @@ require (
github.com/homeport/dyff v1.9.3
github.com/panmari/cuckoofilter v1.0.6
github.com/pkg/errors v0.9.1
github.com/pkg/profile v1.7.0
github.com/rs/zerolog v1.33.0
github.com/samber/lo v1.47.0
github.com/stretchr/testify v1.10.0
Expand All @@ -40,15 +39,13 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.7.0 // indirect
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect
github.com/felixge/fgprof v0.9.3 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gonvenience/bunt v1.3.5 // indirect
github.com/gonvenience/neat v1.3.13 // indirect
github.com/gonvenience/term v1.0.2 // indirect
github.com/gonvenience/text v1.0.7 // indirect
github.com/gonvenience/wrap v1.2.0 // indirect
github.com/google/cel-go v0.22.0 // indirect
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.24.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand Down
12 changes: 0 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYW
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/aserto-dev/aserto-grpc v0.2.6 h1:h64MYALF5zLm2sSKcLEtyXyrJvZSxfqTOmQ1j/J44kI=
github.com/aserto-dev/aserto-grpc v0.2.6/go.mod h1:Vki74KINVfnwtJ8QGzRm+xHNjsJ2KUWFtXhezJK9DEg=
github.com/aserto-dev/azm v0.2.2-0.20241210213546-f97c77e8252b h1:D+R5eIXvvN7HzIOwAeHf0w8xYa4ukDuukswGuqB8rXA=
github.com/aserto-dev/azm v0.2.2-0.20241210213546-f97c77e8252b/go.mod h1:4bU8iz8b4ZC4IuoghLXC4S2TThu9eJwbAfVxosVznLs=
github.com/aserto-dev/errors v0.0.11 h1:CXo+Uwmh09doG2HvL1SC8Fnne8f9VPrGyEQPtogAfyY=
github.com/aserto-dev/errors v0.0.11/go.mod h1:T1YQOtcxpgBriPTn5HXJkD/QukYz5YojYOIzGMo0ybM=
github.com/aserto-dev/go-directory v0.33.2-0.20241210213355-3d02dab1a2bd h1:WOEcH7FURcfZBrQ5GKCsNXcFhqIqhwVQdA2v0JNgI0A=
Expand All @@ -26,9 +24,6 @@ github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZx
github.com/bufbuild/protovalidate-go v0.7.3 h1:kKnoSueygR3xxppvuBpm9SEwIsP359MMRfMBGmRByPg=
github.com/bufbuild/protovalidate-go v0.7.3/go.mod h1:CFv34wMqiBzAHdQ4q/tWYi9ILFYKuaC3/4zh6eqdUck=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
Expand All @@ -46,8 +41,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v1.1.0 h1:tntQDh69XqOCOZsDz0lVJQez/2L6Uu2PdjCQwWCJ3bM=
github.com/envoyproxy/protoc-gen-validate v1.1.0/go.mod h1:sXRDRVmzEbkM7CVcM06s9shE/m23dg3wzjl0UWqJ2q4=
github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g=
github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw=
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
github.com/go-http-utils/headers v0.0.0-20181008091004-fed159eddc2a h1:v6zMvHuY9yue4+QkG/HQ/W67wvtQmWJ4SDo9aK/GIno=
Expand Down Expand Up @@ -87,7 +80,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8 h1:FKHo8hFI3A+7w0aUQuYXQ+6EN5stWmeY/AZqtM8xk9k=
github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73Fdn/EKuT8nrFqBihUSKXoLYU0BuatOYo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
Expand All @@ -105,7 +97,6 @@ github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+l
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/homeport/dyff v1.9.3 h1:ltfPBSFppsJGE/Mmo8I1h3fqYxk+c3oTf6EMFpBNL9Y=
github.com/homeport/dyff v1.9.3/go.mod h1:smCju/EnwXnfxdeN//7Dt7/Z36I11NbTPNZNADRn1+A=
github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -148,8 +139,6 @@ github.com/panmari/cuckoofilter v1.0.6/go.mod h1:bKADbQPGbN6TxUvo/IbMEIUbKuASnps
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA=
github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down Expand Up @@ -229,7 +218,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
37 changes: 28 additions & 9 deletions pkg/ds/relation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package ds
import (
"bytes"
"strings"
"sync"

"github.com/aserto-dev/azm/safe"
dsc3 "github.com/aserto-dev/go-directory/aserto/directory/common/v3"
Expand Down Expand Up @@ -55,8 +56,11 @@ func (i *relation) Key() []byte {
}

func (i *relation) ObjKey() []byte {
var buf bytes.Buffer
buf.Grow(832)
buf := bytes.NewBuffer(bufPool.Get().([]byte))
defer func() {
buf.Reset()
bufPool.Put(buf.Bytes())
}()

buf.WriteString(i.GetObjectType())
buf.WriteByte(TypeIDSeparator)
Expand All @@ -79,8 +83,11 @@ func (i *relation) ObjKey() []byte {
}

func (i *relation) SubKey() []byte {
var buf bytes.Buffer
buf.Grow(832)
buf := bytes.NewBuffer(bufPool.Get().([]byte))
defer func() {
buf.Reset()
bufPool.Put(buf.Bytes())
}()

buf.WriteString(i.GetSubjectType())
buf.WriteByte(TypeIDSeparator)
Expand Down Expand Up @@ -117,8 +124,11 @@ func (i *relation) PathAndFilter() ([]string, []byte, error) {
// format: obj_type : obj_id # relation @ sub_type : sub_id (# sub_relation).
// TODO: if subject relation exists add subject relation to filter clause.
func (i *relation) ObjFilter() []byte {
var buf bytes.Buffer
buf.Grow(832)
buf := bytes.NewBuffer(bufPool.Get().([]byte))
defer func() {
buf.Reset()
bufPool.Put(buf.Bytes())
}()

buf.WriteString(i.GetObjectType())
buf.WriteByte(TypeIDSeparator)
Expand Down Expand Up @@ -152,8 +162,11 @@ func (i *relation) ObjFilter() []byte {
// format: sub_type : sub_id (# sub_relation) | obj_type : obj_id # relation.
// TODO: if subject relation exists add subject relation to filter clause.
func (i *relation) SubFilter() []byte {
var buf bytes.Buffer
buf.Grow(832)
buf := bytes.NewBuffer(bufPool.Get().([]byte))
defer func() {
buf.Reset()
bufPool.Put(buf.Bytes())
}()

buf.WriteString(i.GetSubjectType())
buf.WriteByte(TypeIDSeparator)
Expand Down Expand Up @@ -204,7 +217,7 @@ func (i *relation) Filter() (path bdb.Path, keyFilter []byte, valueFilter func(*
}

// #2 build valueFilter function
filters := []func(item *dsc3.RelationIdentifier) bool{}
filters := make([]func(item *dsc3.RelationIdentifier) bool, 0, 6)

if fv := i.GetObjectType(); fv != "" {
filters = append(filters, func(item *dsc3.RelationIdentifier) bool {
Expand Down Expand Up @@ -350,3 +363,9 @@ func (i *relation) RelationValueFilter() (path bdb.Path, keyFilter []byte, value

return path, keyFilter, valueFilter
}

var bufPool = sync.Pool{
New: func() interface{} {
return make([]byte, 0, 832)
},
}
96 changes: 80 additions & 16 deletions tests/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,49 @@ import (
"encoding/json"
"io"
"os"
"runtime"
"testing"

dsi3 "github.com/aserto-dev/go-directory/aserto/directory/importer/v3"
dsr3 "github.com/aserto-dev/go-directory/aserto/directory/reader/v3"
"github.com/aserto-dev/go-edge-ds/pkg/server"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)

func BenchmarkCheck(b *testing.B) {
func BenchmarkCheckSerial(b *testing.B) {
assert := require.New(b)

checks, err := loadChecks()
checks, err := loadChecks[dsr3.CheckRequest]()
assert.NoError(err)
assert.NotEmpty(checks)

client, cleanup := testInit()
b.Cleanup(cleanup)

setupBenchmark(b, client)
ctx := context.Background()

manifest, err := os.ReadFile("./data/check/manifest.yaml")
assert.NoError(err)
b.ResetTimer()

assert.NoError(deleteManifest(client))
assert.NoError(setManifest(client, manifest))
for _, check := range checks {
_, err := client.V3.Reader.Check(ctx, check)
assert.NoError(err)
}
}

g, iCtx := errgroup.WithContext(ctx)
stream, err := client.V3.Importer.Import(iCtx)
assert.NoError(err)
func BenchmarkCheckParallel(b *testing.B) {
assert := require.New(b)

g.Go(receiver(stream))
checks, err := loadChecks[dsr3.CheckRequest]()
assert.NoError(err)
assert.NotEmpty(checks)

assert.NoError(importFile(stream, "./data/check/objects.json"))
assert.NoError(importFile(stream, "./data/check/relations.json"))
assert.NoError(stream.CloseSend())
client, cleanup := testInit()
b.Cleanup(cleanup)

assert.NoError(g.Wait())
setupBenchmark(b, client)
ctx := context.Background()

b.ResetTimer()

Expand All @@ -55,13 +61,71 @@ func BenchmarkCheck(b *testing.B) {
}
}

func loadChecks() ([]*dsr3.CheckRequest, error) {
func BenchmarkCheckParallelChunks(b *testing.B) {
assert := require.New(b)

checks, err := loadChecks[dsr3.CheckRequest]()
assert.NoError(err)
assert.NotEmpty(checks)

client, cleanup := testInit()
b.Cleanup(cleanup)

setupBenchmark(b, client)
ctx := context.Background()

var chunks [][]*dsr3.CheckRequest
numChunks := runtime.NumCPU()
chunkSize := (len(checks) + numChunks - 1) / numChunks

for i := 0; i < len(checks); i += chunkSize {
end := min(i+chunkSize, len(checks))
chunks = append(chunks, checks[i:end])
}

b.ResetTimer()

for _, chunk := range chunks {
b.RunParallel(func(pb *testing.PB) {
for _, check := range chunk {
for pb.Next() {
_, err := client.V3.Reader.Check(ctx, check)
assert.NoError(err)
}
}
})
}
}

func setupBenchmark(b *testing.B, client *server.TestEdgeClient) {
assert := require.New(b)

manifest, err := os.ReadFile("./data/check/manifest.yaml")
assert.NoError(err)

assert.NoError(deleteManifest(client))
assert.NoError(setManifest(client, manifest))

g, iCtx := errgroup.WithContext(context.Background())
stream, err := client.V3.Importer.Import(iCtx)
assert.NoError(err)

g.Go(receiver(stream))

assert.NoError(importFile(stream, "./data/check/objects.json"))
assert.NoError(importFile(stream, "./data/check/relations.json"))
assert.NoError(stream.CloseSend())

assert.NoError(g.Wait())
}

func loadChecks[T any]() ([]*T, error) {
bin, err := os.ReadFile("./data/check/check.json")
if err != nil {
return nil, err
}

var checks []*dsr3.CheckRequest
var checks []*T
if err := json.Unmarshal(bin, &checks); err != nil {
return nil, err

Expand Down
68 changes: 68 additions & 0 deletions tests/search_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package tests_test

import (
"context"
"runtime"
"testing"

"github.com/stretchr/testify/require"

dsr3 "github.com/aserto-dev/go-directory/aserto/directory/reader/v3"
)

func BenchmarkSearchSerial(b *testing.B) {
assert := require.New(b)

checks, err := loadChecks[dsr3.GetGraphRequest]()
assert.NoError(err)
assert.NotEmpty(checks)

client, cleanup := testInit()
b.Cleanup(cleanup)

setupBenchmark(b, client)
ctx := context.Background()

b.ResetTimer()

for _, check := range checks {
_, err := client.V3.Reader.GetGraph(ctx, check)
assert.NoError(err)
}
}

func BenchmarkSearchParallelChunks(b *testing.B) {
assert := require.New(b)

checks, err := loadChecks[dsr3.GetGraphRequest]()
assert.NoError(err)
assert.NotEmpty(checks)

client, cleanup := testInit()
b.Cleanup(cleanup)

setupBenchmark(b, client)
ctx := context.Background()

var chunks [][]*dsr3.GetGraphRequest
numChunks := runtime.NumCPU()
chunkSize := (len(checks) + numChunks - 1) / numChunks

for i := 0; i < len(checks); i += chunkSize {
end := min(i+chunkSize, len(checks))
chunks = append(chunks, checks[i:end])
}

b.ResetTimer()

for _, chunk := range chunks {
b.RunParallel(func(pb *testing.PB) {
for _, check := range chunk {
for pb.Next() {
_, err := client.V3.Reader.GetGraph(ctx, check)
assert.NoError(err)
}
}
})
}
}

0 comments on commit 107dd7f

Please sign in to comment.