diff options
author | Eugen Rochko <eugen@zeonfederated.com> | 2017-05-05 02:23:01 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-05 02:23:01 +0200 |
commit | 81584779cb1795d2fe7827e054bbe245712528a2 (patch) | |
tree | 008186fee04307d50db7a2dc3fa50696488ed686 /app | |
parent | 61c33652ad7a98f0c30fea67bc382e1306b69880 (diff) |
More robust PuSH subscription refreshes (#2799)
* Fix #2473 - Use sidekiq scheduler to refresh PuSH subscriptions instead of cron Fix an issue where / in domain would raise exception in TagManager#normalize_domain PuSH subscriptions refresh done in a round-robin way to avoid hammering a single server's hub in sequence. Correct handling of failures/retries through Sidekiq (see also #2613). Optimize Account#with_followers scope. Also, since subscriptions are now delegated to Sidekiq jobs, an uncaught exception will not stop the entire refreshing operation halfway through Fix #2702 - Correct user agent header on outgoing http requests * Add test for SubscribeService * Extract #expiring_accounts into method * Make mastodon:push:refresh no-op * Queues are now defined in sidekiq.yml * Queues are now in sidekiq.yml
Diffstat (limited to 'app')
-rw-r--r-- | app/helpers/http_helper.rb | 10 | ||||
-rw-r--r-- | app/lib/tag_manager.rb | 4 | ||||
-rw-r--r-- | app/models/account.rb | 5 | ||||
-rw-r--r-- | app/services/follow_service.rb | 2 | ||||
-rw-r--r-- | app/services/process_interaction_service.rb | 2 | ||||
-rw-r--r-- | app/services/subscribe_service.rb | 28 | ||||
-rw-r--r-- | app/services/update_remote_profile_service.rb | 2 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/delivery_worker.rb | 12 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/subscribe_worker.rb | 13 | ||||
-rw-r--r-- | app/workers/scheduler/subscriptions_scheduler.rb | 20 |
10 files changed, 81 insertions, 17 deletions
diff --git a/app/helpers/http_helper.rb b/app/helpers/http_helper.rb index 1697de746..e39a52da0 100644 --- a/app/helpers/http_helper.rb +++ b/app/helpers/http_helper.rb @@ -1,13 +1,17 @@ # frozen_string_literal: true module HttpHelper - USER_AGENT = "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)" - def http_client(options = {}) timeout = { write: 10, connect: 10, read: 10 }.merge(options) - HTTP.headers(user_agent: USER_AGENT) + HTTP.headers(user_agent: user_agent) .timeout(:per_operation, timeout) .follow end + + private + + def user_agent + @user_agent ||= "#{HTTP::Request::USER_AGENT} (Mastodon/#{Mastodon::Version}; +http://#{Rails.configuration.x.local_domain}/)" + end end diff --git a/app/lib/tag_manager.rb b/app/lib/tag_manager.rb index 3bddfba7c..6170a90de 100644 --- a/app/lib/tag_manager.rb +++ b/app/lib/tag_manager.rb @@ -65,8 +65,10 @@ class TagManager end def normalize_domain(domain) + return if domain.nil? + uri = Addressable::URI.new - uri.host = domain + uri.host = domain.gsub(/[\/]/, '') uri.normalize.host end diff --git a/app/models/account.rb b/app/models/account.rb index d64591641..87b97a20d 100644 --- a/app/models/account.rb +++ b/app/models/account.rb @@ -103,9 +103,10 @@ class Account < ApplicationRecord scope :remote, -> { where.not(domain: nil) } scope :local, -> { where(domain: nil) } - scope :without_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) = 0') } - scope :with_followers, -> { where('(select count(f.id) from follows as f where f.target_account_id = accounts.id) > 0') } + scope :without_followers, -> { where(followers_count: 0) } + scope :with_followers, -> { where('followers_count > 0') } scope :expiring, ->(time) { where(subscription_expires_at: nil).or(where('subscription_expires_at < ?', time)).remote.with_followers } + scope :partitioned, -> { order('row_number() over (partition by domain)') } scope :silenced, -> { where(silenced: true) } scope :suspended, -> { where(suspended: true) } scope :recent, -> { reorder(id: :desc) } diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb index 844f5282d..23e721fac 100644 --- a/app/services/follow_service.rb +++ b/app/services/follow_service.rb @@ -40,7 +40,7 @@ class FollowService < BaseService if target_account.local? NotifyService.new.call(target_account, follow) else - SubscribeService.new.call(target_account) unless target_account.subscribed? + Pubsubhubbub::SubscribeWorker.perform_async(target_account.id) unless target_account.subscribed? NotificationWorker.perform_async(build_follow_xml(follow), source_account.id, target_account.id) AfterRemoteFollowWorker.perform_async(follow.id) end diff --git a/app/services/process_interaction_service.rb b/app/services/process_interaction_service.rb index 1f15a265d..16eac2353 100644 --- a/app/services/process_interaction_service.rb +++ b/app/services/process_interaction_service.rb @@ -77,7 +77,7 @@ class ProcessInteractionService < BaseService def authorize_follow_request!(account, target_account) follow_request = FollowRequest.find_by(account: target_account, target_account: account) follow_request&.authorize! - SubscribeService.new.call(account) unless account.subscribed? + Pubsubhubbub::SubscribeWorker.perform_async(account.id) unless account.subscribed? end def reject_follow_request!(account, target_account) diff --git a/app/services/subscribe_service.rb b/app/services/subscribe_service.rb index 820b208e9..138718f14 100644 --- a/app/services/subscribe_service.rb +++ b/app/services/subscribe_service.rb @@ -5,15 +5,31 @@ class SubscribeService < BaseService account.secret = SecureRandom.hex subscription = account.subscription(api_subscription_url(account.id)) - response = subscription.subscribe + response = subscription.subscribe - unless response.successful? + if response_failed_permanently?(response) + # An error in the 4xx range (except for 429, which is rate limiting) + # means we're not allowed to subscribe. Fail and move on account.secret = '' - Rails.logger.debug "PuSH subscription request for #{account.acct} failed: #{response.message}" + account.save! + elsif response_successful?(response) + # Anything in the 2xx range means the subscription will be confirmed + # asynchronously, we've done what we needed to do + account.save! + else + # What's left is the 5xx range and 429, which means we need to retry + # at a later time. Fail loudly! + raise "Subscription attempt failed for #{account.acct} (#{account.hub_url}): HTTP #{response.code}" end + end + + private + + def response_failed_permanently?(response) + response.code > 299 && response.code < 500 && response.code != 429 + end - account.save! - rescue HTTP::Error, OpenSSL::SSL::SSLError - Rails.logger.debug "PuSH subscription request for #{account.acct} could not be made due to HTTP or SSL error" + def response_successful?(response) + response.code > 199 && response.code < 300 end end diff --git a/app/services/update_remote_profile_service.rb b/app/services/update_remote_profile_service.rb index 31f4af2c1..f0c39ecc0 100644 --- a/app/services/update_remote_profile_service.rb +++ b/app/services/update_remote_profile_service.rb @@ -28,7 +28,7 @@ class UpdateRemoteProfileService < BaseService account.save_with_optional_avatar! - SubscribeService.new.call(account) if resubscribe && (account.hub_url != old_hub_url) + Pubsubhubbub::SubscribeWorker.perform_async(account.id) if resubscribe && (account.hub_url != old_hub_url) end private diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb index f645b1e24..511ae14b3 100644 --- a/app/workers/pubsubhubbub/delivery_worker.rb +++ b/app/workers/pubsubhubbub/delivery_worker.rb @@ -25,8 +25,8 @@ class Pubsubhubbub::DeliveryWorker .headers(headers) .post(subscription.callback_url, body: payload) - return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling) - raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300 + return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling) + raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response) subscription.touch(:last_successful_delivery_at) end @@ -37,4 +37,12 @@ class Pubsubhubbub::DeliveryWorker hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload) "sha1=#{hmac}" end + + def response_failed_permanently?(response) + response.code > 299 && response.code < 500 && response.code != 429 + end + + def response_successful?(response) + response.code > 199 && response.code < 300 + end end diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb new file mode 100644 index 000000000..0c4111a8c --- /dev/null +++ b/app/workers/pubsubhubbub/subscribe_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class Pubsubhubbub::SubscribeWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(account_id) + account = Account.find(account_id) + Rails.logger.debug "PuSH re-subscribing to #{account.acct}" + ::SubscribeService.new.call(account) + end +end diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb new file mode 100644 index 000000000..03622e95b --- /dev/null +++ b/app/workers/scheduler/subscriptions_scheduler.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true +require 'sidekiq-scheduler' + +class Scheduler::SubscriptionsScheduler + include Sidekiq::Worker + + def perform + Rails.logger.debug 'Queueing PuSH re-subscriptions' + + expiring_accounts.pluck(:id) do |id| + Pubsubhubbub::SubscribeWorker.perform_async(id) + end + end + + private + + def expiring_accounts + Account.expiring(1.day.from_now).partitioned + end +end |