Skip to content

Commit

Permalink
fix init and close of logs stream
Browse files Browse the repository at this point in the history
Signed-off-by: MUzairS15 <[email protected]>
  • Loading branch information
MUzairS15 committed Feb 28, 2024
1 parent a8e2fb7 commit 8394632
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 12 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/layer5io/meshery-operator v0.7.0
github.com/layer5io/meshkit v0.7.9
github.com/myntra/pipeline v0.0.0-20180618182531-2babf4864ce8
github.com/sirupsen/logrus v1.9.3
github.com/spf13/viper v1.18.2
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/net v0.19.0
Expand Down Expand Up @@ -134,7 +135,6 @@ require (
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
github.com/shopspring/decimal v1.3.1 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
Expand Down
7 changes: 4 additions & 3 deletions internal/config/crd_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"context"
"errors"
"fmt"

"github.com/layer5io/meshery-operator/pkg/client"
"github.com/layer5io/meshkit/utils"
Expand Down Expand Up @@ -168,7 +169,7 @@ func PopulateConfigs(configMap corev1.ConfigMap) (*MeshsyncConfig, error) {
func PatchCRVersion(config *rest.Config) error {
meshsyncClient, err := client.New(config)
if err != nil {
return err
return ErrInitConfig(fmt.Errorf("unable to update MeshSync configuration"))

Check warning on line 172 in internal/config/crd_config.go

View check run for this annotation

Codecov / codecov/patch

internal/config/crd_config.go#L172

Added line #L172 was not covered by tests
}

patchedResource := map[string]interface{}{
Expand All @@ -178,11 +179,11 @@ func PatchCRVersion(config *rest.Config) error {
}
byt, err := utils.Marshal(patchedResource)
if err != nil {
return err
return ErrInitConfig(fmt.Errorf("unable to update MeshSync configuration"))

Check warning on line 182 in internal/config/crd_config.go

View check run for this annotation

Codecov / codecov/patch

internal/config/crd_config.go#L182

Added line #L182 was not covered by tests
}
_, err = meshsyncClient.CoreV1Alpha1().MeshSyncs("meshery").Patch(context.TODO(), crName, types.MergePatchType, []byte(byt), metav1.PatchOptions{})
if err != nil {
return err
return ErrInitConfig(fmt.Errorf("unable to update MeshSync configuration"))

Check warning on line 186 in internal/config/crd_config.go

View check run for this annotation

Codecov / codecov/patch

internal/config/crd_config.go#L186

Added line #L186 was not covered by tests
}
return nil
}
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/layer5io/meshsync/internal/channels"
"github.com/layer5io/meshsync/internal/config"
"github.com/layer5io/meshsync/meshsync"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)

Expand All @@ -33,7 +34,8 @@ func main() {

// Initialize Logger instance
log, err := logger.New(serviceName, logger.Options{
Format: logger.SyslogLogFormat,
Format: logger.SyslogLogFormat,
LogLevel: int(logrus.InfoLevel),
})
if err != nil {
fmt.Println(err)
Expand Down
16 changes: 9 additions & 7 deletions meshsync/logstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,22 @@ func (h *Handler) streamLogs(id string, req model.LogRequest, cfg config.Listene
return
}

defer resp.Close()
go func() {
<-h.channelPool[id].(channels.StructChannel)
h.Log.Info("Closing", id)
delete(h.channelPool, id)
resp.Close()
}()

Check warning on line 70 in meshsync/logstream.go

View check run for this annotation

Codecov / codecov/patch

meshsync/logstream.go#L65-L70

Added lines #L65 - L70 were not covered by tests

for {
buf := make([]byte, 2000)
numBytes, err := resp.Read(buf)
if numBytes == 0 {
continue
}
if err == io.EOF {
break
}
if numBytes == 0 {
continue

Check warning on line 79 in meshsync/logstream.go

View check run for this annotation

Codecov / codecov/patch

meshsync/logstream.go#L78-L79

Added lines #L78 - L79 were not covered by tests
}
if err != nil {
h.Log.Error(ErrCopyBuffer(err))
delete(h.channelPool, id)
Expand All @@ -94,7 +99,4 @@ func (h *Handler) streamLogs(id string, req model.LogRequest, cfg config.Listene
}
}

<-h.channelPool[id].(channels.StructChannel)
h.Log.Info("Closing", id)
delete(h.channelPool, id)
}

0 comments on commit 8394632

Please sign in to comment.