about summary refs log tree commit diff
path: root/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
diff options
context:
space:
mode:
authorClaire <claire.github-309c@sitedethib.com>2021-08-09 23:11:50 +0200
committerGitHub <noreply@github.com>2021-08-09 23:11:50 +0200
commit4ac78e2a066508a54de82f1d910ef2fd36c3d106 (patch)
tree350b4b7465ae73a9ad3adb55512586b862f13e9e /app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
parent432e3d1eaf816b142959afeda0490641ddcfdf61 (diff)
Add feature to automatically delete old toots (#16529)
* Add account statuses cleanup policy model

* Record last inspected toot to delete to speed up successive calls to statuses_to_delete

* Add service to cleanup a given account's statuses within a budget

* Add worker to go through account policies and delete old toots

* Fix last inspected status id logic

All existing statuses older or equal to last inspected status id must be
kept by the current policy. This is an invariant that must be kept so that
resuming deletion from the last inspected status remains sound.

* Add tests

* Refactor scheduler and add tests

* Add user interface

* Add support for discriminating based on boosts/favs

* Add UI support for min_reblogs and min_favs, rework UI

* Address first round of review comments

* Replace Snowflake#id_at_start with with_random parameter

* Add tests

* Add tests for StatusesCleanupController

* Rework settings page

* Adjust load-avoiding mechanisms

* Please CodeClimate
Diffstat (limited to 'app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb')
-rw-r--r--app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb96
1 files changed, 96 insertions, 0 deletions
diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
new file mode 100644
index 000000000..f42d4bca6
--- /dev/null
+++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
@@ -0,0 +1,96 @@
+# frozen_string_literal: true
+
+class Scheduler::AccountsStatusesCleanupScheduler
+  include Sidekiq::Worker
+
+  # This limit is mostly to be nice to the fediverse at large and not
+  # generate too much traffic.
+  # This also helps limiting the running time of the scheduler itself.
+  MAX_BUDGET         = 50
+
+  # This is an attempt to spread the load across instances, as various
+  # accounts are likely to have various followers.
+  PER_ACCOUNT_BUDGET = 5
+
+  # This is an attempt to limit the workload generated by status removal
+  # jobs to something the particular instance can handle.
+  PER_THREAD_BUDGET  = 5
+
+  # Those avoid loading an instance that is already under load
+  MAX_DEFAULT_SIZE    = 2
+  MAX_DEFAULT_LATENCY = 5
+  MAX_PUSH_SIZE       = 5
+  MAX_PUSH_LATENCY    = 10
+  # 'pull' queue has lower priority jobs, and it's unlikely that pushing
+  # deletes would cause much issues with this queue if it didn't cause issues
+  # with default and push. Yet, do not enqueue deletes if the instance is
+  # lagging behind too much.
+  MAX_PULL_SIZE       = 500
+  MAX_PULL_LATENCY    = 300
+
+  # This is less of an issue in general, but deleting old statuses is likely
+  # to cause delivery errors, and thus increase the number of jobs to be retried.
+  # This doesn't directly translate to load, but connection errors and a high
+  # number of dead instances may lead to this spiraling out of control if
+  # unchecked.
+  MAX_RETRY_SIZE = 50_000
+
+  sidekiq_options retry: 0, lock: :until_executed
+
+  def perform
+    return if under_load?
+
+    budget = compute_budget
+    first_policy_id = last_processed_id
+
+    loop do
+      num_processed_accounts = 0
+
+      scope = AccountStatusesCleanupPolicy.where(enabled: true)
+      scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
+      scope.find_each(order: :asc) do |policy|
+        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
+        num_processed_accounts += 1 unless num_deleted.zero?
+        budget -= num_deleted
+        if budget.zero?
+          save_last_processed_id(policy.id)
+          break
+        end
+      end
+
+      # The idea here is to loop through all policies at least once until the budget is exhausted
+      # and start back after the last processed account otherwise
+      break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)
+      first_policy_id = nil
+    end
+  end
+
+  def compute_budget
+    threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
+    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
+  end
+
+  def under_load?
+    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE
+    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
+  end
+
+  private
+
+  def queue_under_load?(name, max_size, max_latency)
+    queue = Sidekiq::Queue.new(name)
+    queue.size > max_size || queue.latency > max_latency
+  end
+
+  def last_processed_id
+    Redis.current.get('account_statuses_cleanup_scheduler:last_account_id')
+  end
+
+  def save_last_processed_id(id)
+    if id.nil?
+      Redis.current.del('account_statuses_cleanup_scheduler:last_account_id')
+    else
+      Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
+    end
+  end
+end