From 002fb7fbb7f7a3d3b4fa2c6a1d72c3384fcfc20b Mon Sep 17 00:00:00 2001 From: multiple creatures Date: Tue, 10 Dec 2019 00:34:58 -0600 Subject: synchronize remote posts on first follow --- .../activitypub/fetch_account_statuses_service.rb | 76 ++++++++++++++++++++++ app/services/follow_service.rb | 2 + app/workers/sync_remote_account_worker.rb | 12 ++++ 3 files changed, 90 insertions(+) create mode 100644 app/services/activitypub/fetch_account_statuses_service.rb create mode 100644 app/workers/sync_remote_account_worker.rb diff --git a/app/services/activitypub/fetch_account_statuses_service.rb b/app/services/activitypub/fetch_account_statuses_service.rb new file mode 100644 index 000000000..59d4c52bf --- /dev/null +++ b/app/services/activitypub/fetch_account_statuses_service.rb @@ -0,0 +1,76 @@ +# frozen_string_literal: true + +class ActivityPub::FetchAccountStatusesService < BaseService + include JsonLdHelper + + MAX_PAGES = 50 + + def call(account, url = nil) + @account = account + return if account.local? || account.suspended? + + page = 1 + + @json = fetch_collection(url || account.outbox_url) + @items = Rails.cache.fetch("account_sync:#{account.id}") || [] + + if @items.empty? + until page == MAX_PAGES || @json.blank? + items = collection_items(@json).select { |item| item['type'] == 'Create' } + @items.concat(items) + break if @json['next'].blank? + page += 1 + @json = fetch_collection(@json['next']) + end + end + + Rails.cache.write("account_sync:#{account.id}", @items, expires_in: 1.day) + process_items(@items) + Rails.cache.delete("account_sync:#{account.id}") + + @items + end + + private + + def fetch_collection(collection_or_uri) + return collection_or_uri if collection_or_uri.is_a?(Hash) + + collection = _fetch_collection(collection_or_uri) + return unless collection.is_a?(Hash) + + if collection['first'].present? + collection = _fetch_collection(collection['first']) + return unless collection.is_a?(Hash) + end + + collection + end + + def _fetch_collection(collection_or_uri) + return collection_or_uri if collection_or_uri.is_a?(Hash) + return if invalid_origin?(collection_or_uri) + fetch_resource_without_id_validation(collection_or_uri, nil, true) + end + + def collection_items(collection) + case collection['type'] + when 'Collection', 'CollectionPage' + collection['items'] + when 'OrderedCollection', 'OrderedCollectionPage' + collection['orderedItems'] + end + end + + def process_items(items) + items.reverse_each.map { |item| process_item(item) }.compact + end + + def process_item(item) + return unless item.is_a?(Hash) && item['type'].present? + ActivityPub::Activity.factory(item, @account, override_timestamps: true, requested: true)&.perform + rescue => e + Rails.logger.error("Failed to process #{item['type']} #{item['id']} due to #{e}: #{e.message}") + Rails.logger.error("Stack trace: #{backtrace.map {|l| " #{l}\n"}.join}") + end +end diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb index 8fbc54125..93f5b6b16 100644 --- a/app/services/follow_service.rb +++ b/app/services/follow_service.rb @@ -17,6 +17,8 @@ class FollowService < BaseService target_account.mark_known! unless !Setting.auto_mark_known || target_account.known? + SyncRemoteAccountWorker.perform_async(target_account.id) unless target_account.local? || target_account.passive_relationships.exists? + if source_account.following?(target_account) # We're already following this account, but we'll call follow! again to # make sure the reblogs status is set correctly. diff --git a/app/workers/sync_remote_account_worker.rb b/app/workers/sync_remote_account_worker.rb new file mode 100644 index 000000000..5c1f5fabd --- /dev/null +++ b/app/workers/sync_remote_account_worker.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +class SyncRemoteAccountWorker + include Sidekiq::Worker + + def perform(account_id) + account = Account.find(account_id) + ActivityPub::FetchAccountStatusesService.new.call(account) + rescue ActiveRecord::RecordNotFound, ActiveRecord::RecordInvalid + true + end +end -- cgit