Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Mar 2, 2023
1 parent efec262 commit 2a4824a
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 6 deletions.
13 changes: 11 additions & 2 deletions pkg/icingadb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,9 @@ func (db *DB) BatchSizeByPlaceholders(n int) int {
// YieldAll executes the query with the supplied scope,
// scans each resulting row into an entity returned by the factory function,
// and streams them into a returned channel.
func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, scope interface{}) (<-chan contracts.Entity, <-chan error) {
func (db *DB) YieldAll(
ctx context.Context, factoryFunc contracts.EntityFactoryFunc, query string, namedQueryParams bool, scope ...interface{},
) (<-chan contracts.Entity, <-chan error) {
entities := make(chan contracts.Entity, 1)
g, ctx := errgroup.WithContext(ctx)

Expand All @@ -515,7 +517,14 @@ func (db *DB) YieldAll(ctx context.Context, factoryFunc contracts.EntityFactoryF
defer db.log(ctx, query, &counter).Stop()
defer close(entities)

rows, err := db.NamedQueryContext(ctx, query, scope)
var rows *sqlx.Rows
var err error
if namedQueryParams {
rows, err = db.NamedQueryContext(ctx, query, scope)
} else {
rows, err = db.QueryxContext(ctx, query, scope...)
}

if err != nil {
return internal.CantPerformQuery(err, query)
}
Expand Down
87 changes: 87 additions & 0 deletions pkg/icingadb/sla.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package icingadb

import (
"context"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/types"
"golang.org/x/sync/errgroup"
"time"
)

type SlaHistoryTrail struct {
Id types.Int `json:"id" db:"-"`
v1.EnvironmentMeta `json:",inline"`
HostId types.Binary `json:"host_id"`
ServiceId types.Binary `json:"service_id"`
EventType string `json:"event_type"`
EventTime types.UnixMilli `json:"event_time"`
}

// Fingerprint implements the contracts.Fingerprinter interface.
func (e SlaHistoryTrail) Fingerprint() contracts.Fingerprinter {
return e
}

// ID implements part of the contracts.IDer interface.
func (e SlaHistoryTrail) ID() contracts.ID {
return e.Id
}

// SetID implements part of the contracts.IDer interface.
func (e *SlaHistoryTrail) SetID(id contracts.ID) {
e.Id = id.(types.Int)
}

type SlaHostHistoryTrailColumns struct {
v1.EntityWithoutChecksum `json:",inline"`
v1.EnvironmentMeta `json:",inline"`
}

type SlaServiceHistoryTrailColumns struct {
v1.EntityWithoutChecksum `json:",inline"`
v1.EnvironmentMeta `json:",inline"`
HostId types.Binary `json:"host_id"`
}

func CheckableToSlaTrailEntities(ctx context.Context, g *errgroup.Group, checkables <-chan contracts.Entity, eventType string) <-chan contracts.Entity {
entities := make(chan contracts.Entity, 1)

g.Go(func() error {
defer close(entities)

for {
select {
case checkable, ok := <-checkables:
if !ok {
return nil
}

entity := &SlaHistoryTrail{
EventTime: types.UnixMilli(time.Now()),
EventType: eventType,
}

switch ptr := checkable.(type) {
case *v1.Host:
entity.HostId = ptr.Id
entity.EnvironmentId = ptr.EnvironmentId
case *v1.Service:
entity.HostId = ptr.HostId
entity.ServiceId = ptr.Id
entity.EnvironmentId = ptr.EnvironmentId
}

entities <- entity
case <-ctx.Done():
return ctx.Err()
}
}
})

return entities
}

var (
_ contracts.Entity = (*SlaHistoryTrail)(nil)
)
68 changes: 64 additions & 4 deletions pkg/icingadb/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"runtime"
"strings"
"time"
)

Expand Down Expand Up @@ -85,7 +86,8 @@ func (s Sync) Sync(ctx context.Context, subject *common.SyncSubject) error {

actual, dbErrs := s.db.YieldAll(
ctx, subject.FactoryForDelta(),
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()), e.Meta(),
s.db.BuildSelectStmt(NewScopedEntity(subject.Entity(), e.Meta()), subject.Entity().Fingerprint()),
true, e.Meta(),
)
// Let errors from DB cancel our group.
com.ErrgroupReceive(g, dbErrs)
Expand Down Expand Up @@ -128,9 +130,31 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
entities = delta.Create.Entities(ctx)
}

var slaTrailEntities chan contracts.Entity
onSuccessHandlers := []OnSuccess[contracts.Entity]{
OnSuccessIncrement[contracts.Entity](stat),
}

switch delta.Subject.Entity().(type) {
case *v1.Host, *v1.Service:
slaTrailEntities = make(chan contracts.Entity)
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[contracts.Entity](slaTrailEntities))
}

