diff --git a/src/aiida/brokers/rabbitmq/broker.py b/src/aiida/brokers/rabbitmq/broker.py index 7bfcb2fec..e8ec71d59 100644 --- a/src/aiida/brokers/rabbitmq/broker.py +++ b/src/aiida/brokers/rabbitmq/broker.py @@ -6,9 +6,8 @@ import functools import typing as t -from plumpy.rmq import RemoteProcessThreadController, RmqCoordinator from plumpy import ProcessController -from plumpy.rmq.process_control import RemoteProcessController +from plumpy.rmq import RemoteProcessThreadController from aiida.brokers.broker import Broker from aiida.brokers.rabbitmq.coordinator import RmqLoopCoordinator diff --git a/src/aiida/brokers/rabbitmq/coordinator.py b/src/aiida/brokers/rabbitmq/coordinator.py index 6c6a13c7e..96739dc78 100644 --- a/src/aiida/brokers/rabbitmq/coordinator.py +++ b/src/aiida/brokers/rabbitmq/coordinator.py @@ -1,9 +1,8 @@ -# -*- coding: utf-8 -*- +import concurrent.futures from asyncio import AbstractEventLoop from typing import Generic, TypeVar, final -import kiwipy -import concurrent.futures +import kiwipy from plumpy.exceptions import CoordinatorConnectionError from plumpy.rmq.communications import convert_to_comm @@ -70,7 +69,7 @@ def broadcast_send( subject=None, correlation_id=None, ): - from aio_pika.exceptions import ChannelInvalidStateError, AMQPConnectionError + from aio_pika.exceptions import AMQPConnectionError, ChannelInvalidStateError try: rsp = self._comm.broadcast_send(body, sender, subject, correlation_id) diff --git a/src/aiida/engine/runners.py b/src/aiida/engine/runners.py index cb14be2b8..761514524 100644 --- a/src/aiida/engine/runners.py +++ b/src/aiida/engine/runners.py @@ -22,12 +22,12 @@ from plumpy.coordinator import Coordinator from plumpy.events import reset_event_loop_policy, set_event_loop_policy from plumpy.persistence import Persister -from plumpy.rmq import RemoteProcessThreadController, wrap_communicator +from plumpy.rmq import RemoteProcessThreadController +from aiida.brokers import Broker from aiida.common import exceptions from aiida.orm import ProcessNode, load_node from aiida.plugins.utils import PluginVersionProvider -from aiida.brokers import Broker from . import transports, utils from .processes import Process, ProcessBuilder, ProcessState, futures