Skip to content

Commit

Permalink
Merges 1.14 into master
Browse files Browse the repository at this point in the history
  • Loading branch information
elena-kolevska committed Aug 29, 2024
2 parents a409bc1 + e53cf34 commit 9156779
Show file tree
Hide file tree
Showing 10 changed files with 30 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ set -e

export INFLUX_TOKEN=$(openssl rand -base64 32)
echo "INFLUX_TOKEN=$INFLUX_TOKEN" >> $GITHUB_ENV
docker-compose -f .github/infrastructure/docker-compose-influxdb.yml -p influxdb up -d
docker compose -f .github/infrastructure/docker-compose-influxdb.yml -p influxdb up -d
2 changes: 1 addition & 1 deletion .github/scripts/components-scripts/docker-compose.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ set -e
FILE="$1"
PROJECT="${2:-$FILE}"

docker-compose -f .github/infrastructure/docker-compose-${FILE}.yml -p ${PROJECT} up -d
docker compose -f .github/infrastructure/docker-compose-${FILE}.yml -p ${PROJECT} up -d
30 changes: 11 additions & 19 deletions pubsub/aws/snssqs/snssqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import (
)

type snsSqs struct {
topicsLocker TopicsLocker
topicLock sync.RWMutex
// key is the sanitized topic name
topicArns map[string]string
// key is the topic name, value holds the ARN of the queue and its url.
Expand Down Expand Up @@ -169,11 +169,10 @@ func (s *snsSqs) Init(ctx context.Context, metadata pubsub.Metadata) error {
}
// subscription manager responsible for managing the lifecycle of subscriptions.
s.subscriptionManager = NewSubscriptionMgmt(s.logger)
s.topicsLocker = NewLockManager()

s.topicArns = make(map[string]string)
s.queues = make(map[string]*sqsQueueInfo)
s.subscriptions = make(map[string]string)
s.topicArns = make(map[string]string)

return nil
}
Expand Down Expand Up @@ -241,17 +240,14 @@ func (s *snsSqs) getTopicArn(parentCtx context.Context, topic string) (string, e
func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn string, sanitizedTopic string, err error) {
sanitizedTopic = nameToAWSSanitizedName(topic, s.metadata.Fifo)

var loadOK bool
if topicArn, loadOK = s.topicArns[sanitizedTopic]; loadOK {
if len(topicArn) > 0 {
s.logger.Debugf("Found existing topic ARN for topic %s: %s", topic, topicArn)

return topicArn, sanitizedTopic, err
} else {
err = fmt.Errorf("the ARN for (sanitized) topic: %s was empty", sanitizedTopic)
var exists bool
s.topicLock.RLock()
topicArn, exists = s.topicArns[sanitizedTopic]
s.topicLock.RUnlock()

return topicArn, sanitizedTopic, err
}
if exists {
s.logger.Debugf("Found existing topic ARN for topic %s: %s", topic, topicArn)
return topicArn, sanitizedTopic, err
}

// creating queues is idempotent, the names serve as unique keys among a given region.
Expand All @@ -274,7 +270,9 @@ func (s *snsSqs) getOrCreateTopic(ctx context.Context, topic string) (topicArn s
}

// record topic ARN.
s.topicLock.Lock()
s.topicArns[sanitizedTopic] = topicArn
s.topicLock.Unlock()

return topicArn, sanitizedTopic, err
}
Expand Down Expand Up @@ -760,9 +758,6 @@ func (s *snsSqs) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
return errors.New("component is closed")
}

s.topicsLocker.Lock(req.Topic)
defer s.topicsLocker.Unlock(req.Topic)

// subscribers declare a topic ARN and declare a SQS queue to use
// these should be idempotent - queues should not be created if they exist.
topicArn, sanitizedName, err := s.getOrCreateTopic(ctx, req.Topic)
Expand Down Expand Up @@ -842,9 +837,6 @@ func (s *snsSqs) Publish(ctx context.Context, req *pubsub.PublishRequest) error
return errors.New("component is closed")
}

s.topicsLocker.Lock(req.Topic)
defer s.topicsLocker.Unlock(req.Topic)