g.Go(func() error {
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
if slaTrailEntities != nil {
defer close(slaTrailEntities)
}

return s.db.CreateStreamed(ctx, entities, onSuccessHandlers...)
})

if slaTrailEntities != nil {
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
g.Go(func() error {
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, slaTrailEntities, "create"))
})
}
}

// Update
Expand Down Expand Up @@ -160,6 +184,40 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
// Delete
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
entity := delta.Subject.Entity()
switch entity.(type) {
case *v1.Host, *v1.Service:
g.Go(func() error {
s.logger.Infof("Inserting %d items of type %s sla history trails of type delete", len(delta.Delete), utils.Key(utils.Name(entity), ' '))

var entities <-chan contracts.Entity
var columns interface{}

if _, ok := entity.(*v1.Host); ok {
columns = &SlaHostHistoryTrailColumns{}
} else {
columns = &SlaServiceHistoryTrailColumns{}
}

query := s.db.BuildSelectStmt(entity, columns)
if len(delta.Delete) == 1 {
query += ` WHERE id = ?`
} else {
var placeholders []string
for i := 0; i < len(delta.Delete); i++ {
placeholders = append(placeholders, "?")
}

query += fmt.Sprintf(` WHERE id IN (%s)`, strings.Join(placeholders, `, `))
}
var err <-chan error
entities, err = s.db.YieldAll(ctx, delta.Subject.Factory(), query, false, delta.Delete.IDs()...)
com.ErrgroupReceive(g, err)

return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, entities, "delete"))
})
}

g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
})
Expand Down Expand Up @@ -187,7 +245,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {

actualCvs, errs := s.db.YieldAll(
ctx, cv.FactoryForDelta(),
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()), e.Meta(),
s.db.BuildSelectStmt(NewScopedEntity(cv.Entity(), e.Meta()), cv.Entity().Fingerprint()),
true, e.Meta(),
)
com.ErrgroupReceive(g, errs)

Expand All @@ -199,7 +258,8 @@ func (s Sync) SyncCustomvars(ctx context.Context) error {

actualFlatCvs, errs := s.db.YieldAll(
ctx, flatCv.FactoryForDelta(),
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()), e.Meta(),
s.db.BuildSelectStmt(NewScopedEntity(flatCv.Entity(), e.Meta()), flatCv.Entity().Fingerprint()),
true, e.Meta(),
)
com.ErrgroupReceive(g, errs)

Expand Down
7 changes: 7 additions & 0 deletions pkg/types/int.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"database/sql/driver"
"encoding"
"encoding/json"
"fmt"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/contracts"
"strconv"
)

Expand Down Expand Up @@ -58,11 +60,16 @@ func (i *Int) UnmarshalJSON(data []byte) error {
return nil
}

func (i Int) String() string {
return fmt.Sprintf("%d", i.Int64)
}

