about summary refs log tree commit diff
path: root/app/workers/activitypub
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/activitypub')
-rw-r--r--app/workers/activitypub/distribution_worker.rb48
-rw-r--r--app/workers/activitypub/raw_distribution_worker.rb37
-rw-r--r--app/workers/activitypub/reply_distribution_worker.rb34
-rw-r--r--app/workers/activitypub/update_distribution_worker.rb25
4 files changed, 52 insertions, 92 deletions
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb
index 09898ca49..17c108461 100644
--- a/app/workers/activitypub/distribution_worker.rb
+++ b/app/workers/activitypub/distribution_worker.rb
@@ -1,54 +1,32 @@
 # frozen_string_literal: true
 
-class ActivityPub::DistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
+class ActivityPub::DistributionWorker < ActivityPub::RawDistributionWorker
+  # Distribute a new status or an edit of a status to all the places
+  # where the status is supposed to go or where it was interacted with
   def perform(status_id)
     @status  = Status.find(status_id)
     @account = @status.account
 
-    return if skip_distribution?
-
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }]
-    end
-
-    relay! if relayable?
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
-
-  def skip_distribution?
-    @status.direct_visibility? || @status.limited_visibility?
-  end
-
-  def relayable?
-    @status.public_visibility?
-  end
+  protected
 
   def inboxes
-    # Deliver the status to all followers.
-    # If the status is a reply to another local status, also forward it to that
-    # status' authors' followers.
-    @inboxes ||= if @status.in_reply_to_local_account? && @status.distributable?
-                   @account.followers.or(@status.thread.account.followers).inboxes
-                 else
-                   @account.followers.inboxes
-                 end
+    @inboxes ||= StatusReachFinder.new(@status).inboxes
   end
 
   def payload
-    @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account))
+    @payload ||= Oj.dump(serialize_payload(activity, ActivityPub::ActivitySerializer, signer: @account))
+  end
+
+  def activity
+    ActivityPub::ActivityPresenter.from_status(@status)
   end
 
-  def relay!
-    ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
-      [payload, @account.id, inbox_url]
-    end
+  def options
+    { synchronize_followers: @status.private_visibility? }
   end
 end
diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb
index 41e61132f..8ecc17db9 100644
--- a/app/workers/activitypub/raw_distribution_worker.rb
+++ b/app/workers/activitypub/raw_distribution_worker.rb
@@ -2,22 +2,47 @@
 
 class ActivityPub::RawDistributionWorker
   include Sidekiq::Worker
+  include Payloadable
 
   sidekiq_options queue: 'push'
 
+  # Base worker for when you want to queue up a bunch of deliveries of
+  # some payload. In this case, we have already generated JSON and
+  # we are going to distribute it to the account's followers minus
+  # the explicitly provided inboxes
   def perform(json, source_account_id, exclude_inboxes = [])
-    @account = Account.find(source_account_id)
+    @account         = Account.find(source_account_id)
+    @json            = json
+    @exclude_inboxes = exclude_inboxes
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url|
-      [json, @account.id, inbox_url]
-    end
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
+  protected
+
+  def distribute!
+    return if inboxes.empty?
+
+    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+      [payload, source_account_id, inbox_url, options]
+    end
+  end
+
+  def payload
+    @json
+  end
+
+  def source_account_id
+    @account.id
+  end
 
   def inboxes
-    @inboxes ||= @account.followers.inboxes
+    @inboxes ||= @account.followers.inboxes - @exclude_inboxes
+  end
+
+  def options
+    {}
   end
 end
diff --git a/app/workers/activitypub/reply_distribution_worker.rb b/app/workers/activitypub/reply_distribution_worker.rb
deleted file mode 100644
index d4d0148ac..000000000
--- a/app/workers/activitypub/reply_distribution_worker.rb
+++ /dev/null
@@ -1,34 +0,0 @@
-# frozen_string_literal: true
-
-# Obsolete but kept around to make sure existing jobs do not fail after upgrade.
-# Should be removed in a subsequent release.
-
-class ActivityPub::ReplyDistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
-  def perform(status_id)
-    @status  = Status.find(status_id)
-    @account = @status.thread&.account
-
-    return unless @account.present? && @status.distributable?
-
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, @status.account_id, inbox_url]
-    end
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-
-  private
-
-  def inboxes
-    @inboxes ||= @account.followers.inboxes
-  end
-
-  def payload
-    @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account))
-  end
-end
diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb
index 3a207f071..81fde63b8 100644
--- a/app/workers/activitypub/update_distribution_worker.rb
+++ b/app/workers/activitypub/update_distribution_worker.rb
@@ -1,33 +1,24 @@
 # frozen_string_literal: true
 
-class ActivityPub::UpdateDistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
+class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker
+  # Distribute an profile update to servers that might have a copy
+  # of the account in question
   def perform(account_id, options = {})
     @options = options.with_indifferent_access
     @account = Account.find(account_id)
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [signed_payload, @account.id, inbox_url]
-    end
-
-    ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
-      [signed_payload, @account.id, inbox_url]
-    end
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
+  protected
 
   def inboxes
-    @inboxes ||= @account.followers.inboxes
+    @inboxes ||= AccountReachFinder.new(@account).inboxes
   end
 
-  def signed_payload
-    @signed_payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
+  def payload
+    @payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
   end
 end