diff options
Diffstat (limited to 'app/services/activitypub')
5 files changed, 230 insertions, 43 deletions
diff --git a/app/services/activitypub/fetch_collection_items_service.rb b/app/services/activitypub/fetch_collection_items_service.rb new file mode 100644 index 000000000..ef54321de --- /dev/null +++ b/app/services/activitypub/fetch_collection_items_service.rb @@ -0,0 +1,167 @@ +# frozen_string_literal: true + +class ActivityPub::FetchCollectionItemsService < BaseService + include JsonLdHelper + + COOLDOWN = 30.minutes + + # Fetches objects in a collection from a URI or hash and queues them for processing. + # @param collection [Hash, String] Collection hash or URI + # @param account [Account] Owner of the collection + # @param page_limit [Integer] (10) Maximum number of pages to fetch from the collection. + # @param item_limit [Integer] (100) Maximum number of items to fetch from the collection. + # @option options [Boolean] :every_page (false) Whether to fetch every page in the collection, + # even if its items have been previously fetched. By default, fetching will stop if all the + # items on any page have already been fetched. + # @option options [Boolean] :look_ahead (false) Whether to check the next page for unfetched + # items if the current page's items have been previously fetched. If there are unfetched + # items on the next page, fetching will continue. + # @option options [Boolean] :skip_cooldown (false) Skip the fetch cooldown period on the a + # collection URI (e.g., for account migration). + # @option options [Boolean] :include_boosts (false) Whether to skip boosts. Including these + # will cause a LOT of server traffic. + # @return [void] + # @raise [Mastodon::RaceConditionError] Collection is already being fetched. + # @raise [Mastodon::UnexpectedResponseError] Server returned an error while fetching a page. + def call(collection, account, page_limit: 10, item_limit: 100, **options) + uri = value_or_id(collection) + return if uri.blank? || ActivityPub::TagManager.instance.local_uri?(uri) + + uri = collection['partOf'] if collection.is_a?(Hash) && collection['partOf'].present? + + @account = account + @account = account_from_uri(uri) if @account.blank? + set_fetch_account + + return if !options[:skip_cooldown] && Redis.current.get("fetch_collection_cooldown:#{uri}") + + collection = fetch_collection(collection) + return if collection.blank? + + if @account.blank? + @account = account_from_uri(collection['partOf'].presence || collection['id']) + set_fetch_account + end + + fetch_collection_pages(collection, page_limit, item_limit, **options) + end + + private + + def lock_options(uri) + { redis: Redis.current, key: "fetch_collection:#{uri}" } + end + + def set_fetch_account + @on_behalf_of = @account.present? ? @account.followers.local.random.first : nil + end + + def account_from_uri(uri) + ActivityPub::TagManager.instance.uri_to_resource(uri, Account) + end + + def account_id_from_uri(uri) + return if uri.blank? + + Rails.cache.fetch("account_id_from_uri:#{uri}", expires_in: 10.minutes) do + account_from_uri(uri)&.id + end + end + + def valid_item?(item) + item.is_a?(Hash) && + !invalid_uri?(item['id']) && + (item['attributedTo'].present? || item['actor'].present?) && ( + item['object'].blank? || item['type'] == 'Create' && !invalid_uri?(value_or_id(item['object'])) + ) + end + + def uri_with_account_id(item) + object = item['object'].presence || item + [value_or_id(object), object.is_a?(Hash) ? account_id_from_uri(object['attributedTo']) : account_id_from_uri(item['actor'])] + end + + def invalid_uri?(uri) + unsupported_uri_scheme?(uri) || !uri_allowed?(uri) || ActivityPub::TagManager.instance.local_uri?(uri) + end + + def fetch_collection(collection_or_uri) + return (collection_or_uri['id'].present? ? collection_or_uri : nil) if collection_or_uri.is_a?(Hash) + return if !collection_or_uri.is_a?(String) || invalid_origin?(collection_or_uri) + + fetch_resource_without_id_validation(collection_or_uri, @on_behalf_of, true) + end + + def fetch_collection_pages(collection, page_limit, item_limit, **options) + uri = collection['partOf'].presence || collection['id'] + cooldown_key = "fetch_collection_cooldown:#{uri}" + + return if !options[:skip_cooldown] && Redis.current.get(cooldown_key) + + Redis.current.set(cooldown_key, 1, ex: COOLDOWN) + + RedisLock.acquire(lock_options(uri)) do |lock| + raise Mastodon::RaceConditionError unless lock.acquired? + + page = CollectionPage.find_or_create_by(uri: uri, account: @account) + every_page = options[:every_page] + + if page.next.present? + collection = fetch_collection(page.next) + fetch_collection_items(collection, page, page_limit, item_limit, **options) + every_page = false + end + + uri = collection['first'].presence || collection['id'] + page.update!(next: uri) + collection = fetch_collection(uri) if collection['id'] != uri + fetch_collection_items(collection, page, page_limit, item_limit, **options.merge({ every_page: every_page })) + end + end + + def fetch_collection_items(collection, page, page_limit, item_limit, **options) + page_count = 0 + item_count = 0 + seen_pages = Set[page.next] + have_items = false + + while collection.present? && collection['type'].present? + batch = case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + + break unless batch.is_a?(Array) + + batch_size = [batch.count, item_limit - item_count].min + batch = batch.take(batch_size).select { |item| valid_item?(item) }.map { |item| uri_with_account_id(item) } + result = CollectionItem.import([:uri, :account_id], batch, validate: false, on_duplicate_key_ignore: true) + + if !options[:every_page] && result.ids.blank? + break if have_items || !options[:look_ahead] + + have_items = true + elsif have_items + have_items = false + end + + item_count += result.ids.count + page_count += 1 + + next_page = collection['next'] + break unless item_count < item_limit && page_count < page_limit && next_page.present? + break if seen_pages.include?(next_page) + + sleep [page_count.to_f / 5, 1].min + + seen_pages << next_page + page.update!(next: next_page) + collection = fetch_collection(next_page) + end + + page.delete + ActivityPub::ProcessCollectionItemsWorker.perform_async + end +end diff --git a/app/services/activitypub/fetch_featured_collection_service.rb b/app/services/activitypub/fetch_featured_collection_service.rb index 2c2770466..0a20f5edc 100644 --- a/app/services/activitypub/fetch_featured_collection_service.rb +++ b/app/services/activitypub/fetch_featured_collection_service.rb @@ -22,9 +22,10 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService private def process_items(items) + first_local_follower = @account.followers.local.random.first status_ids = items.map { |item| value_or_id(item) } .reject { |uri| ActivityPub::TagManager.instance.local_uri?(uri) } - .map { |uri| ActivityPub::FetchRemoteStatusService.new.call(uri) } + .map { |uri| ActivityPub::FetchRemoteStatusService.new.call(uri, on_behalf_of: first_local_follower) } .compact .select { |status| status.account_id == @account.id } .map(&:id) @@ -43,7 +44,7 @@ class ActivityPub::FetchFeaturedCollectionService < BaseService StatusPin.where(account: @account, status_id: to_remove).delete_all unless to_remove.empty? to_add.each do |status_id| - StatusPin.create!(account: @account, status_id: status_id) + StatusPin.create(account: @account, status_id: status_id) end end diff --git a/app/services/activitypub/fetch_replies_service.rb b/app/services/activitypub/fetch_replies_service.rb index 8cb309e52..e36ca9f39 100644 --- a/app/services/activitypub/fetch_replies_service.rb +++ b/app/services/activitypub/fetch_replies_service.rb @@ -1,49 +1,27 @@ # frozen_string_literal: true class ActivityPub::FetchRepliesService < BaseService - include JsonLdHelper - - def call(parent_status, collection_or_uri, allow_synchronous_requests = true) + def call(parent_status, collection, **options) @account = parent_status.account - @allow_synchronous_requests = allow_synchronous_requests - - @items = collection_items(collection_or_uri) - return if @items.nil? - - FetchReplyWorker.push_bulk(filtered_replies) - - @items + fetch_collection_items(collection, **options) + rescue ActiveRecord::RecordNotFound + nil end private - def collection_items(collection_or_uri) - collection = fetch_collection(collection_or_uri) - return unless collection.is_a?(Hash) - - collection = fetch_collection(collection['first']) if collection['first'].present? - return unless collection.is_a?(Hash) - - case collection['type'] - when 'Collection', 'CollectionPage' - collection['items'] - when 'OrderedCollection', 'OrderedCollectionPage' - collection['orderedItems'] - end - end - - def fetch_collection(collection_or_uri) - return collection_or_uri if collection_or_uri.is_a?(Hash) - return unless @allow_synchronous_requests - return if invalid_origin?(collection_or_uri) - fetch_resource_without_id_validation(collection_or_uri, nil, true) - end - - def filtered_replies - # Only fetch replies to the same server as the original status to avoid - # amplification attacks. - - # Also limit to 5 fetched replies to limit potential for DoS. - @items.map { |item| value_or_id(item) }.reject { |uri| invalid_origin?(uri) }.take(5) + def fetch_collection_items(collection, **options) + ActivityPub::FetchCollectionItemsService.new.call( + collection, + @account, + page_limit: 1, + item_limit: 20, + **options + ) + rescue Mastodon::RaceConditionError, Mastodon::UnexpectedResponseError + collection_uri = collection.is_a?(Hash) ? collection['id'] : collection + return unless collection_uri.present? && collection_uri.is_a?(String) + + ActivityPub::FetchRepliesWorker.perform_async(@account.id, collection_uri) end end diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb index 85b915ec6..7f17e460c 100644 --- a/app/services/activitypub/process_account_service.rb +++ b/app/services/activitypub/process_account_service.rb @@ -35,12 +35,13 @@ class ActivityPub::ProcessAccountService < BaseService return if @account.nil? after_protocol_change! if protocol_changed? - after_key_change! if key_changed? && !@options[:signed_with_known_key] clear_tombstones! if key_changed? + return after_key_change! if key_changed? && !@options[:signed_with_known_key] unless @options[:only_key] check_featured_collection! if @account.featured_collection_url.present? check_links! unless @account.fields.empty? + process_sync end @account @@ -86,6 +87,11 @@ class ActivityPub::ProcessAccountService < BaseService @account.also_known_as = as_array(@json['alsoKnownAs'] || []).map { |item| value_or_id(item) } @account.actor_type = actor_type @account.discoverable = @json['discoverable'] || false + @account.require_dereference = @json['requireDereference'] || false + @account.show_replies = @json['showReplies'] || true + @account.show_unlisted = @json['showUnlisted'] || true + @account.private = @json['private'] || false + @account.require_auth = @json['require_auth'] || false end def set_fetchable_attributes! @@ -104,7 +110,8 @@ class ActivityPub::ProcessAccountService < BaseService end def after_key_change! - RefollowWorker.perform_async(@account.id) + ResetAccountWorker.perform_async(@account.id) + nil end def check_featured_collection! @@ -288,4 +295,8 @@ class ActivityPub::ProcessAccountService < BaseService @account.identity_proofs.where(provider: provider, provider_username: provider_username).find_or_create_by(provider: provider, provider_username: provider_username, token: token) end + + def process_sync + ActivityPub::SyncAccountWorker.perform_async(@account.id) + end end diff --git a/app/services/activitypub/process_collection_items_service.rb b/app/services/activitypub/process_collection_items_service.rb new file mode 100644 index 000000000..9c30d81e9 --- /dev/null +++ b/app/services/activitypub/process_collection_items_service.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +class ActivityPub::ProcessCollectionItemsService < BaseService + def call(account_id, on_behalf_of) + RedisLock.acquire(lock_options(account_id)) do |lock| + if lock.acquired? + CollectionItem.unprocessed.where(account_id: account_id).find_each do |item| + # Avoid failing servers holding up the rest of the queue. + next if item.retries.positive? && rand(3).positive? + + begin + FetchRemoteStatusService.new.call(item.uri, nil, on_behalf_of) + rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound + nil + rescue HTTP::TimeoutError + item.increment!(:retries) + end + + item.update!(processed: true) if item.retries.zero? || item.retries > 4 + end + end + end + end + + private + + def lock_options(account_id) + { redis: Redis.current, key: "process_collection_items:#{account_id}" } + end +end |