about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/activitypub/delivery_worker.rb6
-rw-r--r--app/workers/activitypub/distribution_worker.rb48
-rw-r--r--app/workers/activitypub/raw_distribution_worker.rb37
-rw-r--r--app/workers/activitypub/reply_distribution_worker.rb34
-rw-r--r--app/workers/activitypub/update_distribution_worker.rb25
-rw-r--r--app/workers/admin/domain_purge_worker.rb9
-rw-r--r--app/workers/distribution_worker.rb4
-rw-r--r--app/workers/feed_insert_worker.rb38
-rw-r--r--app/workers/local_notification_worker.rb2
-rw-r--r--app/workers/move_worker.rb12
-rw-r--r--app/workers/poll_expiration_notify_worker.rb45
-rw-r--r--app/workers/push_update_worker.rb35
-rw-r--r--app/workers/remote_account_refresh_worker.rb24
-rw-r--r--app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb96
-rw-r--r--app/workers/scheduler/follow_recommendations_scheduler.rb4
-rw-r--r--app/workers/scheduler/ip_cleanup_scheduler.rb2
-rw-r--r--app/workers/scheduler/trends/refresh_scheduler.rb (renamed from app/workers/scheduler/trending_tags_scheduler.rb)4
-rw-r--r--app/workers/scheduler/trends/review_notifications_scheduler.rb11
-rw-r--r--app/workers/scheduler/user_cleanup_scheduler.rb9
19 files changed, 312 insertions, 133 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb
index 6c5a576a7..788f2cf80 100644
--- a/app/workers/activitypub/delivery_worker.rb
+++ b/app/workers/activitypub/delivery_worker.rb
@@ -44,11 +44,7 @@ class ActivityPub::DeliveryWorker
   end
 
   def synchronization_header
-    "collectionId=\"#{account_followers_url(@source_account)}\", digest=\"#{@source_account.remote_followers_hash(inbox_url_prefix)}\", url=\"#{account_followers_synchronization_url(@source_account)}\""
-  end
-
-  def inbox_url_prefix
-    @inbox_url[/http(s?):\/\/[^\/]+\//]
+    "collectionId=\"#{account_followers_url(@source_account)}\", digest=\"#{@source_account.remote_followers_hash(@inbox_url)}\", url=\"#{account_followers_synchronization_url(@source_account)}\""
   end
 
   def perform_request
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb
index 09898ca49..575e11025 100644
--- a/app/workers/activitypub/distribution_worker.rb
+++ b/app/workers/activitypub/distribution_worker.rb
@@ -1,54 +1,32 @@
 # frozen_string_literal: true
 
-class ActivityPub::DistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
+class ActivityPub::DistributionWorker < ActivityPub::RawDistributionWorker
+  # Distribute a new status or an edit of a status to all the places
+  # where the status is supposed to go or where it was interacted with
   def perform(status_id)
     @status  = Status.find(status_id)
     @account = @status.account
 
-    return if skip_distribution?
-
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }]
-    end
-
-    relay! if relayable?
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
-
-  def skip_distribution?
-    @status.direct_visibility? || @status.limited_visibility?
-  end
-
-  def relayable?
-    @status.public_visibility?
-  end
+  protected
 
   def inboxes
-    # Deliver the status to all followers.
-    # If the status is a reply to another local status, also forward it to that
-    # status' authors' followers.
-    @inboxes ||= if @status.in_reply_to_local_account? && @status.distributable?
-                   @account.followers.or(@status.thread.account.followers).inboxes
-                 else
-                   @account.followers.inboxes
-                 end
+    @inboxes ||= StatusReachFinder.new(@status).inboxes
   end
 
   def payload
-    @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account))
+    @payload ||= Oj.dump(serialize_payload(activity, ActivityPub::ActivitySerializer, signer: @account))
+  end
+
+  def activity
+    ActivityPub::ActivityPresenter.from_status(@status)
   end
 
-  def relay!
-    ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
-      [payload, @account.id, inbox_url]
-    end
+  def options
+    { 'synchronize_followers' => @status.private_visibility? }
   end
 end
diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb
index 41e61132f..8ecc17db9 100644
--- a/app/workers/activitypub/raw_distribution_worker.rb
+++ b/app/workers/activitypub/raw_distribution_worker.rb
@@ -2,22 +2,47 @@
 
 class ActivityPub::RawDistributionWorker
   include Sidekiq::Worker
+  include Payloadable
 
   sidekiq_options queue: 'push'
 
+  # Base worker for when you want to queue up a bunch of deliveries of
+  # some payload. In this case, we have already generated JSON and
+  # we are going to distribute it to the account's followers minus
+  # the explicitly provided inboxes
   def perform(json, source_account_id, exclude_inboxes = [])
-    @account = Account.find(source_account_id)
+    @account         = Account.find(source_account_id)
+    @json            = json
+    @exclude_inboxes = exclude_inboxes
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url|
-      [json, @account.id, inbox_url]
-    end
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
+  protected
+
+  def distribute!
+    return if inboxes.empty?
+
+    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+      [payload, source_account_id, inbox_url, options]
+    end
+  end
+
+  def payload
+    @json
+  end
+
+  def source_account_id
+    @account.id
+  end
 
   def inboxes
-    @inboxes ||= @account.followers.inboxes
+    @inboxes ||= @account.followers.inboxes - @exclude_inboxes
+  end
+
+  def options
+    {}
   end
 end
diff --git a/app/workers/activitypub/reply_distribution_worker.rb b/app/workers/activitypub/reply_distribution_worker.rb
deleted file mode 100644
index d4d0148ac..000000000
--- a/app/workers/activitypub/reply_distribution_worker.rb
+++ /dev/null
@@ -1,34 +0,0 @@
-# frozen_string_literal: true
-
-# Obsolete but kept around to make sure existing jobs do not fail after upgrade.
-# Should be removed in a subsequent release.
-
-class ActivityPub::ReplyDistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
-  def perform(status_id)
-    @status  = Status.find(status_id)
-    @account = @status.thread&.account
-
-    return unless @account.present? && @status.distributable?
-
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, @status.account_id, inbox_url]
-    end
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-
-  private
-
-  def inboxes
-    @inboxes ||= @account.followers.inboxes
-  end
-
-  def payload
-    @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account))
-  end
-end
diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb
index 3a207f071..81fde63b8 100644
--- a/app/workers/activitypub/update_distribution_worker.rb
+++ b/app/workers/activitypub/update_distribution_worker.rb
@@ -1,33 +1,24 @@
 # frozen_string_literal: true
 
-class ActivityPub::UpdateDistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
+class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker
+  # Distribute an profile update to servers that might have a copy
+  # of the account in question
   def perform(account_id, options = {})
     @options = options.with_indifferent_access
     @account = Account.find(account_id)
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [signed_payload, @account.id, inbox_url]
-    end
-
-    ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
-      [signed_payload, @account.id, inbox_url]
-    end
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
+  protected
 
   def inboxes
-    @inboxes ||= @account.followers.inboxes
+    @inboxes ||= AccountReachFinder.new(@account).inboxes
   end
 
-  def signed_payload
-    @signed_payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
+  def payload
+    @payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
   end
 end
diff --git a/app/workers/admin/domain_purge_worker.rb b/app/workers/admin/domain_purge_worker.rb
new file mode 100644
index 000000000..7cba2c89e
--- /dev/null
+++ b/app/workers/admin/domain_purge_worker.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+class Admin::DomainPurgeWorker
+  include Sidekiq::Worker
+
+  def perform(domain)
+    PurgeDomainService.new.call(domain)
+  end
+end
diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb
index e85cd7e95..770325ccf 100644
--- a/app/workers/distribution_worker.rb
+++ b/app/workers/distribution_worker.rb
@@ -3,10 +3,10 @@
 class DistributionWorker
   include Sidekiq::Worker
 
-  def perform(status_id)
+  def perform(status_id, options = {})
     RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock|
       if lock.acquired?
-        FanOutOnWriteService.new.call(Status.find(status_id))
+        FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
       else
         raise Mastodon::RaceConditionError
       end
