about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/activitypub/delivery_worker.rb10
-rw-r--r--app/workers/activitypub/distribute_poll_update_worker.rb2
-rw-r--r--app/workers/activitypub/fetch_replies_worker.rb4
-rw-r--r--app/workers/activitypub/migrated_follow_delivery_worker.rb17
-rw-r--r--app/workers/activitypub/move_distribution_worker.rb2
-rw-r--r--app/workers/activitypub/processing_worker.rb2
-rw-r--r--app/workers/activitypub/raw_distribution_worker.rb2
-rw-r--r--app/workers/backup_worker.rb10
-rw-r--r--app/workers/concerns/exponential_backoff.rb2
-rw-r--r--app/workers/fetch_reply_worker.rb4
-rw-r--r--app/workers/import/relationship_worker.rb2
-rw-r--r--app/workers/poll_expiration_notify_worker.rb2
-rw-r--r--app/workers/post_process_media_worker.rb12
-rw-r--r--app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb26
-rw-r--r--app/workers/scheduler/follow_recommendations_scheduler.rb23
-rw-r--r--app/workers/scheduler/indexing_scheduler.rb14
-rw-r--r--app/workers/scheduler/user_cleanup_scheduler.rb2
-rw-r--r--app/workers/thread_resolve_worker.rb4
-rw-r--r--app/workers/unfollow_follow_worker.rb8
-rw-r--r--app/workers/web/push_notification_worker.rb12
-rw-r--r--app/workers/webhooks/delivery_worker.rb2
21 files changed, 85 insertions, 77 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb
index d9153132b..7c1c14766 100644
--- a/app/workers/activitypub/delivery_worker.rb
+++ b/app/workers/activitypub/delivery_worker.rb
@@ -10,6 +10,16 @@ class ActivityPub::DeliveryWorker
 
   sidekiq_options queue: 'push', retry: 16, dead: false
 
+  # Unfortunately, we cannot control Sidekiq's jitter, so add our own
+  sidekiq_retry_in do |count|
+    # This is Sidekiq's default delay
+    delay  = (count**4) + 15
+    # Our custom jitter, that will be added to Sidekiq's built-in one.
+    # Sidekiq's built-in jitter is `rand(10) * (count + 1)`
+    jitter = rand(0.5 * (count**4))
+    delay + jitter
+  end
+
   HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze
 
   def perform(json, source_account_id, inbox_url, options = {})
diff --git a/app/workers/activitypub/distribute_poll_update_worker.rb b/app/workers/activitypub/distribute_poll_update_worker.rb
index 601075ea6..ebdb78bb3 100644
--- a/app/workers/activitypub/distribute_poll_update_worker.rb
+++ b/app/workers/activitypub/distribute_poll_update_worker.rb
@@ -12,7 +12,7 @@ class ActivityPub::DistributePollUpdateWorker
 
     return if @status.preloadable_poll.nil? || @status.local_only?
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+    ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
       [payload, @account.id, inbox_url]
     end
 
diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb
index 54d98f228..d72bad745 100644
--- a/app/workers/activitypub/fetch_replies_worker.rb
+++ b/app/workers/activitypub/fetch_replies_worker.rb
@@ -6,8 +6,8 @@ class ActivityPub::FetchRepliesWorker
 
   sidekiq_options queue: 'pull', retry: 3
 
-  def perform(parent_status_id, replies_uri)
-    ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri)
+  def perform(parent_status_id, replies_uri, options = {})
+    ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys)
   rescue ActiveRecord::RecordNotFound
     true
   end
