Skip to content

Commit

Permalink
Use azm.jobpool in ds.Checks
Browse files Browse the repository at this point in the history
  • Loading branch information
ronenh committed Dec 20, 2024
1 parent 398da65 commit c729a45
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 86 deletions.
6 changes: 2 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
module github.com/aserto-dev/go-edge-ds

go 1.22.10

toolchain go1.23.4
go 1.23.4

// replace github.com/aserto-dev/azm => ../azm

Expand All @@ -11,7 +9,7 @@ toolchain go1.23.4
require (
github.com/Masterminds/semver/v3 v3.3.1
github.com/aserto-dev/aserto-grpc v0.2.6
github.com/aserto-dev/azm v0.2.2
github.com/aserto-dev/azm v0.2.3-0.20241220173410-22058cc2818a
github.com/aserto-dev/errors v0.0.11
github.com/aserto-dev/go-directory v0.33.2
github.com/bufbuild/protovalidate-go v0.7.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYW
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/aserto-dev/aserto-grpc v0.2.6 h1:h64MYALF5zLm2sSKcLEtyXyrJvZSxfqTOmQ1j/J44kI=
github.com/aserto-dev/aserto-grpc v0.2.6/go.mod h1:Vki74KINVfnwtJ8QGzRm+xHNjsJ2KUWFtXhezJK9DEg=
github.com/aserto-dev/azm v0.2.2 h1:JsxPth3m9J655vMlBzPwbvsd5siBxWziXKG77aq2KIs=
github.com/aserto-dev/azm v0.2.2/go.mod h1:VaGqraXye8HqTgU6CUpwu0HjJb1u02Di1da83yKYnaY=
github.com/aserto-dev/azm v0.2.3-0.20241220173410-22058cc2818a h1:gFmxPv2IOYDLz/XcrQ5Z0577zB4KMqZKLIzU1PLNlwg=
github.com/aserto-dev/azm v0.2.3-0.20241220173410-22058cc2818a/go.mod h1:MkeGlkGFmK8US3s9V2x2pM7YLFF9ZbsQtx9EgfsIBVc=
github.com/aserto-dev/errors v0.0.11 h1:CXo+Uwmh09doG2HvL1SC8Fnne8f9VPrGyEQPtogAfyY=
github.com/aserto-dev/errors v0.0.11/go.mod h1:T1YQOtcxpgBriPTn5HXJkD/QukYz5YojYOIzGMo0ybM=
github.com/aserto-dev/go-directory v0.33.2 h1:QJwzSmfxJ7EG0RzWsgu7In5cAeGtZURZklSsHhMOFh8=
Expand Down
111 changes: 31 additions & 80 deletions pkg/ds/checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,114 +3,65 @@ package ds
import (
"context"
"runtime"
"sync"

"github.com/aserto-dev/azm/cache"
"github.com/aserto-dev/azm/jobpool"
"github.com/aserto-dev/azm/safe"
"google.golang.org/protobuf/types/known/structpb"

dsr3 "github.com/aserto-dev/go-directory/aserto/directory/reader/v3"
"github.com/aserto-dev/go-directory/pkg/prop"

bolt "go.etcd.io/bbolt"
)

type checks struct {
*dsr3.ChecksRequest
*safe.SafeChecks
}

func Checks(i *dsr3.ChecksRequest) *checks {
return &checks{i}
return &checks{safe.Checks(i)}
}

func (i *checks) Validate(mc *cache.Cache) error {
return nil
}

func (i *checks) Exec(ctx context.Context, tx *bolt.Tx, mc *cache.Cache) (*dsr3.ChecksResponse, error) {
inbox := make(chan *checksReq, len(i.Checks))
outbox := make(chan *checksResp, len(i.Checks))

// setup Check workers.
var wg sync.WaitGroup
dop := min(runtime.GOMAXPROCS(0), len(i.Checks))
for wc := 0; wc < dop; wc++ {
wg.Add(1)
go func() {
defer wg.Done()

// handle Check request.
for req := range inbox {

check := Check(req.CheckRequest)

// validate check request, on error, deny access, set context and return early.
if err := check.Validate(mc); err != nil {
outbox <- &checksResp{
Index: req.Index,
CheckResponse: &dsr3.CheckResponse{
Check: false,
Context: SetContextWithReason(err),
},
}
return
}

resp, err := check.Exec(ctx, tx, mc)
if err != nil {
// TODO log err
_ = err
}

outbox <- &checksResp{Index: req.Index, CheckResponse: resp}
}
}()
}

// substitute defaults.
for index := 0; index < len(i.Checks); index++ {
if i.Checks[index].GetObjectType() == "" {
i.Checks[index].ObjectType = i.Default.GetObjectType()
}
if i.Checks[index].GetObjectId() == "" {
i.Checks[index].ObjectId = i.Default.GetObjectId()
}
if i.Checks[index].GetRelation() == "" {
i.Checks[index].Relation = i.Default.GetRelation()
consumer := func(in *dsr3.CheckRequest) *dsr3.CheckResponse {
check := Check(in)
if err := check.Validate(mc); err != nil {
return checkError(err)
}
if i.Checks[index].GetSubjectType() == "" {
i.Checks[index].SubjectType = i.Default.GetSubjectType()
}
if i.Checks[index].GetSubjectId() == "" {
i.Checks[index].SubjectId = i.Default.GetSubjectId()
}
if i.Default.GetTrace() {
i.Checks[index].Trace = true

resp, err := check.Exec(ctx, tx, mc)
if err != nil {
return checkError(err)
}

// send request to inbox
inbox <- &checksReq{Index: index, CheckRequest: i.Checks[index]}
return resp
}

close(inbox)

go func() {
wg.Wait()
close(outbox)
}()

resp := &dsr3.ChecksResponse{Checks: make([]*dsr3.CheckResponse, len(i.Checks))}
pool := jobpool.NewJobPool(len(i.Checks), runtime.GOMAXPROCS(0), consumer)
pool.Start()

for result := range outbox {
resp.Checks[result.Index] = result.CheckResponse
resp := &dsr3.ChecksResponse{}
for check := range i.CheckRequests() {
if err := pool.Produce(check.CheckRequest); err != nil {
return resp, err
}
}

return resp, nil
}

type checksReq struct {
Index int
*dsr3.CheckRequest
}

type checksResp struct {
Index int
*dsr3.CheckResponse
func checkError(err error) *dsr3.CheckResponse {
return &dsr3.CheckResponse{
Check: false,
Context: &structpb.Struct{
Fields: map[string]*structpb.Value{
prop.Reason: structpb.NewStringValue(err.Error()),
},
},
}
}

0 comments on commit c729a45

Please sign in to comment.