about summary refs log tree commit diff
path: root/app/workers/scheduler
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/scheduler')
-rw-r--r--app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb96
-rw-r--r--app/workers/scheduler/follow_recommendations_scheduler.rb4
-rw-r--r--app/workers/scheduler/ip_cleanup_scheduler.rb2
-rw-r--r--app/workers/scheduler/trends/refresh_scheduler.rb (renamed from app/workers/scheduler/trending_tags_scheduler.rb)4
-rw-r--r--app/workers/scheduler/trends/review_notifications_scheduler.rb11
-rw-r--r--app/workers/scheduler/user_cleanup_scheduler.rb9
6 files changed, 121 insertions, 5 deletions
diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
new file mode 100644
index 000000000..f42d4bca6
--- /dev/null
+++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
@@ -0,0 +1,96 @@
+# frozen_string_literal: true
+
+class Scheduler::AccountsStatusesCleanupScheduler
+  include Sidekiq::Worker
+
+  # This limit is mostly to be nice to the fediverse at large and not
+  # generate too much traffic.
+  # This also helps limit the running time of the scheduler itself.
+  MAX_BUDGET         = 50
+
+  # This is an attempt to spread the load across instances, as various
+  # accounts are likely to have various followers.
+  PER_ACCOUNT_BUDGET = 5
+
+  # This is an attempt to limit the workload generated by status removal
+  # jobs to something the particular instance can handle.
+  PER_THREAD_BUDGET  = 5
+
+  # These thresholds avoid adding work to an instance that is already under load
+  MAX_DEFAULT_SIZE    = 2
+  MAX_DEFAULT_LATENCY = 5
+  MAX_PUSH_SIZE       = 5
+  MAX_PUSH_LATENCY    = 10
+  # 'pull' queue has lower priority jobs, and it's unlikely that pushing
+  # deletes would cause many issues with this queue if it didn't cause issues
+  # with default and push. Yet, do not enqueue deletes if the instance is
+  # lagging behind too much.
+  MAX_PULL_SIZE       = 500
+  MAX_PULL_LATENCY    = 300
+
+  # This is less of an issue in general, but deleting old statuses is likely
+  # to cause delivery errors, and thus increase the number of jobs to be retried.
+  # This doesn't directly translate to load, but connection errors and a high
+  # number of dead instances may lead to this spiraling out of control if
+  # unchecked.
+  MAX_RETRY_SIZE = 50_000
+
+  sidekiq_options retry: 0, lock: :until_executed
+
+  # Spends this run's deletion budget on enabled cleanup policies, resuming after the last saved policy id.
+  def perform
+    return if under_load?
+
+    budget = compute_budget
+    first_policy_id = last_processed_id
+    loop do
+      num_processed_accounts = 0
+      # `where` returns a new relation, so it must be reassigned; the saved id is a policy id (see policy.id below).
+      scope = AccountStatusesCleanupPolicy.where(enabled: true)
+      scope = scope.where(AccountStatusesCleanupPolicy.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
+      scope.find_each(order: :asc) do |policy|
+        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
+        num_processed_accounts += 1 unless num_deleted.zero?
+        budget -= num_deleted
+        if budget.zero?
+          save_last_processed_id(policy.id)
+          break
+        end
+      end
+
+      # Stop once the budget is spent, or once a full pass from the first policy deleted nothing.
+      # Otherwise wrap around: clear the resume point and scan again from the first policy.
+      break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
+      first_policy_id = nil
+    end
+  end
+
+  def compute_budget
+    threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
+    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
+  end
+
+  def under_load?
+    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE
+    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
+  end
+
+  private
+
+  def queue_under_load?(name, max_size, max_latency)
+    queue = Sidekiq::Queue.new(name)
+    queue.size > max_size || queue.latency > max_latency
+  end
+
+  def last_processed_id
+    Redis.current.get('account_statuses_cleanup_scheduler:last_account_id')
+  end
+
+  def save_last_processed_id(id)
+    if id.nil?
+      Redis.current.del('account_statuses_cleanup_scheduler:last_account_id')
+    else
+      Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
+    end
+  end
+end
diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb
index cb1e15961..effc63e59 100644
--- a/app/workers/scheduler/follow_recommendations_scheduler.rb
+++ b/app/workers/scheduler/follow_recommendations_scheduler.rb
@@ -16,12 +16,12 @@ class Scheduler::FollowRecommendationsScheduler
     AccountSummary.refresh
     FollowRecommendation.refresh
 
-    fallback_recommendations = FollowRecommendation.limit(SET_SIZE).index_by(&:account_id)
+    fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE).index_by(&:account_id)
 
     I18n.available_locales.each do |locale|
       recommendations = begin
         if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
