diff options
Diffstat (limited to 'app/workers')
17 files changed, 277 insertions, 8 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb new file mode 100644 index 000000000..cd67b6710 --- /dev/null +++ b/app/workers/activitypub/delivery_worker.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +class ActivityPub::DeliveryWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push', retry: 5, dead: false + + HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze + + def perform(json, source_account_id, inbox_url) + @json = json + @source_account = Account.find(source_account_id) + @inbox_url = inbox_url + + perform_request + + raise Mastodon::UnexpectedResponseError, @response unless response_successful? + rescue => e + raise e.class, "Delivery failed for #{inbox_url}: #{e.message}" + end + + private + + def build_request + request = Request.new(:post, @inbox_url, body: @json) + request.on_behalf_of(@source_account, :uri) + request.add_headers(HEADERS) + end + + def perform_request + @response = build_request.perform + end + + def response_successful? + @response.code > 199 && @response.code < 300 + end +end diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb new file mode 100644 index 000000000..14bb933c0 --- /dev/null +++ b/app/workers/activitypub/distribution_worker.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +class ActivityPub::DistributionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(status_id) + @status = Status.find(status_id) + @account = @status.account + + return if skip_distribution? + + ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + [signed_payload, @account.id, inbox_url] + end + rescue ActiveRecord::RecordNotFound + true + end + + private + + def skip_distribution? + @status.direct_visibility? + end + + def inboxes + @inboxes ||= @account.followers.inboxes + end + + def signed_payload + @signed_payload ||= Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(@account)) + end + + def payload + @payload ||= ActiveModelSerializers::SerializableResource.new( + @status, + serializer: ActivityPub::ActivitySerializer, + adapter: ActivityPub::Adapter + ).as_json + end +end diff --git a/app/workers/activitypub/post_upgrade_worker.rb b/app/workers/activitypub/post_upgrade_worker.rb new file mode 100644 index 000000000..4154b8582 --- /dev/null +++ b/app/workers/activitypub/post_upgrade_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class ActivityPub::PostUpgradeWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull' + + def perform(domain) + Account.where(domain: domain) + .where(protocol: :ostatus) + .where.not(last_webfingered_at: nil) + .in_batches + .update_all(last_webfingered_at: nil) + end +end diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb new file mode 100644 index 000000000..bb9adf64b --- /dev/null +++ b/app/workers/activitypub/processing_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class ActivityPub::ProcessingWorker + include Sidekiq::Worker + + sidekiq_options backtrace: true + + def perform(account_id, body) + ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id)) + end +end diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb new file mode 100644 index 000000000..d73466f6e --- /dev/null +++ b/app/workers/activitypub/raw_distribution_worker.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +class ActivityPub::RawDistributionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(json, source_account_id) + @account = Account.find(source_account_id) + + ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + [json, @account.id, inbox_url] + end + rescue ActiveRecord::RecordNotFound + true + end + + private + + def inboxes + @inboxes ||= @account.followers.inboxes + end +end diff --git a/app/workers/activitypub/reply_distribution_worker.rb b/app/workers/activitypub/reply_distribution_worker.rb new file mode 100644 index 000000000..f9127340f --- /dev/null +++ b/app/workers/activitypub/reply_distribution_worker.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +class ActivityPub::ReplyDistributionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(status_id) + @status = Status.find(status_id) + @account = @status.thread.account + + return if skip_distribution? + + ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + [signed_payload, @status.account_id, inbox_url] + end + rescue ActiveRecord::RecordNotFound + true + end + + private + + def skip_distribution? + @status.private_visibility? || @status.direct_visibility? + end + + def inboxes + @inboxes ||= @account.followers.inboxes + end + + def signed_payload + @signed_payload ||= Oj.dump(ActivityPub::LinkedDataSignature.new(payload).sign!(@status.account)) + end + + def payload + @payload ||= ActiveModelSerializers::SerializableResource.new( + @status, + serializer: ActivityPub::ActivitySerializer, + adapter: ActivityPub::Adapter + ).as_json + end +end diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb new file mode 100644 index 000000000..f3377dcec --- /dev/null +++ b/app/workers/activitypub/update_distribution_worker.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +class ActivityPub::UpdateDistributionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(account_id) + @account = Account.find(account_id) + + ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + [payload, @account.id, inbox_url] + end + rescue ActiveRecord::RecordNotFound + true + end + + private + + def inboxes + @inboxes ||= @account.followers.inboxes + end + + def payload + @payload ||= ActiveModelSerializers::SerializableResource.new( + @account, + serializer: ActivityPub::UpdateSerializer, + adapter: ActivityPub::Adapter + ).to_json + end +end diff --git a/app/workers/authorize_follow_worker.rb b/app/workers/authorize_follow_worker.rb new file mode 100644 index 000000000..0d5014624 --- /dev/null +++ b/app/workers/authorize_follow_worker.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +class AuthorizeFollowWorker + include Sidekiq::Worker + + def perform(source_account_id, target_account_id) + source_account = Account.find(source_account_id) + target_account = Account.find(target_account_id) + + AuthorizeFollowService.new.call(source_account, target_account) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/pubsubhubbub/distribution_worker.rb b/app/workers/pubsubhubbub/distribution_worker.rb index ea246128d..524f6849f 100644 --- a/app/workers/pubsubhubbub/distribution_worker.rb +++ b/app/workers/pubsubhubbub/distribution_worker.rb @@ -6,7 +6,7 @@ class Pubsubhubbub::DistributionWorker sidekiq_options queue: 'push' def perform(stream_entry_ids) - stream_entries = StreamEntry.where(id: stream_entry_ids).includes(:status).reject { |e| e.status&.direct_visibility? } + stream_entries = StreamEntry.where(id: stream_entry_ids).includes(:status).reject { |e| e.status.nil? || e.status.direct_visibility? } return if stream_entries.empty? @@ -14,7 +14,7 @@ class Pubsubhubbub::DistributionWorker @subscriptions = active_subscriptions.to_a distribute_public!(stream_entries.reject(&:hidden?)) - distribute_hidden!(stream_entries.select(&:hidden?)) + distribute_hidden!(stream_entries.select(&:hidden?)) if Rails.configuration.x.use_ostatus_privacy end private diff --git a/app/workers/pubsubhubbub/raw_distribution_worker.rb b/app/workers/pubsubhubbub/raw_distribution_worker.rb new file mode 100644 index 000000000..16962a623 --- /dev/null +++ b/app/workers/pubsubhubbub/raw_distribution_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class Pubsubhubbub::RawDistributionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(xml, source_account_id) + @account = Account.find(source_account_id) + @subscriptions = active_subscriptions.to_a + + Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription| + [subscription.id, xml] + end + end + + private + + def active_subscriptions + Subscription.where(account: @account).active.select('id, callback_url, domain') + end +end diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb index 7560c2671..130c967e0 100644 --- a/app/workers/pubsubhubbub/subscribe_worker.rb +++ b/app/workers/pubsubhubbub/subscribe_worker.rb @@ -3,7 +3,7 @@ class Pubsubhubbub::SubscribeWorker include Sidekiq::Worker - sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false + sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false, unique_retry: true sidekiq_retry_in do |count| case count diff --git a/app/workers/pubsubhubbub/unsubscribe_worker.rb b/app/workers/pubsubhubbub/unsubscribe_worker.rb new file mode 100644 index 000000000..a271715b7 --- /dev/null +++ b/app/workers/pubsubhubbub/unsubscribe_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class Pubsubhubbub::UnsubscribeWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false + + def perform(account_id) + account = Account.find(account_id) + logger.debug "PuSH unsubscribing from #{account.acct}" + ::UnsubscribeService.new.call(account) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/resolve_remote_account_worker.rb b/app/workers/resolve_remote_account_worker.rb new file mode 100644 index 000000000..5dd84ccb6 --- /dev/null +++ b/app/workers/resolve_remote_account_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class ResolveRemoteAccountWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', unique: :until_executed + + def perform(uri) + ResolveRemoteAccountService.new.call(uri) + end +end diff --git a/app/workers/scheduler/feed_cleanup_scheduler.rb b/app/workers/scheduler/feed_cleanup_scheduler.rb index 402eed7c6..dbebaa2c3 100644 --- a/app/workers/scheduler/feed_cleanup_scheduler.rb +++ b/app/workers/scheduler/feed_cleanup_scheduler.rb @@ -5,8 +5,6 @@ class Scheduler::FeedCleanupScheduler include Sidekiq::Worker def perform - logger.info 'Cleaning out home feeds of inactive users' - redis.pipelined do inactive_users.pluck(:account_id).each do |account_id| redis.del(FeedManager.instance.key(:home, account_id)) diff --git a/app/workers/scheduler/media_cleanup_scheduler.rb b/app/workers/scheduler/media_cleanup_scheduler.rb index a95f512be..ce32ce314 100644 --- a/app/workers/scheduler/media_cleanup_scheduler.rb +++ b/app/workers/scheduler/media_cleanup_scheduler.rb @@ -5,7 +5,6 @@ class Scheduler::MediaCleanupScheduler include Sidekiq::Worker def perform - logger.info 'Cleaning out unattached media attachments' unattached_media.find_each(&:destroy) end diff --git a/app/workers/scheduler/subscriptions_cleanup_scheduler.rb b/app/workers/scheduler/subscriptions_cleanup_scheduler.rb new file mode 100644 index 000000000..3b9211e81 --- /dev/null +++ b/app/workers/scheduler/subscriptions_cleanup_scheduler.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require 'sidekiq-scheduler' + +class Scheduler::SubscriptionsCleanupScheduler + include Sidekiq::Worker + + def perform + Subscription.expired.in_batches.delete_all + end +end diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb index 7bfd002f4..469a3d2a6 100644 --- a/app/workers/scheduler/subscriptions_scheduler.rb +++ b/app/workers/scheduler/subscriptions_scheduler.rb @@ -7,8 +7,6 @@ class Scheduler::SubscriptionsScheduler include Sidekiq::Worker def perform - logger.info 'Queueing PuSH re-subscriptions' - Pubsubhubbub::SubscribeWorker.push_bulk(expiring_accounts.pluck(:id)) end |