diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go
index f7786ac4f9..cd8f6bf2bd 100644
--- a/pkg/cache/v3/linear.go
+++ b/pkg/cache/v3/linear.go
@@ -27,7 +27,7 @@ import (
 	"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
 )
 
-type watches = map[chan Response]struct{}
+type watches = map[ResponseWatch]struct{}
 
 // LinearCache supports collections of opaque resources. This cache has a
 // single collection indexed by resource names and manages resource versions
@@ -106,6 +106,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
 		versionMap:    nil,
 		version:       0,
 		versionVector: make(map[string]uint64),
+		log:           log.NewDefaultLogger(),
 	}
 	for _, opt := range opts {
 		opt(out)
@@ -113,15 +114,28 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
 	return out
 }
 
-func (cache *LinearCache) respond(value chan Response, staleResources []string) {
+func (cache *LinearCache) respond(watch ResponseWatch, staleResources []string) {
 	var resources []types.ResourceWithTTL
 	// TODO: optimize the resources slice creations across different clients
 	if len(staleResources) == 0 {
+		// Wildcard case, we return all resources in the cache
 		resources = make([]types.ResourceWithTTL, 0, len(cache.resources))
 		for _, resource := range cache.resources {
 			resources = append(resources, types.ResourceWithTTL{Resource: resource})
 		}
+	} else if ResourceRequiresFullStateInSotw(cache.typeURL) {
+		// Non-wildcard request for a type requiring full state response
+		// We need to return all requested resources, if existing, for this type
+		requestedResources := watch.Request.GetResourceNames()
+		resources = make([]types.ResourceWithTTL, 0, len(requestedResources))
+		for _, resource := range requestedResources {
+			resource := cache.resources[resource]
+			if resource != nil {
+				resources = append(resources, types.ResourceWithTTL{Resource: resource})
+			}
+		}
 	} else {
+		// Non-wildcard request for other types. Only return stale resources
 		resources = make([]types.ResourceWithTTL, 0, len(staleResources))
 		for _, name := range staleResources {
 			resource := cache.resources[name]
@@ -130,8 +144,8 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
 			}
 		}
 	}
-	value <- &RawResponse{
-		Request:   &Request{TypeUrl: cache.typeURL},
+	watch.Response <- &RawResponse{
+		Request:   watch.Request,
 		Resources: resources,
 		Version:   cache.getVersion(),
 		Ctx:       context.Background(),
@@ -140,18 +154,18 @@ func (cache *LinearCache) respond(value chan Response, staleResources []string)
 
 func (cache *LinearCache) notifyAll(modified map[string]struct{}) {
 	// de-duplicate watches that need to be responded
-	notifyList := make(map[chan Response][]string)
+	notifyList := make(map[ResponseWatch][]string)
 	for name := range modified {
 		for watch := range cache.watches[name] {
 			notifyList[watch] = append(notifyList[watch], name)
 		}
-		delete(cache.watches, name)
 	}
-	for value, stale := range notifyList {
-		cache.respond(value, stale)
+	for watch, stale := range notifyList {
+		cache.removeWatch(watch)
+		cache.respond(watch, stale)
 	}
-	for value := range cache.watchAll {
-		cache.respond(value, nil)
+	for watch := range cache.watchAll {
+		cache.respond(watch, nil)
 	}
 	cache.watchAll = make(watches)
 
@@ -318,6 +332,8 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
 		err = errors.New("mis-matched version prefix")
 	}
 
+	watch := ResponseWatch{Request: request, Response: value}
+
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
@@ -325,8 +341,12 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
 	case err != nil:
 		stale = true
 		staleResources = request.GetResourceNames()
+		cache.log.Debugf("Watch is stale as version failed to parse %s", err.Error())
 	case len(request.GetResourceNames()) == 0:
-		stale = lastVersion != cache.version
+		stale = (lastVersion != cache.version)
+		if stale {
+			cache.log.Debugf("Watch is stale as cache version %d differs for wildcard watch %d", cache.version, lastVersion)
+		}
 	default:
 		for _, name := range request.GetResourceNames() {
 			// When a resource is removed, its version defaults 0 and it is not considered stale.
@@ -335,39 +355,50 @@ func (cache *LinearCache) CreateWatch(request *Request, _ stream.StreamState, va
 				staleResources = append(staleResources, name)
 			}
 		}
+		if stale {
+			cache.log.Debugf("Watch is stale with stale resources %v", staleResources)
+		}
 	}
 	if stale {
-		cache.respond(value, staleResources)
+		cache.respond(watch, staleResources)
 		return nil
 	}
 	// Create open watches since versions are up to date.
 	if len(request.GetResourceNames()) == 0 {
-		cache.watchAll[value] = struct{}{}
+		cache.log.Infof("[linear cache] open watch for %s all resources, system version %q", cache.typeURL, cache.getVersion())
+		cache.watchAll[watch] = struct{}{}
 		return func() {
 			cache.mu.Lock()
 			defer cache.mu.Unlock()
-			delete(cache.watchAll, value)
+			delete(cache.watchAll, watch)
 		}
 	}
+
+	cache.log.Infof("[linear cache] open watch for %s resources %v, system version %q", cache.typeURL, request.ResourceNames, cache.getVersion())
 	for _, name := range request.GetResourceNames() {
 		set, exists := cache.watches[name]
 		if !exists {
 			set = make(watches)
 			cache.watches[name] = set
 		}
-		set[value] = struct{}{}
+		set[watch] = struct{}{}
 	}
 	return func() {
 		cache.mu.Lock()
 		defer cache.mu.Unlock()
-		for _, name := range request.GetResourceNames() {
-			set, exists := cache.watches[name]
-			if exists {
-				delete(set, value)
-			}
-			if len(set) == 0 {
-				delete(cache.watches, name)
-			}
+		cache.removeWatch(watch)
+	}
+}
+
+// Must be called under lock
+func (cache *LinearCache) removeWatch(watch ResponseWatch) {
+	// Make sure we clean the watch for ALL resources it might be associated with,
+	// as the channel will no longer be listened to
+	for _, resource := range watch.Request.ResourceNames {
+		resourceWatches := cache.watches[resource]
+		delete(resourceWatches, watch)
+		if len(resourceWatches) == 0 {
+			delete(cache.watches, resource)
 		}
 	}
 }
diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go
index 82478ca3aa..32d2bedd02 100644
--- a/pkg/cache/v3/linear_test.go
+++ b/pkg/cache/v3/linear_test.go
@@ -25,8 +25,12 @@ import (
 	"github.com/stretchr/testify/require"
 	"google.golang.org/protobuf/types/known/wrapperspb"
 
+	cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
 	endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
+	discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
 	"github.com/envoyproxy/go-control-plane/pkg/cache/types"
+	"github.com/envoyproxy/go-control-plane/pkg/log"
+	"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
 	"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
 )
 
@@ -38,11 +42,18 @@ func testResource(s string) types.Resource {
 	return wrapperspb.String(s)
 }
 
-func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
+func verifyResponseContent(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string) (Response, *discovery.DiscoveryResponse) {
 	t.Helper()
-	r := <-ch
-	if r.GetRequest().GetTypeUrl() != testType {
-		t.Errorf("unexpected empty request type URL: %q", r.GetRequest().GetTypeUrl())
+	var r Response
+	select {
+	case r = <-ch:
+	case <-time.After(1 * time.Second):
+		t.Error("failed to receive response after 1 second")
+		return nil, nil
+	}
+
+	if r.GetRequest().GetTypeUrl() != expectedType {
+		t.Errorf("unexpected request type URL: %q", r.GetRequest().GetTypeUrl())
 	}
 	if r.GetContext() == nil {
 		t.Errorf("unexpected empty response context")
@@ -54,15 +65,41 @@ func verifyResponse(t *testing.T, ch <-chan Response, version string, num int) {
 	if out.GetVersionInfo() == "" {
 		t.Error("unexpected response empty version")
 	}
-	if n := len(out.GetResources()); n != num {
-		t.Errorf("unexpected number of responses: got %d, want %d", n, num)
+	if expectedVersion != "" && out.GetVersionInfo() != expectedVersion {
+		t.Errorf("unexpected version: got %q, want %q", out.GetVersionInfo(), expectedVersion)
 	}
-	if version != "" && out.GetVersionInfo() != version {
-		t.Errorf("unexpected version: got %q, want %q", out.GetVersionInfo(), version)
-	}
-	if out.GetTypeUrl() != testType {
+	if out.GetTypeUrl() != expectedType {
 		t.Errorf("unexpected type URL: %q", out.GetTypeUrl())
 	}
+	if len(r.GetRequest().GetResourceNames()) != 0 && len(r.GetRequest().GetResourceNames()) < len(out.Resources) {
+		t.Errorf("received more resources (%d) than requested (%d)", len(r.GetRequest().GetResourceNames()), len(out.Resources))
+	}
+	return r, out
+}
+
+func verifyResponse(t *testing.T, ch <-chan Response, expectedVersion string, expectedResourcesNb int) {
+	t.Helper()
+	_, r := verifyResponseContent(t, ch, testType, expectedVersion)
+	if r == nil {
+		return
+	}
+	if n := len(r.GetResources()); n != expectedResourcesNb {
+		t.Errorf("unexpected number of responses: got %d, want %d", n, expectedResourcesNb)
+	}
+}
+
+func verifyResponseResources(t *testing.T, ch <-chan Response, expectedType string, expectedVersion string, expectedResources ...string) {
+	t.Helper()
+	r, _ := verifyResponseContent(t, ch, expectedType, expectedVersion)
+	if r == nil {
+		return
+	}
+	out := r.(*RawResponse)
+	resourceNames := []string{}
+	for _, res := range out.Resources {
+		resourceNames = append(resourceNames, GetResourceName(res.Resource))
+	}
+	assert.ElementsMatch(t, resourceNames, expectedResources)
 }
 
 type resourceInfo struct {
@@ -162,6 +199,7 @@ func checkVersionMapSet(t *testing.T, c *LinearCache) {
 }
 
 func mustBlock(t *testing.T, w <-chan Response) {
+	t.Helper()
 	select {
 	case <-w:
 		t.Error("watch must block")
@@ -170,6 +208,7 @@ func mustBlock(t *testing.T, w <-chan Response) {
 }
 
 func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) {
+	t.Helper()
 	select {
 	case <-w:
 		t.Error("watch must block")
@@ -178,6 +217,7 @@ func mustBlockDelta(t *testing.T, w <-chan DeltaResponse) {
 }
 
 func hashResource(t *testing.T, resource types.Resource) string {
+	t.Helper()
 	marshaledResource, err := MarshalResource(resource)
 	if err != nil {
 		t.Fatal(err)
@@ -773,3 +813,142 @@ func TestLinearMixedWatches(t *testing.T) {
 	verifyResponse(t, w, c.getVersion(), 0)
 	verifyDeltaResponse(t, wd, nil, []string{"b"})
 }
+
+func TestLinearSotwWatches(t *testing.T) {
+	t.Run("watches are properly removed from all objects", func(t *testing.T) {
+		cache := NewLinearCache(testType)
+		a := &endpoint.ClusterLoadAssignment{ClusterName: "a"}
+		err := cache.UpdateResource("a", a)
+		require.NoError(t, err)
+		b := &endpoint.ClusterLoadAssignment{ClusterName: "b"}
+		err = cache.UpdateResource("b", b)
+		require.NoError(t, err)
+		assert.Equal(t, 2, cache.NumResources())
+
+		// A watch tracks three different objects.
+		// An update is done for the three objects in a row
+		// If the watches are no properly purged, all three updates will send responses in the channel, but only the first one is tracked
+		// The buffer will therefore saturate and the third request will deadlock the entire cache as occurring under the mutex
+		sotwState := stream.NewStreamState(false, nil)
+		w := make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w)
+		mustBlock(t, w)
+		checkVersionMapNotSet(t, cache)
+
+		assert.Len(t, cache.watches["a"], 1)
+		assert.Len(t, cache.watches["b"], 1)
+		assert.Len(t, cache.watches["c"], 1)
+
+		// Update a and c without touching b
+		a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update
+			{Priority: 25},
+		}}
+		err = cache.UpdateResources(map[string]types.Resource{"a": a}, nil)
+		require.NoError(t, err)
+		verifyResponseResources(t, w, testType, cache.getVersion(), "a")
+		checkVersionMapNotSet(t, cache)
+
+		assert.Empty(t, cache.watches["a"])
+		assert.Empty(t, cache.watches["b"])
+		assert.Empty(t, cache.watches["c"])
+
+		// c no longer watched
+		w = make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w)
+		require.NoError(t, err)
+		mustBlock(t, w)
+		checkVersionMapNotSet(t, cache)
+
+		b = &endpoint.ClusterLoadAssignment{ClusterName: "b", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update
+			{Priority: 15},
+		}}
+		err = cache.UpdateResources(map[string]types.Resource{"b": b}, nil)
+
+		assert.Empty(t, cache.watches["a"])
+		assert.Empty(t, cache.watches["b"])
+		assert.Empty(t, cache.watches["c"])
+
+		require.NoError(t, err)
+		verifyResponseResources(t, w, testType, cache.getVersion(), "b")
+		checkVersionMapNotSet(t, cache)
+
+		w = make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: []string{"c"}, TypeUrl: testType, VersionInfo: cache.getVersion()}, sotwState, w)
+		require.NoError(t, err)
+		mustBlock(t, w)
+		checkVersionMapNotSet(t, cache)
+
+		c := &endpoint.ClusterLoadAssignment{ClusterName: "c", Endpoints: []*endpoint.LocalityLbEndpoints{ // resource update
+			{Priority: 15},
+		}}
+		err = cache.UpdateResources(map[string]types.Resource{"c": c}, nil)
+		require.NoError(t, err)
+		verifyResponseResources(t, w, testType, cache.getVersion(), "c")
+		checkVersionMapNotSet(t, cache)
+
+		assert.Empty(t, cache.watches["a"])
+		assert.Empty(t, cache.watches["b"])
+		assert.Empty(t, cache.watches["c"])
+	})
+
+	t.Run("watches return full state for types requesting it", func(t *testing.T) {
+		a := &cluster.Cluster{Name: "a"}
+		b := &cluster.Cluster{Name: "b"}
+		c := &cluster.Cluster{Name: "c"}
+		// ClusterType requires all resources to always be returned
+		cache := NewLinearCache(resource.ClusterType, WithInitialResources(map[string]types.Resource{
+			"a": a,
+			"b": b,
+			"c": c,
+		}), WithLogger(log.NewTestLogger(t)))
+		assert.Equal(t, 3, cache.NumResources())
+
+		// Non-wildcard request
+		nonWildcardState := stream.NewStreamState(false, nil)
+		w1 := make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1)
+		mustBlock(t, w1)
+		checkVersionMapNotSet(t, cache)
+
+		// wildcard request
+		wildcardState := stream.NewStreamState(true, nil)
+		w2 := make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2)
+		mustBlock(t, w2)
+		checkVersionMapNotSet(t, cache)
+
+		// request not requesting b
+		otherState := stream.NewStreamState(false, nil)
+		w3 := make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "c", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, otherState, w3)
+		mustBlock(t, w3)
+		checkVersionMapNotSet(t, cache)
+
+		b.AltStatName = "othername"
+		err := cache.UpdateResources(map[string]types.Resource{"b": b}, nil)
+		require.NoError(t, err)
+
+		// Other watch has not triggered
+		mustBlock(t, w3)
+
+		verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b")      // a is also returned as cluster requires full state
+		verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c") // a and c are also returned wildcard
+
+		// Recreate the watches
+		w1 = make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: []string{"a", "b", "d"}, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, nonWildcardState, w1)
+		mustBlock(t, w1)
+		w2 = make(chan Response, 1)
+		_ = cache.CreateWatch(&Request{ResourceNames: nil, TypeUrl: resource.ClusterType, VersionInfo: cache.getVersion()}, wildcardState, w2)
+		mustBlock(t, w2)
+
+		// Update d, new resource in the cache
+		d := &cluster.Cluster{Name: "d"}
+		err = cache.UpdateResource("d", d)
+		require.NoError(t, err)
+
+		verifyResponseResources(t, w1, resource.ClusterType, cache.getVersion(), "a", "b", "d")
+		verifyResponseResources(t, w2, resource.ClusterType, cache.getVersion(), "a", "b", "c", "d")
+		verifyResponseResources(t, w3, resource.ClusterType, cache.getVersion(), "a", "c", "d")
+	})
+}
diff --git a/pkg/cache/v3/resource.go b/pkg/cache/v3/resource.go
index d4c25f2a11..0079afb08e 100644
--- a/pkg/cache/v3/resource.go
+++ b/pkg/cache/v3/resource.go
@@ -97,6 +97,24 @@ func GetResourceName(res types.Resource) string {
 	}
 }
 
+// ResourceRequiresFullStateInSotw indicates whether when building the reply in Sotw,
+// the response must include all existing resources or can return only the modified ones
+func ResourceRequiresFullStateInSotw(typeURL resource.Type) bool {
+	// From https://www.envoyproxy.io/docs/envoy/v1.28.0/api-docs/xds_protocol#grouping-resources-into-responses,
+	// when using sotw the control-plane MUST return all requested resources (or simply all if wildcard)
+	// for some types. This is relied on by xds-grpc which is explicitly requesting clusters and listeners
+	// but expects to receive all existing resources for those types. Missing clusters or listeners are 
+	// considered deleted.
+	switch typeURL {
+	case resource.ClusterType:
+		return true
+	case resource.ListenerType:
+		return true
+	default:
+		return false
+	}
+}
+
 // GetResourceName returns the resource names for a list of valid xDS response types.
 func GetResourceNames(resources []types.Resource) []string {
 	out := make([]string, len(resources))
diff --git a/pkg/log/test.go b/pkg/log/test.go
new file mode 100644
index 0000000000..214dc9eb69
--- /dev/null
+++ b/pkg/log/test.go
@@ -0,0 +1,33 @@
+package log
+
+import "testing"
+
+type testLogger struct {
+	t testing.TB
+}
+
+var _ Logger = testLogger{}
+
+func NewTestLogger(t testing.TB) Logger {
+	return testLogger{t}
+}
+
+// Debugf logs a message at level debug on the test logger.
+func (l testLogger) Debugf(msg string, args ...interface{}) {
+	l.t.Logf("[debug] "+msg, args...)
+}
+
+// Infof logs a message at level info on the test logger.
+func (l testLogger) Infof(msg string, args ...interface{}) {
+	l.t.Logf("[info] "+msg, args...)
+}
+
+// Warnf logs a message at level warn on the test logger.
+func (l testLogger) Warnf(msg string, args ...interface{}) {
+	l.t.Logf("[warn] "+msg, args...)
+}
+
+// Errorf logs a message at level error on the test logger.
+func (l testLogger) Errorf(msg string, args ...interface{}) {
+	l.t.Logf("[error] "+msg, args...)
+}