about summary refs log tree commit diff
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2022-05-13 00:02:35 +0200
committerGitHub <noreply@github.com>2022-05-13 00:02:35 +0200
commit6cf57c676550068a59149ca82d63fcb5b5431158 (patch)
tree4832da7de8828519c72a205d9878ccd5a606377a
parent12535568f7435ed627c37312782f8ca07e83eca9 (diff)
Refactor how Redis locks are created (#18400)
* Refactor how Redis locks are created

* Fix autorelease duration on account deletion lock
-rw-r--r--app/controllers/media_proxy_controller.rb17
-rw-r--r--app/controllers/settings/exports_controller.rb15
-rw-r--r--app/lib/activitypub/activity.rb17
-rw-r--r--app/lib/activitypub/activity/announce.rb2
-rw-r--r--app/lib/activitypub/activity/create.rb4
-rw-r--r--app/lib/activitypub/activity/delete.rb6
-rw-r--r--app/models/account_migration.rb13
-rw-r--r--app/models/concerns/lockable.rb19
-rw-r--r--app/models/concerns/redisable.rb8
-rw-r--r--app/services/activitypub/process_account_service.rb33
-rw-r--r--app/services/activitypub/process_status_update_service.rb46
-rw-r--r--app/services/fetch_link_card_service.rb15
-rw-r--r--app/services/remove_status_service.rb61
-rw-r--r--app/services/resolve_account_service.rb13
-rw-r--r--app/services/vote_service.rb19
-rw-r--r--app/workers/distribution_worker.rb9
16 files changed, 115 insertions, 182 deletions
diff --git a/app/controllers/media_proxy_controller.rb b/app/controllers/media_proxy_controller.rb
index d2a4cb207..3b228722f 100644
--- a/app/controllers/media_proxy_controller.rb
+++ b/app/controllers/media_proxy_controller.rb
@@ -4,6 +4,7 @@ class MediaProxyController < ApplicationController
   include RoutingHelper
   include Authorization
   include Redisable
+  include Lockable
 
   skip_before_action :store_current_location
   skip_before_action :require_functional!
@@ -16,14 +17,10 @@ class MediaProxyController < ApplicationController
   rescue_from HTTP::TimeoutError, HTTP::ConnectionError, OpenSSL::SSL::SSLError, with: :internal_server_error
 
   def show
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        @media_attachment = MediaAttachment.remote.attached.find(params[:id])
-        authorize @media_attachment.status, :show?
-        redownload! if @media_attachment.needs_redownload? && !reject_media?
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("media_download:#{params[:id]}") do
+      @media_attachment = MediaAttachment.remote.attached.find(params[:id])
+      authorize @media_attachment.status, :show?
+      redownload! if @media_attachment.needs_redownload? && !reject_media?
     end
 
     redirect_to full_asset_url(@media_attachment.file.url(version))
@@ -45,10 +42,6 @@ class MediaProxyController < ApplicationController
     end
   end
 
-  def lock_options
-    { redis: redis, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds }
-  end
-
   def reject_media?
     DomainBlock.reject_media?(@media_attachment.account.domain)
   end
diff --git a/app/controllers/settings/exports_controller.rb b/app/controllers/settings/exports_controller.rb
index 1638d3412..deaa7940e 100644
--- a/app/controllers/settings/exports_controller.rb
+++ b/app/controllers/settings/exports_controller.rb
@@ -3,6 +3,7 @@
 class Settings::ExportsController < Settings::BaseController
   include Authorization
   include Redisable
+  include Lockable
 
   skip_before_action :require_functional!
 
@@ -14,21 +15,13 @@ class Settings::ExportsController < Settings::BaseController
   def create
     backup = nil
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        authorize :backup, :create?
-        backup = current_user.backups.create!
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("backup:#{current_user.id}") do
+      authorize :backup, :create?
+      backup = current_user.backups.create!
     end
 
     BackupWorker.perform_async(backup.id)
 
     redirect_to settings_export_path
   end
-
-  def lock_options
-    { redis: redis, key: "backup:#{current_user.id}" }
-  end
 end
diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb
index 3c51a7a51..7ff06ea39 100644
--- a/app/lib/activitypub/activity.rb
+++ b/app/lib/activitypub/activity.rb
@@ -3,6 +3,7 @@
 class ActivityPub::Activity
   include JsonLdHelper
   include Redisable
+  include Lockable
 
   SUPPORTED_TYPES = %w(Note Question).freeze
   CONVERTED_TYPES = %w(Image Audio Video Article Page Event).freeze
@@ -157,22 +158,6 @@ class ActivityPub::Activity
     end
   end
 
-  def lock_or_return(key, expire_after = 2.hours.seconds)
-    yield if redis.set(key, true, nx: true, ex: expire_after)
-  ensure
-    redis.del(key)
-  end
-
-  def lock_or_fail(key, expire_after = 15.minutes.seconds)
-    RedisLock.acquire({ redis: redis, key: key, autorelease: expire_after }) do |lock|
-      if lock.acquired?
-        yield
-      else
-        raise Mastodon::RaceConditionError
-      end
-    end
-  end
-
   def fetch?
     !@options[:delivery]
   end
diff --git a/app/lib/activitypub/activity/announce.rb b/app/lib/activitypub/activity/announce.rb
index 0674b1083..0032f13e6 100644
--- a/app/lib/activitypub/activity/announce.rb
+++ b/app/lib/activitypub/activity/announce.rb
@@ -4,7 +4,7 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
   def perform
     return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
 
-    lock_or_fail("announce:#{@object['id']}") do
+    with_lock("announce:#{@object['id']}") do
       original_status = status_from_object
 
       return reject_payload! if original_status.nil? || !announceable?(original_status)
diff --git a/app/lib/activitypub/activity/create.rb b/app/lib/activitypub/activity/create.rb
index 1f32d8cce..73882e134 100644
--- a/app/lib/activitypub/activity/create.rb
+++ b/app/lib/activitypub/activity/create.rb
@@ -47,7 +47,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
   def create_status
     return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?
 
-    lock_or_fail("create:#{object_uri}") do
+    with_lock("create:#{object_uri}") do
       return if delete_arrived_first?(object_uri) || poll_vote?
 
       @status = find_existing_status
@@ -315,7 +315,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
     poll = replied_to_status.preloadable_poll
     already_voted = true
 
-    lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do
+    with_lock("vote:#{replied_to_status.poll_id}:#{@account.id}") do
       already_voted = poll.votes.where(account: @account).exists?
       poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
     end
diff --git a/app/lib/activitypub/activity/delete.rb b/app/lib/activitypub/activity/delete.rb
index f5ef863f3..871eb3966 100644
--- a/app/lib/activitypub/activity/delete.rb
+++ b/app/lib/activitypub/activity/delete.rb
@@ -12,7 +12,7 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
   private
 
   def delete_person
-    lock_or_return("delete_in_progress:#{@account.id}") do
+    with_lock("delete_in_progress:#{@account.id}", autorelease: 2.hours, raise_on_failure: false) do
       DeleteAccountService.new.call(@account, reserve_username: false, skip_activitypub: true)
     end
   end
@@ -20,14 +20,14 @@ class ActivityPub::Activity::Delete < ActivityPub::Activity
   def delete_note
     return if object_uri.nil?
 
-    lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do
+    with_lock("delete_status_in_progress:#{object_uri}", raise_on_failure: false) do
       unless invalid_origin?(object_uri)
         # This lock ensures a concurrent `ActivityPub::Activity::Create` either
         # does not create a status at all, or has finished saving it to the
         # database before we try to load it.
         # Without the lock, `delete_later!` could be called after `delete_arrived_first?`
         # and `Status.find` before `Status.create!`
-        lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) }
+        with_lock("create:#{object_uri}") { delete_later!(object_uri) }
 
         Tombstone.find_or_create_by(uri: object_uri, account: @account)
       end
