about summary refs log tree commit diff
path: root/app
diff options
context:
space:
mode:
Diffstat (limited to 'app')
-rw-r--r--app/lib/activitypub/activity/create.rb17
-rw-r--r--app/models/account.rb1
-rw-r--r--app/models/collection_item.rb20
-rw-r--r--app/models/collection_page.rb17
-rw-r--r--app/models/concerns/account_associations.rb6
-rw-r--r--app/models/follow_request.rb5
-rw-r--r--app/services/activitypub/fetch_collection_items_service.rb168
-rw-r--r--app/services/activitypub/fetch_replies_service.rb27
-rw-r--r--app/services/activitypub/process_account_service.rb5
-rw-r--r--app/services/activitypub/process_collection_items_service.rb24
-rw-r--r--app/workers/activitypub/process_collection_items_for_account_worker.rb20
-rw-r--r--app/workers/activitypub/process_collection_items_worker.rb27
-rw-r--r--app/workers/activitypub/sync_account_worker.rb57
-rw-r--r--app/workers/move_worker.rb1
-rw-r--r--app/workers/thread_resolve_worker.rb2
15 files changed, 347 insertions, 50 deletions
diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb
index d37203c40..23786c1cc 100644
--- a/app/lib/activitypub/activity/create.rb
+++ b/app/lib/activitypub/activity/create.rb
@@ -378,16 +378,17 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
   end
 
   def fetch_replies(status)
-    FetchReplyWorker.perform_async(@object['root']) unless @object['root'].blank? || [object_uri, @object['url']].include?(@object['root']) || status_from_uri(@object['root'])
+    FetchReplyWorker.perform_async(@object['root']) unless invalid_root_uri?
 
     collection = @object['replies']
     return if collection.nil?
 
-    replies = ActivityPub::FetchRepliesService.new.call(status, collection, false)
-    return unless replies.nil?
-
-    uri = value_or_id(collection)
-    ActivityPub::FetchRepliesWorker.perform_async(status.id, uri) unless uri.nil?
+    if collection.is_a?(Hash)
+      ActivityPub::FetchRepliesService.new.call(status, collection)
+    else
+      uri = value_or_id(collection)
+      ActivityPub::FetchRepliesWorker.perform_async(status.id, uri) unless uri.nil?
+    end
   end
 
   def conversation_from_uri(uri)
@@ -548,6 +549,10 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
     @skip_download ||= DomainBlock.reject_media?(@account.domain)
   end
 
+  def invalid_root_uri?
+    @object['root'].blank? || [object_uri, @object['url']].include?(@object['root']) || status_from_uri(@object['root'])
+  end
+
   def check_for_spam
     SpamCheck.perform(@status)
   end
diff --git a/app/models/account.rb b/app/models/account.rb
index aad6dc728..c7bf7bf80 100644
--- a/app/models/account.rb
+++ b/app/models/account.rb
@@ -55,6 +55,7 @@
 #  show_unlisted                 :boolean          default(TRUE), not null
 #  private                       :boolean          default(FALSE), not null
 #  require_auth                  :boolean          default(FALSE), not null
+#  last_synced_at                :datetime
 #
 
 class Account < ApplicationRecord
diff --git a/app/models/collection_item.rb b/app/models/collection_item.rb
new file mode 100644
index 000000000..cecb86534
--- /dev/null
+++ b/app/models/collection_item.rb
@@ -0,0 +1,20 @@
+# frozen_string_literal: true
+# == Schema Information
+#
+# Table name: collection_items
+#
+#  id         :bigint(8)        not null, primary key
+#  account_id :bigint(8)
+#  uri        :string           not null
+#  processed  :boolean          default(FALSE), not null
+#
+
+class CollectionItem < ApplicationRecord
+  belongs_to :account, inverse_of: :collection_items, optional: true
+
+  default_scope { order(id: :desc) }
+  scope :unprocessed, -> { where(processed: false) }
+  scope :joins_on_collection_pages, -> { joins('LEFT OUTER JOIN collection_pages ON collection_pages.account_id = collection_items.account_id') }
+  scope :inactive, -> { joins_on_collection_pages.where('collection_pages.account_id IS NULL') }
+  scope :active, -> { joins_on_collection_pages.where('collection_pages.account_id IS NOT NULL') }
+end
diff --git a/app/models/collection_page.rb b/app/models/collection_page.rb
new file mode 100644
index 000000000..e974e58a2
--- /dev/null
+++ b/app/models/collection_page.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+# == Schema Information
+#
+# Table name: collection_pages
+#
+#  id         :bigint(8)        not null, primary key
+#  account_id :bigint(8)
+#  uri        :string           not null
+#  next       :string
+#
+
+class CollectionPage < ApplicationRecord
+  belongs_to :account, inverse_of: :collection_pages, optional: true
+
+  default_scope { order(id: :desc) }
+  scope :current, -> { where(next: nil) }
+end
diff --git a/app/models/concerns/account_associations.rb b/app/models/concerns/account_associations.rb
index 14f64cb71..eff470ffa 100644
--- a/app/models/concerns/account_associations.rb
+++ b/app/models/concerns/account_associations.rb
@@ -78,5 +78,11 @@ module AccountAssociations
 
     # Queued boosts
     has_many :queued_boosts, inverse_of: :account, dependent: :destroy
