diff options
author | Fire Demon <firedemon@creature.cafe> | 2020-06-27 14:22:30 -0500 |
---|---|---|
committer | Fire Demon <firedemon@creature.cafe> | 2020-09-08 03:37:04 -0500 |
commit | 9d4f18b984d6699bdf96e5f5963edfe80063426c (patch) | |
tree | e00fb54963769a259cd9bbe97754a2a872d028be /app/workers/activitypub | |
parent | 437d71bddf967573df3912ee5976f7c5a5a7b4c7 (diff) |
Monsterfork v2 Kaiju Commit 2020.06.27.1 - 2020.09.05.5
Diffstat (limited to 'app/workers/activitypub')
5 files changed, 115 insertions, 9 deletions
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb index e4997ba0e..716d751c4 100644 --- a/app/workers/activitypub/distribution_worker.rb +++ b/app/workers/activitypub/distribution_worker.rb @@ -9,11 +9,12 @@ class ActivityPub::DistributionWorker def perform(status_id) @status = Status.find(status_id) @account = @status.account + @payload = {} return if skip_distribution? ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, @account.id, inbox_url] + [payload(Addressable::URI.parse(inbox_url).host), @account.id, inbox_url] end relay! if relayable? @@ -24,7 +25,7 @@ class ActivityPub::DistributionWorker private def skip_distribution? - @status.direct_visibility? || @status.limited_visibility? + !@status.published? || @status.direct_visibility? || @status.limited_visibility? end def relayable? @@ -35,20 +36,20 @@ class ActivityPub::DistributionWorker # Deliver the status to all followers. # If the status is a reply to another local status, also forward it to that # status' authors' followers. - @inboxes ||= if @status.reply? && @status.thread.account.local? && @status.distributable? + @inboxes ||= if @status.reply? && @status.thread&.account&.local? && @status.distributable? @account.followers.or(@status.thread.account.followers).inboxes else @account.followers.inboxes end end - def payload - @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account)) + def payload(domain) + @payload[domain] ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status, update: true, embed: false), ActivityPub::ActivitySerializer, signer: @account, target_domain: domain)) end def relay! ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| - [payload, @account.id, inbox_url] + [payload(Addressable::URI.parse(inbox_url).host), @account.id, inbox_url] end end end diff --git a/app/workers/activitypub/process_collection_items_for_account_worker.rb b/app/workers/activitypub/process_collection_items_for_account_worker.rb new file mode 100644 index 000000000..4b5710c1d --- /dev/null +++ b/app/workers/activitypub/process_collection_items_for_account_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true +class ActivityPub::ProcessCollectionItemsForAccountWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', retry: 3 + + def perform(account_id) + @account_id = account_id + on_behalf_of = nil + + if account_id.present? + account = Account.find(account_id) + on_behalf_of = account.followers.local.random.first + end + + ActivityPub::ProcessCollectionItemsService.new.call(account_id, on_behalf_of) + rescue ActiveRecord::RecordNotFound + nil + end +end diff --git a/app/workers/activitypub/process_collection_items_worker.rb b/app/workers/activitypub/process_collection_items_worker.rb new file mode 100644 index 000000000..d830edaec --- /dev/null +++ b/app/workers/activitypub/process_collection_items_worker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true +class ActivityPub::ProcessCollectionItemsWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', retry: 0 + + def perform + return if Sidekiq::Stats.new.workers_size > 3 + + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + account_id = random_unprocessed_account_id + ActivityPub::ProcessCollectionItemsForAccountWorker.perform_async(account_id) if account_id.present? + end + end + end + + private + + def random_unprocessed_account_id + CollectionItem.unprocessed.pluck(:account_id).sample + end + + def lock_options + { redis: Redis.current, key: 'process_collection_items' } + end +end diff --git a/app/workers/activitypub/reply_distribution_worker.rb b/app/workers/activitypub/reply_distribution_worker.rb index d4d0148ac..e8648ffcd 100644 --- a/app/workers/activitypub/reply_distribution_worker.rb +++ b/app/workers/activitypub/reply_distribution_worker.rb @@ -12,11 +12,12 @@ class ActivityPub::ReplyDistributionWorker def perform(status_id) @status = Status.find(status_id) @account = @status.thread&.account + @payload = {} return unless @account.present? && @status.distributable? ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, @status.account_id, inbox_url] + [payload(Addressable::URI.parse(inbox_url).host), @status.account_id, inbox_url] end rescue ActiveRecord::RecordNotFound true @@ -28,7 +29,7 @@ class ActivityPub::ReplyDistributionWorker @inboxes ||= @account.followers.inboxes end - def payload - @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account)) + def payload(domain) + @payload[domain] ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status, update: true, embed: false), ActivityPub::ActivitySerializer, signer: @status.account, target_domain: domain)) end end diff --git a/app/workers/activitypub/sync_account_worker.rb b/app/workers/activitypub/sync_account_worker.rb new file mode 100644 index 000000000..18825b20d --- /dev/null +++ b/app/workers/activitypub/sync_account_worker.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true +class ActivityPub::SyncAccountWorker + include Sidekiq::Worker + include ExponentialBackoff + + sidekiq_options queue: 'pull', retry: 5 + + def perform(account_id, every_page = false, skip_cooldown = false) + @account = Account.find(account_id) + return if @account.local? + + @from_migrated_account = @account.moved_to_account&.local? + return unless @from_migrated_account || @account.followers.local.exists? + + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + fetch_collection_items(every_page, skip_cooldown) + elsif @from_migrated_account + # Cause a retry so server-to-server migrations can complete. + raise Mastodon::RaceConditionError + end + end + rescue ActiveRecord::RecordNotFound + nil + end + + private + + def lock_options + { redis: Redis.current, key: "account_sync:#{@account.id}" } + end + + # Limits for an account moving to this server. + def limits_migrated + { + page_limit: 2_000, + item_limit: 40_000, + look_ahead: true, + } + end + + # Limits for an account someone locally follows. + def limits_followed + { + page_limit: 25, + item_limit: 500, + look_ahead: @account.last_synced_at.blank?, + } + end + + def fetch_collection_items(every_page, skip_cooldown) + opts = @from_migrated_account && every_page ? limits_migrated : limits_followed + opts.merge!({ every_page: every_page, skip_cooldown: skip_cooldown }) + ActivityPub::FetchCollectionItemsService.new.call(@account.outbox_url, @account, **opts) + @account.update(last_synced_at: Time.now.utc) + end +end |