diff options
author | Starfall <us@starfall.systems> | 2023-04-14 19:22:47 -0500 |
---|---|---|
committer | Starfall <us@starfall.systems> | 2023-04-14 19:22:47 -0500 |
commit | 4fe1689de43f4404eb9530fcfbcbfb26d6c1c13a (patch) | |
tree | 6811b845bb7f4966b10dcefa3dea404246f161c7 /app/workers | |
parent | 65c1e53a32cabcdbb7bca57002bb0f6acdebe07e (diff) | |
parent | bed63f6dae0879ac840066b031229e0d139089cd (diff) |
Diffstat (limited to 'app/workers')
21 files changed, 85 insertions, 77 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index d9153132b..7c1c14766 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -10,6 +10,16 @@ class ActivityPub::DeliveryWorker sidekiq_options queue: 'push', retry: 16, dead: false + # Unfortunately, we cannot control Sidekiq's jitter, so add our own + sidekiq_retry_in do |count| + # This is Sidekiq's default delay + delay = (count**4) + 15 + # Our custom jitter, that will be added to Sidekiq's built-in one. + # Sidekiq's built-in jitter is `rand(10) * (count + 1)` + jitter = rand(0.5 * (count**4)) + delay + jitter + end + HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze def perform(json, source_account_id, inbox_url, options = {}) diff --git a/app/workers/activitypub/distribute_poll_update_worker.rb b/app/workers/activitypub/distribute_poll_update_worker.rb index 601075ea6..ebdb78bb3 100644 --- a/app/workers/activitypub/distribute_poll_update_worker.rb +++ b/app/workers/activitypub/distribute_poll_update_worker.rb @@ -12,7 +12,7 @@ class ActivityPub::DistributePollUpdateWorker return if @status.preloadable_poll.nil? || @status.local_only? - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| [payload, @account.id, inbox_url] end diff --git a/app/workers/activitypub/fetch_replies_worker.rb b/app/workers/activitypub/fetch_replies_worker.rb index 54d98f228..d72bad745 100644 --- a/app/workers/activitypub/fetch_replies_worker.rb +++ b/app/workers/activitypub/fetch_replies_worker.rb @@ -6,8 +6,8 @@ class ActivityPub::FetchRepliesWorker sidekiq_options queue: 'pull', retry: 3 - def perform(parent_status_id, replies_uri) - ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri) + def perform(parent_status_id, replies_uri, options = {}) + ActivityPub::FetchRepliesService.new.call(Status.find(parent_status_id), replies_uri, **options.deep_symbolize_keys) rescue ActiveRecord::RecordNotFound true end diff --git a/app/workers/activitypub/migrated_follow_delivery_worker.rb b/app/workers/activitypub/migrated_follow_delivery_worker.rb new file mode 100644 index 000000000..daf30e0ae --- /dev/null +++ b/app/workers/activitypub/migrated_follow_delivery_worker.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +class ActivityPub::MigratedFollowDeliveryWorker < ActivityPub::DeliveryWorker + def perform(json, source_account_id, inbox_url, old_target_account_id, options = {}) + super(json, source_account_id, inbox_url, options) + unfollow_old_account!(old_target_account_id) + end + + private + + def unfollow_old_account!(old_target_account_id) + old_target_account = Account.find(old_target_account_id) + UnfollowService.new.call(@source_account, old_target_account, skip_unmerge: true) + rescue + true + end +end diff --git a/app/workers/activitypub/move_distribution_worker.rb b/app/workers/activitypub/move_distribution_worker.rb index 65c5c0d1c..1680fcc76 100644 --- a/app/workers/activitypub/move_distribution_worker.rb +++ b/app/workers/activitypub/move_distribution_worker.rb @@ -10,7 +10,7 @@ class ActivityPub::MoveDistributionWorker @migration = AccountMigration.find(migration_id) @account = @migration.account - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| [signed_payload, @account.id, inbox_url] end diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb index 5e36fab51..1bb94b7f2 100644 --- a/app/workers/activitypub/processing_worker.rb +++ b/app/workers/activitypub/processing_worker.rb @@ -15,6 +15,6 @@ class ActivityPub::ProcessingWorker ActivityPub::ProcessCollectionService.new.call(body, actor, override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true) rescue ActiveRecord::RecordInvalid => e - Rails.logger.debug "Error processing incoming ActivityPub object: #{e}" + Rails.logger.debug { "Error processing incoming ActivityPub object: #{e}" } end end diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb index 8ecc17db9..c77821e0f 100644 --- a/app/workers/activitypub/raw_distribution_worker.rb +++ b/app/workers/activitypub/raw_distribution_worker.rb @@ -25,7 +25,7 @@ class ActivityPub::RawDistributionWorker def distribute! return if inboxes.empty? - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + ActivityPub::DeliveryWorker.push_bulk(inboxes, limit: 1_000) do |inbox_url| [payload, source_account_id, inbox_url, options] end end diff --git a/app/workers/backup_worker.rb b/app/workers/backup_worker.rb index 7b0b52844..df933142a 100644 --- a/app/workers/backup_worker.rb +++ b/app/workers/backup_worker.rb @@ -9,12 +9,10 @@ class BackupWorker backup_id = msg['args'].first ActiveRecord::Base.connection_pool.with_connection do - begin - backup = Backup.find(backup_id) - backup.destroy - rescue ActiveRecord::RecordNotFound - true - end + backup = Backup.find(backup_id) + backup.destroy + rescue ActiveRecord::RecordNotFound + true end end diff --git a/app/workers/concerns/exponential_backoff.rb b/app/workers/concerns/exponential_backoff.rb index f2b931e33..7626b2151 100644 --- a/app/workers/concerns/exponential_backoff.rb +++ b/app/workers/concerns/exponential_backoff.rb @@ -5,7 +5,7 @@ module ExponentialBackoff included do sidekiq_retry_in do |count| - 15 + 10 * (count**4) + rand(10 * (count**4)) + 15 + (10 * (count**4)) + rand(10 * (count**4)) end end end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index f7aa25e81..68a7414be 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -6,7 +6,7 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 - def perform(child_url) - FetchRemoteStatusService.new.call(child_url) + def perform(child_url, options = {}) + FetchRemoteStatusService.new.call(child_url, **options.deep_symbolize_keys) end end diff --git a/app/workers/import/relationship_worker.rb b/app/workers/import/relationship_worker.rb index 6791b15c3..c2728c196 100644 --- a/app/workers/import/relationship_worker.rb +++ b/app/workers/import/relationship_worker.rb @@ -49,7 +49,7 @@ class Import::RelationshipWorker .with_error_handler { |error, handle| error.is_a?(HTTP::Error) || error.is_a?(OpenSSL::SSL::SSLError) ? handle.call(error) : raise(error) } .run else - block.call + yield end end end diff --git a/app/workers/poll_expiration_notify_worker.rb b/app/workers/poll_expiration_notify_worker.rb index 0e29a5f60..b7a60fab8 100644 --- a/app/workers/poll_expiration_notify_worker.rb +++ b/app/workers/poll_expiration_notify_worker.rb @@ -3,7 +3,7 @@ class PollExpirationNotifyWorker include Sidekiq::Worker - sidekiq_options lock: :until_executed + sidekiq_options lock: :until_executing def perform(poll_id) @poll = Poll.find(poll_id) diff --git a/app/workers/post_process_media_worker.rb b/app/workers/post_process_media_worker.rb index 24201101c..996d5def9 100644 --- a/app/workers/post_process_media_worker.rb +++ b/app/workers/post_process_media_worker.rb @@ -9,13 +9,11 @@ class PostProcessMediaWorker media_attachment_id = msg['args'].first ActiveRecord::Base.connection_pool.with_connection do - begin - media_attachment = MediaAttachment.find(media_attachment_id) - media_attachment.processing = :failed - media_attachment.save - rescue ActiveRecord::RecordNotFound - true - end + media_attachment = MediaAttachment.find(media_attachment_id) + media_attachment.processing = :failed + media_attachment.save + rescue ActiveRecord::RecordNotFound + true end Sidekiq.logger.error("Processing media attachment #{media_attachment_id} failed with #{msg['error_message']}") diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb index bd92fe32c..f237f1dc9 100644 --- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -7,7 +7,7 @@ class Scheduler::AccountsStatusesCleanupScheduler # This limit is mostly to be nice to the fediverse at large and not # generate too much traffic. # This also helps limiting the running time of the scheduler itself. - MAX_BUDGET = 50 + MAX_BUDGET = 150 # This is an attempt to spread the load across instances, as various # accounts are likely to have various followers. @@ -15,28 +15,22 @@ class Scheduler::AccountsStatusesCleanupScheduler # This is an attempt to limit the workload generated by status removal # jobs to something the particular instance can handle. - PER_THREAD_BUDGET = 5 + PER_THREAD_BUDGET = 6 # Those avoid loading an instance that is already under load - MAX_DEFAULT_SIZE = 2 + MAX_DEFAULT_SIZE = 200 MAX_DEFAULT_LATENCY = 5 - MAX_PUSH_SIZE = 5 + MAX_PUSH_SIZE = 500 MAX_PUSH_LATENCY = 10 + # 'pull' queue has lower priority jobs, and it's unlikely that pushing # deletes would cause much issues with this queue if it didn't cause issues # with default and push. Yet, do not enqueue deletes if the instance is # lagging behind too much. - MAX_PULL_SIZE = 500 - MAX_PULL_LATENCY = 300 - - # This is less of an issue in general, but deleting old statuses is likely - # to cause delivery errors, and thus increase the number of jobs to be retried. - # This doesn't directly translate to load, but connection errors and a high - # number of dead instances may lead to this spiraling out of control if - # unchecked. - MAX_RETRY_SIZE = 50_000 + MAX_PULL_SIZE = 10_000 + MAX_PULL_LATENCY = 5.minutes.to_i - sidekiq_options retry: 0, lock: :until_executed + sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i def perform return if under_load? @@ -62,17 +56,17 @@ class Scheduler::AccountsStatusesCleanupScheduler # The idea here is to loop through all policies at least once until the budget is exhausted # and start back after the last processed account otherwise break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?) + first_policy_id = nil end end def compute_budget - threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum + threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum [PER_THREAD_BUDGET * threads, MAX_BUDGET].min end def under_load? - return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY) end diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb index 57f78170e..17cf3f2cc 100644 --- a/app/workers/scheduler/follow_recommendations_scheduler.rb +++ b/app/workers/scheduler/follow_recommendations_scheduler.rb @@ -19,13 +19,11 @@ class Scheduler::FollowRecommendationsScheduler fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE) Trends.available_locales.each do |locale| - recommendations = begin - if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist - FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.account_id, recommendation.rank] } - else - [] - end - end + recommendations = if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist + FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.rank, recommendation.account_id] } + else + [] + end # Use language-agnostic results if there are not enough language-specific ones missing = SET_SIZE - recommendations.size @@ -35,14 +33,14 @@ class Scheduler::FollowRecommendationsScheduler # Language-specific results should be above language-agnostic ones, # otherwise language-agnostic ones will always overshadow them - recommendations.map! { |(account_id, rank)| [account_id, rank + max_fallback_rank] } + recommendations.map! { |(rank, account_id)| [rank + max_fallback_rank, account_id] } added = 0 fallback_recommendations.each do |recommendation| - next if recommendations.any? { |(account_id, _)| account_id == recommendation.account_id } + next if recommendations.any? { |(_, account_id)| account_id == recommendation.account_id } - recommendations << [recommendation.account_id, recommendation.rank] + recommendations << [recommendation.rank, recommendation.account_id] added += 1 break if added >= missing @@ -51,10 +49,7 @@ class Scheduler::FollowRecommendationsScheduler redis.multi do |multi| multi.del(key(locale)) - - recommendations.each do |(account_id, rank)| - multi.zadd(key(locale), rank, account_id) - end + multi.zadd(key(locale), recommendations) end end end diff --git a/app/workers/scheduler/indexing_scheduler.rb b/app/workers/scheduler/indexing_scheduler.rb index c42396629..d622f5586 100644 --- a/app/workers/scheduler/indexing_scheduler.rb +++ b/app/workers/scheduler/indexing_scheduler.rb @@ -6,17 +6,19 @@ class Scheduler::IndexingScheduler sidekiq_options retry: 0 + IMPORT_BATCH_SIZE = 1000 + SCAN_BATCH_SIZE = 10 * IMPORT_BATCH_SIZE + def perform return unless Chewy.enabled? indexes.each do |type| with_redis do |redis| - ids = redis.smembers("chewy:queue:#{type.name}") - - type.import!(ids) - - redis.pipelined do |pipeline| - ids.each { |id| pipeline.srem("chewy:queue:#{type.name}", id) } + redis.sscan_each("chewy:queue:#{type.name}", count: SCAN_BATCH_SIZE).each_slice(IMPORT_BATCH_SIZE) do |ids| + type.import!(ids) + redis.pipelined do |pipeline| + pipeline.srem("chewy:queue:#{type.name}", ids) + end end end end diff --git a/app/workers/scheduler/user_cleanup_scheduler.rb b/app/workers/scheduler/user_cleanup_scheduler.rb index 63f9ed78c..45cfbc62e 100644 --- a/app/workers/scheduler/user_cleanup_scheduler.rb +++ b/app/workers/scheduler/user_cleanup_scheduler.rb @@ -15,7 +15,7 @@ class Scheduler::UserCleanupScheduler def clean_unconfirmed_accounts! User.where('confirmed_at is NULL AND confirmation_sent_at <= ?', 2.days.ago).reorder(nil).find_in_batches do |batch| # We have to do it separately because of missing database constraints - AccountModerationNote.where(account_id: batch.map(&:account_id)).delete_all + AccountModerationNote.where(target_account_id: batch.map(&:account_id)).delete_all Account.where(id: batch.map(&:account_id)).delete_all User.where(id: batch.map(&:id)).delete_all end diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index 1b77dfdd9..3206c45f6 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -6,9 +6,9 @@ class ThreadResolveWorker sidekiq_options queue: 'pull', retry: 3 - def perform(child_status_id, parent_url) + def perform(child_status_id, parent_url, options = {}) child_status = Status.find(child_status_id) - parent_status = FetchRemoteStatusService.new.call(parent_url) + parent_status = FetchRemoteStatusService.new.call(parent_url, **options.deep_symbolize_keys) return if parent_status.nil? diff --git a/app/workers/unfollow_follow_worker.rb b/app/workers/unfollow_follow_worker.rb index 7203b4888..a4d57839d 100644 --- a/app/workers/unfollow_follow_worker.rb +++ b/app/workers/unfollow_follow_worker.rb @@ -10,13 +10,7 @@ class UnfollowFollowWorker old_target_account = Account.find(old_target_account_id) new_target_account = Account.find(new_target_account_id) - follow = follower_account.active_relationships.find_by(target_account: old_target_account) - reblogs = follow&.show_reblogs? - notify = follow&.notify? - languages = follow&.languages - - FollowService.new.call(follower_account, new_target_account, reblogs: reblogs, notify: notify, languages: languages, bypass_locked: bypass_locked, bypass_limit: true) - UnfollowService.new.call(follower_account, old_target_account, skip_unmerge: true) + FollowMigrationService.new.call(follower_account, new_target_account, old_target_account, bypass_locked: bypass_locked) rescue ActiveRecord::RecordNotFound, Mastodon::NotPermittedError true end diff --git a/app/workers/web/push_notification_worker.rb b/app/workers/web/push_notification_worker.rb index 1ed5bb9e0..7e9691aab 100644 --- a/app/workers/web/push_notification_worker.rb +++ b/app/workers/web/push_notification_worker.rb @@ -22,13 +22,13 @@ class Web::PushNotificationWorker request = Request.new(:post, @subscription.endpoint, body: payload.fetch(:ciphertext), http_client: http_client) request.add_headers( - 'Content-Type' => 'application/octet-stream', - 'Ttl' => TTL, - 'Urgency' => URGENCY, + 'Content-Type' => 'application/octet-stream', + 'Ttl' => TTL, + 'Urgency' => URGENCY, 'Content-Encoding' => 'aesgcm', - 'Encryption' => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}", - 'Crypto-Key' => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}", - 'Authorization' => @subscription.authorization_header + 'Encryption' => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}", + 'Crypto-Key' => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}", + 'Authorization' => @subscription.authorization_header ) request.perform do |response| diff --git a/app/workers/webhooks/delivery_worker.rb b/app/workers/webhooks/delivery_worker.rb index b1e345c5e..f8ed669fb 100644 --- a/app/workers/webhooks/delivery_worker.rb +++ b/app/workers/webhooks/delivery_worker.rb @@ -19,7 +19,7 @@ class Webhooks::DeliveryWorker private def perform_request - request = Request.new(:post, @webhook.url, body: @body) + request = Request.new(:post, @webhook.url, body: @body, allow_local: true) request.add_headers( 'Content-Type' => 'application/json', |