diff --git a/app/workers/feed_insert_worker.rb b/app/workers/feed_insert_worker.rb
index 45e6bb88d..b81b09cac 100644
--- a/app/workers/feed_insert_worker.rb
+++ b/app/workers/feed_insert_worker.rb
@@ -3,9 +3,10 @@
 class FeedInsertWorker
   include Sidekiq::Worker
 
-  def perform(status_id, id, type = :home)
-    @type     = type.to_sym
-    @status   = Status.find(status_id)
+  def perform(status_id, id, type = 'home', options = {})
+    @type      = type.to_sym
+    @status    = Status.find(status_id)
+    @options   = options.symbolize_keys
 
     case @type
     when :home
@@ -25,10 +26,12 @@ class FeedInsertWorker
   private
 
   def check_and_insert
-    return if feed_filtered?
-
-    perform_push
-    perform_notify if notify?
+    if feed_filtered?
+      perform_unpush if update?
+    else
+      perform_push
+      perform_notify if notify?
+    end
   end
 
   def feed_filtered?
@@ -51,15 +54,30 @@ class FeedInsertWorker
   def perform_push
     case @type
     when :home
-      FeedManager.instance.push_to_home(@follower, @status)
+      FeedManager.instance.push_to_home(@follower, @status, update: update?)
     when :list
-      FeedManager.instance.push_to_list(@list, @status)
+      FeedManager.instance.push_to_list(@list, @status, update: update?)
     when :direct
-      FeedManager.instance.push_to_direct(@account, @status)
+      FeedManager.instance.push_to_direct(@account, @status, update: update?)
+    end
+  end
+
+  def perform_unpush
+    case @type
+    when :home
+      FeedManager.instance.unpush_from_home(@follower, @status, update: true)
+    when :list
+      FeedManager.instance.unpush_from_list(@list, @status, update: true)
+    when :direct
+      FeedManager.instance.unpush_from_direct(@account, @status, update: true)
     end
   end
 
   def perform_notify
     NotifyService.new.call(@follower, :status, @status)
   end
+
+  def update?
+    @options[:update]
+  end
 end
diff --git a/app/workers/local_notification_worker.rb b/app/workers/local_notification_worker.rb
index 6b08ca6fc..a22e2834d 100644
--- a/app/workers/local_notification_worker.rb
+++ b/app/workers/local_notification_worker.rb
@@ -12,6 +12,8 @@ class LocalNotificationWorker
       activity = activity_class_name.constantize.find(activity_id)
     end
 
+    return if Notification.where(account: receiver, activity: activity).any?
+
     NotifyService.new.call(receiver, type || activity_class_name.underscore, activity)
   rescue ActiveRecord::RecordNotFound
     true
diff --git a/app/workers/move_worker.rb b/app/workers/move_worker.rb
index 53a6b87f1..4a900e3b8 100644
--- a/app/workers/move_worker.rb
+++ b/app/workers/move_worker.rb
@@ -47,16 +47,22 @@ class MoveWorker
 
   def copy_account_notes!
     AccountNote.where(target_account: @source_account).find_each do |note|
-      text = I18n.with_locale(note.account.user.locale || I18n.default_locale) do
+      text = I18n.with_locale(note.account.user&.locale || I18n.default_locale) do
         I18n.t('move_handler.copy_account_note_text', acct: @source_account.acct)
       end
 
       new_note = AccountNote.find_by(account: note.account, target_account: @target_account)
       if new_note.nil?
-        AccountNote.create!(account: note.account, target_account: @target_account, comment: [text, note.comment].join("\n"))
+        begin
+          AccountNote.create!(account: note.account, target_account: @target_account, comment: [text, note.comment].join("\n"))
+        rescue ActiveRecord::RecordInvalid
+          AccountNote.create!(account: note.account, target_account: @target_account, comment: note.comment)
+        end
       else
         new_note.update!(comment: [text, note.comment, "\n", new_note.comment].join("\n"))
       end
+    rescue ActiveRecord::RecordInvalid
+      nil
     rescue => e
       @deferred_error = e
     end
