From 61b456c2c2453bb54f2b81c0f6ed63921910d775 Mon Sep 17 00:00:00 2001 From: Bariq Date: Tue, 9 Nov 2021 22:06:27 +0800 Subject: [PATCH 1/3] add resolver for subscribing active exec sessions through nats Signed-off-by: Bariq --- meshsync/exec.go | 45 ++++++++++++++++++++++++++++++++++++++++++++ meshsync/handlers.go | 7 +++++++ 2 files changed, 52 insertions(+) diff --git a/meshsync/exec.go b/meshsync/exec.go index 14552b52..d0e6c157 100644 --- a/meshsync/exec.go +++ b/meshsync/exec.go @@ -39,18 +39,63 @@ func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig) if !bool(req.Stop) { h.channelPool[id] = channels.NewStructChannel() h.Log.Info("Starting session") + + err := h.Broker.Publish("active_sessions.exec", &broker.Message{ + ObjectType: broker.ActiveExecObject, + Object: h.getActiveChannels(), + }) + if err != nil { + h.Log.Error(ErrGetObject(err)) + } go h.streamSession(id, req, cfg) } } else { // Already running subscription if bool(req.Stop) { h.channelPool[id].(channels.StructChannel) <- struct{}{} + + err := h.Broker.Publish("active_sessions.exec", &broker.Message{ + ObjectType: broker.ActiveExecObject, + Object: h.getActiveChannels(), + }) + if err != nil { + h.Log.Error(ErrGetObject(err)) + } } } } return nil } +func (h *Handler) processActiveExecRequest() error { + go h.streamChannelPool() + + return nil +} +func (h *Handler) getActiveChannels() []*string { + activeChannels := make([]*string, 0, len(h.channelPool)) + for k := range h.channelPool { + activeChannels = append(activeChannels, &k) + } + + return activeChannels +} + +func (h *Handler) streamChannelPool() error { + go func() { + for { + err := h.Broker.Publish("active_sessions.exec", &broker.Message{ + ObjectType: broker.ActiveExecObject, + Object: h.getActiveChannels(), + }) + if err != nil { + h.Log.Error(ErrGetObject(err)) + } + } + }() + + return nil +} func (h *Handler) streamSession(id string, req model.ExecRequest, cfg config.ListenerConfig) { subCh := make(chan *broker.Message) diff --git a/meshsync/handlers.go b/meshsync/handlers.go index 488866fd..ff2b4f1f 100644 --- a/meshsync/handlers.go +++ b/meshsync/handlers.go @@ -65,6 +65,13 @@ func (h *Handler) ListenToRequests() { h.Log.Error(err) continue } + case broker.ActiveExecEntity: + h.Log.Info("Connecting to channel pool") + err := h.processActiveExecRequest() + if err != nil { + h.Log.Error(err) + continue + } } } } From 6a75f7f65bbb48d29cfe80af448712994065d115 Mon Sep 17 00:00:00 2001 From: Bariq Date: Tue, 9 Nov 2021 23:29:54 +0800 Subject: [PATCH 2/3] remove unnecessary lines and add comments Signed-off-by: Bariq --- meshsync/exec.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/meshsync/exec.go b/meshsync/exec.go index e8a5f283..5e4d7e20 100644 --- a/meshsync/exec.go +++ b/meshsync/exec.go @@ -56,8 +56,7 @@ func (h *Handler) processExecRequest(obj interface{}, cfg config.ListenerConfig) } else { // Already running subscription if bool(req.Stop) { - h.channelPool[id].(channels.StructChannel) <- struct{}{} - + // TODO: once we have a unsubscribe functionality, need to publish message to active sessions subject execCleanup(h, id) } } From ea857955c8d567f3b9805129c2e8743d2cf4660f Mon Sep 17 00:00:00 2001 From: Utkarsh Srivastava Date: Tue, 16 Nov 2021 19:37:27 +0530 Subject: [PATCH 3/3] add sleep Signed-off-by: Utkarsh Srivastava --- meshsync/exec.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/meshsync/exec.go b/meshsync/exec.go index 5e4d7e20..ee82a813 100644 --- a/meshsync/exec.go +++ b/meshsync/exec.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "os" "strings" + "time" "github.com/google/uuid" "github.com/layer5io/meshkit/broker" @@ -88,6 +89,8 @@ func (h *Handler) streamChannelPool() error { if err != nil { h.Log.Error(ErrGetObject(err)) } + + time.Sleep(10 * time.Second) } }()