Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve graphite cache generation #323

Merged
merged 9 commits into from
Jul 25, 2023
171 changes: 118 additions & 53 deletions serveradmin/graphite/management/commands/cache_graphite.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""Serveradmin - Graphite Integration

Copyright (c) 2022 InnoGames GmbH
Copyright (c) 2023 InnoGames GmbH
"""

import json
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from decimal import Decimal
from io import BytesIO
from os import mkdir
Expand All @@ -14,79 +14,128 @@
from urllib.request import (
HTTPBasicAuthHandler,
HTTPPasswordMgrWithDefaultRealm,
build_opener
build_opener,
)

from PIL import Image
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandParser
from django.db import transaction
from django.utils.timezone import now
from PIL import Image

from adminapi import filters
from adminapi.parse import parse_query
from serveradmin.dataset import Query
from serveradmin.graphite.models import (
GRAPHITE_ATTRIBUTE_ID,
Collection,
AttributeFormatter,
Collection,
)
from serveradmin.serverdb.models import Server


class Command(BaseCommand):
"""Generate sprites from the overview graphics"""
"""Generate sprites and update numeric values for collections."""

help = __doc__

def handle(self, *args, **kwargs):
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--collections",
nargs="*",
type=str,
help="Generate/update only these collections.",
)
parser.add_argument(
"--query",
type=str,
help="Generate/update only objects matching this Serveradmin query.",
)
parser.add_argument(
"--threads",
type=int,
default=5,
help="Generate n sprites/numerics concurrently.",
)

def handle(self, *args, **options):
"""The entry point of the command"""

if options["threads"] < 1:
self.stderr.write(self.style.ERROR(f"--threads must be greater 0!"))
exit(1)

start = time.time()

sprite_params = settings.GRAPHITE_SPRITE_PARAMS
sprite_dir = settings.MEDIA_ROOT + '/graph_sprite'
sprite_dir = settings.MEDIA_ROOT + "/graph_sprite"
if not isdir(sprite_dir):
mkdir(sprite_dir)

# We will make sure to generate a single sprite for a single hostname.
for collection in Collection.objects.filter(overview=True):
collection_start = time.time()
collections = Collection.objects.filter(overview=True)
if options["collections"]:
collections = collections.filter(name__in=options["collections"])

for collection in collections:
self.stdout.write(f"[{now()}] Starting collection {collection}")

collection_dir = sprite_dir + '/' + collection.name
collection_dir = sprite_dir + "/" + collection.name
if not isdir(collection_dir):
mkdir(collection_dir)

for server in Query(
{
GRAPHITE_ATTRIBUTE_ID: collection.name,
'state': filters.Not('retired'),
}):
graph_table = collection.graph_table(server, sprite_params)
if graph_table:
self.generate_sprite(collection_dir, graph_table, server)
self.cache_numerics(collection, server)

collection_duration = time.time() - collection_start
print('[{}] Collection {} finished after {} seconds'.format(
datetime.now(), collection.name, collection_duration))

duration = time.time() - start
print('[{}] Finished after {} seconds'.format(datetime.now(),
duration))

def generate_sprite(self, collection_dir, graph_table, server):
query_filter = {
GRAPHITE_ATTRIBUTE_ID: collection.name,
"state": filters.Not("retired"),
}

if options["query"]:
query_filter.update(**parse_query(options["query"]))

futures = []
with ThreadPoolExecutor(options["threads"]) as executor:
for server in Query(query_filter, ["hostname"]):
futures.append(
executor.submit(
self.generate_sprite,
collection_dir,
server,
collection,
sprite_params,
)
)
futures.append(
executor.submit(self.cache_numerics, collection, server)
)

self.stdout.write(f"[{now()}] Finished collection {collection}")

end = time.time()
self.stdout.write(
self.style.SUCCESS(f"[{now()}] Total time: {end - start:.2f} seconds.")
)

def generate_sprite(self, collection_dir, server, collection, sprite_params):
"""Generate sprites for the given server using the given collection"""

graph_table = collection.graph_table(server, sprite_params)
if not graph_table:
return

graphs = [v2 for k1, v1 in graph_table for k2, v2 in v1]
sprite_width = settings.GRAPHITE_SPRITE_WIDTH
sprite_height = settings.GRAPHITE_SPRITE_HEIGHT
total_width = len(graphs) * sprite_width
sprite_img = Image.new('RGB', (total_width, sprite_height), (255,) * 3)
sprite_img = Image.new("RGB", (total_width, sprite_height), (255,) * 3)

for graph, offset in zip(graphs, range(0, total_width, sprite_width)):
response = self.get_from_graphite(graph)
if response:
box = (offset, 0, offset + sprite_width, sprite_height)
sprite_img.paste(Image.open(BytesIO(response)), box)

sprite_img.save(collection_dir + '/' + server['hostname'] + '.png')
sprite_img.save(collection_dir + "/" + server["hostname"] + ".png")

self.stdout.write(f"[{now()}] Generated sprite for {server['hostname']}")

def cache_numerics(self, collection, server):
"""Generate sprites for the given server using the given collection"""
Expand All @@ -97,36 +146,52 @@ def cache_numerics(self, collection, server):
if not response:
continue

response_json = json.loads(response.decode('utf8'))
response_json = json.loads(response.decode("utf8"))
try:
value = response_json[0]['datapoints'][0][0]
value = response_json[0]["datapoints"][0][0]
except IndexError:
print(
(
"Warning: Graphite response '{}' for collection {}/{}"
" on server {} couldn't be parsed"
).format(response, collection, numeric, server['hostname'])
self.stdout.write(
self.style.NOTICE(
f"[{now()}] {server['hostname']}: Can't parse response {response} for {params}."
)
)
continue

if value is None:
self.stdout.write(
self.style.NOTICE(
f"[{now()}] {server['hostname']}: None value for {params} received."
)
)
continue

# Django can be set up to implicitly execute commands in database
# transactions. We don't want that behavior in here even when
# it is set up like this. This process takes a long time.
# We want the values to be immediately available to the users.
with transaction.atomic():
# Lock server for changes to avoid nonrepeatable reads in the
# query_committer.
locked_server = Server.objects.select_for_update().get(
server_id=server.object_id)
try:
# Lock server for changes to avoid non-repeatable reads in the
# query_committer.
locked_server = Server.objects.select_for_update().get(
server_id=server.object_id
)
except Server.DoesNotExist:
self.stdout.write(
self.style.NOTICE(
f"[{now()}] {server['hostname']} has been deleted."
)
)
continue

locked_server.servernumberattribute_set.update_or_create(
server_id=locked_server.server_id,
attribute=numeric.attribute,
defaults={'value': Decimal(value)},
defaults={"value": Decimal(value)},
)

self.stdout.write(f"[{now()}] Updated numerics for {server['hostname']}")

def get_from_graphite(self, params):
"""Make a GET request to Graphite with the given params"""
password_mgr = HTTPPasswordMgrWithDefaultRealm()
Expand All @@ -137,21 +202,21 @@ def get_from_graphite(self, params):
settings.GRAPHITE_PASSWORD,
)
auth_handler = HTTPBasicAuthHandler(password_mgr)
url = '{0}/render?{1}'.format(
settings.GRAPHITE_URL, params
)
url = "{0}/render?{1}".format(settings.GRAPHITE_URL, params)
start = time.time()

try:
with build_opener(auth_handler).open(url) as response:
return response.read()
except HTTPError as error:
print('Warning: Graphite returned ' + str(error) + ' to ' + url)
self.stdout.write(
self.style.NOTICE(f"[{now()}] Graphite returned {error} for {url}")
)
finally:
end = time.time()
if end - start > 10:
print(
'Warning: Graphite request to {0} took {1} seconds'.format(
url, end - start
self.stdout.write(
self.style.WARNING(
f"[{now()}] Graphite request {url} took {end - start} seconds"
)
)
Loading