Skip to content
This repository has been archived by the owner on Nov 22, 2024. It is now read-only.

Commit

Permalink
Migrating to middleware local addition of actor
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
pdxjohnny committed Oct 22, 2023
1 parent 3ddcb58 commit f39ec18
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 64 deletions.
132 changes: 68 additions & 64 deletions scitt_emulator/federation_activitypub_bovine.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,24 +73,19 @@ class SCITTFederationActivityPubBovine(SCITTFederation):
def __init__(self, app, signals, config_path):
super().__init__(app, signals, config_path)

self.start_herd = self.config.get("start_herd", False)
if self.start_herd:
raise NotImplementedError("Please start bovine-herd manually")

self.domain = self.config["domain"]
self.handle_name = self.config["handle_name"]
self.workspace = Path(self.config["workspace"]).expanduser()

self.federate_created_entries_socket_path = self.workspace.joinpath(
"federate_created_entries_socket",
)

BovinePubSub(app)
BovineHerd(app)

# self.initialize_service()
@app.before_serving
async def initialize_service():
await self.initialize_service()

async def initialize_service(self):
self.domain = f'127.0.0.1:{self.app.config["port"]}'

def initialize_service(self):
config_toml_path = pathlib.Path(self.workspace, "config.toml")
if not config_toml_path.exists():
logger.info("Actor client config does not exist, creating...")
Expand All @@ -113,25 +108,20 @@ def initialize_service(self):
config_toml_obj[self.handle_name]["handlers"][
inspect.getmodule(sys.modules[__name__]).__spec__.name
] = {
"federate_created_entries_socket_path": str(
self.federate_created_entries_socket_path.resolve()
),
# TODO Sending signal to submit federated claim
# signals.federation.submit_claim.send(self, claim=created_entry.claim)
"signals": self.signals,
"following": self.config.get("following", {}),
}
config_toml_path.write_text(tomli_w.dumps(config_toml_obj))
# Extract public key from private key in config file
did_key = bovine.crypto.private_key_to_did_key(
config_toml_obj[self.handle_name]["secret"],
)

# TODO This may not work if there is another instance of an event loop
# running. There shouldn't be but can we come up with a workaround in
# case that does happen?
actor_url = asyncio.run(
get_actor_url(
self.domain,
did_key=did_key,
)
# TODO XXX ERROR Not bound yet, can we resolve via self.app?
actor_url = await get_actor_url(
self.domain,
did_key=did_key,
)
# TODO take BOVINE_DB_URL from config, populate env on call to tool if
# NOT already set in env.
Expand Down Expand Up @@ -174,6 +164,7 @@ def initialize_service(self):
logger.info("Actor key added in database")

# Run client handlers
"""
cmd = [
sys.executable,
"-um",
Expand All @@ -184,14 +175,38 @@ def initialize_service(self):
cwd=self.workspace,
)
atexit.register(self.mechanical_bull_proc.terminate)
"""

def build_handler(handler, value):
import importlib
from functools import partial

func = importlib.import_module(handler).handle

if isinstance(value, dict):
return partial(func, **value)
return func

def load_handlers(handlers):
return [build_handler(handler, value) for handler, value in handlers.items()]

async def mechanical_bull_loop(config):
from mechanical_bull.event_loop import loop

async with asyncio.TaskGroup() as taskgroup:
for client_name, value in config.items():
if isinstance(value, dict):
handlers = load_handlers(value["handlers"])
taskgroup.create_task(loop(client_name, value, handlers))

self.app.add_background_task(mechanical_bull_loop, config_toml_obj)

def created_entry(
self,
scitt_service: SCITTServiceEmulator,
created_entry: SCITTSignalsFederationCreatedEntry,
):
return
# NOTE Test of sending signal to submit federated claim -> self.signals.federation.submit_claim.send(self, claim=created_entry.claim)
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
client.connect(str(self.federate_created_entries_socket_path.resolve()))
client.send(
Expand Down Expand Up @@ -334,45 +349,34 @@ async def init_follow(client, retry: int = 5, **kwargs):

async def federate_created_entries(
client: bovine.BovineClient,
socket_path: Path,
sender: SCITTServiceEmulator,
):
async def federate_created_entry(reader, writer):
try:
logger.info("federate_created_entry() Reading... %r", reader)
content_bytes = await reader.read()
logger.info("federate_created_entry() Read: %r", content_bytes)
note = (
client.object_factory.note(
content=content_bytes.decode(),
)
.as_public()
.build()
try:
logger.info("federate_created_entry() Reading... %r", reader)
content_bytes = await reader.read()
logger.info("federate_created_entry() Read: %r", content_bytes)
note = (
client.object_factory.note(
content=content_bytes.decode(),
)
activity = client.activity_factory.create(note).build()
logger.info("Sending... %r", activity)
await client.send_to_outbox(activity)

writer.close()
await writer.wait_closed()

# DEBUG NOTE Dumping outbox
print("client:", client)
outbox = client.outbox()
print("outbox:", outbox)
count_messages = 0
async for message in outbox:
count_messages += 1
print(f"Message {count_messages} in outbox:", message)
print(f"End of messages in outbox, total: {count_messages}")
except:
logger.error(traceback.format_exc())

logger.info("Attempting UNIX bind at %r", socket_path)
server = await asyncio.start_unix_server(
federate_created_entry,
path=str(Path(socket_path).resolve()),
)
async with server:
logger.info("Awaiting receipts to federate at %r", socket_path)
while True:
await asyncio.sleep(60)
.as_public()
.build()
)
activity = client.activity_factory.create(note).build()
logger.info("Sending... %r", activity)
await client.send_to_outbox(activity)

writer.close()
await writer.wait_closed()

# DEBUG NOTE Dumping outbox
print("client:", client)
outbox = client.outbox()
print("outbox:", outbox)
count_messages = 0
async for message in outbox:
count_messages += 1
print(f"Message {count_messages} in outbox:", message)
print(f"End of messages in outbox, total: {count_messages}")
except:
logger.error(traceback.format_exc())
1 change: 1 addition & 0 deletions scitt_emulator/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ def cli(fn):
def cmd(args):
app = create_flask_app(
{
"port": args.port,
"middleware": args.middleware,
"middleware_config_path": args.middleware_config_path,
"tree_alg": args.tree_alg,
Expand Down

0 comments on commit f39ec18

Please sign in to comment.