diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/activitypub/distribute_poll_update_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/activitypub/post_upgrade_worker.rb | 15 | ||||
-rw-r--r-- | app/workers/notification_worker.rb | 11 | ||||
-rw-r--r-- | app/workers/processing_worker.rb | 11 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/confirmation_worker.rb | 82 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/delivery_worker.rb | 81 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/distribution_worker.rb | 32 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/raw_distribution_worker.rb | 22 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/subscribe_worker.rb | 34 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/unsubscribe_worker.rb | 15 | ||||
-rw-r--r-- | app/workers/refollow_worker.rb | 1 | ||||
-rw-r--r-- | app/workers/remote_profile_update_worker.rb | 13 | ||||
-rw-r--r-- | app/workers/salmon_worker.rb | 13 | ||||
-rw-r--r-- | app/workers/scheduler/subscriptions_scheduler.rb | 17 |
14 files changed, 1 insertions, 348 deletions
diff --git a/app/workers/activitypub/distribute_poll_update_worker.rb b/app/workers/activitypub/distribute_poll_update_worker.rb index 98b227111..310e42433 100644 --- a/app/workers/activitypub/distribute_poll_update_worker.rb +++ b/app/workers/activitypub/distribute_poll_update_worker.rb @@ -31,7 +31,7 @@ class ActivityPub::DistributePollUpdateWorker @inboxes = [@status.mentions, @status.reblogs, @status.preloadable_poll.votes].flat_map do |relation| relation.includes(:account).map do |record| - record.account.preferred_inbox_url if !record.account.local? && record.account.activitypub? + record.account.preferred_inbox_url if !record.account.local? end end diff --git a/app/workers/activitypub/post_upgrade_worker.rb b/app/workers/activitypub/post_upgrade_worker.rb deleted file mode 100644 index 4154b8582..000000000 --- a/app/workers/activitypub/post_upgrade_worker.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -class ActivityPub::PostUpgradeWorker - include Sidekiq::Worker - - sidekiq_options queue: 'pull' - - def perform(domain) - Account.where(domain: domain) - .where(protocol: :ostatus) - .where.not(last_webfingered_at: nil) - .in_batches - .update_all(last_webfingered_at: nil) - end -end diff --git a/app/workers/notification_worker.rb b/app/workers/notification_worker.rb deleted file mode 100644 index da1d6ab45..000000000 --- a/app/workers/notification_worker.rb +++ /dev/null @@ -1,11 +0,0 @@ -# frozen_string_literal: true - -class NotificationWorker - include Sidekiq::Worker - - sidekiq_options queue: 'push', retry: 5 - - def perform(xml, source_account_id, target_account_id) - SendInteractionService.new.call(xml, Account.find(source_account_id), Account.find(target_account_id)) - end -end diff --git a/app/workers/processing_worker.rb b/app/workers/processing_worker.rb deleted file mode 100644 index 978c3aba2..000000000 --- a/app/workers/processing_worker.rb +++ /dev/null @@ -1,11 +0,0 @@ -# frozen_string_literal: true - -class ProcessingWorker - include Sidekiq::Worker - - sidekiq_options backtrace: true - - def perform(account_id, body) - ProcessFeedService.new.call(body, Account.find(account_id), override_timestamps: true) - end -end diff --git a/app/workers/pubsubhubbub/confirmation_worker.rb b/app/workers/pubsubhubbub/confirmation_worker.rb deleted file mode 100644 index c0e7b677e..000000000 --- a/app/workers/pubsubhubbub/confirmation_worker.rb +++ /dev/null @@ -1,82 +0,0 @@ -# frozen_string_literal: true - -class Pubsubhubbub::ConfirmationWorker - include Sidekiq::Worker - include RoutingHelper - - sidekiq_options queue: 'push', retry: false - - attr_reader :subscription, :mode, :secret, :lease_seconds - - def perform(subscription_id, mode, secret = nil, lease_seconds = nil) - @subscription = Subscription.find(subscription_id) - @mode = mode - @secret = secret - @lease_seconds = lease_seconds - process_confirmation - end - - private - - def process_confirmation - prepare_subscription - - callback_get_with_params - logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{@callback_response_body}" - - update_subscription - end - - def update_subscription - if successful_subscribe? - subscription.save! - elsif successful_unsubscribe? - subscription.destroy! - end - end - - def successful_subscribe? - subscribing? && response_matches_challenge? - end - - def successful_unsubscribe? - (unsubscribing? && response_matches_challenge?) || !subscription.confirmed? - end - - def response_matches_challenge? - @callback_response_body == challenge - end - - def subscribing? - mode == 'subscribe' - end - - def unsubscribing? - mode == 'unsubscribe' - end - - def callback_get_with_params - Request.new(:get, subscription.callback_url, params: callback_params).perform do |response| - @callback_response_body = response.body_with_limit - end - end - - def callback_params - { - 'hub.topic': account_url(subscription.account, format: :atom), - 'hub.mode': mode, - 'hub.challenge': challenge, - 'hub.lease_seconds': subscription.lease_seconds, - } - end - - def prepare_subscription - subscription.secret = secret - subscription.lease_seconds = lease_seconds - subscription.confirmed = true - end - - def challenge - @_challenge ||= SecureRandom.hex - end -end diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb deleted file mode 100644 index 619bfa48a..000000000 --- a/app/workers/pubsubhubbub/delivery_worker.rb +++ /dev/null @@ -1,81 +0,0 @@ -# frozen_string_literal: true - -class Pubsubhubbub::DeliveryWorker - include Sidekiq::Worker - include RoutingHelper - - sidekiq_options queue: 'push', retry: 3, dead: false - - sidekiq_retry_in do |count| - 5 * (count + 1) - end - - attr_reader :subscription, :payload - - def perform(subscription_id, payload) - @subscription = Subscription.find(subscription_id) - @payload = payload - process_delivery unless blocked_domain? - rescue => e - raise e.class, "Delivery failed for #{subscription&.callback_url}: #{e.message}", e.backtrace[0] - end - - private - - def process_delivery - callback_post_payload do |payload_delivery| - raise Mastodon::UnexpectedResponseError, payload_delivery unless response_successful? payload_delivery - end - - subscription.touch(:last_successful_delivery_at) - end - - def callback_post_payload(&block) - request = Request.new(:post, subscription.callback_url, body: payload) - request.add_headers(headers) - request.perform(&block) - end - - def blocked_domain? - DomainBlock.blocked?(host) - end - - def host - Addressable::URI.parse(subscription.callback_url).normalized_host - end - - def headers - { - 'Content-Type' => 'application/atom+xml', - 'Link' => link_header, - }.merge(signature_headers.to_h) - end - - def link_header - LinkHeader.new([hub_link_header, self_link_header]).to_s - end - - def hub_link_header - [api_push_url, [%w(rel hub)]] - end - - def self_link_header - [account_url(subscription.account, format: :atom), [%w(rel self)]] - end - - def signature_headers - { 'X-Hub-Signature' => payload_signature } if subscription.secret? - end - - def payload_signature - "sha1=#{hmac_payload_digest}" - end - - def hmac_payload_digest - OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret, payload) - end - - def response_successful?(payload_delivery) - payload_delivery.code > 199 && payload_delivery.code < 300 - end -end diff --git a/app/workers/pubsubhubbub/distribution_worker.rb b/app/workers/pubsubhubbub/distribution_worker.rb deleted file mode 100644 index fed5e917d..000000000 --- a/app/workers/pubsubhubbub/distribution_worker.rb +++ /dev/null @@ -1,32 +0,0 @@ -# frozen_string_literal: true - -class Pubsubhubbub::DistributionWorker - include Sidekiq::Worker - - sidekiq_options queue: 'push' - - def perform(stream_entry_ids) - stream_entries = StreamEntry.where(id: stream_entry_ids).includes(:status).reject { |e| e.status.nil? || e.status.hidden? } - - return if stream_entries.empty? - - @account = stream_entries.first.account - @subscriptions = active_subscriptions.to_a - - distribute_public!(stream_entries) - end - - private - - def distribute_public!(stream_entries) - @payload = OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.feed(@account, stream_entries)) - - Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription_id| - [subscription_id, @payload] - end - end - - def active_subscriptions - Subscription.where(account: @account).active.pluck(:id) - end -end diff --git a/app/workers/pubsubhubbub/raw_distribution_worker.rb b/app/workers/pubsubhubbub/raw_distribution_worker.rb deleted file mode 100644 index 16962a623..000000000 --- a/app/workers/pubsubhubbub/raw_distribution_worker.rb +++ /dev/null @@ -1,22 +0,0 @@ -# frozen_string_literal: true - -class Pubsubhubbub::RawDistributionWorker - include Sidekiq::Worker - - sidekiq_options queue: 'push' - - def perform(xml, source_account_id) - @account = Account.find(source_account_id) - @subscriptions = active_subscriptions.to_a - - Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription| - [subscription.id, xml] - end - end - - private - - def active_subscriptions - Subscription.where(account: @account).active.select('id, callback_url, domain') - end -end diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb deleted file mode 100644 index 2e176d1c1..000000000 --- a/app/workers/pubsubhubbub/subscribe_worker.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -class Pubsubhubbub::SubscribeWorker - include Sidekiq::Worker - - sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false - - sidekiq_retry_in do |count| - case count - when 0 - 30.minutes.seconds - when 1 - 2.hours.seconds - when 2 - 12.hours.seconds - else - 24.hours.seconds * (count - 2) - end - end - - sidekiq_retries_exhausted do |msg, _e| - account = Account.find(msg['args'].first) - Sidekiq.logger.error "PuSH subscription attempts for #{account.acct} exhausted. Unsubscribing" - ::UnsubscribeService.new.call(account) - end - - def perform(account_id) - account = Account.find(account_id) - logger.debug "PuSH re-subscribing to #{account.acct}" - ::SubscribeService.new.call(account) - rescue => e - raise e.class, "Subscribe failed for #{account&.acct}: #{e.message}", e.backtrace[0] - end -end diff --git a/app/workers/pubsubhubbub/unsubscribe_worker.rb b/app/workers/pubsubhubbub/unsubscribe_worker.rb deleted file mode 100644 index a271715b7..000000000 --- a/app/workers/pubsubhubbub/unsubscribe_worker.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -class Pubsubhubbub::UnsubscribeWorker - include Sidekiq::Worker - - sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false - - def perform(account_id) - account = Account.find(account_id) - logger.debug "PuSH unsubscribing from #{account.acct}" - ::UnsubscribeService.new.call(account) - rescue ActiveRecord::RecordNotFound - true - end -end diff --git a/app/workers/refollow_worker.rb b/app/workers/refollow_worker.rb index 12f2bf671..eb659a72e 100644 --- a/app/workers/refollow_worker.rb +++ b/app/workers/refollow_worker.rb @@ -7,7 +7,6 @@ class RefollowWorker def perform(target_account_id) target_account = Account.find(target_account_id) - return unless target_account.protocol == :activitypub target_account.followers.where(domain: nil).reorder(nil).find_each do |follower| # Locally unfollow remote account diff --git a/app/workers/remote_profile_update_worker.rb b/app/workers/remote_profile_update_worker.rb deleted file mode 100644 index 03585ad2d..000000000 --- a/app/workers/remote_profile_update_worker.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -class RemoteProfileUpdateWorker - include Sidekiq::Worker - - sidekiq_options queue: 'pull' - - def perform(account_id, body, resubscribe) - UpdateRemoteProfileService.new.call(body, Account.find(account_id), resubscribe) - rescue ActiveRecord::RecordNotFound - true - end -end diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb deleted file mode 100644 index d37d40432..000000000 --- a/app/workers/salmon_worker.rb +++ /dev/null @@ -1,13 +0,0 @@ -# frozen_string_literal: true - -class SalmonWorker - include Sidekiq::Worker - - sidekiq_options backtrace: true - - def perform(account_id, body) - ProcessInteractionService.new.call(body, Account.find(account_id)) - rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound - true - end -end diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb deleted file mode 100644 index d5873bccb..000000000 --- a/app/workers/scheduler/subscriptions_scheduler.rb +++ /dev/null @@ -1,17 +0,0 @@ -# frozen_string_literal: true - -class Scheduler::SubscriptionsScheduler - include Sidekiq::Worker - - sidekiq_options unique: :until_executed, retry: 0 - - def perform - Pubsubhubbub::SubscribeWorker.push_bulk(expiring_accounts.pluck(:id)) - end - - private - - def expiring_accounts - Account.expiring(1.day.from_now).partitioned - end -end |