about summary refs log tree commit diff
path: root/app
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2017-08-29 16:11:05 +0200
committerGitHub <noreply@github.com>2017-08-29 16:11:05 +0200
commit4c76402ba1d355061e7e208b7a2f8251388a38e1 (patch)
treef76f71be7326e16ccaeaf74402f36c79973b734b /app
parent9958eba356210f1d0b89db368e17bbd72358e097 (diff)
Serialize ActivityPub alternate link into OStatus deletes, handle it (#4730)
Requires moving Atom rendering from DistributionWorker (where
`stream_entry.status` is already nil) to inline (where
`stream_entry.status.destroyed?` is true) and distributing that.

Unfortunately, such XML renderings can no longer be easily chained
together into one payload of n items.
Diffstat (limited to 'app')
-rw-r--r--app/lib/ostatus/activity/deletion.rb4
-rw-r--r--app/lib/ostatus/atom_serializer.rb3
-rw-r--r--app/models/status.rb13
-rw-r--r--app/services/batched_remove_status_service.rb24
-rw-r--r--app/services/remove_status_service.rb4
-rw-r--r--app/workers/pubsubhubbub/raw_distribution_worker.rb22
6 files changed, 54 insertions, 16 deletions
diff --git a/app/lib/ostatus/activity/deletion.rb b/app/lib/ostatus/activity/deletion.rb
index 860faf501..c98f5ee0a 100644
--- a/app/lib/ostatus/activity/deletion.rb
+++ b/app/lib/ostatus/activity/deletion.rb
@@ -3,7 +3,9 @@
 class OStatus::Activity::Deletion < OStatus::Activity::Base
   def perform
     Rails.logger.debug "Deleting remote status #{id}"
-    status = Status.find_by(uri: id, account: @account)
+
+    status   = Status.find_by(uri: id, account: @account)
+    status ||= Status.find_by(uri: activitypub_uri, account: @account) if activitypub_uri?
 
     if status.nil?
       redis.setex("delete_upon_arrival:#{@account.id}:#{id}", 6 * 3_600, id)
diff --git a/app/lib/ostatus/atom_serializer.rb b/app/lib/ostatus/atom_serializer.rb
index 92a16d228..81fae4140 100644
--- a/app/lib/ostatus/atom_serializer.rb
+++ b/app/lib/ostatus/atom_serializer.rb
@@ -79,6 +79,9 @@ class OStatus::AtomSerializer
 
     if stream_entry.status.nil?
       append_element(entry, 'content', 'Deleted status')
+    elsif stream_entry.status.destroyed?
+      append_element(entry, 'content', 'Deleted status')
+      append_element(entry, 'link', nil, rel: :alternate, type: 'application/activity+json', href: ActivityPub::TagManager.instance.uri_for(stream_entry.status)) if stream_entry.account.local?
     else
       serialize_status_attributes(entry, stream_entry.status)
     end
diff --git a/app/models/status.rb b/app/models/status.rb
index 3dc83ad1f..abd902cd7 100644
--- a/app/models/status.rb
+++ b/app/models/status.rb
@@ -51,6 +51,7 @@ class Status < ApplicationRecord
 
   has_one :notification, as: :activity, dependent: :destroy
   has_one :preview_card, dependent: :destroy
+  has_one :stream_entry, as: :activity, inverse_of: :status
 
   validates :uri, uniqueness: true, unless: :local?
   validates :text, presence: true, unless: :reblog?
@@ -90,7 +91,11 @@ class Status < ApplicationRecord
   end
 
   def verb
-    reblog? ? :share : :post
+    if destroyed?
+      :delete
+    else
+      reblog? ? :share : :post
+    end
   end
 
   def object_type
@@ -110,7 +115,11 @@ class Status < ApplicationRecord
   end
 
   def title
-    reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}"
+    if destroyed?
+      "#{account.acct} deleted status"
+    else
+      reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}"
+    end
   end
 
   def hidden?
diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb
index e9e22298d..86eaa5735 100644
--- a/app/services/batched_remove_status_service.rb
+++ b/app/services/batched_remove_status_service.rb
@@ -20,9 +20,10 @@ class BatchedRemoveStatusService < BaseService
     @activity_json_batches = []
     @json_payloads         = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
     @activity_json         = {}
+    @activity_xml          = {}
 
     # Ensure that rendered XML reflects destroyed state
-    Status.where(id: statuses.map(&:id)).in_batches.destroy_all
+    statuses.each(&:destroy)
 
     # Batch by source account
     statuses.group_by(&:account_id).each do |_, account_statuses|
@@ -31,7 +32,7 @@ class BatchedRemoveStatusService < BaseService
       unpush_from_home_timelines(account_statuses)
 
       if account.local?
-        batch_stream_entries(account_statuses)
+        batch_stream_entries(account, account_statuses)
         batch_activity_json(account, account_statuses)
       end
     end
@@ -42,18 +43,16 @@ class BatchedRemoveStatusService < BaseService
       batch_salmon_slaps(status) if status.local?
     end
 
-    Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
+    Pubsubhubbub::RawDistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
     NotificationWorker.push_bulk(@salmon_batches) { |batch| batch }
     ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch }
   end
 
   private
 
-  def batch_stream_entries(statuses)
-    stream_entry_ids = statuses.map { |s| s.stream_entry.id }
-
-    stream_entry_ids.each_slice(100) do |batch_of_stream_entry_ids|
-      @stream_entry_batches << [batch_of_stream_entry_ids]
+  def batch_stream_entries(account, statuses)
+    statuses.each do |status|
+      @stream_entry_batches << [build_xml(status.stream_entry), account.id]
     end
   end
 
@@ -101,11 +100,10 @@ class BatchedRemoveStatusService < BaseService
   def batch_salmon_slaps(status)
     return if @mentions[status.id].empty?
 
-    payload    = stream_entry_to_xml(status.stream_entry.reload)
     recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id)
 
     recipients.each do |recipient_id|
-      @salmon_batches << [payload, status.account_id, recipient_id]
+      @salmon_batches << [build_xml(status.stream_entry), status.account_id, recipient_id]
     end
   end
 
@@ -145,6 +143,12 @@ class BatchedRemoveStatusService < BaseService
     ).as_json)
   end
 
+  def build_xml(stream_entry)
+    return @activity_xml[stream_entry.id] if @activity_xml.key?(stream_entry.id)
+
+    @activity_xml[stream_entry.id] = stream_entry_to_xml(stream_entry)
+  end
+
   def sign_json(status, json)
     Oj.dump(ActivityPub::LinkedDataSignature.new(json).sign!(status.account))
   end
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index 7ddbd8906..83fc77043 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -22,8 +22,6 @@ class RemoveStatusService < BaseService
 
     return unless @account.local?
 
-    @stream_entry = @stream_entry.reload
-
     remove_from_remote_followers
     remove_from_remote_affected
   end
@@ -62,7 +60,7 @@ class RemoveStatusService < BaseService
 
   def remove_from_remote_followers
     # OStatus
-    Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id)
+    Pubsubhubbub::RawDistributionWorker.perform_async(salmon_xml, @account.id)
 
     # ActivityPub
     ActivityPub::DeliveryWorker.push_bulk(@account.followers.inboxes) do |inbox_url|
diff --git a/app/workers/pubsubhubbub/raw_distribution_worker.rb b/app/workers/pubsubhubbub/raw_distribution_worker.rb
new file mode 100644
index 000000000..16962a623
--- /dev/null
+++ b/app/workers/pubsubhubbub/raw_distribution_worker.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::RawDistributionWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'push'
+
+  def perform(xml, source_account_id)
+    @account       = Account.find(source_account_id)
+    @subscriptions = active_subscriptions.to_a
+
+    Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription|
+      [subscription.id, xml]
+    end
+  end
+
+  private
+
+  def active_subscriptions
+    Subscription.where(account: @account).active.select('id, callback_url, domain')
+  end
+end