about summary refs log tree commit diff
path: root/app/services/activitypub
diff options
context:
space:
mode:
Diffstat (limited to 'app/services/activitypub')
-rw-r--r--app/services/activitypub/fetch_collection_items_service.rb167
-rw-r--r--app/services/activitypub/fetch_featured_collection_service.rb5
-rw-r--r--app/services/activitypub/fetch_replies_service.rb56
-rw-r--r--app/services/activitypub/process_account_service.rb15
-rw-r--r--app/services/activitypub/process_collection_items_service.rb30
5 files changed, 230 insertions, 43 deletions
diff --git a/app/services/activitypub/fetch_collection_items_service.rb b/app/services/activitypub/fetch_collection_items_service.rb
new file mode 100644
index 000000000..ef54321de
--- /dev/null
+++ b/app/services/activitypub/fetch_collection_items_service.rb
@@ -0,0 +1,167 @@
+# frozen_string_literal: true
+
+class ActivityPub::FetchCollectionItemsService < BaseService
+  include JsonLdHelper
+
+  COOLDOWN = 30.minutes
+
+  # Fetches objects in a collection from a URI or hash and queues them for processing.
+  # @param collection [Hash, String] Collection hash or URI
+  # @param account [Account] Owner of the collection
+  # @param page_limit [Integer] (10) Maximum number of pages to fetch from the collection.
+  # @param item_limit [Integer] (100) Maximum number of items to fetch from the collection.
+  # @option options [Boolean] :every_page (false) Whether to fetch every page in the collection,
+  #   even if its items have been previously fetched.  By default, fetching will stop if all the
+  #   items on any page have already been fetched.
+  # @option options [Boolean] :look_ahead (false) Whether to check the next page for unfetched
+  #   items if the current page's items have been previously fetched.  If there are unfetched
+  #   items on the next page, fetching will continue.
+  # @option options [Boolean] :skip_cooldown (false) Skip the fetch cooldown period on the a
+  #   collection URI (e.g., for account migration).
+  # @option options [Boolean] :include_boosts (false) Whether to skip boosts.  Including these
+  #   will cause a LOT of server traffic.
+  # @return [void]
+  # @raise [Mastodon::RaceConditionError] Collection is already being fetched.
+  # @raise [Mastodon::UnexpectedResponseError] Server returned an error while fetching a page.
+  def call(collection, account, page_limit: 10, item_limit: 100, **options)
+    uri = value_or_id(collection)
+    return if uri.blank? || ActivityPub::TagManager.instance.local_uri?(uri)
+
+    uri = collection['partOf'] if collection.is_a?(Hash) && collection['partOf'].present?
+
+    @account = account
+    @account = account_from_uri(uri) if @account.blank?
+    set_fetch_account
+
+    return if !options[:skip_cooldown] && Redis.current.get("fetch_collection_cooldown:#{uri}")
+
+    collection = fetch_collection(collection)
+    return if collection.blank?
+
+    if @account.blank?
+      @account = account_from_uri(collection['partOf'].presence || collection['id'])
+      set_fetch_account
+    end
+
+    fetch_collection_pages(collection, page_limit, item_limit, **options)
+  end
+
+  private
+
+  def lock_options(uri)
+    { redis: Redis.current, key: "fetch_collection:#{uri}" }
+  end
+
+  def set_fetch_account
+    @on_behalf_of = @account.present? ? @account.followers.local.random.first : nil
+  end
+
+  def account_from_uri(uri)
+    ActivityPub::TagManager.instance.uri_to_resource(uri, Account)
+  end
+
+  def account_id_from_uri(uri)
+    return if uri.blank?
+
+    Rails.cache.fetch("account_id_from_uri:#{uri}", expires_in: 10.minutes) do
+      account_from_uri(uri)&.id
+    end
+  end
+
+  def valid_item?(item)
+    item.is_a?(Hash) &&
+      !invalid_uri?(item['id']) &&
+      (item['attributedTo'].present? || item['actor'].present?) && (
+        item['object'].blank? || item['type'] == 'Create' && !invalid_uri?(value_or_id(item['object']))
+      )
+  end
+
+  def uri_with_account_id(item)
+    object = item['object'].presence || item
+    [value_or_id(object), object.is_a?(Hash) ? account_id_from_uri(object['attributedTo']) : account_id_from_uri(item['actor'])]
+  end
+
+  def invalid_uri?(uri)
+    unsupported_uri_scheme?(uri) || !uri_allowed?(uri) || ActivityPub::TagManager.instance.local_uri?(uri)
+  end
+
+  def fetch_collection(collection_or_uri)
+    return (collection_or_uri['id'].present? ? collection_or_uri : nil) if collection_or_uri.is_a?(Hash)
+    return if !collection_or_uri.is_a?(String) || invalid_origin?(collection_or_uri)
+
+    fetch_resource_without_id_validation(collection_or_uri, @on_behalf_of, true)
+  end
+
+  def fetch_collection_pages(collection, page_limit, item_limit, **options)
+    uri = collection['partOf'].presence || collection['id']
+    cooldown_key = "fetch_collection_cooldown:#{uri}"
+
+    return if !options[:skip_cooldown] && Redis.current.get(cooldown_key)
+
+    Redis.current.set(cooldown_key, 1, ex: COOLDOWN)
+
+    RedisLock.acquire(lock_options(uri)) do |lock|
+      raise Mastodon::RaceConditionError unless lock.acquired?
+
+      page = CollectionPage.find_or_create_by(uri: uri, account: @account)
+      every_page = options[:every_page]
+
+      if page.next.present?
+        collection = fetch_collection(page.next)
+        fetch_collection_items(collection, page, page_limit, item_limit, **options)
+        every_page = false
+      end
+
+      uri = collection['first'].presence || collection['id']
+      page.update!(next: uri)
+      collection = fetch_collection(uri) if collection['id'] != uri
+      fetch_collection_items(collection, page, page_limit, item_limit, **options.merge({ every_page: every_page }))
+    end
+  end
+
+  def fetch_collection_items(collection, page, page_limit, item_limit, **options)
+    page_count = 0
+    item_count = 0
+    seen_pages = Set[page.next]
+    have_items = false
+
+    while collection.present? && collection['type'].present?
+      batch = case collection['type']
+              when 'Collection', 'CollectionPage'
+                collection['items']
+              when 'OrderedCollection', 'OrderedCollectionPage'
+                collection['orderedItems']
+              end
+
+      break unless batch.is_a?(Array)
+
+      batch_size = [batch.count, item_limit - item_count].min
+      batch = batch.take(batch_size).select { |item| valid_item?(item) }.map { |item| uri_with_account_id(item) }
+      result = CollectionItem.import([:uri, :account_id], batch, validate: false, on_duplicate_key_ignore: true)
+
+      if !options[:every_page] && result.ids.blank?
+        break if have_items || !options[:look_ahead]
+
+        have_items = true
+      elsif have_items
+        have_items = false
+      end
+
+      item_count += result.ids.count
+      page_count += 1
+
+      next_page = collection['next']
+      break unless item_count < item_limit && page_count < page_limit && next_page.present?
+      break if seen_pages.include?(next_page)
+
+      sleep [page_count.to_f / 5, 1].min
+
+      seen_pages << next_page
+      page.update!(next: next_page)
+      collection = fetch_collection(next_page)
+    end
+
+    page.delete
+    ActivityPub::ProcessCollectionItemsWorker.perform_async
+  end
+end
diff --git a/app/services/activitypub/fetch_featured_collection_service.rb b/app/services/activitypub/fetch_featured_collection_service.rb
index 2c2770466..0a20f5edc 100644
--- a/app/services/activitypub/fetch_featured_collection_service.rb
+++ b/app/services/activitypub/fetch_featured_collection_service.rb
@@ -22,9 +22,10 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService
   private
 
   def process_items(items)
