diff --git a/go.mod b/go.mod index 9df2f80..5562592 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/aserto-dev/go-edge-ds -go 1.22.10 - -toolchain go1.23.4 +go 1.23.4 // replace github.com/aserto-dev/azm => ../azm @@ -11,7 +9,7 @@ toolchain go1.23.4 require ( github.com/Masterminds/semver/v3 v3.3.1 github.com/aserto-dev/aserto-grpc v0.2.6 - github.com/aserto-dev/azm v0.2.2 + github.com/aserto-dev/azm v0.2.3-0.20241220173410-22058cc2818a github.com/aserto-dev/errors v0.0.11 github.com/aserto-dev/go-directory v0.33.2 github.com/bufbuild/protovalidate-go v0.7.3 diff --git a/go.sum b/go.sum index 668ae05..30801d7 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYW github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= github.com/aserto-dev/aserto-grpc v0.2.6 h1:h64MYALF5zLm2sSKcLEtyXyrJvZSxfqTOmQ1j/J44kI= github.com/aserto-dev/aserto-grpc v0.2.6/go.mod h1:Vki74KINVfnwtJ8QGzRm+xHNjsJ2KUWFtXhezJK9DEg= -github.com/aserto-dev/azm v0.2.2 h1:JsxPth3m9J655vMlBzPwbvsd5siBxWziXKG77aq2KIs= -github.com/aserto-dev/azm v0.2.2/go.mod h1:VaGqraXye8HqTgU6CUpwu0HjJb1u02Di1da83yKYnaY= +github.com/aserto-dev/azm v0.2.3-0.20241220173410-22058cc2818a h1:gFmxPv2IOYDLz/XcrQ5Z0577zB4KMqZKLIzU1PLNlwg= +github.com/aserto-dev/azm v0.2.3-0.20241220173410-22058cc2818a/go.mod h1:MkeGlkGFmK8US3s9V2x2pM7YLFF9ZbsQtx9EgfsIBVc= github.com/aserto-dev/errors v0.0.11 h1:CXo+Uwmh09doG2HvL1SC8Fnne8f9VPrGyEQPtogAfyY= github.com/aserto-dev/errors v0.0.11/go.mod h1:T1YQOtcxpgBriPTn5HXJkD/QukYz5YojYOIzGMo0ybM= github.com/aserto-dev/go-directory v0.33.2 h1:QJwzSmfxJ7EG0RzWsgu7In5cAeGtZURZklSsHhMOFh8= diff --git a/pkg/ds/checks.go b/pkg/ds/checks.go index a59f6fc..e635d30 100644 --- a/pkg/ds/checks.go +++ b/pkg/ds/checks.go @@ -3,21 +3,24 @@ package ds import ( "context" "runtime" - "sync" "github.com/aserto-dev/azm/cache" + "github.com/aserto-dev/azm/jobpool" + "github.com/aserto-dev/azm/safe" + "google.golang.org/protobuf/types/known/structpb" dsr3 "github.com/aserto-dev/go-directory/aserto/directory/reader/v3" + "github.com/aserto-dev/go-directory/pkg/prop" bolt "go.etcd.io/bbolt" ) type checks struct { - *dsr3.ChecksRequest + *safe.SafeChecks } func Checks(i *dsr3.ChecksRequest) *checks { - return &checks{i} + return &checks{safe.Checks(i)} } func (i *checks) Validate(mc *cache.Cache) error { @@ -25,92 +28,40 @@ func (i *checks) Validate(mc *cache.Cache) error { } func (i *checks) Exec(ctx context.Context, tx *bolt.Tx, mc *cache.Cache) (*dsr3.ChecksResponse, error) { - inbox := make(chan *checksReq, len(i.Checks)) - outbox := make(chan *checksResp, len(i.Checks)) - - // setup Check workers. - var wg sync.WaitGroup - dop := min(runtime.GOMAXPROCS(0), len(i.Checks)) - for wc := 0; wc < dop; wc++ { - wg.Add(1) - go func() { - defer wg.Done() - - // handle Check request. - for req := range inbox { - - check := Check(req.CheckRequest) - - // validate check request, on error, deny access, set context and return early. - if err := check.Validate(mc); err != nil { - outbox <- &checksResp{ - Index: req.Index, - CheckResponse: &dsr3.CheckResponse{ - Check: false, - Context: SetContextWithReason(err), - }, - } - return - } - - resp, err := check.Exec(ctx, tx, mc) - if err != nil { - // TODO log err - _ = err - } - - outbox <- &checksResp{Index: req.Index, CheckResponse: resp} - } - }() - } - - // substitute defaults. - for index := 0; index < len(i.Checks); index++ { - if i.Checks[index].GetObjectType() == "" { - i.Checks[index].ObjectType = i.Default.GetObjectType() - } - if i.Checks[index].GetObjectId() == "" { - i.Checks[index].ObjectId = i.Default.GetObjectId() - } - if i.Checks[index].GetRelation() == "" { - i.Checks[index].Relation = i.Default.GetRelation() + consumer := func(in *dsr3.CheckRequest) *dsr3.CheckResponse { + check := Check(in) + if err := check.Validate(mc); err != nil { + return checkError(err) } - if i.Checks[index].GetSubjectType() == "" { - i.Checks[index].SubjectType = i.Default.GetSubjectType() - } - if i.Checks[index].GetSubjectId() == "" { - i.Checks[index].SubjectId = i.Default.GetSubjectId() - } - if i.Default.GetTrace() { - i.Checks[index].Trace = true + + resp, err := check.Exec(ctx, tx, mc) + if err != nil { + return checkError(err) } - // send request to inbox - inbox <- &checksReq{Index: index, CheckRequest: i.Checks[index]} + return resp } - close(inbox) - - go func() { - wg.Wait() - close(outbox) - }() - - resp := &dsr3.ChecksResponse{Checks: make([]*dsr3.CheckResponse, len(i.Checks))} + pool := jobpool.NewJobPool(len(i.Checks), runtime.GOMAXPROCS(0), consumer) + pool.Start() - for result := range outbox { - resp.Checks[result.Index] = result.CheckResponse + resp := &dsr3.ChecksResponse{} + for check := range i.CheckRequests() { + if err := pool.Produce(check.CheckRequest); err != nil { + return resp, err + } } return resp, nil } -type checksReq struct { - Index int - *dsr3.CheckRequest -} - -type checksResp struct { - Index int - *dsr3.CheckResponse +func checkError(err error) *dsr3.CheckResponse { + return &dsr3.CheckResponse{ + Check: false, + Context: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + prop.Reason: structpb.NewStringValue(err.Error()), + }, + }, + } }