diff options
author | Fire Demon <firedemon@creature.cafe> | 2020-07-21 20:55:40 -0500 |
---|---|---|
committer | Fire Demon <firedemon@creature.cafe> | 2020-08-30 05:44:01 -0500 |
commit | 43b25edb4f8be2314eb0d8f22d52dded24c04d0a (patch) | |
tree | c10f573d1d45b52ed08915bb382c3a6f01ea6916 /app/services | |
parent | 1c9e9213111471613c40ed74d7c0917b560b89f1 (diff) |
[Federation] Split collection items fetching from FetchRepliesService off into general-purpose FetchCollectionItemsService
Diffstat (limited to 'app/services')
-rw-r--r-- | app/services/activitypub/fetch_collection_items_service.rb | 60 | ||||
-rw-r--r-- | app/services/activitypub/fetch_replies_service.rb | 49 |
2 files changed, 73 insertions, 36 deletions
diff --git a/app/services/activitypub/fetch_collection_items_service.rb b/app/services/activitypub/fetch_collection_items_service.rb new file mode 100644 index 000000000..2d990dbf5 --- /dev/null +++ b/app/services/activitypub/fetch_collection_items_service.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +class ActivityPub::FetchCollectionItemsService < BaseService + include JsonLdHelper + + def call(collection_or_uri, account, page_limit: 10, item_limit: 100, allow_synchronous_requests: true) + @account = account + @allow_synchronous_requests = allow_synchronous_requests + + collection_items(collection_or_uri, page_limit, item_limit) + end + + private + + def collection_items(collection_or_uri, page_limit, item_limit) + collection = fetch_collection(collection_or_uri) + return [] unless collection.is_a?(Hash) + + collection = fetch_collection(collection['first']) if collection['first'].present? + page_count = 0 + item_count = 0 + items = [] + + while collection.present? && collection.is_a?(Hash) && collection['type'].present? + batch = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'].each + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + + batch_size = [batch.count, item_limit - item_count].min + items.push( + batch.take(batch_size) + .map { |item| value_or_id(item) } + .reject { |uri| unsupported_uri_scheme?(uri) || ActivityPub::TagManager.instance.local_uri?(uri) } + ) + + item_count += batch_size + page_count += 1 + + break unless item_count < item_limit && page_count < page_limit && collection['next'].present? + + collection = fetch_collection(collection['next']) + end + + items.uniq + end + + def fetch_collection(collection_or_uri) + return collection_or_uri if collection_or_uri.is_a?(Hash) + return unless @allow_synchronous_requests + return if invalid_origin?(collection_or_uri) + + on_behalf_of = @account.present? ? @account.followers.local.first : nil + fetch_resource_without_id_validation(collection_or_uri, on_behalf_of, true) + rescue Mastodon::UnexpectedResponseError + nil + end +end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 0145f25da..a398bbc79 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -1,49 +1,26 @@ # frozen_string_literal: true class ActivityPub::FetchRepliesService < BaseService - include JsonLdHelper - def call(parent_status, collection_or_uri, allow_synchronous_requests = true) - @account = parent_status.account + @parent_status = parent_status + @collection_or_uri = collection_or_uri @allow_synchronous_requests = allow_synchronous_requests - @items = collection_items(collection_or_uri) - return if @items.nil? - - FetchReplyWorker.push_bulk(filtered_replies) + items = fetch_collection_items + return if items.blank? - @items + FetchReplyWorker.push_bulk(items) + items end private - def collection_items(collection_or_uri) - collection = fetch_collection(collection_or_uri) - return unless collection.is_a?(Hash) - - collection = fetch_collection(collection['first']) if collection['first'].present? - return unless collection.is_a?(Hash) - - case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return unless @allow_synchronous_requests - return if invalid_origin?(collection_or_uri) - fetch_resource_without_id_validation(collection_or_uri, nil, true) - end - - def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. - - # Also limit to 25 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(25) + def fetch_collection_items + ActivityPub::FetchCollectionItemsService.new.call( + @collection_or_uri, @parent_status&.account, + page_limit: 1, + item_limit: 20, + allow_synchronous_requests: @allow_synchronous_requests + ) end end |