Use capnp to serialize/deserialize
- Replace customized zmq frame serialization with Cap'n Proto to
  pave the way for a language-agnostic scheduler protocol, making a
  C++ scheduler implementation feasible.
- Replace legacy setup.py with pyproject.toml
- Add configuration for black, flake8, and mypy
- Change to monotonic time for profiling
- Fix mypy warnings
sharpener6 committed Aug 18, 2024
1 parent ce84e0a commit b4237f2
Showing 74 changed files with 1,865 additions and 1,216 deletions.
20 changes: 20 additions & 0 deletions .gitignore
@@ -0,0 +1,20 @@
__pycache__/
*.py[cod]
*$py.class

build/
dist/
sdist/
wheels/
eggs/
.eggs/
.idea/
*.egg-info/
*.egg
.mypy_cache/

venv*/
.vscode/

dask-worker-space/*
.pre-commit-config.yaml
6 changes: 3 additions & 3 deletions Makefile
@@ -12,7 +12,7 @@ _doc:
rm -fr docsvenv build; mkdir build
python3.8 -m venv docsvenv
. docsvenv/bin/activate; \
pip install -r docs/requirements_docs.txt; \
pip install -r requirements.txt; \
cd docs; make clean && make html
pip install -r docs/requirements_docs.txt; \
pip install -r requirements.txt; \
cd docs; make clean && make html
zip -r build/scaler_docs.zip docs/build/html/*
29 changes: 17 additions & 12 deletions README.md
Expand Up @@ -3,7 +3,7 @@
<img src="https://github.com/citi.png" alt="Citi" width="80" height="80">
</a>

<h3 align="center">Citi/scaler</h3>
<h3 align="center">Citi/scaler</h3>

<p align="center">
Efficient, lightweight and reliable distributed computation engine.
@@ -43,13 +43,15 @@ messaging errors, among others.

- Distributed computing on **multiple cores and multiple servers**
- **Python** reference implementation, with **language agnostic messaging protocol** built on top of
[ZeroMQ](https://zeromq.org)
[Cap'n Proto](https://capnproto.org/) and [ZeroMQ](https://zeromq.org)
- **Graph** scheduling, which supports [Dask](https://www.dask.org)-like graph computing, optionally you
can use [GraphBLAS](https://graphblas.org)
- **Automated load balancing**. When workers got full of tasks, these will be scheduled to idle workers
can use [GraphBLAS](https://graphblas.org) for massive graph tasks
- **Automated load balancing**. Automatically balances busy workers' loads onto idle workers, keeping every worker as
busy as possible
- **Automated recovery** from faulting workers or clients
- Supports for **nested tasks**. Tasks can themselves submit new tasks
- `top`-like **monitoring tools**
- GUI monitoring tool

Scaler's scheduler can be run on PyPy, which will provide a performance boost

@@ -77,7 +79,6 @@ A local scheduler and a local set of workers can be conveniently spawn using `Sc
```python
from scaler import SchedulerClusterCombo


cluster = SchedulerClusterCombo(address="tcp://127.0.0.1:2345", n_workers=4)

...
@@ -154,9 +155,11 @@ from scaler import Client
def inc(i):
return i + 1


def add(a, b):
return a + b


def minus(a, b):
return a - b

@@ -256,11 +259,11 @@ W|Linux|15943|a7fe8b5e+ 0.0% 30.7m 0.0% 28.3m 1000 0 0 |
- function_id_to_tasks section shows task count for each function used
- worker section shows worker details; you can use shortcuts to sort by columns, and the `*` on a column header shows
which column is currently sorted
- agt_cpu/agt_rss means cpu/memory usage of worker agent
- cpu/rss means cpu/memory usage of worker
- free means number of free task slots for this worker
- sent means how many tasks scheduler sent to the worker
- queued means how many tasks worker received and queued
- agt_cpu/agt_rss means cpu/memory usage of worker agent
- cpu/rss means cpu/memory usage of worker
- free means number of free task slots for this worker
- sent means how many tasks scheduler sent to the worker
- queued means how many tasks worker received and queued

### From the web UI

@@ -274,7 +277,8 @@ This will open a web server on port `8081`.

## Contributing

Your contributions are at the core of making this a true open source project. Any contributions you make are **greatly appreciated**.
Your contributions are at the core of making this a true open source project. Any contributions you make are **greatly
appreciated**.

We welcome you to:

@@ -297,4 +301,5 @@ This project is distributed under the [Apache-2.0 License](https://www.apache.or

## Contact

If you have a query or require support with this project, [raise an issue](https://github.com/Citi/scaler/issues). Otherwise, reach out to [[email protected]](mailto:[email protected]).
If you have a query or require support with this project, [raise an issue](https://github.com/Citi/scaler/issues).
Otherwise, reach out to [[email protected]](mailto:[email protected]).
57 changes: 57 additions & 0 deletions pyproject.toml
@@ -0,0 +1,57 @@
[build-system]
requires = ["setuptools", "mypy", "black", "flake8"]
build-backend = "setuptools.build_meta"

[project]
name = "scaler"
description = "Scaler Distribution Framework"
requires-python = ">=3.8"
readme = { file = "README.md", content-type = "text/markdown" }
license = { text = "Apache 2.0" }
authors = [{ name = "Citi", email = "[email protected]" }]
dynamic = ["dependencies", "version"]

[project.urls]
Home = "https://github.com/Citi/scaler"

[project.scripts]
scaler_scheduler = "scaler.entry_points.scheduler:main"
scaler_cluster = "scaler.entry_points.cluster:main"
scaler_top = "scaler.entry_points.top:main"
scaler_ui = "scaler.entry_points.webui:main"

[project.optional-dependencies]
uvloop = ["uvloop"]
graphblas = ["python-graphblas", "numpy"]
gui = ["nicegui[plotly]"]
all = ["python-graphblas", "numpy", "uvloop", "nicegui[plotly]"]

[tool.setuptools]
packages = ["scaler"]
include-package-data = true

[tool.setuptools.dynamic]
dependencies = { file = "requirements.txt" }
version = { attr = "scaler.about.__version__" }

[tool.mypy]
no_strict_optional = true
check_untyped_defs = true
ignore_missing_imports = true
exclude = [
"^docs.*$",
"^benchmark.*$"
]

[tool.black]
line-length = 120
skip-magic-trailing-comma = true

[tool.flake8]
max-line-length = 120
extend-ignore = "E203"
exclude = "venv312"

[metadata]
long_description = { file = "README.md" }
long_description_content_type = "text/markdown"
9 changes: 5 additions & 4 deletions requirements.txt
@@ -1,6 +1,7 @@
pyzmq
psutil
bidict
cloudpickle
graphlib-backport; python_version < '3.9'
psutil
pycapnp
pyzmq
tblib
bidict
graphlib-backport; python_version < '3.9'
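The environment marker on `graphlib-backport` installs a `graphlib` module only on Python < 3.9, where the standard library lacks one; the same import then works on every supported interpreter. A sketch of the topological ordering such a dependency enables (the task names are made up, not Scaler's API):

```python
from graphlib import TopologicalSorter  # stdlib on 3.9+, graphlib-backport below that

# Dependencies expressed as node -> set of predecessors
graph = {"add": {"inc_a", "inc_b"}, "inc_a": set(), "inc_b": set()}
order = list(TopologicalSorter(graph).static_order())

# Every predecessor appears before its dependents
assert order.index("inc_a") < order.index("add")
assert order.index("inc_b") < order.index("add")
```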
3 changes: 2 additions & 1 deletion run_top.py
@@ -1,4 +1,5 @@
from scaler.entry_points.top import main
from scaler.utility.debug import pdb_wrapped

if __name__ == "__main__":
main()
pdb_wrapped(main)()
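`run_top.py` now runs `main` through `pdb_wrapped` from `scaler.utility.debug`, whose source is not shown in this diff. A plausible sketch of such a wrapper — behave transparently on success, and drop into a post-mortem debugger on an unhandled exception (the name and behavior are assumptions, not the actual implementation):

```python
import functools
import pdb
import sys
import traceback


def pdb_wrapped(func):
    """Hypothetical sketch: call func; on an unhandled exception, print the
    traceback and open a post-mortem pdb session before re-raising."""

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            traceback.print_exc()
            pdb.post_mortem(sys.exc_info()[2])
            raise

    return wrapper


@pdb_wrapped
def main():
    return "top running"
```

With a wrapper like this, `pdb_wrapped(main)()` is indistinguishable from `main()` until something raises.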
2 changes: 1 addition & 1 deletion scaler/about.py
@@ -1 +1 @@
__version__ = "1.7.14"
__version__ = "1.8.0"
6 changes: 3 additions & 3 deletions scaler/client/agent/client_agent.py
@@ -18,14 +18,14 @@
ClientShutdownResponse,
GraphTask,
GraphTaskCancel,
MessageVariant,
ObjectInstruction,
ObjectRequest,
ObjectResponse,
Task,
TaskCancel,
TaskResult,
)
from scaler.protocol.python.mixins import Message
from scaler.utility.event_loop import create_async_loop_routine
from scaler.utility.exceptions import ClientCancelledException, ClientQuitException, ClientShutdownException
from scaler.utility.zmq_config import ZMQConfig
@@ -113,7 +113,7 @@ def run(self):
self.__initialize()
self.__run_loop()

async def __on_receive_from_client(self, message: MessageVariant):
async def __on_receive_from_client(self, message: Message):
if isinstance(message, ClientDisconnect):
await self._disconnect_manager.on_client_disconnect(message)
return
@@ -144,7 +144,7 @@ async def __on_receive_from_client(self, message: MessageVariant):

raise TypeError(f"Unknown {message=}")

async def __on_receive_from_scheduler(self, message: MessageVariant):
async def __on_receive_from_scheduler(self, message: Message):
if isinstance(message, ClientShutdownResponse):
await self._disconnect_manager.on_client_shutdown_response(message)
return
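Both receive hooks dispatch on message type with isinstance chains, now typed against the new `Message` mixin base instead of `MessageVariant`. The pattern in isolation, with stand-in classes rather than Scaler's real message types:

```python
# Sketch of the isinstance-dispatch pattern used by the client agent.
class Message: ...
class ClientDisconnect(Message): ...
class TaskResult(Message): ...


def dispatch(message: Message) -> str:
    """Route a message to the name of the manager that handles it."""
    if isinstance(message, ClientDisconnect):
        return "disconnect_manager"
    if isinstance(message, TaskResult):
        return "future_manager"
    raise TypeError(f"Unknown {message=}")
```

Falling through to `raise TypeError` keeps unknown message types loud instead of silently dropped.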
4 changes: 2 additions & 2 deletions scaler/client/agent/disconnect_manager.py
@@ -2,7 +2,7 @@

from scaler.client.agent.mixins import DisconnectManager
from scaler.io.async_connector import AsyncConnector
from scaler.protocol.python.message import ClientDisconnect, ClientShutdownResponse, DisconnectType
from scaler.protocol.python.message import ClientDisconnect, ClientShutdownResponse
from scaler.utility.exceptions import ClientQuitException, ClientShutdownException


@@ -18,7 +18,7 @@ def register(self, connector_internal: AsyncConnector, connector_external: Async
async def on_client_disconnect(self, disconnect: ClientDisconnect):
await self._connector_external.send(disconnect)

if disconnect.type == DisconnectType.Disconnect:
if disconnect.disconnect_type == ClientDisconnect.DisconnectType.Disconnect:
raise ClientQuitException("client disconnecting")

async def on_client_shutdown_response(self, response: ClientShutdownResponse):
8 changes: 5 additions & 3 deletions scaler/client/agent/future_manager.py
@@ -1,12 +1,13 @@
import logging
import threading
from concurrent.futures import InvalidStateError
from concurrent.futures import InvalidStateError, Future
from typing import Dict, Tuple

from scaler.client.agent.mixins import FutureManager
from scaler.client.future import ScalerFuture
from scaler.client.serializer.mixins import Serializer
from scaler.protocol.python.message import ObjectResponse, TaskResult, TaskStatus
from scaler.protocol.python.common import TaskStatus
from scaler.protocol.python.message import ObjectResponse, TaskResult
from scaler.utility.exceptions import DisconnectedError, NoWorkerError, TaskNotFoundError, WorkerDiedError
from scaler.utility.metadata.profile_result import retrieve_profiling_result_from_task_result
from scaler.utility.object_utility import deserialize_failure
@@ -20,7 +21,8 @@ def __init__(self, serializer: Serializer):
self._task_id_to_future: Dict[bytes, ScalerFuture] = dict()
self._object_id_to_future: Dict[bytes, Tuple[TaskStatus, ScalerFuture]] = dict()

def add_future(self, future: ScalerFuture):
def add_future(self, future: Future):
assert isinstance(future, ScalerFuture)
with self._lock:
future.set_running_or_notify_cancel()
self._task_id_to_future[future.task_id] = future
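`add_future` now accepts the base `concurrent.futures.Future` and narrows to `ScalerFuture` with a runtime assert: widening the parameter keeps the override compatible with the mixin's signature under mypy, while the assert preserves the runtime requirement. The pattern in isolation, with an illustrative subclass in place of `ScalerFuture`:

```python
from concurrent.futures import Future


class TrackedFuture(Future):
    """Stand-in for ScalerFuture, which carries a task_id."""
    task_id = b"task-1"


def add_future(future: Future) -> bytes:
    # Typed as the base class to match the mixin's signature; the assert
    # restores the concrete type for both the type checker and runtime safety.
    assert isinstance(future, TrackedFuture)
    return future.task_id
```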
10 changes: 7 additions & 3 deletions scaler/client/agent/heartbeat_manager.py
@@ -6,6 +6,7 @@
from scaler.client.agent.mixins import HeartbeatManager
from scaler.io.async_connector import AsyncConnector
from scaler.protocol.python.message import ClientHeartbeat, ClientHeartbeatEcho
from scaler.protocol.python.status import Resource
from scaler.utility.mixins import Looper


@@ -26,9 +27,12 @@ def register(self, connector_external: AsyncConnector):
self._connector_external = connector_external

async def send_heartbeat(self):
cpu = self._process.cpu_percent() / 100
rss = self._process.memory_info().rss
await self._connector_external.send(ClientHeartbeat(cpu, rss, self._latency_us))
await self._connector_external.send(
ClientHeartbeat.new_msg(
Resource.new_msg(int(self._process.cpu_percent() * 10), self._process.memory_info().rss),
self._latency_us,
)
)

async def on_heartbeat_echo(self, heartbeat: ClientHeartbeatEcho):
if not self._connected:
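The new heartbeat packs CPU usage into `Resource.new_msg` as `int(self._process.cpu_percent() * 10)` — integer tenths of a percent — where the old code sent a float fraction (`cpu_percent() / 100`), presumably because the Cap'n Proto `Resource` struct carries integer fields. The fixed-point round trip in isolation (helper names here are illustrative, not Scaler's API):

```python
def encode_cpu(cpu_percent: float) -> int:
    # psutil.Process.cpu_percent() returns e.g. 12.5 (percent);
    # store tenths of a percent so the wire field can be a plain integer
    return int(cpu_percent * 10)


def decode_cpu(tenths: int) -> float:
    # Invert the packing back to percent
    return tenths / 10.0
```

One decimal place of CPU precision survives the trip; anything finer is truncated by `int()`.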
51 changes: 27 additions & 24 deletions scaler/client/agent/object_manager.py
@@ -1,19 +1,14 @@
from typing import Optional
from typing import Optional, Set

from scaler.client.agent.mixins import ObjectManager
from scaler.io.async_connector import AsyncConnector
from scaler.protocol.python.message import (
ObjectContent,
ObjectInstruction,
ObjectInstructionType,
ObjectRequest,
ObjectRequestType,
)
from scaler.protocol.python.common import ObjectContent
from scaler.protocol.python.message import ObjectInstruction, ObjectRequest


class ClientObjectManager(ObjectManager):
def __init__(self, identity: bytes):
self._sent_object_ids = set()
self._sent_object_ids: Set[bytes] = set()
self._identity = identity

self._connector_internal: Optional[AsyncConnector] = None
@@ -24,31 +19,39 @@ def register(self, connector_internal: AsyncConnector, connector_external: Async
self._connector_external = connector_external

async def on_object_instruction(self, instruction: ObjectInstruction):
if instruction.type == ObjectInstructionType.Create:
if instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Create:
await self.__send_object_creation(instruction)
elif instruction.type == ObjectInstructionType.Delete:
elif instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Delete:
await self.__delete_objects(instruction)

async def on_object_request(self, object_request: ObjectRequest):
assert object_request.type == ObjectRequestType.Get
assert object_request.request_type == ObjectRequest.ObjectRequestType.Get
await self._connector_external.send(object_request)

def record_task_result(self, task_id: bytes, object_id: bytes):
self._sent_object_ids.add(object_id)

async def clean_all_objects(self):
await self._connector_external.send(
ObjectInstruction(ObjectInstructionType.Delete, self._identity, ObjectContent(tuple(self._sent_object_ids)))
ObjectInstruction.new_msg(
ObjectInstruction.ObjectInstructionType.Delete,
self._identity,
ObjectContent.new_msg(tuple(self._sent_object_ids)),
)
)
self._sent_object_ids = set()

async def __send_object_creation(self, instruction: ObjectInstruction):
assert instruction.type == ObjectInstructionType.Create
assert instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Create

new_object_content = list(
zip(
new_object_ids = set(instruction.object_content.object_ids) - self._sent_object_ids
if not new_object_ids:
return

new_object_content = ObjectContent.new_msg(
*zip(
*filter(
lambda object_pack: object_pack[0] not in self._sent_object_ids,
lambda object_pack: object_pack[0] in new_object_ids,
zip(
instruction.object_content.object_ids,
instruction.object_content.object_names,
@@ -58,15 +61,15 @@ async def __send_object_creation(self, instruction: ObjectInstruction):
)
)

if not new_object_content:
return

instruction.object_content = ObjectContent(*new_object_content)
self._sent_object_ids.update(set(new_object_content.object_ids))

self._sent_object_ids.update(instruction.object_content.object_ids)
await self._connector_external.send(instruction)
await self._connector_external.send(
ObjectInstruction.new_msg(
ObjectInstruction.ObjectInstructionType.Create, instruction.object_user, new_object_content
)
)

async def __delete_objects(self, instruction: ObjectInstruction):
assert instruction.type == ObjectInstructionType.Delete
assert instruction.instruction_type == ObjectInstruction.ObjectInstructionType.Delete
self._sent_object_ids.difference_update(instruction.object_content.object_ids)
await self._connector_external.send(instruction)
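The reworked `__send_object_creation` first computes the set of not-yet-sent object ids, then filters the parallel tuples inside `ObjectContent` down to those ids and sends a fresh `ObjectInstruction` rather than mutating the incoming one. The `zip(*filter(...))` idiom at its core, reduced to plain data (values illustrative):

```python
sent_object_ids = {b"obj-a"}  # ids already pushed to the scheduler

object_ids = (b"obj-a", b"obj-b")
object_names = (b"a", b"b")
object_bytes = (b"payload-a", b"payload-b")

new_ids = set(object_ids) - sent_object_ids

# zip(...) pairs the parallel tuples into per-object packs, filter drops the
# already-sent packs, and zip(*...) transposes back into parallel tuples
filtered = tuple(
    zip(*filter(lambda pack: pack[0] in new_ids, zip(object_ids, object_names, object_bytes)))
)
sent_object_ids.update(new_ids)
```

Checking `new_ids` up front (as the new code does) also lets the caller return early when every object was already sent, skipping the transpose entirely.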
