From 43b25edb4f8be2314eb0d8f22d52dded24c04d0a Mon Sep 17 00:00:00 2001 From: Fire Demon Date: Tue, 21 Jul 2020 20:55:40 -0500 Subject: [Federation] Split collection items fetching from FetchRepliesService off into general-purpose FetchCollectionItemsService --- .../activitypub/fetch_collection_items_service.rb | 60 ++++++++++++++++++++++ app/services/activitypub/fetch_replies_service.rb | 49 +++++------------- app/workers/fetch_reply_worker.rb | 4 +- 3 files changed, 75 insertions(+), 38 deletions(-) create mode 100644 app/services/activitypub/fetch_collection_items_service.rb diff --git a/app/services/activitypub/fetch_collection_items_service.rb b/app/services/activitypub/fetch_collection_items_service.rb new file mode 100644 index 000000000..2d990dbf5 --- /dev/null +++ b/app/services/activitypub/fetch_collection_items_service.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +class ActivityPub::FetchCollectionItemsService < BaseService + include JsonLdHelper + + def call(collection_or_uri, account, page_limit: 10, item_limit: 100, allow_synchronous_requests: true) + @account = account + @allow_synchronous_requests = allow_synchronous_requests + + collection_items(collection_or_uri, page_limit, item_limit) + end + + private + + def collection_items(collection_or_uri, page_limit, item_limit) + collection = fetch_collection(collection_or_uri) + return [] unless collection.is_a?(Hash) + + collection = fetch_collection(collection['first']) if collection['first'].present? + page_count = 0 + item_count = 0 + items = [] + + while collection.present? && collection.is_a?(Hash) && collection['type'].present? + batch = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'].each + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + + batch_size = [batch.count, item_limit - item_count].min + items.push( + batch.take(batch_size) + .map { |item| value_or_id(item) } + .reject { |uri| unsupported_uri_scheme?(uri) || ActivityPub::TagManager.instance.local_uri?(uri) } + ) + + item_count += batch_size + page_count += 1 + + break unless item_count < item_limit && page_count < page_limit && collection['next'].present? + + collection = fetch_collection(collection['next']) + end + + items.uniq + end + + def fetch_collection(collection_or_uri) + return collection_or_uri if collection_or_uri.is_a?(Hash) + return unless @allow_synchronous_requests + return if invalid_origin?(collection_or_uri) + + on_behalf_of = @account.present? ? @account.followers.local.first : nil + fetch_resource_without_id_validation(collection_or_uri, on_behalf_of, true) + rescue Mastodon::UnexpectedResponseError + nil + end +end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 0145f25da..a398bbc79 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -1,49 +1,26 @@ # frozen_string_literal: true class ActivityPub::FetchRepliesService < BaseService - include JsonLdHelper - def call(parent_status, collection_or_uri, allow_synchronous_requests = true) - @account = parent_status.account + @parent_status = parent_status + @collection_or_uri = collection_or_uri @allow_synchronous_requests = allow_synchronous_requests - @items = collection_items(collection_or_uri) - return if @items.nil? - - FetchReplyWorker.push_bulk(filtered_replies) + items = fetch_collection_items + return if items.blank? - @items + FetchReplyWorker.push_bulk(items) + items end private - def collection_items(collection_or_uri) - collection = fetch_collection(collection_or_uri) - return unless collection.is_a?(Hash) - - collection = fetch_collection(collection['first']) if collection['first'].present? - return unless collection.is_a?(Hash) - - case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return unless @allow_synchronous_requests - return if invalid_origin?(collection_or_uri) - fetch_resource_without_id_validation(collection_or_uri, nil, true) - end - - def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. - - # Also limit to 25 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(25) + def fetch_collection_items + ActivityPub::FetchCollectionItemsService.new.call( + @collection_or_uri, @parent_status&.account, + page_limit: 1, + item_limit: 20, + allow_synchronous_requests: @allow_synchronous_requests + ) end end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index 1490f283c..d7d11a9d3 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -6,7 +6,7 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 - def perform(child_url, on_behalf_of = nil) - FetchRemoteStatusService.new.call(child_url, nil, on_behalf_of) + def perform(child_url) + FetchRemoteStatusService.new.call(child_url, nil) end end -- cgit