# frozen_string_literal: true

class ActivityPub::FetchCollectionItemsService < BaseService
  include JsonLdHelper

  # How long a collection URI is skipped after a fetch has been started for it.
  COOLDOWN = 30.minutes

  # Fetches objects in a collection from a URI or hash and queues them for processing.
  # @param collection [Hash, String] Collection hash or URI
  # @param account [Account, nil] Owner of the collection. When blank, the owner is looked
  #   up from the collection URI instead.
  # @param page_limit [Integer] (10) Maximum number of pages to fetch from the collection.
  # @param item_limit [Integer] (100) Maximum number of items to fetch from the collection.
  # @option options [Boolean] :every_page (false) Whether to fetch every page in the collection,
  #   even if its items have been previously fetched. By default, fetching will stop if all the
  #   items on any page have already been fetched.
  # @option options [Boolean] :look_ahead (false) Whether to check the next page for unfetched
  #   items if the current page's items have been previously fetched. If there are unfetched
  #   items on the next page, fetching will continue.
  # @option options [Boolean] :skip_cooldown (false) Skip the fetch cooldown period on the
  #   collection URI (e.g., for account migration).
  # @option options [Boolean] :include_boosts (false) Whether to include boosts. Including these
  #   will cause a LOT of server traffic.
  #   NOTE(review): nothing in this class reads this option, and `valid_item?` only accepts
  #   bare objects or `Create` activities -- confirm whether it is honoured elsewhere.
  # @return [void]
  # @raise [Mastodon::RaceConditionError] Collection is already being fetched.
  # @raise [Mastodon::UnexpectedResponseError] Server returned an error while fetching a page.
  def call(collection, account, page_limit: 10, item_limit: 100, **options)
    uri = value_or_id(collection)
    return if uri.blank? || ActivityPub::TagManager.instance.local_uri?(uri)

    # A page hash identifies its parent collection through `partOf`; key everything on the parent.
    uri = collection['partOf'] if collection.is_a?(Hash) && collection['partOf'].present?

    @account = account
    @account = account_from_uri(uri) if @account.blank?
    set_fetch_account

    # Checked before fetching so that a collection in cooldown costs no remote request.
    return if !options[:skip_cooldown] && Redis.current.get("fetch_collection_cooldown:#{uri}")

    collection = fetch_collection(collection)
    return if collection.blank?

    # Second attempt at resolving the owner, now using the IDs from the fetched document.
    if @account.blank?
      @account = account_from_uri(collection['partOf'].presence || collection['id'])
      set_fetch_account
    end

    fetch_collection_pages(collection, page_limit, item_limit, **options)
  end

  private

  # Options handed to RedisLock.acquire for the per-collection lock.
  # @param uri [String] Collection URI
  # @return [Hash]
  def lock_options(uri)
    { redis: Redis.current, key: "fetch_collection:#{uri}" }
  end

  # Sets @on_behalf_of to one local follower of the collection owner (chosen through the
  # `random` scope), or nil when no owner is known. The value is passed along with every
  # remote fetch made by #fetch_collection.
  def set_fetch_account
    @on_behalf_of = @account.present? ? @account.followers.local.random.first : nil
  end

  # @param uri [String]
  # @return [Account, nil] Result of resolving the URI to an Account through the tag manager.
  def account_from_uri(uri)
    ActivityPub::TagManager.instance.uri_to_resource(uri, Account)
  end

  # Resolves a URI to an account ID, caching the lookup in Rails.cache for 10 minutes.
  # @param uri [String, nil]
  # @return [Integer, nil] nil when the URI is blank or does not resolve to an account.
  def account_id_from_uri(uri)
    return if uri.blank?

    Rails.cache.fetch("account_id_from_uri:#{uri}", expires_in: 10.minutes) do
      account_from_uri(uri)&.id
    end
  end

  # An item is accepted when it is a hash with a usable, non-local `id`, names an author
  # (`attributedTo` or `actor`), and is either a bare object (no `object` key) or a `Create`
  # activity whose object URI is itself usable.
  # @param item [Object] Entry taken from `items` / `orderedItems`
  # @return [Boolean]
  def valid_item?(item)
    return false unless item.is_a?(Hash)
    return false if invalid_uri?(item['id'])
    return false unless item['attributedTo'].present? || item['actor'].present?
    return true if item['object'].blank?

    item['type'] == 'Create' && !invalid_uri?(value_or_id(item['object']))
  end

  # Builds the `[uri, account_id]` row imported into CollectionItem for one item.
  # The author comes from the object's `attributedTo` when the object is a hash (embedded
  # object, or the item itself when it has no `object`), otherwise from the activity's `actor`.
  # @param item [Hash]
  # @return [Array(String, Integer)] account ID may be nil if the author cannot be resolved.
  def uri_with_account_id(item)
    object = item['object'].presence || item
    author_uri = object.is_a?(Hash) ? object['attributedTo'] : item['actor']

    [value_or_id(object), account_id_from_uri(author_uri)]
  end

  # @param uri [String]
  # @return [Boolean] true when the URI has an unsupported scheme, is not allowed, or is local.
  def invalid_uri?(uri)
    unsupported_uri_scheme?(uri) || !uri_allowed?(uri) || ActivityPub::TagManager.instance.local_uri?(uri)
  end

  # Returns a collection (or collection page) document.
  # A hash is used as-is provided it carries an `id`; a string is fetched remotely unless
  # `invalid_origin?` rejects it. Anything else yields nil.
  # @param collection_or_uri [Hash, String, nil]
  # @return [Hash, nil]
  def fetch_collection(collection_or_uri)
    return (collection_or_uri['id'].present? ? collection_or_uri : nil) if collection_or_uri.is_a?(Hash)
    return if !collection_or_uri.is_a?(String) || invalid_origin?(collection_or_uri)

    fetch_resource_without_id_validation(collection_or_uri, @on_behalf_of, true)
  end

  # Walks the collection under a cooldown and a Redis lock.
  #
  # If a previous run left a bookmark (CollectionPage#next), fetching first resumes from that
  # page; afterwards the collection is scanned again from its first page with `every_page`
  # forced off, so that scan stops as soon as it reaches already-known items.
  #
  # @param collection [Hash] Fetched collection document
  # @param page_limit [Integer]
  # @param item_limit [Integer]
  # @raise [Mastodon::RaceConditionError] when the lock is held by another fetch.
  def fetch_collection_pages(collection, page_limit, item_limit, **options)
    uri = collection['partOf'].presence || collection['id']
    cooldown_key = "fetch_collection_cooldown:#{uri}"
    return if !options[:skip_cooldown] && Redis.current.get(cooldown_key)

    Redis.current.set(cooldown_key, 1, ex: COOLDOWN)

    RedisLock.acquire(lock_options(uri)) do |lock|
      raise Mastodon::RaceConditionError unless lock.acquired?

      page = CollectionPage.find_or_create_by(uri: uri, account: @account)
      every_page = options[:every_page]

      if page.next.present?
        # Keep the resumed page in its own local: the root `collection` is still needed below,
        # and the resumed fetch may come back nil.
        resumed_page = fetch_collection(page.next)
        fetch_collection_items(resumed_page, page, page_limit, item_limit, **options)
        every_page = false

        # fetch_collection_items deletes the bookmark record when it finishes, and a deleted
        # record cannot be updated (assuming CollectionPage is an ActiveRecord model), so
        # obtain a fresh one before recording the new position.
        page = CollectionPage.find_or_create_by(uri: uri, account: @account)
      end

      first_uri = collection['first'].presence || collection['id']
      page.update!(next: first_uri)
      collection = fetch_collection(first_uri) if collection['id'] != first_uri
      fetch_collection_items(collection, page, page_limit, item_limit, **options.merge(every_page: every_page))
    end
  end

  # Imports the items of `collection` and of the pages that follow it, then deletes the
  # bookmark record and enqueues ActivityPub::ProcessCollectionItemsWorker.
  #
  # Paging stops when a page has no item array, when `item_limit` newly imported items or
  # `page_limit` pages have been reached, when there is no `next` page, when `next` points
  # at a page already visited in this run, or (unless `:every_page`) when a page yields no
  # new items -- with `:look_ahead`, one further page is tried before giving up.
  #
  # @param collection [Hash, nil] First page (or collection) to read items from
  # @param page [CollectionPage] Bookmark record; its `next` is updated before each page fetch
  # @param page_limit [Integer]
  # @param item_limit [Integer]
  def fetch_collection_items(collection, page, page_limit, item_limit, **options)
    page_count = 0
    item_count = 0
    seen_pages = Set[page.next]
    # True while the previous page produced nothing new and we are giving the next page a chance.
    looking_ahead = false

    while collection.present? && collection['type'].present?
      batch = case collection['type']
              when 'Collection', 'CollectionPage'
                collection['items']
              when 'OrderedCollection', 'OrderedCollectionPage'
                collection['orderedItems']
              end

      break unless batch.is_a?(Array)

      # Never look at more raw items than are still allowed by item_limit.
      batch_size = [batch.count, item_limit - item_count].min
      rows = batch.take(batch_size).select { |item| valid_item?(item) }.map { |item| uri_with_account_id(item) }
      result = CollectionItem.import([:uri, :account_id], rows, validate: false, on_duplicate_key_ignore: true)

      if !options[:every_page] && result.ids.blank?
        # Nothing new on this page: stop, unless look-ahead grants one more page.
        break if looking_ahead || !options[:look_ahead]

        looking_ahead = true
      else
        looking_ahead = false
      end

      item_count += result.ids.count
      page_count += 1

      next_page = collection['next']
      break unless item_count < item_limit && page_count < page_limit && next_page.present?
      break if seen_pages.include?(next_page) # guards against collections whose pages loop

      # Pause between page requests: 0.2s per page fetched so far, capped at 1s.
      sleep [page_count.to_f / 5, 1].min

      seen_pages << next_page
      page.update!(next: next_page)
      collection = fetch_collection(next_page)
    end

    page.delete
    ActivityPub::ProcessCollectionItemsWorker.perform_async
  end
end