Skip to content

Commit

Permalink
Merge pull request #1349 from ubitransports/master-gps-attributes
Browse files Browse the repository at this point in the history
feat(GPS): allow send attributes in Google PubSub message.
  • Loading branch information
makasim authored Aug 12, 2024
2 parents bb6e759 + cf6bbec commit 6b48a41
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 46 deletions.
4 changes: 2 additions & 2 deletions bin/changelog
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ then
exit 1
fi

docker-compose run -e CHANGELOG_GITHUB_TOKEN=${CHANGELOG_GITHUB_TOKEN:-""} --workdir="/mqdev" --rm generate-changelog github_changelog_generator --future-release "$1" --no-issues --unreleased-only --output "CHANGELOG_FUTURE.md"
docker compose run -e CHANGELOG_GITHUB_TOKEN=${CHANGELOG_GITHUB_TOKEN:-""} --workdir="/mqdev" --rm generate-changelog github_changelog_generator --future-release "$1" --no-issues --unreleased-only --output "CHANGELOG_FUTURE.md"

#git add CHANGELOG.md && git commit -m "Release $1" -S && git push origin "$CURRENT_BRANCH"
git add CHANGELOG.md && git commit -m "Release $1" -S && git push origin "$CURRENT_BRANCH"
8 changes: 4 additions & 4 deletions bin/dev
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ set -e
while getopts "bustefdp" OPTION; do
case $OPTION in
b)
docker-compose pull -q && docker-compose build
docker compose pull -q && docker compose build
;;
u)
docker-compose up
docker compose up
;;
s)
docker-compose stop
docker compose stop
;;
e)
docker exec -it mqdev_dev_1 /bin/bash
Expand All @@ -21,7 +21,7 @@ while getopts "bustefdp" OPTION; do
./bin/php-cs-fixer fix
;;

d) docker-compose run --workdir="/mqdev" --rm dev php pkg/enqueue-bundle/Tests/Functional/app/console.php config:dump-reference enqueue -vvv
d) docker compose run --workdir="/mqdev" --rm dev php pkg/enqueue-bundle/Tests/Functional/app/console.php config:dump-reference enqueue -vvv
;;
\?)
echo "Invalid option: -$OPTARG" >&2
Expand Down
2 changes: 1 addition & 1 deletion bin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
set -x
set -e

docker-compose run --workdir="/mqdev" --rm dev ./docker/bin/test.sh $@
docker compose run --workdir="/mqdev" --rm dev ./docker/bin/test.sh $@
3 changes: 1 addition & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"doctrine/persistence": "^2.0|^3.0",
"mongodb/mongodb": "^1.2",
"pda/pheanstalk": "^3.1",
"aws/aws-sdk-php": "^3.155",
"aws/aws-sdk-php": "^3.290",
"stomp-php/stomp-php": "^4.5|^5",
"php-http/guzzle7-adapter": "^0.1.1",
"php-http/client-common": "^2.2.1",
Expand Down Expand Up @@ -137,4 +137,3 @@
}
}
}

17 changes: 9 additions & 8 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ services:
- PREDIS_DSN=redis+predis://redis
- PHPREDIS_DSN=redis+phpredis://redis
- GPS_DSN=gps:?projectId=mqdev&emulatorHost=http://google-pubsub:8085
- SQS_DSN=sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4576&version=latest
- SNS_DSN=sns:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4575&version=latest
- SNSQS_DSN=snsqs:?key=key&secret=secret&region=us-east-1&sns_endpoint=http://localstack:4575&sqs_endpoint=http://localstack:4576&version=latest
- SQS_DSN=sqs:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
- SNS_DSN=sns:?key=key&secret=secret&region=us-east-1&endpoint=http://localstack:4566&version=latest
- SNSQS_DSN=snsqs:?key=key&secret=secret&region=us-east-1&sns_endpoint=http://localstack:4566&sqs_endpoint=http://localstack:4566&version=latest
- WAMP_DSN=wamp://thruway:9090
- REDIS_HOST=redis
- REDIS_PORT=6379
- AWS_SQS_KEY=key
- AWS_SQS_SECRET=secret
- AWS_SQS_REGION=us-east-1
- AWS_SQS_ENDPOINT=http://localstack:4576
- AWS_SQS_ENDPOINT=http://localstack:4566
- AWS_SQS_VERSION=latest
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
- GEARMAN_DSN=gearman://gearmand:4730
Expand Down Expand Up @@ -83,6 +83,7 @@ services:

mysql:
image: mysql:5.7
platform: linux/amd64
environment:
MYSQL_ROOT_PASSWORD: rootpass
MYSQL_DATABASE: mqdev
Expand Down Expand Up @@ -127,13 +128,13 @@ services:
- '9090:9090'

localstack:
image: 'localstack/localstack:0.8.10'
image: 'localstack/localstack:3.6.0'
ports:
- '4576:4576'
- '4575:4575'
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
HOSTNAME_EXTERNAL: 'localstack'
SERVICES: 'sqs,sns'
SERVICES: 's3,sqs,sns'

