Skip to content

Commit

Permalink
add changes
Browse files Browse the repository at this point in the history
  • Loading branch information
etaques committed Oct 3, 2023
2 parents 116d11b + 65de143 commit d0bdb73
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 101 deletions.
33 changes: 21 additions & 12 deletions sinker/otel/bridgeservice/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,28 @@ func (bs *SinkerOtelBridgeService) IncrementMessageCounter(publisher, subtopic,

// NotifyActiveSink notify the sinker that a sink is active
func (bs *SinkerOtelBridgeService) NotifyActiveSink(ctx context.Context, mfOwnerId, sinkId, size string) error {
bs.logger.Debug("notifying active sink", zap.String("sink_id", sinkId), zap.String("owner_id", mfOwnerId),
zap.String("payload_size", size))
event := producer.SinkActivityEvent{
OwnerID: mfOwnerId,
SinkID: sinkId,
State: "active",
Size: size,
Timestamp: time.Now(),
}
err := bs.sinkerActivitySvc.PublishSinkActivity(ctx, event)
if err != nil {
bs.logger.Error("error publishing sink activity", zap.Error(err))
cacheKey := fmt.Sprintf("active_sink-%s-%s", mfOwnerId, sinkId)
_, found := bs.inMemoryCache.Get(cacheKey)
if !found {
bs.logger.Debug("notifying active sink", zap.String("sink_id", sinkId), zap.String("owner_id", mfOwnerId),
zap.String("payload_size", size))
event := producer.SinkActivityEvent{
OwnerID: mfOwnerId,
SinkID: sinkId,
State: "active",
Size: size,
Timestamp: time.Now(),
}
err := bs.sinkerActivitySvc.PublishSinkActivity(ctx, event)
if err != nil {
bs.logger.Error("error publishing sink activity", zap.Error(err))
}
bs.inMemoryCache.Set(cacheKey, true, cache.DefaultExpiration)
} else {
bs.logger.Debug("active sink already notified", zap.String("sink_id", sinkId), zap.String("owner_id", mfOwnerId),
zap.String("payload_size", size))
}

return nil
}

Expand Down
6 changes: 3 additions & 3 deletions sinker/otel/kafkafanoutexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (ke kafkaErrors) Error() string {
func (e *kafkaTracesProducer) tracesPusher(ctx context.Context, td ptrace.Traces) error {
sinkId := ctx.Value("sink_id").(string)
topic := e.topic + "-" + sinkId
e.logger.Info("Pushing traces to kafka topic = " + topic)
e.logger.Debug("Pushing traces to kafka topic = " + topic)
messages, err := e.marshaler.Marshal(td, topic)
if err != nil {
return consumererror.NewPermanent(err)
Expand Down Expand Up @@ -83,7 +83,7 @@ type kafkaMetricsProducer struct {
func (e *kafkaMetricsProducer) metricsDataPusher(ctx context.Context, md pmetric.Metrics) error {
sinkId := ctx.Value("sink_id").(string)
topic := e.topic + "-" + sinkId
e.logger.Info("Pushing metrics to kafka topic = " + topic)
e.logger.Debug("Pushing metrics to kafka topic = " + topic)
messages, err := e.marshaler.Marshal(md, topic)
if err != nil {
return consumererror.NewPermanent(err)
Expand Down Expand Up @@ -116,7 +116,7 @@ type kafkaLogsProducer struct {
func (e *kafkaLogsProducer) logsDataPusher(ctx context.Context, ld plog.Logs) error {
sinkId := ctx.Value("sink_id").(string)
topic := e.topic + "-" + sinkId
e.logger.Info("Pushing logs to kafka topic = " + topic)
e.logger.Debug("Pushing logs to kafka topic = " + topic)
messages, err := e.marshaler.Marshal(ld, topic)
if err != nil {
return consumererror.NewPermanent(err)
Expand Down
3 changes: 3 additions & 0 deletions sinks/redis/consumer/sink_status_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (s *sinkStatusListener) SubscribeToMaestroSinkStatus(ctx context.Context) e
Count: 1000,
}).Result()
if err != nil || len(streams) == 0 {
if err != nil {
rLogger.Error("failed to read group", zap.Error(err))
}
continue
}
for _, msg := range streams[0].Messages {
Expand Down
5 changes: 4 additions & 1 deletion sinks/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ package sinks
import (
"context"
"database/sql/driver"
"time"

"github.com/orb-community/orb/pkg/errors"
"github.com/orb-community/orb/pkg/types"
"github.com/orb-community/orb/sinks/authentication_type"
"github.com/orb-community/orb/sinks/authentication_type/basicauth"
"github.com/orb-community/orb/sinks/backend"
"go.uber.org/zap"
"time"
)

var (
Expand Down Expand Up @@ -158,6 +159,8 @@ type SinkService interface {
UpdateSink(ctx context.Context, token string, s Sink) (Sink, error)
// UpdateSinkInternal by id
UpdateSinkInternal(ctx context.Context, s Sink) (Sink, error)
// UpdateSinkStatusInternal by id
UpdateSinkStatusInternal(ctx context.Context, s Sink) (Sink, error)
// ListSinks retrieves data about sinks
ListSinks(ctx context.Context, token string, pm PageMetadata) (Page, error)
// ListSinksInternal retrieves data from sinks filtered by SinksFilter for Services like Maestro, to build DeploymentEntries
Expand Down
23 changes: 20 additions & 3 deletions ui/src/app/@theme/styles/_overrides.scss
Original file line number Diff line number Diff line change
Expand Up @@ -199,13 +199,13 @@
}

.orb-service- {
&new {
&new, &unknown {
color: #9b51e0;
}
&online, &healthy {
&online, &healthy, &active {
color: #6fcf97;
}
&stale {
&stale, &idle {
color: #f2994a;
}
&error, &failure {
Expand All @@ -215,6 +215,23 @@
color: #969fb9;
}
}
.orb-service-background- {
&new, &unknown {
background-color: #9b51e0;
}
&online, &healthy, &active {
background-color: #6fcf97;
}
&stale, &idle {
background-color: #f2994a;
}
&error, &failure {
background-color: #df316f;
}
&offline, &none {
background-color: #969fb9;
}
}
.required {
color: #df316f;
padding-left: 2px;
Expand Down
59 changes: 32 additions & 27 deletions ui/src/app/pages/fleet/agents/view/agent.view.component.html
Original file line number Diff line number Diff line change
Expand Up @@ -4,36 +4,41 @@
</xng-breadcrumb>
<div><h4>Agent View</h4></div>
</div>
<div class="right-content" style="margin-right: 15px;">
<ngx-poll-control></ngx-poll-control>
</div>
<div *ngIf="!isLoading" style="margin: 0 20px 0 10px;">
<button (click)="openDeleteModal()" class="card-button delete-button" data-orb-qa-id="button#reset" nbButton shape="round" style="color: #df316f !important;">
<nb-icon icon="trash-2-outline"></nb-icon>
Delete Agent
</button>
</div>
<div *ngIf="!isLoading" style="margin-bottom: 23px;">
<div style="float: right;">
<span class="orb-service-{{ agent?.state }} state">
<i aria-hidden="true" *ngIf="agent?.state !== agentStates.offline" class="fa fa-circle orb-service-{{ agent?.state }}"></i>
<i aria-hidden="true" *ngIf="agent?.state === agentStates.offline" class="fa offline-circle"></i>
{{ agent?.state | ngxCapitalize }}
</span>
<div class="right-content">
<ngx-poll-control style="margin-top: 3px;"></ngx-poll-control>
<div *ngIf="!isLoading">
<button
(click)="openDeleteModal()"
class="card-button delete-button"
nbButton
shape="round"
style="color: #df316f !important; margin: 0 20px;">
<nb-icon icon="trash-2-outline"></nb-icon>
Delete Agent
</button>
</div>
<div style="font-size: 14px; font-weight: 400; margin-top: 23px;">
<span *ngIf="agent?.state !== agentStates.new">
Last activity
<span *ngIf="isToday()">
today, at {{ agent?.ts_last_hb | date: 'HH:mm z' }}
<div *ngIf="!isLoading" class="state-div">
<div style="float: right;">
<span class="orb-service-{{ agent?.state }} state">
<i aria-hidden="true" *ngIf="agent?.state !== agentStates.offline" class="fa state-circle orb-service-background-{{ agent?.state }}"></i>
<i aria-hidden="true" *ngIf="agent?.state === agentStates.offline" class="fa offline-circle"></i>
{{ agent?.state | ngxCapitalize }}
</span>
<span *ngIf="!isToday()">
on {{ agent?.ts_last_hb | date: 'M/d/yy, HH:mm z' }}
</div>
<div class="date">
<span *ngIf="agent?.state !== agentStates.new">
Last activity
<span *ngIf="isToday()">
today, at {{ agent?.ts_last_hb | date: 'HH:mm z' }}
</span>
<span *ngIf="!isToday()">
on {{ agent?.ts_last_hb | date: 'M/d/yy, HH:mm z' }}
</span>
</span>
</span>
<span *ngIf="agent?.state === agentStates.new">
This Agent has been provisioned but never connected.
</span>
<span *ngIf="agent?.state === agentStates.new">
This Agent has been provisioned but never connected.
</span>
</div>
</div>
</div>
</header>
Expand Down
29 changes: 21 additions & 8 deletions ui/src/app/pages/fleet/agents/view/agent.view.component.scss
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,31 @@ nb-card {
color: #969fb9;
font-size: 14px;
}
.state {
font-size: 15px;
font-weight: 700;
}
.fa.fa-circle {
font-size: 11px;

.state-circle {
width: 9px;
height: 9px;
border-radius: 50%;
}
.offline-circle {
width: 10px;
height: 10px;
width: 9px;
height: 9px;
border: 2px solid #969fb9;
border-radius: 50%;
background-color: transparent;
}
.state {
font-size: 15px;
font-weight: 700;
font-family: 'Montserrat';
}
.state-div {
margin-bottom: 23px;
}
.date {
font-size: 14px;
font-weight: 400;
margin-top: 23px;
line-height: 1.25rem;
}

85 changes: 41 additions & 44 deletions ui/src/app/pages/sinks/view/sink.view.component.html
Original file line number Diff line number Diff line change
Expand Up @@ -12,53 +12,50 @@
<h4>{{ strings.sink.view.header }}</h4>
</div>
<div class="right-content">
<ngx-poll-control></ngx-poll-control>
</div>
<div *ngIf="!isLoading" style="margin: 0 20px;">
<button
(click)="openDeleteModal()"
class="card-button delete-button"
nbButton shape="round"
*ngIf="!isEditMode()"
style="color: #df316f !important; float: none !important;">
<nb-icon icon="trash-2-outline"></nb-icon>
Delete Sink
</button>
<button
(click)="save()"
[disabled]="!canSave() || isRequesting || !hasChanges()"
class="policy-save"
nbButton
*ngIf="isEditMode()"
shape="round"
>
<nb-icon icon="save-outline"></nb-icon>
Save
</button>
<button
(click)="discard()"
class="policy-discard"
nbButton
shape="round"
*ngIf="isEditMode()"
>
Discard
</button>
</div>
<div style="margin-bottom: 23px;" *ngIf="!isLoading && !isEditMode()">
<div style="float: right;">
<span class="orb-service-{{ sink?.state }} state" style="margin-left: 10px;">
<i aria-hidden="true" class="fa fa-circle orb-service-{{ sink?.state }}"></i>
{{ sink?.state | ngxCapitalize }}
</span>
<ngx-poll-control style="margin-top: 3px;"></ngx-poll-control>
<div *ngIf="!isLoading">
<button
(click)="openDeleteModal()"
class="card-button delete-button"
nbButton shape="round"
*ngIf="!isEditMode()"
style="color: #df316f !important; float: none !important; margin: 0 20px;">
<nb-icon icon="trash-2-outline"></nb-icon>
Delete Sink
</button>
<button
(click)="save()"
[disabled]="!canSave() || isRequesting || !hasChanges()"
class="policy-save"
nbButton
*ngIf="isEditMode()"
shape="round">
<nb-icon icon="save-outline"></nb-icon>
Save
</button>
<button
(click)="discard()"
class="policy-discard"
nbButton
shape="round"
*ngIf="isEditMode()">
Discard
</button>
</div>
<div style="font-size: 14px; font-weight: 400; margin-top: 23px;">
<span>
Created on {{ sink?.ts_created | date: 'M/d/yy, HH:mm z' }}
</span>
<div class="state-div" *ngIf="!isLoading && !isEditMode()">
<div style="float: right;">
<span class="orb-service-{{ sink?.state }} state" style="margin-left: 10px;">
<i aria-hidden="true" class="fa state-circle orb-service-background-{{ sink?.state }}"></i>
{{ sink?.state | ngxCapitalize }}
</span>
</div>
<div class="date">
<span>
Created on {{ sink?.ts_created | date: 'M/d/yy, HH:mm z' }}
</span>
</div>
</div>
</div>

</header>

<div *ngIf="isLoading">
Expand Down
18 changes: 15 additions & 3 deletions ui/src/app/pages/sinks/view/sink.view.component.scss
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,15 @@ h4 {
}
}
}
.fa.fa-circle {
font-size: 11px;
.state-circle {
width: 9px;
height: 9px;
border-radius: 50%;
}
.state {
font-size: 16px;
font-size: 15px;
font-weight: 700;
font-family: 'Montserrat';
}
.orb-service- {
&active {
Expand All @@ -122,4 +125,13 @@ h4 {
color: #969fb9;
font-size: 14px;
}
.state-div {
margin-bottom: 23px;
}
.date {
font-size: 14px;
font-weight: 400;
margin-top: 23px;
line-height: 1.25rem;
}

0 comments on commit d0bdb73

Please sign in to comment.