diff --git a/app/workers/activitypub/migrated_follow_delivery_worker.rb b/app/workers/activitypub/migrated_follow_delivery_worker.rb
new file mode 100644
index 000000000..daf30e0ae
--- /dev/null
+++ b/app/workers/activitypub/migrated_follow_delivery_worker.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+class ActivityPub::MigratedFollowDeliveryWorker < ActivityPub::DeliveryWorker
+  def perform(json, source_account_id, inbox_url, old_target_account_id, options = {})
+    super(json, source_account_id, inbox_url, options)
+    unfollow_old_account!(old_target_account_id)
+  end
+
+  private
+
+  def unfollow_old_account!(old_target_account_id)
+    old_target_account = Account.find(old_target_account_id)
+    UnfollowService.new.call(@source_account, old_target_account, skip_unmerge: true)
+  rescue
+    true
+  end
+end
diff --git a/app/workers/activitypub/move_distribution_worker.rb b/app/workers/activitypub/move_distribution_worker.rb
index 65c5c0d1c..1680fcc76 100644
--- a/app/workers/activitypub/move_distribution_worker.rb
+++ b/app/workers/activitypub/move_distribution_worker.rb
@@ -10,7 +10,7 @@ class ActivityPub::MoveDistributionWorker
     @migration = AccountMigration.find(migration_id)
     @account   = @migration.account
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+    ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
       [signed_payload, @account.id, inbox_url]
     end
 
diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb
index 5e36fab51..1bb94b7f2 100644
--- a/app/workers/activitypub/processing_worker.rb
+++ b/app/workers/activitypub/processing_worker.rb
@@ -15,6 +15,6 @@ class ActivityPub::ProcessingWorker
 
     ActivityPub::ProcessCollectionService.new.call(body, actor, override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true)
   rescue ActiveRecord::RecordInvalid => e
-    Rails.logger.debug "Error processing incoming ActivityPub object: #{e}"
+    Rails.logger.debug { "Error processing incoming ActivityPub object: #{e}" }
   end
 end
diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb
index 8ecc17db9..c77821e0f 100644
--- a/app/workers/activitypub/raw_distribution_worker.rb
+++ b/app/workers/activitypub/raw_distribution_worker.rb
@@ -25,7 +25,7 @@ class ActivityPub::RawDistributionWorker
   def distribute!
     return if inboxes.empty?
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+    ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url|
       [payload, source_account_id, inbox_url, options]
     end
   end
diff --git a/app/workers/backup_worker.rb b/app/workers/backup_worker.rb
index 7b0b52844..df933142a 100644
--- a/app/workers/backup_worker.rb
+++ b/app/workers/backup_worker.rb
@@ -9,12 +9,10 @@ class BackupWorker
     backup_id = msg['args'].first
 
     ActiveRecord::Base.connection_pool.with_connection do
-      begin
-        backup = Backup.find(backup_id)
-        backup.destroy
-      rescue ActiveRecord::RecordNotFound
-        true
-      end
+      backup = Backup.find(backup_id)
+      backup.destroy
+    rescue ActiveRecord::RecordNotFound
+      true
     end
   end
 
diff --git a/app/workers/concerns/exponential_backoff.rb b/app/workers/concerns/exponential_backoff.rb
index f2b931e33..7626b2151 100644
--- a/app/workers/concerns/exponential_backoff.rb
+++ b/app/workers/concerns/exponential_backoff.rb
@@ -5,7 +5,7 @@ module ExponentialBackoff
 
   included do
     sidekiq_retry_in do |count|
-      15 + 10 * (count**4) + rand(10 * (count**4))
+      15 + (10 * (count**4)) + rand(10 * (count**4))
     end
   end
 end
diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb
index f7aa25e81..68a7414be 100644
--- a/app/workers/fetch_reply_worker.rb
+++ b/app/workers/fetch_reply_worker.rb
@@ -6,7 +6,7 @@ class FetchReplyWorker
 
   sidekiq_options queue: 'pull', retry: 3
 
-  def perform(child_url)
-    FetchRemoteStatusService.new.call(child_url)
+  def perform(child_url, options = {})
+    FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys)
   end
 end
