Skip to content

Commit

Permalink
working (i think?) recursive fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
sneakers-the-rat committed Sep 19, 2024
1 parent d478936 commit 1696221
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 36 deletions.
32 changes: 14 additions & 18 deletions app/controllers/api/v1/statuses_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,20 @@ def context
descendants_limit = DESCENDANTS_LIMIT
descendants_depth_limit = DESCENDANTS_DEPTH_LIMIT
else
unless @status.local?
json_status = fetch_resource(@status.uri, true, @current_account)

logger.warn "json status"
logger.warn json_status
# rescue this whole block on failure, don't want to fail the whole context request if we can't do this
collection = json_status['replies']
logger.warn "replies uri"
logger.warn collection

unless collection.nil?
ActivityPub::FetchRepliesService.new.call(
@status,
collection,
allow_synchronous_requests: true,
all_replies: true
)
end
unless @status.local? && !@status.should_fetch_replies?
json_status = fetch_resource(@status.uri, true, @current_account)

# rescue this whole block on failure, don't want to fail the whole context request if we can't do this
collection = json_status['replies']

unless collection.nil?
ActivityPub::FetchRepliesService.new.call(
@status,
collection,
allow_synchronous_requests: true,
all_replies: true
)
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion app/lib/activitypub/activity/create.rb
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ def fetch_replies(status)
collection = @object['replies']
return if collection.blank?

replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id])
replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id], all_replies: status.should_fetch_replies?)
return unless replies.nil?

uri = value_or_id(collection)
Expand Down
10 changes: 10 additions & 0 deletions app/models/status.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# edited_at :datetime
# trendable :boolean
# ordered_media_attachment_ids :bigint(8) is an Array
# fetched_replies_at :datetime
#

class Status < ApplicationRecord
Expand Down Expand Up @@ -183,6 +184,8 @@ class Status < ApplicationRecord
delegate :domain, to: :account, prefix: true

REAL_TIME_WINDOW = 6.hours
# debounce fetching all replies to minimize DoS
FETCH_REPLIES_DEBOUNCE = 1.hour

def cache_key
"v3:#{super}"
Expand Down Expand Up @@ -440,6 +443,13 @@ def unlink_from_conversations!
end
end

def should_fetch_replies?
# we aren't brand new, and we haven't fetched replies since the debounce window
created_at <= 10.minutes.ago && (
fetched_replies_at.nil? || fetched_replies_at <= FETCH_REPLIES_DEBOUNCE.ago
)
end

private

def update_status_stat!(attrs)
Expand Down
46 changes: 30 additions & 16 deletions app/services/activitypub/fetch_replies_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,58 @@ class ActivityPub::FetchRepliesService < BaseService

# Limit of fetched replies used when not fetching all replies
MAX_REPLIES_LOW = 5
# limit of fetched replies used when fetching all replies
MAX_REPLIES_HIGH = 500

def call(parent_status, collection_or_uri, allow_synchronous_requests: true, request_id: nil, all_replies: false)
# Whether we are getting replies from more than the originating server,
# and don't limit ourselves to getting at most `MAX_REPLIES_LOW`
@all_replies = all_replies
# store the status and whether we should fetch replies for it to avoid
# race conditions if something else updates us in the meantime
@status = parent_status
@should_fetch_replies = parent_status.should_fetch_replies?

@account = parent_status.account
@allow_synchronous_requests = allow_synchronous_requests

@items = collection_items(collection_or_uri)
logger = Logger.new(STDOUT)
logger.warn 'collection items'
logger.warn @items
return if @items.nil?

FetchReplyWorker.push_bulk(filtered_replies) { |reply_uri| [reply_uri, { 'request_id' => request_id }] }
# Store last fetched all to debounce
@status.update(fetched_replies_at: Time.now.utc) if fetch_all_replies?

@items
end

private

def collection_items(collection_or_uri)
logger = Logger.new(STDOUT)
collection = fetch_collection(collection_or_uri)
logger.warn 'first collection'
logger.warn collection
return unless collection.is_a?(Hash)

collection = fetch_collection(collection['first']) if collection['first'].present?
logger.warn 'second collection'
logger.warn collection
return unless collection.is_a?(Hash)

# Need to do another "next" here. see https://neuromatch.social/users/jonny/statuses/112401738180959195/replies for example
# then we are home free (stopping for tonight tho.)
all_items = []
while collection.is_a?(Hash)
items = case collection['type']
when 'Collection', 'CollectionPage'
collection['items']
when 'OrderedCollection', 'OrderedCollectionPage'
collection['orderedItems']
end

case collection['type']
when 'Collection', 'CollectionPage'
as_array(collection['items'])
when 'OrderedCollection', 'OrderedCollectionPage'
as_array(collection['orderedItems'])
all_items.concat(as_array(items))

# Quit early if we are not fetching all replies
break if all_items.size >= MAX_REPLIES_HIGH || !fetch_all_replies?

collection = collection['next'].present? ? fetch_collection(collection['next']) : nil
end

all_items
end

def fetch_collection(collection_or_uri)
Expand All @@ -73,7 +82,8 @@ def fetch_collection(collection_or_uri)

def filtered_replies
if @all_replies
@items.map { |item| value_or_id(item) }
# Reject all statuses that we already have in the db
@items.map { |item| value_or_id(item) }.reject { |uri| Status.exists?(uri: uri) }
else
# Only fetch replies to the same server as the original status to avoid
# amplification attacks.
Expand All @@ -82,4 +92,8 @@ def filtered_replies
@items.map { |item| value_or_id(item) }.reject { |uri| non_matching_uri_hosts?(@account.uri, uri) }.take(MAX_REPLIES_LOW)
end
end

def fetch_all_replies?
@all_replies && @should_fetch_replies
end
end
9 changes: 9 additions & 0 deletions db/migrate/20240918233930_add_fetched_replies_at_to_status.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# frozen_string_literal: true

class AddFetchedRepliesAtToStatus < ActiveRecord::Migration[7.1]
disable_ddl_transaction!

def change
add_column :statuses, :fetched_replies_at, :datetime, null: true
end
end
3 changes: 2 additions & 1 deletion db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema[7.1].define(version: 2024_08_08_125420) do
ActiveRecord::Schema[7.1].define(version: 2024_09_18_233930) do
# These are extensions that must be enabled in order to support this database
enable_extension "plpgsql"

Expand Down Expand Up @@ -1089,6 +1089,7 @@
t.datetime "edited_at", precision: nil
t.boolean "trendable"
t.bigint "ordered_media_attachment_ids", array: true
t.datetime "fetched_replies_at"
t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)"
t.index ["account_id"], name: "index_statuses_on_account_id"
t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)"
Expand Down

0 comments on commit 1696221

Please sign in to comment.