+
+    # Collection pages
+    has_many :collection_pages, inverse_of: :account, dependent: :destroy
+
+    # Collection items
+    has_many :collection_items, inverse_of: :account, dependent: :destroy
   end
 end
diff --git a/app/models/follow_request.rb b/app/models/follow_request.rb
index 3325e264c..cdf0f4bda 100644
--- a/app/models/follow_request.rb
+++ b/app/models/follow_request.rb
@@ -29,7 +29,10 @@ class FollowRequest < ApplicationRecord
 
   def authorize!
     account.follow!(target_account, reblogs: show_reblogs, uri: uri)
-    MergeWorker.perform_async(target_account.id, account.id) if account.local?
+    if account.local?
+      MergeWorker.perform_async(target_account.id, account.id)
+      ActivityPub::SyncAccountWorker.perform_async(target_account.id, every_page: true) unless target_account.local?
+    end
     destroy!
   end
 
diff --git a/app/services/activitypub/fetch_collection_items_service.rb b/app/services/activitypub/fetch_collection_items_service.rb
index 9d5fddcfd..ef54321de 100644
--- a/app/services/activitypub/fetch_collection_items_service.rb
+++ b/app/services/activitypub/fetch_collection_items_service.rb
@@ -3,61 +3,165 @@
 class ActivityPub::FetchCollectionItemsService < BaseService
   include JsonLdHelper
 
-  def call(collection_or_uri, account, page_limit: 10, item_limit: 100, **options)
+  COOLDOWN = 30.minutes
+
+  # Fetches objects in a collection from a URI or hash and queues them for processing.
+  # @param collection [Hash, String] Collection hash or URI
+  # @param account [Account] Owner of the collection
+  # @param page_limit [Integer] (10) Maximum number of pages to fetch from the collection.
+  # @param item_limit [Integer] (100) Maximum number of items to fetch from the collection.
+  # @option options [Boolean] :every_page (false) Whether to fetch every page in the collection,
+  #   even if its items have been previously fetched.  By default, fetching will stop if all the
+  #   items on any page have already been fetched.
+  # @option options [Boolean] :look_ahead (false) Whether to check the next page for unfetched
+  #   items if the current page's items have been previously fetched.  If there are unfetched
+  #   items on the next page, fetching will continue.
+  # @option options [Boolean] :skip_cooldown (false) Skip the fetch cooldown period on the a
+  #   collection URI (e.g., for account migration).
+  # @option options [Boolean] :include_boosts (false) Whether to skip boosts.  Including these
+  #   will cause a LOT of server traffic.
+  # @return [void]
+  # @raise [Mastodon::RaceConditionError] Collection is already being fetched.
+  # @raise [Mastodon::UnexpectedResponseError] Server returned an error while fetching a page.
+  def call(collection, account, page_limit: 10, item_limit: 100, **options)
+    uri = value_or_id(collection)
+    return if uri.blank? || ActivityPub::TagManager.instance.local_uri?(uri)
+
+    uri = collection['partOf'] if collection.is_a?(Hash) && collection['partOf'].present?
+
     @account = account
-    @allow_synchronous_requests = options[:allow_synchronous_requests]
-    @sync = options[:sync]
+    @account = account_from_uri(uri) if @account.blank?
+    set_fetch_account
 
-    return [] if collection_or_uri.is_a?(String) && ActivityPub::TagManager.instance.local_uri?(collection_or_uri)
+    return if !options[:skip_cooldown] && Redis.current.get("fetch_collection_cooldown:#{uri}")
+
+    collection = fetch_collection(collection)
+    return if collection.blank?
+
+    if @account.blank?
+      @account = account_from_uri(collection['partOf'].presence || collection['id'])
+      set_fetch_account
+    end
 
-    collection_items(collection_or_uri, page_limit, item_limit)
+    fetch_collection_pages(collection, page_limit, item_limit, **options)
   end
 
   private
 