@@ -84,7 +90,7 @@ class MoveWorker
 
   def add_account_note_if_needed!(account, id)
     unless AccountNote.where(account: account, target_account: @target_account).exists?
-      text = I18n.with_locale(account.user.locale || I18n.default_locale) do
+      text = I18n.with_locale(account.user&.locale || I18n.default_locale) do
         I18n.t(id, acct: @source_account.acct)
       end
       AccountNote.create!(account: account, target_account: @target_account, comment: text)
diff --git a/app/workers/poll_expiration_notify_worker.rb b/app/workers/poll_expiration_notify_worker.rb
index f0191d479..7613ed5f1 100644
--- a/app/workers/poll_expiration_notify_worker.rb
+++ b/app/workers/poll_expiration_notify_worker.rb
@@ -6,19 +6,44 @@ class PollExpirationNotifyWorker
   sidekiq_options lock: :until_executed
 
   def perform(poll_id)
-    poll = Poll.find(poll_id)
+    @poll = Poll.find(poll_id)
 
-    # Notify poll owner and remote voters
-    if poll.local?
-      ActivityPub::DistributePollUpdateWorker.perform_async(poll.status.id)
-      NotifyService.new.call(poll.account, :poll, poll)
-    end
+    return if does_not_expire?
+    requeue! && return if not_due_yet?
 
-    # Notify local voters
-    poll.votes.includes(:account).group(:account_id).select(:account_id).map(&:account).select(&:local?).each do |account|
-      NotifyService.new.call(account, :poll, poll)
-    end
+    notify_remote_voters_and_owner! if @poll.local?
+    notify_local_voters!
   rescue ActiveRecord::RecordNotFound
     true
   end
+
+  def self.remove_from_scheduled(poll_id)
+    queue = Sidekiq::ScheduledSet.new
+    queue.select { |scheduled| scheduled.klass == name && scheduled.args[0] == poll_id }.map(&:delete)
+  end
+
+  private
+
+  def does_not_expire?
+    @poll.expires_at.nil?
+  end
+
+  def not_due_yet?
+    @poll.expires_at.present? && !@poll.expired?
+  end
+
+  def requeue!
+    PollExpirationNotifyWorker.perform_at(@poll.expires_at + 5.minutes, @poll.id)
+  end
+
+  def notify_remote_voters_and_owner!
+    ActivityPub::DistributePollUpdateWorker.perform_async(@poll.status.id)
+    NotifyService.new.call(@poll.account, :poll, @poll)
+  end
+
+  def notify_local_voters!
+    @poll.voters.merge(Account.local).find_each do |account|
+      NotifyService.new.call(account, :poll, @poll)
+    end
+  end
 end
diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb
index d76d73d96..ae444cfde 100644
--- a/app/workers/push_update_worker.rb
+++ b/app/workers/push_update_worker.rb
@@ -2,15 +2,38 @@
 
 class PushUpdateWorker
   include Sidekiq::Worker
+  include Redisable
 
-  def perform(account_id, status_id, timeline_id = nil)
-    account     = Account.find(account_id)
-    status      = Status.find(status_id)
-    message     = InlineRenderer.render(status, account, :status)
-    timeline_id = "timeline:#{account.id}" if timeline_id.nil?
+  def perform(account_id, status_id, timeline_id = nil, options = {})
+    @account     = Account.find(account_id)
+    @status      = Status.find(status_id)
+    @timeline_id = timeline_id || "timeline:#{account.id}"
+    @options     = options.symbolize_keys
 
-    Redis.current.publish(timeline_id, Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i))
+    publish!
   rescue ActiveRecord::RecordNotFound
     true
   end
+
+  private
+
+  def payload
+    InlineRenderer.render(@status, @account, :status)
+  end
+
+  def message
+    Oj.dump(
+      event: update? ? :'status.update' : :update,
+      payload: payload,
+      queued_at: (Time.now.to_f * 1000.0).to_i
+    )
+  end
+
+  def publish!
+    redis.publish(@timeline_id, message)
+  end
+
+  def update?
+    @options[:update]
+  end
 end
