about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2017-05-05 02:23:01 +0200
committerGitHub <noreply@github.com>2017-05-05 02:23:01 +0200
commit81584779cb1795d2fe7827e054bbe245712528a2 (patch)
tree008186fee04307d50db7a2dc3fa50696488ed686 /app/workers
parent61c33652ad7a98f0c30fea67bc382e1306b69880 (diff)
More robust PuSH subscription refreshes (#2799)
* Fix #2473 - Use sidekiq scheduler to refresh PuSH subscriptions instead of cron

Fix an issue where / in domain would raise exception in TagManager#normalize_domain

PuSH subscriptions refresh done in a round-robin way to avoid hammering a single
server's hub in sequence. Correct handling of failures/retries through Sidekiq (see
also #2613). Optimize Account#with_followers scope. Also, since subscriptions
are now delegated to Sidekiq jobs, an uncaught exception will not stop the entire
refreshing operation halfway through

Fix #2702 - Correct user agent header on outgoing http requests

* Add test for SubscribeService

* Extract #expiring_accounts into method

* Make mastodon:push:refresh no-op

* Queues are now defined in sidekiq.yml

* Queues are now in sidekiq.yml
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/pubsubhubbub/delivery_worker.rb12
-rw-r--r--app/workers/pubsubhubbub/subscribe_worker.rb13
-rw-r--r--app/workers/scheduler/subscriptions_scheduler.rb20
3 files changed, 43 insertions, 2 deletions
diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb
index f645b1e24..511ae14b3 100644
--- a/app/workers/pubsubhubbub/delivery_worker.rb
+++ b/app/workers/pubsubhubbub/delivery_worker.rb
@@ -25,8 +25,8 @@ class Pubsubhubbub::DeliveryWorker
                    .headers(headers)
                    .post(subscription.callback_url, body: payload)
 
-    return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling)
-    raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300
+    return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling)
+    raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response)
 
     subscription.touch(:last_successful_delivery_at)
   end
@@ -37,4 +37,12 @@ class Pubsubhubbub::DeliveryWorker
     hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload)
     "sha1=#{hmac}"
   end
+
+  def response_failed_permanently?(response)
+    response.code > 299 && response.code < 500 && response.code != 429
+  end
+
+  def response_successful?(response)
+    response.code > 199 && response.code < 300
+  end
 end
diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb
new file mode 100644
index 000000000..0c4111a8c
--- /dev/null
+++ b/app/workers/pubsubhubbub/subscribe_worker.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+class Pubsubhubbub::SubscribeWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'push'
+
+  def perform(account_id)
+    account = Account.find(account_id)
+    Rails.logger.debug "PuSH re-subscribing to #{account.acct}"
+    ::SubscribeService.new.call(account)
+  end
+end
diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb
new file mode 100644
index 000000000..03622e95b
--- /dev/null
+++ b/app/workers/scheduler/subscriptions_scheduler.rb
@@ -0,0 +1,20 @@
+# frozen_string_literal: true
+require 'sidekiq-scheduler'
+
+class Scheduler::SubscriptionsScheduler
+  include Sidekiq::Worker
+
+  def perform
+    Rails.logger.debug 'Queueing PuSH re-subscriptions'
+
+    expiring_accounts.pluck(:id) do |id|
+      Pubsubhubbub::SubscribeWorker.perform_async(id)
+    end
+  end
+
+  private
+
+  def expiring_accounts
+    Account.expiring(1.day.from_now).partitioned
+  end
+end