From 9463f7ddfa7dad3d9497e71c2e98ebdd4cec1e58 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Fri, 25 Oct 2024 21:02:19 +0800 Subject: [PATCH] fix: fix connection notify (#3335) Signed-off-by: Song Gao --- pkg/connection/conn.go | 27 ++++++++++----------------- pkg/connection/pool.go | 4 +--- 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/pkg/connection/conn.go b/pkg/connection/conn.go index 842f42ea47..7417a9aa81 100644 --- a/pkg/connection/conn.go +++ b/pkg/connection/conn.go @@ -15,13 +15,11 @@ package connection import ( - rawContext "context" "sync" "sync/atomic" "github.com/lf-edge/ekuiper/contract/v2/api" - "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/topo/context" "github.com/lf-edge/ekuiper/v2/pkg/modules" ) @@ -32,10 +30,8 @@ type ConnWrapper struct { conn modules.Connection err error l sync.RWMutex - // context for connection wait only - waitCtx rawContext.Context - // context for the lifecycle - stop rawContext.CancelFunc + readCh chan struct{} + detachCh chan struct{} } func (cw *ConnWrapper) setConn(conn modules.Connection, err error) { @@ -50,7 +46,8 @@ func (cw *ConnWrapper) Wait(connectorCtx api.StreamContext) (modules.Connection, select { case <-connectorCtx.Done(): connectorCtx.GetLogger().Infof("stop waiting connection") - case <-cw.waitCtx.Done(): + case <-cw.readCh: + case <-cw.detachCh: } cw.l.RLock() defer cw.l.RUnlock() @@ -63,20 +60,16 @@ func (cw *ConnWrapper) IsInitialized() bool { return cw.initialized } -func newConnWrapper(callerCtx api.StreamContext, meta *Meta) *ConnWrapper { - callerCtx.GetLogger().Infof("creating new connection wrapper") - wctx, onConnect := rawContext.WithCancel(rawContext.Background()) - contextLogger := conf.Log.WithField("conn", meta.ID) - connCtx, stop := context.WithValue(context.Background(), context.LoggerKey, contextLogger).WithCancel() +func newConnWrapper(ctx api.StreamContext, meta *Meta) *ConnWrapper { cw := &ConnWrapper{ - ID: meta.ID, - waitCtx: wctx, - stop: stop, + ID: meta.ID, + readCh: make(chan struct{}), + detachCh: make(chan struct{}), } go func() { - conn, err := createConnection(connCtx, meta) + conn, err := createConnection(ctx, meta) cw.setConn(conn, err) - onConnect() + close(cw.readCh) }() return cw } diff --git a/pkg/connection/pool.go b/pkg/connection/pool.go index 8aa7a99375..c1d4ccfa9d 100644 --- a/pkg/connection/pool.go +++ b/pkg/connection/pool.go @@ -321,9 +321,7 @@ func detachConnection(ctx api.StreamContext, conId string) error { globalConnectionManager.connectionPool[conId] = meta conf.Log.Infof("detachConnection remove conn:%v,ref:%v", conId, refId) if !meta.Named && meta.GetRefCount() == 0 { - if meta.cw.stop != nil { - meta.cw.stop() - } + close(meta.cw.detachCh) conn, err := meta.cw.Wait(ctx) if conn != nil && err == nil { conn.Close(ctx)