diff --git a/scitt_emulator/federation_activitypub_bovine.py b/scitt_emulator/federation_activitypub_bovine.py index 80800172..e29fe822 100644 --- a/scitt_emulator/federation_activitypub_bovine.py +++ b/scitt_emulator/federation_activitypub_bovine.py @@ -73,24 +73,19 @@ class SCITTFederationActivityPubBovine(SCITTFederation): def __init__(self, app, signals, config_path): super().__init__(app, signals, config_path) - self.start_herd = self.config.get("start_herd", False) - if self.start_herd: - raise NotImplementedError("Please start bovine-herd manually") - - self.domain = self.config["domain"] self.handle_name = self.config["handle_name"] self.workspace = Path(self.config["workspace"]).expanduser() - self.federate_created_entries_socket_path = self.workspace.joinpath( - "federate_created_entries_socket", - ) - BovinePubSub(app) BovineHerd(app) - # self.initialize_service() + @app.before_serving + async def initialize_service(): + await self.initialize_service() + + async def initialize_service(self): + self.domain = f'127.0.0.1:{self.app.config["port"]}' - def initialize_service(self): config_toml_path = pathlib.Path(self.workspace, "config.toml") if not config_toml_path.exists(): logger.info("Actor client config does not exist, creating...") @@ -113,25 +108,20 @@ def initialize_service(self): config_toml_obj[self.handle_name]["handlers"][ inspect.getmodule(sys.modules[__name__]).__spec__.name ] = { - "federate_created_entries_socket_path": str( - self.federate_created_entries_socket_path.resolve() - ), + # TODO Sending signal to submit federated claim + # signals.federation.submit_claim.send(self, claim=created_entry.claim) + "signals": self.signals, "following": self.config.get("following", {}), } - config_toml_path.write_text(tomli_w.dumps(config_toml_obj)) # Extract public key from private key in config file did_key = bovine.crypto.private_key_to_did_key( config_toml_obj[self.handle_name]["secret"], ) - # TODO This may not work if there is another instance of an event loop - # running. There shouldn't be but can we come up with a workaround in - # case that does happen? - actor_url = asyncio.run( - get_actor_url( - self.domain, - did_key=did_key, - ) + # TODO XXX ERROR Not bound yet, can we resolve via self.app? + actor_url = await get_actor_url( + self.domain, + did_key=did_key, ) # TODO take BOVINE_DB_URL from config, populate env on call to tool if # NOT already set in env. @@ -174,6 +164,7 @@ def initialize_service(self): logger.info("Actor key added in database") # Run client handlers + """ cmd = [ sys.executable, "-um", @@ -184,6 +175,31 @@ def initialize_service(self): cwd=self.workspace, ) atexit.register(self.mechanical_bull_proc.terminate) + """ + + def build_handler(handler, value): + import importlib + from functools import partial + + func = importlib.import_module(handler).handle + + if isinstance(value, dict): + return partial(func, **value) + return func + + def load_handlers(handlers): + return [build_handler(handler, value) for handler, value in handlers.items()] + + async def mechanical_bull_loop(config): + from mechanical_bull.event_loop import loop + + async with asyncio.TaskGroup() as taskgroup: + for client_name, value in config.items(): + if isinstance(value, dict): + handlers = load_handlers(value["handlers"]) + taskgroup.create_task(loop(client_name, value, handlers)) + + self.app.add_background_task(mechanical_bull_loop, config_toml_obj) def created_entry( self, @@ -191,7 +207,6 @@ def created_entry( created_entry: SCITTSignalsFederationCreatedEntry, ): return - # NOTE Test of sending signal to submit federated claim -> self.signals.federation.submit_claim.send(self, claim=created_entry.claim) with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client: client.connect(str(self.federate_created_entries_socket_path.resolve())) client.send( @@ -334,45 +349,34 @@ async def init_follow(client, retry: int = 5, **kwargs): async def federate_created_entries( client: bovine.BovineClient, - socket_path: Path, + sender: SCITTServiceEmulator, ): - async def federate_created_entry(reader, writer): - try: - logger.info("federate_created_entry() Reading... %r", reader) - content_bytes = await reader.read() - logger.info("federate_created_entry() Read: %r", content_bytes) - note = ( - client.object_factory.note( - content=content_bytes.decode(), - ) - .as_public() - .build() + try: + logger.info("federate_created_entry() Reading... %r", reader) + content_bytes = await reader.read() + logger.info("federate_created_entry() Read: %r", content_bytes) + note = ( + client.object_factory.note( + content=content_bytes.decode(), ) - activity = client.activity_factory.create(note).build() - logger.info("Sending... %r", activity) - await client.send_to_outbox(activity) - - writer.close() - await writer.wait_closed() - - # DEBUG NOTE Dumping outbox - print("client:", client) - outbox = client.outbox() - print("outbox:", outbox) - count_messages = 0 - async for message in outbox: - count_messages += 1 - print(f"Message {count_messages} in outbox:", message) - print(f"End of messages in outbox, total: {count_messages}") - except: - logger.error(traceback.format_exc()) - - logger.info("Attempting UNIX bind at %r", socket_path) - server = await asyncio.start_unix_server( - federate_created_entry, - path=str(Path(socket_path).resolve()), - ) - async with server: - logger.info("Awaiting receipts to federate at %r", socket_path) - while True: - await asyncio.sleep(60) + .as_public() + .build() + ) + activity = client.activity_factory.create(note).build() + logger.info("Sending... %r", activity) + await client.send_to_outbox(activity) + + writer.close() + await writer.wait_closed() + + # DEBUG NOTE Dumping outbox + print("client:", client) + outbox = client.outbox() + print("outbox:", outbox) + count_messages = 0 + async for message in outbox: + count_messages += 1 + print(f"Message {count_messages} in outbox:", message) + print(f"End of messages in outbox, total: {count_messages}") + except: + logger.error(traceback.format_exc()) diff --git a/scitt_emulator/server.py b/scitt_emulator/server.py index 26d68641..798f788c 100644 --- a/scitt_emulator/server.py +++ b/scitt_emulator/server.py @@ -141,6 +141,7 @@ def cli(fn): def cmd(args): app = create_flask_app( { + "port": args.port, "middleware": args.middleware, "middleware_config_path": args.middleware_config_path, "tree_alg": args.tree_alg,