topicArn, _, err := s.getOrCreateTopic(ctx, req.Topic)
if err != nil {
s.logger.Errorf("error getting topic ARN for %s: %v", req.Topic, err)
Expand Down
44 changes: 0 additions & 44 deletions pubsub/aws/snssqs/topics_locker.go

This file was deleted.

2 changes: 0 additions & 2 deletions pubsub/mqtt3/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,6 @@ func (m *mqttPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest,
}

// Delete the topic from the map first, which stops routing messages to handlers
m.subscribingLock.Lock()
defer m.subscribingLock.Unlock()
delete(m.topics, topic)

// We will call Unsubscribe only if cleanSession is true or if "unsubscribeOnClose" in the request metadata is true
Expand Down
12 changes: 8 additions & 4 deletions tests/certification/flow/dockercompose/dockercompose.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ func Up(project, filename string) flow.Runnable {

func (c Compose) Up(ctx flow.Context) error {
out, err := exec.Command(
"docker-compose",
"docker",
"compose",
"-p", c.project,
"-f", c.filename,
"up", "-d",
Expand All @@ -65,7 +66,8 @@ func Down(project, filename string) flow.Runnable {

func (c Compose) Down(ctx flow.Context) error {
out, err := exec.Command(
"docker-compose",
"docker",
"compose",
"-p", c.project,
"-f", c.filename,
"down", "-v").CombinedOutput()
Expand All @@ -81,12 +83,13 @@ func Start(project, filename string, services ...string) flow.Runnable {
func (c Compose) Start(services ...string) flow.Runnable {
return func(ctx flow.Context) error {
args := []string{
"compose",
"-p", c.project,
"-f", c.filename,
"start",
}
args = append(args, services...)
out, err := exec.Command("docker-compose", args...).CombinedOutput()
out, err := exec.Command("docker", args...).CombinedOutput()
ctx.Log(string(out))
return err
}
Expand All @@ -99,12 +102,13 @@ func Stop(project, filename string, services ...string) flow.Runnable {
func (c Compose) Stop(services ...string) flow.Runnable {
return func(ctx flow.Context) error {
args := []string{
"compose",
"-p", c.project,
"-f", c.filename,
"stop",
}
args = append(args, services...)
out, err := exec.Command("docker-compose", args...).CombinedOutput()
out, err := exec.Command("docker", args...).CombinedOutput()
ctx.Log(string(out))
return err
}
Expand Down
4 changes: 2 additions & 2 deletions tests/certification/pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestPulsar(t *testing.T) {

t.Log("Starting OAuth2 server...")
out, err := exec.Command(
"docker-compose",
"docker", "compose",
"-p", "oauth2",
"-f", dockerComposeMockOAuth2YAML,
"up", "-d").CombinedOutput()
Expand All @@ -133,7 +133,7 @@ func TestPulsar(t *testing.T) {
t.Cleanup(func() {
t.Log("Stopping OAuth2 server...")
out, err = exec.Command(
"docker-compose",
"docker", "compose",
"-p", "oauth2",
"-f", dockerComposeMockOAuth2YAML,
"down", "-v",
Expand Down
8 changes: 4 additions & 4 deletions tests/certification/secretstores/hashicorp/vault/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ This secret store [supports the following features][features]:
### Tests for `vaultToken` and `vaultTokenMountPath`

1. Verify `vaultToken` is used (happy case)
* The baseline fo this test is all the previous test are using a known-to-work value that matches what our docker-compose environment sets up.
* The baseline fo this test is all the previous test are using a known-to-work value that matches what our docker compose environment sets up.
1. Verify failure when we use a `vaultToken` value that does not match what our environment sets up
1. Verify `vaultTokenMountPath` is used (happy case)
1. Verify failure when `vaultTokenMountPath` points to a broken path
Expand All @@ -65,7 +65,7 @@ This secret store [supports the following features][features]:
### Tests for vaultAddr

1. Verify `vaultAddr` is used (happy case)
* The baseline fo this test is all the previous test are using this flag with a known-to-work value that matches what our docker-compose environment sets up and is **not the default**.
* The baseline fo this test is all the previous test are using this flag with a known-to-work value that matches what our docker compose environment sets up and is **not the default**.
1. Verify initialization and operation success when `vaultAddr` is missing `skipVerify` is `true`
* Start a vault instance using a self-signed HTTPS certificate.
* Component configuration lacks `vaultAddr` and defaults to address `https://127.0.0.1:8200`
Expand Down Expand Up @@ -147,12 +147,12 @@ GOLANG_PROTOBUF_REGISTRATION_CONFLICT=warn go test -run TestVersioning -v .

### Docker-compose

You might need to verify if docker-compose is doing what you think it is doing: seeding the right secrets or even booting up properly.
You might need to verify if docker compose is doing what you think it is doing: seeding the right secrets or even booting up properly.

Head to the directory hosting the `docker-compose-hashicorp-vault.yml` file and run:

```shell
docker-compose -f docker-compose-hashicorp-vault.yml up --remove-orphans
docker compose -f docker-compose-hashicorp-vault.yml up --remove-orphans
```

# References:
Expand Down
2 changes: 1 addition & 1 deletion tests/certification/state/cockroachdb/v1/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ The purpose of this module is to provide tests that certify the CockroachDB Stat
## Connection Test
* Verify the connection is established to CockroachDB.
* Create component spec.
* Run the component with docker-compose
* Run the component with docker compose
* Run dapr application with component.
* Ensure that you have access to the queue and connection to the queue is established.

Expand Down
4 changes: 2 additions & 2 deletions tests/conformance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@
1. Test setup is independent of the test run.
2. Run the service that needs to conformance tested locally or in your own cloud account.

- For cloud-agnostic components such as Kafka, MQTT etc., there are `docker-compose` definitions under the [/.github/infrastructure](../../.github/infrastructure/) folder you can use to quickly create an instance of the service. For example, to setup Kafka for conformance tests:
- For cloud-agnostic components such as Kafka, MQTT etc., there are `docker compose` definitions under the [/.github/infrastructure](../../.github/infrastructure/) folder you can use to quickly create an instance of the service. For example, to setup Kafka for conformance tests:

```bash
docker-compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
docker compose -f ./.github/infrastructure/docker-compose-kafka.yml -p kafka up -d
```

- For Azure components such as Blob Storage, Key Vault etc., there is an automation script that can help you create the resources under your subscription, and extract the environment variables needed to run the conformance tests. See [/.github/infrastructure/conformance/azure/README.md](../../.github/infrastructure/conformance/azure/README.md) for more details.
Expand Down

0 comments on commit 9156779

Please sign in to comment.