diff options
author | Starfall <us@starfall.systems> | 2022-11-10 08:50:11 -0600 |
---|---|---|
committer | Starfall <us@starfall.systems> | 2022-11-10 08:50:11 -0600 |
commit | 67d1a0476d77e2ed0ca15dd2981c54c2b90b0742 (patch) | |
tree | 152f8c13a341d76738e8e2c09b24711936e6af68 /app/workers | |
parent | b581e6b6d4a5ba9ed4ae17427b7f2d5d158be4e5 (diff) | |
parent | ee7e49d1b1323618e16026bc8db8ab7f9459cc2d (diff) |
Merge remote-tracking branch 'glitch/main'
- Remove Helm charts - Lots of conflicts with our removal of recommended settings and custom icons
Diffstat (limited to 'app/workers')
23 files changed, 167 insertions, 164 deletions
diff --git a/app/workers/activitypub/account_raw_distribution_worker.rb b/app/workers/activitypub/account_raw_distribution_worker.rb new file mode 100644 index 000000000..a84c7d214 --- /dev/null +++ b/app/workers/activitypub/account_raw_distribution_worker.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +class ActivityPub::AccountRawDistributionWorker < ActivityPub::RawDistributionWorker + protected + + def inboxes + @inboxes ||= AccountReachFinder.new(@account).inboxes + end +end diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index 788f2cf80..d9153132b 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -37,7 +37,7 @@ class ActivityPub::DeliveryWorker def build_request(http_client) Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request| - request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with]) + request.on_behalf_of(@source_account, sign_with: @options[:sign_with]) request.add_headers(HEADERS) request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' && @options[:synchronize_followers] end diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb index 37e316354..4d06ad079 100644 --- a/app/workers/activitypub/processing_worker.rb +++ b/app/workers/activitypub/processing_worker.rb @@ -5,11 +5,15 @@ class ActivityPub::ProcessingWorker sidekiq_options backtrace: true, retry: 8 - def perform(account_id, body, delivered_to_account_id = nil) - account = Account.find_by(id: account_id) - return if account.nil? + def perform(actor_id, body, delivered_to_account_id = nil, actor_type = 'Account') + case actor_type + when 'Account' + actor = Account.find_by(id: actor_id) + end - ActivityPub::ProcessCollectionService.new.call(body, account, override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true) + return if actor.nil? + + ActivityPub::ProcessCollectionService.new.call(body, actor, override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true) rescue ActiveRecord::RecordInvalid => e Rails.logger.debug "Error processing incoming ActivityPub object: #{e}" end diff --git a/app/workers/activitypub/synchronize_featured_collection_worker.rb b/app/workers/activitypub/synchronize_featured_collection_worker.rb index 7a0898e89..f67d693cb 100644 --- a/app/workers/activitypub/synchronize_featured_collection_worker.rb +++ b/app/workers/activitypub/synchronize_featured_collection_worker.rb @@ -5,8 +5,10 @@ class ActivityPub::SynchronizeFeaturedCollectionWorker sidekiq_options queue: 'pull', lock: :until_executed - def perform(account_id) - ActivityPub::FetchFeaturedCollectionService.new.call(Account.find(account_id)) + def perform(account_id, options = {}) + options = { note: true, hashtag: false }.deep_merge(options.deep_symbolize_keys) + + ActivityPub::FetchFeaturedCollectionService.new.call(Account.find(account_id), **options) rescue ActiveRecord::RecordNotFound true end diff --git a/app/workers/activitypub/synchronize_featured_tags_collection_worker.rb b/app/workers/activitypub/synchronize_featured_tags_collection_worker.rb new file mode 100644 index 000000000..14af4f725 --- /dev/null +++ b/app/workers/activitypub/synchronize_featured_tags_collection_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class ActivityPub::SynchronizeFeaturedTagsCollectionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', lock: :until_executed + + def perform(account_id, url) + ActivityPub::FetchFeaturedTagsCollectionService.new.call(Account.find(account_id), url) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb index 81fde63b8..d0391bb6f 100644 --- a/app/workers/activitypub/update_distribution_worker.rb +++ b/app/workers/activitypub/update_distribution_worker.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker + sidekiq_options queue: 'push', lock: :until_executed + # Distribute an profile update to servers that might have a copy # of the account in question def perform(account_id, options = {}) diff --git a/app/workers/admin/account_deletion_worker.rb b/app/workers/admin/account_deletion_worker.rb index 82f269ad6..6e0eb331b 100644 --- a/app/workers/admin/account_deletion_worker.rb +++ b/app/workers/admin/account_deletion_worker.rb @@ -3,7 +3,7 @@ class Admin::AccountDeletionWorker include Sidekiq::Worker - sidekiq_options queue: 'pull' + sidekiq_options queue: 'pull', lock: :until_executed def perform(account_id) DeleteAccountService.new.call(Account.find(account_id), reserve_username: true, reserve_email: true) diff --git a/app/workers/digest_mailer_worker.rb b/app/workers/digest_mailer_worker.rb deleted file mode 100644 index 21f1c357a..000000000 --- a/app/workers/digest_mailer_worker.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true - -class DigestMailerWorker - include Sidekiq::Worker - - sidekiq_options queue: 'mailers' - - attr_reader :user - - def perform(user_id) - @user = User.find(user_id) - deliver_digest if @user.allows_digest_emails? - end - - private - - def deliver_digest - NotificationMailer.digest(user.account).deliver_now! - user.touch(:last_emailed_at) - end -end diff --git a/app/workers/move_worker.rb b/app/workers/move_worker.rb index c3167f9ca..3b429928e 100644 --- a/app/workers/move_worker.rb +++ b/app/workers/move_worker.rb @@ -8,7 +8,9 @@ class MoveWorker @target_account = Account.find(target_account_id) if @target_account.local? && @source_account.local? - rewrite_follows! + nb_moved = rewrite_follows! + @source_account.update_count!(:followers_count, -nb_moved) + @target_account.update_count!(:followers_count, nb_moved) else queue_follow_unfollows! end diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index ae444cfde..72c781749 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -5,11 +5,12 @@ class PushUpdateWorker include Redisable def perform(account_id, status_id, timeline_id = nil, options = {}) - @account = Account.find(account_id) @status = Status.find(status_id) - @timeline_id = timeline_id || "timeline:#{account.id}" + @account_id = account_id + @timeline_id = timeline_id || "timeline:#{account_id}" @options = options.symbolize_keys + render_payload! publish! rescue ActiveRecord::RecordNotFound true @@ -17,14 +18,14 @@ class PushUpdateWorker private - def payload - InlineRenderer.render(@status, @account, :status) + def render_payload! + @payload = StatusCacheHydrator.new(@status).hydrate(@account_id) end def message Oj.dump( event: update? ? :'status.update' : :update, - payload: payload, + payload: @payload, queued_at: (Time.now.to_f * 1000.0).to_i ) end diff --git a/app/workers/refollow_worker.rb b/app/workers/refollow_worker.rb index 319b00109..4b712d3aa 100644 --- a/app/workers/refollow_worker.rb +++ b/app/workers/refollow_worker.rb @@ -10,8 +10,9 @@ class RefollowWorker return unless target_account.activitypub? target_account.passive_relationships.where(account: Account.where(domain: nil)).includes(:account).reorder(nil).find_each do |follow| - reblogs = follow.show_reblogs? - notify = follow.notify? + reblogs = follow.show_reblogs? + notify = follow.notify? + languages = follow.languages # Locally unfollow remote account follower = follow.account @@ -19,7 +20,7 @@ class RefollowWorker # Schedule re-follow begin - FollowService.new.call(follower, target_account, reblogs: reblogs, notify: notify, bypass_limit: true) + FollowService.new.call(follower, target_account, reblogs: reblogs, notify: notify, languages: languages, bypass_limit: true) rescue Mastodon::NotPermittedError, ActiveRecord::RecordNotFound, Mastodon::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError next end diff --git a/app/workers/remove_featured_tag_worker.rb b/app/workers/remove_featured_tag_worker.rb new file mode 100644 index 000000000..065ec79d8 --- /dev/null +++ b/app/workers/remove_featured_tag_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class RemoveFeaturedTagWorker + include Sidekiq::Worker + + def perform(account_id, featured_tag_id) + RemoveFeaturedTagService.new.call(Account.find(account_id), FeaturedTag.find(featured_tag_id)) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/scheduler/backup_cleanup_scheduler.rb b/app/workers/scheduler/backup_cleanup_scheduler.rb deleted file mode 100644 index 85d5312c0..000000000 --- a/app/workers/scheduler/backup_cleanup_scheduler.rb +++ /dev/null @@ -1,17 +0,0 @@ -# frozen_string_literal: true - -class Scheduler::BackupCleanupScheduler - include Sidekiq::Worker - - sidekiq_options retry: 0 - - def perform - old_backups.reorder(nil).find_each(&:destroy!) - end - - private - - def old_backups - Backup.where('created_at < ?', 7.days.ago) - end -end diff --git a/app/workers/scheduler/doorkeeper_cleanup_scheduler.rb b/app/workers/scheduler/doorkeeper_cleanup_scheduler.rb deleted file mode 100644 index 9303a352f..000000000 --- a/app/workers/scheduler/doorkeeper_cleanup_scheduler.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -class Scheduler::DoorkeeperCleanupScheduler - include Sidekiq::Worker - - sidekiq_options retry: 0 - - def perform - Doorkeeper::AccessToken.where('revoked_at IS NOT NULL').where('revoked_at < NOW()').delete_all - Doorkeeper::AccessGrant.where('revoked_at IS NOT NULL').where('revoked_at < NOW()').delete_all - SystemKey.expired.delete_all - end -end diff --git a/app/workers/scheduler/email_scheduler.rb b/app/workers/scheduler/email_scheduler.rb deleted file mode 100644 index c052f2fce..000000000 --- a/app/workers/scheduler/email_scheduler.rb +++ /dev/null @@ -1,25 +0,0 @@ -# frozen_string_literal: true - -class Scheduler::EmailScheduler - include Sidekiq::Worker - - sidekiq_options retry: 0 - - FREQUENCY = 7.days.freeze - SIGN_IN_OFFSET = 1.day.freeze - - def perform - eligible_users.reorder(nil).find_each do |user| - next unless user.allows_digest_emails? - DigestMailerWorker.perform_async(user.id) - end - end - - private - - def eligible_users - User.emailable - .where('current_sign_in_at < ?', (FREQUENCY + SIGN_IN_OFFSET).ago) - .where('last_emailed_at IS NULL OR last_emailed_at < ?', FREQUENCY.ago) - end -end diff --git a/app/workers/scheduler/feed_cleanup_scheduler.rb b/app/workers/scheduler/feed_cleanup_scheduler.rb deleted file mode 100644 index 78adc97e2..000000000 --- a/app/workers/scheduler/feed_cleanup_scheduler.rb +++ /dev/null @@ -1,40 +0,0 @@ -# frozen_string_literal: true - -class Scheduler::FeedCleanupScheduler - include Sidekiq::Worker - include Redisable - - sidekiq_options retry: 0 - - def perform - clean_home_feeds! - clean_list_feeds! - clean_direct_feeds! - end - - private - - def clean_home_feeds! - feed_manager.clean_feeds!(:home, inactive_account_ids) - end - - def clean_list_feeds! - feed_manager.clean_feeds!(:list, inactive_list_ids) - end - - def clean_direct_feeds! - feed_manager.clean_feeds!(:direct, inactive_account_ids) - end - - def inactive_account_ids - @inactive_account_ids ||= User.confirmed.inactive.pluck(:account_id) - end - - def inactive_list_ids - List.where(account_id: inactive_account_ids).pluck(:id) - end - - def feed_manager - FeedManager.instance - end -end diff --git a/app/workers/scheduler/indexing_scheduler.rb b/app/workers/scheduler/indexing_scheduler.rb index 3a6f47a29..c42396629 100644 --- a/app/workers/scheduler/indexing_scheduler.rb +++ b/app/workers/scheduler/indexing_scheduler.rb @@ -7,6 +7,8 @@ class Scheduler::IndexingScheduler sidekiq_options retry: 0 def perform + return unless Chewy.enabled? + indexes.each do |type| with_redis do |redis| ids = redis.smembers("chewy:queue:#{type.name}") diff --git a/app/workers/scheduler/media_cleanup_scheduler.rb b/app/workers/scheduler/media_cleanup_scheduler.rb deleted file mode 100644 index 24d30a6be..000000000 --- a/app/workers/scheduler/media_cleanup_scheduler.rb +++ /dev/null @@ -1,17 +0,0 @@ -# frozen_string_literal: true - -class Scheduler::MediaCleanupScheduler - include Sidekiq::Worker - - sidekiq_options retry: 0 - - def perform - unattached_media.find_each(&:destroy) - end - - private - - def unattached_media - MediaAttachment.reorder(nil).unattached.where('created_at < ?', 1.day.ago) - end -end diff --git a/app/workers/scheduler/suspended_user_cleanup_scheduler.rb b/app/workers/scheduler/suspended_user_cleanup_scheduler.rb new file mode 100644 index 000000000..50768f83c --- /dev/null +++ b/app/workers/scheduler/suspended_user_cleanup_scheduler.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +class Scheduler::SuspendedUserCleanupScheduler + include Sidekiq::Worker + + # Each processed deletion request may enqueue an enormous + # amount of jobs in the `pull` queue, so only enqueue when + # the queue is empty or close to being so. + MAX_PULL_SIZE = 50 + + # Since account deletion is very expensive, we want to avoid + # overloading the server by queing too much at once. + # This job runs approximately once per 2 minutes, so with a + # value of `MAX_DELETIONS_PER_JOB` of 10, a server can + # handle the deletion of 7200 accounts per day, provided it + # has the capacity for it. + MAX_DELETIONS_PER_JOB = 10 + + sidekiq_options retry: 0 + + def perform + return if Sidekiq::Queue.new('pull').size > MAX_PULL_SIZE + + clean_suspended_accounts! + end + + private + + def clean_suspended_accounts! + # This should be fine because we only process a small amount of deletion requests at once and + # `id` and `created_at` should follow the same order. + AccountDeletionRequest.reorder(id: :asc).take(MAX_DELETIONS_PER_JOB).each do |deletion_request| + next unless deletion_request.created_at < AccountDeletionRequest::DELAY_TO_DELETION.ago + + Admin::AccountDeletionWorker.perform_async(deletion_request.account_id) + end + end +end diff --git a/app/workers/scheduler/user_cleanup_scheduler.rb b/app/workers/scheduler/user_cleanup_scheduler.rb index d1f00c47f..63f9ed78c 100644 --- a/app/workers/scheduler/user_cleanup_scheduler.rb +++ b/app/workers/scheduler/user_cleanup_scheduler.rb @@ -7,7 +7,6 @@ class Scheduler::UserCleanupScheduler def perform clean_unconfirmed_accounts! - clean_suspended_accounts! clean_discarded_statuses! end @@ -15,17 +14,13 @@ class Scheduler::UserCleanupScheduler def clean_unconfirmed_accounts! User.where('confirmed_at is NULL AND confirmation_sent_at <= ?', 2.days.ago).reorder(nil).find_in_batches do |batch| + # We have to do it separately because of missing database constraints + AccountModerationNote.where(account_id: batch.map(&:account_id)).delete_all Account.where(id: batch.map(&:account_id)).delete_all User.where(id: batch.map(&:id)).delete_all end end - def clean_suspended_accounts! - AccountDeletionRequest.where('created_at <= ?', AccountDeletionRequest::DELAY_TO_DELETION.ago).reorder(nil).find_each do |deletion_request| - Admin::AccountDeletionWorker.perform_async(deletion_request.account_id) - end - end - def clean_discarded_statuses! Status.unscoped.discarded.where('deleted_at <= ?', 30.days.ago).find_in_batches do |statuses| RemovalWorker.push_bulk(statuses) do |status| diff --git a/app/workers/scheduler/vacuum_scheduler.rb b/app/workers/scheduler/vacuum_scheduler.rb new file mode 100644 index 000000000..9544f808b --- /dev/null +++ b/app/workers/scheduler/vacuum_scheduler.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +class Scheduler::VacuumScheduler + include Sidekiq::Worker + + sidekiq_options retry: 0, lock: :until_executed + + def perform + vacuum_operations.each do |operation| + operation.perform + rescue => e + Rails.logger.error("Error while running #{operation.class.name}: #{e}") + end + end + + private + + def vacuum_operations + [ + statuses_vacuum, + media_attachments_vacuum, + preview_cards_vacuum, + backups_vacuum, + access_tokens_vacuum, + feeds_vacuum, + ] + end + + def statuses_vacuum + Vacuum::StatusesVacuum.new(content_retention_policy.content_cache_retention_period) + end + + def media_attachments_vacuum + Vacuum::MediaAttachmentsVacuum.new(content_retention_policy.media_cache_retention_period) + end + + def preview_cards_vacuum + Vacuum::PreviewCardsVacuum.new(content_retention_policy.media_cache_retention_period) + end + + def backups_vacuum + Vacuum::BackupsVacuum.new(content_retention_policy.backups_retention_period) + end + + def access_tokens_vacuum + Vacuum::AccessTokensVacuum.new + end + + def feeds_vacuum + Vacuum::FeedsVacuum.new + end + + def content_retention_policy + ContentRetentionPolicy.current + end +end diff --git a/app/workers/unfollow_follow_worker.rb b/app/workers/unfollow_follow_worker.rb index 0bd5ff472..7203b4888 100644 --- a/app/workers/unfollow_follow_worker.rb +++ b/app/workers/unfollow_follow_worker.rb @@ -10,11 +10,12 @@ class UnfollowFollowWorker old_target_account = Account.find(old_target_account_id) new_target_account = Account.find(new_target_account_id) - follow = follower_account.active_relationships.find_by(target_account: old_target_account) - reblogs = follow&.show_reblogs? - notify = follow&.notify? + follow = follower_account.active_relationships.find_by(target_account: old_target_account) + reblogs = follow&.show_reblogs? + notify = follow&.notify? + languages = follow&.languages - FollowService.new.call(follower_account, new_target_account, reblogs: reblogs, notify: notify, bypass_locked: bypass_locked, bypass_limit: true) + FollowService.new.call(follower_account, new_target_account, reblogs: reblogs, notify: notify, languages: languages, bypass_locked: bypass_locked, bypass_limit: true) UnfollowService.new.call(follower_account, old_target_account, skip_unmerge: true) rescue ActiveRecord::RecordNotFound, Mastodon::NotPermittedError true diff --git a/app/workers/verify_account_links_worker.rb b/app/workers/verify_account_links_worker.rb index 8114d59be..f606e6c26 100644 --- a/app/workers/verify_account_links_worker.rb +++ b/app/workers/verify_account_links_worker.rb @@ -3,14 +3,13 @@ class VerifyAccountLinksWorker include Sidekiq::Worker - sidekiq_options queue: 'pull', retry: false, lock: :until_executed + sidekiq_options queue: 'default', retry: false, lock: :until_executed def perform(account_id) account = Account.find(account_id) account.fields.each do |field| - next unless !field.verified? && field.verifiable? - VerifyLinkService.new.call(field) + VerifyLinkService.new.call(field) if field.requires_verification? end account.save! if account.changed? |