-  def collection_items(collection_or_uri, page_limit, item_limit)
-    collection = fetch_collection(collection_or_uri)
-    return [] unless collection.is_a?(Hash)
+  def lock_options(uri)
+    { redis: Redis.current, key: "fetch_collection:#{uri}" }
+  end
+
+  def set_fetch_account
+    @on_behalf_of = @account.present? ? @account.followers.local.random.first : nil
+  end
+
+  def account_from_uri(uri)
+    ActivityPub::TagManager.instance.uri_to_resource(uri, Account)
+  end
+
+  def account_id_from_uri(uri)
+    return if uri.blank?
+
+    Rails.cache.fetch("account_id_from_uri:#{uri}", expires_in: 10.minutes) do
+      account_from_uri(uri)&.id
+    end
+  end
+
+  def valid_item?(item)
+    item.is_a?(Hash) &&
+      !invalid_uri?(item['id']) &&
+      (item['attributedTo'].present? || item['actor'].present?) && (
+        item['object'].blank? || item['type'] == 'Create' && !invalid_uri?(value_or_id(item['object']))
+      )
+  end
+
+  def uri_with_account_id(item)
+    object = item['object'].presence || item
+    [value_or_id(object), object.is_a?(Hash) ? account_id_from_uri(object['attributedTo']) : account_id_from_uri(item['actor'])]
+  end
+
+  def invalid_uri?(uri)
+    unsupported_uri_scheme?(uri) || !uri_allowed?(uri) || ActivityPub::TagManager.instance.local_uri?(uri)
+  end
 
-    collection = fetch_collection(collection['first']) if collection['first'].present?
+  def fetch_collection(collection_or_uri)
+    return (collection_or_uri['id'].present? ? collection_or_uri : nil) if collection_or_uri.is_a?(Hash)
+    return if !collection_or_uri.is_a?(String) || invalid_origin?(collection_or_uri)
+
+    fetch_resource_without_id_validation(collection_or_uri, @on_behalf_of, true)
+  end
+
+  def fetch_collection_pages(collection, page_limit, item_limit, **options)
+    uri = collection['partOf'].presence || collection['id']
+    cooldown_key = "fetch_collection_cooldown:#{uri}"
+
+    return if !options[:skip_cooldown] && Redis.current.get(cooldown_key)
+
+    Redis.current.set(cooldown_key, 1, ex: COOLDOWN)
+
+    RedisLock.acquire(lock_options(uri)) do |lock|
+      raise Mastodon::RaceConditionError unless lock.acquired?
+
+      page = CollectionPage.find_or_create_by(uri: uri, account: @account)
+      every_page = options[:every_page]
+
+      if page.next.present?
+        collection = fetch_collection(page.next)
+        fetch_collection_items(collection, page, page_limit, item_limit, **options)
+        every_page = false
+      end
+
+      uri = collection['first'].presence || collection['id']
+      page.update!(next: uri)
+      collection = fetch_collection(uri) if collection['id'] != uri
+      fetch_collection_items(collection, page, page_limit, item_limit, **options.merge({ every_page: every_page }))
+    end
+  end
+
+  def fetch_collection_items(collection, page, page_limit, item_limit, **options)
     page_count = 0
     item_count = 0
-    items = []
+    seen_pages = Set[page.next]
+    have_items = false
 
-    while collection.present? && collection.is_a?(Hash) && collection['type'].present?
+    while collection.present? && collection['type'].present?
       batch = case collection['type']
               when 'Collection', 'CollectionPage'
-                collection['items'].each
+                collection['items']
               when 'OrderedCollection', 'OrderedCollectionPage'
                 collection['orderedItems']
               end
 
+      break unless batch.is_a?(Array)
+
       batch_size = [batch.count, item_limit - item_count].min
-      items.push(
-        batch.take(batch_size)
-             .map { |item| value_or_id(item) }
-             .reject { |uri| unsupported_uri_scheme?(uri) || ActivityPub::TagManager.instance.local_uri?(uri) }
-      )
+      batch = batch.take(batch_size).select { |item| valid_item?(item) }.map { |item| uri_with_account_id(item) }
+      result = CollectionItem.import([:uri, :account_id], batch, validate: false, on_duplicate_key_ignore: true)
+
+      if !options[:every_page] && result.ids.blank?
+        break if have_items || !options[:look_ahead]
 
-      item_count += batch_size
+        have_items = true
+      elsif have_items
+        have_items = false
+      end
+
+      item_count += result.ids.count
       page_count += 1
 
-      break unless item_count < item_limit && page_count < page_limit && collection['next'].present?
+      next_page = collection['next']
+      break unless item_count < item_limit && page_count < page_limit && next_page.present?
+      break if seen_pages.include?(next_page)
 
