diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb new file mode 100644 index 000000000..f42d4bca6 --- /dev/null +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +class Scheduler::AccountsStatusesCleanupScheduler + include Sidekiq::Worker + + # This limit is mostly to be nice to the fediverse at large and not + # generate too much traffic. + # This also helps limiting the running time of the scheduler itself. + MAX_BUDGET = 50 + + # This is an attempt to spread the load across instances, as various + # accounts are likely to have various followers. + PER_ACCOUNT_BUDGET = 5 + + # This is an attempt to limit the workload generated by status removal + # jobs to something the particular instance can handle. + PER_THREAD_BUDGET = 5 + + # Those avoid loading an instance that is already under load + MAX_DEFAULT_SIZE = 2 + MAX_DEFAULT_LATENCY = 5 + MAX_PUSH_SIZE = 5 + MAX_PUSH_LATENCY = 10 + # 'pull' queue has lower priority jobs, and it's unlikely that pushing + # deletes would cause much issues with this queue if it didn't cause issues + # with default and push. Yet, do not enqueue deletes if the instance is + # lagging behind too much. + MAX_PULL_SIZE = 500 + MAX_PULL_LATENCY = 300 + + # This is less of an issue in general, but deleting old statuses is likely + # to cause delivery errors, and thus increase the number of jobs to be retried. + # This doesn't directly translate to load, but connection errors and a high + # number of dead instances may lead to this spiraling out of control if + # unchecked. + MAX_RETRY_SIZE = 50_000 + + sidekiq_options retry: 0, lock: :until_executed + + def perform + return if under_load? + + budget = compute_budget + first_policy_id = last_processed_id + + loop do + num_processed_accounts = 0 + + scope = AccountStatusesCleanupPolicy.where(enabled: true) + scope.where(Account.arel_table[:id].gt(first_policy_id)) if first_policy_id.present? + scope.find_each(order: :asc) do |policy| + num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min) + num_processed_accounts += 1 unless num_deleted.zero? + budget -= num_deleted + if budget.zero? + save_last_processed_id(policy.id) + break + end + end + + # The idea here is to loop through all policies at least once until the budget is exhausted + # and start back after the last processed account otherwise + break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?) + first_policy_id = nil + end + end + + def compute_budget + threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum + [PER_THREAD_BUDGET * threads, MAX_BUDGET].min + end + + def under_load? + return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE + queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY) + end + + private + + def queue_under_load?(name, max_size, max_latency) + queue = Sidekiq::Queue.new(name) + queue.size > max_size || queue.latency > max_latency + end + + def last_processed_id + Redis.current.get('account_statuses_cleanup_scheduler:last_account_id') + end + + def save_last_processed_id(id) + if id.nil? + Redis.current.del('account_statuses_cleanup_scheduler:last_account_id') + else + Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds) + end + end +end |