From 035afe13224d4a82fd94194908963cdccf357015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Kr=C3=B6ger?= Date: Tue, 25 Jul 2023 16:48:18 +0200 Subject: [PATCH] Improve graphite cache generation (#323) * Allow to specify collections to update by name * Allow to specify Serveradmin query to limit objects * Handle exception when server has been deleted * Run sprite/numeric generation in threads --- .../management/commands/cache_graphite.py | 171 ++++++++++++------ 1 file changed, 118 insertions(+), 53 deletions(-) diff --git a/serveradmin/graphite/management/commands/cache_graphite.py b/serveradmin/graphite/management/commands/cache_graphite.py index ef070405..9dc65659 100644 --- a/serveradmin/graphite/management/commands/cache_graphite.py +++ b/serveradmin/graphite/management/commands/cache_graphite.py @@ -1,11 +1,11 @@ """Serveradmin - Graphite Integration -Copyright (c) 2022 InnoGames GmbH +Copyright (c) 2023 InnoGames GmbH """ import json import time -from datetime import datetime +from concurrent.futures import ThreadPoolExecutor from decimal import Decimal from io import BytesIO from os import mkdir @@ -14,71 +14,118 @@ from urllib.request import ( HTTPBasicAuthHandler, HTTPPasswordMgrWithDefaultRealm, - build_opener + build_opener, ) -from PIL import Image from django.conf import settings -from django.core.management.base import BaseCommand +from django.core.management.base import BaseCommand, CommandParser from django.db import transaction +from django.utils.timezone import now +from PIL import Image from adminapi import filters +from adminapi.parse import parse_query from serveradmin.dataset import Query from serveradmin.graphite.models import ( GRAPHITE_ATTRIBUTE_ID, - Collection, AttributeFormatter, + Collection, ) from serveradmin.serverdb.models import Server class Command(BaseCommand): - """Generate sprites from the overview graphics""" + """Generate sprites and update numeric values for collections.""" + help = __doc__ - def handle(self, *args, **kwargs): + def add_arguments(self, parser: CommandParser) -> None: + parser.add_argument( + "--collections", + nargs="*", + type=str, + help="Generate/update only these collections.", + ) + parser.add_argument( + "--query", + type=str, + help="Generate/update only objects matching this Serveradmin query.", + ) + parser.add_argument( + "--threads", + type=int, + default=5, + help="Generate n sprites/numerics concurrently.", + ) + + def handle(self, *args, **options): """The entry point of the command""" + if options["threads"] < 1: + self.stderr.write(self.style.ERROR(f"--threads must be greater 0!")) + exit(1) + start = time.time() sprite_params = settings.GRAPHITE_SPRITE_PARAMS - sprite_dir = settings.MEDIA_ROOT + '/graph_sprite' + sprite_dir = settings.MEDIA_ROOT + "/graph_sprite" if not isdir(sprite_dir): mkdir(sprite_dir) - # We will make sure to generate a single sprite for a single hostname. - for collection in Collection.objects.filter(overview=True): - collection_start = time.time() + collections = Collection.objects.filter(overview=True) + if options["collections"]: + collections = collections.filter(name__in=options["collections"]) + + for collection in collections: + self.stdout.write(f"[{now()}] Starting collection {collection}") - collection_dir = sprite_dir + '/' + collection.name + collection_dir = sprite_dir + "/" + collection.name if not isdir(collection_dir): mkdir(collection_dir) - for server in Query( - { - GRAPHITE_ATTRIBUTE_ID: collection.name, - 'state': filters.Not('retired'), - }): - graph_table = collection.graph_table(server, sprite_params) - if graph_table: - self.generate_sprite(collection_dir, graph_table, server) - self.cache_numerics(collection, server) - - collection_duration = time.time() - collection_start - print('[{}] Collection {} finished after {} seconds'.format( - datetime.now(), collection.name, collection_duration)) - - duration = time.time() - start - print('[{}] Finished after {} seconds'.format(datetime.now(), - duration)) - - def generate_sprite(self, collection_dir, graph_table, server): + query_filter = { + GRAPHITE_ATTRIBUTE_ID: collection.name, + "state": filters.Not("retired"), + } + + if options["query"]: + query_filter.update(**parse_query(options["query"])) + + futures = [] + with ThreadPoolExecutor(options["threads"]) as executor: + for server in Query(query_filter, ["hostname"]): + futures.append( + executor.submit( + self.generate_sprite, + collection_dir, + server, + collection, + sprite_params, + ) + ) + futures.append( + executor.submit(self.cache_numerics, collection, server) + ) + + self.stdout.write(f"[{now()}] Finished collection {collection}") + + end = time.time() + self.stdout.write( + self.style.SUCCESS(f"[{now()}] Total time: {end - start:.2f} seconds.") + ) + + def generate_sprite(self, collection_dir, server, collection, sprite_params): """Generate sprites for the given server using the given collection""" + + graph_table = collection.graph_table(server, sprite_params) + if not graph_table: + return + graphs = [v2 for k1, v1 in graph_table for k2, v2 in v1] sprite_width = settings.GRAPHITE_SPRITE_WIDTH sprite_height = settings.GRAPHITE_SPRITE_HEIGHT total_width = len(graphs) * sprite_width - sprite_img = Image.new('RGB', (total_width, sprite_height), (255,) * 3) + sprite_img = Image.new("RGB", (total_width, sprite_height), (255,) * 3) for graph, offset in zip(graphs, range(0, total_width, sprite_width)): response = self.get_from_graphite(graph) @@ -86,7 +133,9 @@ def generate_sprite(self, collection_dir, graph_table, server): box = (offset, 0, offset + sprite_width, sprite_height) sprite_img.paste(Image.open(BytesIO(response)), box) - sprite_img.save(collection_dir + '/' + server['hostname'] + '.png') + sprite_img.save(collection_dir + "/" + server["hostname"] + ".png") + + self.stdout.write(f"[{now()}] Generated sprite for {server['hostname']}") def cache_numerics(self, collection, server): """Generate sprites for the given server using the given collection""" @@ -97,19 +146,23 @@ def cache_numerics(self, collection, server): if not response: continue - response_json = json.loads(response.decode('utf8')) + response_json = json.loads(response.decode("utf8")) try: - value = response_json[0]['datapoints'][0][0] + value = response_json[0]["datapoints"][0][0] except IndexError: - print( - ( - "Warning: Graphite response '{}' for collection {}/{}" - " on server {} couldn't be parsed" - ).format(response, collection, numeric, server['hostname']) + self.stdout.write( + self.style.NOTICE( + f"[{now()}] {server['hostname']}: Can't parse response {response} for {params}." + ) ) continue if value is None: + self.stdout.write( + self.style.NOTICE( + f"[{now()}] {server['hostname']}: None value for {params} received." + ) + ) continue # Django can be set up to implicitly execute commands in database @@ -117,16 +170,28 @@ def cache_numerics(self, collection, server): # it is set up like this. This process takes a long time. # We want the values to be immediately available to the users. with transaction.atomic(): - # Lock server for changes to avoid nonrepeatable reads in the - # query_committer. - locked_server = Server.objects.select_for_update().get( - server_id=server.object_id) + try: + # Lock server for changes to avoid non-repeatable reads in the + # query_committer. + locked_server = Server.objects.select_for_update().get( + server_id=server.object_id + ) + except Server.DoesNotExist: + self.stdout.write( + self.style.NOTICE( + f"[{now()}] {server['hostname']} has been deleted." + ) + ) + continue + locked_server.servernumberattribute_set.update_or_create( server_id=locked_server.server_id, attribute=numeric.attribute, - defaults={'value': Decimal(value)}, + defaults={"value": Decimal(value)}, ) + self.stdout.write(f"[{now()}] Updated numerics for {server['hostname']}") + def get_from_graphite(self, params): """Make a GET request to Graphite with the given params""" password_mgr = HTTPPasswordMgrWithDefaultRealm() @@ -137,21 +202,21 @@ def get_from_graphite(self, params): settings.GRAPHITE_PASSWORD, ) auth_handler = HTTPBasicAuthHandler(password_mgr) - url = '{0}/render?{1}'.format( - settings.GRAPHITE_URL, params - ) + url = "{0}/render?{1}".format(settings.GRAPHITE_URL, params) start = time.time() try: with build_opener(auth_handler).open(url) as response: return response.read() except HTTPError as error: - print('Warning: Graphite returned ' + str(error) + ' to ' + url) + self.stdout.write( + self.style.NOTICE(f"[{now()}] Graphite returned {error} for {url}") + ) finally: end = time.time() if end - start > 10: - print( - 'Warning: Graphite request to {0} took {1} seconds'.format( - url, end - start + self.stdout.write( + self.style.WARNING( + f"[{now()}] Graphite request {url} took {end - start} seconds" ) )