From c4480b20958ecf39a8d26eb77babc9e27c2f2d53 Mon Sep 17 00:00:00 2001 From: latentvector Date: Sat, 24 Aug 2024 15:36:11 -0400 Subject: [PATCH] docker fix --- Dockerfile | 25 ++--- commune/app/app.py | 85 ++++++++------ commune/chat/chat.py | 54 ++++----- commune/module/_manager.py | 20 +++- commune/module/_misc.py | 4 +- commune/module/module.py | 16 +-- commune/modules/pm2/pm2.py | 3 - commune/{modules => }/remote/.gitignore | 0 commune/{modules => }/remote/README.md | 0 commune/{modules => }/remote/app.py | 10 +- commune/{modules => }/remote/data/.gitkeep | 0 commune/{modules => }/remote/peer.py | 0 commune/{modules => }/remote/remote.py | 0 commune/{modules => }/remote/ssh.py | 0 commune/serializer/serializer.py | 104 ++++++----------- commune/serializer/serializers.py | 124 -------------------- commune/serializer/test.py | 31 +++++ commune/serializer/types/bytes.py | 10 ++ commune/serializer/types/munch.py | 48 ++++++++ commune/serializer/types/numpy.py | 33 ++++++ commune/serializer/types/pandas.py | 15 +++ commune/serializer/types/torch.py | 20 ++++ commune/server/manager.py | 6 +- commune/server/namespace.py | 125 ++++++++++----------- commune/server/test.py | 17 ++- commune/vali/test.py | 42 +++---- commune/vali/vali.py | 15 +-- scripts/start.sh | 5 +- 28 files changed, 398 insertions(+), 414 deletions(-) rename commune/{modules => }/remote/.gitignore (100%) rename commune/{modules => }/remote/README.md (100%) rename commune/{modules => }/remote/app.py (96%) rename commune/{modules => }/remote/data/.gitkeep (100%) rename commune/{modules => }/remote/peer.py (100%) rename commune/{modules => }/remote/remote.py (100%) rename commune/{modules => }/remote/ssh.py (100%) delete mode 100644 commune/serializer/serializers.py create mode 100644 commune/serializer/test.py create mode 100644 commune/serializer/types/bytes.py create mode 100644 commune/serializer/types/munch.py create mode 100644 commune/serializer/types/numpy.py create mode 100644 commune/serializer/types/pandas.py create mode 100644 commune/serializer/types/torch.py diff --git a/Dockerfile b/Dockerfile index e6ef9521..af493c8a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,36 +1,25 @@ # THE GENERAL CONTAINER FOR CONNECTING ALL THE ENVIRONMENTS 😈 FROM ubuntu:22.04 - #SYSTEM +ENV LIBNAME=commune +WORKDIR /app ARG DEBIAN_FRONTEND=noninteractive RUN usermod -s /bin/bash root RUN apt-get update + #RUST RUN apt-get install curl nano build-essential cargo libstd-rust-dev -y #JS RUN apt-get install -y nodejs npm RUN npm install -g pm2 -ENV LIBNAME commune -ENV PWD /app -WORKDIR /app -RUN git clone https://github.com/commune-ai/commune.git /commune -RUN pip install -e /commune #PYTHON RUN apt-get install python3 python3-pip python3-venv -y +RUN git clone -b main https://github.com/commune-ai/commune.git /commune +RUN pip install -e /commune +WORKDIR /app +# TODO DOCKER -# WANT TO HAVE TO REBUILD THE WHOLE IMAGE EVERY TIME WE CHANGE THE REQUIREMENTS -COPY ./requirements.txt /app/requirements.txt -COPY ./setup.py /app/setup.py -COPY ./README.md /app/README.md - -RUN pip install -e ./ -# THIS IS FOR THE LOCAL PACKAG -COPY ./ /app -# git safety for app -RUN git config --global --add safe.directory /app -RUN git config pull.rebase false -# IMPORT EVERYTHING ELSE ENTRYPOINT [ "tail", "-f", "/dev/null"] \ No newline at end of file diff --git a/commune/app/app.py b/commune/app/app.py index 65aa3cd1..4871a46e 100644 --- a/commune/app/app.py +++ b/commune/app/app.py @@ -9,6 +9,16 @@ class App(c.Module): port_range = [8501, 8600] name_prefix = 'app::' + + def get_free_port(self, module, port=None, update=False): + app2info = self.get('app2info', {}) + if update: + return c.free_port() + port = app2info.get(module, {}).get('port', None) + if port == None: + port = c.free_port() + return port + def start(self, module:str = 'server', name : Optional[str] = None, @@ -18,52 +28,46 @@ def start(self, kwargs:dict=None, cmd = None, update:bool=False, - cwd = None, - **extra_kwargs): - - - module = c.shortcuts().get(module, module) - app2info = self.get('app2info', {}) - kwargs = kwargs or {} - name = name or module - port = port or app2info.get(name, {}).get('port', c.free_port()) - if update: - port = c.free_port() - process_name = self.name_prefix + name - if c.port_used(port): - c.kill_port(port) - c.pm2_kill(process_name) - if c.module_exists(module + '.app'): - module = module + '.app' - kwargs_str = json.dumps(kwargs or {}).replace('"', "'") - module_class = c.module(module) - cmd = cmd or f'streamlit run {module_class.filepath()} --server.port {port} -- --fn {fn} --kwargs "{kwargs_str}"' - - - cwd = cwd or os.path.dirname(module_class.filepath()) - + process_name:str=None, + cwd = None): + port = self.get_free_port(module=module, port=port, update=update) if remote: + if self.app_exists(name): + self.kill_app(name) rkwargs = c.locals2kwargs(locals()) rkwargs['remote'] = False - del rkwargs['module_class'] - del rkwargs['app2info'] - del rkwargs['process_name'] self.remote_fn( fn='start', - name=self.name_prefix + name , + name=self.name_prefix + module , kwargs= rkwargs) return { - 'name': name, - 'cwd': cwd, - 'fn': fn, + 'success': True, + 'module': module, 'address': { 'local': f'http://localhost:{port}', 'public': f'http://{c.ip()}:{port}', - } + } , + 'kwargs': rkwargs } + + module = c.shortcuts().get(module, module) + + kwargs = kwargs or {} + name = name or module + port = port or self.get_free_port(module) + # if the process is already running, kill it + # if the module is an app, we need to add the .app to the module name + if c.module_exists(module + '.app'): + module = module + '.app' + app2info = self.app2info() + kwargs_str = json.dumps(kwargs or {}).replace('"', "'") + module_class = c.module(module) + cmd = cmd or f'streamlit run {module_class.filepath()} --server.port {port} -- --fn {fn} --kwargs "{kwargs_str}"' + cwd = cwd or os.path.dirname(module_class.filepath()) + module = c.module(module) app_info= { 'name': name, @@ -85,13 +89,23 @@ def app2info(self): app2info = self.get('app2info', {}) if not isinstance(app2info, dict): app2info = {} + changed = False + og_app2info = app2info.copy() + for name, info in og_app2info.items(): + if not c.port_used(info['port']): + c.print(f'Port {info["port"]} is not used. Killing {name}') + changed = True + del app2info[name] + if changed: + self.put('app2info', app2info) + return app2info def kill_all(self): return c.module('pm2').kill_many(self.apps()) - def kill(self, name): + def kill_app(self, name): return c.module('pm2').kill(self.name_prefix+name) def filter_name(self, name:str) -> bool: @@ -104,6 +118,11 @@ def apps(self, remove_prefix = True): apps = [n[len(self.name_prefix):] for n in apps] return apps + + def app_exists(self, name): + return name in self.apps() + + def app_modules(self, **kwargs): return list(set([m.replace('.app','') for m in self.modules() if self.has_app(m, **kwargs)])) diff --git a/commune/chat/chat.py b/commune/chat/chat.py index 3c524662..4dd73083 100644 --- a/commune/chat/chat.py +++ b/commune/chat/chat.py @@ -50,7 +50,6 @@ def generate(self, data): params = c.copy(data['data']) input = params.pop('input') system_prompt = params.pop('system_prompt', None) - if system_prompt: input = system_prompt + '\n' + input r = self.model.generate( input,stream=1, **params) @@ -71,19 +70,17 @@ def save_data(self, data): def get_params(self): model = st.selectbox('Model', self.models) - with st.sidebar.expander('Parameters', False): - - temperature = st.slider('Temperature', 0.0, 1.0, 0.5) + temperature = st.slider('Temperature', 0.0, 1.0, 0.5) - if hasattr(self.model, 'get_model_info'): - model_info = self.model.get_model_info(model) - max_tokens = min(int(model_info['context_length']*0.9), self.max_tokens) - else: - model_info = {} - max_tokens = self.max_tokens - max_tokens = st.number_input('Max Tokens', 1, max_tokens, max_tokens) - system_prompt = st.text_area('System Prompt',self.system_prompt, height=200) + if hasattr(self.model, 'get_model_info'): + model_info = self.model.get_model_info(model) + max_tokens = min(int(model_info['context_length']*0.9), self.max_tokens) + else: + model_info = {} + max_tokens = self.max_tokens + max_tokens = st.number_input('Max Tokens', 1, max_tokens, max_tokens) + system_prompt = st.text_area('System Prompt',self.system_prompt, height=200) input = st.text_area('Text',self.text, height=100) @@ -111,8 +108,7 @@ def sidebar(self, user='user', password='password', seperator='::'): 'path': self.resolve_path('history', self.key.ss58_address ), 'history': self.history(self.key.ss58_address) }) - with st.expander('History', expanded=False): - self.search_hsitory() + def search_history(self): search = st.text_input('Search') # if the search is in any of the columns @@ -134,7 +130,9 @@ def app(self): self.history_page() def chat_page(self): - data = c.ticket(self.get_params(), key=self.key) + with st.sidebar.expander('Params', expanded=True): + params = self.get_params() + data = c.ticket(params, key=self.key) # make the buttons cover the whole page cols = st.columns([1,1]) @@ -147,18 +145,20 @@ def chat_page(self): reverse_emojis = emojis[::-1] with st.spinner(f'{emojis} Generating {reverse_emojis}'): st.write_stream(r) - - with st.expander('Post Processing', False): - lambda_string = st.text_area('fn(x={model_output})', 'x', height=100) - prefix = 'lambda x: ' - lambda_string = prefix + lambda_string if not lambda_string.startswith(prefix) else lambda_string - lambda_fn = eval(lambda_string) - print(data) - try: - output = data['data']['output'] - output = lambda_fn(output) - except Exception as e: - st.error(e) + + self.post_processing(data) + + + def post_processing(self, data): + lambda_string = st.text_area('fn(x={model_output})', 'x', height=100) + prefix = 'lambda x: ' + lambda_string = prefix + lambda_string if not lambda_string.startswith(prefix) else lambda_string + lambda_fn = eval(lambda_string) + try: + output = data['data']['output'] + output = lambda_fn(output) + except Exception as e: + st.error(e) def history_page(self): history = self.data.history diff --git a/commune/module/_manager.py b/commune/module/_manager.py index 9a60a0cf..bc9acd24 100644 --- a/commune/module/_manager.py +++ b/commune/module/_manager.py @@ -417,9 +417,17 @@ def module_exists(cls, module:str, **kwargs) -> bool: def has_app(cls, module:str, **kwargs) -> bool: return cls.module_exists(module + '.app', **kwargs) + @classmethod + def simplify_paths(cls, paths): + paths = [cls.simplify_path(p) for p in paths] + paths = [p for p in paths if p] + return paths + @classmethod def simplify_path(cls, p, avoid_terms=['modules']): chunks = p.split('.') + if len(chunks) < 2: + return None file_name = chunks[-2] chunks = chunks[:-1] path = '' @@ -448,18 +456,18 @@ def simplify_path(cls, p, avoid_terms=['modules']): @classmethod def local_modules(cls, search=None): - objects = cls.find_classes(cls.pwd()) - object_paths = [cls.simplify_path(obj) for obj in objects] + object_paths = cls.find_classes(cls.pwd()) + object_paths = cls.simplify_paths(object_paths) if search != None: - object_paths = [obj for obj in object_paths if search in obj] + object_paths = [p for p in object_paths if search in p] return sorted(list(set(object_paths))) @classmethod def lib_modules(cls, search=None): - objects = cls.find_classes(cls.libpath, ) - object_paths = [cls.simplify_path(obj) for obj in objects] + object_paths = cls.find_classes(cls.libpath ) + object_paths = cls.simplify_paths(object_paths) if search != None: - object_paths = [obj for obj in object_paths if search in obj] + object_paths = [p for p in object_paths if search in p] return sorted(list(set(object_paths))) @classmethod diff --git a/commune/module/_misc.py b/commune/module/_misc.py index ecd24b8a..feef0ece 100644 --- a/commune/module/_misc.py +++ b/commune/module/_misc.py @@ -562,11 +562,9 @@ def set_env(cls, key:str, value:str)-> None: @classmethod def pwd(cls): - pwd = os.getenv('PWD', '/app') # the current wor king directory from the process starts + pwd = os.getenv('PWD', '/commune') # the current wor king directory from the process starts return pwd - - @classmethod def choice(cls, options:Union[list, dict])->list: options = deepcopy(options) # copy to avoid changing the original diff --git a/commune/module/module.py b/commune/module/module.py index c4c95397..9b8c0033 100755 --- a/commune/module/module.py +++ b/commune/module/module.py @@ -925,22 +925,13 @@ def remote_fn(cls, kwargs = kwargs if kwargs else {} args = args if args else [] - if name == None: - module_path = cls.resolve_object(module).module_name() - name = f"{module_path}{tag_seperator}{fn}" - if tag != None: - name = f'{name}{tag_seperator}{tag}' - if 'remote' in kwargs: kwargs['remote'] = False - - cwd = cwd or cls.dirpath() + cwd = cwd or cls.dirpath() kwargs = kwargs or {} args = args or [] - module = cls.resolve_object(module) - # resolve the name if name == None: # if the module has a module_path function, use that as the name @@ -948,10 +939,7 @@ def remote_fn(cls, name = module.module_name() else: name = module.__name__.lower() - # resolve the tag - if tag != None: - name = f'{name}{tag_seperator}{tag}' - + c.print(f'[bold cyan]Launching --> <<[/bold cyan][bold yellow]class:{module.__name__}[/bold yellow] [bold white]name[/bold white]:{name} [bold white]fn[/bold white]:{fn} [bold white]mode[/bold white]:{mode}>>', color='green') launch_kwargs = dict( diff --git a/commune/modules/pm2/pm2.py b/commune/modules/pm2/pm2.py index 9c27a77a..104dd8f0 100644 --- a/commune/modules/pm2/pm2.py +++ b/commune/modules/pm2/pm2.py @@ -104,7 +104,6 @@ def kill_many(cls, search=None, verbose:bool = True, timeout=10): @classmethod def kill_all(cls, verbose:bool = True, timeout=10, trials=10): - results = {} while len(cls.servers()) > 0: results = cls.kill_many(search=None, verbose=verbose, timeout=timeout) trials -= 1 @@ -122,7 +121,6 @@ def servers(cls, search=None, verbose:bool = False) -> List[str]: if 'errored' in line: cls.kill(server_name, verbose=True) continue - module_list += [server_name] if search != None: @@ -135,7 +133,6 @@ def servers(cls, search=None, verbose:bool = False) -> List[str]: pm2ls = servers - # commune.run_command('pm2 status').stdout.split('\n')[5].split(' │')[0].split(' │ ')[-1]commune.run_command('pm2 status').stdout.split('\n')[5].split(' │')[0].split(' │ ')[-1] @classmethod diff --git a/commune/modules/remote/.gitignore b/commune/remote/.gitignore similarity index 100% rename from commune/modules/remote/.gitignore rename to commune/remote/.gitignore diff --git a/commune/modules/remote/README.md b/commune/remote/README.md similarity index 100% rename from commune/modules/remote/README.md rename to commune/remote/README.md diff --git a/commune/modules/remote/app.py b/commune/remote/app.py similarity index 96% rename from commune/modules/remote/app.py rename to commune/remote/app.py index 234bf00d..82c7703d 100644 --- a/commune/modules/remote/app.py +++ b/commune/remote/app.py @@ -21,9 +21,9 @@ def app(cls, module: str = None, **kwargs): with st.sidebar: self.filter_hosts_dashboard() - self.manage_hosts() + tabs = st.tabs(['SSH', 'Manage Hosts']) + self.manage_hosts() - page = 'manage_hosts' self.ssh() @@ -121,6 +121,12 @@ def manage_hosts(self): with st.expander('Edit Hosts', expanded=False): self.edit_hosts() + with st.expander('Save SSH Config', expanded=True): + path = st.text_input('Enter Path', "~/.ssh/config") + if st.button(f'Save SSH Config to {path}'): + st.success(self.save_ssh_config(path)) + + def host2ssh_search(self, expander=True): host = st.selectbox('Search', list(self.host2ssh.keys())) diff --git a/commune/modules/remote/data/.gitkeep b/commune/remote/data/.gitkeep similarity index 100% rename from commune/modules/remote/data/.gitkeep rename to commune/remote/data/.gitkeep diff --git a/commune/modules/remote/peer.py b/commune/remote/peer.py similarity index 100% rename from commune/modules/remote/peer.py rename to commune/remote/peer.py diff --git a/commune/modules/remote/remote.py b/commune/remote/remote.py similarity index 100% rename from commune/modules/remote/remote.py rename to commune/remote/remote.py diff --git a/commune/modules/remote/ssh.py b/commune/remote/ssh.py similarity index 100% rename from commune/modules/remote/ssh.py rename to commune/remote/ssh.py diff --git a/commune/serializer/serializer.py b/commune/serializer/serializer.py index 6d3be476..635225dc 100644 --- a/commune/serializer/serializer.py +++ b/commune/serializer/serializer.py @@ -1,14 +1,14 @@ -from .serializers import serilizer_map import commune as c +import json + class Serializer(c.Module): - serilizer_map = serilizer_map - serializers = serilizer_map.values() list_types = [list, set, tuple] # shit that you can turn into lists for json iterable_types = [list, set, tuple, dict] # json_serializable_types = [int, float, str, bool, type(None)] def serialize(self,x:dict, mode = 'dict', copy_value = True): + if copy_value: x = c.copy(x) if type(x) in self.iterable_types: @@ -28,7 +28,6 @@ def serialize(self,x:dict, mode = 'dict', copy_value = True): else: # GET THE TYPE OF THE VALUE data_type = str(type(x)).split("'")[1].lower() - if 'munch' in data_type: data_type = 'munch' if 'tensor' in data_type or 'torch' in data_type: @@ -38,12 +37,8 @@ def serialize(self,x:dict, mode = 'dict', copy_value = True): if 'dataframe' in data_type: data_type = 'pandas' - serializer = serilizer_map[data_type] - if not hasattr(serializer, 'date_type'): - serializer = serializer() - setattr(serializer, 'date_type', data_type) - serilizer_map[data_type] = serializer - if serializer is not None: + serializer = self.get_serializer(data_type) + if serializer != None: # SERIALIZE MODE ON result = {'data': serializer.serialize(x), 'data_type': serializer.date_type, @@ -51,10 +46,13 @@ def serialize(self,x:dict, mode = 'dict', copy_value = True): else: result = {"success": False, "error": f"Type {serializer.data_type} not supported"} - result = self.resolve_serialized_result(result, mode=mode) - return result + return self.process_output(result, mode=mode) - def resolve_serialized_result(self, result, mode = 'str'): + def process_output(self, result, mode = 'str'): + """ + + + """ if mode == 'str': if isinstance(result, dict): result = json.dumps(result) @@ -79,10 +77,6 @@ def is_serialized(self, data): def deserialize(self, x) -> object: """Serializes a torch object to DataBlock wire format. """ - if isinstance(x, dict) and isinstance(x.get('data', None), str): - x = x['data'] - - if isinstance(x, str): if x.startswith('{') or x.startswith('['): x = self.str2dict(x) @@ -92,32 +86,32 @@ def deserialize(self, x) -> object: elif c.is_float(x): x = float(x) return x - - is_single = isinstance(x,dict) and all([k in x for k in ['data', 'data_type', 'serialized']]) - if is_single: - x = [x] - k_list = [] - if isinstance(x, dict): - k_list = list(x.keys()) - elif type(x) in [list]: - k_list = list(range(len(x))) - elif type(x) in [tuple, set]: - # convert to list, to format as json - x = list(x) - k_list = list(range(len(x))) - - for k in k_list: - v = x[k] - if self.is_serialized(v): - data_type = v['data_type'] - data = v['data'] - if hasattr(self, f'deserialize_{data_type}'): - x[k] = getattr(self, f'deserialize_{data_type}')(data=data) - elif type(v) in [dict, list, tuple, set]: - x[k] = self.deserialize(x=v) - if is_single: - x = x[0] + is_serialized = self.is_serialized(x) + if is_serialized: + serializer = self.get_serializer(x['data_type']) + return serializer.deserialize(x['data']) return x + + def serializer_map(self): + type_path = self.dirpath() + '/types' + module_paths = c.find_objects(type_path) + return {p.split('.')[-2]: c.obj(p)() for p in module_paths} + + + def types(self): + return list(self.serializer_map().keys()) + + + def get_serializer(self, data_type): + serializer_map = self.serializer_map() + if data_type in serializer_map: + serializer = serializer_map[data_type] + if not hasattr(serializer, 'date_type'): + setattr(serializer, 'date_type', data_type) + serializer_map[data_type] = serializer + else: + raise TypeError(f'Type Not supported for serializeation ({data_type})') + return serializer def dict2bytes(self, data:dict) -> bytes: import msgpack @@ -133,29 +127,3 @@ def str2dict(self, data:str) -> bytes: if isinstance(data, str): data = json.loads(data) return data - - def test(self): - import torch, time - data_list = [ - torch.ones(1000), - torch.zeros(1000), - torch.rand(1000), - [1,2,3,4,5], - {'a':1, 'b':2, 'c':3}, - 'hello world', - 1, - 1.0, - True, - False, - None - - ] - for data in data_list: - t1 = time.time() - data = self.serialize(data) - data = self.deserialize(data) - t2 = time.time() - latency = t2 - t1 - emoji = '✅' if data == data else '❌' - print('DATA', data, 'LATENCY', latency, emoji) - return {'msg': 'PASSED test_serialize_deserialize'} diff --git a/commune/serializer/serializers.py b/commune/serializer/serializers.py deleted file mode 100644 index 64c44534..00000000 --- a/commune/serializer/serializers.py +++ /dev/null @@ -1,124 +0,0 @@ -""" An interface for serializing and deserializing tensors""" - -# I DONT GIVE A FUCK LICENSE (IDGAF) -# Do whatever you want with this code -# Dont pull up with your homies if it dont work. -import numpy as np -from typing import * -from copy import deepcopy -import commune as c -import json - -class MunchSerializer: - - def serialize(self, data: dict) -> str: - return json.dumps(self.munch2dict(data)) - - def deserialize(self, data: bytes) -> 'Munch': - return self.dict2munch(self.str2dict(data)) - - - def str2dict(self, data:str) -> bytes: - if isinstance(data, bytes): - data = data.decode('utf-8') - if isinstance(data, str): - data = json.loads(data) - return data - - @classmethod - def dict2munch(cls, x:dict, recursive:bool=True)-> 'Munch': - from munch import Munch - ''' - Turn dictionary into Munch - ''' - if isinstance(x, dict): - for k,v in x.items(): - if isinstance(v, dict) and recursive: - x[k] = cls.dict2munch(v) - x = Munch(x) - return x - - @classmethod - def munch2dict(cls, x:'Munch', recursive:bool=True)-> dict: - from munch import Munch - ''' - Turn munch object into dictionary - ''' - if isinstance(x, Munch): - x = dict(x) - for k,v in x.items(): - if isinstance(v, Munch) and recursive: - x[k] = cls.munch2dict(v) - return x - - - def dict2str(self, data:dict) -> bytes: - return - - -class BytesSerializer: - - def serialize(self, data: dict) -> bytes: - return data.hex() - - def deserialize(self, data: bytes) -> 'DataBlock': - if isinstance(data, str): - data = bytes.fromhex(data) - return data - -class NumpySerializer: - - def serialize(self, data: 'np.ndarray') -> 'np.ndarray': - return self.numpy2bytes(data).hex() - - def deserialize(self, data: bytes) -> 'np.ndarray': - if isinstance(data, str): - data = bytes.fromhex(data) - return self.bytes2numpy(data) - - def bytes2numpy(self, data:bytes) -> np.ndarray: - import msgpack_numpy - import msgpack - output = msgpack.unpackb(data, object_hook=msgpack_numpy.decode) - return output - - def numpy2bytes(self, data:np.ndarray)-> bytes: - import msgpack_numpy - import msgpack - output = msgpack.packb(data, default=msgpack_numpy.encode) - return output - - @classmethod - def bytes2str(cls, x, **kwargs): - return x.hex() - - @classmethod - def str2bytes(cls, x, **kwargs): - return - -class PandasSerializer: - - def serialize(self, data: 'pd.DataFrame') -> 'DataBlock': - data = data.to_json() - if isinstance(data, bytes): - data = data.decode('utf-8') - return data - - def deserialize(self, data: bytes) -> 'pd.DataFrame': - import pandas as pd - data = pd.DataFrame.from_dict(json.loads(data)) - return data - -class TorchSerializer: - def deserialize(self, data: dict) -> 'torch.Tensor': - from safetensors.torch import load - if isinstance(data, str): - data = self.str2bytes(data) - data = load(data) - return data['data'] - - def serialize(self, data: 'torch.Tensor') -> 'DataBlock': - from safetensors.torch import save - return save({'data':data}).hex() - -serilizer_map = {k.split('Serializer')[0].lower():v for k,v in locals().items() if k.endswith('Serializer')} \ No newline at end of file diff --git a/commune/serializer/test.py b/commune/serializer/test.py new file mode 100644 index 00000000..1a497b15 --- /dev/null +++ b/commune/serializer/test.py @@ -0,0 +1,31 @@ + +import commune as c +class TestSerializer(c.Module): + def test(self): + import torch, time + data_list = [ + torch.ones(1000), + torch.zeros(1000), + torch.rand(1000), + [1,2,3,4,5], + {'a':1, 'b':2, 'c':3}, + 'hello world', + c.df([{'name': 'joe', 'fam': 1}]), + 1, + 1.0, + True, + False, + None + + ] + for data in data_list: + t1 = time.time() + ser_data = self.serialize(data) + des_data = self.deserialize(ser_data) + des_ser_data = self.serialize(des_data) + t2 = time.time() + + latency = t2 - t1 + emoji = '✅' if str(des_ser_data) == str(ser_data) else '❌' + print(type(data),emoji) + return {'msg': 'PASSED test_serialize_deserialize'} diff --git a/commune/serializer/types/bytes.py b/commune/serializer/types/bytes.py new file mode 100644 index 00000000..9905e60a --- /dev/null +++ b/commune/serializer/types/bytes.py @@ -0,0 +1,10 @@ + +class BytesSerializer: + + def serialize(self, data: dict) -> bytes: + return data.hex() + + def deserialize(self, data: bytes) -> 'DataBlock': + if isinstance(data, str): + data = bytes.fromhex(data) + return data diff --git a/commune/serializer/types/munch.py b/commune/serializer/types/munch.py new file mode 100644 index 00000000..61d07174 --- /dev/null +++ b/commune/serializer/types/munch.py @@ -0,0 +1,48 @@ +import json + +class MunchSerializer: + + def serialize(self, data: dict) -> str: + return json.dumps(self.munch2dict(data)) + + def deserialize(self, data: bytes) -> 'Munch': + return self.dict2munch(self.str2dict(data)) + + + def str2dict(self, data:str) -> bytes: + if isinstance(data, bytes): + data = data.decode('utf-8') + if isinstance(data, str): + data = json.loads(data) + return data + + @classmethod + def dict2munch(cls, x:dict, recursive:bool=True)-> 'Munch': + from munch import Munch + ''' + Turn dictionary into Munch + ''' + if isinstance(x, dict): + for k,v in x.items(): + if isinstance(v, dict) and recursive: + x[k] = cls.dict2munch(v) + x = Munch(x) + return x + + @classmethod + def munch2dict(cls, x:'Munch', recursive:bool=True)-> dict: + from munch import Munch + ''' + Turn munch object into dictionary + ''' + if isinstance(x, Munch): + x = dict(x) + for k,v in x.items(): + if isinstance(v, Munch) and recursive: + x[k] = cls.munch2dict(v) + return x + + + def dict2str(self, data:dict) -> bytes: + return + diff --git a/commune/serializer/types/numpy.py b/commune/serializer/types/numpy.py new file mode 100644 index 00000000..4e8ada33 --- /dev/null +++ b/commune/serializer/types/numpy.py @@ -0,0 +1,33 @@ +class NumpySerializer: + + def serialize(self, data: 'np.ndarray') -> 'np.ndarray': + return self.numpy2bytes(data).hex() + + def deserialize(self, data: bytes) -> 'np.ndarray': + if isinstance(data, str): + data = bytes.fromhex(data) + return self.bytes2numpy(data) + + def bytes2numpy(self, data:bytes) -> 'np.ndarray': + import msgpack_numpy + import msgpack + output = msgpack.unpackb(data, object_hook=msgpack_numpy.decode) + return output + + def numpy2bytes(self, data:'np.ndarray')-> bytes: + import msgpack_numpy + import msgpack + output = msgpack.packb(data, default=msgpack_numpy.encode) + return output + + @classmethod + def bytes2str(cls, x, **kwargs): + return x.hex() + + @classmethod + def str2bytes(cls, data: str, mode: str = 'hex') -> bytes: + if mode in ['utf-8']: + return bytes(data, mode) + elif mode in ['hex']: + return bytes.fromhex(data) + \ No newline at end of file diff --git a/commune/serializer/types/pandas.py b/commune/serializer/types/pandas.py new file mode 100644 index 00000000..dda9e25b --- /dev/null +++ b/commune/serializer/types/pandas.py @@ -0,0 +1,15 @@ +import json + +class PandasSerializer: + + def serialize(self, data: 'pd.DataFrame') -> 'DataBlock': + data = data.to_json() + if isinstance(data, bytes): + data = data.decode('utf-8') + return data + + def deserialize(self, data: bytes) -> 'pd.DataFrame': + import pandas as pd + data = pd.DataFrame.from_dict(json.loads(data)) + return data + \ No newline at end of file diff --git a/commune/serializer/types/torch.py b/commune/serializer/types/torch.py new file mode 100644 index 00000000..655890de --- /dev/null +++ b/commune/serializer/types/torch.py @@ -0,0 +1,20 @@ + +class TorchSerializer: + def deserialize(self, data: dict) -> 'torch.Tensor': + from safetensors.torch import load + if isinstance(data, str): + data = self.str2bytes(data) + data = load(data) + return data['data'] + + def serialize(self, data: 'torch.Tensor') -> 'DataBlock': + from safetensors.torch import save + return save({'data':data}).hex() + + @classmethod + def str2bytes(cls, data: str, mode: str = 'hex') -> bytes: + if mode in ['utf-8']: + return bytes(data, mode) + elif mode in ['hex']: + return bytes.fromhex(data) + \ No newline at end of file diff --git a/commune/server/manager.py b/commune/server/manager.py index 7b22ea5d..656c2eb0 100644 --- a/commune/server/manager.py +++ b/commune/server/manager.py @@ -53,9 +53,7 @@ def kill_prefix(cls, prefix:str, **kwargs): @classmethod - def kill_many(cls, servers, search:str = None, network='local', parallel=True, timeout=10, **kwargs): - - + def kill_many(cls, servers, search:str = None, network='local', timeout=10, **kwargs): servers = c.servers(network=network) servers = [s for s in servers if search in s] futures = [] @@ -66,9 +64,7 @@ def kill_many(cls, servers, search:str = None, network='local', parallel=True, results = [] for r in c.as_completed(futures, timeout=timeout): results += [r.result()] - c.print(f'Killed {len(results)} servers', color='red') - return results diff --git a/commune/server/namespace.py b/commune/server/namespace.py index c61fde99..6b25fc40 100644 --- a/commune/server/namespace.py +++ b/commune/server/namespace.py @@ -6,8 +6,6 @@ # THIS IS THE INTERNET OF INTERNETS. class Namespace(c.Module): - - # the default network : str = 'local' @classmethod @@ -25,65 +23,68 @@ def namespace(cls, search=None, netuid=None, max_age:int = 60, timeout=6, - verbose=False, - **kwargs) -> dict: - + verbose=False) -> dict: network = network or 'local' path = cls.resolve_network_path(network) - namespace = cls.get(path, {}, max_age=max_age, update=update) - if len(namespace) == 0: - c.print(f'UPDATING NETWORK(network={network})', color='blue', verbose=verbose) - if network == 'local': - namespace = {} - addresses = ['0.0.0.0'+':'+str(p) for p in c.used_ports()] - future2address = {} - for address in addresses: - - f = c.submit(c.call, [address+'/server_name'], timeout=timeout) - future2address[f] = address - futures = list(future2address.keys()) - try: - progress = c.tqdm(len(futures)) - for f in c.as_completed(futures, timeout=timeout): - address = future2address[f] - progress.update(1) - try: - name = f.result() - namespace[name] = address - c.print(f'{name}-->{address}', color='green', verbose=verbose) - except Exception as e: - c.print(f'Error {e} with {name} and {address}', color='red', verbose=verbose) - except Exception as e: - c.print(f'Timeout error {e}', color='red', verbose=verbose) - elif 'subspace' in network: - if '.' in network: - network, netuid = network.split('.') - else: - netuid = netuid or 0 - if c.is_int(netuid): - netuid = int(netuid) - namespace = c.module(network)().namespace(search=search, - update=update, - netuid=netuid, - **kwargs) - else: - namespace = {} - - namespace = {k:v for k,v in namespace.items() if 'Error' not in k} - cls.put_namespace(network, namespace) - + namespace = cls.get(path, None, max_age=max_age) + if namespace == None: + namespace = cls.update_namespace(network=network, + netuid=netuid, + timeout=timeout, + verbose=verbose) + cls.put(path,namespace) if search != None: namespace = {k:v for k,v in namespace.items() if search in k} - + namespace = {k:':'.join(v.split(':')[:-1]) + ':'+ str(v.split(':')[-1]) for k,v in namespace.items()} + namespace = dict(sorted(namespace.items(), key=lambda x: x[0])) ip = c.ip() namespace = {k: v.replace(ip, '0.0.0.0') for k,v in namespace.items() } - namespace = dict(sorted(namespace.items(), key=lambda x: x[0])) return namespace + @classmethod + def update_namespace(cls, network, netuid=None, timeout=4, search=None, verbose=False): + c.print(f'UPDATING --> NETWORK(network={network} netuid={netuid})', color='blue') + + if 'subspace' in network: + if '.' in network: + network, netuid = network.split('.') + else: + netuid = netuid or 0 + if c.is_int(netuid): + netuid = int(netuid) + namespace = c.module(network)().namespace(search=search, max_age=1, netuid=netuid) + return namespace + elif 'local' == network: + print(network, 'FAM') + namespace = {} + addresses = ['0.0.0.0'+':'+str(p) for p in c.used_ports()] + future2address = {} + for address in addresses: + f = c.submit(c.call, [address+'/server_name'], timeout=timeout) + future2address[f] = address + futures = list(future2address.keys()) + try: + progress = c.tqdm(len(futures)) + for f in c.as_completed(futures, timeout=timeout): + address = future2address[f] + try: + name = f.result() + namespace[name] = address + except Exception as e: + c.print(f'Error {e} with {name} and {address}', color='red', verbose=verbose) + progress.update(1) + except Exception as e: + c.print(f'Timeout error {e}', color='red', verbose=verbose) - + namespace = {k:v for k,v in namespace.items() if 'Error' not in k} + ip = c.ip() + namespace = {k: v.replace(ip, '0.0.0.0') for k,v in namespace.items() } + else: + return {} + + return namespace get_namespace = _namespace = namespace @classmethod @@ -142,7 +143,6 @@ def name2address(cls, name:str, network:str=network ): namespace = cls.namespace(network=network) address = namespace.get(name, None) ip = c.ip() - address = address.replace(c.default_ip, ip) assert ip in address, f'ip {ip} not in address {address}' return address @@ -327,24 +327,19 @@ def server_exists(cls, name:str, network:str = None, prefix_match:bool=False, * @classmethod def test(cls): - network = 'test' - network2 = 'test2' + network = 'test_namespace' cls.rm_namespace(network) - cls.rm_namespace(network2) namespace = cls.namespace(network=network) assert cls.namespace(network=network) == {}, f'Namespace not empty., {namespace}' - cls.register_server('test', 'test', network=network) - assert cls.namespace(network=network) == {'test': 'test'}, f'Namespace not updated. {cls.namespace(network=network)}' - assert cls.namespace(network2) == {} - cls.register_server('test', 'test', network=network2) - assert cls.namespace(network=network) == {'test': 'test'}, f'Namespace not restored. {cls.namespace(network=network)}' - cls.deregister_server('test', network=network2) - assert cls.namespace(network2) == {} + name = 'test' + address = '0.0.0.0:8888' + cls.register_server(name=name, address=address, network=network) + namespace = cls.namespace(network=network) + assert namespace[name] == address, f'Namespace not updated. {namespace}' + cls.deregister_server(name, network=network) + assert cls.namespace(network=network) == {} cls.rm_namespace(network) - assert cls.namespace_exists(network) == False - cls.rm_namespace(network2) - assert cls.namespace_exists(network2) == False - + assert cls.namespace_exists(network) == False return {'success': True, 'msg': 'Namespace tests passed.'} diff --git a/commune/server/test.py b/commune/server/test.py index 0a29a54b..51529355 100644 --- a/commune/server/test.py +++ b/commune/server/test.py @@ -30,28 +30,27 @@ def test_serving(cls, server_name = 'module::test'): assert server_name not in c.servers() return {'success': True, 'msg': 'server test passed'} - @classmethod - def test_serving_with_different_key(cls, module_name = 'module::test', key_name='module::test2'): - + def test_serving_with_different_key(cls, module = 'module'): + tag = 'test_serving_with_different_key' + key_name = module + '::'+ tag + module_name = module + '::'+ tag + '_b' if not c.key_exists(key_name): key = c.add_key(key_name) if c.server_exists(module_name): c.kill(module_name) while c.server_exists(module_name): c.sleep(1) - address = c.serve(module_name, key=key_name)['address'] + c.print(c.serve(module_name, key=key_name)) key = c.get_key(key_name) while not c.server_exists(module_name): c.sleep(1) c.print('waiting for server {}'.format(module_name)) - try: - info = c.connect(module_name).info() - except Exception as e: - print(c.namespace()) - print(info) + info = c.connect(module_name).info() assert info['key'] == key.ss58_address, f"key failed {key.ss58_address} != {info['key']}" c.kill(module_name) c.rm_key(key_name) + assert not c.key_exists(key_name) + assert not c.server_exists(module_name) return {'success': True, 'msg': 'server test passed'} diff --git a/commune/vali/test.py b/commune/vali/test.py index c2980efb..47154170 100644 --- a/commune/vali/test.py +++ b/commune/vali/test.py @@ -4,10 +4,10 @@ class Test(c.Module): def test_net(self, - n=3, + n=2, sleep_time=8, timeout = 20, - tag = 'test', + tag = 'vali_test_net', miner='module', vali='vali', storage_path = '/tmp/commune/vali_test', @@ -17,15 +17,11 @@ def test_net(self, test_vali = f'{vali}::{tag}' modules = test_miners + [test_vali] for m in modules: - c.kill(m) - + c.kill(m) for m in modules: - if m == test_vali: - c.print(c.serve(m, kwargs={'network': network, - 'storage_path': storage_path, - 'search': test_miners[0].split('::')[0]})) - else: - c.print(c.serve(m)) + c.print(c.serve(m, kwargs={'network': network, + 'storage_path': storage_path, + 'search': test_miners[0][:-1]})) t0 = c.time() while not c.server_exists(test_vali): time_elapsed = c.time() - t0 @@ -34,29 +30,27 @@ def test_net(self, c.sleep(1) c.print(f'Waiting for {test_vali} to get the Leaderboard {time_elapsed}/{timeout} seconds') - vali = c.connect(test_vali) - t0 = c.time() - c.print(f'Sleeping for {sleep_time} seconds') c.print(c.call(test_vali+'/refresh_leaderboard')) leaderboard = None while c.time() - t0 < sleep_time: + try: + vali = c.connect(test_vali) + leaderboard = c.call(test_vali+'/leaderboard') - leaderboard = c.call(test_vali+'/leaderboard') - if len(leaderboard) >= n: - break - else: - c.print(f'Waiting for leaderboard to be updated {len(leaderboard)} is n={n}') - c.sleep(1) + if len(leaderboard) >= n: + break + else: + c.print(f'Waiting for leaderboard to be updated {len(leaderboard)} is n={n}') + c.sleep(1) + except Exception as e: + print(e) - leaderboard = c.call(test_vali+'/leaderboard') + leaderboard = c.call(test_vali+'/leaderboard', df=1) assert isinstance(leaderboard, pd.DataFrame), leaderboard assert len(leaderboard) >= n, leaderboard - c.print(c.call(test_vali+'/refresh_leaderboard')) - - c.print(leaderboard) - + c.print(c.call(test_vali+'/refresh_leaderboard')) for miner in test_miners + [test_vali]: c.print(c.kill(miner)) return {'success': True, 'msg': 'subnet test passed'} diff --git a/commune/vali/vali.py b/commune/vali/vali.py index 0f69a721..913a1049 100644 --- a/commune/vali/vali.py +++ b/commune/vali/vali.py @@ -49,12 +49,7 @@ def __init__(self, init_vali = __init__ def score(self, module): - v = c.random_int() - k = c.hash(v) - module.put_item(k, v) - if module.get_item(k) == v: - return 1 - return 0 + return 'name' in module.info() def set_score_fn(self, score_fn: Union[Callable, str]): """ @@ -149,7 +144,7 @@ def get_next_result(self, futures=None): if did_score_bool: keys = ['w', 'name', 'address', 'latency'] else: - keys = ['w', 'name', 'error'] + keys = list(result.keys()) result = {k: result.get(k, None) for k in keys if k in result} msg = ' '.join([f'{k}={result[k]}' for k in result]) msg = f'RESULT({msg})' @@ -204,10 +199,9 @@ def epoch(self, df=True): results = c.df(results) results = results.sort_values(by='w', ascending=False) - + return results - - + @property def network_staleness(self) -> int: """ @@ -223,7 +217,6 @@ def filter_module(self, module:str, search=None): search_list = [search] return all([s == None or s in module for s in search_list ]) - def sync(self, network:str=None, netuid:int=None, diff --git a/scripts/start.sh b/scripts/start.sh index 18153660..db7c8250 100755 --- a/scripts/start.sh +++ b/scripts/start.sh @@ -5,6 +5,7 @@ # if docker is not running, start it NAME=commune +PWD=~/$NAME IMAGE_NAME=$NAME CONTAINER_NAME=$NAME ENABLE_DOCKER=true @@ -27,9 +28,9 @@ fi CMD_STR="docker run -d \ --name $CONTAINER_NAME \ --shm-size $SHM_SIZE \ - -v ~/.commune:/root/.commune \ + -v ~/.$NAME:/root/.$NAME \ -v ~/.bittensor:/root/.bittensor \ - -v $PWD:/app \ + -v $PWD:/$NAME \ -p $START_PORT-$END_PORT:$START_PORT-$END_PORT \ --restart unless-stopped \ --privileged