diff --git a/app/models/account_migration.rb b/app/models/account_migration.rb
index ded32c9c6..06291c9f3 100644
--- a/app/models/account_migration.rb
+++ b/app/models/account_migration.rb
@@ -15,6 +15,7 @@
 
 class AccountMigration < ApplicationRecord
   include Redisable
+  include Lockable
 
   COOLDOWN_PERIOD = 30.days.freeze
 
@@ -41,12 +42,8 @@ class AccountMigration < ApplicationRecord
 
     return false unless errors.empty?
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        save
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("account_migration:#{account.id}") do
+      save
     end
   end
 
@@ -83,8 +80,4 @@ class AccountMigration < ApplicationRecord
   def validate_migration_cooldown
     errors.add(:base, I18n.t('migrations.errors.on_cooldown')) if account.migrations.within_cooldown.exists?
   end
-
-  def lock_options
-    { redis: redis, key: "account_migration:#{account.id}" }
-  end
 end
diff --git a/app/models/concerns/lockable.rb b/app/models/concerns/lockable.rb
new file mode 100644
index 000000000..55a9714ca
--- /dev/null
+++ b/app/models/concerns/lockable.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+module Lockable
+  # @param [String] lock_name
+  # @param [ActiveSupport::Duration] autorelease Automatically release the lock after this time
+  # @param [Boolean] raise_on_failure Raise an error if a lock cannot be acquired, or fail silently
+  # @raise [Mastodon::RaceConditionError]
+  def with_lock(lock_name, autorelease: 15.minutes, raise_on_failure: true)
+    with_redis do |redis|
+      RedisLock.acquire(redis: redis, key: "lock:#{lock_name}", autorelease: autorelease.seconds) do |lock|
+        if lock.acquired?
+          yield
+        elsif raise_on_failure
+          raise Mastodon::RaceConditionError, "Could not acquire lock for #{lock_name}, try again later"
+        end
+      end
+    end
+  end
+end
diff --git a/app/models/concerns/redisable.rb b/app/models/concerns/redisable.rb
index 8d76b6b82..0dad3abb2 100644
--- a/app/models/concerns/redisable.rb
+++ b/app/models/concerns/redisable.rb
@@ -1,11 +1,11 @@
 # frozen_string_literal: true
 
 module Redisable