diff --git a/app/workers/import/relationship_worker.rb b/app/workers/import/relationship_worker.rb
index 6791b15c3..c2728c196 100644
--- a/app/workers/import/relationship_worker.rb
+++ b/app/workers/import/relationship_worker.rb
@@ -49,7 +49,7 @@ class Import::RelationshipWorker
         .with_error_handler { |error, handle| error.is_a?(HTTP::Error) || error.is_a?(OpenSSL::SSL::SSLError) ? handle.call(error) : raise(error) }
         .run
     else
-      block.call
+      yield
     end
   end
 end
diff --git a/app/workers/poll_expiration_notify_worker.rb b/app/workers/poll_expiration_notify_worker.rb
index 0e29a5f60..b7a60fab8 100644
--- a/app/workers/poll_expiration_notify_worker.rb
+++ b/app/workers/poll_expiration_notify_worker.rb
@@ -3,7 +3,7 @@
 class PollExpirationNotifyWorker
   include Sidekiq::Worker
 
-  sidekiq_options lock: :until_executed
+  sidekiq_options lock: :until_executing
 
   def perform(poll_id)
     @poll = Poll.find(poll_id)
diff --git a/app/workers/post_process_media_worker.rb b/app/workers/post_process_media_worker.rb
index 24201101c..996d5def9 100644
--- a/app/workers/post_process_media_worker.rb
+++ b/app/workers/post_process_media_worker.rb
@@ -9,13 +9,11 @@ class PostProcessMediaWorker
     media_attachment_id = msg['args'].first
 
     ActiveRecord::Base.connection_pool.with_connection do
-      begin
-        media_attachment = MediaAttachment.find(media_attachment_id)
-        media_attachment.processing = :failed
-        media_attachment.save
-      rescue ActiveRecord::RecordNotFound
-        true
-      end
+      media_attachment = MediaAttachment.find(media_attachment_id)
+      media_attachment.processing = :failed
+      media_attachment.save
+    rescue ActiveRecord::RecordNotFound
+      true
     end
 
     Sidekiq.logger.error("Processing media attachment #{media_attachment_id} failed with #{msg['error_message']}")
diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
index bd92fe32c..f237f1dc9 100644
--- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
+++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
@@ -7,7 +7,7 @@ class Scheduler::AccountsStatusesCleanupScheduler
   # This limit is mostly to be nice to the fediverse at large and not
   # generate too much traffic.
   # This also helps limiting the running time of the scheduler itself.
-  MAX_BUDGET         = 50
+  MAX_BUDGET         = 150
 
   # This is an attempt to spread the load across instances, as various
   # accounts are likely to have various followers.
@@ -15,28 +15,22 @@ class Scheduler::AccountsStatusesCleanupScheduler
 
   # This is an attempt to limit the workload generated by status removal
   # jobs to something the particular instance can handle.
-  PER_THREAD_BUDGET  = 5
+  PER_THREAD_BUDGET  = 6
 
   # Those avoid loading an instance that is already under load
-  MAX_DEFAULT_SIZE    = 2
+  MAX_DEFAULT_SIZE    = 200
   MAX_DEFAULT_LATENCY = 5
-  MAX_PUSH_SIZE       = 5
+  MAX_PUSH_SIZE       = 500
   MAX_PUSH_LATENCY    = 10
+
   # 'pull' queue has lower priority jobs, and it's unlikely that pushing
   # deletes would cause much issues with this queue if it didn't cause issues
   # with default and push. Yet, do not enqueue deletes if the instance is
   # lagging behind too much.
-  MAX_PULL_SIZE       = 500
-  MAX_PULL_LATENCY    = 300
-
-  # This is less of an issue in general, but deleting old statuses is likely
-  # to cause delivery errors, and thus increase the number of jobs to be retried.
-  # This doesn't directly translate to load, but connection errors and a high
-  # number of dead instances may lead to this spiraling out of control if
-  # unchecked.
-  MAX_RETRY_SIZE = 50_000
+  MAX_PULL_SIZE       = 10_000
+  MAX_PULL_LATENCY    = 5.minutes.to_i
 
-  sidekiq_options retry: 0, lock: :until_executed
+  sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i
 
   def perform
     return if under_load?
