From c4a3b7eef372db1414cbd6d62741c61ca7201613 Mon Sep 17 00:00:00 2001 From: Claire Date: Wed, 21 Apr 2021 04:46:09 +0200 Subject: Fix processing of remote Delete activities (#16084) * Add tests * Ensure deleted statuses are marked as such * Save some redis memory by not storing URIs in delete_upon_arrival values * Avoid possible race condition when processing incoming Deletes * Avoid potential duplicate Delete forwards * Lower lock durations to reduce issues in case of hard crash of the Rails process * Check for `lock.aquired?` and improve comment * Refactor RedisLock usage in app/lib/activitypub * Fix using incorrect or non-existent sender for relaying Deletes --- app/lib/activitypub/activity.rb | 14 ++++++- app/lib/activitypub/activity/announce.rb | 36 +++++++--------- app/lib/activitypub/activity/create.rb | 36 +++++----------- app/lib/activitypub/activity/delete.rb | 62 +++++++++++++++------------- app/services/remove_status_service.rb | 2 + spec/lib/activitypub/activity/delete_spec.rb | 20 +++++++++ 6 files changed, 91 insertions(+), 79 deletions(-) diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb index 85a3370ee..d5f53c525 100644 --- a/app/lib/activitypub/activity.rb +++ b/app/lib/activitypub/activity.rb @@ -149,7 +149,7 @@ class ActivityPub::Activity end def delete_later!(uri) - redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri) + redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, true) end def status_from_object @@ -215,12 +215,22 @@ class ActivityPub::Activity end end - def lock_or_return(key, expire_after = 7.days.seconds) + def lock_or_return(key, expire_after = 2.hours.seconds) yield if redis.set(key, true, nx: true, ex: expire_after) ensure redis.del(key) end + def lock_or_fail(key) + RedisLock.acquire({ redis: Redis.current, key: key }) do |lock| + if lock.acquired? + yield + else + raise Mastodon::RaceConditionError + end + end + end + def fetch? !@options[:delivery] end diff --git a/app/lib/activitypub/activity/announce.rb b/app/lib/activitypub/activity/announce.rb index 349e8f77e..e2ffafce6 100644 --- a/app/lib/activitypub/activity/announce.rb +++ b/app/lib/activitypub/activity/announce.rb @@ -4,29 +4,25 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity def perform return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity? - RedisLock.acquire(lock_options) do |lock| - if lock.acquired? - original_status = status_from_object + lock_or_fail("announce:#{@object['id']}") do + original_status = status_from_object - return reject_payload! if original_status.nil? || !announceable?(original_status) + return reject_payload! if original_status.nil? || !announceable?(original_status) - @status = Status.find_by(account: @account, reblog: original_status) + @status = Status.find_by(account: @account, reblog: original_status) - return @status unless @status.nil? + return @status unless @status.nil? - @status = Status.create!( - account: @account, - reblog: original_status, - uri: @json['id'], - created_at: @json['published'], - override_timestamps: @options[:override_timestamps], - visibility: visibility_from_audience - ) + @status = Status.create!( + account: @account, + reblog: original_status, + uri: @json['id'], + created_at: @json['published'], + override_timestamps: @options[:override_timestamps], + visibility: visibility_from_audience + ) - distribute(@status) - else - raise Mastodon::RaceConditionError - end + distribute(@status) end @status @@ -69,8 +65,4 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity def reblog_of_local_status? status_from_uri(object_uri)&.account&.local? end - - def lock_options - { redis: Redis.current, key: "announce:#{@object['id']}" } - end end diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb index 99d52407b..e694b93ad 100644 --- a/app/lib/activitypub/activity/create.rb +++ b/app/lib/activitypub/activity/create.rb @@ -45,19 +45,15 @@ class ActivityPub::Activity::Create < ActivityPub::Activity def create_status return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity? - RedisLock.acquire(lock_options) do |lock| - if lock.acquired? - return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator + lock_or_fail("create:#{object_uri}") do + return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator - @status = find_existing_status + @status = find_existing_status - if @status.nil? - process_status - elsif @options[:delivered_to_account_id].present? - postprocess_audience_and_deliver - end - else - raise Mastodon::RaceConditionError + if @status.nil? + process_status + elsif @options[:delivered_to_account_id].present? + postprocess_audience_and_deliver end end @@ -315,13 +311,9 @@ class ActivityPub::Activity::Create < ActivityPub::Activity poll = replied_to_status.preloadable_poll already_voted = true - RedisLock.acquire(poll_lock_options) do |lock| - if lock.acquired? - already_voted = poll.votes.where(account: @account).exists? - poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri) - else - raise Mastodon::RaceConditionError - end + lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do + already_voted = poll.votes.where(account: @account).exists? + poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri) end increment_voters_count! unless already_voted @@ -514,12 +506,4 @@ class ActivityPub::Activity::Create < ActivityPub::Activity poll.reload retry end - - def lock_options - { redis: Redis.current, key: "create:#{object_uri}" } - end - - def poll_lock_options - { redis: Redis.current, key: "vote:#{replied_to_status.poll_id}:#{@account.id}" } - end end diff --git a/app/lib/activitypub/activity/delete.rb b/app/lib/activitypub/activity/delete.rb index 2e5293b83..801647cf7 100644 --- a/app/lib/activitypub/activity/delete.rb +++ b/app/lib/activitypub/activity/delete.rb @@ -20,33 +20,35 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity def delete_note return if object_uri.nil? - unless invalid_origin?(object_uri) - RedisLock.acquire(lock_options) { |_lock| delete_later!(object_uri) } - Tombstone.find_or_create_by(uri: object_uri, account: @account) - end + lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do + unless invalid_origin?(object_uri) + # This lock ensures a concurrent `ActivityPub::Activity::Create` either + # does not create a status at all, or has finished saving it to the + # database before we try to load it. + # Without the lock, `delete_later!` could be called after `delete_arrived_first?` + # and `Status.find` before `Status.create!` + lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) } - @status = Status.find_by(uri: object_uri, account: @account) - @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present? + Tombstone.find_or_create_by(uri: object_uri, account: @account) + end - return if @status.nil? + @status = Status.find_by(uri: object_uri, account: @account) + @status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present? - if @status.distributable? - forward_for_reply - forward_for_reblogs - end + return if @status.nil? - delete_now! + forward! if @json['signature'].present? && @status.distributable? + delete_now! + end end - def forward_for_reblogs - return if @json['signature'].blank? - - rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id) - inboxes = Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - [@account.preferred_inbox_url] + def rebloggers_ids + return @rebloggers_ids if defined?(@rebloggers_ids) + @rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id) + end - ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, rebloggers_ids.first, inbox_url] - end + def inboxes_for_reblogs + Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes end def replied_to_status @@ -58,13 +60,19 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity !replied_to_status.nil? && replied_to_status.account.local? end - def forward_for_reply - return unless @json['signature'].present? && reply_to_local? + def inboxes_for_reply + replied_to_status.account.followers.inboxes + end + + def forward! + inboxes = inboxes_for_reblogs + inboxes += inboxes_for_reply if reply_to_local? + inboxes -= [@account.preferred_inbox_url] - inboxes = replied_to_status.account.followers.inboxes - [@account.preferred_inbox_url] + sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first - ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, replied_to_status.account_id, inbox_url] + ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url| + [payload, sender_id, inbox_url] end end @@ -75,8 +83,4 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity def payload @payload ||= Oj.dump(@json) end - - def lock_options - { redis: Redis.current, key: "create:#{object_uri}" } - end end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index d6043fb5d..52d3f108c 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -16,6 +16,8 @@ class RemoveStatusService < BaseService @account = status.account @options = options + @status.discard + RedisLock.acquire(lock_options) do |lock| if lock.acquired? remove_from_self if @account.local? diff --git a/spec/lib/activitypub/activity/delete_spec.rb b/spec/lib/activitypub/activity/delete_spec.rb index 37b93ecf7..9dfb8a61b 100644 --- a/spec/lib/activitypub/activity/delete_spec.rb +++ b/spec/lib/activitypub/activity/delete_spec.rb @@ -49,4 +49,24 @@ RSpec.describe ActivityPub::Activity::Delete do end end end + + context 'when the status has been reported' do + describe '#perform' do + subject { described_class.new(json, sender) } + let!(:reporter) { Fabricate(:account) } + + before do + reporter.reports.create!(target_account: status.account, status_ids: [status.id], forwarded: false) + subject.perform + end + + it 'marks the status as deleted' do + expect(Status.find_by(id: status.id)).to be_nil + end + + it 'actually keeps a copy for inspection' do + expect(Status.with_discarded.find_by(id: status.id)).to_not be_nil + end + end + end end -- cgit