+    first_local_follower = @account.followers.local.random.first
     status_ids = items.map { |item| value_or_id(item) }
                       .reject { |uri| ActivityPub::TagManager.instance.local_uri?(uri) }
-                      .map { |uri| ActivityPub::FetchRemoteStatusService.new.call(uri) }
+                      .map { |uri| ActivityPub::FetchRemoteStatusService.new.call(uri, on_behalf_of: first_local_follower) }
                       .compact
                       .select { |status| status.account_id == @account.id }
                       .map(&:id)
@@ -43,7 +44,7 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService
     StatusPin.where(account: @account, status_id: to_remove).delete_all unless to_remove.empty?
 
     to_add.each do |status_id|
-      StatusPin.create!(account: @account, status_id: status_id)
+      StatusPin.create(account: @account, status_id: status_id)
     end
   end
 
diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb
index 8cb309e52..e36ca9f39 100644
--- a/app/services/activitypub/fetch_replies_service.rb
+++ b/app/services/activitypub/fetch_replies_service.rb
@@ -1,49 +1,27 @@
 # frozen_string_literal: true
 
 class ActivityPub::FetchRepliesService < BaseService
-  include JsonLdHelper
-
-  def call(parent_status, collection_or_uri, allow_synchronous_requests = true)
+  def call(parent_status, collection, **options)
     @account = parent_status.account
-    @allow_synchronous_requests = allow_synchronous_requests
-
-    @items = collection_items(collection_or_uri)
-    return if @items.nil?
-
-    FetchReplyWorker.push_bulk(filtered_replies)
-
-    @items
+    fetch_collection_items(collection, **options)
+  rescue ActiveRecord::RecordNotFound
+    nil
   end
 
   private
 