-  extend ActiveSupport::Concern
-
-  private
-
   def redis
     Thread.current[:redis] ||= RedisConfiguration.pool.checkout
   end
+
+  def with_redis(&block)
+    RedisConfiguration.with(&block)
+  end
 end
diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb
index 5649153ee..4449a5427 100644
--- a/app/services/activitypub/process_account_service.rb
+++ b/app/services/activitypub/process_account_service.rb
@@ -4,6 +4,7 @@ class ActivityPub::ProcessAccountService < BaseService
   include JsonLdHelper
   include DomainControlHelper
   include Redisable
+  include Lockable
 
   # Should be called with confirmed valid JSON
   # and WebFinger-resolved username and domain
@@ -17,22 +18,18 @@ class ActivityPub::ProcessAccountService < BaseService
     @domain      = domain
     @collections = {}
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        @account            = Account.remote.find_by(uri: @uri) if @options[:only_key]
-        @account          ||= Account.find_remote(@username, @domain)
-        @old_public_key     = @account&.public_key
-        @old_protocol       = @account&.protocol
-        @suspension_changed = false
-
-        create_account if @account.nil?
-        update_account
-        process_tags
-
-        process_duplicate_accounts! if @options[:verified_webfinger]
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("process_account:#{@uri}") do
+      @account            = Account.remote.find_by(uri: @uri) if @options[:only_key]
+      @account          ||= Account.find_remote(@username, @domain)
+      @old_public_key     = @account&.public_key
+      @old_protocol       = @account&.protocol
+      @suspension_changed = false
+
+      create_account if @account.nil?
+      update_account
+      process_tags
+
+      process_duplicate_accounts! if @options[:verified_webfinger]
     end
 
     return if @account.nil?
@@ -289,10 +286,6 @@ class ActivityPub::ProcessAccountService < BaseService
     !@old_protocol.nil? && @old_protocol != @account.protocol
   end
 
-  def lock_options
-    { redis: redis, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds }
-  end
-
   def process_tags
     return if @json['tag'].blank?
 
diff --git a/app/services/activitypub/process_status_update_service.rb b/app/services/activitypub/process_status_update_service.rb
index fb6e44c6d..addd5fc27 100644
--- a/app/services/activitypub/process_status_update_service.rb
+++ b/app/services/activitypub/process_status_update_service.rb
@@ -3,6 +3,7 @@
 class ActivityPub::ProcessStatusUpdateService < BaseService
   include JsonLdHelper
   include Redisable
+  include Lockable
 
   def call(status, json)
     raise ArgumentError, 'Status has unsaved changes' if status.changed?