// Assert interface compliance.
var (
_ json.Marshaler = Int{}
_ json.Unmarshaler = (*Int)(nil)
_ encoding.TextUnmarshaler = (*Int)(nil)
_ driver.Valuer = Int{}
_ sql.Scanner = (*Int)(nil)
_ contracts.ID = (*Int)(nil)
)
12 changes: 12 additions & 0 deletions schema/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,18 @@ CREATE TABLE sla_history_downtime (
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

CREATE TABLE sla_history_trail (
id bigint NOT NULL AUTO_INCREMENT,
environment_id binary(20) NOT NULL COMMENT 'environment.id',
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
service_id binary(20) DEFAULT NULL COMMENT 'service.id (may reference already deleted services)',

event_type enum('delete', 'create') NOT NULL,
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred',

PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

CREATE TABLE icingadb_schema (
id int unsigned NOT NULL AUTO_INCREMENT,
version smallint unsigned NOT NULL,
Expand Down
18 changes: 18 additions & 0 deletions schema/pgsql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' );
CREATE TYPE acked AS ENUM ( 'n', 'y', 'sticky' );
CREATE TYPE state_type AS ENUM ( 'hard', 'soft' );
CREATE TYPE checkable_type AS ENUM ( 'host', 'service' );
CREATE TYPE sla_trail_event_type AS ENUM ( 'create', 'delete' );
CREATE TYPE comment_type AS ENUM ( 'comment', 'ack' );
CREATE TYPE notification_type AS ENUM ( 'downtime_start', 'downtime_end', 'downtime_removed', 'custom', 'acknowledgement', 'problem', 'recovery', 'flapping_start', 'flapping_end' );
CREATE TYPE history_type AS ENUM ( 'notification', 'state_change', 'downtime_start', 'downtime_end', 'comment_add', 'comment_remove', 'flapping_start', 'flapping_end', 'ack_set', 'ack_clear' );
Expand Down Expand Up @@ -2147,6 +2148,23 @@ COMMENT ON COLUMN sla_history_downtime.downtime_id IS 'downtime.id (may referenc
COMMENT ON COLUMN sla_history_downtime.downtime_start IS 'start time of the downtime';
COMMENT ON COLUMN sla_history_downtime.downtime_end IS 'end time of the downtime';

CREATE TABLE sla_history_trail (
id bigserial NOT NULL,
environment_id bytea20 NOT NULL,
host_id bytea20 NOT NULL,
service_id bytea20 DEFAULT NULL,

event_type sla_trail_event_type NOT NULL,
event_time biguint NOT NULL,

CONSTRAINT pk_sla_history_trail PRIMARY KEY (id)
);

COMMENT ON COLUMN sla_history_trail.environment_id IS 'environment.id';
COMMENT ON COLUMN sla_history_trail.host_id IS 'host.id (may reference already deleted hosts)';
COMMENT ON COLUMN sla_history_trail.service_id IS 'service.id (may reference already deleted services)';
COMMENT ON COLUMN sla_history_trail.event_time IS 'unix timestamp the event occurred';

CREATE SEQUENCE icingadb_schema_id_seq;

CREATE TABLE icingadb_schema (
Expand Down
18 changes: 18 additions & 0 deletions tests/object_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,24 @@ func TestObjectSync(t *testing.T) {
t.Skip()
})

t.Run("Sla History Trail", func(t *testing.T) {
t.Parallel()

assert.Eventuallyf(t, func() bool {
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NULL")
require.NoError(t, err, "querying hosts sla history trail should not fail")
return count == len(data.Hosts)
}, 20*time.Second, 200*time.Millisecond, "Newly created hosts should exists in database")

assert.Eventuallyf(t, func() bool {
var count int
err := db.Get(&count, "SELECT COUNT(*) FROM sla_history_trail WHERE service_id IS NOT NULL")
require.NoError(t, err, "querying services sla history trail should not fail")
return count == len(data.Services)
}, 20*time.Second, 200*time.Millisecond, "Newly created services should exists in database")
})

t.Run("RuntimeUpdates", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit 2a4824a

Please sign in to comment.