diff --git a/cmd/sinks/main.go b/cmd/sinks/main.go index be4dffe70..b10e88f2a 100644 --- a/cmd/sinks/main.go +++ b/cmd/sinks/main.go @@ -193,7 +193,7 @@ func newSinkService(auth mainflux.AuthServiceClient, logger *zap.Logger, esClien mfsdk := mfsdk.NewSDK(config) svc := sinks.NewSinkService(logger, auth, repoSink, mfsdk, passwordService) - svc = redisprod.NewEventStoreMiddleware(svc, esClient) + svc = redisprod.NewSinkStreamProducerMiddleware(svc, esClient) svc = sinkshttp.NewLoggingMiddleware(svc, logger) svc = sinkshttp.MetricsMiddleware( auth, diff --git a/sinks/redis/producer/streams.go b/sinks/redis/producer/streams.go index 7086deb19..48b499320 100644 --- a/sinks/redis/producer/streams.go +++ b/sinks/redis/producer/streams.go @@ -23,28 +23,28 @@ const ( streamLen = 1000 ) -var _ sinks.SinkService = (*eventStore)(nil) +var _ sinks.SinkService = (*sinksStreamProducer)(nil) -type eventStore struct { +type sinksStreamProducer struct { svc sinks.SinkService client *redis.Client logger *zap.Logger } // ListSinksInternal will only call following service -func (es eventStore) ListSinksInternal(ctx context.Context, filter sinks.Filter) ([]sinks.Sink, error) { +func (es sinksStreamProducer) ListSinksInternal(ctx context.Context, filter sinks.Filter) ([]sinks.Sink, error) { return es.svc.ListSinksInternal(ctx, filter) } -func (es eventStore) ChangeSinkStateInternal(ctx context.Context, sinkID string, msg string, ownerID string, state sinks.State) error { +func (es sinksStreamProducer) ChangeSinkStateInternal(ctx context.Context, sinkID string, msg string, ownerID string, state sinks.State) error { return es.svc.ChangeSinkStateInternal(ctx, sinkID, msg, ownerID, state) } -func (es eventStore) ViewSinkInternal(ctx context.Context, ownerID string, key string) (sinks.Sink, error) { +func (es sinksStreamProducer) ViewSinkInternal(ctx context.Context, ownerID string, key string) (sinks.Sink, error) { return es.svc.ViewSinkInternal(ctx, ownerID, key) } -func (es eventStore) CreateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) { +func (es sinksStreamProducer) CreateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) { defer func() { event := createSinkEvent{ sinkID: sink.ID, @@ -74,7 +74,7 @@ func (es eventStore) CreateSink(ctx context.Context, token string, s sinks.Sink) return es.svc.CreateSink(ctx, token, s) } -func (es eventStore) UpdateSinkInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { +func (es sinksStreamProducer) UpdateSinkInternal(ctx context.Context, s sinks.Sink) (sink sinks.Sink, err error) { defer func() { event := updateSinkEvent{ sinkID: sink.ID, @@ -102,7 +102,7 @@ func (es eventStore) UpdateSinkInternal(ctx context.Context, s sinks.Sink) (sink return es.svc.UpdateSinkInternal(ctx, s) } -func (es eventStore) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) { +func (es sinksStreamProducer) UpdateSink(ctx context.Context, token string, s sinks.Sink) (sink sinks.Sink, err error) { defer func() { event := updateSinkEvent{ sinkID: sink.ID, @@ -130,35 +130,35 @@ func (es eventStore) UpdateSink(ctx context.Context, token string, s sinks.Sink) return es.svc.UpdateSink(ctx, token, s) } -func (es eventStore) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (sinks.Page, error) { +func (es sinksStreamProducer) ListSinks(ctx context.Context, token string, pm sinks.PageMetadata) (sinks.Page, error) { return es.svc.ListSinks(ctx, token, pm) } -func (es eventStore) ListAuthenticationTypes(ctx context.Context, token string) ([]authentication_type.AuthenticationTypeConfig, error) { +func (es sinksStreamProducer) ListAuthenticationTypes(ctx context.Context, token string) ([]authentication_type.AuthenticationTypeConfig, error) { return es.svc.ListAuthenticationTypes(ctx, token) } -func (es eventStore) ViewAuthenticationType(ctx context.Context, token string, key string) (authentication_type.AuthenticationTypeConfig, error) { +func (es sinksStreamProducer) ViewAuthenticationType(ctx context.Context, token string, key string) (authentication_type.AuthenticationTypeConfig, error) { return es.svc.ViewAuthenticationType(ctx, token, key) } -func (es eventStore) ListBackends(ctx context.Context, token string) (_ []string, err error) { +func (es sinksStreamProducer) ListBackends(ctx context.Context, token string) (_ []string, err error) { return es.svc.ListBackends(ctx, token) } -func (es eventStore) ViewBackend(ctx context.Context, token string, key string) (_ backend.Backend, err error) { +func (es sinksStreamProducer) ViewBackend(ctx context.Context, token string, key string) (_ backend.Backend, err error) { return es.svc.ViewBackend(ctx, token, key) } -func (es eventStore) ViewSink(ctx context.Context, token string, key string) (_ sinks.Sink, err error) { +func (es sinksStreamProducer) ViewSink(ctx context.Context, token string, key string) (_ sinks.Sink, err error) { return es.svc.ViewSink(ctx, token, key) } -func (es eventStore) GetLogger() *zap.Logger { +func (es sinksStreamProducer) GetLogger() *zap.Logger { return es.logger } -func (es eventStore) DeleteSink(ctx context.Context, token, id string) (err error) { +func (es sinksStreamProducer) DeleteSink(ctx context.Context, token, id string) (err error) { sink, err := es.svc.ViewSink(ctx, token, id) if err != nil { return err @@ -193,14 +193,14 @@ func (es eventStore) DeleteSink(ctx context.Context, token, id string) (err erro return nil } -func (es eventStore) ValidateSink(ctx context.Context, token string, sink sinks.Sink) (sinks.Sink, error) { +func (es sinksStreamProducer) ValidateSink(ctx context.Context, token string, sink sinks.Sink) (sinks.Sink, error) { return es.svc.ValidateSink(ctx, token, sink) } -// NewEventStoreMiddleware returns wrapper around sinks service that sends +// NewSinkStreamProducerMiddleware returns wrapper around sinks service that sends // events to event store. -func NewEventStoreMiddleware(svc sinks.SinkService, client *redis.Client) sinks.SinkService { - return eventStore{ +func NewSinkStreamProducerMiddleware(svc sinks.SinkService, client *redis.Client) sinks.SinkService { + return sinksStreamProducer{ svc: svc, client: client, }