Skip to content

Commit

Permalink
update the prefix and root default; all tests pass.
Browse files Browse the repository at this point in the history
  • Loading branch information
geyang committed May 17, 2021
1 parent d82eee2 commit 7256d86
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 92 deletions.
4 changes: 2 additions & 2 deletions ml_logger/README
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ The logging server uses the ``sanic`` framework, which means defaults to
``sanic``, such as maximum request size are carried over.

When using ``ml-logger`` to save and load **very
large** ``pytorch`` checkpoints, you need to raise ``sanic``\ ’s
large** ``pytorch`` checkpoints, you need to raise \ ``sanic``\ ’s
default request size limit from 100MB to something like a gigabyte or
even larger. The file upload is done using multi-part form upload, where
each query is kept small. However sanic will throw if the overall size
of the query exceeds this
parameter ``SANIC_REQUEST_MAX_SIZE=1000_000_000``. The default is
parameter \ ``SANIC_REQUEST_MAX_SIZE=1000_000_000``. The default is
``100_000_000``, or 100MB.

Use ssh tunnel if you are running on a managed cluster.
Expand Down
2 changes: 1 addition & 1 deletion ml_logger/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.6
0.7.8
7 changes: 4 additions & 3 deletions ml_logger/ml_logger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .struts import ALLOWED_TYPES
from .log_client import LogClient
from .helpers.print_utils import PrintHelper
from .caches.summary_cache import SummaryCache
from .helpers.print_utils import PrintHelper
from .log_client import LogClient
from .ml_logger import *
from .struts import ALLOWED_TYPES

23 changes: 12 additions & 11 deletions ml_logger/ml_logger/log_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class LogClient:
sync_pool = None
async_pool = None

def __init__(self, url: str = None, asynchronous=None, max_workers=None):
def __init__(self, root: str = None, user=None, access_token=None, asynchronous=None, max_workers=None):
"""
When max_workers is 0, the HTTP requests are synchronous. This allows one to make
synchronous requests procedurally.
Expand All @@ -54,24 +54,25 @@ def __init__(self, url: str = None, asynchronous=None, max_workers=None):
Mujoco-py for example, would have trouble with forked processes if multiple
threads are started before forking the subprocesses.
:param url:
:param root:
:param asynchronous: If this is not None, we create a request pool. This way
we can use the (A)SyncContext call right after construction.
:param max_workers:
"""
if asynchronous is not None:
self.set_session(asynchronous, max_workers)

if url.startswith("file://"):
self.local_server = LoggingServer(data_dir=url[6:], silent=True)
elif os.path.isabs(url):
self.local_server = LoggingServer(data_dir=url, silent=True)
elif url.startswith('http://'):
if root.startswith("file://"):
self.local_server = LoggingServer(cwd=root[6:], silent=True)
elif os.path.isabs(root):
self.local_server = LoggingServer(cwd=root, silent=True)
elif root.startswith('http://'):
self.local_server = None # remove local server to use sessions.
self.url = url
self.stream_url = os.path.join(url, "stream")
self.ping_url = os.path.join(url, "ping")
self.glob_url = os.path.join(url, "glob")
self.url = os.path.join(root, user)
self.access_token = access_token
self.stream_url = os.path.join(root, user, "stream")
self.ping_url = os.path.join(root, user, "ping")
self.glob_url = os.path.join(root, user, "glob")
# when setting sessions the first time, default to use Asynchronous Session.
if self.session is None:
asynchronous = True if asynchronous is None else asynchronous
Expand Down
65 changes: 43 additions & 22 deletions ml_logger/ml_logger/ml_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@
from .helpers.print_utils import PrintHelper
from .log_client import LogClient

# environment defaults
CWD = os.environ["PWD"]
USER = os.environ["USER"]

# ML_Logger defaults
ROOT = os.environ.get("ML_LOGGER_ROOT", CWD)
USER = os.environ.get("ML_LOGGER_USER", USER)
ACCESS_TOKEN = os.environ.get("ML_LOGGER_ACCESS_TOKEN", None)