-      collection = fetch_collection(collection['next'])
-    end
+      sleep [page_count.to_f / 5, 1].min
 
-    items.uniq
-  end
+      seen_pages << next_page
+      page.update!(next: next_page)
+      collection = fetch_collection(next_page)
+    end
 
-  def fetch_collection(collection_or_uri)
-    return collection_or_uri if collection_or_uri.is_a?(Hash)
-    return unless @allow_synchronous_requests
-    return if invalid_origin?(collection_or_uri)
-
-    on_behalf_of = @account.present? ? @account.followers.local.first : nil
-    fetch_resource_without_id_validation(collection_or_uri, on_behalf_of, true)
-  rescue Mastodon::UnexpectedResponseError
-    nil
+    page.delete
+    ActivityPub::ProcessCollectionItemsWorker.perform_async
   end
 end
diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb
index a398bbc79..64a7dc9de 100644
--- a/app/services/activitypub/fetch_replies_service.rb
+++ b/app/services/activitypub/fetch_replies_service.rb
@@ -1,26 +1,31 @@
 # frozen_string_literal: true
 
 class ActivityPub::FetchRepliesService < BaseService
-  def call(parent_status, collection_or_uri, allow_synchronous_requests = true)
-    @parent_status = parent_status
-    @collection_or_uri = collection_or_uri
-    @allow_synchronous_requests = allow_synchronous_requests
+  def call(parent_status, collection, **options)
+    @account = parent_status.account
 
-    items = fetch_collection_items
-    return if items.blank?
+    fetch_collection_items(collection, **options)
+    return if collection.is_a?(String) && collection == @account.outbox_url
 
-    FetchReplyWorker.push_bulk(items)
-    items
+    fetch_collection_items(@account.outbox_url, **options) unless @account.silenced?
+  rescue ActiveRecord::RecordNotFound
+    nil
   end
 
   private
 
-  def fetch_collection_items
+  def fetch_collection_items(collection, **options)
     ActivityPub::FetchCollectionItemsService.new.call(
-      @collection_or_uri, @parent_status&.account,
+      collection,
+      @account,
       page_limit: 1,
       item_limit: 20,
-      allow_synchronous_requests: @allow_synchronous_requests
+      **options
     )
+  rescue Mastodon::RaceConditionError, Mastodon::UnexpectedResponseError
+    collection_uri = collection.is_a?(Hash) ? collection['id'] : collection
+    return unless collection_uri.present? && collection_uri.is_a?(String)
+
+    ActivityPub::FetchRepliesWorker.perform_async(@account.id, collection_uri)
   end
 end
diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb
index 39e777b32..f78f8a030 100644
--- a/app/services/activitypub/process_account_service.rb
+++ b/app/services/activitypub/process_account_service.rb
@@ -27,6 +27,7 @@ class ActivityPub::ProcessAccountService < BaseService
         update_account
         process_tags
         process_attachments
+        process_sync
       else
         raise Mastodon::RaceConditionError
       end
@@ -293,4 +294,8 @@ class ActivityPub::ProcessAccountService < BaseService
 
     @account.identity_proofs.where(provider: provider, provider_username: provider_username).find_or_create_by(provider: provider, provider_username: provider_username, token: token)
   end
+
+  def process_sync
+    ActivityPub::SyncAccountWorker.perform_async(@account.id)
+  end
 end
