diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/activitypub/delivery_worker.rb | 11 | ||||
-rw-r--r-- | app/workers/activitypub/processing_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/import/relationship_worker.rb | 25 | ||||
-rw-r--r-- | app/workers/import_worker.rb | 56 | ||||
-rw-r--r-- | app/workers/link_crawl_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/processing_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/pubsubhubbub/delivery_worker.rb | 2 |
7 files changed, 50 insertions, 50 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index a4e829343..7b1e06a70 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -3,7 +3,7 @@ class ActivityPub::DeliveryWorker include Sidekiq::Worker - sidekiq_options queue: 'push', retry: 5, dead: false + sidekiq_options queue: 'push', retry: 8, dead: false HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze @@ -15,7 +15,10 @@ class ActivityPub::DeliveryWorker perform_request raise Mastodon::UnexpectedResponseError, @response unless response_successful? + + failure_tracker.track_success! rescue => e + failure_tracker.track_failure! raise e.class, "Delivery failed for #{inbox_url}: #{e.message}", e.backtrace[0] end @@ -28,10 +31,14 @@ class ActivityPub::DeliveryWorker end def perform_request - @response = build_request.perform + @response = build_request.perform.flush end def response_successful? @response.code > 199 && @response.code < 300 end + + def failure_tracker + @failure_tracker ||= DeliveryFailureTracker.new(@inbox_url) + end end diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb index bb9adf64b..0e2e0eddd 100644 --- a/app/workers/activitypub/processing_worker.rb +++ b/app/workers/activitypub/processing_worker.rb @@ -6,6 +6,6 @@ class ActivityPub::ProcessingWorker sidekiq_options backtrace: true def perform(account_id, body) - ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id)) + ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id), override_timestamps: true) end end diff --git a/app/workers/import/relationship_worker.rb b/app/workers/import/relationship_worker.rb new file mode 100644 index 000000000..ed4c962c1 --- /dev/null +++ b/app/workers/import/relationship_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +class Import::RelationshipWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull', retry: 8, dead: false + + def perform(account_id, target_account_uri, relationship) + from_account = Account.find(account_id) + target_account = ResolveRemoteAccountService.new.call(target_account_uri) + + return if target_account.nil? + + case relationship + when 'follow' + FollowService.new.call(from_account, target_account.acct) + when 'block' + BlockService.new.call(from_account, target_account) + when 'mute' + MuteService.new.call(from_account, target_account) + end + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/import_worker.rb b/app/workers/import_worker.rb index 27cc6b365..d7c126f75 100644 --- a/app/workers/import_worker.rb +++ b/app/workers/import_worker.rb @@ -12,13 +12,8 @@ class ImportWorker def perform(import_id) @import = Import.find(import_id) - case @import.type - when 'blocking' - process_blocks - when 'following' - process_follows - when 'muting' - process_mutes + Import::RelationshipWorker.push_bulk(import_rows) do |row| + [@import.account_id, row.first, relationship_type] end @import.destroy @@ -26,49 +21,22 @@ class ImportWorker private - def from_account - @import.account - end - def import_contents Paperclip.io_adapters.for(@import.data).read end - def import_rows - CSV.new(import_contents).reject(&:blank?) - end - - def process_mutes - import_rows.each do |row| - begin - target_account = ResolveRemoteAccountService.new.call(row.first) - next if target_account.nil? - MuteService.new.call(from_account, target_account) - rescue Mastodon::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError - next - end - end - end - - def process_blocks - import_rows.each do |row| - begin - target_account = ResolveRemoteAccountService.new.call(row.first) - next if target_account.nil? - BlockService.new.call(from_account, target_account) - rescue Mastodon::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError - next - end + def relationship_type + case @import.type + when 'following' + 'follow' + when 'blocking' + 'block' + when 'muting' + 'mute' end end - def process_follows - import_rows.each do |row| - begin - FollowService.new.call(from_account, row.first) - rescue Mastodon::NotPermittedError, ActiveRecord::RecordNotFound, Mastodon::UnexpectedResponseError, HTTP::Error, OpenSSL::SSL::SSLError - next - end - end + def import_rows + CSV.new(import_contents).reject(&:blank?) end end diff --git a/app/workers/link_crawl_worker.rb b/app/workers/link_crawl_worker.rb index 834b0088b..b3d8aa264 100644 --- a/app/workers/link_crawl_worker.rb +++ b/app/workers/link_crawl_worker.rb @@ -3,7 +3,7 @@ class LinkCrawlWorker include Sidekiq::Worker - sidekiq_options queue: 'pull', retry: false + sidekiq_options queue: 'pull', retry: 0 def perform(status_id) FetchLinkCardService.new.call(Status.find(status_id)) diff --git a/app/workers/processing_worker.rb b/app/workers/processing_worker.rb index 5df404bcc..978c3aba2 100644 --- a/app/workers/processing_worker.rb +++ b/app/workers/processing_worker.rb @@ -6,6 +6,6 @@ class ProcessingWorker sidekiq_options backtrace: true def perform(account_id, body) - ProcessFeedService.new.call(body, Account.find(account_id)) + ProcessFeedService.new.call(body, Account.find(account_id), override_timestamps: true) end end diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb index 110b8bf16..c3506727b 100644 --- a/app/workers/pubsubhubbub/delivery_worker.rb +++ b/app/workers/pubsubhubbub/delivery_worker.rb @@ -37,7 +37,7 @@ class Pubsubhubbub::DeliveryWorker def callback_post_payload request = Request.new(:post, subscription.callback_url, body: payload) request.add_headers(headers) - request.perform + request.perform.flush end def blocked_domain? |