influxdb:
image: 'influxdb:latest'
Expand Down
2 changes: 1 addition & 1 deletion docker/bin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ waitForService gearmand 4730 50
waitForService kafka 9092 50
waitForService mongo 27017 50
waitForService thruway 9090 50
waitForService localstack 4576 50
waitForService localstack 4566 50

php docker/bin/refresh-mysql-database.php || exit 1
php docker/bin/refresh-postgres-database.php || exit 1
Expand Down
2 changes: 1 addition & 1 deletion docs/contribution.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,6 @@ Once everything is done open a pull request on official repository.

## WTF?!

* If you get `rabbitmqssl: forward host lookup failed: Unknown host, wait for service rabbitmqssl:5671` do `docker-compose down`.
* If you get `rabbitmqssl: forward host lookup failed: Unknown host, wait for service rabbitmqssl:5671` do `docker compose down`.

[back to index](index.md)
8 changes: 4 additions & 4 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@
<directory>pkg/sns/Tests</directory>
</testsuite>

<testsuite name="snsqs transport">
<directory>pkg/snsqs/Tests</directory>
</testsuite>

<testsuite name="pheanstalk transport">
<directory>pkg/pheanstalk/Tests</directory>
</testsuite>
Expand Down Expand Up @@ -123,6 +119,10 @@
<testsuite name="monitoring">
<directory>pkg/monitoring/Tests</directory>
</testsuite>

<testsuite name="snsqs transport">
<directory>pkg/snsqs/Tests</directory>
</testsuite>
</testsuites>

<php>
Expand Down
9 changes: 7 additions & 2 deletions pkg/amqp-ext/Tests/Functional/AmqpCommonUseCasesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public function testProduceAndReceiveOneMessageSentDirectlyToTemporaryQueue()
$queue = $this->amqpContext->createTemporaryQueue();

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(145);

$producer = $this->amqpContext->createProducer();
$producer->send($queue, $message);
Expand All @@ -137,6 +138,7 @@ public function testProduceAndReceiveOneMessageSentDirectlyToTopic()
$this->amqpContext->bind(new AmqpBind($topic, $queue));

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(145);

$producer = $this->amqpContext->createProducer();
$producer->send($topic, $message);
Expand All @@ -158,10 +160,11 @@ public function testConsumerReceiveMessageFromTopicDirectly()
$this->amqpContext->declareTopic($topic);

$consumer = $this->amqpContext->createConsumer($topic);
//guard
// guard
$this->assertNull($consumer->receive(1000));

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(145);

$producer = $this->amqpContext->createProducer();
$producer->send($topic, $message);
Expand All @@ -181,10 +184,11 @@ public function testConsumerReceiveMessageWithZeroTimeout()
$this->amqpContext->declareTopic($topic);

$consumer = $this->amqpContext->createConsumer($topic);
//guard
// guard
$this->assertNull($consumer->receive(1000));

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(145);

$producer = $this->amqpContext->createProducer();
$producer->send($topic, $message);
Expand All @@ -205,6 +209,7 @@ public function testPurgeMessagesFromQueue()
$consumer = $this->amqpContext->createConsumer($queue);

$message = $this->amqpContext->createMessage(__METHOD__);
$message->setDeliveryTag(145);

$producer = $this->amqpContext->createProducer();
$producer->send($queue, $message);
Expand Down
31 changes: 24 additions & 7 deletions pkg/gps/GpsMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class GpsMessage implements Message, \JsonSerializable
*/
private $headers;

/**
* @var array
*/
private $attributes;

/**
* @var bool
*/
Expand All @@ -34,11 +39,12 @@ class GpsMessage implements Message, \JsonSerializable
*/
private $nativeMessage;

public function __construct(string $body = '', array $properties = [], array $headers = [])
public function __construct(string $body = '', array $properties = [], array $headers = [], array $attributes = [])
{
$this->body = $body;
$this->properties = $properties;
$this->headers = $headers;
$this->attributes = $attributes;

$this->redelivered = false;
}
Expand Down Expand Up @@ -103,7 +109,7 @@ public function isRedelivered(): bool
return $this->redelivered;
}

public function setCorrelationId(string $correlationId = null): void
public function setCorrelationId(?string $correlationId = null): void
{
$this->setHeader('correlation_id', $correlationId);
}
Expand All @@ -113,7 +119,7 @@ public function getCorrelationId(): ?string
return $this->getHeader('correlation_id');
}

public function setMessageId(string $messageId = null): void
public function setMessageId(?string $messageId = null): void
{
$this->setHeader('message_id', $messageId);
}
Expand All @@ -130,12 +136,12 @@ public function getTimestamp(): ?int
return null === $value ? null : (int) $value;
}

public function setTimestamp(int $timestamp = null): void
public function setTimestamp(?int $timestamp = null): void
{
$this->setHeader('timestamp', $timestamp);
}