@@ -62,17 +56,17 @@ class Scheduler::AccountsStatusesCleanupScheduler
       # The idea here is to loop through all policies at least once until the budget is exhausted
       # and start back after the last processed account otherwise
       break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
+
       first_policy_id = nil
     end
   end
 
   def compute_budget
-    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
+    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
     [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
   end
 
   def under_load?
-    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE
     queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
   end
 
diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb
index 57f78170e..17cf3f2cc 100644
--- a/app/workers/scheduler/follow_recommendations_scheduler.rb
+++ b/app/workers/scheduler/follow_recommendations_scheduler.rb
@@ -19,13 +19,11 @@ class Scheduler::FollowRecommendationsScheduler
     fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE)
 
     Trends.available_locales.each do |locale|
-      recommendations = begin
-        if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
-          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.account_id, recommendation.rank] }
-        else
-          []
-        end
-      end
+      recommendations = if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
+                          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.rank, recommendation.account_id] }
+                        else
+                          []
+                        end
 
       # Use language-agnostic results if there are not enough language-specific ones
       missing = SET_SIZE - recommendations.size
@@ -35,14 +33,14 @@ class Scheduler::FollowRecommendationsScheduler
 
         # Language-specific results should be above language-agnostic ones,
         # otherwise language-agnostic ones will always overshadow them
-        recommendations.map! { |(account_id, rank)| [account_id, rank + max_fallback_rank] }
+        recommendations.map! { |(rank, account_id)| [rank + max_fallback_rank, account_id] }
 
         added = 0
 
         fallback_recommendations.each do |recommendation|
-          next if recommendations.any? { |(account_id, _)| account_id == recommendation.account_id }
+          next if recommendations.any? { |(_, account_id)| account_id == recommendation.account_id }
 
-          recommendations << [recommendation.account_id, recommendation.rank]
+          recommendations << [recommendation.rank, recommendation.account_id]
           added += 1
 
           break if added >= missing
@@ -51,10 +49,7 @@ class Scheduler::FollowRecommendationsScheduler
 
       redis.multi do |multi|
         multi.del(key(locale))
-
-        recommendations.each do |(account_id, rank)|
-          multi.zadd(key(locale), rank, account_id)
-        end
+        multi.zadd(key(locale), recommendations)
       end
     end
   end
diff --git a/app/workers/scheduler/indexing_scheduler.rb b/app/workers/scheduler/indexing_scheduler.rb
index c42396629..d622f5586 100644
--- a/app/workers/scheduler/indexing_scheduler.rb
+++ b/app/workers/scheduler/indexing_scheduler.rb
@@ -6,17 +6,19 @@ class Scheduler::IndexingScheduler
 
   sidekiq_options retry: 0
 
+  IMPORT_BATCH_SIZE = 1000
+  SCAN_BATCH_SIZE = 10 * IMPORT_BATCH_SIZE
+
   def perform
     return unless Chewy.enabled?
 
     indexes.each do |type|
       with_redis do |redis|
-        ids = redis.smembers("chewy:queue:#{type.name}")
-
-        type.import!(ids)
-
-        redis.pipelined do |pipeline|
-          ids.each { |id| pipeline.srem("chewy:queue:#{type.name}", id) }
+        redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE).each_slice(IMPORT_BATCH_SIZE) do |ids|
+          type.import!(ids)
+          redis.pipelined do |pipeline|
+            pipeline.srem("chewy:queue:#{type.name}", ids)
+          end
         end
       end
     end
diff --git a/app/workers/scheduler/user_cleanup_scheduler.rb b/app/workers/scheduler/user_cleanup_scheduler.rb
index 63f9ed78c..45cfbc62e 100644
--- a/app/workers/scheduler/user_cleanup_scheduler.rb
+++ b/app/workers/scheduler/user_cleanup_scheduler.rb
@@ -15,7 +15,7 @@ class Scheduler::UserCleanupScheduler
   def clean_unconfirmed_accounts!
     User.where('confirmed_at is NULL AND confirmation_sent_at <= ?', 2.days.ago).reorder(nil).find_in_batches do |batch|
       # We have to do it separately because of missing database constraints
