about summary refs log tree commit diff
path: root/app/services/batched_remove_status_service.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/services/batched_remove_status_service.rb')
-rw-r--r--app/services/batched_remove_status_service.rb67
1 files changed, 54 insertions, 13 deletions
diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb
index ab810c628..86eaa5735 100644
--- a/app/services/batched_remove_status_service.rb
+++ b/app/services/batched_remove_status_service.rb
@@ -15,19 +15,26 @@ class BatchedRemoveStatusService < BaseService
     @mentions = statuses.map { |s| [s.id, s.mentions.includes(:account).to_a] }.to_h
     @tags     = statuses.map { |s| [s.id, s.tags.pluck(:name)] }.to_h
 
-    @stream_entry_batches = []
-    @salmon_batches       = []
-    @json_payloads        = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
+    @stream_entry_batches  = []
+    @salmon_batches        = []
+    @activity_json_batches = []
+    @json_payloads         = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
+    @activity_json         = {}
+    @activity_xml          = {}
 
     # Ensure that rendered XML reflects destroyed state
-    Status.where(id: statuses.map(&:id)).in_batches.destroy_all
+    statuses.each(&:destroy)
 
     # Batch by source account
     statuses.group_by(&:account_id).each do |_, account_statuses|
       account = account_statuses.first.account
 
       unpush_from_home_timelines(account_statuses)
-      batch_stream_entries(account_statuses) if account.local?
+
+      if account.local?
+        batch_stream_entries(account, account_statuses)
+        batch_activity_json(account, account_statuses)
+      end
     end
 
     # Cannot be batched
@@ -36,17 +43,32 @@ class BatchedRemoveStatusService < BaseService
       batch_salmon_slaps(status) if status.local?
     end
 
-    Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
+    Pubsubhubbub::RawDistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
     NotificationWorker.push_bulk(@salmon_batches) { |batch| batch }
+    ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch }
   end
 
   private
 
-  def batch_stream_entries(statuses)
-    stream_entry_ids = statuses.map { |s| s.stream_entry.id }
+  def batch_stream_entries(account, statuses)
+    statuses.each do |status|
+      @stream_entry_batches << [build_xml(status.stream_entry), account.id]
+    end
+  end
 
-    stream_entry_ids.each_slice(100) do |batch_of_stream_entry_ids|
-      @stream_entry_batches << [batch_of_stream_entry_ids]
+  def batch_activity_json(account, statuses)
+    account.followers.inboxes.each do |inbox_url|
+      statuses.each do |status|
+        @activity_json_batches << [build_json(status), account.id, inbox_url]
+      end
+    end
+
+    statuses.each do |status|
+      other_recipients = (status.mentions + status.reblogs).map(&:account).reject(&:local?).select(&:activitypub?).uniq(&:id)
+
+      other_recipients.each do |target_account|
+        @activity_json_batches << [build_json(status), account.id, target_account.inbox_url]
+      end
     end
   end
 
@@ -78,11 +100,10 @@ class BatchedRemoveStatusService < BaseService
   def batch_salmon_slaps(status)
     return if @mentions[status.id].empty?
 
-    payload    = stream_entry_to_xml(status.stream_entry.reload)
-    recipients = @mentions[status.id].map(&:account).reject(&:local?).uniq(&:domain).map(&:id)
+    recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id)
 
     recipients.each do |recipient_id|
-      @salmon_batches << [payload, status.account_id, recipient_id]
+      @salmon_batches << [build_xml(status.stream_entry), status.account_id, recipient_id]
     end
   end
 
@@ -111,4 +132,24 @@ class BatchedRemoveStatusService < BaseService
   def redis
     Redis.current
   end
+
+  def build_json(status)
+    return @activity_json[status.id] if @activity_json.key?(status.id)
+
+    @activity_json[status.id] = sign_json(status, ActiveModelSerializers::SerializableResource.new(
+      status,
+      serializer: status.reblog? ? ActivityPub::UndoAnnounceSerializer : ActivityPub::DeleteSerializer,
+      adapter: ActivityPub::Adapter
+    ).as_json)
+  end
+
+  def build_xml(stream_entry)
+    return @activity_xml[stream_entry.id] if @activity_xml.key?(stream_entry.id)
+
+    @activity_xml[stream_entry.id] = stream_entry_to_xml(stream_entry)
+  end
+
+  def sign_json(status, json)
+    Oj.dump(ActivityPub::LinkedDataSignature.new(json).sign!(status.account))
+  end
 end