From ab16e84c854435ccb1e1917da3a54f159b01b0fb Mon Sep 17 00:00:00 2001 From: Kirill Kuzminykh Date: Thu, 13 Jun 2024 15:19:30 +0300 Subject: [PATCH] Fixed processing of heartbeats and a session expiration and addeds support of aiohttp_cors (#448) * - Fixed processing of heartbeats and a session expiration. - Fixed ping-pong based heartbeats for web-socket connections. - Added arguments ``heartbeat_delay`` and ``disconnect_delay`` into ``Session.__init__()``. - Added argument ``disconnect_delay`` into ``SessionManager.__init__()``. - **Breaking change:** Removed argument ``timeout`` from ``Session.__init__()`` and ``SessionManager.__init__()``. - **Breaking change:** Argument ``heartbeat`` of ``SessionManager.__init__()`` renamed into ``heartbeat_delay``. - **Breaking change:** ``Session.registry`` renamed into ``Session.app``. - **Breaking change:** Dropped support of Python < 3.7 * Deleted method ``SessionManager.route_url()``. * Added release date into CHANGES.rst * Removed commented code from tests and fixed CHANGES.rst * - Added argument ``cors_config`` into function ``add_endpoint()`` to support of CORS settings from ``aiohttp_cors``. - Function ``add_endpoint()`` now returns all registered routes. - Replaced returning instances of error HTTP responses on raising its as exceptions. - Changed name of some routes. * Fixed GitHub actions * Fixed GitHub actions * Fixed GitHub actions * Fixed version of flake8 to support Python 3.7 * Fixed typing * Reverted pytest-timeout * Removed embedded CORS processing. * Added arguments ``heartbeat_delay`` and ``disconnect_delay`` into function ``add_endpoint()``. * - Heartbeat task moved from ``SessionManager`` into ``Session``. - Methods ``_acquire`` and ``_release`` of ``Sessions`` renamed into ``acquire`` and ``release``. * Added processing of ``ConnectionError`` in ``StreamingTransport``. * Fixed name of transports. * - Changed arguments of handler function. Now handler function must be defined like ``async def handler(manager, session, msg):`` - Constants: - FRAME_OPEN - FRAME_CLOSE - FRAME_MESSAGE - FRAME_MESSAGE_BLOB - FRAME_HEARTBEAT replaced by ``Frame`` enums with corresponding values. - Constants: - MSG_OPEN - MSG_MESSAGE - MSG_CLOSE - MSG_CLOSED replaced by ``MsgType`` enums with corresponding values. - Constants: - STATE_NEW - STATE_OPEN - STATE_CLOSING - STATE_CLOSED replaced by ``SessionState`` enums with corresponding values. * Fixed error processing in StreamingTransport * Changed supported versions of Python. * Added "sockjs_transport_name" into request object. * - Updated dependencies versions. - Fixed code style. * Fixed dependencies. * Fixed versions of GitHub actions. --- .coveragerc | 7 - .github/workflows/check_and_test.yaml | 37 ++ .github/workflows/pythonpublish.yml | 6 +- .gitignore | 1 + .travis.yml | 31 -- CHANGES.rst | 60 +++ MANIFEST.in | 1 - Makefile | 21 +- README.rst | 2 +- docs/Makefile | 67 --- docs/api.rst | 23 - docs/conf.py | 185 -------- docs/index.rst | 18 - docs/install.rst | 58 --- docs/overview.rst | 175 -------- examples/chat.html | 172 ++++---- examples/chat.py | 41 +- requirements.txt | 20 +- setup.cfg | 9 +- setup.py | 32 +- sockjs-testsrv.py | 38 +- sockjs/__init__.py | 39 +- sockjs/protocol.py | 65 +-- sockjs/route.py | 244 ++++++----- sockjs/session.py | 494 +++++++++++---------- sockjs/transports/__init__.py | 28 +- sockjs/transports/base.py | 144 ++++--- sockjs/transports/eventsource.py | 23 +- sockjs/transports/htmlfile.py | 44 +- sockjs/transports/jsonp.py | 56 +-- sockjs/transports/rawwebsocket.py | 103 +++-- sockjs/transports/utils.py | 40 +- sockjs/transports/websocket.py | 134 ++++-- sockjs/transports/xhr.py | 37 -- sockjs/transports/xhr_pooling.py | 81 ++++ sockjs/transports/xhrsend.py | 43 -- sockjs/transports/xhrstreaming.py | 17 +- tests/asdf | 0 tests/conftest.py | 102 +++-- tests/test_route.py | 125 ++++-- tests/test_session.py | 590 +++++++++++++++----------- tests/test_transport.py | 65 ++- tests/test_transport_eventsource.py | 15 +- tests/test_transport_htmlfile.py | 32 +- tests/test_transport_jsonp.py | 41 +- tests/test_transport_rawwebsocket.py | 22 +- tests/test_transport_websocket.py | 52 +-- tests/test_transport_xhr.py | 18 +- tests/test_transport_xhrsend.py | 27 +- tests/test_transport_xhrstreaming.py | 14 +- 50 files changed, 1797 insertions(+), 1902 deletions(-) delete mode 100644 .coveragerc create mode 100644 .github/workflows/check_and_test.yaml delete mode 100644 .travis.yml delete mode 100644 docs/Makefile delete mode 100644 docs/api.rst delete mode 100644 docs/conf.py delete mode 100644 docs/index.rst delete mode 100644 docs/install.rst delete mode 100644 docs/overview.rst delete mode 100644 sockjs/transports/xhr.py create mode 100644 sockjs/transports/xhr_pooling.py delete mode 100644 sockjs/transports/xhrsend.py delete mode 100644 tests/asdf diff --git a/.coveragerc b/.coveragerc deleted file mode 100644 index 639d298c..00000000 --- a/.coveragerc +++ /dev/null @@ -1,7 +0,0 @@ -[run] -branch = True -source = sockjs, tests -omit = site-packages - -[html] -directory = coverage diff --git a/.github/workflows/check_and_test.yaml b/.github/workflows/check_and_test.yaml new file mode 100644 index 00000000..04aa843e --- /dev/null +++ b/.github/workflows/check_and_test.yaml @@ -0,0 +1,37 @@ +name: Check and Test + +on: + push: + branches: [ "master" ] + pull_request: + branches: [ "master" ] + workflow_dispatch: {} + + +jobs: + run_tests: + strategy: + matrix: + python-version: [ "3.10", "3.11", "3.12" ] + + name: Test on Python ${{ matrix.python-version }} + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + + - name: Install dependencies + run: | + python -m pip install --upgrade setuptools + python -m pip install -r requirements.txt + python -m pip install -e .[test] + + - name: Run checks and tests + run: | + make flake + pytest ./tests diff --git a/.github/workflows/pythonpublish.yml b/.github/workflows/pythonpublish.yml index b143a530..bb2a83e0 100644 --- a/.github/workflows/pythonpublish.yml +++ b/.github/workflows/pythonpublish.yml @@ -8,15 +8,15 @@ jobs: deploy: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v1 + uses: actions/setup-python@v5 with: python-version: '3.x' - name: Install dependencies run: | python -m pip install --upgrade pip - pip install setuptools wheel twine + python -m pip install setuptools wheel twine - name: Build and publish env: TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }} diff --git a/.gitignore b/.gitignore index 7bb7b93d..8a7d23b7 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ eggs sources dist develop-eggs +build *.egg-info *.pyc *.pyo diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 38727d34..00000000 --- a/.travis.yml +++ /dev/null @@ -1,31 +0,0 @@ -language: python -python: - - 3.5 - - 3.6 - - 3.7 - - 3.8 -install: - - pip install --upgrade setuptools - - pip install -r requirements.txt - - pip install codecov - - python setup.py develop - -script: - - make cov - - python setup.py check -rms - - -after_success: - - - codecov - -deploy: - provider: pypi - user: aio-libs-bot - password: - secure: "En+U4kq5LKMrOQ2g8qGBLRbTPEfgNsLIcAfs264LjAtSb9Rc8g62wJ3s2Vob5f8k848ZcBmBh+K5M75gNIYZtP2wU1p4sHvTnz5bxSkdgN9A7gdgS8UdxwbIO6dUBGz3YTlXNcbRmiSbA8CL7M8ULswVtqTwpr864q80TBlj49+HbfszI2wb9UOu7kGW9i1qFtCRSagqe6V1MWhRc0H5nD8WmwzmlxMtllJfubDA4EpqAwAPPwfxYP7QQgo8L3e5CBcbDmLwvvjXkxrOpp6yae2003AWJXFNcygcpo2mt1BRe8/bXtIxXLET0djP7sj+3yu5XTksAI2JTcGVW9PZUr+NmhTeKTrAFQ+7qW+QNWQRRYlS5SokhOEoTRPRsH2D9kYKFy0wteBNNe2TD4o09KsqQHwY+Lpr5nlJUkX5HXBrHoGSQW9lNaQEq7nutpEwiTBCAdINjmfjKMxHIXfy93XfjR2wwGIoUr94i+yWG/zIkCKwr31s5CvfRbHmntU/jFTk6cqSTfzphk+7XEMWQlw8tRh55b641IY4/PMXqSXx8oNpoK4/lvrKG0KP4wSBBLjIOVSq46VPij3YQjnN2EzqECKess2D6Wrec3JaPukLtjCnOymbMq72BstnRI41THrL6bNpyUc7OkXL9NwoU6TNSXdMZoVA2lu6nRE6F+w=" - distributions: "sdist bdist_wheel" - on: - tags: true - all_branches: true - python: 3.6 diff --git a/CHANGES.rst b/CHANGES.rst index 7eb64c5e..bb3e0fe7 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,66 @@ CHANGES ======= +0.13.0 (not-released) +--------------------- + +- Added argument ``cors_config`` into function ``add_endpoint()`` + to support of CORS settings from ``aiohttp_cors``. +- Added arguments ``heartbeat_delay`` and ``disconnect_delay`` + into function ``add_endpoint()``. +- Function ``add_endpoint()`` now returns all registered routes. +- Replaced returning instances of error HTTP responses + on raising its as exceptions. +- Changed name of some routes. +- Heartbeat task moved from ``SessionManager`` into ``Session``. +- Methods ``_acquire`` and ``_release`` of ``Sessions`` renamed into + ``acquire`` and ``release``. +- Added processing of ``ConnectionError`` in ``StreamingTransport``. +- Changed arguments of handler function. Now handler function must be defined + like ``async def handler(manager, session, msg):`` +- Constants: + + - FRAME_OPEN + - FRAME_CLOSE + - FRAME_MESSAGE + - FRAME_MESSAGE_BLOB + - FRAME_HEARTBEAT + + replaced by ``Frame`` enums with corresponding values. +- Constants: + + - MSG_OPEN + - MSG_MESSAGE + - MSG_CLOSE + - MSG_CLOSED + + replaced by ``MsgType`` enums with corresponding values. +- Constants: + + - STATE_NEW + - STATE_OPEN + - STATE_CLOSING + - STATE_CLOSED + + replaced by ``SessionState`` enums with corresponding values. + + +0.12.0 (2022-02-08) +------------------- + +- **Breaking change:** Removed argument ``timeout`` from ``Session.__init__()`` + and ``SessionManager.__init__()``. +- **Breaking change:** Argument ``heartbeat`` of ``SessionManager.__init__()`` + renamed into ``heartbeat_delay``. +- **Breaking change:** ``Session.registry`` renamed into ``Session.app``. +- **Breaking change:** Deleted method ``SessionManager.route_url()``. +- **Breaking change:** Dropped support of Python < 3.7 +- Fixed processing of heartbeats and a session expiration. +- Fixed ping-pong based heartbeats for web-socket connections. +- Added arguments ``heartbeat_delay`` and ``disconnect_delay`` into + ``Session.__init__()``. +- Added argument ``disconnect_delay`` into ``SessionManager.__init__()``. + 0.11.0 (2020-10-22) ------------------- diff --git a/MANIFEST.in b/MANIFEST.in index 831624aa..fcbcab7e 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -4,7 +4,6 @@ include README.rst include Makefile include sockjs-testsrv.py graft sockjs -graft docs graft examples graft tests global-exclude *.pyc diff --git a/Makefile b/Makefile index aba1428b..2db017a9 100644 --- a/Makefile +++ b/Makefile @@ -4,18 +4,10 @@ FLAGS= flake: -# python setup.py check -rms flake8 sockjs tests examples - if python -c "import sys; sys.exit(sys.version_info<(3,6))"; then \ - black --check sockjs tests setup.py; \ - fi - -fmt: - black sockjs tests setup.py - develop: - python setup.py develop + pip install -e .[test] test: flake develop pytest $(FLAGS) ./tests/ @@ -23,10 +15,6 @@ test: flake develop vtest: flake develop pytest -s -v $(FLAGS) ./tests/ -cov cover coverage: flake develop - @py.test --cov=sockjs --cov-report=term --cov-report=html tests - @echo "open file://`pwd`/coverage/index.html" - clean: rm -rf `find . -name __pycache__` rm -f `find . -type f -name '*.py[co]' ` @@ -40,11 +28,6 @@ clean: rm -rf coverage rm -rf build rm -rf cover - make -C docs clean python setup.py clean -doc: - make -C docs html - @echo "open file://`pwd`/docs/_build/html/index.html" - -.PHONY: all build venv flake test vtest testloop cov clean doc +.PHONY: all flake test vtest clean diff --git a/README.rst b/README.rst index 10625f10..c3cf70bd 100644 --- a/README.rst +++ b/README.rst @@ -73,7 +73,7 @@ Supported transports Requirements ------------ -- Python 3.5.3 +- Python 3.10.0 - gunicorn 19.2.0 diff --git a/docs/Makefile b/docs/Makefile deleted file mode 100644 index 8ac72307..00000000 --- a/docs/Makefile +++ /dev/null @@ -1,67 +0,0 @@ -# Makefile for Sphinx documentation -# - -# You can set these variables from the command line. -SPHINXOPTS = -W -SPHINXBUILD = ../../../bin/sphinx-build -PAPER = - -# Internal variables. -PAPEROPT_a4 = -D latex_paper_size=a4 -PAPEROPT_letter = -D latex_paper_size=letter -ALLSPHINXOPTS = -d _build/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) . - -.PHONY: help clean html web pickle htmlhelp latex changes linkcheck - -help: - @echo "Please use \`make ' where is one of" - @echo " html to make standalone HTML files" - @echo " pickle to make pickle files (usable by e.g. sphinx-web)" - @echo " htmlhelp to make HTML files and a HTML help project" - @echo " changes to make an overview over all changed/added/deprecated items" - @echo " linkcheck to check all external links for integrity" - -clean: - -rm -rf _build/* - -html: - mkdir -p _build/html _build/doctrees - $(SPHINXBUILD) -b html $(ALLSPHINXOPTS) _build/html - @echo - @echo "Build finished. The HTML pages are in _build/html." - -text: - mkdir -p _build/text _build/doctrees - $(SPHINXBUILD) -b text $(ALLSPHINXOPTS) _build/text - @echo - @echo "Build finished. The HTML pages are in _build/text." - -pickle: - mkdir -p _build/pickle _build/doctrees - $(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) _build/pickle - @echo - @echo "Build finished; now you can process the pickle files or run" - @echo " sphinx-web _build/pickle" - @echo "to start the sphinx-web server." - -web: pickle - -htmlhelp: - mkdir -p _build/htmlhelp _build/doctrees - $(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) _build/htmlhelp - @echo - @echo "Build finished; now you can run HTML Help Workshop with the" \ - ".hhp project file in _build/htmlhelp." - -changes: - mkdir -p _build/changes _build/doctrees - $(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) _build/changes - @echo - @echo "The overview file is in _build/changes." - -linkcheck: - mkdir -p _build/linkcheck _build/doctrees - $(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) _build/linkcheck - @echo - @echo "Link check complete; look for any errors in the above output " \ - "or in _build/linkcheck/output.txt." diff --git a/docs/api.rst b/docs/api.rst deleted file mode 100644 index fa0eb87c..00000000 --- a/docs/api.rst +++ /dev/null @@ -1,23 +0,0 @@ -API -=== - - -.. automodule:: pyramid_sockjs - -.. autofunction:: get_session_manager - -Session states: - - .. py:data:: STATE_NEW - - .. py:data:: STATE_OPEN - - .. py:data:: STATE_CLOSING - - .. py:data:: STATE_CLOSED - -.. autoclass:: Session(id) - :members: - -.. autoclass:: SessionManager - :members: diff --git a/docs/conf.py b/docs/conf.py deleted file mode 100644 index f7c2b044..00000000 --- a/docs/conf.py +++ /dev/null @@ -1,185 +0,0 @@ -# -*- coding: utf-8 -*- -# -# ptah documentation build configuration file -# -# This file is execfile()d with the current directory set to its containing -# dir. -# -# The contents of this file are pickled, so don't put values in the -# namespace that aren't pickleable (module imports are okay, they're -# removed automatically). -# -# All configuration values have a default value; values that are commented -# out serve to show the default value. - - -# General configuration -# --------------------- - -# Add any Sphinx extension module names here, as strings. They can be -# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones. -extensions = ['sphinx.ext.intersphinx', - 'sphinx.ext.autodoc'] - -# Add any paths that contain templates here, relative to this directory. -templates_path = ['.templates'] - -# The suffix of source filenames. -source_suffix = '.rst' - -# The master toctree document. -master_doc = 'index' - -# General substitutions. -project = 'pyramid_sockjs' - -# The default replacements for |version| and |release|, also used in various -# other places throughout the built documents. -# -# The short X.Y version. -version = '2.0dev1' -# The full version, including alpha/beta/rc tags. -release = version - -# There are two options for replacing |today|: either, you set today to -# some non-false value, then it is used: -#today = '' -# Else, today_fmt is used as the format for a strftime call. -today_fmt = '%B %d, %Y' - -# List of documents that shouldn't be included in the build. -#unused_docs = ['_themes/README'] - -# List of directories, relative to source directories, that shouldn't be -# searched for source files. -#exclude_dirs = [] - -# The reST default role (used for this markup: `text`) to use for all -# documents. -#default_role = None - -# If true, '()' will be appended to :func: etc. cross-reference text. -#add_function_parentheses = True - -# If true, the current module name will be prepended to all description -# unit titles (such as .. function::). -#add_module_names = True - -# If true, sectionauthor and moduleauthor directives will be shown in the -# output. They are ignored by default. -#show_authors = False - -# The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' - - -# Options for HTML output -# ----------------------- - -#sys.path.append(os.path.abspath('_themes')) -#html_theme_path = ['_themes'] -#html_theme = 'pylons' - -# The style sheet to use for HTML and HTML Help pages. A file of that name -# must exist either in Sphinx' static/ path, or in one of the custom paths -# given in html_static_path. -#html_style = 'pylons.css' - -# The name for this set of Sphinx documents. If None, it defaults to -# " v documentation". -#html_title = None - -# A shorter title for the navigation bar. Default is the same as -# html_title. -#html_short_title = None - -# The name of an image file (within the static path) to place at the top of -# the sidebar. -#html_logo = '.static/logo_hi.gif' - -# The name of an image file (within the static path) to use as favicon of -# the docs. This file should be a Windows icon file (.ico) being 16x16 or -# 32x32 pixels large. -#html_favicon = None - -# Add any paths that contain custom static files (such as style sheets) -# here, relative to this directory. They are copied after the builtin -# static files, so a file named "default.css" will overwrite the builtin -# "default.css". -#html_static_path = ['.static'] - -# If not '', a 'Last updated on:' timestamp is inserted at every page -# bottom, using the given strftime format. -html_last_updated_fmt = '%b %d, %Y' - -# If true, SmartyPants will be used to convert quotes and dashes to -# typographically correct entities. -#html_use_smartypants = True - -# Custom sidebar templates, maps document names to template names. -#html_sidebars = {} - -# Additional templates that should be rendered to pages, maps page names to -# template names. -#html_additional_pages = {} - -# If false, no module index is generated. -#html_use_modindex = True - -# If false, no index is generated. -#html_use_index = True - -# If true, the index is split into individual pages for each letter. -#html_split_index = False - -# If true, the reST sources are included in the HTML build as -# _sources/. -#html_copy_source = True - -# If true, an OpenSearch description file will be output, and all pages -# will contain a tag referring to it. The value of this option must -# be the base URL from which the finished HTML is served. -#html_use_opensearch = '' - -# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml"). -#html_file_suffix = '' - -# Output file base name for HTML help builder. -htmlhelp_basename = 'atemplatedoc' - - -# Options for LaTeX output -# ------------------------ - -# The paper size ('letter' or 'a4'). -#latex_paper_size = 'letter' - -# The font size ('10pt', '11pt' or '12pt'). -#latex_font_size = '10pt' - -# Grouping the document tree into LaTeX files. List of tuples -# (source start file, target name, title, -# author, document class [howto/manual]). -latex_documents = [ - ('index', 'atemplate.tex', 'Pyramid SockJS documentation', - 'Developers', 'manual'), -] - -# The name of an image file (relative to this directory) to place at the -# top of the title page. -latex_logo = '.static/logo_hi.gif' - -# For "manual" documents, if this is true, then toplevel headings are -# parts, not chapters. -#latex_use_parts = False - -# Additional stuff for the LaTeX preamble. -#latex_preamble = '' - -# Documents to append as an appendix to all manuals. -#latex_appendices = [] - -# If false, no module index is generated. -#latex_use_modindex = True - -#autoclass_content = 'both' diff --git a/docs/index.rst b/docs/index.rst deleted file mode 100644 index a70d01cd..00000000 --- a/docs/index.rst +++ /dev/null @@ -1,18 +0,0 @@ -Pyramid SockJS -============== - -.. toctree:: - :maxdepth: 2 - - overview.rst - install.rst - api.rst - - -Indices and tables ------------------- - -* :ref:`genindex` -* :ref:`modindex` -* :ref:`search` - diff --git a/docs/install.rst b/docs/install.rst deleted file mode 100644 index 21835623..00000000 --- a/docs/install.rst +++ /dev/null @@ -1,58 +0,0 @@ -============ -Installation -============ - -virtualenv -========== - - -1. Install virtualenv:: - - $ wget https://raw.github.com/pypa/virtualenv/master/virtualenv.py - $ python2.7 ./virtualenv.py --no-site-packages sockjs - -2. Install gevent 1.0b2 (non-Windows users):: - - $ ./sockjs/bin/pip install http://gevent.googlecode.com/files/gevent-1.0b2.tar.gz - -2. Install gevent 1.0b2 (Windows users, presuming you are running 32bit Python 2.7):: - - $ ./sockjs/Scripts/easy_install http://gevent.googlecode.com/files/gevent-1.0b2-py2.7-win32.egg - -3. Clone pyramid_sockjs from github and then install:: - - $ git clone https://github.com/fafhrd91/pyramid_sockjs.git - $ cd pyramid_sockjs - $ ../sockjs/bin/python setup.py develop - - -Server config -============= - -To use gevent based server use following configuration -for server section:: - - [server:main] - use = egg:pyramid_sockjs#server - host = 127.0.0.1 - port = 8080 - -To use gunicorn server use following configuation for server section, -gunicorn 0.14.3 or greater is required:: - - [server:main] - use = egg:gunicorn - host = 127.0.0.1 - port = 8080 - workers = 1 - worker_class = gevent - - -Chat example -============ - -You can run `chat` example with following command. It doesnt require -any configuration, it runs on host ``127.0.0.1`` and port ``8080``:: - - - $ ./sockjs/bin/python ./pyramid_sockjs/examples/chat.py diff --git a/docs/overview.rst b/docs/overview.rst deleted file mode 100644 index 9d5110ef..00000000 --- a/docs/overview.rst +++ /dev/null @@ -1,175 +0,0 @@ -Pyramid SockJS -============== - -Overview --------- - -Gevent-based SockJS integration for Pyramid. SockJS interface is -implemented as pyramid route. It runs inside wsgi app rather than wsgi server. -It's possible to create any number of different sockjs routes, ie -`/__sockjs__/*` or `/mycustom-sockjs/*`. also you can provide different -session implementation and management for each of sockjs routes. - -Gevent based server is required for ``pyramid_sockjs``. -For example ``gunicorn`` with gevent worker. ``pyramid_sockjs`` provides -simple paster server runner: - -.. code-block:: text - :linenos: - - [server:main] - use = egg:pyramid_sockjs#server - host = 0.0.0.0 - port = 8080 - -Example of sockjs route: - -.. code-block:: python - - def main(global_settings, **settings): - config = Configurator(settings=settings) - config.add_sockjs_route() - - return config.make_wsgi_app() - -By default :py:func:`add_sockjs_route` directive creates sockjs route -with empty name and prefix ``/__sockjs__``, so js client code should look like: - - -.. code-block:: javascript - - - - - -All interactions between client and server happen through `Sessions`. -Its possible to override default session with custom implementation. -Default session is very stupid, its even not possible to receive -client messages, so in most cases it is required to replace session. -Let's implement `echo` session as example: - -.. code-block:: python - - from pyramid_sockjs.session import Session - - class EchoSession(Session): - - def on_open(self): - self.send('Hello') - self.manager.broadcast("Someone joined.") - - def on_message(self, message): - self.send(message) - - def on_close(self): - self.manager.broadcast("Someone left.") - -To use custom session implementation pass it to :py:func:`add_sockjs_route` -directive: - -.. code-block:: python - - def main(global_settings, **settings): - config = Configurator(settings=settings) - - config.add_sockjs_route(session=EchoSession) - - return config.make_wsgi_app() - - -Sessions are managed by ``SessionManager``, each sockjs route has separate -session manager. Session manage is addressed by same name as sockjs route. -To get session manager use :py:func:`get_sockjs_manager` -request function. - -.. code-block:: python - - def main(...): - ... - config.add_sockjs_route('chat-service') - ... - config.add_route('broadcast', '/broadcast') - ... - return config.make_wsgi_app() - - - @view_config(route_name='broadcast', renderer='string') - def send_message(request): - message = request.GET.get('message') - if message: - manager = request.get_sockjs_manager('chat-service') - for session in manager.active_session(): - session.send(message) - - return 'Message has been sent' - - -To use custom ``SessionManager`` pass it as `session_manager=` argument -to :py:func:`add_sockjs_route` configurator directive. -Check :py:class:`pyramid_sockjs.Session` -and :py:class:`pyramid_sockjs.SessionManager` api for -detailed description. - - -Supported transports --------------------- - -* websocket (`hixie-76 `_ - and `hybi-10 `_) -* `xhr-streaming `_ -* `xhr-polling `_ -* `iframe-xhr-polling `_ -* iframe-eventsource (`EventSource `_ used from an. - `iframe via postMessage `_) -* iframe-htmlfile (`HtmlFile `_ - used from an `iframe via postMessage `_.) -* `jsonp-polling `_ - - -Limitations ------------ - -Pyramid sockjs does not support multple websocket session with same session id. - -gevent does not support Python 3 - -Requirements ------------- - -- Python 2.6/2.7 - -- `virtualenv `_ - -- `gevent 1.0b1 or greater `_ - -- `gevent-websocket 0.3.0 or greater `_ - -- `gunicorn 0.14.3 or greater `_ - - -Examples --------- - -You can find them in the `examples` repository at github. - -https://github.com/fafhrd91/pyramid_sockjs/tree/master/examples - - -License -------- - -pyramid_sockjs is offered under the BSD license. diff --git a/examples/chat.html b/examples/chat.html index b576a37b..7c126637 100644 --- a/examples/chat.html +++ b/examples/chat.html @@ -2,88 +2,98 @@ - + +

Chat!

@@ -129,8 +139,8 @@

Chat!

- - + +
diff --git a/examples/chat.py b/examples/chat.py index efba4801..e98f0cab 100644 --- a/examples/chat.py +++ b/examples/chat.py @@ -1,24 +1,28 @@ import logging import os +import aiohttp_cors from aiohttp import web import sockjs +from sockjs import SessionManager, Session, SockjsMessage, MsgType CHAT_FILE = open( os.path.join(os.path.dirname(__file__), 'chat.html'), 'rb').read() -async def chat_msg_handler(msg, session): - if session.manager is None: - return - if msg.type == sockjs.MSG_OPEN: - session.manager.broadcast("Someone joined.") - elif msg.type == sockjs.MSG_MESSAGE: - session.manager.broadcast(msg.data) - elif msg.type == sockjs.MSG_CLOSED: - session.manager.broadcast("Someone left.") +async def chat_msg_handler( + manager: SessionManager, + session: Session, + msg: SockjsMessage, +): + if msg.type == MsgType.OPEN: + manager.broadcast("Someone joined.") + elif msg.type == MsgType.MESSAGE: + manager.broadcast(msg.data) + elif msg.type == MsgType.CLOSED: + manager.broadcast("Someone left.") def index(request): @@ -32,6 +36,23 @@ def index(request): app = web.Application() app.router.add_route('GET', '/', index) - sockjs.add_endpoint(app, chat_msg_handler, name='chat', prefix='/sockjs/') + + # Configure default CORS settings. + cors = aiohttp_cors.setup(app, defaults={ + '*': aiohttp_cors.ResourceOptions( + allow_credentials=True, + expose_headers='*', + allow_headers='*', + max_age=31536000, + ) + }) + + sockjs.add_endpoint( + app, + chat_msg_handler, + name='chat', + prefix='/sockjs/', + cors_config=cors, + ) web.run_app(app) diff --git a/requirements.txt b/requirements.txt index 571669f6..a2afe0e8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,12 +1,8 @@ -black==20.8b1; python_version>="3.6" -flake8==3.8.4 -docutils==0.16 -pytest==6.2.1 -pytest-aiohttp==0.3.0 -pytest-cov==2.10.1 -pytest-sugar==0.9.4 -pytest-mock==3.4.0 -pytest-timeout==1.4.2 -sphinx==3.4.1 -aiohttp==3.7.4 --e . +flake8==7.0.0 +pytest==8.2.2 +pytest-aiohttp==1.0.5 +pytest-mock==3.14.0 +pytest-timeout==2.3.1 +aiohttp==3.9.5 +twine==5.1.0 +-e .[test] diff --git a/setup.cfg b/setup.cfg index 6c896046..a2ed1736 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,11 +1,14 @@ [easy_install] zip_ok = false + [flake8] -ignore = N801,N802,N803,E226 +ignore = N801,N802,N803,E226,W503 max-line-length = 88 + [tool:pytest] timeout = 3 -filterwarnings= - error +#filterwarnings= +# error +asyncio_mode = auto diff --git a/setup.py b/setup.py index 30a3f90b..2a0e4923 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,9 @@ import codecs import os import re -from setuptools import setup, find_packages + +from setuptools import find_packages, setup + with codecs.open( os.path.join(os.path.abspath(os.path.dirname(__file__)), "sockjs", "__init__.py"), @@ -13,8 +15,6 @@ except IndexError: raise RuntimeError("Unable to determine version.") -install_requires = ["aiohttp >= 3.0.0"] - def read(f): return open(os.path.join(os.path.dirname(__file__), f)).read().strip() @@ -31,10 +31,9 @@ def read(f): "Intended Audience :: Developers", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.5", - "Programming Language :: Python :: 3.6", - "Programming Language :: Python :: 3.7", - "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", "Programming Language :: Python :: Implementation :: CPython", "Topic :: Internet :: WWW/HTTP", "Framework :: AsyncIO", @@ -44,8 +43,23 @@ def read(f): url="https://github.com/aio-libs/sockjs/", license="Apache 2", packages=find_packages(), - python_requires=">=3.5.3", - install_requires=install_requires, + python_requires=">=3.10.0", + install_requires=[ + "aiohttp>=3.7.4", + "async-timeout>=4.0.3", + ], + extras_require={ + "test": [ + "pytest", + "multidict", + "yarl", + "pytest-aiohttp", + "pytest-mock", + "pytest-timeout", + "cykooz.testing", + "aiohttp_cors", + ], + }, include_package_data=True, zip_safe=False, ) diff --git a/sockjs-testsrv.py b/sockjs-testsrv.py index 6c2a7bce..10541285 100644 --- a/sockjs-testsrv.py +++ b/sockjs-testsrv.py @@ -1,5 +1,5 @@ -import asyncio import logging + from aiohttp import web import sockjs @@ -8,24 +8,23 @@ from sockjs.transports.xhrstreaming import XHRStreamingTransport -async def echoSession(msg, session): - if msg.type == sockjs.MSG_MESSAGE: +async def echo_session(manager, session, msg): + if msg.type == sockjs.MsgType.MESSAGE: session.send(msg.data) -async def closeSessionHander(msg, session): - if msg.type == sockjs.MSG_OPEN: +async def close_session_handler(manager, session, msg): + if msg.type == sockjs.MsgType.OPEN: session.close() -async def broadcastSession(msg, session): - if msg.type == sockjs.MSG_OPEN: - session.manager.broadcast(msg.data) +async def broadcast_session(manager, session, msg): + if msg.type == sockjs.MsgType.OPEN: + manager.broadcast(msg.data) if __name__ == '__main__': """ Sockjs tests server """ - loop = asyncio.get_event_loop() logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s') @@ -33,19 +32,24 @@ async def broadcastSession(msg, session): EventsourceTransport.maxsize = 4096 XHRStreamingTransport.maxsize = 4096 - app = web.Application(loop=loop) + app = web.Application() sockjs.add_endpoint( - app, echoSession, name='echo', prefix='/echo') + app, echo_session, name='echo', prefix='/echo' + ) sockjs.add_endpoint( - app, closeSessionHander, name='close', prefix='/close') + app, close_session_handler, name='close', prefix='/close' + ) sockjs.add_endpoint( - app, broadcastSession, name='broadcast', prefix='/broadcast') + app, broadcast_session, name='broadcast', prefix='/broadcast' + ) sockjs.add_endpoint( - app, echoSession, name='wsoff', prefix='/disabled_websocket_echo', + app, echo_session, name='wsoff', prefix='/disabled_websocket_echo', disable_transports=('websocket',)) + sockjs.add_endpoint( - app, echoSession, name='cookie', prefix='/cookie_needed_echo', - cookie_needed=True) + app, echo_session, name='cookie', prefix='/cookie_needed_echo', + cookie_needed=True + ) - web.run_app(app) + web.run_app(app, port=8081) diff --git a/sockjs/__init__.py b/sockjs/__init__.py index 090bdac1..b37cec96 100644 --- a/sockjs/__init__.py +++ b/sockjs/__init__.py @@ -1,27 +1,10 @@ -# pyramid_sockjs +from .exceptions import SessionIsAcquired, SessionIsClosed +from .protocol import SessionState, MsgType, Frame, SockjsMessage +from .route import add_endpoint, get_manager +from .session import Session, SessionManager -# Session, SessionManager are not imported - -from sockjs.session import Session -from sockjs.session import SessionManager -from sockjs.exceptions import SessionIsClosed -from sockjs.exceptions import SessionIsAcquired - -from sockjs.protocol import STATE_NEW -from sockjs.protocol import STATE_OPEN -from sockjs.protocol import STATE_CLOSING -from sockjs.protocol import STATE_CLOSED - -from sockjs.protocol import MSG_OPEN -from sockjs.protocol import MSG_MESSAGE -from sockjs.protocol import MSG_CLOSE -from sockjs.protocol import MSG_CLOSED - -from sockjs.route import get_manager, add_endpoint - - -__version__ = "0.11.0" +__version__ = "0.13.0" __all__ = ( "get_manager", @@ -30,12 +13,8 @@ "SessionManager", "SessionIsClosed", "SessionIsAcquired", - "STATE_NEW", - "STATE_OPEN", - "STATE_CLOSING", - "STATE_CLOSED", - "MSG_OPEN", - "MSG_MESSAGE", - "MSG_CLOSE", - "MSG_CLOSED", + "SessionState", + "MsgType", + "Frame", + "SockjsMessage", ) diff --git a/sockjs/protocol.py b/sockjs/protocol.py index cf9ba367..61c0868f 100644 --- a/sockjs/protocol.py +++ b/sockjs/protocol.py @@ -1,13 +1,19 @@ -import collections +import dataclasses +import enum import hashlib from datetime import datetime +from typing import Union + ENCODING = "utf-8" -STATE_NEW = 0 -STATE_OPEN = 1 -STATE_CLOSING = 2 -STATE_CLOSED = 3 + +@enum.unique +class SessionState(enum.Enum): + NEW = 0 + OPEN = 1 + CLOSING = 2 + CLOSED = 3 _days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"] @@ -62,11 +68,14 @@ def dthandler(obj): # Frames # ------ -FRAME_OPEN = "o" -FRAME_CLOSE = "c" -FRAME_MESSAGE = "a" -FRAME_MESSAGE_BLOB = "a1" -FRAME_HEARTBEAT = "h" + +@enum.unique +class Frame(enum.Enum): + OPEN = "o" + CLOSE = "c" + MESSAGE = "a" + MESSAGE_BLOB = "a1" + HEARTBEAT = "h" # ------------------ @@ -74,8 +83,8 @@ def dthandler(obj): IFRAME_HTML = """ - - + + \r\n" % dumps(text)).encode(ENCODING) - await self.response.write(blob) - - self.size += len(blob) - if self.size > self.maxsize: - return True - else: - return False + async def _send(self, text: str): + text = "\r\n" % dumps(text) + return await super()._send(text) async def process(self): request = self.request - try: - callback = request.query.get("c", None) - except Exception: - callback = request.GET.get("c", None) - + callback = request.query.get("c") if callback is None: - await self.session._remote_closed() - return web.HTTPInternalServerError(text='"callback" parameter required') + await self.manager.remote_closed(self.session) + raise web.HTTPInternalServerError(text='"callback" parameter required') elif not self.check_callback.match(callback): - await self.session._remote_closed() - return web.HTTPInternalServerError(text='invalid "callback" parameter') + await self.manager.remote_closed(self.session) + raise web.HTTPInternalServerError(text='invalid "callback" parameter') headers = ( (hdrs.CONTENT_TYPE, "text/html; charset=UTF-8"), @@ -61,10 +54,9 @@ async def process(self): (hdrs.CONNECTION, "close"), ) headers += session_cookie(request) - headers += cors_headers(request.headers) # open sequence (sockjs protocol) - resp = self.response = web.StreamResponse(headers=headers) + resp = self.response = web.StreamResponse(headers=MultiDict(headers)) await resp.prepare(self.request) await resp.write( b"".join((PRELUDE1, callback.encode("utf-8"), PRELUDE2, b" " * 1024)) diff --git a/sockjs/transports/jsonp.py b/sockjs/transports/jsonp.py index ed1d4ec8..3c2099cf 100644 --- a/sockjs/transports/jsonp.py +++ b/sockjs/transports/jsonp.py @@ -1,51 +1,50 @@ """jsonp transport""" + import re from urllib.parse import unquote_plus -from aiohttp import web, hdrs +from aiohttp import hdrs, web +from multidict import MultiDict from .base import StreamingTransport -from .utils import CACHE_CONTROL, session_cookie, cors_headers -from ..protocol import dumps, loads, ENCODING +from .utils import CACHE_CONTROL, session_cookie +from ..protocol import ENCODING, dumps, loads class JSONPolling(StreamingTransport): - + name = "jsonp-polling" + create_session = True + maxsize = 0 check_callback = re.compile(r"^[a-zA-Z0-9_\.]+$") callback = "" - async def send(self, text): - data = "/**/%s(%s);\r\n" % (self.callback, dumps(text)) - await self.response.write(data.encode(ENCODING)) - return True + async def _send(self, text: str): + text = "/**/%s(%s);\r\n" % (self.callback, dumps(text)) + return await super()._send(text) async def process(self): + manager = self.manager session = self.session request = self.request meth = request.method if request.method == hdrs.METH_GET: - try: - callback = self.callback = request.query.get("c") - except Exception: - callback = self.callback = request.GET.get("c") - + callback = self.callback = request.query.get("c") if not callback: - await self.session._remote_closed() - return web.HTTPInternalServerError(text='"callback" parameter required') + await self.manager.remote_closed(self.session) + raise web.HTTPInternalServerError(text='"callback" parameter required') elif not self.check_callback.match(callback): - await self.session._remote_closed() - return web.HTTPInternalServerError(text='invalid "callback" parameter') + await self.manager.remote_closed(self.session) + raise web.HTTPInternalServerError(text='invalid "callback" parameter') headers = ( (hdrs.CONTENT_TYPE, "application/javascript; charset=UTF-8"), (hdrs.CACHE_CONTROL, CACHE_CONTROL), ) headers += session_cookie(request) - headers += cors_headers(request.headers) - resp = self.response = web.StreamResponse(headers=headers) + resp = self.response = web.StreamResponse(headers=MultiDict(headers)) await resp.prepare(request) await self.handle_session() @@ -57,28 +56,33 @@ async def process(self): ctype = request.content_type.lower() if ctype == "application/x-www-form-urlencoded": if not data.startswith(b"d="): - return web.HTTPInternalServerError(text="Payload expected.") + raise web.HTTPInternalServerError(text="Payload expected.") data = unquote_plus(data[2:].decode(ENCODING)) else: data = data.decode(ENCODING) if not data: - return web.HTTPInternalServerError(text="Payload expected.") + raise web.HTTPInternalServerError(text="Payload expected.") try: messages = loads(data) except Exception: - return web.HTTPInternalServerError(text="Broken JSON encoding.") + raise web.HTTPInternalServerError(text="Broken JSON encoding.") - await session._remote_messages(messages) + await manager.remote_messages(session, messages) headers = ( - (hdrs.CONTENT_TYPE, "text/html;charset=UTF-8"), + (hdrs.CONTENT_TYPE, "text/plain;charset=UTF-8"), (hdrs.CACHE_CONTROL, CACHE_CONTROL), ) headers += session_cookie(request) - return web.Response(body=b"ok", headers=headers) + return web.Response(body=b"ok", headers=MultiDict(headers)) else: - return web.HTTPBadRequest(text="No support for such method: %s" % meth) + raise web.HTTPBadRequest(text="No support for such method: %s" % meth) + + +class JSONPollingSend(JSONPolling): + name = "jsonp-polling" + create_session = False diff --git a/sockjs/transports/rawwebsocket.py b/sockjs/transports/rawwebsocket.py index 3997952d..15415ab3 100644 --- a/sockjs/transports/rawwebsocket.py +++ b/sockjs/transports/rawwebsocket.py @@ -1,78 +1,127 @@ """raw websocket transport.""" -import asyncio -from aiohttp import web +import asyncio from asyncio import ensure_future +from typing import Optional +from uuid import uuid4 + +from aiohttp import web +from async_timeout import timeout from .base import Transport +from .utils import cancel_tasks from ..exceptions import SessionIsClosed -from ..protocol import FRAME_CLOSE, FRAME_MESSAGE, FRAME_MESSAGE_BLOB, FRAME_HEARTBEAT +from ..protocol import Frame +from ..session import Session, SessionManager class RawWebSocketTransport(Transport): - async def server(self, ws, session): + name = "websocket-raw" + heartbeat_timeout = 10 + + @classmethod + def get_session(cls, manager: SessionManager, session_id: str) -> Session: + # For WebSockets, as opposed to other transports, it is valid to + # reuse `session_id`. The lifetime of SockJS WebSocket session is + # defined by a lifetime of underlying WebSocket connection. It is + # correct to have two separate sessions sharing the same + # `session_id` at the same time. + + # Generate unique session_id based on given ID. + orig_session_id = session_id + while session_id in manager.sessions: + session_id = "%s-%s" % (orig_session_id, uuid4().hex[-8:]) + return super().get_session(manager, session_id) + + def __init__(self, manager: SessionManager, session: Session, request: web.Request): + super().__init__(manager, session, request) + self._pong_event = asyncio.Event() + self._wait_pong_task: Optional[asyncio.Task] = None + + async def server(self, ws: web.WebSocketResponse): while True: try: - frame, data = await session._wait(pack=False) + frame, data = await self.session.get_frame(pack=False) except SessionIsClosed: break - if frame == FRAME_MESSAGE: + if frame == Frame.MESSAGE: for text in data: await ws.send_str(text) - elif frame == FRAME_MESSAGE_BLOB: + elif frame == Frame.MESSAGE_BLOB: data = data[1:] if data.startswith("["): data = data[1:-1] await ws.send_str(data) - elif frame == FRAME_HEARTBEAT: + elif frame == Frame.HEARTBEAT: await ws.ping() - elif frame == FRAME_CLOSE: + if self._wait_pong_task is None: + self._wait_pong_task = asyncio.create_task(self._wait_pong()) + self._wait_pong_task.add_done_callback(self._wait_done_callback) + elif frame == Frame.CLOSE: try: - await ws.close(message="Go away!") + await ws.close(message=b"Go away!") finally: - await session._remote_closed() + await self.manager.remote_closed(self.session) + + async def _wait_pong(self): + try: + async with timeout(self.heartbeat_timeout): + await self._pong_event.wait() + except asyncio.TimeoutError: + self.session.close(3000, "No response from heartbeat") + finally: + self._pong_event.clear() + + def _wait_done_callback(self, _): + self._wait_pong_task = None - async def client(self, ws, session): + async def client(self, ws: web.WebSocketResponse): while True: msg = await ws.receive() + if self._wait_pong_task is not None: + self._pong_event.set() if msg.type == web.WSMsgType.text: if not msg.data: continue - await self.session._remote_message(msg.data) + await self.manager.remote_message(self.session, msg.data) elif msg.type == web.WSMsgType.close: - await self.session._remote_close() + await self.manager.remote_close(self.session) elif msg.type in (web.WSMsgType.closed, web.WSMsgType.closing): - await self.session._remote_closed() + await self.manager.remote_closed(self.session) break elif msg.type == web.WSMsgType.PONG: - self.session._tick() + self.session.tick() + elif msg.type == web.WSMsgType.PING: + await ws.pong(msg.data) + self.session.tick() async def process(self): # start websocket connection - ws = self.ws = web.WebSocketResponse(autoping=False) + ws = web.WebSocketResponse(autoping=False) await ws.prepare(self.request) try: - await self.manager.acquire(self.session) + await self.manager.acquire(self.session, self.request) except Exception: # should use specific exception - await ws.close(message="Go away!") + await ws.close(message=b"Go away!") return ws - server = ensure_future(self.server(ws, self.session)) - client = ensure_future(self.client(ws, self.session)) + server = ensure_future(self.server(ws)) + client = ensure_future(self.client(ws)) try: - await asyncio.wait((server, client), return_when=asyncio.FIRST_COMPLETED) + await asyncio.wait( + (server, client), + return_when=asyncio.FIRST_COMPLETED, + ) except asyncio.CancelledError: raise except Exception as exc: - await self.session._remote_close(exc) + await self.manager.remote_close(self.session, exc) finally: + self.session.expire() await self.manager.release(self.session) - if not server.done(): - server.cancel() - if not client.done(): - client.cancel() + await cancel_tasks(server, client, self._wait_pong_task) return ws diff --git a/sockjs/transports/utils.py b/sockjs/transports/utils.py index b70da21e..2033acb8 100644 --- a/sockjs/transports/utils.py +++ b/sockjs/transports/utils.py @@ -1,23 +1,12 @@ +import asyncio import http.cookies -from aiohttp import hdrs from datetime import datetime, timedelta +import async_timeout +from aiohttp import hdrs -CACHE_CONTROL = "no-store, no-cache, no-transform, must-revalidate, max-age=0" - - -def cors_headers(headers, nocreds=False): - origin = headers.get(hdrs.ORIGIN, "*") - cors = ((hdrs.ACCESS_CONTROL_ALLOW_ORIGIN, origin),) - - ac_headers = headers.get(hdrs.ACCESS_CONTROL_REQUEST_HEADERS) - if ac_headers: - cors += ((hdrs.ACCESS_CONTROL_ALLOW_HEADERS, ac_headers),) - if origin != "*": - return cors + ((hdrs.ACCESS_CONTROL_ALLOW_CREDENTIALS, "true"),) - else: - return cors +CACHE_CONTROL = "no-store, no-cache, no-transform, must-revalidate, max-age=0" def session_cookie(request): @@ -30,7 +19,7 @@ def session_cookie(request): td365 = timedelta(days=365) td365seconds = str( - (td365.microseconds + (td365.seconds + td365.days * 24 * 3600) * 10 ** 6) // 10 ** 6 + (td365.microseconds + (td365.seconds + td365.days * 24 * 3600) * 10**6) // 10**6 ) @@ -41,3 +30,22 @@ def cache_headers(): (hdrs.CACHE_CONTROL, "max-age=%s, public" % td365seconds), (hdrs.EXPIRES, d.strftime("%a, %d %b %Y %H:%M:%S")), ) + + +async def cancel_tasks(*coros_or_futures, timeout=1): + """Cancel all not stopped coroutine or feature before exit + from this context manager. + """ + futures = [asyncio.ensure_future(cf) for cf in coros_or_futures if cf] + waiting_to_complete = [] + for fut in futures: + if not fut.cancelled() and fut.done(): + continue + fut.cancel() + waiting_to_complete.append(fut) + if waiting_to_complete: + try: + async with async_timeout.timeout(timeout): + await asyncio.gather(*waiting_to_complete, return_exceptions=True) + except asyncio.TimeoutError: + pass diff --git a/sockjs/transports/websocket.py b/sockjs/transports/websocket.py index 6d5d3622..e4e0979b 100644 --- a/sockjs/transports/websocket.py +++ b/sockjs/transports/websocket.py @@ -1,35 +1,88 @@ """websocket transport""" -import asyncio -from aiohttp import web +import asyncio +import logging from asyncio import ensure_future +from typing import Optional +from uuid import uuid4 + +from aiohttp import web +from aiohttp.web_exceptions import HTTPMethodNotAllowed +from async_timeout import timeout from .base import Transport +from .utils import cancel_tasks from ..exceptions import SessionIsClosed -from ..protocol import STATE_CLOSED, FRAME_CLOSE -from ..protocol import loads, close_frame +from ..protocol import SessionState, Frame, close_frame, loads +from ..session import Session, SessionManager + + +log = logging.getLogger("sockjs") class WebSocketTransport(Transport): - async def server(self, ws, session): + name = "websocket" + heartbeat_timeout = 10 + + @classmethod + def get_session(cls, manager: SessionManager, session_id: str) -> Session: + # For WebSockets, as opposed to other transports, it is valid to + # reuse `session_id`. The lifetime of SockJS WebSocket session is + # defined by a lifetime of underlying WebSocket connection. It is + # correct to have two separate sessions sharing the same + # `session_id` at the same time. + + # Generate unique session_id based on given ID. + orig_session_id = session_id + while session_id in manager.sessions: + session_id = "%s-%s" % (orig_session_id, uuid4().hex[-8:]) + return super().get_session(manager, session_id) + + def __init__(self, manager: SessionManager, session: Session, request: web.Request): + super().__init__(manager, session, request) + self._pong_event = asyncio.Event() + self._wait_pong_task: Optional[asyncio.Task] = None + + async def server(self, ws: web.WebSocketResponse): while True: try: - frame, data = await session._wait() + frame, data = await self.session.get_frame() except SessionIsClosed: break - try: - await ws.send_str(data) - except OSError: - pass # ignore 'cannot write to closed transport' - if frame == FRAME_CLOSE: + + if frame == Frame.HEARTBEAT: + await ws.ping() + log.debug("Send WS PING") + if self._wait_pong_task is None: + self._wait_pong_task = asyncio.create_task(self._wait_pong()) + self._wait_pong_task.add_done_callback(self._wait_done_callback) + continue + + await ws.send_str(data) + + if frame == Frame.CLOSE: try: await ws.close() finally: - await session._remote_closed() + await self.manager.remote_closed(self.session) + + async def _wait_pong(self): + try: + async with timeout(self.heartbeat_timeout): + await self._pong_event.wait() + except asyncio.TimeoutError: + self.session.close(3000, "No response from heartbeat") + finally: + self._pong_event.clear() - async def client(self, ws, session): + def _wait_done_callback(self, _): + self._wait_pong_task = None + + async def client(self, ws: web.WebSocketResponse): while True: msg = await ws.receive() + if self._wait_pong_task is not None: + self._pong_event.set() if msg.type == web.WSMsgType.text: data = msg.data @@ -39,43 +92,56 @@ async def client(self, ws, session): try: text = loads(data) except Exception as exc: - await session._remote_close(exc) - await session._remote_closed() + await self.manager.remote_close(self.session, exc) + await self.manager.remote_closed(self.session) await ws.close(message=b"broken json") break if data.startswith("["): - await session._remote_messages(text) + await self.manager.remote_messages(self.session, text) else: - await session._remote_message(text) - + await self.manager.remote_message(self.session, text) + elif msg.type == web.WSMsgType.PONG: + log.debug("Received WS PONG") + self.session.tick() + elif msg.type == web.WSMsgType.PING: + log.debug("Received WS PING") + await ws.pong(msg.data) + self.session.tick() elif msg.type == web.WSMsgType.close: - await session._remote_close() + await self.manager.remote_close(self.session) elif msg.type in (web.WSMsgType.closed, web.WSMsgType.closing): - await session._remote_closed() + await self.manager.remote_closed(self.session) break async def process(self): + if self.request.method != "GET": + # WebSocket should only accept GET + raise HTTPMethodNotAllowed( + self.request.method, + ["GET"], + body=b"", + content_type="", + ) + # start websocket connection - ws = self.ws = web.WebSocketResponse() + ws = web.WebSocketResponse(autoping=False) await ws.prepare(self.request) # session was interrupted if self.session.interrupted: - await self.ws.send_str(close_frame(1002, "Connection interrupted")) - - elif self.session.state == STATE_CLOSED: - await self.ws.send_str(close_frame(3000, "Go away!")) - + await ws.send_str(close_frame(1002, "Connection interrupted")) + elif self.session.state == SessionState.CLOSED: + await ws.send_str(close_frame(3000, "Go away!")) else: try: - await self.manager.acquire(self.session) + await self.manager.acquire(self.session, self.request) except Exception: # should use specific exception - await self.ws.send_str(close_frame(3000, "Go away!")) + await ws.send_str(close_frame(3000, "Go away!")) await ws.close() return ws - server = ensure_future(self.server(ws, self.session)) - client = ensure_future(self.client(ws, self.session)) + server = ensure_future(self.server(ws)) + client = ensure_future(self.client(ws)) try: await asyncio.wait( (server, client), return_when=asyncio.FIRST_COMPLETED @@ -83,12 +149,10 @@ async def process(self): except asyncio.CancelledError: raise except Exception as exc: - await self.session._remote_close(exc) + await self.manager.remote_close(self.session, exc) finally: + self.session.expire() await self.manager.release(self.session) - if not server.done(): - server.cancel() - if not client.done(): - client.cancel() + await cancel_tasks(server, client, self._wait_pong_task) return ws diff --git a/sockjs/transports/xhr.py b/sockjs/transports/xhr.py deleted file mode 100644 index 14b5df79..00000000 --- a/sockjs/transports/xhr.py +++ /dev/null @@ -1,37 +0,0 @@ -from aiohttp import web, hdrs - -from .base import StreamingTransport -from .utils import CACHE_CONTROL, session_cookie, cors_headers, cache_headers - - -class XHRTransport(StreamingTransport): - """Long polling derivative transports, - used for XHRPolling and JSONPolling.""" - - maxsize = 0 - - async def process(self): - request = self.request - - if request.method == hdrs.METH_OPTIONS: - headers = ( - (hdrs.CONTENT_TYPE, "application/javascript; charset=UTF-8"), - (hdrs.ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, POST"), - ) - headers += session_cookie(request) - headers += cors_headers(request.headers) - headers += cache_headers() - return web.Response(status=204, headers=headers) - - headers = ( - (hdrs.CONTENT_TYPE, "application/javascript; charset=UTF-8"), - (hdrs.CACHE_CONTROL, CACHE_CONTROL), - ) - headers += session_cookie(request) - headers += cors_headers(request.headers) - - resp = self.response = web.StreamResponse(headers=headers) - await resp.prepare(request) - - await self.handle_session() - return resp diff --git a/sockjs/transports/xhr_pooling.py b/sockjs/transports/xhr_pooling.py new file mode 100644 index 00000000..e05f194a --- /dev/null +++ b/sockjs/transports/xhr_pooling.py @@ -0,0 +1,81 @@ +from aiohttp import hdrs, web +from multidict import MultiDict + +from .base import StreamingTransport, Transport +from .utils import CACHE_CONTROL, cache_headers, session_cookie +from ..protocol import loads, ENCODING + + +class XHRTransport(StreamingTransport): + """Long polling derivative transports, + used for XHRPolling and JSONPolling.""" + + name = "xhr-polling" + create_session = True + maxsize = 0 + + async def _send(self, text: str): + return await super()._send(text + "\n") + + async def process(self): + request = self.request + + if request.method == hdrs.METH_OPTIONS: + headers = ( + (hdrs.CONTENT_TYPE, "application/javascript; charset=UTF-8"), + (hdrs.ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, POST"), + ) + headers += session_cookie(request) + headers += cache_headers() + return web.Response(status=204, headers=MultiDict(headers)) + + headers = ( + (hdrs.CONTENT_TYPE, "application/javascript; charset=UTF-8"), + (hdrs.CACHE_CONTROL, CACHE_CONTROL), + ) + headers += session_cookie(request) + + resp = self.response = web.StreamResponse(headers=MultiDict(headers)) + await resp.prepare(request) + + await self.handle_session() + return resp + + +class XHRSendTransport(Transport): + name = "xhr-polling" + create_session = False + + async def process(self): + request = self.request + + if request.method not in (hdrs.METH_GET, hdrs.METH_POST, hdrs.METH_OPTIONS): + raise web.HTTPForbidden(text="Method is not allowed") + + if self.request.method == hdrs.METH_OPTIONS: + headers = ( + (hdrs.ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, POST"), + (hdrs.CONTENT_TYPE, "application/javascript; charset=UTF-8"), + ) + headers += session_cookie(request) + headers += cache_headers() + return web.Response(status=204, headers=MultiDict(headers)) + + data = await request.read() + if not data: + raise web.HTTPInternalServerError(text="Payload expected.") + + try: + messages = loads(data.decode(ENCODING)) + except Exception: + raise web.HTTPInternalServerError(text="Broken JSON encoding.") + + await self.manager.remote_messages(self.session, messages) + + headers = ( + (hdrs.CONTENT_TYPE, "text/plain; charset=UTF-8"), + (hdrs.CACHE_CONTROL, CACHE_CONTROL), + ) + headers += session_cookie(request) + + return web.Response(status=204, headers=MultiDict(headers)) diff --git a/sockjs/transports/xhrsend.py b/sockjs/transports/xhrsend.py deleted file mode 100644 index 2a512eea..00000000 --- a/sockjs/transports/xhrsend.py +++ /dev/null @@ -1,43 +0,0 @@ -from aiohttp import web, hdrs - -from ..protocol import loads, ENCODING -from .base import Transport -from .utils import CACHE_CONTROL, session_cookie, cors_headers, cache_headers - - -class XHRSendTransport(Transport): - async def process(self): - request = self.request - - if request.method not in (hdrs.METH_GET, hdrs.METH_POST, hdrs.METH_OPTIONS): - return web.HTTPForbidden(text="Method is not allowed") - - if self.request.method == hdrs.METH_OPTIONS: - headers = ( - (hdrs.ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, POST"), - (hdrs.CONTENT_TYPE, "application/javascript; charset=UTF-8"), - ) - headers += session_cookie(request) - headers += cors_headers(request.headers) - headers += cache_headers() - return web.Response(status=204, headers=headers) - - data = await request.read() - if not data: - return web.HTTPInternalServerError(text="Payload expected.") - - try: - messages = loads(data.decode(ENCODING)) - except Exception: - return web.HTTPInternalServerError(text="Broken JSON encoding.") - - await self.session._remote_messages(messages) - - headers = ( - (hdrs.CONTENT_TYPE, "text/plain; charset=UTF-8"), - (hdrs.CACHE_CONTROL, CACHE_CONTROL), - ) - headers += session_cookie(request) - headers += cors_headers(request.headers) - - return web.Response(status=204, headers=headers) diff --git a/sockjs/transports/xhrstreaming.py b/sockjs/transports/xhrstreaming.py index bd15c2d6..f2fcd943 100644 --- a/sockjs/transports/xhrstreaming.py +++ b/sockjs/transports/xhrstreaming.py @@ -1,14 +1,18 @@ -from aiohttp import web, hdrs +from aiohttp import hdrs, web +from multidict import MultiDict from .base import StreamingTransport -from .utils import CACHE_CONTROL, session_cookie, cors_headers, cache_headers +from .utils import CACHE_CONTROL, cache_headers, session_cookie class XHRStreamingTransport(StreamingTransport): - - maxsize = 131072 # 128K bytes + name = "xhr-streaming" + create_session = True open_seq = b"h" * 2048 + b"\n" + async def _send(self, text: str): + return await super()._send(text + "\n") + async def process(self): request = self.request headers = ( @@ -18,15 +22,14 @@ async def process(self): ) headers += session_cookie(request) - headers += cors_headers(request.headers) if request.method == hdrs.METH_OPTIONS: headers += ((hdrs.ACCESS_CONTROL_ALLOW_METHODS, "OPTIONS, POST"),) headers += cache_headers() - return web.Response(status=204, headers=headers) + return web.Response(status=204, headers=MultiDict(headers)) # open sequence (sockjs protocol) - resp = self.response = web.StreamResponse(headers=headers) + resp = self.response = web.StreamResponse(headers=MultiDict(headers)) resp.force_close() await resp.prepare(request) await resp.write(self.open_seq) diff --git a/tests/asdf b/tests/asdf deleted file mode 100644 index e69de29b..00000000 diff --git a/tests/conftest.py b/tests/conftest.py index 6802d52f..164383ea 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,30 +1,28 @@ import asyncio - -from datetime import timedelta +from typing import Optional from unittest import mock +import aiohttp_cors import pytest - from aiohttp import web +from aiohttp.test_utils import TestClient, make_mocked_coro, make_mocked_request from aiohttp.web_urldispatcher import UrlMappingMatchInfo -from aiohttp.test_utils import make_mocked_request, make_mocked_coro from multidict import CIMultiDict from yarl import URL -from sockjs import Session, SessionManager, transports +from sockjs import Session, SessionManager, add_endpoint, transports from sockjs.route import SockJSRoute -@pytest.fixture -def app(): +@pytest.fixture(name="app") +def app_fixture(): return web.Application() @pytest.fixture def make_fut(): def maker(val, makemock=True): - loop = asyncio.get_event_loop() - fut = loop.create_future() + fut = asyncio.Future() fut.set_result(val) if makemock: @@ -39,41 +37,24 @@ def maker(val, makemock=True): @pytest.fixture def make_handler(): - def maker(result, coro=True, exc=False): + def maker(result, exc=False): if result is None: result = [] output = result - def handler(msg, s): + async def handler(manager, s, msg): if exc: raise ValueError((msg, s)) output.append((msg, s)) - if coro: - - async def async_handler(msg, s): - return handler(msg, s) - - return async_handler - else: - return handler - - return maker - - -@pytest.fixture -def make_route(make_handler, app): - def maker(handlers=transports.handlers): - handler = make_handler([]) - sm = SessionManager("sm", app, handler) - return SockJSRoute("sm", sm, "http:sockjs-cdn", handlers, (), True) + return handler return maker @pytest.fixture def make_request(app): - def maker(method, path, query_params={}, headers=None, match_info=None): + def maker(method, path, query_params=None, headers=None, match_info=None): path = URL(path) if query_params: path = path.with_query(query_params) @@ -98,7 +79,9 @@ def maker(method, path, query_params={}, headers=None, match_info=None): transport = mock.Mock() transport._drain_helper = make_mocked_coro() loop = asyncio.get_event_loop() - ret = make_mocked_request(method, str(path), headers, writer=writer, loop=loop) + ret = make_mocked_request( + method, str(path), headers, writer=writer, transport=transport, loop=loop + ) if match_info is None: match_info = UrlMappingMatchInfo({}, mock.Mock()) @@ -112,23 +95,62 @@ def maker(method, path, query_params={}, headers=None, match_info=None): @pytest.fixture def make_session(make_handler, make_request): def maker( - name="test", timeout=timedelta(10), request=None, handler=None, result=None + name="test", disconnect_delay=10, manager: Optional[SessionManager] = None ): - if request is None: - request = make_request("GET", "/TestPath/") - - if handler is None: - handler = make_handler(result) - return Session(name, handler, request, timeout=timeout, debug=True) + session = Session(name, disconnect_delay=disconnect_delay, debug=True) + if manager: + manager.sessions[session.id] = session + return session return maker @pytest.fixture -def make_manager(app, make_handler, make_session): +async def make_manager(app, make_handler, make_session): + managers = [] + def maker(handler=None): if handler is None: handler = make_handler([]) - return SessionManager("sm", app, handler, debug=True) + manager = SessionManager("sm", app, handler, debug=True) + managers.append(manager) + return manager + + yield maker + + for sm in managers: + await sm.stop() + + +@pytest.fixture +def make_route(make_manager, make_handler, app): + def maker(handlers=transports.transport_handlers): + sm = make_manager() + app.on_cleanup.append(sm.stop) + return SockJSRoute("sm", sm, "http:sockjs-cdn", handlers, (), True) return maker + + +@pytest.fixture(name="test_client") +async def test_client_fixture(app, aiohttp_client, make_handler) -> TestClient: + handler = make_handler(None) + # Configure default CORS settings. + cors = aiohttp_cors.setup( + app, + defaults={ + "*": aiohttp_cors.ResourceOptions( + allow_credentials=True, + expose_headers="*", + allow_headers="*", + max_age=31536000, + ) + }, + ) + add_endpoint( + app, + handler, + name="main", + cors_config=cors, + ) + return await aiohttp_client(app) diff --git a/tests/test_route.py b/tests/test_route.py index c0c390a7..75b967c0 100644 --- a/tests/test_route.py +++ b/tests/test_route.py @@ -1,9 +1,15 @@ import asyncio +import pytest from aiohttp import web +from aiohttp.test_utils import TestClient +from cykooz.testing import D from multidict import CIMultiDict from sockjs import protocol +from sockjs.route import ALL_METH_WO_OPTIONS +from sockjs.transports import transport_handlers +from sockjs.transports.base import Transport async def test_info(make_route, make_request): @@ -15,6 +21,15 @@ async def test_info(make_route, make_request): assert info["websocket"] assert info["cookie_needed"] + assert info["transports"] == [ + "eventsource", + "htmlfile", + "jsonp-polling", + "websocket", + "websocket-raw", + "xhr-polling", + "xhr-streaming", + ] async def test_info_entropy(make_route, make_request): @@ -30,22 +45,6 @@ async def test_info_entropy(make_route, make_request): assert entropy1 != entropy2 -async def test_info_options(make_route, make_request): - route = make_route() - request = make_request("OPTIONS", "/sm/") - response = await route.info_options(request) - - assert response.status == 204 - - headers = response.headers - assert "Access-Control-Max-Age" in headers - assert "Cache-Control" in headers - assert "Expires" in headers - assert "Set-Cookie" in headers - assert "access-control-allow-credentials" in headers - assert "access-control-allow-origin" in headers - - async def test_greeting(make_route, make_request): route = make_route() request = make_request("GET", "/sm/") @@ -62,8 +61,8 @@ async def test_iframe(make_route, make_request): text = """ - - + + \r\n') assert not stop assert trans.size == len(b'\r\n') trans.maxsize = 1 - stop = trans.send("text data") + stop = await trans._send("text data") assert stop @@ -46,24 +46,18 @@ async def test_process(make_transport, make_fut): async def test_process_no_callback(make_transport, make_fut): transp = make_transport() transp.session = mock.Mock() - transp.session._remote_closed = make_fut(1) + transp.manager.remote_closed = make_fut(1) - resp = await transp.process() - assert transp.session._remote_closed.called - assert resp.status == 500 + with pytest.raises(web.HTTPInternalServerError): + await transp.process() + assert transp.manager.remote_closed.called async def test_process_bad_callback(make_transport, make_fut): transp = make_transport(query_params={"c": "calback!!!!"}) transp.session = mock.Mock() - transp.session._remote_closed = make_fut(1) - - resp = await transp.process() - assert transp.session._remote_closed.called - assert resp.status == 500 - + transp.manager.remote_closed = make_fut(1) -async def test_session_has_request(make_transport, make_fut): - transp = make_transport(method="POST") - transp.session._remote_messages = make_fut(1) - assert isinstance(transp.session.request, web.Request) + with pytest.raises(web.HTTPInternalServerError): + await transp.process() + assert transp.manager.remote_closed.called diff --git a/tests/test_transport_jsonp.py b/tests/test_transport_jsonp.py index 449f213c..1109d984 100644 --- a/tests/test_transport_jsonp.py +++ b/tests/test_transport_jsonp.py @@ -1,8 +1,7 @@ from unittest import mock -from aiohttp import web - import pytest +from aiohttp import web from aiohttp.test_utils import make_mocked_coro from sockjs.transports import jsonp @@ -10,11 +9,11 @@ @pytest.fixture def make_transport(make_request, make_manager, make_handler, make_fut): - def maker(method="GET", path="/", query_params={}): + def maker(method="GET", path="/", query_params=None): handler = make_handler(None) manager = make_manager(handler) request = make_request(method, path, query_params=query_params) - session = manager.get("TestSessionJsonP", create=True, request=request) + session = manager.get("TestSessionJsonP", create=True) request.app.freeze() return jsonp.JSONPolling(manager, session, request) @@ -27,7 +26,7 @@ async def test_streaming_send(make_transport): resp = trans.response = mock.Mock() resp.write = make_mocked_coro(None) - stop = await trans.send("text data") + stop = await trans._send("text data") resp.write.assert_called_with(b'/**/cb("text data");\r\n') assert stop @@ -43,33 +42,32 @@ async def test_process(make_transport, make_fut): async def test_process_no_callback(make_transport, make_fut): transp = make_transport() transp.session = mock.Mock() - transp.session._remote_closed = make_fut(1) + transp.manager.remote_closed = make_fut(1) - resp = await transp.process() - assert transp.session._remote_closed.called - assert resp.status == 500 + with pytest.raises(web.HTTPInternalServerError): + await transp.process() + assert transp.manager.remote_closed.called async def test_process_bad_callback(make_transport, make_fut): transp = make_transport(query_params={"c": "calback!!!!"}) transp.session = mock.Mock() - transp.session._remote_closed = make_fut(1) + transp.manager.remote_closed = make_fut(1) - resp = await transp.process() - assert transp.session._remote_closed.called - assert resp.status == 500 + with pytest.raises(web.HTTPInternalServerError): + await transp.process() + assert transp.manager.remote_closed.called async def test_process_not_supported(make_transport): transp = make_transport(method="PUT") - resp = await transp.process() - assert resp.status == 400 + with pytest.raises(web.HTTPBadRequest): + await transp.process() async def xtest_process_bad_encoding(make_transport, make_fut): transp = make_transport(method="POST") transp.request.read = make_fut(b"test") - transp.request.content_type transp.request._content_type = "application/x-www-form-urlencoded" resp = await transp.process() assert resp.status == 500 @@ -78,7 +76,6 @@ async def xtest_process_bad_encoding(make_transport, make_fut): async def xtest_process_no_payload(make_transport, make_fut): transp = make_transport(method="POST") transp.request.read = make_fut(b"d=") - transp.request.content_type transp.request._content_type = "application/x-www-form-urlencoded" resp = await transp.process() assert resp.status == 500 @@ -93,14 +90,8 @@ async def xtest_process_bad_json(make_transport, make_fut): async def xtest_process_message(make_transport, make_fut): transp = make_transport(method="POST") - transp.session._remote_messages = make_fut(1) + transp.manager.remote_messages = make_fut(1) transp.request.read = make_fut(b'["msg1","msg2"]') resp = await transp.process() assert resp.status == 200 - transp.session._remote_messages.assert_called_with(["msg1", "msg2"]) - - -async def test_session_has_request(make_transport, make_fut): - transp = make_transport(method="POST") - transp.session._remote_messages = make_fut(1) - assert isinstance(transp.session.request, web.Request) + transp.manager.remote_messages.assert_called_with(["msg1", "msg2"]) diff --git a/tests/test_transport_rawwebsocket.py b/tests/test_transport_rawwebsocket.py index bac60964..62e5f0b3 100644 --- a/tests/test_transport_rawwebsocket.py +++ b/tests/test_transport_rawwebsocket.py @@ -1,22 +1,22 @@ -from unittest import mock from asyncio import Future +from unittest import mock import pytest +from aiohttp import WSMessage, WSMsgType from sockjs.exceptions import SessionIsClosed -from sockjs.protocol import FRAME_CLOSE, FRAME_HEARTBEAT +from sockjs.protocol import Frame from sockjs.transports.rawwebsocket import RawWebSocketTransport - -from aiohttp import WSMessage, WSMsgType +from sockjs.transports.utils import cancel_tasks @pytest.fixture def make_transport(make_request, make_fut): - def maker(method="GET", path="/", query_params={}): + def maker(method="GET", path="/", query_params=None): manager = mock.Mock() session = mock.Mock() session._remote_closed = make_fut(1) - session._wait = make_fut((FRAME_CLOSE, "")) + session._get_frame = make_fut((Frame.CLOSE, "")) request = make_request(method, path, query_params=query_params) request.app.freeze() return RawWebSocketTransport(manager, session, request) @@ -42,7 +42,7 @@ async def xtest_ticks_pong(make_transport, make_fut): session = transp.session await transp.client(ws, session) - assert session._tick.called + assert session.tick.called async def test_sends_ping(make_transport, make_fut): @@ -55,13 +55,15 @@ async def test_sends_ping(make_transport, make_fut): ws.ping.side_effect = [future] hb_future = Future() - hb_future.set_result((FRAME_HEARTBEAT, b"")) + hb_future.set_result((Frame.HEARTBEAT, b"")) session_close_future = Future() session_close_future.set_exception(SessionIsClosed) session = mock.Mock() - session._wait.side_effect = [hb_future, session_close_future] + session.get_frame.side_effect = [hb_future, session_close_future] + transp.session = session - await transp.server(ws, session) + await transp.server(ws) assert ws.ping.called + await cancel_tasks(transp._wait_pong_task) diff --git a/tests/test_transport_websocket.py b/tests/test_transport_websocket.py index 2b1ef593..6f9defcc 100644 --- a/tests/test_transport_websocket.py +++ b/tests/test_transport_websocket.py @@ -1,27 +1,26 @@ import asyncio +import datetime from asyncio import Future from unittest import mock -from aiohttp import web, WSMessage, WSMsgType - import pytest - +from aiohttp import WSMessage, WSMsgType from aiohttp.test_utils import make_mocked_coro -from sockjs import SessionManager, MSG_OPEN, MSG_CLOSED, MSG_MESSAGE -from sockjs.protocol import FRAME_CLOSE +from sockjs import MsgType, Session, Frame +from sockjs.protocol import SockjsMessage from sockjs.transports import WebSocketTransport @pytest.fixture def make_transport(make_manager, make_request, make_handler, make_fut): - def maker(method="GET", path="/", query_params={}, handler=None): + def maker(method="GET", path="/", query_params=None, handler=None): handler = handler or make_handler(None) manager = make_manager(handler) request = make_request(method, path, query_params=query_params) request.app.freeze() - session = manager.get("TestSessionWebsocket", create=True, request=request) - session._wait = make_fut((FRAME_CLOSE, "")) + session = manager.get("TestSessionWebsocket", create=True) + session.get_frame = make_fut((Frame.CLOSE, "")) return WebSocketTransport(manager, session, request) return maker @@ -32,6 +31,8 @@ async def xtest_process_release_acquire_and_remote_closed(make_transport): transp.session.interrupted = False transp.manager.acquire = make_mocked_coro() transp.manager.release = make_mocked_coro() + transp.manager.remote_closed = make_mocked_coro() + resp = await transp.process() await transp.manager.clear() @@ -39,7 +40,7 @@ async def xtest_process_release_acquire_and_remote_closed(make_transport): assert resp.headers.get("upgrade", "").lower() == "websocket" assert resp.headers.get("connection", "").lower() == "upgrade" - transp.session._remote_closed.assert_called_once_with() + assert transp.manager.remote_closed.called assert transp.manager.acquire.called assert transp.manager.release.called @@ -47,35 +48,35 @@ async def xtest_process_release_acquire_and_remote_closed(make_transport): async def test_server_close(app, make_manager, make_request): reached_closed = False - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() - async def handler(msg, session): + async def handler(manager, session: Session, msg: SockjsMessage): nonlocal reached_closed - if msg.tp == MSG_OPEN: - asyncio.ensure_future(session._remote_message("TESTMSG")) - pass - - elif msg.tp == MSG_MESSAGE: + if msg.type == MsgType.OPEN: # To reproduce the ordering which makes the issue loop.call_later(0.05, session.close) - elif msg.tp == MSG_CLOSED: + elif msg.type == MsgType.MESSAGE: + # To reproduce the ordering which makes the issue + loop.call_later(0.05, session.close) + elif msg.type == MsgType.CLOSED: reached_closed = True app.freeze() - request = make_request("GET", "/", query_params={}) - manager = SessionManager("sm", app, handler, debug=True) + request = make_request("GET", "/") + manager = make_manager(handler) session = manager.get("test", create=True) transp = WebSocketTransport(manager, session, request) await transp.process() - assert reached_closed is True - + assert reached_closed is False + assert session.expires + assert not session.expired + session.expires = datetime.datetime.now() + await manager._gc_expired_sessions() -async def test_session_has_request(make_transport, make_fut): - transp = make_transport(method="POST") - assert isinstance(transp.session.request, web.Request) + assert reached_closed is True async def test_frames(make_transport, make_handler): @@ -111,8 +112,7 @@ async def test_frames(make_transport, make_handler): close_frame, ] - session = transp.session - await transp.client(ws, session) + await transp.client(ws) assert result[0][0].data == "single_msg" assert result[1][0].data == "msg1" diff --git a/tests/test_transport_xhr.py b/tests/test_transport_xhr.py index f6aa25b9..e9ccf0f7 100644 --- a/tests/test_transport_xhr.py +++ b/tests/test_transport_xhr.py @@ -1,19 +1,17 @@ -from aiohttp import web - import pytest -from sockjs.transports import xhr +from sockjs.transports import xhr_pooling @pytest.fixture def make_transport(make_manager, make_request, make_handler, make_fut): - def maker(method="GET", path="/", query_params={}): + def maker(method="GET", path="/", query_params=None): handler = make_handler(None) manager = make_manager(handler) request = make_request(method, path, query_params=query_params) request.app.freeze() - session = manager.get("TestSessionXhr", create=True, request=request) - return xhr.XHRTransport(manager, session, request) + session = manager.get("TestSessionXhr", create=True) + return xhr_pooling.XHRTransport(manager, session, request) return maker @@ -26,13 +24,7 @@ async def test_process(make_transport, make_fut): assert resp.status == 200 -async def test_process_OPTIONS(make_transport): +async def test_process_options(make_transport): transp = make_transport(method="OPTIONS") resp = await transp.process() assert resp.status == 204 - - -async def test_session_has_request(make_transport, make_fut): - transp = make_transport() - transp.session._remote_messages = make_fut(1) - assert isinstance(transp.session.request, web.Request) diff --git a/tests/test_transport_xhrsend.py b/tests/test_transport_xhrsend.py index 5ad64d5a..e7b4cf14 100644 --- a/tests/test_transport_xhrsend.py +++ b/tests/test_transport_xhrsend.py @@ -1,27 +1,26 @@ -from aiohttp import web - import pytest +from aiohttp import web -from sockjs.transports import xhrsend +from sockjs.transports import xhr_pooling @pytest.fixture def make_transport(make_manager, make_request, make_handler, make_fut): - def maker(method="GET", path="/", query_params={}): + def maker(method="GET", path="/", query_params=None): handler = make_handler(None) manager = make_manager(handler) request = make_request(method, path, query_params=query_params) request.app.freeze() - session = manager.get("TestSessionXhrSend", create=True, request=request) - return xhrsend.XHRSendTransport(manager, session, request) + session = manager.get("TestSessionXhrSend", create=True) + return xhr_pooling.XHRSendTransport(manager, session, request) return maker async def test_not_supported_meth(make_transport): transp = make_transport(method="PUT") - resp = await transp.process() - assert resp.status == 403 + with pytest.raises(web.HTTPForbidden): + await transp.process() async def xtest_no_payload(make_transport, make_fut): @@ -40,20 +39,14 @@ async def xtest_bad_json(make_transport, make_fut): async def xtest_post_message(make_transport, make_fut): transp = make_transport() - transp.session._remote_messages = make_fut(1) + transp.manager.remote_messages = make_fut(1) transp.request.read = make_fut(b'["msg1","msg2"]') resp = await transp.process() assert resp.status == 204 - transp.session._remote_messages.assert_called_with(["msg1", "msg2"]) + transp.manager.remote_messages.assert_called_with(["msg1", "msg2"]) -async def test_OPTIONS(make_transport): +async def test_options(make_transport): transp = make_transport(method="OPTIONS") resp = await transp.process() assert resp.status == 204 - - -async def test_session_has_request(make_transport, make_fut): - transp = make_transport(method="POST") - transp.session._remote_messages = make_fut(1) - assert isinstance(transp.session.request, web.Request) diff --git a/tests/test_transport_xhrstreaming.py b/tests/test_transport_xhrstreaming.py index 4f27621a..060664c4 100644 --- a/tests/test_transport_xhrstreaming.py +++ b/tests/test_transport_xhrstreaming.py @@ -1,5 +1,3 @@ -from aiohttp import web - import pytest from sockjs.transports import xhrstreaming @@ -7,12 +5,12 @@ @pytest.fixture def make_transport(make_manager, make_request, make_handler, make_fut): - def maker(method="GET", path="/", query_params={}): + def maker(method="GET", path="/", query_params=None): handler = make_handler(None) manager = make_manager(handler) request = make_request(method, path, query_params=query_params) request.app.freeze() - session = manager.get("TestSessionXhrStreaming", create=True, request=request) + session = manager.get("TestSessionXhrStreaming", create=True) return xhrstreaming.XHRStreamingTransport(manager, session, request) return maker @@ -26,13 +24,7 @@ async def test_process(make_transport, make_fut): assert resp.status == 200 -async def test_process_OPTIONS(make_transport): +async def test_process_options(make_transport): transp = make_transport(method="OPTIONS") resp = await transp.process() assert resp.status == 204 - - -async def test_session_has_request(make_transport, make_fut): - transp = make_transport(method="POST") - transp.session._remote_messages = make_fut(1) - assert isinstance(transp.session.request, web.Request)