Skip to content

Commit

Permalink
add taskid
Browse files Browse the repository at this point in the history
  • Loading branch information
Johann Bahl committed Jul 13, 2023
1 parent ca66e36 commit d82d828
Show file tree
Hide file tree
Showing 11 changed files with 96 additions and 58 deletions.
2 changes: 2 additions & 0 deletions changelog.d/20230626_005856_jb_issue_30_server_migration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
- Add `tags {set, add, remove}` subcommand

- Add `expire` subcommand

- logging: add taskid
4 changes: 3 additions & 1 deletion src/backy/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ async def reconfigure(
@middleware
async def log_conn(self, request: web.Request, handler):
request["log"] = self.log.bind(
path=request.path, query=request.query_string
path=request.path,
query=request.query_string,
remote_taskid=request.headers.get("taskid"),
)
try:
resp = await handler(request)
Expand Down
35 changes: 18 additions & 17 deletions src/backy/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def touch(self):
def set_purge_pending(self):
open(p.join(self.path, ".purge_pending"), "w").close()

def remove_purge_pending(self):
def clear_purge_pending(self):
path = p.join(self.path, ".purge_pending")
if p.exists(path):
os.remove(path)
Expand Down Expand Up @@ -350,7 +350,7 @@ def verify(self, revision=None):
def purge(self):
backend = self.backend_factory(self.history[0], self.log)
backend.purge()
self.remove_purge_pending()
self.clear_purge_pending()

#################
# Restoring
Expand Down Expand Up @@ -554,17 +554,18 @@ def find(self, spec) -> Revision:

###################
# Syncing Revisions
# called by the scheduler without a subprocess

@locked(target=".backup", mode="exclusive")
async def push_metadata(self, peers):
self.log.info("push-metadata-start")
async with APIClientManager(peers, self.log) as apis:
async def push_metadata(self, peers, taskid: str, log):
log.info("push-metadata-start")
async with APIClientManager(peers, taskid, log) as apis:
peers_with_removals = set()
for r in self.history:
if not r.pending_changes:
continue

self.log.debug(
log.debug(
"push-metadata-updating-tags",
server=r.location,
rev_uuid=r.uuid,
Expand All @@ -581,37 +582,37 @@ async def push_metadata(self, peers):
isinstance(e, ClientResponseError)
and e.status == HTTPPreconditionFailed.status_code
):
self.log.warning(
log.warning(
"push-metadata-unexpected-server-state",
expected_tags=r.orig_tags,
)
elif isinstance(e, ClientConnectorError):
self.log.debug("pull-metadata-connection-error")
log.debug("pull-metadata-connection-error")
else:
self.log.exception("push-metadata-error")
log.exception("push-metadata-error")
r.remove(force=True)

for s in peers_with_removals:
self.log.debug("push-metadata-purging-remote", server=s)
log.debug("push-metadata-purging-remote", server=s)
try:
await apis[s].run_purge(self.name)
except ClientError:
self.log.warning(
log.warning(
"push-metadata-remote-purge-error",
exc_info=True,
)
continue

@locked(target=".backup", mode="exclusive")
async def pull_metadata(self, peers: dict):
self.log.info("pull-metadata-start")
async with APIClientManager(peers, self.log) as apis:
async def pull_metadata(self, peers: dict, taskid: str, log):
log.info("pull-metadata-start")
async with APIClientManager(peers, taskid, log) as apis:
await asyncio.gather(
*[self._pull_metadata(apis[server]) for server in apis]
*[self._pull_metadata(apis[server], log) for server in apis]
)

async def _pull_metadata(self, api: APIClient):
log = self.log.bind(server=api.server_name)
async def _pull_metadata(self, api: APIClient, log):
log = log.bind(server=api.server_name)
try:
await api.touch_backup(self.name)
remote_revs = await api.get_revs(self)
Expand Down
9 changes: 6 additions & 3 deletions src/backy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@ class APIClientManager:
connector: TCPConnector
peers: dict
clients: dict[str, "APIClient"]
taskid: str
log: BoundLogger

def __init__(self, peers, log):
def __init__(self, peers, taskid, log):
self.connector = TCPConnector()
self.peers = peers
self.clients = dict()
self.taskid = taskid
self.log = log.bind(subsystem="APIClientManager")

def __getitem__(self, name):
if name and name not in self.clients:
self.clients[name] = APIClient.from_conf(
name, self.peers[name], self.log, self.connector
name, self.peers[name], self.taskid, self.log, self.connector
)
return self.clients[name]

Expand Down Expand Up @@ -59,6 +61,7 @@ def __init__(
server_name: str,
url: str,
token: str,
taskid: str,
log,
connector=None,
):
Expand All @@ -67,7 +70,7 @@ def __init__(
self.server_name = server_name
self.session = aiohttp.ClientSession(
url,
headers={hdrs.AUTHORIZATION: "Bearer " + token},
headers={hdrs.AUTHORIZATION: "Bearer " + token, "taskid": taskid},
raise_for_status=True,
timeout=ClientTimeout(30, connect=10),
connector=connector,
Expand Down
2 changes: 1 addition & 1 deletion src/backy/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ def _apply_config(self):
job = self.jobs[name]
if config != job.last_config:
self.log.info("changed-job", job_name=name)
job.configure(config)
job.stop()
job.configure(config)
job.start()

for name, job in list(self.jobs.items()):
Expand Down
12 changes: 3 additions & 9 deletions src/backy/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,9 @@ def write(line):
+ " "
)

pid = event_dict.pop("pid", None)
if pid is not None:
write(DIM + str(pid) + RESET_ALL + " ")
taskid = event_dict.pop("taskid", None)
if taskid is not None:
write(DIM + str(taskid) + RESET_ALL + " ")

level = event_dict.pop("level", None)
if level is not None:
Expand Down Expand Up @@ -278,11 +278,6 @@ def write(line):
return {"console": console_io.getvalue(), "file": log_io.getvalue()}


def add_pid(logger, method_name, event_dict):
event_dict["pid"] = os.getpid()
return event_dict


def process_exc_info(logger, name, event_dict):
"""Transforms exc_info to the exception tuple format returned by
sys.exc_info(). Uses the the same logic as as structlog's format_exc_info()
Expand Down Expand Up @@ -336,7 +331,6 @@ def init_logging(
)

processors = [
add_pid,
structlog.processors.add_log_level,
process_exc_info,
format_exc_info,
Expand Down
27 changes: 19 additions & 8 deletions src/backy/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import backy.backup
import backy.daemon
from backy.utils import format_datetime_local
from backy.utils import format_datetime_local, generate_taskid

from . import logging
from .client import APIClient, CLIClient
Expand All @@ -36,10 +36,12 @@ class Command(object):
"""Proxy between CLI calls and actual backup code."""

path: str
taskid: str
log: BoundLogger

def __init__(self, path, log):
def __init__(self, path, taskid, log):
self.path = path
self.taskid = taskid
self.log = log

def status(self):
Expand Down Expand Up @@ -128,15 +130,17 @@ def verify(self, revision):
def client(self, config, peer, url, token, apifunc, **kwargs):
async def run():
if url and token:
api = APIClient("<server>", url, token, self.log)
api = APIClient("<server>", url, token, self.taskid, self.log)
else:
d = backy.daemon.BackyDaemon(config, self.log)
d._read_config()
if peer:
api = APIClient.from_conf(peer, d.peers[peer], self.log)
api = APIClient.from_conf(
peer, d.peers[peer], self.taskid, self.log
)
else:
api = APIClient.from_conf(
"<server>", d.api_cli_default, self.log
"<server>", d.api_cli_default, self.taskid, self.log
)
async with CLIClient(api, self.log) as c:
await getattr(c, apifunc)(**kwargs)
Expand Down Expand Up @@ -178,7 +182,7 @@ def setup_argparser():
default=argparse.SUPPRESS,
help=(
"file name to write log output in. "
"(default: /var/log/backy.log for `scheduler`, "
"(default: /var/log/backy.log for `scheduler`, ignored for `client`, "
"$backupdir/backy.log otherwise)"
),
)
Expand All @@ -192,6 +196,12 @@ def setup_argparser():
"(default: %(default)s)"
),
)
parser.add_argument(
"-t",
"--taskid",
default=generate_taskid(),
help=("id to include in log messages" "(default: %(default)s)"),
)

subparsers = parser.add_subparsers()

Expand Down Expand Up @@ -468,10 +478,10 @@ def main():
or (args.func == "client" and args.apifunc == "check")
else "",
)
log = structlog.stdlib.get_logger(subsystem="command")
log = structlog.stdlib.get_logger(subsystem="command", taskid=args.taskid)
log.debug("invoked", args=" ".join(sys.argv))

command = Command(args.backupdir, log)
command = Command(args.backupdir, args.taskid, log)
func = getattr(command, args.func)

# Pass over to function
Expand All @@ -480,6 +490,7 @@ def main():
del func_args["verbose"]
del func_args["backupdir"]
del func_args["logfile"]
del func_args["taskid"]

try:
log.debug("parsed", func=args.func, func_args=func_args)
Expand Down
28 changes: 23 additions & 5 deletions src/backy/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@

from .backup import Backup
from .ext_deps import BACKY_CMD
from .utils import SafeFile, format_datetime_local, time_or_event
from .utils import (
SafeFile,
format_datetime_local,
generate_taskid,
time_or_event,
)


class Job(object):
Expand All @@ -35,6 +40,7 @@ class Job(object):
run_immediately: asyncio.Event
errors: int = 0
backoff: int = 0
taskid: str = ""
log: BoundLogger

_task: Optional[asyncio.Task] = None
Expand Down Expand Up @@ -146,7 +152,10 @@ async def run_forever(self):
self.backoff = 0
self.log.debug("loop-started")
while True:
self.backup.scan()
self.taskid = generate_taskid()
self.log = self.log.bind(job_name=self.name + f"[{self.taskid}]")

self.backup = Backup(self.path, self.log)

next_time, next_tags = self.schedule.next(
backy.utils.now(), self.spread, self.backup
Expand Down Expand Up @@ -188,7 +197,6 @@ async def run_forever(self):
async with self.daemon.backup_semaphores[speed]:
self.update_status(f"running ({speed})")

self.update_config()
await self.run_backup(next_tags)
await self.pull_metadata()
await self.run_expiry()
Expand Down Expand Up @@ -216,15 +224,21 @@ async def run_forever(self):
self.update_status("finished")

async def pull_metadata(self):
await self.backup.pull_metadata(self.daemon.peers)
await self.backup.pull_metadata(
self.daemon.peers, self.taskid, self.log
)

async def push_metadata(self):
await self.backup.push_metadata(self.daemon.peers)
await self.backup.push_metadata(
self.daemon.peers, self.taskid, self.log
)

async def run_backup(self, tags):
self.log.info("backup-started", tags=", ".join(tags))
proc = await asyncio.create_subprocess_exec(
BACKY_CMD,
"-t",
self.taskid,
"-b",
self.path,
"-l",
Expand Down Expand Up @@ -260,6 +274,8 @@ async def run_expiry(self):
self.log.info("expiry-started")
proc = await asyncio.create_subprocess_exec(
BACKY_CMD,
"-t",
self.taskid,
"-b",
self.path,
"-l",
Expand Down Expand Up @@ -294,6 +310,8 @@ async def run_purge(self):
self.log.info("purge-started")
proc = await asyncio.create_subprocess_exec(
BACKY_CMD,
"-t",
self.taskid,
"-b",
self.path,
"-l",
Expand Down
Loading

0 comments on commit d82d828

Please sign in to comment.