@@ -33,41 +34,32 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
     last_edit_date = @status.edited_at.presence || @status.created_at
 
     # Only allow processing one create/update per status at a time
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        Status.transaction do
-          record_previous_edit!
-          update_media_attachments!
-          update_poll!
-          update_immediate_attributes!
-          update_metadata!
-          create_edits!
-        end
+    with_lock("create:#{@uri}") do
+      Status.transaction do
+        record_previous_edit!
+        update_media_attachments!
+        update_poll!
+        update_immediate_attributes!
+        update_metadata!
+        create_edits!
+      end
 
-        queue_poll_notifications!
+      queue_poll_notifications!
 
-        next unless significant_changes?
+      next unless significant_changes?
 
-        reset_preview_card!
-        broadcast_updates!
-      else
-        raise Mastodon::RaceConditionError
-      end
+      reset_preview_card!
+      broadcast_updates!
     end
 
     forward_activity! if significant_changes? && @status_parser.edited_at > last_edit_date
   end
 
   def handle_implicit_update!
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        update_poll!(allow_significant_changes: false)
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("create:#{@uri}") do
+      update_poll!(allow_significant_changes: false)
+      queue_poll_notifications!
     end
-
-    queue_poll_notifications!
   end
 
   def update_media_attachments!
@@ -241,10 +233,6 @@ class ActivityPub::ProcessStatusUpdateService < BaseService
     equals_or_includes_any?(@json['type'], %w(Note Question))
   end
 
-  def lock_options
-    { redis: redis, key: "create:#{@uri}", autorelease: 15.minutes.seconds }
-  end
-
   def record_previous_edit!
     @previous_edit = @status.build_snapshot(at_time: @status.created_at, rate_limit: false) if @status.edits.empty?
   end
diff --git a/app/services/fetch_link_card_service.rb b/app/services/fetch_link_card_service.rb
index 868796a6b..e5b5b730e 100644
--- a/app/services/fetch_link_card_service.rb
+++ b/app/services/fetch_link_card_service.rb
@@ -2,6 +2,7 @@
 
 class FetchLinkCardService < BaseService
   include Redisable
