diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..bbdf352 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,146 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [5.0.0] - 2021-09-10 +### Added + +- New log to inform when broker send an `ERROR` frame. +- Start to manage db connections, honoring the `CONN_MAX_AGE` defined in Django app settings. + +### Fixed + +- Update `stomp.py` from `6.0.0` to `~=7.0.0`. +- Update `tenacity` from `~=6.0` to `~=8.0`. + +## [4.2.1] - 2021-04-07 +### Changed + +- Pin `stomp.py` to `6.0.0` version to avoid its breaking changes. + +## [4.2.0] - 2020-08-13 +### Changed + +- Logs level from `tenacity` to `WARNING` + +### Fixed + +- Fix lost messages that was sent within a `STOMP` transaction when reconnections with the broker is necessary. + +## [4.1.2] - 2020-06-22 +### Changed + +- Standard headers generated by `django-stomp` (`tshoot-destination`, `x-dead-letter-routing-key` and `x-dead-letter-exchange`) cannot be overwritten. + +### Fixed + +- Properly handling of unsafe headers before a message is sent. + +## [4.1.1] - 2020-06-16 +### Changed + +- Removal of unsafe headers (such as `message-id`) before a message is publish to broker. + +### Fixed + +- Fix consumers that was stuck with messages without the `correlation-id` header when `STOMP_CORRELATION_ID_REQUIRED` is set to `true`. + +## [4.1.0] - 2020-05-26 +### Added + +- New command (`ack_all_messages`) to clean all messages from a queue. + +## [4.0.1] - 2020-05-22 +### Fixed + +- Remove the unnecessary creation of a [durable topic subscriber](https://activemq.apache.org/how-do-durable-queues-and-topics-work) in `ActiveMQ`. + +## [4.0.0] - 2020-05-11 +### Changed + +- Parameter `STOMP_OUTGOING_HEARTBIT` to `STOMP_OUTGOING_HEARTBEAT`. 
+- Parameter `STOMP_INCOMING_HEARTBIT` to `STOMP_INCOMING_HEARTBEAT`. +- The default interval values for incoming/outgoing heart-beats is 10s. + +## [3.2.0] - 2020-05-11 +### Changed + +- Command `move_messages` to move messages between two different brokers. + +## [3.1.0] - 2020-05-01 +### Added + +- Logs that tells the parameters used in the connection with a broker. + +## [3.0.0] - 2020-04-27 +### Added + +- New settings parameter (`STOMP_PROCESS_MSG_ON_BACKGROUND`) that enables the message processing to occur in background, allowing any other broker-consumer communication (such as heartbeats) to still take place. This behaviour is enabled by default. +- New optional settings parameter (`STOMP_PROCESS_MSG_WORKERS`) that controls the maximum number of worker threads (that process the message in the background) to be spawned. + +### Changed +- Enable [STOMP heart-beats](https://stomp.github.io/stomp-specification-1.1.html#Heart-beating) by default setting an interval of 6s for incoming and outgoing heart-beats. + +## [2.1.1] - 2020-04-13 +### Removed + +- `CustomStompConnection11`, since it's now possible to send a custom header when we send a `NACK` action with the standard `StompConnection` from `stomp.py`. + +## [2.1.0] - 2020-04-06 +### Added + +- New settings parameter (`STOMP_CORRELATION_ID_REQUIRED`) that control if a consumer require messages to contain a `correlation-id` header. The default value is `true`. + +## [2.0.1] - 2020-03-10 +### Added + +- New settings parameter (`STOMP_SERVER_VHOST`) that allow the consumer/publisher to connect with a server virtual host. + +## [2.0.0] - 2020-03-09 +### Added + +- A mechanism to mimic the [Virtual Topic](https://activemq.apache.org/virtual-destinations) behaviour from `ActiveMQ` in `RabbitMQ`. + +### Fixed + +- Pin `stomp.py` to `~=6.0` + +## [1.0.4] - 2020-01-23 +### Fixed + +- Fix the connection parameters to really consider zero values. 
+ +## [1.0.3] - 2020-01-23 +### Fixed + +- Fix a helper function to correctly deal with `0` value when it's given. + +## [1.0.2] - 2020-01-07 +### Changed + +- Pin `request-id-django-log` to `0.1.1` version. +- Pin `stomp.py` to `~=4.1`. +- Pin `tenacity` to `~=6.0`. + +[5.0.0]: https://github.com/juntossomosmais/django-stomp/compare/4.2.1...5.0.0 +[4.2.1]: https://github.com/juntossomosmais/django-stomp/compare/4.2.0...4.2.1 +[4.2.0]: https://github.com/juntossomosmais/django-stomp/compare/4.1.2...4.2.0 +[4.1.2]: https://github.com/juntossomosmais/django-stomp/compare/4.1.1...4.1.2 +[4.1.1]: https://github.com/juntossomosmais/django-stomp/compare/4.1.0...4.1.1 +[4.1.0]: https://github.com/juntossomosmais/django-stomp/compare/4.0.1...4.1.0 +[4.0.1]: https://github.com/juntossomosmais/django-stomp/compare/4.0.0...4.0.1 +[4.0.0]: https://github.com/juntossomosmais/django-stomp/compare/3.2.0...4.0.0 +[3.2.0]: https://github.com/juntossomosmais/django-stomp/compare/3.1.0...3.2.0 +[3.1.0]: https://github.com/juntossomosmais/django-stomp/compare/3.0.0...3.1.0 +[3.0.0]: https://github.com/juntossomosmais/django-stomp/compare/2.1.1...3.0.0 +[2.1.1]: https://github.com/juntossomosmais/django-stomp/compare/2.1.0...2.1.1 +[2.1.0]: https://github.com/juntossomosmais/django-stomp/compare/2.0.1...2.1.0 +[2.0.1]: https://github.com/juntossomosmais/django-stomp/compare/2.0.0...2.0.1 +[2.0.0]: https://github.com/juntossomosmais/django-stomp/compare/1.0.4...2.0.0 +[1.0.4]: https://github.com/juntossomosmais/django-stomp/compare/1.0.3...1.0.4 +[1.0.3]: https://github.com/juntossomosmais/django-stomp/compare/1.0.2...1.0.3 +[1.0.2]: https://github.com/juntossomosmais/django-stomp/compare/1.0.1...1.0.2 +[1.0.1]: https://github.com/juntossomosmais/django-stomp/tree/1.0.1 \ No newline at end of file diff --git a/Pipfile b/Pipfile index 71d7cec..2d66262 100644 --- a/Pipfile +++ b/Pipfile @@ -19,9 +19,9 @@ pytest-trio = "*" [packages] django = "~=2.2" -"stomp.py" = "==6.0.0" +"stomp.py" = 
"~=7.0" request-id-django-log = "==0.1.1" -tenacity = "~=6.0" +tenacity = "~=8.0" [requires] python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock index cfa9bd3..ed40214 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "7766e6bba9c430db5510914400524d5bb413a54356e1425275573f5ced5c6e4f" + "sha256": "143808dafdd2aa4003f3bdd0684959d500f3fc0f0e82f99df8f163d5b7cefcb9" }, "pipfile-spec": 6, "requires": { @@ -18,11 +18,11 @@ "default": { "django": { "hashes": [ - "sha256:2484f115891ab1a0e9ae153602a641fbc15d7894c036d79fb78662c0965d7954", - "sha256:2569f9dc5f8e458a5e988b03d6b7a02bda59b006d6782f4ea0fd590ed7336a64" + "sha256:3339ff0e03dee13045aef6ae7b523edff75b6d726adf7a7a48f53d5a501f7db7", + "sha256:f2084ceecff86b1e631c2cd4107d435daf4e12f1efcdf11061a73bf0b5e95f92" ], "index": "pypi", - "version": "==2.2.20" + "version": "==2.2.24" }, "docopt": { "hashes": [ @@ -45,45 +45,31 @@ "index": "pypi", "version": "==0.1.1" }, - "six": { - "hashes": [ - "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259", - "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced" - ], - "version": "==1.15.0" - }, "sqlparse": { "hashes": [ - "sha256:017cde379adbd6a1f15a61873f43e8274179378e95ef3fede90b5aa64d304ed0", - "sha256:0f91fd2e829c44362cbcfab3e9ae12e22badaa8a29ad5ff599f9ec109f0454e8" + "sha256:0c00730c74263a94e5a9919ade150dfc3b19c574389985446148402998287dae", + "sha256:48719e356bb8b42991bdbb1e8b83223757b93789c00910a616a071910ca4a64d" ], - "version": "==0.4.1" + "version": "==0.4.2" }, "stomp.py": { "hashes": [ - "sha256:3a9e4908ddb3a1d46481c2c25d60dcc46caaa60be3d4d6d4e39096c884051ef0", - "sha256:4e639fee9aaa288e98b0bb981029a11eb8464276b8d19b3e61adead52bc3c01b" + "sha256:6e1d93f2b2a7c63301f3e09e7ffa82ea80affec59164cd8c9b7807af4fe0e732", + "sha256:fb301f338292b1b95089c6f1d3a38a9dd8353a5ff3f921e389dfa5f9413b5a8a" ], "index": "pypi", - "version": "==6.0.0" + "version": "==7.0.0" }, "tenacity": { "hashes": [ 
- "sha256:baed357d9f35ec64264d8a4bbf004c35058fad8795c5b0d8a7dc77ecdcbb8f39", - "sha256:e14d191fb0a309b563904bbc336582efe2037de437e543b38da749769b544d7f" + "sha256:43242a20e3e73291a28bcbcacfd6e000b02d3857a9a9fff56b297a27afdc932f", + "sha256:f78f4ea81b0fabc06728c11dc2a8c01277bfc5181b321a4770471902e3eb844a" ], "index": "pypi", - "version": "==6.3.1" + "version": "==8.0.1" } }, "develop": { - "appdirs": { - "hashes": [ - "sha256:7d5d0167b2b1ba821647616af46a749d1c653740dd0d2415100fe26e27afdf41", - "sha256:a841dacd6b99318a741b166adb07e19ee71a274450e68237b4650ca1055ab128" - ], - "version": "==1.4.4" - }, "async-generator": { "hashes": [ "sha256:01c7bf666359b4967d2cda0000cc2e4af16a0ae098cbffcb8472fb9e8ad6585b", @@ -93,31 +79,39 @@ }, "attrs": { "hashes": [ - "sha256:31b2eced602aa8423c2aea9c76a724617ed67cf9513173fd3a4f03e3a929c7e6", - "sha256:832aa3cde19744e49938b91fea06d69ecb9e649c93ba974535d08ad92164f700" + "sha256:149e90d6d8ac20db7a955ad60cf0e6881a3f20d37096140088356da6c716b0b1", + "sha256:ef6aaac3ca6cd92904cdd0d83f629a15f18053ec84e6432106f7a4d04ae4f5fb" + ], + "version": "==21.2.0" + }, + "backports.entry-points-selectable": { + "hashes": [ + "sha256:988468260ec1c196dab6ae1149260e2f5472c9110334e5d51adcb77867361f6a", + "sha256:a6d9a871cde5e15b4c4a53e3d43ba890cc6861ec1332c9c2428c92f977192acc" ], - "version": "==20.3.0" + "version": "==1.1.0" }, "bleach": { "hashes": [ - "sha256:6123ddc1052673e52bab52cdc955bcb57a015264a1c57d37bea2f6b817af0125", - "sha256:98b3170739e5e83dd9dc19633f074727ad848cbedb6026708c8ac2d3b697a433" + "sha256:0900d8b37eba61a802ee40ac0061f8c2b5dee29c1927dd1d233e075ebf5a71da", + "sha256:4d2651ab93271d1129ac9cbc679f524565cc8a1b791909c4a51eac4446a15994" ], - "version": "==3.3.0" + "version": "==4.1.0" }, "certifi": { "hashes": [ - "sha256:1a4995114262bffbc2413b159f2a1a480c969de6e6eb13ee966d470af86af59c", - "sha256:719a74fb9e33b9bd44cc7f3a8d94bc35e4049deebe19ba7d8e108280cfd59830" + "sha256:2bbf76fd432960138b3ef6dda3dde0544f27cbf8546c458e60baf371917ba9ee", + 
"sha256:50b1e4f8446b06f41be7dd6338db18e0990601dce795c2b1686458aa7e8fa7d8" ], - "version": "==2020.12.5" + "version": "==2021.5.30" }, - "chardet": { + "charset-normalizer": { "hashes": [ - "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa", - "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5" + "sha256:0c8911edd15d19223366a194a513099a302055a962bca2cec0f54b8b63175d8b", + "sha256:f23667ebe1084be45f6ae0538e4a5a865206544097e4e8bbcacf42cd02a348f3" ], - "version": "==4.0.0" + "markers": "python_version >= '3'", + "version": "==2.0.4" }, "colorama": { "hashes": [ @@ -192,17 +186,17 @@ }, "distlib": { "hashes": [ - "sha256:8c09de2c67b3e7deef7184574fc060ab8a793e7adbb183d942c389c8b13c52fb", - "sha256:edf6116872c863e1aa9d5bb7cb5e05a022c519a4594dc703843343a9ddd9bff1" + "sha256:106fef6dc37dd8c0e2c0a60d3fca3e77460a48907f335fa28420463a6f799736", + "sha256:23e223426b28491b1ced97dc3bbe183027419dfc7982b4fa2f05d5f3ff10711c" ], - "version": "==0.3.1" + "version": "==0.3.2" }, "docutils": { "hashes": [ - "sha256:a71042bb7207c03d5647f280427f14bfbd1a65c9eb84f4b341d85fafb6bb4bdf", - "sha256:e2ffeea817964356ba4470efba7c2f42b6b0de0b04e66378507e3e2504bbff4c" + "sha256:686577d2e4c32380bb50cbb22f575ed742d58168cee37e99117a854bcd88f125", + "sha256:cf316c8370a737a022b72b56874f6602acf974a37a9fba42ec2876387549fc61" ], - "version": "==0.17" + "version": "==0.17.1" }, "filelock": { "hashes": [ @@ -213,18 +207,19 @@ }, "idna": { "hashes": [ - "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6", - "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0" + "sha256:14475042e284991034cb48e06f6851428fb14c4dc953acd9be9a5e95c7b6dd7a", + "sha256:467fbad99067910785144ce333826c71fb0e63a425657295239737f7ecd125f3" ], - "version": "==2.10" + "markers": "python_version >= '3'", + "version": "==3.2" }, "importlib-metadata": { "hashes": [ - "sha256:c9db46394197244adf2f0b08ec5bc3cf16757e9590b02af1fca085c16c0d600a", - 
"sha256:d2d46ef77ffc85cbf7dac7e81dd663fde71c45326131bea8033b9bad42268ebe" + "sha256:b618b6d2d5ffa2f16add5697cf57a46c76a56229b0ed1c438322e4e95645bd15", + "sha256:f284b3e11256ad1e5d03ab86bb2ccd6f5339688ff17a4d797a0fe7df326f23b1" ], "markers": "python_version < '3.8'", - "version": "==3.10.0" + "version": "==4.8.1" }, "iniconfig": { "hashes": [ @@ -235,28 +230,35 @@ }, "keyring": { "hashes": [ - "sha256:045703609dd3fccfcdb27da201684278823b72af515aedec1a8515719a038cb8", - "sha256:8f607d7d1cc502c43a932a275a56fe47db50271904513a379d39df1af277ac48" + "sha256:b32397fd7e7063f8dd74a26db910c9862fc2109285fa16e3b5208bcb42a3e579", + "sha256:b7e0156667f5dcc73c1f63a518005cd18a4eb23fe77321194fefcc03748b21a4" ], - "version": "==23.0.1" + "version": "==23.1.0" }, "lxml": { "hashes": [ "sha256:079f3ae844f38982d156efce585bc540c16a926d4436712cf4baee0cce487a3d", "sha256:0fbcf5565ac01dff87cbfc0ff323515c823081c5777a9fc7703ff58388c258c3", "sha256:122fba10466c7bd4178b07dba427aa516286b846b2cbd6f6169141917283aae2", + "sha256:1b38116b6e628118dea5b2186ee6820ab138dbb1e24a13e478490c7db2f326ae", "sha256:1b7584d421d254ab86d4f0b13ec662a9014397678a7c4265a02a6d7c2b18a75f", "sha256:26e761ab5b07adf5f555ee82fb4bfc35bf93750499c6c7614bd64d12aaa67927", "sha256:289e9ca1a9287f08daaf796d96e06cb2bc2958891d7911ac7cae1c5f9e1e0ee3", "sha256:2a9d50e69aac3ebee695424f7dbd7b8c6d6eb7de2a2eb6b0f6c7db6aa41e02b7", + "sha256:3082c518be8e97324390614dacd041bb1358c882d77108ca1957ba47738d9d59", "sha256:33bb934a044cf32157c12bfcfbb6649807da20aa92c062ef51903415c704704f", "sha256:3439c71103ef0e904ea0a1901611863e51f50b5cd5e8654a151740fde5e1cade", + "sha256:36108c73739985979bf302006527cf8a20515ce444ba916281d1c43938b8bb96", "sha256:39b78571b3b30645ac77b95f7c69d1bffc4cf8c3b157c435a34da72e78c82468", "sha256:4289728b5e2000a4ad4ab8da6e1db2e093c63c08bdc0414799ee776a3f78da4b", "sha256:4bff24dfeea62f2e56f5bab929b4428ae6caba2d1eea0c2d6eb618e30a71e6d4", + "sha256:4c61b3a0db43a1607d6264166b230438f85bfed02e8cff20c22e564d0faff354", 
"sha256:542d454665a3e277f76954418124d67516c5f88e51a900365ed54a9806122b83", "sha256:5a0a14e264069c03e46f926be0d8919f4105c1623d620e7ec0e612a2e9bf1c04", + "sha256:5c8c163396cc0df3fd151b927e74f6e4acd67160d6c33304e805b84293351d16", + "sha256:64812391546a18896adaa86c77c59a4998f33c24788cadc35789e55b727a37f4", "sha256:66e575c62792c3f9ca47cb8b6fab9e35bab91360c783d1606f758761810c9791", + "sha256:6f12e1427285008fd32a6025e38e977d44d6382cf28e7201ed10d6c1698d2a9a", "sha256:74f7d8d439b18fa4c385f3f5dfd11144bb87c1da034a466c5b5577d23a1d9b51", "sha256:7610b8c31688f0b1be0ef882889817939490a36d0ee880ea562a4e1399c447a1", "sha256:76fa7b1362d19f8fbd3e75fe2fb7c79359b0af8747e6f7141c338f0bee2f871a", @@ -269,10 +271,15 @@ "sha256:b007cbb845b28db4fb8b6a5cdcbf65bacb16a8bd328b53cbc0698688a68e1caa", "sha256:bc4313cbeb0e7a416a488d72f9680fffffc645f8a838bd2193809881c67dd106", "sha256:bccbfc27563652de7dc9bdc595cb25e90b59c5f8e23e806ed0fd623755b6565d", + "sha256:c1a40c06fd5ba37ad39caa0b3144eb3772e813b5fb5b084198a985431c2f1e8d", + "sha256:c47ff7e0a36d4efac9fd692cfa33fbd0636674c102e9e8d9b26e1b93a94e7617", "sha256:c4f05c5a7c49d2fb70223d0d5bcfbe474cf928310ac9fa6a7c6dddc831d0b1d4", + "sha256:cdaf11d2bd275bf391b5308f86731e5194a21af45fbaaaf1d9e8147b9160ea92", "sha256:ce256aaa50f6cc9a649c51be3cd4ff142d67295bfc4f490c9134d0f9f6d58ef0", "sha256:d2e35d7bf1c1ac8c538f88d26b396e73dd81440d59c1ef8522e1ea77b345ede4", + "sha256:d916d31fd85b2f78c76400d625076d9124de3e4bda8b016d25a050cc7d603f24", "sha256:df7c53783a46febb0e70f6b05df2ba104610f2fb0d27023409734a3ecbb78fb2", + "sha256:e1cbd3f19a61e27e011e02f9600837b921ac661f0c40560eefb366e4e4fb275e", "sha256:efac139c3f0bf4f0939f9375af4b02c5ad83a622de52d6dfa8e438e8e01d0eb0", "sha256:efd7a09678fd8b53117f6bae4fa3825e0a22b03ef0a932e070c0bdbb3a35e654", "sha256:f2380a6376dfa090227b663f9678150ef27543483055cc327555fb592c5967e2", @@ -290,10 +297,10 @@ }, "packaging": { "hashes": [ - "sha256:5b327ac1320dc863dca72f4514ecc086f31186744b84a230374cc1fd776feae5", - 
"sha256:67714da7f7bc052e064859c05c595155bd1ee9f69f76557e21f051443c20947a" + "sha256:7dc96269f53a4ccec5c0670940a4281106dd0bb343f47b7471f779df49c2fbe7", + "sha256:c86254f9220d55e31cc94d69bade760f0847da8000def4dfe1c6b872fd14ff14" ], - "version": "==20.9" + "version": "==21.0" }, "parsel": { "hashes": [ @@ -305,17 +312,24 @@ }, "pkginfo": { "hashes": [ - "sha256:029a70cb45c6171c329dfc890cde0879f8c52d6f3922794796e06f577bb03db4", - "sha256:9fdbea6495622e022cc72c2e5e1b735218e4ffb2a2a69cde2694a6c1f16afb75" + "sha256:37ecd857b47e5f55949c41ed061eb51a0bee97a87c969219d144c0e023982779", + "sha256:e7432f81d08adec7297633191bbf0bd47faf13cd8724c3a13250e51d542635bd" ], - "version": "==1.7.0" + "version": "==1.7.1" + }, + "platformdirs": { + "hashes": [ + "sha256:15b056538719b1c94bdaccb29e5f81879c7f7f0f4a153f46086d155dffcd4f0f", + "sha256:8003ac87717ae2c7ee1ea5a84a1a61e87f3fbd16eb5aadba194ea30a9019f648" + ], + "version": "==2.3.0" }, "pluggy": { "hashes": [ - "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0", - "sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d" + "sha256:4224373bacce55f955a878bf9cfa763c1e360858e330072059e10bad68531159", + "sha256:74134bbf457f031a36d68416e1509f34bd5ccc019f0bcc952c7b909d06b37bd3" ], - "version": "==0.13.1" + "version": "==1.0.0" }, "py": { "hashes": [ @@ -326,10 +340,10 @@ }, "pygments": { "hashes": [ - "sha256:2656e1a6edcdabf4275f9a3640db59fd5de107d88e8663c5d4e9a0fa62f77f94", - "sha256:534ef71d539ae97d4c3a4cf7d6f110f214b0e687e92f9cb9d2a3b0d3101289c8" + "sha256:b8e67fe6af78f492b3c4b3e2970c0624cbf08beb1e493b2c99b9fa1b67a20380", + "sha256:f398865f7eb6874156579fdf36bc840a03cab64d1cde9e93d68f46a425ec52c6" ], - "version": "==2.8.1" + "version": "==2.10.0" }, "pyparsing": { "hashes": [ @@ -340,27 +354,27 @@ }, "pytest": { "hashes": [ - "sha256:671238a46e4df0f3498d1c3270e5deb9b32d25134c99b7d75370a68cfbe9b634", - "sha256:6ad9c7bdf517a808242b998ac20063c41532a570d088d77eec1ee12b0b5574bc" + 
"sha256:131b36680866a76e6781d13f101efb86cf674ebb9762eb70d3082b6f29889e89", + "sha256:7310f8d27bc79ced999e760ca304d69f6ba6c6649c0b60fb0e04a4a77cacc134" ], "index": "pypi", - "version": "==6.2.3" + "version": "==6.2.5" }, "pytest-cov": { "hashes": [ - "sha256:359952d9d39b9f822d9d29324483e7ba04a3a17dd7d05aa6beb7ea01e359e5f7", - "sha256:bdb9fdb0b85a7cc825269a4c56b48ccaa5c7e365054b6038772c32ddcdc969da" + "sha256:261bb9e47e65bd099c89c3edf92972865210c36813f80ede5277dceb77a4a62a", + "sha256:261ceeb8c227b726249b376b8526b600f38667ee314f910353fa318caa01f4d7" ], "index": "pypi", - "version": "==2.11.1" + "version": "==2.12.1" }, "pytest-django": { "hashes": [ - "sha256:10e384e6b8912ded92db64c58be8139d9ae23fb8361e5fc139d8e4f8fc601bc2", - "sha256:26f02c16d36fd4c8672390deebe3413678d89f30720c16efb8b2a6bf63b9041f" + "sha256:65783e78382456528bd9d79a35843adde9e6a47347b20464eb2c885cb0f1f606", + "sha256:b5171e3798bf7e3fc5ea7072fe87324db67a4dd9f1192b037fed4cc3c1b7f455" ], "index": "pypi", - "version": "==4.1.0" + "version": "==4.4.0" }, "pytest-env": { "hashes": [ @@ -371,11 +385,11 @@ }, "pytest-mock": { "hashes": [ - "sha256:379b391cfad22422ea2e252bdfc008edd08509029bcde3c25b2c0bd741e0424e", - "sha256:a1e2aba6af9560d313c642dae7e00a2a12b022b80301d9d7fc8ec6858e1dd9fc" + "sha256:30c2f2cc9759e76eee674b81ea28c9f0b94f8f0445a1b87762cadf774f0df7e3", + "sha256:40217a058c52a63f1042f0784f62009e976ba824c418cced42e88d5f40ab0e62" ], "index": "pypi", - "version": "==3.5.1" + "version": "==3.6.1" }, "pytest-trio": { "hashes": [ @@ -393,11 +407,11 @@ }, "requests": { "hashes": [ - "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804", - "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e" + "sha256:6c1246513ecd5ecd4528a0906f910e8f0f9c6b8ec72030dc9fd154dc1a6efd24", + "sha256:b8aa58f8cf793ffd8782d3d8cb19e66ef36f7aba4353eec859e74678b01b07a7" ], "index": "pypi", - "version": "==2.25.1" + "version": "==2.26.0" }, "requests-toolbelt": { "hashes": [ @@ -408,17 +422,17 
@@ }, "rfc3986": { "hashes": [ - "sha256:112398da31a3344dc25dbf477d8df6cb34f9278a94fee2625d89e4514be8bb9d", - "sha256:af9147e9aceda37c91a05f4deb128d4b4b49d6b199775fd2d2927768abdc8f50" + "sha256:270aaf10d87d0d4e095063c65bf3ddbc6ee3d0b226328ce21e036f946e421835", + "sha256:a86d6e1f5b1dc238b218b012df0aa79409667bb209e58da56d0b94704e712a97" ], - "version": "==1.4.0" + "version": "==1.5.0" }, "six": { "hashes": [ - "sha256:30639c035cdb23534cd4aa2dd52c3bf48f06e5f4a941509c8bafd8ce11080259", - "sha256:8b74bedcbbbaca38ff6d7491d76f2b06b3592611af620f8426e82dddb04a5ced" + "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926", + "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254" ], - "version": "==1.15.0" + "version": "==1.16.0" }, "sniffio": { "hashes": [ @@ -429,10 +443,10 @@ }, "sortedcontainers": { "hashes": [ - "sha256:37257a32add0a3ee490bb170b599e93095eed89a55da91fa9f48753ea12fd73f", - "sha256:59cc937650cf60d677c16775597c89a960658a09cf7c1a668f86e1e4464b10a1" + "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88", + "sha256:a163dcaede0f1c021485e957a39245190e74249897e2ae4b2aa38595db237ee0" ], - "version": "==2.3.0" + "version": "==2.4.0" }, "toml": { "hashes": [ @@ -443,57 +457,57 @@ }, "tox": { "hashes": [ - "sha256:05a4dbd5e4d3d8269b72b55600f0b0303e2eb47ad5c6fe76d3576f4c58d93661", - "sha256:e007673f3595cede9b17a7c4962389e4305d4a3682a6c5a4159a1453b4f326aa" + "sha256:9fbf8e2ab758b2a5e7cb2c72945e4728089934853076f67ef18d7575c8ab6b88", + "sha256:c6c4e77705ada004283610fd6d9ba4f77bc85d235447f875df9f0ba1bc23b634" ], "index": "pypi", - "version": "==3.23.0" + "version": "==3.24.3" }, "tqdm": { "hashes": [ - "sha256:daec693491c52e9498632dfbe9ccfc4882a557f5fa08982db1b4d3adbe0887c3", - "sha256:ebdebdb95e3477ceea267decfc0784859aa3df3e27e22d23b83e9b272bf157ae" + "sha256:80aead664e6c1672c4ae20dc50e1cdc5e20eeff9b14aa23ecd426375b28be588", + "sha256:a4d6d112e507ef98513ac119ead1159d286deab17dffedd96921412c2d236ff5" ], - 
"version": "==4.60.0" + "version": "==4.62.2" }, "trio": { "hashes": [ - "sha256:87a66ae61f27fe500c9024926a9ba482c07e1e0f56380b70a264d19c435ba076", - "sha256:a42af0634ba729cbfe8578be058750c6471dac19fbc7167ec6a3ca3f966fb424" + "sha256:895e318e5ec5e8cea9f60b473b6edb95b215e82d99556a03eb2d20c5e027efe1", + "sha256:c27c231e66336183c484fbfe080fa6cc954149366c15dc21db8b7290081ec7b8" ], "index": "pypi", - "version": "==0.18.0" + "version": "==0.19.0" }, "twine": { "hashes": [ - "sha256:16f706f2f1687d7ce30e7effceee40ed0a09b7c33b9abb5ef6434e5551565d83", - "sha256:a56c985264b991dc8a8f4234eb80c5af87fa8080d0c224ad8f2cd05a2c22e83b" + "sha256:087328e9bb405e7ce18527a2dca4042a84c7918658f951110b38bc135acab218", + "sha256:4caec0f1ed78dc4c9b83ad537e453d03ce485725f2aea57f1bb3fdde78dae936" ], "index": "pypi", - "version": "==3.4.1" + "version": "==3.4.2" }, "typing-extensions": { "hashes": [ - "sha256:7cb407020f00f7bfc3cb3e7881628838e69d8f3fcab2f64742a5e76b2f841918", - "sha256:99d4073b617d30288f569d3f13d2bd7548c3a7e4c8de87db09a9d29bb3a4a60c", - "sha256:dafc7639cde7f1b6e1acc0f457842a83e722ccca8eef5270af2d74792619a89f" + "sha256:49f75d16ff11f1cd258e1b988ccff82a3ca5570217d7ad8c5f48205dd99a677e", + "sha256:d8226d10bc02a29bcc81df19a26e56a9647f8b0a6d4a83924139f4a8b01f17b7", + "sha256:f1d25edafde516b146ecd0613dabcc61409817af4766fbbcfb8d1ad4ec441a34" ], "markers": "python_version < '3.8'", - "version": "==3.7.4.3" + "version": "==3.10.0.2" }, "urllib3": { "hashes": [ - "sha256:2f4da4594db7e1e110a944bb1b551fdf4e6c136ad42e4234131391e21eb5b0df", - "sha256:e7b021f7241115872f92f43c6508082facffbd1c048e3c6e2bb9c2a157e28937" + "sha256:39fb8672126159acb139a7718dd10806104dec1e2f0f6c88aab05d17df10c8d4", + "sha256:f57b4c16c62fa2760b7e3d97c35b255512fb6b59a259730f36ba32ce9f8e342f" ], - "version": "==1.26.4" + "version": "==1.26.6" }, "virtualenv": { "hashes": [ - "sha256:49ec4eb4c224c6f7dd81bb6d0a28a09ecae5894f4e593c89b0db0885f565a107", - 
"sha256:83f95875d382c7abafe06bd2a4cdd1b363e1bb77e02f155ebe8ac082a916b37c" + "sha256:9ef4e8ee4710826e98ff3075c9a4739e2cb1040de6a2a8d35db0055840dc96a0", + "sha256:e4670891b3a03eb071748c569a87cceaefbf643c5bac46d996c5a45c34aa0f06" ], - "version": "==20.4.3" + "version": "==20.7.2" }, "w3lib": { "hashes": [ @@ -511,18 +525,18 @@ }, "wheel": { "hashes": [ - "sha256:78b5b185f0e5763c26ca1e324373aadd49182ca90e825f7853f4b2509215dc0e", - "sha256:e11eefd162658ea59a60a0f6c7d493a7190ea4b9a85e335b33489d9f17e0245e" + "sha256:21014b2bd93c6d0034b6ba5d35e4eb284340e09d63c59aef6fc14b0f346146fd", + "sha256:e2ef7239991699e3355d54f8e968a21bb940a1dbf34a4d226741e64462516fad" ], "index": "pypi", - "version": "==0.36.2" + "version": "==0.37.0" }, "zipp": { "hashes": [ - "sha256:3607921face881ba3e026887d8150cca609d517579abe052ac81fc5aeffdbd76", - "sha256:51cb66cc54621609dd593d1787f286ee42a5c0adbb4b29abea5a63edc3e03098" + "sha256:957cfda87797e389580cb8b9e3870841ca991e2125350677b2ca83a0e99390a3", + "sha256:f5812b1e007e48cff63449a5e9f4e7ebea716b4111f9c4f9a645f91d579bf0c4" ], - "version": "==3.4.1" + "version": "==3.5.0" } } } diff --git a/README.md b/README.md index 6b8eaba..c2b8667 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # Django Stomp [![Build Status](https://dev.azure.com/juntos-somos-mais-loyalty/python/_apis/build/status/django-stomp?branchName=master)](https://dev.azure.com/juntos-somos-mais-loyalty/python/_build/latest?definitionId=23&branchName=master) -[![Maintainability](https://api.codeclimate.com/v1/badges/381136911e038d1a6887/maintainability)](https://codeclimate.com/github/juntossomosmais/django-stomp/maintainability) -[![Test Coverage](https://api.codeclimate.com/v1/badges/381136911e038d1a6887/test_coverage)](https://codeclimate.com/github/juntossomosmais/django-stomp/test_coverage) 
+[![Maintainability](https://sonarcloud.io/api/project_badges/measure?project=juntossomosmais_django-stomp&metric=sqale_rating)](https://sonarcloud.io/dashboard?id=juntossomosmais_django-stomp) +[![Test Coverage](https://sonarcloud.io/api/project_badges/measure?project=juntossomosmais_django-stomp&metric=coverage)](https://sonarcloud.io/dashboard?id=juntossomosmais_django-stomp) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/ambv/black) [![Downloads](https://pepy.tech/badge/django-stomp)](https://pepy.tech/project/django-stomp) [![Downloads](https://pepy.tech/badge/django-stomp/month)](https://pepy.tech/project/django-stomp/month) @@ -167,6 +167,10 @@ Then at last: pipenv run tox +## Database connection management (applies to version >= 5.0.0) + +For every message that a `django-stomp` consumer receives, it opens a new DB connection if it needs to, keeping it open until it exceeds the maximum age defined by `CONN_MAX_AGE` or when the connection becomes unusable. + ## Known limitations * Currently, we assume that all dead lettered messages are sent to a queue with the same name as its original @@ -178,4 +182,5 @@ positive heartbeat timeout. You can workaround it with the `STOMP_PROCESS_MSG_ON thread pool to process the message. * For the RabbitMQ users: the **django-stomp consumer** always try to connect to a [durable queue](https://www.rabbitmq.com/queues.html#durability), so if your queue is not durable, the RabbitMQ broker -will not allow the subscription. \ No newline at end of file +will not allow the subscription. +* **For versions prior to 5.0.0**: Any database connection management in the consumer side is up to its callback. If you have any long-running consumer that relies on a DB connection, be sure that you manage it properly, otherwise if a connection becomes unusable, you'll have to restart the consumer entirely just to setup a new DB connection. 
\ No newline at end of file diff --git a/django_stomp/exceptions.py b/django_stomp/exceptions.py index 78b3eed..930cbef 100644 --- a/django_stomp/exceptions.py +++ b/django_stomp/exceptions.py @@ -1,4 +1,4 @@ -class CorrelationIdNotProvidedException(BaseException): +class CorrelationIdNotProvidedException(Exception): pass @@ -6,9 +6,10 @@ class DjangoStompImproperlyConfigured(Exception): pass -class DjangoStompIncorrectUse(BaseException): +class DjangoStompIncorrectUse(Exception): """ Raised when Django stomp has been invoked in a wrong manner such as less arguments than it needs, etc. """ - pass \ No newline at end of file + + pass diff --git a/django_stomp/execution.py b/django_stomp/execution.py index 5c71fdb..387aaca 100644 --- a/django_stomp/execution.py +++ b/django_stomp/execution.py @@ -3,8 +3,11 @@ from time import sleep from typing import Optional +from django import db from django.conf import settings from django.utils.module_loading import import_string +from request_id_django_log import local_threading + from django_stomp.builder import build_listener from django_stomp.builder import build_publisher from django_stomp.exceptions import CorrelationIdNotProvidedException @@ -16,7 +19,6 @@ from django_stomp.helpers import remove_key_from_dict from django_stomp.services.consumer import Listener from django_stomp.services.consumer import Payload -from request_id_django_log import local_threading logger = logging.getLogger("django_stomp") @@ -65,6 +67,7 @@ def main_logic() -> Optional[Listener]: def _callback(payload: Payload) -> None: try: + db.close_old_connections() local_threading.request_id = _get_or_create_correlation_id(payload.headers) if param_to_callback: @@ -80,6 +83,7 @@ def _callback(payload: Payload) -> None: raise e finally: local_threading.request_id = None + db.close_old_connections() listener.start(_callback, wait_forever=is_testing is False) @@ -142,7 +146,7 @@ def send_message_from_one_destination_to_another( def 
clean_messages_on_destination_by_acking( - source_destination: str, is_testing: bool = False, testing_disconnect: bool = True, return_listener: bool = False, + source_destination: str, is_testing: bool = False, testing_disconnect: bool = True, return_listener: bool = False ) -> Listener: """ Cleans a queue by acking all messages on it (no queue purging or deleting). diff --git a/django_stomp/services/consumer.py b/django_stomp/services/consumer.py index cbff90d..a1fb8cb 100644 --- a/django_stomp/services/consumer.py +++ b/django_stomp/services/consumer.py @@ -11,11 +11,13 @@ from typing import Optional import stomp +from stomp import connect +from stomp.utils import Frame as StompFrame + from django_stomp.helpers import create_dlq_destination_from_another_destination from django_stomp.helpers import is_heartbeat_enabled from django_stomp.helpers import only_destination_name from django_stomp.settings import STOMP_PROCESS_MSG_WORKERS -from stomp import connect logger = logging.getLogger("django_stomp") @@ -71,7 +73,9 @@ def __init__( def _create_new_worker_executor(self): return ThreadPoolExecutor(max_workers=STOMP_PROCESS_MSG_WORKERS, thread_name_prefix=self._subscription_id) - def on_message(self, headers, body): + def on_message(self, frame: StompFrame): + headers, body = frame.headers, frame.body + message_id = headers["message-id"] logger.info(f"Message ID: {message_id}") logger.debug("Received headers: %s", headers) @@ -107,10 +111,7 @@ def start(self, callback: Callable = None, wait_forever=True): logger.info(f"Starting listener with name: {self._listener_id}") logger.info(f"Subscribe/Listener auto-generated ID: {self._subscription_id}") - if self._is_testing: - self._connection.set_listener("TESTING", self._test_listener) - else: - self._connection.set_listener(self._listener_id, self) + self._set_listener() self._callback = callback if callback else self._callback self._connection.connect(**self._connection_configuration) @@ -127,6 +128,12 @@ def 
start(self, callback: Callable = None, wait_forever=True): self.start(self._callback, wait_forever=False) time.sleep(1) + def _set_listener(self): + if self._is_testing: + self._connection.set_listener("TESTING", self._test_listener) + else: + self._connection.set_listener(self._listener_id, self) + def close(self): disconnect_receipt = str(uuid.uuid4()) self._connection.disconnect(receipt=disconnect_receipt) @@ -135,6 +142,9 @@ def close(self): def shutdown_worker_pool(self): self._pool_executor.shutdown() + def on_error(self, frame: StompFrame): + logger.warning("Received ERROR frame from broker - headers: %d - body: %d", frame.headers, frame.body) + def build_listener( destination_name, diff --git a/django_stomp/services/producer.py b/django_stomp/services/producer.py index 8302505..d4aac7d 100644 --- a/django_stomp/services/producer.py +++ b/django_stomp/services/producer.py @@ -4,29 +4,29 @@ import uuid from contextlib import contextmanager from typing import Dict -from typing import List from typing import Optional from django.core.serializers.json import DjangoJSONEncoder -from django_stomp.helpers import clean_dict_with_falsy_or_strange_values -from django_stomp.helpers import create_dlq_destination_from_another_destination -from django_stomp.helpers import retry -from django_stomp.helpers import slow_down from request_id_django_log.request_id import current_request_id from request_id_django_log.settings import NO_REQUEST_ID from stomp import Connection from stomp.connect import StompConnection11 +from django_stomp.helpers import clean_dict_with_falsy_or_strange_values +from django_stomp.helpers import create_dlq_destination_from_another_destination +from django_stomp.helpers import retry +from django_stomp.helpers import slow_down + logger = logging.getLogger("django_stomp") class Publisher: """ Class used to publish messages to brokers using the STOMP protocol. 
Some headers are removed - if they are in the send() method as they cause unexpected behavior/errors. - + if they are in the send() method as they cause unexpected behavior/errors. + Such headers are defined in the UNSAFE_OR_RESERVED_BROKER_HEADERS_FOR_REMOVAL class variable which is used - for sanitizing the user-supplied headers. + for sanitizing the user-supplied headers. """ UNSAFE_OR_RESERVED_BROKER_HEADERS_FOR_REMOVAL = [ @@ -106,7 +106,7 @@ def _build_final_headers(self, queue: str, headers: Optional[Dict], persistent: def _get_correlation_id(self, headers: Optional[Dict]) -> str: """ - Gets the correlation id for the message. If 'correlation-id' is in the headers, this value is used. + Gets the correlation id for the message. If 'correlation-id' is in the headers, this value is used. Otherwise, the value of current_request_id() is returned or a new one is generated as a last resort. """ if headers and "correlation-id" in headers: @@ -157,7 +157,7 @@ def _send_to_broker_without_retry_attempts(self, send_data: Dict) -> None: Sends the actual data to the broker using the STOMP protocol WITHOUT any retry attempts as reconnecting to the broker while a transaction was previously created will lead to 'bad transaction' errors because STOMP 1.1 protocol closes any transactions if the producer had TCP connection problems or sends a DISCONNECT frame. - + Hence, when a producer sends a BEGIN frame, all subsequent SEND frames (messages) must always use the SAME connection that was used to start the transaction. 
diff --git a/docker-compose.yml b/docker-compose.yml index a615807..7ce2742 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: "2.4" +version: "3.9" services: broker-activemq: image: rmohr/activemq:latest @@ -20,4 +20,4 @@ services: - 61613:61613 - 15672:15672 volumes: - - ./tests/resources/custom-rabbitmq-conf:/etc/rabbitmq/ + - ./tests/resources/custom-rabbitmq-conf:/etc/rabbitmq/ \ No newline at end of file diff --git a/pytest.ini b/pytest.ini index d5b1def..8f0511a 100644 --- a/pytest.ini +++ b/pytest.ini @@ -10,4 +10,4 @@ env = D:STOMP_CORRELATION_ID_REQUIRED=True D:STOMP_PROCESS_MSG_ON_BACKGROUND=False D:STOMP_OUTGOING_HEARTBEAT=0 - D:STOMP_INCOMING_HEARTBEAT=0 + D:STOMP_INCOMING_HEARTBEAT=0 \ No newline at end of file diff --git a/setup.py b/setup.py index 1fa60db..e7a6eca 100644 --- a/setup.py +++ b/setup.py @@ -10,7 +10,7 @@ setup( name="django-stomp", - version="4.2.1", + version="5.0.0", description="A simple implementation of STOMP with Django", long_description=README, long_description_content_type="text/markdown", @@ -20,7 +20,7 @@ license="MIT", url="https://github.com/juntossomosmais/django-stomp", packages=find_packages(), - install_requires=["request-id-django-log==0.1.1", "stomp.py==6.0.0", "tenacity~=6.0"], + install_requires=["request-id-django-log==0.1.1", "stomp.py~=7.0", "tenacity~=8.0"], classifiers=[ "Programming Language :: Python", "Programming Language :: Python :: 3.7", diff --git a/sonar-project.properties b/sonar-project.properties new file mode 100644 index 0000000..80d62f5 --- /dev/null +++ b/sonar-project.properties @@ -0,0 +1,16 @@ +sonar.projectKey=juntossomosmais_django-stomp +sonar.exclusions=\ + tests/resources/**/* +sonar.coverage.exclusions=\ + tests/**/*,\ + **/tests/**/*,\ + .tox/**/*,\ + .venv/**/* +sonar.cpd.exclusions=\ + tests/**/*,\ + **/tests/**/*,\ + .tox/**/*,\ + .venv/**/* +sonar.python.xunit.reportPath=**/*/test-results.xml +sonar.python.coverage.reportPaths=coverage.xml 
+sonar.python.version=3.7 \ No newline at end of file diff --git a/tests/conftest.py b/tests/conftest.py index 3b0ecff..99ea329 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,5 +1,5 @@ import os -from logging import Formatter +import uuid from django.conf import settings @@ -14,7 +14,7 @@ def pytest_assertrepr_compare(config, op, left, right): def pytest_configure(): settings.configure( - INSTALLED_APPS=["django_stomp"], + INSTALLED_APPS=["django_stomp", "tests.support"], STOMP_SERVER_HOST=os.getenv("STOMP_SERVER_HOST"), STOMP_SERVER_PORT=os.getenv("STOMP_SERVER_PORT"), STOMP_SERVER_STANDBY_HOST=os.getenv("STOMP_SERVER_STANDBY_HOST"), @@ -27,4 +27,15 @@ def pytest_configure(): STOMP_PROCESS_MSG_ON_BACKGROUND=os.getenv("STOMP_PROCESS_MSG_ON_BACKGROUND"), STOMP_OUTGOING_HEARTBEAT=os.getenv("STOMP_OUTGOING_HEARTBEAT"), STOMP_INCOMING_HEARTBEAT=os.getenv("STOMP_INCOMING_HEARTBEAT"), + DATABASES={ + "default": { + "ENGINE": os.getenv("DB_ENGINE", "django.db.backends.sqlite3"), + "NAME": os.getenv("DB_DATABASE", f"test_db-{uuid.uuid4()}"), + "USER": os.getenv("DB_USER"), + "HOST": os.getenv("DB_HOST"), + "PORT": os.getenv("DB_PORT"), + "PASSWORD": os.getenv("DB_PASSWORD"), + "TEST": {"NAME": os.getenv("DB_DATABASE", f"test_db-{uuid.uuid4()}")}, + } + }, ) diff --git a/tests/integration/test_execution.py b/tests/integration/test_execution.py index eb43c6d..5cef9ba 100644 --- a/tests/integration/test_execution.py +++ b/tests/integration/test_execution.py @@ -2,7 +2,6 @@ import logging import re import threading -import time import uuid from time import sleep from unittest import mock @@ -10,10 +9,16 @@ import pytest import trio -from django.core.management import call_command -from django.core.serializers.json import DjangoJSONEncoder +from django.db.backends.signals import connection_created from pytest_mock import MockFixture from stomp.exception import NotConnectedException + +from django_stomp.builder import build_listener +from django_stomp.builder 
import build_publisher +from django_stomp.execution import clean_messages_on_destination_by_acking +from django_stomp.execution import send_message_from_one_destination_to_another +from django_stomp.execution import start_processing +from django_stomp.services.producer import do_inside_transaction from tests.support import rabbitmq from tests.support.activemq.connections_details import consumers_details from tests.support.activemq.message_details import retrieve_message_published @@ -24,6 +29,7 @@ from tests.support.callbacks_for_tests import callback_standard_path from tests.support.callbacks_for_tests import callback_with_another_log_message_path from tests.support.callbacks_for_tests import callback_with_exception_path +from tests.support.callbacks_for_tests import callback_with_explicit_db_connection_path from tests.support.callbacks_for_tests import callback_with_logging_path from tests.support.callbacks_for_tests import callback_with_nack_path from tests.support.callbacks_for_tests import callback_with_sleep_three_seconds_path @@ -35,18 +41,6 @@ from tests.support.helpers import publish_without_correlation_id_header from tests.support.helpers import wait_for_message_in_log -from django_stomp.builder import build_listener -from django_stomp.builder import build_publisher -from django_stomp.execution import clean_messages_on_destination_by_acking -from django_stomp.execution import send_message_from_one_destination_to_another -from django_stomp.execution import start_processing -from django_stomp.helpers import clean_dict_with_falsy_or_strange_values -from django_stomp.helpers import create_dlq_destination_from_another_destination -from django_stomp.helpers import retry -from django_stomp.services.consumer import Payload -from django_stomp.services.producer import Publisher -from django_stomp.services.producer import do_inside_transaction - def test_should_consume_message_and_publish_to_another_queue_using_same_correlation_id(): # Base environment setup @@ 
-224,7 +218,7 @@ async def test_should_configure_prefetch_size_as_one(mocker: MockFixture, settin settings.STOMP_SUBSCRIPTION_ID = subscription_id async def collect_consumer_details(): - await trio.sleep(0.5) + await trio.sleep(2) try: consumers = list(consumers_details(f"{listener_id}-listener")) except Exception: @@ -514,7 +508,7 @@ def test_should_create_queue_for_virtual_topic_and_compete_for_its_messages(capl virtual_topic_consumer_queue, callback_with_logging_path, is_testing=True, return_listener=True ), start_processing( - virtual_topic_consumer_queue, callback_with_another_log_message_path, is_testing=True, return_listener=True, + virtual_topic_consumer_queue, callback_with_another_log_message_path, is_testing=True, return_listener=True ), ] @@ -631,7 +625,7 @@ def test_should_consume_message_without_correlation_id_when_it_is_not_required_n # https://activemq.apache.org/message-redelivery-and-dlq-handling publish_without_correlation_id_header(destination_name, some_body, persistent=True) - consumer = start_processing(destination_name, callback_standard_path, is_testing=True) + start_processing(destination_name, callback_standard_path, is_testing=True) source_queue_status = get_destination_metrics_from_broker(source_queue_name) @@ -744,17 +738,17 @@ def test_should_clean_all_messages_on_a_destination(caplog): some_body = {"some": "trash"} some_headers = {"some": "header"} + # command invocation + consumer = clean_messages_on_destination_by_acking(some_source_destination, is_testing=True, return_listener=True) + with build_publisher().auto_open_close_connection() as publisher: - for i in range(0, trash_msgs_count): + for _ in range(0, trash_msgs_count): publisher.send(some_body, some_source_destination, some_headers, attempt=1) - # # command invocation - consumer = clean_messages_on_destination_by_acking(some_source_destination, is_testing=True, return_listener=True) wait_for_message_in_log(caplog, r"Message has been removed!", 
message_count_to_wait=trash_msgs_count) consumer.close() # asserts that messages were acked - sleep(5) # takes some time to ack all messages source_queue_status = get_destination_metrics_from_broker(source_queue_name) assert source_queue_status.number_of_pending_messages == 0 @@ -815,11 +809,10 @@ def test_should_not_publish_any_messages_if_connection_drops_when_using_transact *_, queue_name = destination_one.split("/") some_correlation_id = uuid.uuid4() some_header = {"correlation-id": some_correlation_id} - some_body = {"please": "no errors 1"} - some_body = {"please": "no errors 2"} + some_body = {"please": "no errors"} # creates destination and publishes to it - start_processing(destination_one, callback_standard_path, is_testing=True, return_listener=True,).close() + start_processing(destination_one, callback_standard_path, is_testing=True, return_listener=True).close() publisher = build_publisher(f"random-publisher-{uuid4()}") with pytest.raises(NotConnectedException): @@ -841,11 +834,10 @@ def test_should_publish_many_messages_if_no_connection_problems_happen_when_usin *_, queue_name = destination_one.split("/") some_correlation_id = uuid.uuid4() some_header = {"correlation-id": some_correlation_id} - some_body = {"please": "no errors 1"} - some_body = {"please": "no errors 2"} + some_body = {"please": "no errors"} # creates destination and publishes to it - start_processing(destination_one, callback_standard_path, is_testing=True, return_listener=True,).close() + start_processing(destination_one, callback_standard_path, is_testing=True, return_listener=True).close() publisher = build_publisher(f"random-publisher-{uuid4()}") # no connection errors inside this transaction @@ -866,11 +858,10 @@ def test_should_publish_messages_if_connection_drops_when_not_transactions(): *_, queue_name = destination_one.split("/") some_correlation_id = uuid.uuid4() some_header = {"correlation-id": some_correlation_id} - some_body = {"please": "no errors 1"} - some_body = 
{"please": "no errors 2"} + some_body = {"please": "no errors"} # creates destination and publishes to it - start_processing(destination_one, callback_standard_path, is_testing=True, return_listener=True,).close() + start_processing(destination_one, callback_standard_path, is_testing=True, return_listener=True).close() publisher = build_publisher(f"random-publisher-{uuid4()}") # no transaction @@ -885,3 +876,75 @@ def test_should_publish_messages_if_connection_drops_when_not_transactions(): # the two message should have been published due to retry (but no transactions involved) assert queue_status.messages_enqueued == 2 assert queue_status.number_of_pending_messages == 2 + + +@pytest.mark.django_db +def test_should_open_a_new_db_connection_when_previous_connection_is_obsolete_or_unusable(settings): + # Arrange - listen for db connection created signal with mocked function + new_db_connection_callback = mock.MagicMock(return_value=None) + connection_created.connect(new_db_connection_callback) + + # Arrange - settings CONN_MAX_AGE to zero, forcing connection renewal for every new message + settings.DATABASES["default"]["CONN_MAX_AGE"] = 0 + + # Arrange - random destination and fake arbitrary message body + destination = f"/queue/new-connections-{uuid4()}" + body = {"key": "value"} + arbitrary_number_of_msgs = 5 + + # Act - start listening to a random destination queue and publishing an arbitrary number of messages to it + listener = start_processing( + destination, callback_with_explicit_db_connection_path, is_testing=True, return_listener=True + ) + + with build_publisher(f"random-publisher-{uuid4()}").auto_open_close_connection() as publisher: + for _ in range(arbitrary_number_of_msgs): + publisher.send(body, destination) + + sleep(0.5) # some sleep to give enough time to process the messages + + # Assert - for every new message a new db connection is created + assert new_db_connection_callback.call_count == arbitrary_number_of_msgs + + # Assert - all threads that 
have established a db connection, should have closed them as CONN_MAX_AGE == 0 + threads_with_db_connections = [t for t in threading.enumerate() if hasattr(t, "db")] + assert any(threads_with_db_connections) + assert all(t.db.connection is None for t in threads_with_db_connections) + + listener.close() + + +@pytest.mark.django_db +def test_shouldnt_open_a_new_db_connection_when_there_is_one_still_usable(settings): + # Arrange - listen for db connection created signal with mocked function + new_db_connection_callback = mock.MagicMock(return_value=None) + connection_created.connect(new_db_connection_callback) + + # Arrange - setting CONN_MAX_AGE to 86400s or 1 day + settings.DATABASES["default"]["CONN_MAX_AGE"] = 86400 + + # Arrange - random destination and fake arbitrary message body + destination = f"/queue/no-new-connections-{uuid4()}" + body = {"key": "value"} + arbitrary_number_of_msgs = 5 + + # Act - start listening random destination queue and publish arbitrary number of messages to it + listener = start_processing( + destination, callback_with_explicit_db_connection_path, is_testing=True, return_listener=True + ) + + with build_publisher(f"random-publisher-{uuid4()}").auto_open_close_connection() as publisher: + for _ in range(arbitrary_number_of_msgs): + publisher.send(body, destination) + + sleep(0.5) # some sleep to give enough time to process the messages + + # Assert - only one connection is established + assert new_db_connection_callback.call_count == 1 + + # Assert - all threads that have established a db connection, shouldn't have reset them as CONN_MAX_AGE == 1 day + threads_with_db_connections = [t for t in threading.enumerate() if hasattr(t, "db")] + assert any(threads_with_db_connections) + assert all(t.db.connection is not None for t in threads_with_db_connections) + + listener.close() diff --git a/tests/resources/custom-rabbitmq-conf/rabbitmq.conf b/tests/resources/custom-rabbitmq-conf/rabbitmq.conf index e384930..ece904c 100644 --- 
a/tests/resources/custom-rabbitmq-conf/rabbitmq.conf +++ b/tests/resources/custom-rabbitmq-conf/rabbitmq.conf @@ -1,3 +1,14 @@ loopback_users.guest = false listeners.tcp.default = 5672 + management.tcp.port = 15672 + +# Reduce retention policy for faster publishing of stats +management.sample_retention_policies.global.minute = 1 +management.sample_retention_policies.global.hour = 60 +management.sample_retention_policies.global.day = 1200 + +management.sample_retention_policies.basic.minute = 1 +management.sample_retention_policies.basic.hour = 60 + +management.sample_retention_policies.detailed.10 = 1 \ No newline at end of file diff --git a/tests/support/callbacks_for_tests.py b/tests/support/callbacks_for_tests.py index 6aacce7..175eb3d 100644 --- a/tests/support/callbacks_for_tests.py +++ b/tests/support/callbacks_for_tests.py @@ -4,11 +4,12 @@ import logging import threading from time import sleep -from typing import Callable -from uuid import uuid4 + +from django import db from django_stomp.builder import build_publisher from django_stomp.services.consumer import Payload +from tests.support.models import Simple callback_move_and_ack_path = "tests.support.callbacks_for_tests.callback_move_and_ack" callback_standard_path = "tests.support.callbacks_for_tests.callback_standard" @@ -20,6 +21,7 @@ callback_with_sleep_three_seconds_path = "tests.support.callbacks_for_tests.callback_with_sleep_three_seconds" callback_with_another_log_message_path = "tests.support.callbacks_for_tests.callback_with_another_log_message" callback_with_nack_path = "tests.support.callbacks_for_tests.callback_with_nack" +callback_with_explicit_db_connection_path = "tests.support.callbacks_for_tests.callback_with_explicit_db_connection" def callback_move_and_ack(payload: Payload, destination: str): @@ -49,7 +51,6 @@ def callback_with_exception(payload: Payload): def callback_with_sleep_three_seconds_while_heartbeat_thread_is_alive(payload: Payload): - heartbeat_threads = [thread for thread in 
threading.enumerate() if "StompHeartbeatThread" in thread.name] while True: sleep(3) heartbeat_threads = filter(lambda thread: "StompHeartbeatThread" in thread.name, threading.enumerate()) @@ -74,3 +75,25 @@ def callback_with_sleep_three_seconds(payload: Payload): payload.ack() logger = logging.getLogger(__name__) logger.info("%s sucessfully processed!", payload.body) + + +def callback_with_explicit_db_connection(payload: Payload): + """ + This callback does a DB interaction and sets a new attribute (`db`) in the thread that it's running + so that any test can have direct access to a `DatabaseWrapper`[1] and make assertions with it. This + new attribute was needed due to the way Django works establishing a new DB connection for each + thread [2]. + + Note: The `db.connections` object is a handler (with a dict-like syntax), see [3] for more information + on it! + + [1] https://github.com/django/django/blob/ca9872905559026af82000e46cde6f7dedc897b6/django/db/backends/base/base.py#L26 + [2] https://docs.djangoproject.com/en/3.2/ref/databases/#caveats + [3] https://github.com/django/django/blob/ca9872905559026af82000e46cde6f7dedc897b6/django/db/utils.py#L134 + """ + Simple.objects.create() + + thread = threading.current_thread() + setattr(thread, "db", db.connections["default"]) + + payload.ack() diff --git a/tests/support/helpers.py b/tests/support/helpers.py index c264e51..ba98416 100644 --- a/tests/support/helpers.py +++ b/tests/support/helpers.py @@ -3,10 +3,10 @@ import threading from time import sleep from typing import Dict -from typing import Optional from uuid import uuid4 from django.core.serializers.json import DjangoJSONEncoder + from django_stomp.builder import build_listener from django_stomp.builder import build_publisher from django_stomp.helpers import clean_dict_with_falsy_or_strange_values @@ -30,7 +30,7 @@ def get_destination_metrics_from_broker(destination_name: str) -> CurrentDestina return destination_metrics -def 
publish_without_correlation_id_header(destination: str, body: str, attempt=1, persistent=True): +def publish_without_correlation_id_header(destination: str, body: Dict, attempt=1, persistent=True): """ Publishes a message without correlation-id on the headers. """ diff --git a/tests/support/models.py b/tests/support/models.py new file mode 100644 index 0000000..9d716ab --- /dev/null +++ b/tests/support/models.py @@ -0,0 +1,9 @@ +from uuid import uuid4 + +from django.db import models + + +class Simple(models.Model): + id = models.UUIDField(primary_key=True, default=uuid4, editable=False) + created_at = models.DateTimeField(auto_now_add=True, editable=False) + updated_at = models.DateTimeField(auto_now=True, editable=False) diff --git a/tests/support/rabbitmq/__init__.py b/tests/support/rabbitmq/__init__.py index 205f75f..7a76257 100644 --- a/tests/support/rabbitmq/__init__.py +++ b/tests/support/rabbitmq/__init__.py @@ -1,4 +1,5 @@ import json +import logging import urllib.parse from time import sleep from typing import Generator @@ -11,7 +12,10 @@ from tests.support.dtos import CurrentDestinationStatus from tests.support.dtos import MessageStatus +logger = logging.getLogger(__name__) + _queues_details_request_path = "/api/queues" +_specific_queue_details_request_path = _queues_details_request_path + "/%2F/{queue_name}" _bindings_from_queue_request_path = _queues_details_request_path + "/%2F/{queue_name}/bindings" _get_message_from_queue_request_path = _queues_details_request_path + "/%2F/{queue_name}/get" _channels_details_request_path = "/api/channels" @@ -20,25 +24,26 @@ def current_queue_configuration(queue_name, host="localhost", port=15672) -> Optional[CurrentDestinationStatus]: - queues = _do_request(host, port, _queues_details_request_path) - results = list(filter(lambda v: v["name"] == queue_name, queues)) - if len(results) == 1: - queue_details = results[0] - if queue_details.get("message_stats"): - message_stats = queue_details["message_stats"] - 
messages_dequeued = message_stats.get("deliver_get", 0) - messages_enqueued = message_stats["publish"] - else: - messages_dequeued = 0 - messages_enqueued = None + result = _do_request(host, port, _specific_queue_details_request_path.format(queue_name=queue_name)) - number_of_pending_messages = queue_details["messages"] - number_of_consumers = queue_details["consumers"] + logger.debug("RabbitMQ request result: %s", result) + if result.get("error"): + return None - return CurrentDestinationStatus( - number_of_pending_messages, number_of_consumers, messages_enqueued, messages_dequeued - ) - return None + if result.get("message_stats"): + message_stats = result["message_stats"] + messages_dequeued = message_stats.get("deliver_get", 0) + messages_enqueued = message_stats.get("publish") + else: + messages_dequeued = 0 + messages_enqueued = None + + number_of_pending_messages = result["messages"] + number_of_consumers = result["consumers"] + + return CurrentDestinationStatus( + number_of_pending_messages, number_of_consumers, messages_enqueued, messages_dequeued + ) def current_topic_configuration(topic_name, host="localhost", port=15672) -> Optional[CurrentDestinationStatus]: @@ -119,7 +124,7 @@ def get_broker_version(host="localhost", port=15672) -> str: def _do_request(host, port, request_path, do_post=False, body=None): - sleep(5) + sleep(2) session = requests.Session() session.mount("http://", HTTPAdapter(max_retries=3)) address, auth = f"http://{host}:{port}{request_path}", ("guest", "guest") diff --git a/tests/unit/management/commands/test_move_messages.py b/tests/unit/management/commands/test_move_messages.py index 86bfa4b..9451996 100644 --- a/tests/unit/management/commands/test_move_messages.py +++ b/tests/unit/management/commands/test_move_messages.py @@ -6,10 +6,12 @@ from pytest_mock import MockFixture -def test_should_call_logic_to_send_one_destination_to_another(mocker): - mock_send_message_from_one_destination_to_another = mocker.patch( - 
"django_stomp.management.commands.move_messages.send_message_from_one_destination_to_another" - ) +@pytest.fixture +def mock_send_message_from_one_destination_to_another(mocker: MockFixture): + return mocker.patch("django_stomp.management.commands.move_messages.send_message_from_one_destination_to_another") + + +def test_should_call_logic_to_send_one_destination_to_another(mock_send_message_from_one_destination_to_another): out = StringIO() fake_source = "/queue/your-source" fake_target = "/queue/your-target" @@ -22,10 +24,7 @@ def test_should_call_logic_to_send_one_destination_to_another(mocker): ) -def test_should_raise_error_if_essential_parameters_are_missing(mocker: MockFixture): - mock_send_message_from_one_destination_to_another = mocker.patch( - "django_stomp.management.commands.move_messages.send_message_from_one_destination_to_another" - ) +def test_should_raise_error_if_essential_parameters_are_missing(mock_send_message_from_one_destination_to_another): fake_source = "/queue/your-source" with pytest.raises(CommandError) as e: @@ -41,10 +40,9 @@ def test_should_raise_error_if_essential_parameters_are_missing(mocker: MockFixt mock_send_message_from_one_destination_to_another.assert_not_called() -def test_should_call_logic_to_send_one_destination_to_another_given_custom_broker(mocker: MockFixture): - mock_send_message_from_one_destination_to_another = mocker.patch( - "django_stomp.management.commands.move_messages.send_message_from_one_destination_to_another" - ) +def test_should_call_logic_to_send_one_destination_to_another_given_custom_broker( + mock_send_message_from_one_destination_to_another, +): out = StringIO() fake_source = "/queue/your-source" fake_target = "/queue/your-target" @@ -64,10 +62,9 @@ def test_should_call_logic_to_send_one_destination_to_another_given_custom_broke ) -def test_should_raise_error_given_custom_port_is_not_informed_to_use_custom_broker(mocker: MockFixture): - mock_send_message_from_one_destination_to_another = mocker.patch( 
- "django_stomp.management.commands.move_messages.send_message_from_one_destination_to_another" - ) +def test_should_raise_error_given_custom_port_is_not_informed_to_use_custom_broker( + mock_send_message_from_one_destination_to_another, +): out = StringIO() fake_source = "/queue/your-source" fake_target = "/queue/your-target" diff --git a/tests/unit/services/test_consumer.py b/tests/unit/services/test_consumer.py index ac5dc24..c0228fc 100644 --- a/tests/unit/services/test_consumer.py +++ b/tests/unit/services/test_consumer.py @@ -1,6 +1,8 @@ import json from uuid import uuid4 +from django_stomp import builder +from django_stomp.services.consumer import StompFrame from django_stomp.services.consumer import build_listener from tests.support.helpers import get_active_threads_name_with_prefix @@ -10,10 +12,12 @@ def test_should_create_at_most_the_defined_number_of_workers(mocker): listener = build_listener(f"some-destination-{uuid4()}", should_process_msg_on_background=True) - listener.on_message(headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) - listener.on_message(headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) - listener.on_message(headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) - listener.on_message(headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) + fake_frame = StompFrame(cmd="FAKE_CMD", headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) + + listener.on_message(fake_frame) + listener.on_message(fake_frame) + listener.on_message(fake_frame) + listener.on_message(fake_frame) workers_threads = get_active_threads_name_with_prefix(listener._subscription_id) assert len(workers_threads) == 3 # 3 workers @@ -23,7 +27,9 @@ def test_should_create_at_most_the_defined_number_of_workers(mocker): def test_should_clean_up_worker_pool(): listener = build_listener(f"some-destination-{uuid4()}", should_process_msg_on_background=True) - listener.on_message(headers={"message-id": "123"}, 
body=json.dumps({"someKey": 1})) + fake_frame = StompFrame(cmd="FAKE_CMD", headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) + + listener.on_message(fake_frame) workers_threads_before_pool_shutdown = get_active_threads_name_with_prefix(listener._subscription_id) assert len(workers_threads_before_pool_shutdown) == 1 # only one worker @@ -37,7 +43,9 @@ def test_should_clean_up_worker_pool(): def test_should_still_process_message_if_worker_pool_was_explicitly_shutdown(): listener = build_listener(f"some-destination-{uuid4()}", should_process_msg_on_background=True) - listener.on_message(headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) + fake_frame = StompFrame(cmd="FAKE_CMD", headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) + + listener.on_message(fake_frame) workers_threads_before_pool_shutdown = get_active_threads_name_with_prefix(listener._subscription_id) assert len(workers_threads_before_pool_shutdown) == 1 # only one worker @@ -46,9 +54,28 @@ def test_should_still_process_message_if_worker_pool_was_explicitly_shutdown(): workers_threads_after_pool_shutdown = get_active_threads_name_with_prefix(listener._subscription_id) assert len(workers_threads_after_pool_shutdown) == 0 # no active worker thread - listener.on_message(headers={"message-id": "123"}, body=json.dumps({"someKey": 1})) + listener.on_message(fake_frame) workers_threads_before_pool_shutdown = get_active_threads_name_with_prefix(listener._subscription_id) assert len(workers_threads_before_pool_shutdown) == 1 # only one worker listener.shutdown_worker_pool() + + +def test_should_have_only_one_django_stomp_listener_even_if_set_listener_is_called_multiple_times(): + # Arrange - build listener to some arbirtrary queue + django_stomp_listener = builder.build_listener(f"some-destination-{uuid4()}") + + # Act - in sucession, multiples _set_listener call to trigger _connection.set_listener + django_stomp_listener._set_listener() + 
django_stomp_listener._set_listener() + django_stomp_listener._set_listener() + django_stomp_listener._set_listener() + django_stomp_listener._set_listener() + + # Assert - it has two listeners (protocol-listener, stomp.py defined and the one defined by django-stomp) + assert len(django_stomp_listener._connection.transport.listeners) == 2 + assert django_stomp_listener._connection.get_listener("protocol-listener") is not None + + django_stomp_listener_id = django_stomp_listener._listener_id + assert django_stomp_listener._connection.get_listener(django_stomp_listener_id) is not None diff --git a/tests/unit/test_settings.py b/tests/unit/test_settings.py index 9b7fe57..28b517e 100644 --- a/tests/unit/test_settings.py +++ b/tests/unit/test_settings.py @@ -1,4 +1,5 @@ import pytest + from django_stomp.exceptions import DjangoStompImproperlyConfigured from django_stomp.settings import eval_as_int_if_provided_value_is_not_none_otherwise_none from django_stomp.settings import eval_settings_otherwise_raise_exception @@ -7,20 +8,21 @@ def test_should_raise_improperly_configured_when_settings_is_not_correct_configured(mocker): mocked_settings = mocker.patch("django_stomp.settings.django_settings") mocked_settings.STOMP_PROCESS_MSG_WORKERS = "abc" + expected_exception_message = "STOMP_PROCESS_MSG_WORKERS is not valid!" 
- with pytest.raises(DjangoStompImproperlyConfigured, match="STOMP_PROCESS_MSG_WORKERS is not valid!"): + with pytest.raises(DjangoStompImproperlyConfigured, match=expected_exception_message): eval_settings_otherwise_raise_exception( "STOMP_PROCESS_MSG_WORKERS", eval_as_int_if_provided_value_is_not_none_otherwise_none ) mocked_settings.STOMP_PROCESS_MSG_WORKERS = {} - with pytest.raises(DjangoStompImproperlyConfigured, match="STOMP_PROCESS_MSG_WORKERS is not valid!"): + with pytest.raises(DjangoStompImproperlyConfigured, match=expected_exception_message): eval_settings_otherwise_raise_exception( "STOMP_PROCESS_MSG_WORKERS", eval_as_int_if_provided_value_is_not_none_otherwise_none ) mocked_settings.STOMP_PROCESS_MSG_WORKERS = [] - with pytest.raises(DjangoStompImproperlyConfigured, match="STOMP_PROCESS_MSG_WORKERS is not valid!"): + with pytest.raises(DjangoStompImproperlyConfigured, match=expected_exception_message): eval_settings_otherwise_raise_exception( "STOMP_PROCESS_MSG_WORKERS", eval_as_int_if_provided_value_is_not_none_otherwise_none )