Merge pull request #352 from barrycarey/dev

Dev merge

barrycarey authored Dec 14, 2023
2 parents 9501abf + 32d82ec commit cd1aab2

Showing 17 changed files with 297 additions and 233 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -119,7 +119,7 @@ services:
       - RUN_ENV=production
       - db_user=sub_monitor
       - LOG_LEVEL=INFO
-      - CELERY_IMPORTS=redditrepostsleuth.core.celery.response_tasks
+      - CELERY_IMPORTS=redditrepostsleuth.core.celery.tasks.monitored_sub_tasks
     entrypoint: celery -A redditrepostsleuth.core.celery worker -Q submonitor -n submonitor_worker --autoscale=6,2

ingest_worker:
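
For context on this setting: CELERY_IMPORTS lists the modules a worker imports at startup, which is what registers their @task functions under the names used for routing. A minimal sketch of the mechanism, with an illustrative app name and env-var plumbing that is an assumption rather than this repo's actual wiring:

import os

from celery import Celery

# The worker imports every module listed here at boot; any @app.task
# functions those modules define become registered, routable task names.
app = Celery('repostsleuth_sketch', broker=os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0'))
app.conf.imports = os.environ.get('CELERY_IMPORTS', '').split(',')

Pointing the submonitor worker at tasks.monitored_sub_tasks keeps the -Q submonitor queue served after response_tasks.py is deleted later in this diff.
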
2 changes: 1 addition & 1 deletion redditrepostsleuth/core/celery/admin_tasks.py
@@ -126,7 +126,7 @@ def update_subreddit_config_from_database(self, monitored_sub: MonitoredSub, use

@celery.task(bind=True, base=AdminTask, autoretry_for=(UtilApiException,ConnectionError,TooManyRequests), retry_kwargs={'max_retries': 3})
def check_user_for_only_fans(self, username: str) -> None:
-    skip_names = ['[deleted]']
+    skip_names = ['[deleted]', 'AutoModerator']

    if username in skip_names:
        log.info('Skipping name %s', username)
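
One detail worth flagging on the task decorator above: Celery's retry options must be spelled retry_kwargs; an unknown keyword such as retry_kwards is accepted silently and the task falls back to default retry behavior. A minimal sketch of the intended pattern (app and task names are placeholders):

from celery import Celery

app = Celery('sketch')

@app.task(bind=True, autoretry_for=(ConnectionError,), retry_kwargs={'max_retries': 3})
def fetch_remote(self, url: str) -> None:
    # Any ConnectionError raised in the body triggers an automatic
    # retry, capped at the max_retries value from retry_kwargs.
    ...
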
3 changes: 2 additions & 1 deletion redditrepostsleuth/core/celery/app.py
@@ -4,6 +4,7 @@
from celery import Celery, signals
from celery.signals import after_setup_logger
from kombu.serialization import registry
+from prawcore import TooManyRequests

from redditrepostsleuth.core.exception import IngestHighMatchMeme, ImageConversionException

@@ -28,7 +29,7 @@ def init_sentry(**_kwargs):
                monitor_beat_tasks=True,
            ),
        ],
-        ignore_errors=[IngestHighMatchMeme, ImageConversionException, WorkerLostError]
+        ignore_errors=[IngestHighMatchMeme, ImageConversionException, WorkerLostError, TooManyRequests]
    )

@after_setup_logger.connect
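
The addition above keeps Reddit's 429 responses (prawcore's TooManyRequests) out of Sentry, since they are expected under load. A simplified sketch of the pattern with a placeholder DSN; the repo's real init also wires integrations:

import sentry_sdk
from prawcore import TooManyRequests

# Exception types listed in ignore_errors are dropped client-side and
# never sent to Sentry, so routine rate limiting stays out of the feed.
sentry_sdk.init(
    dsn='https://examplekey@o0.ingest.sentry.io/0',
    ignore_errors=[TooManyRequests],
)
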
1 change: 0 additions & 1 deletion redditrepostsleuth/core/celery/celeryconfig.py
@@ -27,7 +27,6 @@
    'redditrepostsleuth.core.celery.tasks.scheduled_tasks.check_for_subreddit_config_update_task': {'queue': 'subreddit_config_updates'},
    'redditrepostsleuth.core.celery.tasks.scheduled_tasks.*': {'queue': 'scheduled_tasks'},
    'redditrepostsleuth.core.celery.admin_tasks.update_proxies_job': {'queue': 'scheduled_tasks'},
-    'redditrepostsleuth.core.celery.response_tasks.process_summons': {'queue': 'summons'},
    'redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans': {'queue': 'onlyfans_check'},
    'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'}

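
These entries belong to Celery's task_routes mapping, which pins fully qualified task names (or globs) to queues; removing the process_summons row keeps the table in sync now that response_tasks.py is deleted below. A minimal sketch of how such a mapping behaves, with abbreviated placeholder names:

from celery import Celery

app = Celery('sketch')

# Routes are matched against the task's fully qualified name; tasks
# matching no rule fall back to the default queue.
app.conf.task_routes = {
    'myapp.tasks.scheduled.*': {'queue': 'scheduled_tasks'},
    'myapp.tasks.check_user': {'queue': 'onlyfans_check'},
}
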
155 changes: 0 additions & 155 deletions redditrepostsleuth/core/celery/response_tasks.py
@@ -1,158 +1,3 @@
import json
import time
from random import randint

import requests
from celery import Task
from praw.exceptions import RedditAPIException, APIException
from prawcore import TooManyRequests
from requests.exceptions import ConnectionError

from redditrepostsleuth.core.celery import celery
from redditrepostsleuth.core.config import Config
from redditrepostsleuth.core.db.databasemodels import MonitoredSub
from redditrepostsleuth.core.db.db_utils import get_db_engine
from redditrepostsleuth.core.db.uow.unitofworkmanager import UnitOfWorkManager
from redditrepostsleuth.core.exception import LoadSubredditException, NoIndexException, RateLimitException
from redditrepostsleuth.core.logfilters import ContextFilter
from redditrepostsleuth.core.logging import configure_logger
from redditrepostsleuth.core.services.duplicateimageservice import DuplicateImageService
from redditrepostsleuth.core.services.eventlogging import EventLogging
from redditrepostsleuth.core.services.reddit_manager import RedditManager
from redditrepostsleuth.core.services.response_handler import ResponseHandler
from redditrepostsleuth.core.services.responsebuilder import ResponseBuilder
from redditrepostsleuth.core.util.helpers import update_log_context_data
from redditrepostsleuth.core.util.reddithelpers import get_reddit_instance
from redditrepostsleuth.submonitorsvc.submonitor import SubMonitor

log = configure_logger(
    name='redditrepostsleuth',
    format='%(asctime)s - %(module)s:%(funcName)s:%(lineno)d - Trace_ID=%(trace_id)s Post_ID=%(post_id)s Subreddit=%(subreddit)s Service=%(service)s Level=%(levelname)s Message=%(message)s',
    filters=[ContextFilter()]
)


class SubMonitorTask(Task):

    def __init__(self):
        self.config = Config()
        self.reddit = get_reddit_instance(self.config)
        self.reddit_manager = RedditManager(self.reddit)
        self.uowm = UnitOfWorkManager(get_db_engine(self.config))
        event_logger = EventLogging(config=self.config)
        response_handler = ResponseHandler(self.reddit, self.uowm, event_logger, source='submonitor', live_response=self.config.live_responses)
        dup_image_svc = DuplicateImageService(self.uowm, event_logger, self.reddit, config=self.config)
        response_builder = ResponseBuilder(self.uowm)
        self.sub_monitor = SubMonitor(dup_image_svc, self.uowm, self.reddit, response_builder, response_handler, event_logger=event_logger, config=self.config)
        self.blacklisted_posts = []


@celery.task(
    bind=True,
    base=SubMonitorTask,
    serializer='pickle',
    autoretry_for=(TooManyRequests, RedditAPIException, NoIndexException, RateLimitException),
    retry_kwards={'max_retries': 3}
)
def sub_monitor_check_post(self, post_id: str, monitored_sub: MonitoredSub):
    update_log_context_data(log, {'trace_id': str(randint(100000, 999999)), 'post_id': post_id,
                                  'subreddit': monitored_sub.name, 'service': 'Subreddit_Monitor'})
    if self.sub_monitor.has_post_been_checked(post_id):
        log.debug('Post %s has already been checked', post_id)
        return
    if post_id in self.blacklisted_posts:
        log.debug('Skipping blacklisted post')
        return

    start = time.perf_counter()
    with self.uowm.start() as uow:
        post = uow.posts.get_by_post_id(post_id)
        if not post:
            log.warning('Post %s does exist', post_id)
            return
        if not post.post_type:
            log.warning('Unknown post type for %s - https://redd.it/%s', post.post_id, post.post_id)
            return

        self.sub_monitor.handle_only_fans_check(post, uow, monitored_sub)
        self.sub_monitor.handle_high_volume_reposter_check(post, uow, monitored_sub)

        title_keywords = []
        if monitored_sub.title_ignore_keywords:
            title_keywords = monitored_sub.title_ignore_keywords.split(',')

        if not self.sub_monitor.should_check_post(
                post,
                monitored_sub,
                title_keyword_filter=title_keywords
        ):
            return

        try:
            results = self.sub_monitor.check_submission(monitored_sub, post)
        except (TooManyRequests, RateLimitException):
            log.warning('Currently out of API credits')
            raise
        except NoIndexException:
            log.warning('No indexes available to do post check')
            raise
        except APIException:
            log.exception('Unexpected Reddit API error')
            raise
        except RedditAPIException:
            log.exception('')
            raise
        except Exception as e:
            log.exception('')
            return

        if results:
            self.sub_monitor.create_checked_post(results, monitored_sub)

    total_check_time = round(time.perf_counter() - start, 5)

    if total_check_time > 20:
        log.warning('Long Check. Time: %s | Subreddit: %s | Post ID: %s | Type: %s', total_check_time, monitored_sub.name, post.post_id, post.post_type)

    if len(self.blacklisted_posts) > 10000:
        log.info('Resetting blacklisted posts')
        self.blacklisted_posts = []


@celery.task(bind=True, base=SubMonitorTask, serializer='pickle', ignore_results=True, autoretry_for=(LoadSubredditException,), retry_kwards={'max_retries': 3})
def process_monitored_sub(self, monitored_sub):

    submission_ids_to_check = []

    if monitored_sub.is_private:
        # Don't run through proxy if it's private
        log.info('Loading all submissions from %s (PRIVATE)', monitored_sub.name)
        submission_ids_to_check += [sub.id for sub in self.reddit.subreddit(monitored_sub.name).new(limit=500)]
    else:
        try:
            log.info('Loading all submissions from %s', monitored_sub.name)
            r = requests.get(f'{self.config.util_api}/reddit/subreddit', params={'subreddit': monitored_sub.name, 'limit': 500})
        except ConnectionError:
            log.error('Connection error with util API')
            return
        except Exception as e:
            log.error('Error getting new posts from util api', exc_info=True)
            return

        if r.status_code == 403:
            log.error('Monitored sub %s is private. Skipping', monitored_sub.name)
            return

        if r.status_code != 200:
            log.error('Bad status code from Util API %s for %s', r.status_code, monitored_sub.name)
            return

        response_data = json.loads(r.text)

        submission_ids_to_check += [submission['id'] for submission in response_data]

    for submission_id in submission_ids_to_check:
        sub_monitor_check_post.apply_async((submission_id, monitored_sub), queue='submonitor_private')

    log.info('All submissions from %s sent to queue', monitored_sub.name)

@@ -0,0 +1,69 @@
import logging
import time

from praw.exceptions import APIException, RedditAPIException
from prawcore import TooManyRequests

from redditrepostsleuth.core.db.uow.unitofwork import UnitOfWork
from redditrepostsleuth.core.exception import RateLimitException, NoIndexException
from redditrepostsleuth.submonitorsvc.monitored_sub_service import MonitoredSubService

log = logging.getLogger(__name__)

def process_monitored_subreddit_submission(post_id: str, monitored_sub_svc: MonitoredSubService, uow: UnitOfWork) -> None:

    start = time.perf_counter()

    post = uow.posts.get_by_post_id(post_id)

    if not post:
        log.warning('Post %s does not exist', post_id)
        return

    if not post.post_type:
        log.warning('Unknown post type for %s - https://redd.it/%s', post.post_id, post.post_id)
        return

    monitored_sub = uow.monitored_sub.get_by_sub(post.subreddit)
    whitelisted_user = uow.user_whitelist.get_by_username_and_subreddit(post.author, monitored_sub.id)

    monitored_sub_svc.handle_only_fans_check(post, uow, monitored_sub, whitelisted_user=whitelisted_user)
    monitored_sub_svc.handle_high_volume_reposter_check(post, uow, monitored_sub, whitelisted_user=whitelisted_user)

    title_keywords = []
    if monitored_sub.title_ignore_keywords:
        title_keywords = monitored_sub.title_ignore_keywords.split(',')

    if not monitored_sub_svc.should_check_post(
            post,
            monitored_sub,
            title_keyword_filter=title_keywords,
            whitelisted_user=whitelisted_user
    ):
        return

    try:
        results = monitored_sub_svc.check_submission(monitored_sub, post)
    except (TooManyRequests, RateLimitException):
        log.warning('Currently out of API credits')
        raise
    except NoIndexException:
        log.warning('No indexes available to do post check')
        raise
    except APIException:
        log.exception('Unexpected Reddit API error')
        raise
    except RedditAPIException:
        log.exception('Unhandled Reddit API exception')
        raise
    except Exception:
        log.exception('Unhandled exception during submission check')
        return

    if results:
        monitored_sub_svc.create_checked_post(results, monitored_sub)

    total_check_time = round(time.perf_counter() - start, 5)

    if total_check_time > 20:
        log.warning('Long Check. Time: %s | Subreddit: %s | Post ID: %s | Type: %s', total_check_time, monitored_sub.name, post.post_id, post.post_type)
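
The refactor turns the per-post check into a plain function that receives its dependencies (service and unit of work) as arguments, so it can be exercised without a Celery worker. A hypothetical sketch of how a task wrapper might call it; the task name matches the routing seen elsewhere in this diff, but the base-class attributes here are assumptions, not the commit's actual definition:

from redditrepostsleuth.core.celery import celery

@celery.task(bind=True, serializer='pickle')
def sub_monitor_check_post(self, post_id, monitored_sub):
    # Open a unit of work and delegate to the pure function above;
    # self.uowm and self.monitored_sub_svc are assumed task-base attrs.
    with self.uowm.start() as uow:
        process_monitored_subreddit_submission(post_id, self.monitored_sub_svc, uow)
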
@@ -143,8 +143,9 @@ def token_checker() -> None:
    if token_res.status_code != 200:
        log.error('Failed to get new token')
        return
-    decoded_token = jwt.decode(json.loads(token_res.text), '', algorithms=["HS256"], options={"verify_signature": False})
-    redis_client.set('prof_token', decoded_token['sub'])
+    #decoded_token = jwt.decode(json.loads(token_res.text), '', algorithms=["HS256"], options={"verify_signature": False})
+    new_token = json.loads(token_res.text)
+    redis_client.set('prof_token', new_token)
    log.info('New token set in Redis')

def update_monitored_sub_data(
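
The old lines decoded the JWT without verifying its signature and cached only the sub claim; the new code caches the raw token string instead, presumably because downstream consumers need the whole token. For reference, the decode-and-extract pattern being removed looks like this with PyJWT (placeholder token value):

import jwt  # PyJWT

token = '<raw JWT from the auth endpoint>'
# verify_signature=False skips the key check entirely, so no secret is
# needed just to read the claims payload.
claims = jwt.decode(token, algorithms=['HS256'], options={'verify_signature': False})
subject = claims['sub']
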
2 changes: 1 addition & 1 deletion redditrepostsleuth/core/celery/tasks/ingest_tasks.py
@@ -34,7 +34,7 @@ def save_new_post(self, submission: dict):
        monitored_sub = uow.monitored_sub.get_by_sub(post.subreddit)
        if monitored_sub and monitored_sub.active:
            log.info('Sending ingested post to monitored sub queue')
-            celery.send_task('redditrepostsleuth.core.celery.response_tasks.sub_monitor_check_post',
+            celery.send_task('redditrepostsleuth.core.celery.tasks.monitored_sub_tasks.sub_monitor_check_post',
                             args=[post.post_id, monitored_sub],
                             queue='submonitor', countdown=20)

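Note that celery.send_task dispatches by task-name string rather than by import, so a rename like this has no compile-time safety net: the string must match the task's new module path exactly, or workers will reject the message as an unregistered task. The call shape, mirroring the diff above:

# Dispatch by name; countdown delays execution ~20s so ingestion can
# settle before the repost check runs.
celery.send_task(
    'redditrepostsleuth.core.celery.tasks.monitored_sub_tasks.sub_monitor_check_post',
    args=[post.post_id, monitored_sub],
    queue='submonitor',
    countdown=20,
)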