-          FollowRecommendation.localized(locale).limit(SET_SIZE).index_by(&:account_id)
+          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).index_by(&:account_id)
         else
           {}
         end
diff --git a/app/workers/scheduler/ip_cleanup_scheduler.rb b/app/workers/scheduler/ip_cleanup_scheduler.rb
index 918c10ac9..adc99c605 100644
--- a/app/workers/scheduler/ip_cleanup_scheduler.rb
+++ b/app/workers/scheduler/ip_cleanup_scheduler.rb
@@ -16,7 +16,7 @@ class Scheduler::IpCleanupScheduler
 
   def clean_ip_columns!
     SessionActivation.where('updated_at < ?', IP_RETENTION_PERIOD.ago).in_batches.destroy_all
-    User.where('current_sign_in_at < ?', IP_RETENTION_PERIOD.ago).in_batches.update_all(last_sign_in_ip: nil, current_sign_in_ip: nil, sign_up_ip: nil)
+    User.where('current_sign_in_at < ?', IP_RETENTION_PERIOD.ago).in_batches.update_all(sign_up_ip: nil)
     LoginActivity.where('created_at < ?', IP_RETENTION_PERIOD.ago).in_batches.destroy_all
   end
 
diff --git a/app/workers/scheduler/trending_tags_scheduler.rb b/app/workers/scheduler/trends/refresh_scheduler.rb
index 94d76d010..b559ba46b 100644
--- a/app/workers/scheduler/trending_tags_scheduler.rb
+++ b/app/workers/scheduler/trends/refresh_scheduler.rb
@@ -1,11 +1,11 @@
 # frozen_string_literal: true
 
-class Scheduler::TrendingTagsScheduler
+class Scheduler::Trends::RefreshScheduler
   include Sidekiq::Worker
 
   sidekiq_options retry: 0
 
   def perform
-    TrendingTags.update! if Setting.trends
+    Trends.refresh!
   end
 end
diff --git a/app/workers/scheduler/trends/review_notifications_scheduler.rb b/app/workers/scheduler/trends/review_notifications_scheduler.rb
new file mode 100644
index 000000000..f334261bd
--- /dev/null
+++ b/app/workers/scheduler/trends/review_notifications_scheduler.rb
@@ -0,0 +1,11 @@
+# frozen_string_literal: true
+
+class Scheduler::Trends::ReviewNotificationsScheduler
+  include Sidekiq::Worker
+
+  sidekiq_options retry: 0
+
+  def perform
+    Trends.request_review!
+  end
+end
diff --git a/app/workers/scheduler/user_cleanup_scheduler.rb b/app/workers/scheduler/user_cleanup_scheduler.rb
index be0c4277d..750d2127b 100644
--- a/app/workers/scheduler/user_cleanup_scheduler.rb
+++ b/app/workers/scheduler/user_cleanup_scheduler.rb
@@ -8,6 +8,7 @@ class Scheduler::UserCleanupScheduler
   def perform
     clean_unconfirmed_accounts!
     clean_suspended_accounts!
+    clean_discarded_statuses!
   end
 
   private
@@ -24,4 +25,12 @@ class Scheduler::UserCleanupScheduler
       Admin::AccountDeletionWorker.perform_async(deletion_request.account_id)
     end
   end
+
+  def clean_discarded_statuses!
+    Status.discarded.where('deleted_at <= ?', 30.days.ago).find_in_batches do |statuses|
+      RemovalWorker.push_bulk(statuses) do |status|
+        [status.id, { 'immediate' => true }]
+      end
+    end
+  end
 end