From 4c76402ba1d355061e7e208b7a2f8251388a38e1 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 29 Aug 2017 16:11:05 +0200 Subject: Serialize ActivityPub alternate link into OStatus deletes, handle it (#4730) Requires moving Atom rendering from DistributionWorker (where `stream_entry.status` is already nil) to inline (where `stream_entry.status.destroyed?` is true) and distributing that. Unfortunately, such XML renderings can no longer be easily chained together into one payload of n items. --- app/services/batched_remove_status_service.rb | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) (limited to 'app/services/batched_remove_status_service.rb') diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index e9e22298d..86eaa5735 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -20,9 +20,10 @@ class BatchedRemoveStatusService < BaseService @activity_json_batches = [] @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h @activity_json = {} + @activity_xml = {} # Ensure that rendered XML reflects destroyed state - Status.where(id: statuses.map(&:id)).in_batches.destroy_all + statuses.each(&:destroy) # Batch by source account statuses.group_by(&:account_id).each do |_, account_statuses| @@ -31,7 +32,7 @@ class BatchedRemoveStatusService < BaseService unpush_from_home_timelines(account_statuses) if account.local? - batch_stream_entries(account_statuses) + batch_stream_entries(account, account_statuses) batch_activity_json(account, account_statuses) end end @@ -42,18 +43,16 @@ class BatchedRemoveStatusService < BaseService batch_salmon_slaps(status) if status.local? end - Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } + Pubsubhubbub::RawDistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } NotificationWorker.push_bulk(@salmon_batches) { |batch| batch } ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch } end private - def batch_stream_entries(statuses) - stream_entry_ids = statuses.map { |s| s.stream_entry.id } - - stream_entry_ids.each_slice(100) do |batch_of_stream_entry_ids| - @stream_entry_batches << [batch_of_stream_entry_ids] + def batch_stream_entries(account, statuses) + statuses.each do |status| + @stream_entry_batches << [build_xml(status.stream_entry), account.id] end end @@ -101,11 +100,10 @@ class BatchedRemoveStatusService < BaseService def batch_salmon_slaps(status) return if @mentions[status.id].empty? - payload = stream_entry_to_xml(status.stream_entry.reload) recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id) recipients.each do |recipient_id| - @salmon_batches << [payload, status.account_id, recipient_id] + @salmon_batches << [build_xml(status.stream_entry), status.account_id, recipient_id] end end @@ -145,6 +143,12 @@ class BatchedRemoveStatusService < BaseService ).as_json) end + def build_xml(stream_entry) + return @activity_xml[stream_entry.id] if @activity_xml.key?(stream_entry.id) + + @activity_xml[stream_entry.id] = stream_entry_to_xml(stream_entry) + end + def sign_json(status, json) Oj.dump(ActivityPub::LinkedDataSignature.new(json).sign!(status.account)) end -- cgit