From 912a49a5bcd38cc26029c7c75772ea7df25b6aad Mon Sep 17 00:00:00 2001 From: aeneasr <3372410+aeneasr@users.noreply.github.com> Date: Tue, 17 Sep 2024 13:26:42 +0200 Subject: [PATCH] feat: emit events in identity persister --- identity/manager.go | 10 ------ .../sql/identity/persister_identity.go | 35 +++++++++++++++---- x/events/events.go | 11 ++++++ 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/identity/manager.go b/identity/manager.go index f41f0b1a33bb..f35fd1468710 100644 --- a/identity/manager.go +++ b/identity/manager.go @@ -10,10 +10,7 @@ import ( "slices" "sort" - "go.opentelemetry.io/otel/trace" - "github.com/ory/kratos/schema" - "github.com/ory/kratos/x/events" "github.com/ory/x/sqlcon" "github.com/ory/x/otelx" @@ -101,7 +98,6 @@ func (m *Manager) Create(ctx context.Context, i *Identity, opts ...ManagerOption return err } - trace.SpanFromContext(ctx).AddEvent(events.NewIdentityCreated(ctx, i.ID)) return nil } @@ -346,10 +342,6 @@ func (m *Manager) CreateIdentities(ctx context.Context, identities []*Identity, return err } - for _, i := range identities { - trace.SpanFromContext(ctx).AddEvent(events.NewIdentityCreated(ctx, i.ID)) - } - return nil } @@ -416,7 +408,6 @@ func (m *Manager) UpdateSchemaID(ctx context.Context, id uuid.UUID, schemaID str return err } - trace.SpanFromContext(ctx).AddEvent(events.NewIdentityUpdated(ctx, id)) return m.r.PrivilegedIdentityPool().UpdateIdentity(ctx, original) } @@ -477,7 +468,6 @@ func (m *Manager) UpdateTraits(ctx context.Context, id uuid.UUID, traits Traits, return err } - trace.SpanFromContext(ctx).AddEvent(events.NewIdentityUpdated(ctx, id)) return m.r.PrivilegedIdentityPool().UpdateIdentity(ctx, updated) } diff --git a/persistence/sql/identity/persister_identity.go b/persistence/sql/identity/persister_identity.go index 807d3d67779d..127167613b98 100644 --- a/persistence/sql/identity/persister_identity.go +++ b/persistence/sql/identity/persister_identity.go @@ -13,6 +13,8 @@ import ( "sync" "time" + "github.com/ory/kratos/x/events" + "github.com/ory/x/crdbx" "github.com/gobuffalo/pop/v6" @@ -538,7 +540,7 @@ func (p *IdentityPersister) CreateIdentity(ctx context.Context, ident *identity. func (p *IdentityPersister) CreateIdentities(ctx context.Context, identities ...*identity.Identity) (err error) { ctx, span := p.r.Tracer(ctx).Tracer().Start(ctx, "persistence.sql.CreateIdentities", trace.WithAttributes( - attribute.Int("num_identities", len(identities)), + attribute.Int("identities.count", len(identities)), attribute.Stringer("network.id", p.NetworkID(ctx)))) defer otelx.End(span, &err) @@ -568,7 +570,7 @@ func (p *IdentityPersister) CreateIdentities(ctx context.Context, identities ... } } - return p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error { + if err := p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error { conn := &batch.TracerConnection{ Tracer: p.r.Tracer(ctx), Connection: tx, @@ -590,7 +592,15 @@ func (p *IdentityPersister) CreateIdentities(ctx context.Context, identities ... return sqlcon.HandleError(err) } return nil - }) + }); err != nil { + return err + } + + for _, ident := range identities { + span.AddEvent(events.NewIdentityCreated(ctx, ident.ID)) + } + + return nil } func (p *IdentityPersister) HydrateIdentityAssociations(ctx context.Context, i *identity.Identity, expand identity.Expandables) (err error) { @@ -960,10 +970,15 @@ func (p *IdentityPersister) UpdateIdentityColumns(ctx context.Context, i *identi attribute.Stringer("network.id", p.NetworkID(ctx)))) defer otelx.End(span, &err) - return p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error { + if err := p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error { _, err := tx.Where("id = ? AND nid = ?", i.ID, p.NetworkID(ctx)).UpdateQuery(i, columns...) return sqlcon.HandleError(err) - }) + }); err != nil { + return err + } + + span.AddEvent(events.NewIdentityUpdated(ctx, i.ID)) + return nil } func (p *IdentityPersister) UpdateIdentity(ctx context.Context, i *identity.Identity) (err error) { @@ -978,7 +993,7 @@ func (p *IdentityPersister) UpdateIdentity(ctx context.Context, i *identity.Iden } i.NID = p.NetworkID(ctx) - return sqlcon.HandleError(p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error { + if err := sqlcon.HandleError(p.Transaction(ctx, func(ctx context.Context, tx *pop.Connection) error { // This returns "ErrNoRows" if the identity does not exist if err := update.Generic(WithTransaction(ctx, tx), tx, p.r.Tracer(ctx).Tracer(), i); err != nil { return err @@ -1003,7 +1018,12 @@ func (p *IdentityPersister) UpdateIdentity(ctx context.Context, i *identity.Iden } return sqlcon.HandleError(p.createIdentityCredentials(ctx, tx, i)) - })) + })); err != nil { + return err + } + + span.AddEvent(events.NewIdentityUpdated(ctx, i.ID)) + return nil } func (p *IdentityPersister) DeleteIdentity(ctx context.Context, id uuid.UUID) (err error) { @@ -1028,6 +1048,7 @@ func (p *IdentityPersister) DeleteIdentity(ctx context.Context, id uuid.UUID) (e if count == 0 { return errors.WithStack(sqlcon.ErrNoRows) } + span.AddEvent(events.NewIdentityDeleted(ctx, id)) return nil } diff --git a/x/events/events.go b/x/events/events.go index 13ec16955539..862a35a39e98 100644 --- a/x/events/events.go +++ b/x/events/events.go @@ -34,6 +34,7 @@ const ( VerificationSucceeded semconv.Event = "VerificationSucceeded" IdentityCreated semconv.Event = "IdentityCreated" IdentityUpdated semconv.Event = "IdentityUpdated" + IdentityDeleted semconv.Event = "IdentityDeleted" WebhookDelivered semconv.Event = "WebhookDelivered" WebhookSucceeded semconv.Event = "WebhookSucceeded" WebhookFailed semconv.Event = "WebhookFailed" @@ -262,6 +263,16 @@ func NewIdentityCreated(ctx context.Context, identityID uuid.UUID) (string, trac ) } +func NewIdentityDeleted(ctx context.Context, identityID uuid.UUID) (string, trace.EventOption) { + return IdentityDeleted.String(), + trace.WithAttributes( + append( + semconv.AttributesFromContext(ctx), + semconv.AttrIdentityID(identityID), + )..., + ) +} + func NewIdentityUpdated(ctx context.Context, identityID uuid.UUID) (string, trace.EventOption) { return IdentityUpdated.String(), trace.WithAttributes(