about summary refs log tree commit diff
diff options
context:
space:
mode:
authorClaire <claire.github-309c@sitedethib.com>2021-04-21 04:46:09 +0200
committerchr <chr@cybre.space>2021-04-24 15:41:17 -0700
commitc4a3b7eef372db1414cbd6d62741c61ca7201613 (patch)
treee5669e41100f8069f1b15bc8fadd4af2f4dc7a19
parenta22ab5038aaf90a182f99f45c1cddc234b794388 (diff)
Fix processing of remote Delete activities (#16084) cybrespace-3.3
* Add tests

* Ensure deleted statuses are marked as such

* Save some redis memory by not storing URIs in delete_upon_arrival values

* Avoid possible race condition when processing incoming Deletes

* Avoid potential duplicate Delete forwards

* Lower lock durations to reduce issues in case of hard crash of the Rails process

* Check for `lock.aquired?` and improve comment

* Refactor RedisLock usage in app/lib/activitypub

* Fix using incorrect or non-existent sender for relaying Deletes
-rw-r--r--app/lib/activitypub/activity.rb14
-rw-r--r--app/lib/activitypub/activity/announce.rb36
-rw-r--r--app/lib/activitypub/activity/create.rb36
-rw-r--r--app/lib/activitypub/activity/delete.rb62
-rw-r--r--app/services/remove_status_service.rb2
-rw-r--r--spec/lib/activitypub/activity/delete_spec.rb20
6 files changed, 91 insertions, 79 deletions
diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb
index 85a3370ee..d5f53c525 100644
--- a/app/lib/activitypub/activity.rb
+++ b/app/lib/activitypub/activity.rb
@@ -149,7 +149,7 @@ class ActivityPub::Activity
   end
 
   def delete_later!(uri)
-    redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri)
+    redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, true)
   end
 
   def status_from_object
@@ -215,12 +215,22 @@ class ActivityPub::Activity
     end
   end
 
-  def lock_or_return(key, expire_after = 7.days.seconds)
+  def lock_or_return(key, expire_after = 2.hours.seconds)
     yield if redis.set(key, true, nx: true, ex: expire_after)
   ensure
     redis.del(key)
   end
 
+  def lock_or_fail(key)
+    RedisLock.acquire({ redis: Redis.current, key: key }) do |lock|
+      if lock.acquired?
+        yield
+      else
+        raise Mastodon::RaceConditionError
+      end
+    end
+  end
+
   def fetch?
     !@options[:delivery]
   end
diff --git a/app/lib/activitypub/activity/announce.rb b/app/lib/activitypub/activity/announce.rb
index 349e8f77e..e2ffafce6 100644
--- a/app/lib/activitypub/activity/announce.rb
+++ b/app/lib/activitypub/activity/announce.rb
@@ -4,29 +4,25 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
   def perform
     return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        original_status = status_from_object
+    lock_or_fail("announce:#{@object['id']}") do
+      original_status = status_from_object
 
-        return reject_payload! if original_status.nil? || !announceable?(original_status)
+      return reject_payload! if original_status.nil? || !announceable?(original_status)
 
-        @status = Status.find_by(account: @account, reblog: original_status)
+      @status = Status.find_by(account: @account, reblog: original_status)
 
-        return @status unless @status.nil?
+      return @status unless @status.nil?
 
-        @status = Status.create!(
-          account: @account,
-          reblog: original_status,
-          uri: @json['id'],
-          created_at: @json['published'],
-          override_timestamps: @options[:override_timestamps],
-          visibility: visibility_from_audience
-        )
+      @status = Status.create!(
+        account: @account,
+        reblog: original_status,
+        uri: @json['id'],
+        created_at: @json['published'],
+        override_timestamps: @options[:override_timestamps],
+        visibility: visibility_from_audience
+      )
 
-        distribute(@status)
-      else
-        raise Mastodon::RaceConditionError
-      end
+      distribute(@status)
     end
 
     @status
@@ -69,8 +65,4 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
   def reblog_of_local_status?
     status_from_uri(object_uri)&.account&.local?
   end
-
-  def lock_options
-    { redis: Redis.current, key: "announce:#{@object['id']}" }
-  end
 end
diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb
index 99d52407b..e694b93ad 100644
--- a/app/lib/activitypub/activity/create.rb
+++ b/app/lib/activitypub/activity/create.rb
@@ -45,19 +45,15 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
   def create_status
     return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
+    lock_or_fail("create:#{object_uri}") do
+      return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
 
-        @status = find_existing_status
+      @status = find_existing_status
 
-        if @status.nil?
-          process_status
-        elsif @options[:delivered_to_account_id].present?
-          postprocess_audience_and_deliver
-        end
-      else
-        raise Mastodon::RaceConditionError
+      if @status.nil?
+        process_status
+      elsif @options[:delivered_to_account_id].present?
+        postprocess_audience_and_deliver
       end
     end
 
@@ -315,13 +311,9 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
     poll = replied_to_status.preloadable_poll
     already_voted = true
 