public function setReplyTo(string $replyTo = null): void
public function setReplyTo(?string $replyTo = null): void
{
$this->setHeader('reply_to', $replyTo);
}
Expand All @@ -151,6 +157,7 @@ public function jsonSerialize(): array
'body' => $this->getBody(),
'properties' => $this->getProperties(),
'headers' => $this->getHeaders(),
'attributes' => $this->getAttributes(),
];
}

Expand All @@ -161,16 +168,26 @@ public static function jsonUnserialize(string $json): self
throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg()));
}

return new self($data['body'] ?? $json, $data['properties'] ?? [], $data['headers'] ?? []);
return new self($data['body'] ?? $json, $data['properties'] ?? [], $data['headers'] ?? [], $data['attributes'] ?? []);
}

public function getNativeMessage(): ?GoogleMessage
{
return $this->nativeMessage;
}

public function setNativeMessage(GoogleMessage $message = null): void
public function setNativeMessage(?GoogleMessage $message = null): void
{
$this->nativeMessage = $message;
}

public function setAttributes(array $attributes): void
{
$this->attributes = $attributes;
}

public function getAttributes(): array
{
return $this->attributes;
}
}
17 changes: 11 additions & 6 deletions pkg/gps/GpsProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,17 @@ public function send(Destination $destination, Message $message): void

/** @var Topic $topic */
$topic = $this->context->getClient()->topic($destination->getTopicName());
$topic->publish([
'data' => json_encode($message),
]);

$params = ['data' => json_encode($message)];

if (count($message->getAttributes()) > 0) {
$params['attributes'] = $message->getAttributes();
}

$topic->publish($params);
}

public function setDeliveryDelay(int $deliveryDelay = null): Producer
public function setDeliveryDelay(?int $deliveryDelay = null): Producer
{
if (null === $deliveryDelay) {
return $this;
Expand All @@ -56,7 +61,7 @@ public function getDeliveryDelay(): ?int
return null;
}

public function setPriority(int $priority = null): Producer
public function setPriority(?int $priority = null): Producer
{
if (null === $priority) {
return $this;
Expand All @@ -70,7 +75,7 @@ public function getPriority(): ?int
return null;
}

public function setTimeToLive(int $timeToLive = null): Producer
public function setTimeToLive(?int $timeToLive = null): Producer
{
if (null === $timeToLive) {
return $this;
Expand Down
21 changes: 16 additions & 5 deletions pkg/gps/Tests/GpsMessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ public function testCouldSetGetNativeMessage()

public function testColdBeSerializedToJson()
{
$message = new GpsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']);
$message = new GpsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal'], ['theAttributeFoo' => 'theAttributeFooVal']);

$this->assertEquals('{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"}}', json_encode($message));
$this->assertEquals('{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"},"attributes":{"theAttributeFoo":"theAttributeFooVal"}}', json_encode($message));
}

public function testCouldBeUnserializedFromJson()
{
$message = new GpsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal']);
$message = new GpsMessage('theBody', ['thePropFoo' => 'thePropFooVal'], ['theHeaderFoo' => 'theHeaderFooVal'], ['theAttributeFoo' => 'theAttributeFooVal']);

$json = json_encode($message);

//guard
// guard
$this->assertNotEmpty($json);

$unserializedMessage = GpsMessage::jsonUnserialize($json);
Expand All @@ -40,7 +40,7 @@ public function testCouldBeUnserializedFromJson()

public function testMessageEntityCouldBeUnserializedFromJson()
{
$json = '{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"}}';
$json = '{"body":"theBody","properties":{"thePropFoo":"thePropFooVal"},"headers":{"theHeaderFoo":"theHeaderFooVal"},"attributes":{"theAttributeFoo":"theAttributeFooVal"}}';

$unserializedMessage = GpsMessage::jsonUnserialize($json);

Expand All @@ -49,6 +49,7 @@ public function testMessageEntityCouldBeUnserializedFromJson()
$this->assertEquals($decoded['body'], $unserializedMessage->getBody());
$this->assertEquals($decoded['properties'], $unserializedMessage->getProperties());
$this->assertEquals($decoded['headers'], $unserializedMessage->getHeaders());
$this->assertEquals($decoded['attributes'], $unserializedMessage->getAttributes());
}

public function testMessagePayloadCouldBeUnserializedFromJson()
Expand All @@ -61,6 +62,7 @@ public function testMessagePayloadCouldBeUnserializedFromJson()
$this->assertEquals($json, $unserializedMessage->getBody());
$this->assertEquals([], $unserializedMessage->getProperties());
$this->assertEquals([], $unserializedMessage->getHeaders());
$this->assertEquals([], $unserializedMessage->getAttributes());
}

public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson()
Expand All @@ -70,4 +72,13 @@ public function testThrowIfMalformedJsonGivenOnUnsterilizedFromJson()

GpsMessage::jsonUnserialize('{]');
}

public function testGetAttributes()
{
$message = new GpsMessage('the body', [], [], ['key1' => 'value1']);

$attributes = $message->getAttributes();

$this->assertSame(['key1' => 'value1'], $attributes);
}
}
Loading

0 comments on commit 6b48a41

Please sign in to comment.