Skip to content

Commit

Permalink
removal of no longer used files
Browse files Browse the repository at this point in the history
  • Loading branch information
m-karo committed Oct 9, 2024
1 parent 15fde58 commit db2a2cc
Show file tree
Hide file tree
Showing 12 changed files with 50 additions and 1,816 deletions.
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ COPY config/ ./config/

# Start process
ENV PYTHONPATH="${PYTHONPATH}:/src"
ENTRYPOINT ["python", "emf/loadflow_tool/model_merger/merge_worker.py"]
ENTRYPOINT ["python", "emf/loadflow_tool/model_merger/worker.py"]
#ENTRYPOINT ["/bin/bash"]
72 changes: 40 additions & 32 deletions emf/common/integrations/rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ def __init__(self,
self._que = que
self._username = username

self._executor = ThreadPoolExecutor()
self._executor_stopped = False

self._connection_parameters = pika.ConnectionParameters(host=self._host,
port=self._port,
virtual_host=self._vhost,
Expand Down Expand Up @@ -382,21 +385,7 @@ def on_consumer_cancelled(self, method_frame):
if self._channel:
self._channel.close()

def on_message(self, _unused_channel, basic_deliver, properties, body):
"""Invoked by pika when a message is delivered from RabbitMQ. The
channel is passed for your convenience. The basic_deliver object that
is passed in carries the exchange, routing key, delivery tag and
a redelivered flag for the message. The properties passed in is an
instance of BasicProperties with the message properties and the body
is the message that was sent.
:param pika.channel.Channel _unused_channel: The channel object
:param pika.Spec.Basic.Deliver: basic_deliver method
:param pika.Spec.BasicProperties: properties
:param bytes body: The message body
"""
logger.info(f"Received message # {basic_deliver.delivery_tag} from {properties.app_id} meta: {properties.headers}")
logger.debug(f"Message body: {body}")

def _process_messages(self, basic_deliver, properties, body):
ack = True

# Convert if needed
Expand All @@ -406,7 +395,7 @@ def on_message(self, _unused_channel, basic_deliver, properties, body):
#
# while not converter_task.done():
# logger.info("Waiting for converter")
# #self._connection.process_data_events(time_limit=1)
# # self._connection.process_data_events(time_limit=1)
# self._connection._heartbeat_checker._send_heartbeat()
# time.sleep(10)
#
Expand All @@ -424,7 +413,7 @@ def on_message(self, _unused_channel, basic_deliver, properties, body):
except Exception as error:
logger.error(f"Message conversion failed: {error}", exc_info=True)
ack = False
self.stop()
# self.stop()

if self.message_handlers:
# with ThreadPoolExecutor() as handler_executor:
Expand All @@ -438,23 +427,22 @@ def on_message(self, _unused_channel, basic_deliver, properties, body):
# try:
# # TODO - set to debug when rabbit issue solved
# logger.info("Waiting for handler")
# #logger.info(self._connection.ioloop.is_running())
# #logger.info(asyncio.current_task(self._connection.ioloop))
# #logger.info(asyncio.all_tasks(self._connection.ioloop))
# #logger.info(self._connection.ioloop._scheduled)
# #loop_time = self._connection.ioloop.time()
# #logger.info([task.when() - loop_time for task in self._connection.ioloop._scheduled])
# #logger.info(self._connection.ioloop.time())
# # logger.info(self._connection.ioloop.is_running())
# # logger.info(asyncio.current_task(self._connection.ioloop))
# # logger.info(asyncio.all_tasks(self._connection.ioloop))
# # logger.info(self._connection.ioloop._scheduled)
# # loop_time = self._connection.ioloop.time()
# # logger.info([task.when() - loop_time for task in self._connection.ioloop._scheduled])
# # logger.info(self._connection.ioloop.time())
# self._connection._heartbeat_checker._send_heartbeat()
# #self._connection.ioloop.poll()
# #self._connection.process_data_events(time_limit=1)
# #self._connection._heartbeat_checker.send_heartbeat()
# # self._connection.ioloop.poll()
# # self._connection.process_data_events(time_limit=1)
# # self._connection._heartbeat_checker.send_heartbeat()
# time.sleep(10)
#
# except Exception as error:
# logger.info(error)
#
#
# try:
# body = handler_task.result()
#
Expand All @@ -472,13 +460,28 @@ def on_message(self, _unused_channel, basic_deliver, properties, body):
except Exception as error:
logger.error(f"Message handling failed: {error}", exc_info=True)
ack = False
converter_executor.shutdown(wait=False)
self.stop()
# self.stop()
break

if ack:
self.acknowledge_message(basic_deliver.delivery_tag)

def on_message(self, _unused_channel, basic_deliver, properties, body):
"""Invoked by pika when a message is delivered from RabbitMQ. The
channel is passed for your convenience. The basic_deliver object that
is passed in carries the exchange, routing key, delivery tag and
a redelivered flag for the message. The properties passed in is an
instance of BasicProperties with the message properties and the body
is the message that was sent.
:param pika.channel.Channel _unused_channel: The channel object
:param pika.Spec.Basic.Deliver: basic_deliver method
:param pika.Spec.BasicProperties: properties
:param bytes body: The message body
"""
logger.info(f"Received message # {basic_deliver.delivery_tag} from {properties.app_id} meta: {properties.headers}")
logger.debug(f"Message body: {body}")
self._executor.submit(self._process_messages, basic_deliver, properties, body)

def acknowledge_message(self, delivery_tag):
"""Acknowledge the message delivery from RabbitMQ by sending a
Basic.Ack RPC method for the delivery tag.
Expand Down Expand Up @@ -519,6 +522,9 @@ def run(self):
"""Run the example consumer by connecting to RabbitMQ and then
starting the IOLoop to block and allow the SelectConnection to operate.
"""
if self._executor_stopped:
self._executor = ThreadPoolExecutor()
self._executor_stopped = False
self._connection = self.connect()
self._connection.ioloop.run_forever()

Expand All @@ -540,6 +546,8 @@ def stop(self):
self._connection.ioloop.run_forever()
else:
self._connection.ioloop.stop()
self._executor.shutdown()
self._executor_stopped = True
logger.info(f"Stopped")


Expand Down Expand Up @@ -633,4 +641,4 @@ def stop(self):
try:
consumer.run()
except KeyboardInterrupt:
consumer.stop()
consumer.stop()
Loading

0 comments on commit db2a2cc

Please sign in to comment.