Skip to content

Commit

Permalink
feat: massStoreRunAsynchronous()
Browse files Browse the repository at this point in the history
Even though commit d915473 introduced a
socket-level TCP keepalive support into the server's implementation,
this was observed multiple times to not be enough to
**deterministically** fix the issues with the `CodeChecker store` client
hanging indefinitely when the server takes a long time processing the
to-be-stored data.
The underlying reasons are not entirely clear and the issue only pops
up sporadically, but we did observe a few similar scenarios (such as
multi-million report storage from analysing LLVM and then storing
between datacentres) where it almost reliably reproduces.
The symptoms (even with a configure `kepalive`) generally include the
server not becoming notified about the client's disconnect, while the
client process is hung on a low-level system call `read(4, ...)`, trying
to get the Thrift response of `massStoreRun()` from the HTTP socket.
Even if the server finishes the storage processing "in time" and sent
the Thrift reply, it never reaches the client, which means it never
exits from the waiting, which means it keeps either the terminal or,
worse, a CI script occupied, blocking execution.

This is the "more proper solution" foreshadowed in
commit 15af7d8.

Implemented the server-side logic to spawn a `MassStoreRun` task and
return its token, giving the `massStoreRunAsynchronous()` API call full
force.

Implemented the client-side logic to use the new `task_client` module
and the same logic as
`CodeChecker cmd serverside-tasks --await --token TOKEN...`
to poll the server for the task's completion and status.
  • Loading branch information
whisperity committed Sep 20, 2024
1 parent d42164b commit e30fcc2
Show file tree
Hide file tree
Showing 8 changed files with 160 additions and 103 deletions.
13 changes: 13 additions & 0 deletions docs/web/user_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,19 @@ optional arguments:
is given, the longest match will be removed. You may
also use Unix shell-like wildcards (e.g.
'/*/jsmith/').
--detach Runs `store` in fire-and-forget mode: exit immediately
once the server accepted the analysis reports for
storing, without waiting for the server-side data
processing to conclude. Doing this is generally not
recommended, as the client will never be notified of
potential processing failures, and there is no easy way
to wait for the successfully stored results to become
available server-side for potential further processing
(e.g., `CodeChecker cmd diff`). However, using
'--detach' can significantly speed up large-scale
monitoring analyses where access to the results by a
tool is not a goal, such as in the case of non-gating
CI systems.
--config CONFIG_FILE Allow the configuration from an explicit configuration
file. The values configured in the config file will
overwrite the values set in the command line.
Expand Down
4 changes: 2 additions & 2 deletions web/api/codechecker_api_shared.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ enum ErrorCode {
API_MISMATCH = 5,

// REMOVED IN API v6.59 (CodeChecker v6.25.0)!
// Previously sent by report_server.thrif/codeCheckerDBAccess::massStoreRun()
// Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
// when the client uploaded a source file which contained errors, such as
// review status source-code-comment errors.
/* SOURCE_FILE = 6, */ // Never reuse the value of the enum constant!

// REMOVED IN API v6.59 (CodeChecker v6.25.0)!
// Previously sent by report_server.thrif/codeCheckerDBAccess::massStoreRun()
// Previously sent by report_server.thrift/codeCheckerDBAccess::massStoreRun()
// when the client uploaded a report with annotations that had invalid types.
/* REPORT_FORMAT = 7, */ // Never reuse the value of the enum constant!
}
Expand Down
129 changes: 68 additions & 61 deletions web/client/codechecker_client/cmd/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
from threading import Timer
from typing import Dict, Iterable, List, Set, Tuple

from codechecker_api.codeCheckerDBAccess_v6.ttypes import StoreLimitKind
from codechecker_api.codeCheckerDBAccess_v6.ttypes import \
StoreLimitKind, SubmittedRunOptions

