diff --git a/.idea/workspace.xml b/.idea/workspace.xml index 9760be9..c08fe12 100644 --- a/.idea/workspace.xml +++ b/.idea/workspace.xml @@ -6,6 +6,7 @@ + - { + "keyToString": { + "Python.loadtest.executor": "Run", + "Python.main.executor": "Run", + "Python.track (1).executor": "Run", + "Python.track.executor": "Run", + "RunOnceActivity.OpenProjectViewOnStart": "true", + "RunOnceActivity.ShowReadmeOnStart": "true", + "git-widget-placeholder": "master", + "settings.editor.selected.configurable": "preferences.lookFeel" } -}]]> +} diff --git a/bot/reddit/track.py b/bot/reddit/track.py index b279211..656390a 100644 --- a/bot/reddit/track.py +++ b/bot/reddit/track.py @@ -1,3 +1,5 @@ +import asyncio + import asyncpraw import re import orjson @@ -20,8 +22,10 @@ user_agent="Reply Recruit" ) -async def main(): - producer = KafkaProducer(bootstrap_servers=["85.10.200.219:9092"], api_version=(3, 6, 0)) +producer = KafkaProducer(bootstrap_servers=["85.10.200.219:9092"], api_version=(3, 6, 0)) + + +async def post_stream(): while True: try: count = 0 @@ -45,3 +49,32 @@ async def main(): except Exception as e: continue +async def comment_stream(): + while True: + try: + count = 0 + sub = await reddit.subreddit(subreddit) + async for comment in sub.stream.comments(): + if count < 100: # This removes the 100 historical submissions that SubredditStream pulls. + count += 1 + continue + json_data = {"type": "redditcomment", + "data" : {"author" : comment.author.name, + "avatar" : comment.author.icon_img, + "body" : comment.body, + "url" : comment.permalink, + "score" : comment.score, + "submission_author" : comment.submission.author.name, + "submission_title" : comment.submission.title + }} + producer.send(topic="reddit", value=orjson.dumps(json_data), timestamp_ms=int(pend.now(tz=pend.UTC).timestamp()) * 1000) + except Exception as e: + continue + + +async def main(): + loop = asyncio.get_event_loop() + loop.create_task(comment_stream()) + loop.create_task(post_stream()) + +