diff --git a/app/workers/remote_account_refresh_worker.rb b/app/workers/remote_account_refresh_worker.rb
new file mode 100644
index 000000000..9632936b5
--- /dev/null
+++ b/app/workers/remote_account_refresh_worker.rb
@@ -0,0 +1,24 @@
+# frozen_string_literal: true
+
+class RemoteAccountRefreshWorker
+  include Sidekiq::Worker
+  include ExponentialBackoff
+  include JsonLdHelper
+
+  sidekiq_options queue: 'pull', retry: 3
+
+  def perform(id)
+    account = Account.find_by(id: id)
+    return if account.nil? || account.local?
+
+    ActivityPub::FetchRemoteAccountService.new.call(account.uri)
+  rescue Mastodon::UnexpectedResponseError => e
+    response = e.response
+
+    if response_error_unsalvageable?(response)
+      # Give up
+    else
+      raise e
+    end
+  end
+end
diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
new file mode 100644
index 000000000..f42d4bca6
--- /dev/null
+++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
@@ -0,0 +1,96 @@
+# frozen_string_literal: true
+
+class Scheduler::AccountsStatusesCleanupScheduler
+  include Sidekiq::Worker
+
+  # This limit is mostly to be nice to the fediverse at large and not
+  # generate too much traffic.
+  # This also helps limiting the running time of the scheduler itself.
+  MAX_BUDGET         = 50
+
+  # This is an attempt to spread the load across instances, as various
+  # accounts are likely to have various followers.
+  PER_ACCOUNT_BUDGET = 5
+
+  # This is an attempt to limit the workload generated by status removal
+  # jobs to something the particular instance can handle.
+  PER_THREAD_BUDGET  = 5
+
+  # Those avoid loading an instance that is already under load
+  MAX_DEFAULT_SIZE    = 2
+  MAX_DEFAULT_LATENCY = 5
+  MAX_PUSH_SIZE       = 5
+  MAX_PUSH_LATENCY    = 10
+  # 'pull' queue has lower priority jobs, and it's unlikely that pushing
+  # deletes would cause much issues with this queue if it didn't cause issues
+  # with default and push. Yet, do not enqueue deletes if the instance is
+  # lagging behind too much.
+  MAX_PULL_SIZE       = 500
+  MAX_PULL_LATENCY    = 300
+
+  # This is less of an issue in general, but deleting old statuses is likely
+  # to cause delivery errors, and thus increase the number of jobs to be retried.
+  # This doesn't directly translate to load, but connection errors and a high
+  # number of dead instances may lead to this spiraling out of control if
+  # unchecked.
+  MAX_RETRY_SIZE = 50_000
+
+  sidekiq_options retry: 0, lock: :until_executed
+
+  def perform
+    return if under_load?
+
+    budget = compute_budget
+    first_policy_id = last_processed_id
+
+    loop do
+      num_processed_accounts = 0
+
+      scope = AccountStatusesCleanupPolicy.where(enabled: true)
+      scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
+      scope.find_each(order: :asc) do |policy|
+        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
+        num_processed_accounts += 1 unless num_deleted.zero?
+        budget -= num_deleted
+        if budget.zero?
+          save_last_processed_id(policy.id)
+          break
+        end
+      end
+
+      # The idea here is to loop through all policies at least once until the budget is exhausted
+      # and start back after the last processed account otherwise
+      break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
+      first_policy_id = nil
+    end
+  end
+
+  def compute_budget
+    threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
+    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
+  end
+
+  def under_load?
+    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE
+    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
+  end
+
+  private
+
+  def queue_under_load?(name, max_size, max_latency)
+    queue = Sidekiq::Queue.new(name)
+    queue.size > max_size || queue.latency > max_latency
+  end
+
+  def last_processed_id
+    Redis.current.get('account_statuses_cleanup_scheduler:last_account_id')
+  end
+
+  def save_last_processed_id(id)
+    if id.nil?
+      Redis.current.del('account_statuses_cleanup_scheduler:last_account_id')
+    else
+      Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
+    end
+  end
+end
diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb
index cb1e15961..effc63e59 100644
--- a/app/workers/scheduler/follow_recommendations_scheduler.rb
+++ b/app/workers/scheduler/follow_recommendations_scheduler.rb
@@ -16,12 +16,12 @@ class Scheduler::FollowRecommendationsScheduler
     AccountSummary.refresh
     FollowRecommendation.refresh
 