-      AccountModerationNote.where(account_id: batch.map(&:account_id)).delete_all
+      AccountModerationNote.where(target_account_id: batch.map(&:account_id)).delete_all
       Account.where(id: batch.map(&:account_id)).delete_all
       User.where(id: batch.map(&:id)).delete_all
     end
diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb
index 1b77dfdd9..3206c45f6 100644
--- a/app/workers/thread_resolve_worker.rb
+++ b/app/workers/thread_resolve_worker.rb
@@ -6,9 +6,9 @@ class ThreadResolveWorker
 
   sidekiq_options queue: 'pull', retry: 3
 
-  def perform(child_status_id, parent_url)
+  def perform(child_status_id, parent_url, options = {})
     child_status  = Status.find(child_status_id)
-    parent_status = FetchRemoteStatusService.new.call(parent_url)
+    parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys)
 
     return if parent_status.nil?
 
diff --git a/app/workers/unfollow_follow_worker.rb b/app/workers/unfollow_follow_worker.rb
index 7203b4888..a4d57839d 100644
--- a/app/workers/unfollow_follow_worker.rb
+++ b/app/workers/unfollow_follow_worker.rb
@@ -10,13 +10,7 @@ class UnfollowFollowWorker
     old_target_account = Account.find(old_target_account_id)
     new_target_account = Account.find(new_target_account_id)
 
-    follow    = follower_account.active_relationships.find_by(target_account: old_target_account)
-    reblogs   = follow&.show_reblogs?
-    notify    = follow&.notify?
-    languages = follow&.languages
-
-    FollowService.new.call(follower_account, new_target_account, reblogs: reblogs, notify: notify, languages: languages, bypass_locked: bypass_locked, bypass_limit: true)
-    UnfollowService.new.call(follower_account, old_target_account, skip_unmerge: true)
+    FollowMigrationService.new.call(follower_account, new_target_account, old_target_account, bypass_locked: bypass_locked)
   rescue ActiveRecord::RecordNotFound, Mastodon::NotPermittedError
     true
   end
diff --git a/app/workers/web/push_notification_worker.rb b/app/workers/web/push_notification_worker.rb
index 1ed5bb9e0..7e9691aab 100644
--- a/app/workers/web/push_notification_worker.rb
+++ b/app/workers/web/push_notification_worker.rb
@@ -22,13 +22,13 @@ class Web::PushNotificationWorker
       request = Request.new(:post, @subscription.endpoint, body: payload.fetch(:ciphertext), http_client: http_client)
 
       request.add_headers(
-        'Content-Type'     => 'application/octet-stream',
-        'Ttl'              => TTL,
-        'Urgency'          => URGENCY,
+        'Content-Type' => 'application/octet-stream',
+        'Ttl' => TTL,
+        'Urgency' => URGENCY,
         'Content-Encoding' => 'aesgcm',
-        'Encryption'       => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}",
-        'Crypto-Key'       => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}",
-        'Authorization'    => @subscription.authorization_header
+        'Encryption' => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}",
+        'Crypto-Key' => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}",
+        'Authorization' => @subscription.authorization_header
       )
 
       request.perform do |response|
diff --git a/app/workers/webhooks/delivery_worker.rb b/app/workers/webhooks/delivery_worker.rb
index b1e345c5e..f8ed669fb 100644
--- a/app/workers/webhooks/delivery_worker.rb
+++ b/app/workers/webhooks/delivery_worker.rb
@@ -19,7 +19,7 @@ class Webhooks::DeliveryWorker
   private
 
   def perform_request
-    request = Request.new(:post, @webhook.url, body: @body)
+    request = Request.new(:post, @webhook.url, body: @body, allow_local: true)
 
     request.add_headers(
       'Content-Type' => 'application/json',