diff --git a/app/services/activitypub/process_collection_items_service.rb b/app/services/activitypub/process_collection_items_service.rb
new file mode 100644
index 000000000..936593166
--- /dev/null
+++ b/app/services/activitypub/process_collection_items_service.rb
@@ -0,0 +1,24 @@
+# frozen_string_literal: true
+
+class ActivityPub::ProcessCollectionItemsService < BaseService
+  def call(account_id, on_behalf_of)
+    RedisLock.acquire(lock_options(account_id)) do |lock|
+      if lock.acquired?
+        CollectionItem.unprocessed.where(account_id: account_id).find_each do |item|
+          begin
+            FetchRemoteStatusService.new.call(item.uri, nil, on_behalf_of)
+          rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound
+            nil
+          end
+          item.update!(processed: true)
+        end
+      end
+    end
+  end
+
+  private
+
+  def lock_options(account_id)
+    { redis: Redis.current, key: "process_collection_items:#{account_id}" }
+  end
+end
diff --git a/app/workers/activitypub/process_collection_items_for_account_worker.rb b/app/workers/activitypub/process_collection_items_for_account_worker.rb
new file mode 100644
index 000000000..4b5710c1d
--- /dev/null
+++ b/app/workers/activitypub/process_collection_items_for_account_worker.rb
@@ -0,0 +1,20 @@
+# frozen_string_literal: true
+class ActivityPub::ProcessCollectionItemsForAccountWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'pull', retry: 3
+
+  def perform(account_id)
+    @account_id = account_id
+    on_behalf_of = nil
+
+    if account_id.present?
+      account = Account.find(account_id)
+      on_behalf_of = account.followers.local.random.first
+    end
+
+    ActivityPub::ProcessCollectionItemsService.new.call(account_id, on_behalf_of)
+  rescue ActiveRecord::RecordNotFound
+    nil
+  end
+end
diff --git a/app/workers/activitypub/process_collection_items_worker.rb b/app/workers/activitypub/process_collection_items_worker.rb
new file mode 100644
index 000000000..d830edaec
--- /dev/null
+++ b/app/workers/activitypub/process_collection_items_worker.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+class ActivityPub::ProcessCollectionItemsWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'pull', retry: 0
+
+  def perform
+    return if Sidekiq::Stats.new.workers_size > 3
+
+    RedisLock.acquire(lock_options) do |lock|
+      if lock.acquired?
+        account_id = random_unprocessed_account_id
+        ActivityPub::ProcessCollectionItemsForAccountWorker.perform_async(account_id) if account_id.present?
+      end
+    end
+  end
+
+  private
+
+  def random_unprocessed_account_id
+    CollectionItem.unprocessed.pluck(:account_id).sample
+  end
+
+  def lock_options
+    { redis: Redis.current, key: 'process_collection_items' }
+  end
+end
diff --git a/app/workers/activitypub/sync_account_worker.rb b/app/workers/activitypub/sync_account_worker.rb
new file mode 100644
index 000000000..18825b20d
--- /dev/null
+++ b/app/workers/activitypub/sync_account_worker.rb
@@ -0,0 +1,57 @@
+# frozen_string_literal: true
+class ActivityPub::SyncAccountWorker
+  include Sidekiq::Worker
+  include ExponentialBackoff
+
+  sidekiq_options queue: 'pull', retry: 5
+
+  def perform(account_id, every_page = false, skip_cooldown = false)
+    @account = Account.find(account_id)
+    return if @account.local?
+
+    @from_migrated_account = @account.moved_to_account&.local?
+    return unless @from_migrated_account || @account.followers.local.exists?
+
+    RedisLock.acquire(lock_options) do |lock|
+      if lock.acquired?
+        fetch_collection_items(every_page, skip_cooldown)
+      elsif @from_migrated_account
+        # Cause a retry so server-to-server migrations can complete.
+        raise Mastodon::RaceConditionError
+      end
+    end
+  rescue ActiveRecord::RecordNotFound
+    nil
+  end
+
+  private
+
+  def lock_options
+    { redis: Redis.current, key: "account_sync:#{@account.id}" }
+  end
+
+  # Limits for an account moving to this server.
+  def limits_migrated
+    {
+      page_limit: 2_000,
+      item_limit: 40_000,
+      look_ahead: true,
+    }
+  end
+
+  # Limits for an account someone locally follows.
+  def limits_followed
+    {
+      page_limit: 25,
+      item_limit: 500,
+      look_ahead: @account.last_synced_at.blank?,
+    }
+  end
+
+  def fetch_collection_items(every_page, skip_cooldown)
+    opts = @from_migrated_account && every_page ? limits_migrated : limits_followed
+    opts.merge!({ every_page: every_page, skip_cooldown: skip_cooldown })
+    ActivityPub::FetchCollectionItemsService.new.call(@account.outbox_url, @account, **opts)
+    @account.update(last_synced_at: Time.now.utc)
+  end
+end
diff --git a/app/workers/move_worker.rb b/app/workers/move_worker.rb
index 39e321316..1f1a49cbb 100644
--- a/app/workers/move_worker.rb
+++ b/app/workers/move_worker.rb
@@ -16,6 +16,7 @@ class MoveWorker
     copy_account_notes!
     carry_blocks_over!
     carry_mutes_over!
+    ActivityPub::SyncAccountWorker.perform_async(source_account.id, every_page: true, skip_cooldown: true)
   rescue ActiveRecord::RecordNotFound
     true
   end
diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb
index 7599eb784..74cb09ab5 100644
--- a/app/workers/thread_resolve_worker.rb
+++ b/app/workers/thread_resolve_worker.rb
@@ -15,5 +15,7 @@ class ThreadResolveWorker
 
     child_status.thread = parent_status
     child_status.save!
+  rescue ActiveRecord::RecordInvalid
+    nil
   end
 end