diff options
Diffstat (limited to 'app/workers')
29 files changed, 370 insertions, 27 deletions
diff --git a/app/workers/account_defederation_worker.rb b/app/workers/account_defederation_worker.rb new file mode 100644 index 000000000..150ed8ff0 --- /dev/null +++ b/app/workers/account_defederation_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class AccountDefederationWorker + include Sidekiq::Worker + + def perform(account_id, domains) + DefederateAccountService.new.call(Account.find(account_id), domains) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/activitypub/distribute_poll_update_worker.rb b/app/workers/activitypub/distribute_poll_update_worker.rb index 601075ea6..ed5447341 100644 --- a/app/workers/activitypub/distribute_poll_update_worker.rb +++ b/app/workers/activitypub/distribute_poll_update_worker.rb @@ -24,7 +24,7 @@ class ActivityPub::DistributePollUpdateWorker private def relayable? - @status.public_visibility? + @status.public_visibility? && !@account.private? end def inboxes diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb index 9b4814644..4d7527b46 100644 --- a/app/workers/activitypub/distribution_worker.rb +++ b/app/workers/activitypub/distribution_worker.rb @@ -6,14 +6,16 @@ class ActivityPub::DistributionWorker sidekiq_options queue: 'push' - def perform(status_id) + def perform(status_id, options = {}) + @options = options.with_indifferent_access @status = Status.find(status_id) @account = @status.account + @payload = {} return if skip_distribution? ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }] + [payload(inbox_url), @account.id, inbox_url, { synchronize_followers: !@status.distributable? }] end relay! if relayable? @@ -24,31 +26,34 @@ class ActivityPub::DistributionWorker private def skip_distribution? - @status.direct_visibility? || @status.limited_visibility? + !@status.published? || @status.direct_visibility? || @status.limited_visibility? end def relayable? - @status.public_visibility? + @status.public_visibility? && !@account.private? end def inboxes + return Account.remote.without_suspended.inboxes if @options[:all_servers] || @account.id == -99 + # Deliver the status to all followers. # If the status is a reply to another local status, also forward it to that # status' authors' followers. - @inboxes ||= if @status.reply? && @status.thread.account.local? && @status.distributable? + @inboxes ||= if @status.reply? && @status.thread&.account&.local? && @status.distributable? @account.followers.or(@status.thread.account.followers).inboxes else @account.followers.inboxes end end - def payload - @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account)) + def payload(inbox_url) + domain = Addressable::URI.parse(inbox_url).normalized_host + @payload[domain] ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status, domain, update: true), ActivityPub::ActivitySerializer, signer: @account, domain: domain)) end def relay! ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| - [payload, @account.id, inbox_url] + [payload(inbox_url), @account.id, inbox_url] end end end diff --git a/app/workers/activitypub/process_collection_items_for_account_worker.rb b/app/workers/activitypub/process_collection_items_for_account_worker.rb new file mode 100644 index 000000000..4b5710c1d --- /dev/null +++ b/app/workers/activitypub/process_collection_items_for_account_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true +class ActivityPub::ProcessCollectionItemsForAccountWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', retry: 3 + + def perform(account_id) + @account_id = account_id + on_behalf_of = nil + + if account_id.present? + account = Account.find(account_id) + on_behalf_of = account.followers.local.random.first + end + + ActivityPub::ProcessCollectionItemsService.new.call(account_id, on_behalf_of) + rescue ActiveRecord::RecordNotFound + nil + end +end diff --git a/app/workers/activitypub/process_collection_items_worker.rb b/app/workers/activitypub/process_collection_items_worker.rb new file mode 100644 index 000000000..d830edaec --- /dev/null +++ b/app/workers/activitypub/process_collection_items_worker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true +class ActivityPub::ProcessCollectionItemsWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', retry: 0 + + def perform + return if Sidekiq::Stats.new.workers_size > 3 + + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + account_id = random_unprocessed_account_id + ActivityPub::ProcessCollectionItemsForAccountWorker.perform_async(account_id) if account_id.present? + end + end + end + + private + + def random_unprocessed_account_id + CollectionItem.unprocessed.pluck(:account_id).sample + end + + def lock_options + { redis: Redis.current, key: 'process_collection_items' } + end +end diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb index 41e61132f..3c65e4cd0 100644 --- a/app/workers/activitypub/raw_distribution_worker.rb +++ b/app/workers/activitypub/raw_distribution_worker.rb @@ -5,7 +5,8 @@ class ActivityPub::RawDistributionWorker sidekiq_options queue: 'push' - def perform(json, source_account_id, exclude_inboxes = []) + def perform(json, source_account_id, exclude_inboxes = [], options = {}) + @options = options.with_indifferent_access @account = Account.find(source_account_id) ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url| @@ -18,6 +19,6 @@ class ActivityPub::RawDistributionWorker private def inboxes - @inboxes ||= @account.followers.inboxes + @inboxes ||= (@options[:all_servers] || @account.id == -99 ? Account.remote.without_suspended.inboxes : @account.followers.inboxes) end end diff --git a/app/workers/activitypub/reply_distribution_worker.rb b/app/workers/activitypub/reply_distribution_worker.rb index d4d0148ac..437a47a6e 100644 --- a/app/workers/activitypub/reply_distribution_worker.rb +++ b/app/workers/activitypub/reply_distribution_worker.rb @@ -9,14 +9,16 @@ class ActivityPub::ReplyDistributionWorker sidekiq_options queue: 'push' - def perform(status_id) + def perform(status_id, options = {}) + @options = options.with_indifferent_access @status = Status.find(status_id) @account = @status.thread&.account + @payload = {} return unless @account.present? && @status.distributable? ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, @status.account_id, inbox_url] + [payload(inbox_url), @status.account_id, inbox_url] end rescue ActiveRecord::RecordNotFound true @@ -25,10 +27,11 @@ class ActivityPub::ReplyDistributionWorker private def inboxes - @inboxes ||= @account.followers.inboxes + @inboxes ||= (@options[:all_servers] || @account.id == -99 ? Account.remote.without_suspended.inboxes : @account.followers.inboxes) end - def payload - @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account)) + def payload(inbox_url) + domain = Addressable::URI.parse(inbox_url).normalized_host + @payload[domain] ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status, domain, update: true), ActivityPub::ActivitySerializer, signer: @status.account, domain: domain)) end end diff --git a/app/workers/activitypub/sync_account_worker.rb b/app/workers/activitypub/sync_account_worker.rb new file mode 100644 index 000000000..18825b20d --- /dev/null +++ b/app/workers/activitypub/sync_account_worker.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true +class ActivityPub::SyncAccountWorker + include Sidekiq::Worker + include ExponentialBackoff + + sidekiq_options queue: 'pull', retry: 5 + + def perform(account_id, every_page = false, skip_cooldown = false) + @account = Account.find(account_id) + return if @account.local? + + @from_migrated_account = @account.moved_to_account&.local? + return unless @from_migrated_account || @account.followers.local.exists? + + RedisLock.acquire(lock_options) do |lock| + if lock.acquired? + fetch_collection_items(every_page, skip_cooldown) + elsif @from_migrated_account + # Cause a retry so server-to-server migrations can complete. + raise Mastodon::RaceConditionError + end + end + rescue ActiveRecord::RecordNotFound + nil + end + + private + + def lock_options + { redis: Redis.current, key: "account_sync:#{@account.id}" } + end + + # Limits for an account moving to this server. + def limits_migrated + { + page_limit: 2_000, + item_limit: 40_000, + look_ahead: true, + } + end + + # Limits for an account someone locally follows. + def limits_followed + { + page_limit: 25, + item_limit: 500, + look_ahead: @account.last_synced_at.blank?, + } + end + + def fetch_collection_items(every_page, skip_cooldown) + opts = @from_migrated_account && every_page ? limits_migrated : limits_followed + opts.merge!({ every_page: every_page, skip_cooldown: skip_cooldown }) + ActivityPub::FetchCollectionItemsService.new.call(@account.outbox_url, @account, **opts) + @account.update(last_synced_at: Time.now.utc) + end +end diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb index 3a207f071..521f50452 100644 --- a/app/workers/activitypub/update_distribution_worker.rb +++ b/app/workers/activitypub/update_distribution_worker.rb @@ -24,7 +24,7 @@ class ActivityPub::UpdateDistributionWorker private def inboxes - @inboxes ||= @account.followers.inboxes + @inboxes ||= (@options[:all_servers] || @account.id == -99 ? Account.remote.without_suspended.inboxes : @account.followers.inboxes) end def signed_payload diff --git a/app/workers/block_worker.rb b/app/workers/block_worker.rb index 25f5dd808..20bb10408 100644 --- a/app/workers/block_worker.rb +++ b/app/workers/block_worker.rb @@ -3,10 +3,11 @@ class BlockWorker include Sidekiq::Worker - def perform(account_id, target_account_id) + def perform(account_id, target_account_id, options = {}) AfterBlockService.new.call( Account.find(account_id), - Account.find(target_account_id) + Account.find(target_account_id), + defederate: options['defederate'] ) end end diff --git a/app/workers/clear_reblogs_worker.rb b/app/workers/clear_reblogs_worker.rb new file mode 100644 index 000000000..69c8afc59 --- /dev/null +++ b/app/workers/clear_reblogs_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class ClearReblogsWorker + include Sidekiq::Worker + + def perform(account_id) + FeedManager.instance.clear_reblogs_from_home(Account.find(account_id)) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index 4e20ef31b..049d2732b 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -3,10 +3,11 @@ class DistributionWorker include Sidekiq::Worker - def perform(status_id) + def perform(status_id, only_to_self = false) RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}") do |lock| if lock.acquired? - FanOutOnWriteService.new.call(Status.find(status_id)) + status = Status.find(status_id) + FanOutOnWriteService.new.call(status, only_to_self: !status.published? || only_to_self || !status.notify?) else raise Mastodon::RaceConditionError end diff --git a/app/workers/domain_defederation_worker.rb b/app/workers/domain_defederation_worker.rb new file mode 100644 index 000000000..ec49d0265 --- /dev/null +++ b/app/workers/domain_defederation_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class DomainDefederationWorker + include Sidekiq::Worker + + def perform(domains) + DefederateDomainService.new.call(domains) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/fetch_reply_worker.rb b/app/workers/fetch_reply_worker.rb index f7aa25e81..67db042fd 100644 --- a/app/workers/fetch_reply_worker.rb +++ b/app/workers/fetch_reply_worker.rb @@ -6,7 +6,12 @@ class FetchReplyWorker sidekiq_options queue: 'pull', retry: 3 - def perform(child_url) - FetchRemoteStatusService.new.call(child_url) + def perform(child_url, account_id = nil) + account = account_id.blank? ? nil : Account.find_by(id: account_id) + on_behalf_of = account.blank? ? nil : account.followers.local.random.first + + FetchRemoteStatusService.new.call(child_url, nil, on_behalf_of) + rescue ActiveRecord::RecordNotFound + nil end end diff --git a/app/workers/link_crawl_worker.rb b/app/workers/link_crawl_worker.rb index b3d8aa264..32e51537d 100644 --- a/app/workers/link_crawl_worker.rb +++ b/app/workers/link_crawl_worker.rb @@ -6,7 +6,8 @@ class LinkCrawlWorker sidekiq_options queue: 'pull', retry: 0 def perform(status_id) - FetchLinkCardService.new.call(Status.find(status_id)) + status = Status.find(status_id) + FetchLinkCardService.new.call(status) if status.published? rescue ActiveRecord::RecordNotFound true end diff --git a/app/workers/move_worker.rb b/app/workers/move_worker.rb index 39e321316..4e155546f 100644 --- a/app/workers/move_worker.rb +++ b/app/workers/move_worker.rb @@ -16,6 +16,9 @@ class MoveWorker copy_account_notes! carry_blocks_over! carry_mutes_over! + return unless @target_account.local? + + ActivityPub::SyncAccountWorker.perform_async(@source_account.id, every_page: true, skip_cooldown: true) rescue ActiveRecord::RecordNotFound true end diff --git a/app/workers/mute_conversation_worker.rb b/app/workers/mute_conversation_worker.rb new file mode 100644 index 000000000..efe6dd539 --- /dev/null +++ b/app/workers/mute_conversation_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class MuteConversationWorker + include Sidekiq::Worker + + def perform(account_id, conversation_id) + FeedManager.instance.unpush_conversation(Account.find(account_id), Conversation.find(conversation_id)) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/publish_scheduled_status_worker.rb b/app/workers/publish_scheduled_status_worker.rb index ce42f7be7..a5166f6a8 100644 --- a/app/workers/publish_scheduled_status_worker.rb +++ b/app/workers/publish_scheduled_status_worker.rb @@ -21,6 +21,8 @@ class PublishScheduledStatusWorker options.tap do |options_hash| options_hash[:application] = Doorkeeper::Application.find(options_hash.delete(:application_id)) if options[:application_id] options_hash[:thread] = Status.find(options_hash.delete(:in_reply_to_id)) if options_hash[:in_reply_to_id] + options_hash[:mentions] = Mention.where(id: options_hash.delete(:mention_ids)) if options_hash[:mention_ids] + options_hash[:status] = Status.find_by(id: options_hash.delete(:status_id)) if options_hash[:status_id] end end end diff --git a/app/workers/redownload_media_worker.rb b/app/workers/redownload_media_worker.rb index 0638cd0f0..0ead9a7a8 100644 --- a/app/workers/redownload_media_worker.rb +++ b/app/workers/redownload_media_worker.rb @@ -11,10 +11,27 @@ class RedownloadMediaWorker return if media_attachment.remote_url.blank? + orig_small_url = media_attachment.file.url(:small) + media_attachment.download_file! media_attachment.download_thumbnail! - media_attachment.save + + if media_attachment.save && media_attachment.inline? && media_attachment.status.present? + if unsupported_media_type?(media_attachment.file.content_type) + media_attachment.destroy + true + else + media_attachment.status.text.gsub!("#{orig_small_url}##{media_attachment.id}", media_attachment.file.url(:small)) + media_attachment.status.save + end + end rescue ActiveRecord::RecordNotFound true end + + private + + def unsupported_media_type?(mime_type) + mime_type.present? && !MediaAttachment.supported_mime_types.include?(mime_type) + end end diff --git a/app/workers/remove_media_attachments_worker.rb b/app/workers/remove_media_attachments_worker.rb new file mode 100644 index 000000000..d5bac6ab8 --- /dev/null +++ b/app/workers/remove_media_attachments_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class RemoveMediaAttachmentsWorker + include Sidekiq::Worker + + def perform(attachment_ids) + RemoveMediaAttachmentsService.new.call(attachment_ids) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/reset_account_worker.rb b/app/workers/reset_account_worker.rb new file mode 100644 index 000000000..f63d8682a --- /dev/null +++ b/app/workers/reset_account_worker.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +class ResetAccountWorker + include Sidekiq::Worker + + def perform(account_id) + account = Account.find(account_id) + return if account.local? + + account_uri = account.uri + SuspendAccountService.new.call(account) + ResolveAccountService.new.call(account_uri) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/revoke_status_worker.rb b/app/workers/revoke_status_worker.rb new file mode 100644 index 000000000..8cc2b1623 --- /dev/null +++ b/app/workers/revoke_status_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class RevokeStatusWorker + include Sidekiq::Worker + + def perform(status_id, account_ids) + RevokeStatusService.new.call(Status.find(status_id), account_ids) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/scheduler/ambassador_scheduler.rb b/app/workers/scheduler/ambassador_scheduler.rb new file mode 100644 index 000000000..f00d0912a --- /dev/null +++ b/app/workers/scheduler/ambassador_scheduler.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +class Scheduler::AmbassadorScheduler + include Sidekiq::Worker + sidekiq_options lock: :until_executed, retry: 0 + + def perform + @ambassador = find_ambassador_acct + return if @ambassador.nil? + + status = next_boost + return if status.nil? + + ReblogService.new.call(@ambassador, status) + end + + private + + def find_ambassador_acct + ambassador = ENV['AMBASSADOR_USER'].to_i + return Account.find_by(id: ambassador) unless ambassador.zero? + + ambassador = ENV['AMBASSADOR_USER'] + return if ambassador.blank? + + Account.find_local(ambassador) + end + + def next_boost + ambassador_boost_candidates.first + end + + def ambassador_boost_candidates + ambassador_boostable.joins(:status_stat).where('favourites_count + reblogs_count >= ?', ENV.fetch('AMBASSADOR_THRESHOLD', 3).to_i) + end + + def ambassador_boostable + ambassador_unboosted.excluding_silenced_accounts.not_excluded_by_account(@ambassador) + end + + def ambassador_unboosted + locally_boostable.where.not(id: ambassador_boosts) + end + + def ambassador_boosts + @ambassador.statuses.where('statuses.reblog_of_id IS NOT NULL').reorder(nil).select(:reblog_of_id) + end + + def locally_boostable + Status.local + .public_visibility + .without_replies + .without_reblogs + .where('statuses.created_at > ?', ENV.fetch('AMBASSADOR_RANGE', 14).days.ago) + end +end diff --git a/app/workers/scheduler/database_cleanup_scheduler.rb b/app/workers/scheduler/database_cleanup_scheduler.rb new file mode 100644 index 000000000..033556099 --- /dev/null +++ b/app/workers/scheduler/database_cleanup_scheduler.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +class Scheduler::DatabaseCleanupScheduler + include Sidekiq::Worker + + sidekiq_options lock: :until_executed, retry: 0 + + def perform + Conversation.left_outer_joins(:statuses).where(statuses: { id: nil }).destroy_all + Tag.left_outer_joins(:statuses).where(statuses: { id: nil }).destroy_all + StatusStat.left_outer_joins(:status).where(statuses: { id: nil }).destroy_all + Setting.rewhere(thing_type: 'User').where.not(thing_id: User.select(:id)).destroy_all + end +end diff --git a/app/workers/scheduler/publish_status_scheduler.rb b/app/workers/scheduler/publish_status_scheduler.rb new file mode 100644 index 000000000..27fac39e1 --- /dev/null +++ b/app/workers/scheduler/publish_status_scheduler.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class Scheduler::PublishStatusScheduler + include Sidekiq::Worker + + sidekiq_options lock: :until_executed, retry: 0 + + def perform + Status.ready_to_publish.find_each { |status| PublishStatusService.new.call(status) } + end +end diff --git a/app/workers/scheduler/status_cleanup_scheduler.rb b/app/workers/scheduler/status_cleanup_scheduler.rb new file mode 100644 index 000000000..161818355 --- /dev/null +++ b/app/workers/scheduler/status_cleanup_scheduler.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class Scheduler::StatusCleanupScheduler + include Sidekiq::Worker + + sidekiq_options lock: :until_executed, retry: 0 + + def perform + Status.with_discarded.expired.find_each do |status| + RemoveStatusService.new.call(status, unpublish: !(status.discarded? || status.account&.user&.setting_unpublish_delete)) + end + end +end diff --git a/app/workers/scheduler/user_cleanup_scheduler.rb b/app/workers/scheduler/user_cleanup_scheduler.rb index 8571b59e1..4213c243e 100644 --- a/app/workers/scheduler/user_cleanup_scheduler.rb +++ b/app/workers/scheduler/user_cleanup_scheduler.rb @@ -17,6 +17,11 @@ class Scheduler::UserCleanupScheduler Account.where(id: batch.map(&:account_id)).delete_all User.where(id: batch.map(&:id)).delete_all end + + User.where(kobold: '', approved: false).find_in_batches do |batch| + Account.where(id: batch.map(&:account_id)).delete_all + User.where(id: batch.map(&:id)).delete_all + end end def clean_suspended_accounts! diff --git a/app/workers/softblock_worker.rb b/app/workers/softblock_worker.rb new file mode 100644 index 000000000..a4624868c --- /dev/null +++ b/app/workers/softblock_worker.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +class SoftblockWorker + include Sidekiq::Worker + + def perform(account_id, target_account_id) + account = Account.find(account_id) + target_account = Account.find(target_account_id) + + BlockService.new.call(account, target_account, softblock: true) + sleep 1 + UnblockService.new.call(account, target_account) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index 8bba9ca75..a1915a16f 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -6,13 +6,16 @@ class ThreadResolveWorker sidekiq_options queue: 'pull', retry: 3 - def perform(child_status_id, parent_url) + def perform(child_status_id, parent_url, on_behalf_of = nil) child_status = Status.find(child_status_id) - parent_status = FetchRemoteStatusService.new.call(parent_url) + on_behalf_of = child_status.account.followers.local.random.first if on_behalf_of.nil? && !child_status.distributable? + parent_status = FetchRemoteStatusService.new.call(parent_url, nil, on_behalf_of) return if parent_status.nil? child_status.thread = parent_status child_status.save! + rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotFound + nil end end |