-    fallback_recommendations = FollowRecommendation.limit(SET_SIZE).index_by(&:account_id)
+    fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE).index_by(&:account_id)
 
     I18n.available_locales.each do |locale|
       recommendations = begin
         if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
-          FollowRecommendation.localized(locale).limit(SET_SIZE).index_by(&:account_id)
+          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).index_by(&:account_id)
         else
           {}
         end
diff --git a/app/workers/scheduler/ip_cleanup_scheduler.rb b/app/workers/scheduler/ip_cleanup_scheduler.rb
index 918c10ac9..adc99c605 100644
--- a/app/workers/scheduler/ip_cleanup_scheduler.rb
+++ b/app/workers/scheduler/ip_cleanup_scheduler.rb
@@ -16,7 +16,7 @@ class Scheduler::IpCleanupScheduler
 
   def clean_ip_columns!
     SessionActivation.where('updated_at < ?', IP_RETENTION_PERIOD.ago).in_batches.destroy_all
-    User.where('current_sign_in_at < ?', IP_RETENTION_PERIOD.ago).in_batches.update_all(last_sign_in_ip: nil, current_sign_in_ip: nil, sign_up_ip: nil)
+    User.where('current_sign_in_at < ?', IP_RETENTION_PERIOD.ago).in_batches.update_all(sign_up_ip: nil)
     LoginActivity.where('created_at < ?', IP_RETENTION_PERIOD.ago).in_batches.destroy_all
   end
 
diff --git a/app/workers/scheduler/trending_tags_scheduler.rb b/app/workers/scheduler/trends/refresh_scheduler.rb
index 94d76d010..b559ba46b 100644
--- a/app/workers/scheduler/trending_tags_scheduler.rb
+++ b/app/workers/scheduler/trends/refresh_scheduler.rb
@@ -1,11 +1,11 @@
 # frozen_string_literal: true
 
-class Scheduler::TrendingTagsScheduler
+class Scheduler::Trends::RefreshScheduler
   include Sidekiq::Worker
 
   sidekiq_options retry: 0
 
   def perform
-    TrendingTags.update! if Setting.trends
+    Trends.refresh!
   end
 end
diff --git a/app/workers/scheduler/trends/review_notifications_scheduler.rb b/app/workers/scheduler/trends/review_notifications_scheduler.rb
new file mode 100644
index 000000000..f334261bd
--- /dev/null
+++ b/app/workers/scheduler/trends/review_notifications_scheduler.rb
@@ -0,0 +1,11 @@
+# frozen_string_literal: true
+
+class Scheduler::Trends::ReviewNotificationsScheduler
+  include Sidekiq::Worker
+
+  sidekiq_options retry: 0
+
+  def perform
+    Trends.request_review!
+  end
+end
diff --git a/app/workers/scheduler/user_cleanup_scheduler.rb b/app/workers/scheduler/user_cleanup_scheduler.rb
index be0c4277d..750d2127b 100644
--- a/app/workers/scheduler/user_cleanup_scheduler.rb
+++ b/app/workers/scheduler/user_cleanup_scheduler.rb
@@ -8,6 +8,7 @@ class Scheduler::UserCleanupScheduler
   def perform
     clean_unconfirmed_accounts!
     clean_suspended_accounts!
+    clean_discarded_statuses!
   end
 
   private
@@ -24,4 +25,12 @@ class Scheduler::UserCleanupScheduler
       Admin::AccountDeletionWorker.perform_async(deletion_request.account_id)
     end
   end
+
+  def clean_discarded_statuses!
+    Status.discarded.where('deleted_at <= ?', 30.days.ago).find_in_batches do |statuses|
+      RemovalWorker.push_bulk(statuses) do |status|
+        [status.id, { 'immediate' => true }]
+      end
+    end
+  end
 end