from codechecker_report_converter import twodim
from codechecker_report_converter.report import Report, report_file, \
Expand All @@ -50,8 +51,8 @@ def assemble_blame_info(_, __) -> int:
"""
raise NotImplementedError()

from codechecker_client import client as libclient
from codechecker_client import product
from codechecker_client import client as libclient, product
from codechecker_client.task_client import await_task_termination
from codechecker_common import arg, logger, cmd_config
from codechecker_common.checker_labels import CheckerLabels
from codechecker_common.compatibility.multiprocessing import Pool
Expand Down Expand Up @@ -256,6 +257,24 @@ def add_arguments_to_parser(parser):
"match will be removed. You may also use Unix "
"shell-like wildcards (e.g. '/*/jsmith/').")

parser.add_argument("--detach",
dest="detach",
default=argparse.SUPPRESS,
action="store_true",
required=False,
help="""
Runs `store` in fire-and-forget mode: exit immediately once the server accepted
the analysis reports for storing, without waiting for the server-side data
processing to conclude.
Doing this is generally not recommended, as the client will never be notified
of potential processing failures, and there is no easy way to wait for the
successfully stored results to become available server-side for potential
further processing (e.g., `CodeChecker cmd diff`).
However, using '--detach' can significantly speed up large-scale monitoring
analyses where access to the results by a tool is not a goal, such as in the
case of non-gating CI systems.
""")

cmd_config.add_option(parser)

parser.add_argument('-f', '--force',
Expand Down Expand Up @@ -718,7 +737,7 @@ def get_analysis_statistics(inputs, limits):
return statistics_files if has_failed_zip else []


def storing_analysis_statistics(client, inputs, run_name):
def store_analysis_statistics(client, inputs, run_name):
"""
Collects and stores analysis statistics information on the server.
"""
Expand Down Expand Up @@ -933,68 +952,56 @@ def main(args):

description = args.description if 'description' in args else None

LOG.info("Storing results to the server...")

try:
with _timeout_watchdog(timedelta(hours=1),
signal.SIGUSR1):
client.massStoreRun(args.name,
args.tag if 'tag' in args else None,
str(context.version),
b64zip,
'force' in args,
trim_path_prefixes,
description)
except WatchdogError as we:
LOG.warning("%s", str(we))

# Showing parts of the exception stack is important here.
# We **WANT** to see that the timeout happened during a wait on
# Thrift reading from the TCP connection (something deep in the
# Python library code at "sock.recv_into").
import traceback
_, _, tb = sys.exc_info()
frames = traceback.extract_tb(tb)
first, last = frames[0], frames[-2]
formatted_frames = traceback.format_list([first, last])
fmt_first, fmt_last = formatted_frames[0], formatted_frames[1]
LOG.info("Timeout was triggered during:\n%s", fmt_first)
LOG.info("Timeout interrupted this low-level operation:\n%s",
fmt_last)

LOG.error("Timeout!"
"\n\tThe server's reply did not arrive after "
"%d seconds (%s) elapsed since the server-side "
"processing began."
"\n\n\tThis does *NOT* mean that there was an issue "
"with the run you were storing!"
"\n\tThe server might still be processing the results..."
"\n\tHowever, it is more likely that the "
"server had already finished, but the client did not "
"receive a response."
"\n\tUsually, this is caused by the underlying TCP "
"connection failing to signal a low-level disconnect."
"\n\tClients potentially hanging indefinitely in these "
"scenarios is an unfortunate and known issue."
"\n\t\tSee http://github.com/Ericsson/codechecker/"
"issues/3672 for details!"
"\n\n\tThis error here is a temporary measure to ensure "
"an infinite hang is replaced with a well-explained "
"timeout."
"\n\tA more proper solution will be implemented in a "
"subsequent version of CodeChecker.",
we.timeout.total_seconds(), str(we.timeout))
sys.exit(1)
LOG.info("Storing results to the server ...")
task_token: str = client.massStoreRunAsynchronous(
b64zip,
SubmittedRunOptions(
runName=args.name,
tag=args.tag if "tag" in args else None,
version=str(context.version),
force="force" in args,
trimPathPrefixes=trim_path_prefixes,
description=description)
)
LOG.info("Reports submitted to the server for processing.")

# Storing analysis statistics if the server allows them.
if client.allowsStoringAnalysisStatistics():
storing_analysis_statistics(client, args.input, args.name)

LOG.info("Storage finished successfully.")
store_analysis_statistics(client, args.input, args.name)

if "detach" in args:
LOG.warning("Exiting the 'store' subcommand as '--detach' was "
"specified: not waiting for the result of the store "
"operation.\n"
"The server might not have finished processing "
"everything at this point, so do NOT rely on querying "
"the results just yet!\n"
"To await the completion of the processing later, "
"you can execute:\n\n"
"\tCodeChecker cmd serverside-tasks --token %s "
"--await",
task_token)
# Print the token to stdout as well, so scripts can use "--detach"
# meaningfully.
print(task_token)
return

task_client = libclient.setup_task_client(protocol, host, port)
task_status: str = await_task_termination(LOG, task_token,
task_api_client=task_client)

if task_status == "COMPLETED":
LOG.info("Storing the reports finished successfully.")
else:
LOG.error("Storing the reports failed! "
"The job terminated in status '%s'. "
"The comments associated with the failure are:\n\n%s",
task_status,
task_client.getTaskInfo(task_token).comments)
sys.exit(1)
except Exception as ex:
import traceback
traceback.print_exc()
LOG.info("Storage failed: %s", str(ex))
LOG.error("Storing the reports failed: %s", str(ex))
sys.exit(1)
finally:
os.close(zip_file_handle)
Expand Down
10 changes: 6 additions & 4 deletions web/client/codechecker_client/helpers/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,12 @@ def massStoreRun(self, name, tag, version, zipdir, force,
pass

@thrift_client_call
def massStoreRunAsynchronous(self, zipfile_blob: str,
store_opts: ttypes.SubmittedRunOptions) \
-> str:
pass
def massStoreRunAsynchronous(
self,
zipfile_blob: str,
store_opts: ttypes.SubmittedRunOptions
) -> str:
raise NotImplementedError("Should have called Thrift code!")

@thrift_client_call
def allowsStoringAnalysisStatistics(self):
Expand Down
5 changes: 3 additions & 2 deletions web/client/codechecker_client/task_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ def _transform_product_ids_to_endpoints(
product.id: product.endpoint
for product
in get_product_api().getProducts(None, None)}
ti["productEndpoint"] = product_id_to_endpoint[ti["productId"]]
del ti["productId"]


Expand Down Expand Up @@ -445,7 +446,7 @@ def get_product_api() -> ThriftProductHelper:
ti["taskKind"],
ti["summary"],
ti["status"],
ti["productEndpoint"] or "",
ti.get("productEndpoint", ""),
ti["actorUsername"] or "",
ti["enqueuedAt"] or "",
ti["startedAt"] or "",
Expand Down Expand Up @@ -529,7 +530,7 @@ def get_product_api() -> ThriftProductHelper:
ti = task_info_for_print[0]
product_line = \
f" - Product: {ti['productEndpoint']}\n" \
if ti["productEndpoint"] else ""
if "productEndpoint" in ti else ""
user_line = f" - User: {ti['actorUsername']}\n" \
if ti["actorUsername"] else ""
cancel_line = " - Cancelled by administrators!\n" \
Expand Down
32 changes: 26 additions & 6 deletions web/server/codechecker_server/api/mass_store_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import sqlalchemy
import tempfile
import time
from typing import Any, Dict, List, Optional, Set, Tuple, Union, cast
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union, \
cast
import zipfile
import zlib

Expand Down Expand Up @@ -54,7 +55,7 @@
from ..metadata import checker_is_unavailable, MetadataInfoParser
from ..product import Product as ServerProduct
from ..session_manager import SessionManager
from ..task_executors.abstract_task import AbstractTask
from ..task_executors.abstract_task import AbstractTask, TaskCancelHonoured
from ..task_executors.task_manager import TaskManager
from .thrift_enum_helper import report_extended_data_type_str

Expand Down Expand Up @@ -591,7 +592,13 @@ def _implementation(self, tm: TaskManager):
f"'{self._product.endpoint}' is in "
"a bad shape!")

m = MassStoreRun(self.data_path / "store_zip",
def __cancel_if_needed():
tm.heartbeat(self)
if tm.should_cancel(self):
raise TaskCancelHonoured(self)

m = MassStoreRun(__cancel_if_needed,
self.data_path / "store_zip",
self._package_context,
tm.configuration_database_session_factory,
self._product,
Expand All @@ -615,9 +622,8 @@ class MassStoreRun:
# This is the place where complex implementation logic must go, but be
# careful, there is no way to communicate with the user's client anymore!

# TODO: Poll the task manager at regular points for a cancel signal!

def __init__(self,
graceful_cancel: Callable[[], None],
zip_dir: Path,
package_context,
config_db,
Expand All @@ -641,6 +647,7 @@ def __init__(self,
self.__config_db = config_db
self.__package_context = package_context
self.__product = product
self.__graceful_cancel_if_requested = graceful_cancel

self.__mips: Dict[str, MetadataInfoParser] = {}
self.__analysis_info: Dict[str, AnalysisInfo] = {}
Expand Down Expand Up @@ -668,10 +675,10 @@ def __store_source_files(
filename_to_hash: Dict[str, str]
) -> Dict[str, int]:
""" Storing file contents from plist. """

file_path_to_id = {}

for file_name, file_hash in filename_to_hash.items():
self.__graceful_cancel_if_requested()
source_file_path = path_for_fake_root(file_name, str(source_root))
LOG.debug("Storing source file: %s", source_file_path)
trimmed_file_path = trim_path_prefixes(
Expand Down Expand Up @@ -721,6 +728,7 @@ def __add_blame_info(
with DBSession(self.__product.session_factory) as session:
for subdir, _, files in os.walk(blame_root):
for f in files:
self.__graceful_cancel_if_requested()
blame_file = Path(subdir) / f
file_path = f"/{str(blame_file.relative_to(blame_root))}"
blame_info, remote_url, tracking_branch = \
Expand Down Expand Up @@ -1502,6 +1510,7 @@ def get_skip_handler(
LOG.debug("Parsing input file '%s'", f)

report_file_path = os.path.join(root_dir_path, f)
self.__graceful_cancel_if_requested()
self.__process_report_file(
report_file_path, session, run_id,
file_path_to_id, run_history_time,
Expand Down Expand Up @@ -1604,6 +1613,7 @@ def store(self,
original_zip_size: int,
time_spent_on_task_preparation: float):
"""Store run results to the server."""
self.__graceful_cancel_if_requested()
start_time = time.time()

try:
Expand All @@ -1627,12 +1637,14 @@ def store(self,

with StepLog(self._name, "Parse 'metadata.json's"):
for root_dir_path, _, _ in os.walk(report_dir):
self.__graceful_cancel_if_requested()
metadata_file_path = os.path.join(
root_dir_path, 'metadata.json')

self.__mips[root_dir_path] = \
MetadataInfoParser(metadata_file_path)

self.__graceful_cancel_if_requested()
with StepLog(self._name,
"Store look-up ID for checkers in 'metadata.json'"):
checkers_in_metadata = {
Expand All @@ -1656,10 +1668,16 @@ def store(self,
session, report_dir, source_root, run_id,
file_path_to_id, run_history_time)

self.__graceful_cancel_if_requested()
session.commit()
self.__load_report_ids_for_reports_with_fake_checkers(
session)

# The task should not be cancelled after this point, as the
# "main" bulk of the modifications to the database had already
# been committed, and the user would be left with potentially
# a bunch of "fake checkers" visible in the database.

if self.__reports_with_fake_checkers:
with StepLog(
self._name,
Expand Down Expand Up @@ -1720,6 +1738,8 @@ def store(self,
LOG.error("Database error! Storing reports to the "
"database failed: %s", ex)
raise
except TaskCancelHonoured:
raise
except Exception as ex:
LOG.error("Failed to store results: %s", ex)
import traceback
Expand Down
Loading

0 comments on commit e30fcc2

Please sign in to comment.