-  def collection_items(collection_or_uri)
-    collection = fetch_collection(collection_or_uri)
-    return unless collection.is_a?(Hash)
-
-    collection = fetch_collection(collection['first']) if collection['first'].present?
-    return unless collection.is_a?(Hash)
-
-    case collection['type']
-    when 'Collection', 'CollectionPage'
-      collection['items']
-    when 'OrderedCollection', 'OrderedCollectionPage'
-      collection['orderedItems']
-    end
-  end
-
-  def fetch_collection(collection_or_uri)
-    return collection_or_uri if collection_or_uri.is_a?(Hash)
-    return unless @allow_synchronous_requests
-    return if invalid_origin?(collection_or_uri)
-    fetch_resource_without_id_validation(collection_or_uri, nil, true)
-  end
-
-  def filtered_replies
-    # Only fetch replies to the same server as the original status to avoid
-    # amplification attacks.
-
-    # Also limit to 5 fetched replies to limit potential for DoS.
-    @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(5)
+  def fetch_collection_items(collection, **options)
+    ActivityPub::FetchCollectionItemsService.new.call(
+      collection,
+      @account,
+      page_limit: 1,
+      item_limit: 20,
+      **options
+    )
+  rescue Mastodon::RaceConditionError, Mastodon::UnexpectedResponseError
+    collection_uri = collection.is_a?(Hash) ? collection['id'] : collection
+    return unless collection_uri.present? && collection_uri.is_a?(String)
+
+    ActivityPub::FetchRepliesWorker.perform_async(@account.id, collection_uri)
   end
 end
diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb
index 85b915ec6..7f17e460c 100644
--- a/app/services/activitypub/process_account_service.rb
+++ b/app/services/activitypub/process_account_service.rb
@@ -35,12 +35,13 @@ class ActivityPub::ProcessAccountService < BaseService
     return if @account.nil?
 
     after_protocol_change! if protocol_changed?
-    after_key_change! if key_changed? && !@options[:signed_with_known_key]
     clear_tombstones! if key_changed?
+    return after_key_change! if key_changed? && !@options[:signed_with_known_key]
 
     unless @options[:only_key]
       check_featured_collection! if @account.featured_collection_url.present?
       check_links! unless @account.fields.empty?
+      process_sync
     end
 
     @account
@@ -86,6 +87,11 @@ class ActivityPub::ProcessAccountService < BaseService
     @account.also_known_as           = as_array(@json['alsoKnownAs'] || []).map { |item| value_or_id(item) }
     @account.actor_type              = actor_type
     @account.discoverable            = @json['discoverable'] || false
+    @account.require_dereference     = @json['requireDereference'] || false
+    @account.show_replies            = @json['showReplies'] || true
+    @account.show_unlisted           = @json['showUnlisted'] || true
+    @account.private                 = @json['private'] || false
+    @account.require_auth            = @json['require_auth'] || false
   end
 
   def set_fetchable_attributes!
@@ -104,7 +110,8 @@ class ActivityPub::ProcessAccountService < BaseService
   end
 
   def after_key_change!
-    RefollowWorker.perform_async(@account.id)
+    ResetAccountWorker.perform_async(@account.id)
+    nil
   end
 
   def check_featured_collection!
@@ -288,4 +295,8 @@ class ActivityPub::ProcessAccountService < BaseService
 
     @account.identity_proofs.where(provider: provider, provider_username: provider_username).find_or_create_by(provider: provider, provider_username: provider_username, token: token)
   end
+
+  def process_sync
+    ActivityPub::SyncAccountWorker.perform_async(@account.id)
+  end
 end
diff --git a/app/services/activitypub/process_collection_items_service.rb b/app/services/activitypub/process_collection_items_service.rb
new file mode 100644
index 000000000..9c30d81e9
--- /dev/null
+++ b/app/services/activitypub/process_collection_items_service.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+class ActivityPub::ProcessCollectionItemsService < BaseService
+  def call(account_id, on_behalf_of)
+    RedisLock.acquire(lock_options(account_id)) do |lock|
+      if lock.acquired?
+        CollectionItem.unprocessed.where(account_id: account_id).find_each do |item|
+          # Avoid failing servers holding up the rest of the queue.
+          next if item.retries.positive? && rand(3).positive?
+
+          begin
+            FetchRemoteStatusService.new.call(item.uri, nil, on_behalf_of)
+          rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound
+            nil
+          rescue HTTP::TimeoutError
+            item.increment!(:retries)
+          end
+
+          item.update!(processed: true) if item.retries.zero? || item.retries > 4
+        end
+      end
+    end
+  end
+
+  private
+
+  def lock_options(account_id)
+    { redis: Redis.current, key: "process_collection_items:#{account_id}" }
+  end
+end