about summary refs log tree commit diff
path: root/app/services
diff options
context:
space:
mode:
author Fire Demon <firedemon@creature.cafe> 2020-07-21 20:55:40 -0500
committer Fire Demon <firedemon@creature.cafe> 2020-08-30 05:44:01 -0500
commit 43b25edb4f8be2314eb0d8f22d52dded24c04d0a (patch)
tree c10f573d1d45b52ed08915bb382c3a6f01ea6916 /app/services
parent 1c9e9213111471613c40ed74d7c0917b560b89f1 (diff)
[Federation] Split collection items fetching from FetchRepliesService off into general-purpose FetchCollectionItemsService
Diffstat (limited to 'app/services')
-rw-r--r-- app/services/activitypub/fetch_collection_items_service.rb 60
-rw-r--r-- app/services/activitypub/fetch_replies_service.rb 49
2 files changed, 73 insertions, 36 deletions
diff --git a/app/services/activitypub/fetch_collection_items_service.rb b/app/services/activitypub/fetch_collection_items_service.rb
new file mode 100644
index 000000000..2d990dbf5
--- /dev/null
+++ b/app/services/activitypub/fetch_collection_items_service.rb
@@ -0,0 +1,60 @@
+# frozen_string_literal: true
+
+class ActivityPub::FetchCollectionItemsService < BaseService
+  include JsonLdHelper
+
+  # Returns a flat, de-duplicated array of item URIs from an ActivityPub collection (Hash or URI),
+  # reading at most page_limit pages and item_limit items; [] when the collection cannot be loaded.
+  def call(collection_or_uri, account, page_limit: 10, item_limit: 100, allow_synchronous_requests: true)
+    @account = account
+    @allow_synchronous_requests = allow_synchronous_requests
+
+    collection_items(collection_or_uri, page_limit, item_limit)
+  end
+
+  private
+
+  def collection_items(collection_or_uri, page_limit, item_limit)
+    collection = fetch_collection(collection_or_uri)
+    return [] unless collection.is_a?(Hash)
+
+    collection = fetch_collection(collection['first']) if collection['first'].present?
+    page_count = 0
+    item_count = 0
+    items = []
+
+    while collection.is_a?(Hash) && collection['type'].present?
+      batch = case collection['type']
+              when 'Collection', 'CollectionPage' then collection['items']
+              when 'OrderedCollection', 'OrderedCollectionPage' then collection['orderedItems']
+              end
+      # A page may omit its items, inline a single one, or have an unknown type (nil): normalize.
+      batch = Array.wrap(batch).take(item_limit - item_count)
+      # concat (not push) keeps the result flat, so the final uniq de-duplicates URIs, not pages.
+      items.concat(
+        batch.map { |item| value_or_id(item) }
+             .reject { |uri| unsupported_uri_scheme?(uri) || ActivityPub::TagManager.instance.local_uri?(uri) }
+      )
+
+      item_count += batch.size # counts items taken from the page, before filtering
+      page_count += 1
+
+      break unless item_count < item_limit && page_count < page_limit && collection['next'].present?
+      collection = fetch_collection(collection['next'])
+    end
+
+    items.uniq
+  end
+
+  # Hashes pass through; URIs are fetched only if sync requests are allowed and invalid_origin? is false.
+  def fetch_collection(collection_or_uri)
+    return collection_or_uri if collection_or_uri.is_a?(Hash)
+    return unless @allow_synchronous_requests
+    return if invalid_origin?(collection_or_uri)
+
+    on_behalf_of = @account.present? ? @account.followers.local.first : nil
+    fetch_resource_without_id_validation(collection_or_uri, on_behalf_of, true)
+  rescue Mastodon::UnexpectedResponseError
+    nil # an unexpected response is treated as a missing collection
+  end
+end
diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb
index 0145f25da..a398bbc79 100644
--- a/app/services/activitypub/fetch_replies_service.rb
+++ b/app/services/activitypub/fetch_replies_service.rb
@@ -1,49 +1,26 @@
 # frozen_string_literal: true
 
 class ActivityPub::FetchRepliesService < BaseService
-  include JsonLdHelper
-
   def call(parent_status, collection_or_uri, allow_synchronous_requests = true)
-    @account = parent_status.account
+    @parent_status = parent_status
+    @collection_or_uri = collection_or_uri
     @allow_synchronous_requests = allow_synchronous_requests
 
-    @items = collection_items(collection_or_uri)
-    return if @items.nil?
-
-    FetchReplyWorker.push_bulk(filtered_replies)
+    items = fetch_collection_items
+    return if items.blank? # nil or empty: nothing to enqueue, so call returns nil
 
-    @items
+    FetchReplyWorker.push_bulk(items) # hand every collected item to FetchReplyWorker in one bulk call
+    items
   end
 
   private
 
-  def collection_items(collection_or_uri)
-    collection = fetch_collection(collection_or_uri)
-    return unless collection.is_a?(Hash)
-
-    collection = fetch_collection(collection['first']) if collection['first'].present?
-    return unless collection.is_a?(Hash)
-
-    case collection['type']
-    when 'Collection', 'CollectionPage'
-      collection['items']
-    when 'OrderedCollection', 'OrderedCollectionPage'
-      collection['orderedItems']
-    end
-  end
-
-  def fetch_collection(collection_or_uri)
-    return collection_or_uri if collection_or_uri.is_a?(Hash)
-    return unless @allow_synchronous_requests
-    return if invalid_origin?(collection_or_uri)
-    fetch_resource_without_id_validation(collection_or_uri, nil, true)
-  end
-
-  def filtered_replies
-    # Only fetch replies to the same server as the original status to avoid
-    # amplification attacks.
-
-    # Also limit to 25 fetched replies to limit potential for DoS.
-    @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(25)
+  def fetch_collection_items # delegates collection traversal to ActivityPub::FetchCollectionItemsService
+    ActivityPub::FetchCollectionItemsService.new.call(
+      @collection_or_uri, @parent_status&.account, # &. tolerates a nil parent_status
+      page_limit: 1, # read only the first page
+      item_limit: 20, # the removed filtered_replies capped this at 25
+      allow_synchronous_requests: @allow_synchronous_requests # NOTE(review): old code also dropped invalid_origin? URIs; confirm still enforced
+    )
   end
 end