def pJoin(*args):
from os.path import join
Expand Down Expand Up @@ -198,7 +207,9 @@ def __repr__(self):
# noinspection PyInitNewSignature
# todo: use prefixes as opposed to prefix. (add *prefixae after prefix=None)
# todo: resolve path segment with $env variables.
def __init__(self, root_dir: str = None, prefix=None, *prefixae, buffer_size=2048, max_workers=None,
def __init__(self, prefix="", *prefixae,
log_dir=ROOT, user=USER, access_token=ACCESS_TOKEN,
buffer_size=2048, max_workers=None,
asynchronous=None, summary_cache_opts: dict = None):
""" logger constructor.
Expand All @@ -213,8 +224,11 @@ def __init__(self, root_dir: str = None, prefix=None, *prefixae, buffer_size=204
| 1. prefix="causal_infogan" => logs to "/tmp/some_dir/causal_infogan"
| 2. prefix="" => logs to "/tmp/some_dir"
:param root_dir: the server host and port number
:param prefix: the prefix path
:param **prefixae: the rest of the prefix arguments
:param log_dir: the server host and port number
:param user: environment $ML_LOGGER_USER
:param access_token: environment $ML_LOGGER_ACCESS_TOKEN
:param asynchronous: When this is not None, we create a http thread pool.
:param buffer_size: The string buffer size for the print buffer.
:param max_workers: the number of request-session workers for the async http requests.
Expand All @@ -236,18 +250,19 @@ def __init__(self, root_dir: str = None, prefix=None, *prefixae, buffer_size=204
self.summary_caches = defaultdict(partial(SummaryCache, **(summary_cache_opts or {})))

# todo: add https support
self.root_dir = interpolate(root_dir) or "/"
self.prefix = interpolate(prefix) or os.getcwd()[1:]
if prefix is not None:
self.prefix = os.path.join(*[interpolate(p) for p in (prefix, *prefixae) if p is not None])
self.root_dir = interpolate(log_dir) or ROOT

# logger client contains thread pools, should not be re-created lightly.
self.client = LogClient(url=self.root_dir, asynchronous=asynchronous, max_workers=max_workers)
prefixae = [interpolate(p) for p in (prefix or "", *prefixae) if p is not None]
self.prefix = os.path.join(*prefixae) if prefixae else ""
self.client = LogClient(root=self.root_dir, user=user, access_token=access_token,
asynchronous=asynchronous, max_workers=max_workers)

def configure(self,
root_dir: str = None,
prefix=None,
*prefixae,
log_dir: str = None,
user=None,
access_token=None,
asynchronous=None,
max_workers=None,
buffer_size=None,
Expand Down Expand Up @@ -293,8 +308,11 @@ def configure(self,
todo: the table at the moment seems a bit verbose. I'm considering making this
just a single line print.
:param log_directory:
:param prefix:
:param prefix: the first prefix
:param *prefixae: a list of prefix segments
:param log_dir:
:param user:
:param access_token:
:param buffer_size:
:param summary_cache_opts:
:param asynchronous:
Expand All @@ -305,9 +323,11 @@ def configure(self,
"""

# path logic
root_dir = interpolate(root_dir) or os.getcwd()
log_dir = interpolate(log_dir) or os.getcwd()
if prefix is not None:
self.prefix = os.path.join(*[interpolate(p) for p in (prefix, *prefixae) if p is not None])
prefixae = [interpolate(p) for p in (prefix, *prefixae) if p is not None]
if prefixae is not None:
self.prefix = os.path.join(*prefixae)

if buffer_size is not None:
self.print_buffer_size = buffer_size
Expand All @@ -318,17 +338,18 @@ def configure(self,
self.summary_caches.clear()
self.summary_caches = defaultdict(partial(SummaryCache, **(summary_cache_opts or {})))

if root_dir != self.root_dir or asynchronous is not None or max_workers is not None:
# note: logger.configure shouldn't be called too often, so it is okay to assume
# that we can discard the old logClient.
# To quickly switch back and forth between synchronous and asynchronous calls,
# use the `SyncContext` and `AsyncContext` instead.
if log_dir:
self.root_dir = interpolate(log_dir) or ROOT
if log_dir or asynchronous is not None or max_workers is not None:
# note: logger.configure shouldn't be called too often. To quickly switch back
# and forth between synchronous and asynchronous calls, use the `SyncContext`
# and `AsyncContext` instead.
if not silent:
cprint('creating new logging client...', color='yellow', end=' ')
self.root_dir = root_dir
self.client.__init__(url=self.root_dir, asynchronous=asynchronous, max_workers=max_workers)
cprint('creating new logging client...', color='yellow', end='\r')
self.client.__init__(root=self.root_dir, user=user, access_token=access_token,
asynchronous=asynchronous, max_workers=max_workers)
if not silent:
cprint('✓ done', color="green")
cprint('✓ created a new logging client', color="green")

if not silent:
from urllib.parse import quote
Expand Down
Loading

0 comments on commit 7256d86

Please sign in to comment.