Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add changes to production #2732

Merged
merged 184 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
184 commits
Select commit Hold shift + click to select a range
35555e3
feat(maestro): WIP
Sep 4, 2023
fd216ae
feat(maestro): WIP
Sep 4, 2023
4c701bd
feat(maestro): WIP
Sep 11, 2023
e98da1a
feat(maestro): WIP
Sep 13, 2023
b9a932d
feat(maestro): WIP
Sep 14, 2023
c535232
feat(maestro): WIP
Sep 14, 2023
2006b67
feat(maestro): WIP
Sep 18, 2023
c7f920c
feat(maestro): WIP
Sep 18, 2023
6759d35
feat(maestro): WIP, still missing monitor piece.
Sep 18, 2023
1e5ece6
feat(sinker): WIP.
Sep 19, 2023
320c14a
feat(sinker): WIP.
Sep 21, 2023
884710c
feat(sinker): adding test cases to sinker new flow.
Sep 21, 2023
312d599
feat(sinker): adding test cases to sinker new flow.
Sep 22, 2023
89d8d71
feat(sinker): adding test cases to sinker new flow.
Sep 22, 2023
03632b6
feat(sinker): wiring new services.
Sep 22, 2023
4bcadec
feat(sinker): clean up.
Sep 22, 2023
9527920
feat(sinker): fix and clean up.
Sep 22, 2023
6586e90
feat(sinker): fix and clean up.
Sep 22, 2023
d92ebb2
feat(sinker): fix and clean up.
Sep 22, 2023
50cd372
feat(sinks): sinks new flow.
Sep 22, 2023
5d5d435
feat(sinks): WIP
Sep 25, 2023
1be66c0
fix(orb-ui): V1 Policy View new layout (#2646)
joao-mendonca-encora Sep 25, 2023
2090e9d
feat(sinks): WIP
Sep 25, 2023
a81994b
feat(sinks): WIP
Sep 25, 2023
f604b9d
feat(sinker): removed sinker non-otel pieces.
Sep 25, 2023
db39167
feat(sinker): cleaning and fixes on sinker.
Sep 25, 2023
67f13c5
feat(sinker): cleaning and fixes on sinker.
Sep 25, 2023
98d328b
improvement(orb-ui): #1191 Delete Selected Modal (#2649)
joao-mendonca-encora Sep 25, 2023
af11874
feat(maestro): cleaning and fixes.
Sep 25, 2023
851388c
feat(maestro): cleaning and fixes.
Sep 25, 2023
13e20bd
feat(sinks): cleaning and fixes.
Sep 25, 2023
2390bf1
feat(sinker): cleaning and fixes.
Sep 25, 2023
9e67a63
feat(sinker): cleaning and fixes.
Sep 25, 2023
2d8d859
feat(maestro): cleaning and fixes.
Sep 26, 2023
86b8ffc
feat(maestro): cleaning and fixes.
Sep 26, 2023
089acb0
feat(maestro): cleaning and fixes.
Sep 26, 2023
417cd1d
feat(maestro): cleaning and fixes.
Sep 26, 2023
450fffb
fix(maestro): increase log tail on otel-collectors.
Sep 26, 2023
d6135bd
Merge pull request #2619 from lpegoraro/poc/messages
lpegoraro Sep 26, 2023
7fa7ad8
fix(orb-ui): Sink details partial edit (#2658)
joao-mendonca-encora Sep 26, 2023
1c0e960
feat(orb-ui): #26 Policy View Tabs (#2657)
joao-mendonca-encora Sep 26, 2023
ed27b4f
New develop version 0.28.0 (#2656)
etaques Sep 26, 2023
00ee6fe
feat(maestro): adding the subtle pg import. (#2659)
lpegoraro Sep 26, 2023
b8eca6a
fix migrate query
etaques Sep 26, 2023
e2d679d
adding uuid as ID to follow the standard
etaques Sep 27, 2023
fccef8a
Merge pull request #2660 from orb-community/etaques-patch-2
lpegoraro Sep 27, 2023
275d128
feat(maestro): add observability and logs to new flow on maestro.
Sep 27, 2023
4571cf6
feat(orb-ui): #1267 Agent View Tabs (#2661)
joao-mendonca-encora Sep 27, 2023
402d689
feat(maestro): adding more logs and removing return which could be om…
Sep 27, 2023
9470473
feat(maestro): adding more logs and removing return which could be om…
Sep 27, 2023
8181afa
feat(logs): fixing logs and changing size source in otlp receiver.
Sep 27, 2023
47eeaf7
Merge pull request #2662 from lpegoraro/eng-1118-maestro-deployment-c…
lpegoraro Sep 27, 2023
53a0fd3
feat(logs): add more logs to check for errors.
Sep 27, 2023
738ce30
Merge branch 'develop' into eng-1118-maestro-deployment-cache-postgresql
Sep 27, 2023
6908130
fix: (maestro) reference for redis streams
etaques Sep 28, 2023
fe5ddaf
Merge pull request #2664 from orb-community/etaques-patch-2
lpegoraro Sep 28, 2023
a5354fe
feat(maestro): enhanced readability of sinker activity listener code.
Sep 28, 2023
52fb7a5
fix(sinks-maestro): fix encode and decode message.
Sep 28, 2023
31fc06a
fix(maestro): fix config as byte for postgres support.
Sep 28, 2023
bd34206
fix(maestro): fix config as byte for postgres support.
Sep 28, 2023
9e4a7fc
Merge pull request #2663 from lpegoraro/eng-1118-maestro-deployment-c…
lpegoraro Sep 28, 2023
1380cb5
feat(logs): adding logs and unit tests.
Sep 28, 2023
0837afd
fix(orb-ui): Minor Adjustments on tabs components (#2665)
joao-mendonca-encora Sep 28, 2023
6261e45
feat(logs): adding logs and unit tests.
Sep 28, 2023
e491554
feat(maestro): fixing and adding new tests.
Sep 28, 2023
b14e3b2
Merge pull request #2666 from lpegoraro/fix-eng-1118-create-event
lpegoraro Sep 28, 2023
9bddaba
adding backend field on migrate db (#2668)
etaques Sep 28, 2023
f80c944
fix (maestro): ommiting id once that is default random uuid (#2669)
etaques Sep 28, 2023
418bb2b
fix (maestro): lastInsertId not necessary (#2670)
etaques Sep 29, 2023
6c6caeb
fix(maestro): adding test cases and fixes logic for handling events (…
lpegoraro Sep 29, 2023
b1ea558
fix (ci): required step to publish on stg
etaques Sep 29, 2023
5bbe66a
Merge pull request #2671 from orb-community/etaques-patch-2
lpegoraro Sep 29, 2023
00449af
fix(maestro): fixes sink idle event (#2672)
lpegoraro Sep 29, 2023
58ef83c
fix(orb-ui): fit height tabs components (#2674)
joao-mendonca-encora Sep 29, 2023
d5316d8
fix(maestro): fix unit tests and flows. (#2673)
lpegoraro Sep 29, 2023
10e26c8
fix(maestro): fix maestro
Oct 2, 2023
52fe4f2
fix(maestro): fix SQL.
Oct 2, 2023
8fc7127
fix(maestro): fix SQL.
Oct 2, 2023
a3ddf16
fix(maestro): fix SQL.
Oct 2, 2023
6ca8375
fix(maestro): fix sqls (#2675)
lpegoraro Oct 2, 2023
68d6221
fix(maestro): change how read sinker events.
Oct 2, 2023
dae1111
fix(orb-ui): User able to create and edit yaml policies using json … …
joao-mendonca-encora Oct 2, 2023
213310d
fix(maestro): fix event reading.
Oct 2, 2023
1f2a18b
Merge pull request #2677 from lpegoraro/fix-eng-1118-fix-sql
lpegoraro Oct 2, 2023
482831c
feat(maestro): fix reading event and add logs to errors. (#2678)
lpegoraro Oct 2, 2023
182e0c9
feat(maestro): fix args sent to find.
Oct 2, 2023
1d1ed00
Merge pull request #2679 from lpegoraro/fix-eng-1118-fix-select-query
lpegoraro Oct 2, 2023
9f92bb6
feat(maestro): add repository tests and fixed errors on repository.
Oct 2, 2023
d237900
feat(maestro): add repository tests and fixed errors on repository.
Oct 2, 2023
549fe83
Merge pull request #2688 from lpegoraro/fix-1118-add-repo
lpegoraro Oct 2, 2023
db2337c
fix (maestro): upgrade kubectl to k8s 1.27.4
etaques Oct 2, 2023
8f0445d
feat(maestro): fix get deployment and add logs to sink_activity. (#2690)
lpegoraro Oct 3, 2023
fcc0bf4
Merge pull request #2689 from orb-community/etaques-patch-2
lpegoraro Oct 3, 2023
7688703
feat(maestro): add debug logs to sinker listeners. (#2691)
lpegoraro Oct 3, 2023
f64f8e1
fix(maestro): fix decode event (#2692)
lpegoraro Oct 3, 2023
155fc2b
feat(maestro): add logs to understand better the fixes.
Oct 3, 2023
3bed42b
feat(maestro): add logs to understand better the fixes.
Oct 3, 2023
b49cf46
Merge pull request #2693 from lpegoraro/fix-eng-1118-fix-monitor
lpegoraro Oct 3, 2023
b3da1cc
feat(maestro): fix decrypt. (#2694)
lpegoraro Oct 3, 2023
ce5d7dd
fix(maestro): injection on monitor was missing (#2695)
lpegoraro Oct 3, 2023
d457682
feat(maestro): fix storing decoded auths. (#2696)
lpegoraro Oct 3, 2023
ff5ee15
fix (sinks): maestro status event source subscribe
Oct 3, 2023
ae17963
Merge pull request #2697 from orb-community/etaques-patch-10
lpegoraro Oct 3, 2023
5ef18cd
fix (maestro): event source topic name
Oct 3, 2023
6e8fd1e
Merge pull request #2698 from orb-community/etaques-patch-11
lpegoraro Oct 3, 2023
93fe5ea
set sink status when deployment status change
Oct 3, 2023
42977c2
feat(maestro): fix doubling update. (#2699)
lpegoraro Oct 3, 2023
6b71ea9
set sink status when deployment status change (#2700)
etaques Oct 3, 2023
03a1a3d
fix(orb-ui): #1281 Accordions and Tabs closing after refresh (#2701)
joao-mendonca-encora Oct 3, 2023
480cf76
Merge branch 'develop' of github.com:orb-community/orb into develop
Oct 3, 2023
7932734
fix(maestro): logs filtering to make it more clear the error (#2702)
lpegoraro Oct 3, 2023
587df2c
fix (maestro) collectorName should be deployment name (#2703)
etaques Oct 3, 2023
ea08366
feat(sinks): add log on receiveing message from maestro.
Oct 3, 2023
5057530
feat(sinks): add log on receiveing message from maestro. (#2704)
lpegoraro Oct 3, 2023
bad7dc2
Merge branch 'develop' of github.com:orb-community/orb into develop
Oct 3, 2023
f2436a0
feat(sinks): add log on receiveing message from maestro.
Oct 3, 2023
a47e062
fix: (maestro) monitor to kill orphaned otelcollectors
Oct 3, 2023
d9bc6a0
fix: (sinks) update state
Oct 3, 2023
f09969e
fix: (maestro) monitor to kill orphaned otelcollectors (#2705)
etaques Oct 3, 2023
bed581a
feat(sinker): add debounce of 1 min per sinker to not publish every m…
Oct 3, 2023
b767364
Merge pull request #2707 from lpegoraro/new_sink_logs
lpegoraro Oct 3, 2023
65de143
fix(orb-ui): improvements on view headers (#2706)
joao-mendonca-encora Oct 3, 2023
116d11b
fix: (sinks) update state
Oct 3, 2023
d0bdb73
add changes
Oct 3, 2023
3c0eba9
add changes
Oct 3, 2023
7df6d9e
add changes
Oct 3, 2023
9563d48
refactor: update readme and fix linting errors (#2708)
gpazuch Oct 3, 2023
0d90cb7
fix(maestro): fix re-uping errored container without update. (#2710)
lpegoraro Oct 3, 2023
f649d04
Merge branch 'develop' of github.com:orb-community/orb into etaques-p…
Oct 3, 2023
b160603
add changes
Oct 3, 2023
9eb3a56
add changes
Oct 3, 2023
b15daba
add changes
Oct 3, 2023
8ca75e5
add changes
Oct 4, 2023
6b05013
add changes
Oct 4, 2023
37fa6c1
add changes
Oct 4, 2023
eb7058c
Merge pull request #2709 from orb-community/etaques-patch-12
lpegoraro Oct 4, 2023
b5fcc18
fix(sinks): add new changes to postgres. (#2711)
lpegoraro Oct 4, 2023
5bdcdb6
feat(maestro): update error message not showing and on update sink no…
Oct 4, 2023
b501449
Merge pull request #2712 from lpegoraro/fix-1118-fix-sinks-update
lpegoraro Oct 4, 2023
ae14511
feat(maestro): fix not deploying based on last status.
Oct 4, 2023
75eb241
feat(maestro): fix update deployment.
Oct 4, 2023
f51fa45
feat(maestro): fix update deployment.
Oct 4, 2023
aa673ac
Merge pull request #2715 from lpegoraro/fix_activity_skip_error
lpegoraro Oct 4, 2023
5ad547d
feat(maestro): fix update deployment.
Oct 4, 2023
5da4eb6
Merge pull request #2716 from lpegoraro/fix_update_error_msg
lpegoraro Oct 4, 2023
a1c8890
fix (maestro): reset error message when sink update (#2717)
etaques Oct 4, 2023
20fbad2
feat(maestro): add rules to activity, so sink does not switch from ac…
Oct 4, 2023
fa00cca
Merge branch 'develop' into fix_deploy_double
Oct 4, 2023
ad6d2c2
feat(maestro): change base status to unknown.
Oct 4, 2023
18bc376
feat(maestro): change base status to unknown.
Oct 4, 2023
f422be1
Merge pull request #2718 from lpegoraro/fix_deploy_double
lpegoraro Oct 4, 2023
1d67ca9
hotfix: update UI dockerfiles (#2713)
gpazuch Oct 4, 2023
3f320ed
fix (kind): update kind and kubectl to 1.27 (#2719)
etaques Oct 5, 2023
55fa185
fix (maestro): activity and error event synchronization (#2720)
etaques Oct 5, 2023
2e04506
fix: (sinker): idle state based on redis pubsub keystore events
Oct 5, 2023
a408cab
improvements(orb-ui): Dataset Windows (#2721)
joao-mendonca-encora Oct 5, 2023
78959bc
fix: (sinker): idle state based on redis pubsub keystore events
Oct 5, 2023
aa14e60
Update sink_key_expire.go
etaques Oct 5, 2023
20104cb
Update sinker_idle.go
etaques Oct 6, 2023
61099cb
Update sinker_idle.go
etaques Oct 6, 2023
05faf98
Update sinker_test.go
etaques Oct 6, 2023
45e1a75
Update sinker_test.go
etaques Oct 6, 2023
c5e125f
Merge pull request #2722 from orb-community/etaques-patch-17
lpegoraro Oct 6, 2023
d01f365
Update Chart.yaml
etaques Oct 6, 2023
4375190
Merge pull request #2723 from orb-community/etaques-patch-2
lpegoraro Oct 6, 2023
8e8a805
fix (kind): [ENG-1279] psp on kind (#2724)
etaques Oct 6, 2023
6a73979
fix (kind): chart version to 1.0.52 (#2725)
etaques Oct 6, 2023
5a5e8d0
fix(orb-ui) #1303 Code editors expose syntax errors (#2726)
joao-mendonca-encora Oct 6, 2023
2cdd42f
fix(orb-ui): Validate agent metadata before searching for agent versi…
joao-mendonca-encora Oct 6, 2023
c34d5b2
fix: (sinker) idle expire message (#2729)
etaques Oct 6, 2023
bcce593
fix (maestro): returning sink from idle to active (#2730)
etaques Oct 6, 2023
3b8d4ba
feat(kind): adding local development tools (#2731)
etaques Oct 9, 2023
c2c9266
fix(orb-ui): edit policy validate json config (#2728)
joao-mendonca-encora Oct 9, 2023
e475f4b
feat(ui): add color to new status (#2734)
joao-mendonca-encora Oct 9, 2023
86360e3
feat(orb-ui): Expose warning sink messages (#2735)
joao-mendonca-encora Oct 9, 2023
ba2fe17
fix(orb-ui): Sink state circle colors (#2736)
joao-mendonca-encora Oct 9, 2023
5d512a1
fix(orb-ui): Active policies formatting (#2737)
joao-mendonca-encora Oct 9, 2023
5599d7b
feat(maestro): add redundancy to create deployment when receives acti…
Oct 9, 2023
0aa6a1a
feat(maestro): fix unit test
Oct 9, 2023
b1c8f27
feat(maestro): fix unit test
Oct 9, 2023
9ae1056
feat(maestro): fix unit test
Oct 9, 2023
b448764
Merge pull request #2739 from lpegoraro/feat/add_redundancy_to_activity
lpegoraro Oct 10, 2023
6b55482
fix (maestro): create db, add unique constraint (#2742)
etaques Oct 10, 2023
5b071ec
fix (maestro) db and typo on activity variable (#2743)
etaques Oct 10, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/go-develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ jobs:
- package-policies
- package-sinker
- package-sinks
- package-maestro
- package-ui

runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ install-helm:

install-kubectl:
cd /tmp && \
curl -LO "https://dl.k8s.io/release/v1.22.1/bin/linux/amd64/kubectl" && \
curl -LO "https://dl.k8s.io/release/v1.27.4/bin/linux/amd64/kubectl" && \
chmod a+x ./kubectl && \
sudo mv ./kubectl /usr/local/bin/kubectl

Expand Down Expand Up @@ -182,7 +182,7 @@ kind-create-all: kind-create-cluster kind-install-orb
kind-upgrade-all: kind-load-images kind-upgrade-orb

kind-create-cluster:
kind create cluster --image kindest/node:v1.22.15 --config=./kind/config.yaml
kind create cluster --image kindest/node:v1.24.0 --config=./kind/config.yaml

kind-delete-cluster:
kind delete cluster
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.27.0
0.28.0
29 changes: 23 additions & 6 deletions cmd/maestro/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package main
import (
"context"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/orb-community/orb/maestro/postgres"
"io"
"os"
"os/signal"
Expand All @@ -36,9 +38,10 @@ import (
)

const (
svcName = "maestro"
envPrefix = "orb_maestro"
httpPort = "8500"
svcName = "maestro"
envPrefix = "orb_maestro"
sinkPrefix = "orb_sinks"
httpPort = "8500"
)

func main() {
Expand All @@ -48,6 +51,9 @@ func main() {
svcCfg := config.LoadBaseServiceConfig(envPrefix, httpPort)
jCfg := config.LoadJaegerConfig(envPrefix)
sinksGRPCCfg := config.LoadGRPCConfig("orb", "sinks")
dbCfg := config.LoadPostgresConfig(envPrefix, svcName)
encryptionKey := config.LoadEncryptionKey(sinkPrefix)
svcCfg.EncryptionKey = encryptionKey.Key

// logger
var logger *zap.Logger
Expand Down Expand Up @@ -110,8 +116,10 @@ func main() {
}
sinksGRPCClient := sinksgrpc.NewClient(tracer, sinksGRPCConn, sinksGRPCTimeout, logger)
otelCfg := config.LoadOtelConfig(envPrefix)
db := connectToDB(dbCfg, logger)
defer db.Close()

svc := maestro.NewMaestroService(logger, streamEsClient, sinkerEsClient, sinksGRPCClient, streamEsCfg, otelCfg)
svc := maestro.NewMaestroService(logger, streamEsClient, sinkerEsClient, sinksGRPCClient, otelCfg, db, svcCfg)
errs := make(chan error, 2)

mainContext, mainCancelFunction := context.WithCancel(context.Background())
Expand All @@ -132,6 +140,15 @@ func main() {
logger.Error(fmt.Sprintf("Maestro service terminated: %s", err))
}

func connectToDB(cfg config.PostgresConfig, logger *zap.Logger) *sqlx.DB {
db, err := postgres.Connect(cfg)
if err != nil {
logger.Error("Failed to connect to postgres", zap.Error(err))
os.Exit(1)
}
return db
}

func connectToGRPC(cfg config.GRPCConfig, logger *zap.Logger) *grpc.ClientConn {
var opts []grpc.DialOption
tls, err := strconv.ParseBool(cfg.ClientTLS)
Expand Down Expand Up @@ -211,7 +228,7 @@ func loadStreamEsConfig(prefix string) config.EsConfig {
cfg.AllowEmptyEnv(true)
cfg.AutomaticEnv()
var esC config.EsConfig
cfg.Unmarshal(&esC)
_ = cfg.Unmarshal(&esC)
return esC
}

Expand All @@ -226,6 +243,6 @@ func loadSinkerEsConfig(prefix string) config.EsConfig {
cfg.AllowEmptyEnv(true)
cfg.AutomaticEnv()
var esC config.EsConfig
cfg.Unmarshal(&esC)
_ = cfg.Unmarshal(&esC)
return esC
}
24 changes: 7 additions & 17 deletions cmd/sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package main

import (
"context"
"fmt"
kitprometheus "github.com/go-kit/kit/metrics/prometheus"
"github.com/go-redis/redis/v8"
Expand All @@ -20,10 +19,6 @@ import (
"github.com/orb-community/orb/pkg/config"
policiesgrpc "github.com/orb-community/orb/policies/api/grpc"
"github.com/orb-community/orb/sinker"
sinkconfig "github.com/orb-community/orb/sinker/config"
cacheconfig "github.com/orb-community/orb/sinker/redis"
"github.com/orb-community/orb/sinker/redis/consumer"
"github.com/orb-community/orb/sinker/redis/producer"
sinksgrpc "github.com/orb-community/orb/sinks/api/grpc"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -102,6 +97,12 @@ func main() {
}

cacheClient := connectToRedis(cacheCfg.URL, cacheCfg.Pass, cacheCfg.DB, logger)
defer func(client *redis.Client) {
err := client.Close()
if err != nil {
log.Fatalf(err.Error())
}
}(cacheClient)

esClient := connectToRedis(esCfg.URL, esCfg.Pass, esCfg.DB, logger)
defer func(esClient *redis.Client) {
Expand Down Expand Up @@ -168,8 +169,6 @@ func main() {
}
sinksGRPCClient := sinksgrpc.NewClient(tracer, sinksGRPCConn, sinksGRPCTimeout, logger)

configRepo := cacheconfig.NewSinkerCache(cacheClient, logger)
configRepo = producer.NewEventStoreMiddleware(configRepo, esClient, logger)
gauge := kitprometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: "sinker",
Subsystem: "sink",
Expand All @@ -192,7 +191,7 @@ func main() {
otelEnabled := otelCfg.Enable == "true"
otelKafkaUrl := otelCfg.KafkaUrl

svc := sinker.New(logger, pubSub, esClient, configRepo, policiesGRPCClient, fleetGRPCClient, sinksGRPCClient,
svc := sinker.New(logger, pubSub, esClient, cacheClient, policiesGRPCClient, fleetGRPCClient, sinksGRPCClient,
otelKafkaUrl, otelEnabled, gauge, counter, inputCounter, inMemoryCacheConfig.DefaultExpiration)
defer func(svc sinker.Service) {
err := svc.Stop()
Expand All @@ -204,7 +203,6 @@ func main() {
errs := make(chan error, 2)

go startHTTPServer(svcCfg, errs, logger)
go subscribeToSinksES(svc, configRepo, esClient, esCfg, logger)

err = svc.Start()
if err != nil {
Expand Down Expand Up @@ -307,11 +305,3 @@ func initJaeger(svcName, url string, logger *zap.Logger) (opentracing.Tracer, io

return tracer, closer
}

func subscribeToSinksES(svc sinker.Service, configRepo sinkconfig.ConfigRepo, client *redis.Client, cfg config.EsConfig, logger *zap.Logger) {
eventStore := consumer.NewEventStore(svc, configRepo, client, cfg.Consumer, logger)
logger.Info("Subscribed to Redis Event Store for sinks")
if err := eventStore.Subscribe(context.Background()); err != nil {
logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
}
}
36 changes: 23 additions & 13 deletions cmd/sinks/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@ package main
import (
"context"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

authapi "github.com/mainflux/mainflux/auth/api/grpc"
mfsdk "github.com/mainflux/mainflux/pkg/sdk/go"
opentracing "github.com/opentracing/opentracing-go"
Expand All @@ -27,17 +39,6 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/reflection"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"

kitprometheus "github.com/go-kit/kit/metrics/prometheus"
r "github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -126,6 +127,7 @@ func main() {
go startHTTPServer(tracer, svc, svcCfg, logger, errs)
go startGRPCServer(svc, tracer, sinksGRPCCfg, logger, errs)
go subscribeToSinkerES(svc, esClient, esCfg, logger)
go subscribeToMaestroStatusES(svc, esClient, esCfg, logger)

go func() {
c := make(chan os.Signal)
Expand Down Expand Up @@ -193,7 +195,7 @@ func newSinkService(auth mainflux.AuthServiceClient, logger *zap.Logger, esClien
mfsdk := mfsdk.NewSDK(config)

svc := sinks.NewSinkService(logger, auth, repoSink, mfsdk, passwordService)
svc = redisprod.NewEventStoreMiddleware(svc, esClient)
svc = redisprod.NewSinkStreamProducerMiddleware(svc, esClient)
svc = sinkshttp.NewLoggingMiddleware(svc, logger)
svc = sinkshttp.MetricsMiddleware(
auth,
Expand Down Expand Up @@ -286,6 +288,14 @@ func subscribeToSinkerES(svc sinks.SinkService, client *r.Client, cfg config.EsC
eventStore := rediscons.NewEventStore(svc, client, cfg.Consumer, logger)
logger.Info("Subscribed to Redis Event Store for sinker")
if err := eventStore.Subscribe(context.Background()); err != nil {
logger.Error("Bootstrap service failed to subscribe to event sourcing", zap.Error(err))
logger.Error("Bootstrap service failed to subscribe to sinker event sourcing", zap.Error(err))
}
}

func subscribeToMaestroStatusES(svc sinks.SinkService, client *r.Client, cfg config.EsConfig, logger *zap.Logger) {
eventStore := rediscons.NewSinkStatusListener(logger, client, svc)
logger.Info("Subscribed to Redis Event Store for maestro")
if err := eventStore.SubscribeToMaestroSinkStatus(context.Background()); err != nil {
logger.Error("Bootstrap service failed to subscribe to maestro event sourcing", zap.Error(err))
}
}
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN make $SVC \
FROM alpine:latest
ARG SVC

RUN if [[ "maestro" == "$SVC" ]]; then apk update && apk add --no-cache docker-cli bash curl && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.18.0/bin/linux/amd64/kubectl && chmod +x ./kubectl && mv ./kubectl /usr/local/bin/kubectl; fi
RUN if [[ "maestro" == "$SVC" ]]; then apk update && apk add --no-cache docker-cli bash curl && curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.27.4/bin/linux/amd64/kubectl && chmod +x ./kubectl && mv ./kubectl /usr/local/bin/kubectl; fi
# Certificates are needed so that mailing util can work.
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ca-certificates.crt
COPY --from=builder /exe /
Expand Down
Loading
Loading