path: root/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
# frozen_string_literal: true

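# Scans enabled AccountStatusesCleanupPolicy records and deletes old statuses
# on behalf of the corresponding accounts, within a per-run deletion budget,
# skipping the run entirely whenever the Sidekiq queues are already under load.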
class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker
  include Redisable

  # This limit is mostly to be nice to the fediverse at large and to avoid
  # generating too much traffic.
  # It also helps limit the running time of the scheduler itself.
  MAX_BUDGET         = 50

  # This is an attempt to spread the load across instances, as different
  # accounts are likely to have followers on different instances.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular instance can handle.
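  # For example, with two Sidekiq processes of concurrency 5 each listening
  # to the 'push' queue, compute_budget yields [5 * 10, 50].min == 50
  # deletions per run.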
  PER_THREAD_BUDGET  = 5

  # These thresholds avoid piling more work onto an instance that is already
  # under load.
  MAX_DEFAULT_SIZE    = 2
  MAX_DEFAULT_LATENCY = 5
  MAX_PUSH_SIZE       = 5
  MAX_PUSH_LATENCY    = 10
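  # For example, with the values above, a 'push' queue holding more than 5
  # jobs or lagging more than 10 seconds behind causes the whole run to be
  # skipped (see #under_load?).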
  # The 'pull' queue has lower-priority jobs, and pushing deletes is unlikely
  # to cause much trouble on this queue if it didn't already cause issues on
  # 'default' and 'push'. Still, do not enqueue deletes if the instance is
  # lagging too far behind on it.
  MAX_PULL_SIZE       = 500
  MAX_PULL_LATENCY    = 300

  # This is less of an issue in general, but deleting old statuses is likely
  # to cause delivery errors and thus increase the number of jobs to be
  # retried. This doesn't directly translate into load, but connection errors
  # and a high number of dead instances may cause retries to spiral out of
  # control if left unchecked.
  MAX_RETRY_SIZE = 50_000

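  # Failed runs are not retried (the scheduler fires again regularly anyway),
  # and the uniqueness lock prevents a new run from piling up while a previous
  # one is still pending or executing.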
  sidekiq_options retry: 0, lock: :until_executed

  def perform
    return if under_load?

    budget = compute_budget
    first_policy_id = last_processed_id

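    # Walk through all enabled policies, resuming after the one recorded by
    # the previous run (if any), so the budget gets spread across every policy
    # over successive runs.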
    loop do
      num_processed_accounts = 0

      scope = AccountStatusesCleanupPolicy.where(enabled: true)
      # NB: the cursor stored by save_last_processed_id is a policy id, so the
      # comparison must be on this model's own id column.
      scope = scope.where(AccountStatusesCleanupPolicy.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
      scope.find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        num_processed_accounts += 1 unless num_deleted.zero?
        budget -= num_deleted
        if budget.zero?
          save_last_processed_id(policy.id)
          break
        end
      end

      # Stop once the budget is exhausted, or once a full pass from the
      # beginning has processed no account; otherwise loop again from the
      # start to spend the remaining budget.
      break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)

      first_policy_id = nil
    end
  end

  def compute_budget
    # Sum the concurrency of every Sidekiq process listening to the 'push'
    # queue, and scale the per-run budget accordingly, capped at MAX_BUDGET.
    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.sum { |x| x['concurrency'] }
    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end

  def under_load?
    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE

    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
  end

  private

  def queue_under_load?(name, max_size, max_latency)
    queue = Sidekiq::Queue.new(name)
    queue.size > max_size || queue.latency > max_latency
  end

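  # The cursor is stored in Redis with a one-hour expiry, so a stale cursor
  # left over from an old run is eventually dropped and scanning resumes from
  # the first policy.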
  def last_processed_id
    redis.get('account_statuses_cleanup_scheduler:last_account_id')
  end

  def save_last_processed_id(id)
    if id.nil?
      redis.del('account_statuses_cleanup_scheduler:last_account_id')
    else
      redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
    end
  end
end
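
# A minimal sketch of how such a worker is typically registered with
# sidekiq-scheduler (illustrative only: the interval and queue name below are
# assumptions, not taken from this repository's configuration):
#
#   accounts_statuses_cleanup_scheduler:
#     interval: 1 minute
#     class: Scheduler::AccountsStatusesCleanupScheduler
#     queue: scheduler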