Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task runtime tracking #381

Merged
merged 1 commit into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion redditrepostsleuth/adminsvc/inbox_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def __init__(
self.failed_checks = []

def check_inbox(self):
print('[Scheduled Job] Checking Inbox Start')
for msg in self.reddit.inbox.messages(limit=500):
if msg.author != 'RepostSleuthBot' and msg.subject.lower() in ['false negative', 'false positive']:
#self._process_user_report(msg)
Expand Down
2 changes: 2 additions & 0 deletions redditrepostsleuth/core/celery/basetasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

from celery import Task

from redditrepostsleuth.core.config import Config
Expand Down
2 changes: 1 addition & 1 deletion redditrepostsleuth/core/celery/celeryconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
'redditrepostsleuth.core.celery.admin_tasks.update_proxies_job': {'queue': 'scheduled_tasks'},
'redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans': {'queue': 'onlyfans_check'},
'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'},
'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
#'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'},


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ def update_monitored_sub_data(

monitored_sub.is_private = True if subreddit.subreddit_type == 'private' else False
monitored_sub.nsfw = True if subreddit.over18 else False
log.info('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)
log.debug('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)

perms = get_bot_permissions(subreddit) if monitored_sub.is_mod else []
monitored_sub.post_permission = True if 'all' in perms or 'posts' in perms else None
monitored_sub.wiki_permission = True if 'all' in perms or 'wiki' in perms else None
log.info('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
log.debug('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
monitored_sub.wiki_permission)


Expand Down
106 changes: 82 additions & 24 deletions redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from praw.exceptions import PRAWException
import datetime
from functools import wraps
from time import perf_counter

from prawcore import TooManyRequests, Redirect, ServerError, NotFound

from redditrepostsleuth.adminsvc.bot_comment_monitor import BotCommentMonitor
Expand All @@ -8,24 +11,70 @@
from redditrepostsleuth.adminsvc.new_activation_monitor import NewActivationMonitor
from redditrepostsleuth.core.celery import celery
from redditrepostsleuth.core.celery.basetasks import RedditTask, SqlAlchemyTask, AdminTask
from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, update_top_reposts, \
token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
from redditrepostsleuth.core.db.databasemodels import MonitoredSub, StatsDailyCount
from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, token_checker, \
run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
from redditrepostsleuth.core.db.databasemodels import StatsDailyCount
from redditrepostsleuth.core.logging import configure_logger
from redditrepostsleuth.core.util.helpers import chunk_list
from redditrepostsleuth.core.util.reddithelpers import is_sub_mod_praw, get_bot_permissions
from redditrepostsleuth.core.util.replytemplates import MONITORED_SUB_MOD_REMOVED_CONTENT, \
MONITORED_SUB_MOD_REMOVED_SUBJECT

log = configure_logger(
name='redditrepostsleuth',
)

def get_task_influx_points(task_name: str, task_status: str, task_runtime: float):
    """Build the raw point list describing one scheduled-task status event.

    :param task_name: Stored as the ``task_name`` tag.
    :param task_status: Stored as the ``task_status`` tag.
    :param task_runtime: Stored as the ``run_time`` field.
    :return: A one-element list holding the point dict, stamped with the
        current timezone-aware UTC time.
    """
    # datetime.timezone.utc is the same object that datetime.UTC aliases.
    recorded_at = datetime.datetime.now(datetime.timezone.utc)
    point = {
        'measurement': 'Scheduled_Task_Updates',
        'time': recorded_at,
        'fields': {'run_time': task_runtime},
        'tags': {'task_name': task_name, 'task_status': task_status},
    }
    return [point]

def record_task_status(func):
    """Decorator that records start/finish/failure events for a bound task.

    The wrapped callable must take the task instance as its first argument.
    When that instance exposes ``event_logger``, a ``started`` point is written
    before the call and a ``finished`` or ``failed`` point (with the elapsed
    ``perf_counter`` time) afterwards, via ``event_logger.write_raw_points``.

    :param func: The task body to wrap; its ``__name__`` is used as the task name.
    :return: The wrapping function. It returns whatever ``func`` returns, or
        ``None`` when ``func`` raised.
    """
    @wraps(func)
    def dec(self, *args, **kwargs):
        if not hasattr(self, 'event_logger'):
            # Instances do not normally expose __name__; read it from the class
            # so this warning cannot itself raise and stop the task running.
            log.warning(
                'Task class %s does not have an event logger, cannot record task time',
                type(self).__name__,
            )
            return func(self, *args, **kwargs)

        task_name = func.__name__
        # Started before the try so the elapsed time is always computable.
        task_start = perf_counter()
        result = None
        try:
            self.event_logger.write_raw_points(
                get_task_influx_points(task_name, 'started', 0.0)
            )
            result = func(self, *args, **kwargs)
            task_status = 'finished'
        except Exception:
            # Failures are logged and recorded, not re-raised (unchanged).
            # NOTE(review): because the exception never leaves this wrapper,
            # anything outside it that reacts to exceptions will not see it.
            log.exception('Scheduled task %s failed', task_name)
            task_status = 'failed'

        self.event_logger.write_raw_points(
            get_task_influx_points(task_name, task_status, perf_counter() - task_start)
        )
        # Hand the task's result back to the caller instead of dropping it.
        return result
    return dec



@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_inbox_task(self) -> None:
print("Checking inbox")
log.info('Scheduled Task: Check Inbox')

inbox_monitor = InboxMonitor(self.uowm, self.reddit, self.response_handler)
try:
inbox_monitor.check_inbox()
Expand All @@ -35,7 +84,9 @@ def check_inbox_task(self) -> None:
log.exception('Failed to update subreddit stats')



@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_new_activations_task(self) -> None:
log.debug('Scheduled Task: Checking For Activations')
activation_monitor = NewActivationMonitor(
Expand All @@ -53,7 +104,9 @@ def check_new_activations_task(self) -> None:


@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_comments_for_downvotes_task(self) -> None:
# TODO: Remove, no longer used
log.info('Scheduled Task: Check Comment Downvotes')
comment_monitor = BotCommentMonitor(self.reddit, self.uowm, self.config, notification_svc=self.notification_svc)
try:
Expand All @@ -62,6 +115,7 @@ def check_comments_for_downvotes_task(self) -> None:
log.exception('Failed to update subreddit stats')

@celery.task(bind=True, base=RedditTask)
@record_task_status
def update_ban_list_task(self) -> None:
"""
Go through banned subs and see if we're still banned
Expand All @@ -74,6 +128,7 @@ def update_ban_list_task(self) -> None:


@celery.task(bind=True, base=RedditTask)
@record_task_status
def update_monitored_sub_data_task(self) -> None:
log.debug('Starting Job: Update Subreddit Data')
try:
Expand All @@ -86,25 +141,16 @@ def update_monitored_sub_data_task(self) -> None:


@celery.task(bind=True, base=RedditTask)
@record_task_status
def remove_expired_bans_task(self) -> None:
    """Scheduled job: run ``remove_expired_bans`` for this task.

    Passes ``self.uowm`` and ``self.notification_svc`` to
    ``remove_expired_bans``. Any exception is logged and swallowed, so the
    function always returns ``None``.
    """
    log.info('Starting Job: Remove Expired Bans')
    try:
        remove_expired_bans(self.uowm, self.notification_svc)
    except Exception:
        # The failure message now names this job; it previously said
        # "Update Mod Status", copied from a different task.
        log.exception('Scheduled Task Failed: Remove Expired Bans')


@celery.task(bind=True, base=RedditTask)
def update_top_image_reposts_task(self) -> None:
# TODO: Remove
log.info('Starting Job: Remove Expired Bans')
try:
update_stat_top_image_repost(self.uowm, self.reddit)
except Exception as e:
log.exception('Problem in scheduled task')


@celery.task(bind=True, base=RedditTask)
@record_task_status
def send_reports_to_meme_voting_task(self):
log.info('Starting Job: Reports to meme voting')
try:
Expand All @@ -114,14 +160,17 @@ def send_reports_to_meme_voting_task(self):


@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_meme_template_potential_votes_task(self):
    """Scheduled job: run ``check_meme_template_potential_votes`` with ``self.uowm``.

    Any exception raised by the call is logged and swallowed.
    """
    log.info('Starting Job: Meme Template Vote')
    try:
        check_meme_template_potential_votes(self.uowm)
    except Exception:
        log.exception('Problem in scheduled task')


@celery.task(bind=True, base=AdminTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3})
@record_task_status
def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None:
with self.uowm.start() as uow:

Expand All @@ -142,7 +191,12 @@ def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None:
log.exception('')

@celery.task(bind=True, base=RedditTask)
@record_task_status
def queue_config_updates_task(self):
"""
Goes through each registered subreddit and queues a job to check their Wiki config for updates
:param self:
"""
log.info('Starting Job: Config Update Check')
try:
print('[Scheduled Job] Queue config update check')
Expand All @@ -158,6 +212,7 @@ def queue_config_updates_task(self):


@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_daily_stats(self):
log.info('[Daily Stat Update] Started')
daily_stats = StatsDailyCount()
Expand All @@ -180,6 +235,7 @@ def update_daily_stats(self):


@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_all_top_reposts_task(self):
try:
with self.uowm.start() as uow:
Expand All @@ -188,6 +244,7 @@ def update_all_top_reposts_task(self):
log.exception('Unknown task error')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_all_top_reposters_task(self):
try:
with self.uowm.start() as uow:
Expand All @@ -196,6 +253,7 @@ def update_all_top_reposters_task(self):
log.exception('Unknown task error')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_daily_top_reposters_task(self):
post_types = [1, 2, 3]
try:
Expand All @@ -207,6 +265,7 @@ def update_daily_top_reposters_task(self):


@celery.task(bind=True, base=RedditTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3})
@record_task_status
def update_monitored_sub_stats_task(self, sub_name: str) -> None:
try:
with self.uowm.start() as uow:
Expand All @@ -225,6 +284,7 @@ def update_monitored_sub_stats_task(self, sub_name: str) -> None:
log.exception('')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_proxies_task(self) -> None:
log.info('Starting proxy update')
try:
Expand All @@ -233,12 +293,9 @@ def update_proxies_task(self) -> None:
except Exception as e:
log.exception('Failed to update proxies')

@celery.task
def update_profile_token_task():
print('Staring token checker')
token_checker()

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def delete_search_batch(self, ids: list[int]):
try:
with self.uowm.start() as uow:
Expand All @@ -254,6 +311,7 @@ def delete_search_batch(self, ids: list[int]):
log.exception('')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def queue_search_history_cleanup(self):
with self.uowm.start() as uow:
searches = uow.repost_search.get_all_ids_older_than_days(120, limit=100000000)
Expand All @@ -263,4 +321,4 @@ def queue_search_history_cleanup(self):
log.info('Queuing Search History Cleanup. Range: ID Range: %s:%s', searches[0].id, searches[-1].id)
ids = [x[0] for x in searches]
for chunk in chunk_list(ids, 5000):
delete_search_batch.apply_async((chunk,), queue='batch_delete_searches')
delete_search_batch.apply_async((chunk,))
10 changes: 9 additions & 1 deletion redditrepostsleuth/core/services/eventlogging.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,12 @@ def _write_to_influx(self, event: InfluxEvent) -> bool:
self._unsaved_events.append(event)
log.error('Failed To Write To InfluxDB', exc_info=True)
log.error(event.get_influx_event())
return False
return False

def write_raw_points(self, points: list[dict]):
    """Write pre-built point dicts straight to the Influx client.

    Calls ``self._influx_client.write`` with ``self._config.influx_bucket`` as
    the bucket and ``points`` as the record. A failed write is logged and
    swallowed; nothing is returned either way.

    :param points: Point dicts to pass as the ``record`` argument.
    """
    try:
        self._influx_client.write(bucket=self._config.influx_bucket, record=points)
    except Exception:
        log.exception('Failed to write to Influx')
    else:
        # Log success only when the write actually succeeded; previously
        # this line also ran after a failed write.
        log.info('Wrote Influx: %s', points)
Loading