+  include Lockable
 
   URL_PATTERN = %r{
     (#{Twitter::TwitterText::Regex[:valid_url_preceding_chars]})                                                                #   $1 preceding chars
@@ -22,13 +23,9 @@ class FetchLinkCardService < BaseService
 
     @url = @original_url.to_s
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        @card = PreviewCard.find_by(url: @url)
-        process_url if @card.nil? || @card.updated_at <= 2.weeks.ago || @card.missing_image?
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("fetch:#{@original_url}") do
+      @card = PreviewCard.find_by(url: @url)
+      process_url if @card.nil? || @card.updated_at <= 2.weeks.ago || @card.missing_image?
     end
 
     attach_card if @card&.persisted?
@@ -155,8 +152,4 @@ class FetchLinkCardService < BaseService
     @card.assign_attributes(link_details_extractor.to_preview_card_attributes)
     @card.save_with_optional_image! unless @card.title.blank? && @card.html.blank?
   end
-
-  def lock_options
-    { redis: redis, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds }
-  end
 end
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index dbd1f6430..8dc521eed 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -3,6 +3,7 @@
 class RemoveStatusService < BaseService
   include Redisable
   include Payloadable
+  include Lockable
 
   # Delete a status
   # @param   [Status] status
@@ -17,37 +18,33 @@ class RemoveStatusService < BaseService
     @account  = status.account
     @options  = options
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        @status.discard
-
-        remove_from_self if @account.local?
-        remove_from_followers
-        remove_from_lists
-
-        # There is no reason to send out Undo activities when the
-        # cause is that the original object has been removed, since
-        # original object being removed implicitly removes reblogs
-        # of it. The Delete activity of the original is forwarded
-        # separately.
-        remove_from_remote_reach if @account.local? && !@options[:original_removed]
-
-        # Since reblogs don't mention anyone, don't get reblogged,
-        # favourited and don't contain their own media attachments
-        # or hashtags, this can be skipped
-        unless @status.reblog?
-          remove_from_mentions
-          remove_reblogs
-          remove_from_hashtags
-          remove_from_public
-          remove_from_media if @status.with_media?
-          remove_media
-        end
-
-        @status.destroy! if permanently?
-      else
-        raise Mastodon::RaceConditionError
+    with_lock("distribute:#{@status.id}") do
+      @status.discard
+
+      remove_from_self if @account.local?
+      remove_from_followers
+      remove_from_lists
+
+      # There is no reason to send out Undo activities when the
+      # cause is that the original object has been removed, since
+      # original object being removed implicitly removes reblogs
+      # of it. The Delete activity of the original is forwarded
+      # separately.
+      remove_from_remote_reach if @account.local? && !@options[:original_removed]
+
+      # Since reblogs don't mention anyone, don't get reblogged,
+      # favourited and don't contain their own media attachments
+      # or hashtags, this can be skipped
+      unless @status.reblog?
+        remove_from_mentions
+        remove_reblogs
+        remove_from_hashtags
+        remove_from_public
+        remove_from_media if @status.with_media?
+        remove_media
       end
+
+      @status.destroy! if permanently?
     end
   end
 
@@ -144,8 +141,4 @@ class RemoveStatusService < BaseService
   def permanently?
     @options[:immediate] || !(@options[:preserve] || @status.reported?)
   end
-
-  def lock_options
-    { redis: redis, key: "distribute:#{@status.id}", autorelease: 5.minutes.seconds }
-  end
 end
diff --git a/app/services/resolve_account_service.rb b/app/services/resolve_account_service.rb
index 387e2e09b..b55e45409 100644
--- a/app/services/resolve_account_service.rb
+++ b/app/services/resolve_account_service.rb
@@ -5,6 +5,7 @@ class ResolveAccountService < BaseService
   include DomainControlHelper
   include WebfingerHelper
   include Redisable
+  include Lockable
 
   # Find or create an account record for a remote user. When creating,
   # look up the user's webfinger and fetch ActivityPub data
@@ -108,12 +109,8 @@ class ResolveAccountService < BaseService
   def fetch_account!
     return unless activitypub_ready?
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        @account = ActivityPub::FetchRemoteAccountService.new.call(actor_url)
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("resolve:#{@username}@#{@domain}") do
+      @account = ActivityPub::FetchRemoteAccountService.new.call(actor_url)
     end
 
     @account
@@ -146,8 +143,4 @@ class ResolveAccountService < BaseService
     @account.suspend!(origin: :remote)
     AccountDeletionWorker.perform_async(@account.id, { 'reserve_username' => false, 'skip_activitypub' => true })
   end
-
-  def lock_options
-    { redis: redis, key: "resolve:#{@username}@#{@domain}", autorelease: 15.minutes.seconds }
-  end
 end
diff --git a/app/services/vote_service.rb b/app/services/vote_service.rb
index b77812970..ccd04dbfc 100644
--- a/app/services/vote_service.rb
+++ b/app/services/vote_service.rb
@@ -4,6 +4,7 @@ class VoteService < BaseService
   include Authorization
   include Payloadable
   include Redisable
+  include Lockable
 
   def call(account, poll, choices)
     authorize_with account, poll, :vote?
@@ -15,17 +16,13 @@ class VoteService < BaseService
 
     already_voted = true
 
-    RedisLock.acquire(lock_options) do |lock|
-      if lock.acquired?
-        already_voted = @poll.votes.where(account: @account).exists?
+    with_lock("vote:#{@poll.id}:#{@account.id}") do
+      already_voted = @poll.votes.where(account: @account).exists?
 
-        ApplicationRecord.transaction do
-          @choices.each do |choice|
-            @votes << @poll.votes.create!(account: @account, choice: Integer(choice))
-          end
+      ApplicationRecord.transaction do
+        @choices.each do |choice|
+          @votes << @poll.votes.create!(account: @account, choice: Integer(choice))
         end
-      else
-        raise Mastodon::RaceConditionError
       end
     end
 
@@ -76,8 +73,4 @@ class VoteService < BaseService
     @poll.reload
     retry
   end
-
-  def lock_options
-    { redis: redis, key: "vote:#{@poll.id}:#{@account.id}" }
-  end
 end
diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb
index 474b4daaf..59cdbc7b2 100644
--- a/app/workers/distribution_worker.rb
+++ b/app/workers/distribution_worker.rb
@@ -3,14 +3,11 @@
 class DistributionWorker
   include Sidekiq::Worker
   include Redisable
+  include Lockable
 
   def perform(status_id, options = {})
-    RedisLock.acquire(redis: redis, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock|
-      if lock.acquired?
-        FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
-      else
-        raise Mastodon::RaceConditionError
-      end
+    with_lock("distribute:#{status_id}") do
+      FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
     end
   rescue ActiveRecord::RecordNotFound
     true