Skip to content

Commit

Permalink
feat: add adaper events consumer and telemtry aggregator
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gateru <[email protected]>
  • Loading branch information
felixgateru committed Jan 23, 2025
1 parent 3a7fc4d commit 2665f12
Show file tree
Hide file tree
Showing 9 changed files with 428 additions and 5 deletions.
13 changes: 13 additions & 0 deletions journal/api/responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
)

var _ supermq.Response = (*pageRes)(nil)
var _ supermq.Response = (*clientTelemetryRes)(nil)

type pageRes struct {
journal.JournalsPage `json:",inline"`
Expand All @@ -31,3 +32,15 @@ func (res pageRes) Empty() bool {
type clientTelemetryRes struct {
journal.ClientTelemetry `json:",inline"`
}

func (res clientTelemetryRes) Headers() map[string]string {
return map[string]string{}
}

func (res clientTelemetryRes) Code() int {
return http.StatusOK
}

func (res clientTelemetryRes) Empty() bool {
return false
}
10 changes: 10 additions & 0 deletions journal/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,14 @@ type Repository interface {

// DeleteClientTelemetry removes telemetry data for a client from the database.
DeleteClientTelemetry(ctx context.Context, clientID, domainID string) error

AddSubscription(ctx context.Context, clientID, sub string) error

RemoveSubscription(ctx context.Context, clientID, sub string) error

RemoveSubscriptionWithConnID(ctx context.Context, connID, clientID string) error

IncrementInboundMessages(ctx context.Context, clientID string) error

IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error
}
2 changes: 1 addition & 1 deletion journal/middleware/authorization.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (am *authorizationMiddleware) RetrieveClientTelemetry(ctx context.Context,
Domain: session.DomainID,
SubjectType: policies.UserType,
SubjectKind: policies.UsersKind,
Subject: session.UserID,
Subject: session.DomainUserID,
Permission: readPermission,
ObjectType: policies.ClientType,
Object: clientID,
Expand Down
90 changes: 90 additions & 0 deletions journal/mocks/repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion journal/postgres/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ func Migration() *migrate.MemoryMigrationSource {
`CREATE TABLE IF NOT EXISTS clients_telemetry (
client_id VARCHAR(36) NOT NULL,
domain_id VARCHAR(36) NOT NULL,
subscriptions TEXT[],
subscriptions TEXT[] DEFAULT '{}',
inbound_messages BIGINT DEFAULT 0,
outbound_messages BIGINT DEFAULT 0,
first_seen TIMESTAMP,
last_seen TIMESTAMP,
PRIMARY KEY (client_id, domain_id)
)`,
`CREATE INDEX idx_subscriptions_gin ON clients_telemetry USING GIN (subscriptions);`,
},
Down: []string{
`DROP TABLE IF EXISTS clients_telemetry`,
Expand Down
138 changes: 136 additions & 2 deletions journal/postgres/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
)

func (repo *repository) SaveClientTelemetry(ctx context.Context, ct journal.ClientTelemetry) error {
q := `INSERT INTO clients_telemetry (client_id, domain_id, messages, subscriptions, first_seen, last_seen)
VALUES (:client_id, :domain_id, :messages, :subscriptions, :first_seen, :last_seen);`
q := `INSERT INTO clients_telemetry (client_id, domain_id, inbound_messages, outbound_messages, subscriptions, first_seen, last_seen)
VALUES (:client_id, :domain_id, :inbound_messages, :outbound_messages, :subscriptions, :first_seen, :last_seen);`

dbct, err := toDBClientsTelemetry(ct)
if err != nil {
Expand Down Expand Up @@ -80,6 +80,140 @@ func (repo *repository) RetrieveClientTelemetry(ctx context.Context, clientID, d
return journal.ClientTelemetry{}, repoerr.ErrNotFound
}

func (repo *repository) AddSubscription(ctx context.Context, clientID, sub string) error {
q := `
UPDATE clients_telemetry
SET subscriptions = ARRAY_APPEND(subscriptions, :subscriptions),
last_seen = :last_seen
WHERE client_id = :client_id;
`
ct := journal.ClientTelemetry{
ClientID: clientID,
Subscriptions: []string{sub},
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}

result, err := repo.db.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}

return nil
}

func (repo *repository) RemoveSubscription(ctx context.Context, clientID, sub string) error {
q := `
UPDATE clients_telemetry
SET subscriptions = ARRAY_REMOVE(subscriptions, :subscriptions)
WHERE client_id = :client_id
AND (:subscriptions = ANY(subscriptions))
`
ct := journal.ClientTelemetry{
ClientID: clientID,
Subscriptions: []string{sub},
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}

result, err := repo.db.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}

return nil
}

func (repo *repository) RemoveSubscriptionWithConnID(ctx context.Context, connID, clientID string) error {
q := `
UPDATE clients_telemetry
SET subscriptions = ARRAY(
SELECT sub
FROM unnest(subscriptions) AS sub
WHERE sub NOT LIKE '%' || $1 || '%'
)
WHERE client_id = $2;
`
_, err := repo.db.ExecContext(ctx, q, connID, clientID)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

return nil
}

func (repo *repository) IncrementInboundMessages(ctx context.Context, clientID string) error {
q := `
UPDATE clients_telemetry
SET inbound_messages = inbound_messages + 1,
last_seen = :last_seen
WHERE client_id = :client_id;
`

ct := journal.ClientTelemetry{
ClientID: clientID,
LastSeen: time.Now(),
}
dbct, err := toDBClientsTelemetry(ct)
if err != nil {
return errors.Wrap(repoerr.ErrUpdateEntity, err)
}

result, err := repo.db.NamedExecContext(ctx, q, dbct)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

if rows, _ := result.RowsAffected(); rows == 0 {
return repoerr.ErrNotFound
}

return nil
}

func (repo *repository) IncrementOutboundMessages(ctx context.Context, channelID, subtopic string) error {
q := `
WITH matched_clients AS (
SELECT
client_id,
domain_id,
COUNT(*) AS match_count
FROM
clients_telemetry,
unnest(subscriptions) AS sub
WHERE
sub LIKE '%' || $1 || ':' || $2 || '%'
GROUP BY
client_id, domain_id
)
UPDATE clients_telemetry
SET outbound_messages = outbound_messages + matched_clients.match_count
FROM matched_clients
WHERE clients_telemetry.client_id = matched_clients.client_id
AND clients_telemetry.domain_id = matched_clients.domain_id;
`

_, err := repo.db.ExecContext(ctx, q, channelID, subtopic)
if err != nil {
return postgres.HandleError(repoerr.ErrUpdateEntity, err)
}

return nil
}

type dbClientTelemetry struct {
ClientID string `db:"client_id"`
DomainID string `db:"domain_id"`
Expand Down
9 changes: 8 additions & 1 deletion journal/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ func (svc *service) Save(ctx context.Context, journal Journal) error {
}
journal.ID = id

return svc.repository.Save(ctx, journal)
if err := svc.repository.Save(ctx, journal); err != nil {
return err
}
if err := svc.handleTelemetry(ctx, journal); err != nil {
return nil
}

return nil
}

func (svc *service) RetrieveAll(ctx context.Context, session smqauthn.Session, page Page) (JournalsPage, error) {
Expand Down
Loading

0 comments on commit 2665f12

Please sign in to comment.