-    RedisLock.acquire(poll_lock_options) do |lock|
-      if lock.acquired?
-        already_voted = poll.votes.where(account: @account).exists?
-        poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
-      else
-        raise Mastodon::RaceConditionError
-      end
+    lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do
+      already_voted = poll.votes.where(account: @account).exists?
+      poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
     end
 
     increment_voters_count! unless already_voted
@@ -514,12 +506,4 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
     poll.reload
     retry
   end
-
-  def lock_options
-    { redis: Redis.current, key: "create:#{object_uri}" }
-  end
-
-  def poll_lock_options
-    { redis: Redis.current, key: "vote:#{replied_to_status.poll_id}:#{@account.id}" }
-  end
 end
diff --git a/app/lib/activitypub/activity/delete.rb b/app/lib/activitypub/activity/delete.rb
index 2e5293b83..801647cf7 100644
--- a/app/lib/activitypub/activity/delete.rb
+++ b/app/lib/activitypub/activity/delete.rb
@@ -20,33 +20,35 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
   def delete_note
     return if object_uri.nil?
 
-    unless invalid_origin?(object_uri)
-      RedisLock.acquire(lock_options) { |_lock| delete_later!(object_uri) }
-      Tombstone.find_or_create_by(uri: object_uri, account: @account)
-    end
+    lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do
+      unless invalid_origin?(object_uri)
+        # This lock ensures a concurrent `ActivityPub::Activity::Create` either
+        # does not create a status at all, or has finished saving it to the
+        # database before we try to load it.
+        # Without the lock, `delete_later!` could be called after `delete_arrived_first?`
+        # and `Status.find` before `Status.create!`
+        lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) }
 
-    @status   = Status.find_by(uri: object_uri, account: @account)
-    @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
+        Tombstone.find_or_create_by(uri: object_uri, account: @account)
+      end
 
-    return if @status.nil?
+      @status   = Status.find_by(uri: object_uri, account: @account)
+      @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
 
-    if @status.distributable?
-      forward_for_reply
-      forward_for_reblogs
-    end
+      return if @status.nil?
 
-    delete_now!
+      forward! if @json['signature'].present? && @status.distributable?
+      delete_now!
+    end
   end
 
-  def forward_for_reblogs
-    return if @json['signature'].blank?
-
-    rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
-    inboxes        = Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - [@account.preferred_inbox_url]
+  def rebloggers_ids
+    return @rebloggers_ids if defined?(@rebloggers_ids)
+    @rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
+  end
 
-    ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, rebloggers_ids.first, inbox_url]
-    end
+  def inboxes_for_reblogs
+    Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes
   end
 
   def replied_to_status
@@ -58,13 +60,19 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
     !replied_to_status.nil? && replied_to_status.account.local?
   end
 
-  def forward_for_reply
-    return unless @json['signature'].present? && reply_to_local?
+  def inboxes_for_reply
+    replied_to_status.account.followers.inboxes
+  end
+
+  def forward!
+    inboxes = inboxes_for_reblogs
+    inboxes += inboxes_for_reply if reply_to_local?
+    inboxes -= [@account.preferred_inbox_url]
 
-    inboxes = replied_to_status.account.followers.inboxes - [@account.preferred_inbox_url]
+    sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first
 
-    ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, replied_to_status.account_id, inbox_url]
+    ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url|
+      [payload, sender_id, inbox_url]
     end
   end
 
@@ -75,8 +83,4 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
   def payload
     @payload ||= Oj.dump(@json)
   end
-
-  def lock_options
-    { redis: Redis.current, key: "create:#{object_uri}" }
-  end
 end
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index d6043fb5d..52d3f108c 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -16,6 +16,8 @@ class RemoveStatusService < BaseService
     @account  = status.account
     @options  = options
 
+    @status.discard
+
     RedisLock.acquire(lock_options) do |lock|
       if lock.acquired?
         remove_from_self if @account.local?
diff --git a/spec/lib/activitypub/activity/delete_spec.rb b/spec/lib/activitypub/activity/delete_spec.rb
index 37b93ecf7..9dfb8a61b 100644
--- a/spec/lib/activitypub/activity/delete_spec.rb
+++ b/spec/lib/activitypub/activity/delete_spec.rb
@@ -49,4 +49,24 @@ RSpec.describe ActivityPub::Activity::Delete do
       end
     end
   end
+
+  context 'when the status has been reported' do
+    describe '#perform' do
+      subject { described_class.new(json, sender) }
+      let!(:reporter) { Fabricate(:account) }
+
+      before do
+        reporter.reports.create!(target_account: status.account, status_ids: [status.id], forwarded: false)
+        subject.perform
+      end
+
+      it 'marks the status as deleted' do
+        expect(Status.find_by(id: status.id)).to be_nil
+      end
+
+      it 'actually keeps a copy for inspection' do
+        expect(Status.with_discarded.find_by(id: status.id)).to_not be_nil
+      end
+    end
+  end
 end