diff options
author | Starfall <us@starfall.systems> | 2023-04-14 19:22:47 -0500 |
---|---|---|
committer | Starfall <us@starfall.systems> | 2023-04-14 19:22:47 -0500 |
commit | 4fe1689de43f4404eb9530fcfbcbfb26d6c1c13a (patch) | |
tree | 6811b845bb7f4966b10dcefa3dea404246f161c7 /app/workers/activitypub | |
parent | 65c1e53a32cabcdbb7bca57002bb0f6acdebe07e (diff) | |
parent | bed63f6dae0879ac840066b031229e0d139089cd (diff) |
Diffstat (limited to 'app/workers/activitypub')
7 files changed, 33 insertions, 6 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index d9153132b..7c1c14766 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -10,6 +10,16 @@ class ActivityPub::DeliveryWorker sidekiq_options queue: 'push', retry: 16, dead: false + # Unfortunately, we cannot control Sidekiq's jitter, so add our own + sidekiq_retry_in do |count| + # This is Sidekiq's default delay + delay = (count**4) + 15 + # Our custom jitter, that will be added to Sidekiq's built-in one. + # Sidekiq's built-in jitter is `rand(10) * (count + 1)` + jitter = rand(0.5 * (count**4)) + delay + jitter + end + HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze def perform(json, source_account_id, inbox_url, options = {}) diff --git a/app/workers/activitypub/distribute_poll_update_worker.rb b/app/workers/activitypub/distribute_poll_update_worker.rb index 601075ea6..ebdb78bb3 100644 --- a/app/workers/activitypub/distribute_poll_update_worker.rb +++ b/app/workers/activitypub/distribute_poll_update_worker.rb @@ -12,7 +12,7 @@ class ActivityPub::DistributePollUpdateWorker return if @status.preloadable_poll.nil? || @status.local_only? - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| [payload, @account.id, inbox_url] end diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb index 54d98f228..d72bad745 100644 --- a/app/workers/activitypub/fetch_replies_worker.rb +++ b/app/workers/activitypub/fetch_replies_worker.rb @@ -6,8 +6,8 @@ class ActivityPub::FetchRepliesWorker sidekiq_options queue: 'pull', retry: 3 - def perform(parent_status_id, replies_uri) - ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri) + def perform(parent_status_id, replies_uri, options = {}) + ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys) rescue ActiveRecord::RecordNotFound true end diff --git a/app/workers/activitypub/migrated_follow_delivery_worker.rb b/app/workers/activitypub/migrated_follow_delivery_worker.rb new file mode 100644 index 000000000..daf30e0ae --- /dev/null +++ b/app/workers/activitypub/migrated_follow_delivery_worker.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class ActivityPub::MigratedFollowDeliveryWorker < ActivityPub::DeliveryWorker + def perform(json, source_account_id, inbox_url, old_target_account_id, options = {}) + super(json, source_account_id, inbox_url, options) + unfollow_old_account!(old_target_account_id) + end + + private + + def unfollow_old_account!(old_target_account_id) + old_target_account = Account.find(old_target_account_id) + UnfollowService.new.call(@source_account, old_target_account, skip_unmerge: true) + rescue + true + end +end diff --git a/app/workers/activitypub/move_distribution_worker.rb b/app/workers/activitypub/move_distribution_worker.rb index 65c5c0d1c..1680fcc76 100644 --- a/app/workers/activitypub/move_distribution_worker.rb +++ b/app/workers/activitypub/move_distribution_worker.rb @@ -10,7 +10,7 @@ class ActivityPub::MoveDistributionWorker @migration = AccountMigration.find(migration_id) @account = @migration.account - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| [signed_payload, @account.id, inbox_url] end diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb index 5e36fab51..1bb94b7f2 100644 --- a/app/workers/activitypub/processing_worker.rb +++ b/app/workers/activitypub/processing_worker.rb @@ -15,6 +15,6 @@ class ActivityPub::ProcessingWorker ActivityPub::ProcessCollectionService.new.call(body, actor, override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true) rescue ActiveRecord::RecordInvalid => e - Rails.logger.debug "Error processing incoming ActivityPub object: #{e}" + Rails.logger.debug { "Error processing incoming ActivityPub object: #{e}" } end end diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb index 8ecc17db9..c77821e0f 100644 --- a/app/workers/activitypub/raw_distribution_worker.rb +++ b/app/workers/activitypub/raw_distribution_worker.rb @@ -25,7 +25,7 @@ class ActivityPub::RawDistributionWorker def distribute! return if inboxes.empty? - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| [payload, source_account_id, inbox_url, options] end end |