diff --git a/.github/scripts/components-scripts/conformance-bindings.influx-setup.sh b/.github/scripts/components-scripts/conformance-bindings.influx-setup.sh index ea97035a4d..96be07e12b 100755 --- a/.github/scripts/components-scripts/conformance-bindings.influx-setup.sh +++ b/.github/scripts/components-scripts/conformance-bindings.influx-setup.sh @@ -4,4 +4,4 @@ set -e export INFLUX_TOKEN=$(openssl rand -base64 32) echo "INFLUX_TOKEN=$INFLUX_TOKEN" >> $GITHUB_ENV -docker-compose -f .github/infrastructure/docker-compose-influxdb.yml -p influxdb up -d +docker compose -f .github/infrastructure/docker-compose-influxdb.yml -p influxdb up -d diff --git a/.github/scripts/components-scripts/docker-compose.sh b/.github/scripts/components-scripts/docker-compose.sh index c71bac388b..e6111bad2c 100755 --- a/.github/scripts/components-scripts/docker-compose.sh +++ b/.github/scripts/components-scripts/docker-compose.sh @@ -5,4 +5,4 @@ set -e FILE="$1" PROJECT="${2:-$FILE}" -docker-compose -f .github/infrastructure/docker-compose-${FILE}.yml -p ${PROJECT} up -d +docker compose -f .github/infrastructure/docker-compose-${FILE}.yml -p ${PROJECT} up -d diff --git a/pubsub/aws/snssqs/snssqs.go b/pubsub/aws/snssqs/snssqs.go index e34d23ca7c..501a7344aa 100644 --- a/pubsub/aws/snssqs/snssqs.go +++ b/pubsub/aws/snssqs/snssqs.go @@ -42,7 +42,7 @@ import ( ) type snsSqs struct { - topicsLocker TopicsLocker + topicLock sync.RWMutex // key is the sanitized topic name topicArns map[string]string // key is the topic name, value holds the ARN of the queue and its url. @@ -169,11 +169,10 @@ func (s *snsSqs) Init(ctx context.Context, metadata pubsub.Metadata) error { } // subscription manager responsible for managing the lifecycle of subscriptions. s.subscriptionManager = NewSubscriptionMgmt(s.logger) - s.topicsLocker = NewLockManager() - s.topicArns = make(map[string]string) s.queues = make(map[string]*sqsQueueInfo) s.subscriptions = make(map[string]string) + s.topicArns = make(map[string]string) return nil } @@ -241,17 +240,14 @@ func (s *snsSqs) getTopicArn(parentCtx context.Context, topic string) (string, e func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn string, sanitizedTopic string, err error) { sanitizedTopic = nameToAWSSanitizedName(topic, s.metadata.Fifo) - var loadOK bool - if topicArn, loadOK = s.topicArns[sanitizedTopic]; loadOK { - if len(topicArn) > 0 { - s.logger.Debugf("Found existing topic ARN for topic %s: %s", topic, topicArn) - - return topicArn, sanitizedTopic, err - } else { - err = fmt.Errorf("the ARN for (sanitized) topic: %s was empty", sanitizedTopic) + var exists bool + s.topicLock.RLock() + topicArn, exists = s.topicArns[sanitizedTopic] + s.topicLock.RUnlock() - return topicArn, sanitizedTopic, err - } + if exists { + s.logger.Debugf("Found existing topic ARN for topic %s: %s", topic, topicArn) + return topicArn, sanitizedTopic, err } // creating queues is idempotent, the names serve as unique keys among a given region. @@ -274,7 +270,9 @@ func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn s } // record topic ARN. + s.topicLock.Lock() s.topicArns[sanitizedTopic] = topicArn + s.topicLock.Unlock() return topicArn, sanitizedTopic, err } @@ -760,9 +758,6 @@ func (s *snsSqs) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han return errors.New("component is closed") } - s.topicsLocker.Lock(req.Topic) - defer s.topicsLocker.Unlock(req.Topic) - // subscribers declare a topic ARN and declare a SQS queue to use // these should be idempotent - queues should not be created if they exist. topicArn, sanitizedName, err := s.getOrCreateTopic(ctx, req.Topic) @@ -842,9 +837,6 @@ func (s *snsSqs) Publish(ctx context.Context, req *pubsub.PublishRequest) error return errors.New("component is closed") } - s.topicsLocker.Lock(req.Topic) - defer s.topicsLocker.Unlock(req.Topic) - topicArn, _, err := s.getOrCreateTopic(ctx, req.Topic) if err != nil { s.logger.Errorf("error getting topic ARN for %s: %v", req.Topic, err) diff --git a/pubsub/aws/snssqs/topics_locker.go b/pubsub/aws/snssqs/topics_locker.go deleted file mode 100644 index bbb934c026..0000000000 --- a/pubsub/aws/snssqs/topics_locker.go +++ /dev/null @@ -1,44 +0,0 @@ -package snssqs - -import ( - "sync" - - "github.com/puzpuzpuz/xsync/v3" -) - -// TopicsLockManager is a singleton for fine-grained locking, to prevent the component r/w operations -// from locking the entire component out when performing operations on different topics. -type TopicsLockManager struct { - xLockMap *xsync.MapOf[string, *sync.Mutex] -} - -type TopicsLocker interface { - Lock(topic string) *sync.Mutex - Unlock(topic string) -} - -func NewLockManager() *TopicsLockManager { - return &TopicsLockManager{xLockMap: xsync.NewMapOf[string, *sync.Mutex]()} -} - -func (lm *TopicsLockManager) Lock(key string) *sync.Mutex { - lock, _ := lm.xLockMap.LoadOrCompute(key, func() *sync.Mutex { - l := &sync.Mutex{} - l.Lock() - - return l - }) - - return lock -} - -func (lm *TopicsLockManager) Unlock(key string) { - lm.xLockMap.Compute(key, func(oldValue *sync.Mutex, exists bool) (newValue *sync.Mutex, delete bool) { - // if exists then the mutex must be already locked, and we unlock it - if exists { - oldValue.Unlock() - } - // we return to comply with the Compute signature, but not using the returned values - return oldValue, false - }) -} diff --git a/pubsub/mqtt3/mqtt.go b/pubsub/mqtt3/mqtt.go index 5a59d3afc7..f36a7ceb37 100644 --- a/pubsub/mqtt3/mqtt.go +++ b/pubsub/mqtt3/mqtt.go @@ -181,8 +181,6 @@ func (m *mqttPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, } // Delete the topic from the map first, which stops routing messages to handlers - m.subscribingLock.Lock() - defer m.subscribingLock.Unlock() delete(m.topics, topic) // We will call Unsubscribe only if cleanSession is true or if "unsubscribeOnClose" in the request metadata is true diff --git a/tests/certification/flow/dockercompose/dockercompose.go b/tests/certification/flow/dockercompose/dockercompose.go index afa8822148..83bb243b4e 100644 --- a/tests/certification/flow/dockercompose/dockercompose.go +++ b/tests/certification/flow/dockercompose/dockercompose.go @@ -49,7 +49,8 @@ func Up(project, filename string) flow.Runnable { func (c Compose) Up(ctx flow.Context) error { out, err := exec.Command( - "docker-compose", + "docker", + "compose", "-p", c.project, "-f", c.filename, "up", "-d", @@ -65,7 +66,8 @@ func Down(project, filename string) flow.Runnable { func (c Compose) Down(ctx flow.Context) error { out, err := exec.Command( - "docker-compose", + "docker", + "compose", "-p", c.project, "-f", c.filename, "down", "-v").CombinedOutput() @@ -81,12 +83,13 @@ func Start(project, filename string, services ...string) flow.Runnable { func (c Compose) Start(services ...string) flow.Runnable { return func(ctx flow.Context) error { args := []string{ + "compose", "-p", c.project, "-f", c.filename, "start", } args = append(args, services...) - out, err := exec.Command("docker-compose", args...).CombinedOutput() + out, err := exec.Command("docker", args...).CombinedOutput() ctx.Log(string(out)) return err } @@ -99,12 +102,13 @@ func Stop(project, filename string, services ...string) flow.Runnable { func (c Compose) Stop(services ...string) flow.Runnable { return func(ctx flow.Context) error { args := []string{ + "compose", "-p", c.project, "-f", c.filename, "stop", } args = append(args, services...) - out, err := exec.Command("docker-compose", args...).CombinedOutput() + out, err := exec.Command("docker", args...).CombinedOutput() ctx.Log(string(out)) return err } diff --git a/tests/certification/pubsub/pulsar/pulsar_test.go b/tests/certification/pubsub/pulsar/pulsar_test.go index 93f590d2d7..c8578dbb53 100644 --- a/tests/certification/pubsub/pulsar/pulsar_test.go +++ b/tests/certification/pubsub/pulsar/pulsar_test.go @@ -123,7 +123,7 @@ func TestPulsar(t *testing.T) { t.Log("Starting OAuth2 server...") out, err := exec.Command( - "docker-compose", + "docker", "compose", "-p", "oauth2", "-f", dockerComposeMockOAuth2YAML, "up", "-d").CombinedOutput() @@ -133,7 +133,7 @@ func TestPulsar(t *testing.T) { t.Cleanup(func() { t.Log("Stopping OAuth2 server...") out, err = exec.Command( - "docker-compose", + "docker", "compose", "-p", "oauth2", "-f", dockerComposeMockOAuth2YAML, "down", "-v", diff --git a/tests/certification/secretstores/hashicorp/vault/README.md b/tests/certification/secretstores/hashicorp/vault/README.md index 9f4f9c2f38..ba0fc06414 100644 --- a/tests/certification/secretstores/hashicorp/vault/README.md +++ b/tests/certification/secretstores/hashicorp/vault/README.md @@ -54,7 +54,7 @@ This secret store [supports the following features][features]: ### Tests for `vaultToken` and `vaultTokenMountPath` 1. Verify `vaultToken` is used (happy case) - * The baseline fo this test is all the previous test are using a known-to-work value that matches what our docker-compose environment sets up. + * The baseline fo this test is all the previous test are using a known-to-work value that matches what our docker compose environment sets up. 1. Verify failure when we use a `vaultToken` value that does not match what our environment sets up 1. Verify `vaultTokenMountPath` is used (happy case) 1. Verify failure when `vaultTokenMountPath` points to a broken path @@ -65,7 +65,7 @@ This secret store [supports the following features][features]: ### Tests for vaultAddr 1. Verify `vaultAddr` is used (happy case) - * The baseline fo this test is all the previous test are using this flag with a known-to-work value that matches what our docker-compose environment sets up and is **not the default**. + * The baseline fo this test is all the previous test are using this flag with a known-to-work value that matches what our docker compose environment sets up and is **not the default**. 1. Verify initialization and operation success when `vaultAddr` is missing `skipVerify` is `true` * Start a vault instance using a self-signed HTTPS certificate. * Component configuration lacks `vaultAddr` and defaults to address `https://127.0.0.1:8200` @@ -147,12 +147,12 @@ GOLANG_PROTOBUF_REGISTRATION_CONFLICT=warn go test -run TestVersioning -v . ### Docker-compose -You might need to verify if docker-compose is doing what you think it is doing: seeding the right secrets or even booting up properly. +You might need to verify if docker compose is doing what you think it is doing: seeding the right secrets or even booting up properly. Head to the directory hosting the `docker-compose-hashicorp-vault.yml` file and run: ```shell -docker-compose -f docker-compose-hashicorp-vault.yml up --remove-orphans +docker compose -f docker-compose-hashicorp-vault.yml up --remove-orphans ``` # References: diff --git a/tests/certification/state/cockroachdb/v1/README.md b/tests/certification/state/cockroachdb/v1/README.md index 74de6596ab..ea41c763cb 100644 --- a/tests/certification/state/cockroachdb/v1/README.md +++ b/tests/certification/state/cockroachdb/v1/README.md @@ -7,7 +7,7 @@ The purpose of this module is to provide tests that certify the CockroachDB Stat ## Connection Test * Verify the connection is established to CockroachDB. * Create component spec. - * Run the component with docker-compose + * Run the component with docker compose * Run dapr application with component. * Ensure that you have access to the queue and connection to the queue is established. diff --git a/tests/conformance/README.md b/tests/conformance/README.md index 01823f48b8..f2fd023f56 100644 --- a/tests/conformance/README.md +++ b/tests/conformance/README.md @@ -67,10 +67,10 @@ 1. Test setup is independent of the test run. 2. Run the service that needs to conformance tested locally or in your own cloud account. - - For cloud-agnostic components such as Kafka, MQTT etc., there are `docker-compose` definitions under the [/.github/infrastructure](../../.github/infrastructure/) folder you can use to quickly create an instance of the service. For example, to setup Kafka for conformance tests: + - For cloud-agnostic components such as Kafka, MQTT etc., there are `docker compose` definitions under the [/.github/infrastructure](../../.github/infrastructure/) folder you can use to quickly create an instance of the service. For example, to setup Kafka for conformance tests: ```bash - docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d + docker compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d ``` - For Azure components such as Blob Storage, Key Vault etc., there is an automation script that can help you create the resources under your subscription, and extract the environment variables needed to run the conformance tests. See [/.github/infrastructure/conformance/azure/README.md](../../.github/infrastructure/conformance/azure/README.md) for more details.