about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
authorStarfall <root@starfall.blue>2019-12-09 19:07:33 -0600
committerStarfall <root@starfall.blue>2019-12-09 19:09:31 -0600
commit6b34fcfef7566105e8d80ab5fee0a539c06cddbf (patch)
tree8fad2d47bf8be255d3c671c40cbfd04c2f55ed03 /app/workers
parent9fbb4af7611aa7836e65ef9f544d341423c15685 (diff)
parent246addd5b33a172600342af3fb6fb5e4c80ad95e (diff)
Merge branch 'glitch'`
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/activitypub/delivery_worker.rb43
-rw-r--r--app/workers/activitypub/move_distribution_worker.rb33
-rw-r--r--app/workers/admin/suspension_worker.rb2
-rw-r--r--app/workers/after_remote_follow_request_worker.rb24
-rw-r--r--app/workers/after_remote_follow_worker.rb24
-rw-r--r--app/workers/domain_block_worker.rb4
-rw-r--r--app/workers/feed_insert_worker.rb11
-rw-r--r--app/workers/maintenance/destroy_media_worker.rb14
-rw-r--r--app/workers/maintenance/redownload_account_media_worker.rb16
-rw-r--r--app/workers/maintenance/uncache_media_worker.rb18
-rw-r--r--app/workers/move_worker.rb37
-rw-r--r--app/workers/notification_worker.rb4
-rw-r--r--app/workers/processing_worker.rb4
-rw-r--r--app/workers/pubsubhubbub/confirmation_worker.rb75
-rw-r--r--app/workers/pubsubhubbub/delivery_worker.rb74
-rw-r--r--app/workers/pubsubhubbub/distribution_worker.rb25
-rw-r--r--app/workers/pubsubhubbub/raw_distribution_worker.rb15
-rw-r--r--app/workers/pubsubhubbub/subscribe_worker.rb27
-rw-r--r--app/workers/pubsubhubbub/unsubscribe_worker.rb8
-rw-r--r--app/workers/redownload_media_worker.rb19
-rw-r--r--app/workers/remote_profile_update_worker.rb6
-rw-r--r--app/workers/removal_worker.rb4
-rw-r--r--app/workers/salmon_worker.rb6
-rw-r--r--app/workers/scheduler/feed_cleanup_scheduler.rb5
-rw-r--r--app/workers/scheduler/ip_cleanup_scheduler.rb4
-rw-r--r--app/workers/scheduler/subscriptions_cleanup_scheduler.rb4
-rw-r--r--app/workers/scheduler/subscriptions_scheduler.rb10
-rw-r--r--app/workers/scheduler/trending_tags_scheduler.rb11
-rw-r--r--app/workers/unfollow_follow_worker.rb9
-rw-r--r--app/workers/web/push_notification_worker.rb10
30 files changed, 174 insertions, 372 deletions
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb
index 5e4c391f0..5457d9d4b 100644
--- a/app/workers/activitypub/delivery_worker.rb
+++ b/app/workers/activitypub/delivery_worker.rb
@@ -2,6 +2,7 @@
 
 class ActivityPub::DeliveryWorker
   include Sidekiq::Worker
+  include JsonLdHelper
 
   STOPLIGHT_FAILURE_THRESHOLD = 10
   STOPLIGHT_COOLDOWN = 60
@@ -17,27 +18,35 @@ class ActivityPub::DeliveryWorker
     @json           = json
     @source_account = Account.find(source_account_id)
     @inbox_url      = inbox_url
+    @host           = Addressable::URI.parse(inbox_url).normalized_site
+    @performed      = false
 
     perform_request
-
-    failure_tracker.track_success!
-  rescue => e
-    failure_tracker.track_failure!
-    raise e.class, "Delivery failed for #{inbox_url}: #{e.message}", e.backtrace[0]
+  ensure
+    if @performed
+      failure_tracker.track_success!
+    else
+      failure_tracker.track_failure!
+    end
   end
 
   private
 
-  def build_request
-    request = Request.new(:post, @inbox_url, body: @json)
-    request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
-    request.add_headers(HEADERS)
+  def build_request(http_client)
+    Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request|
+      request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
+      request.add_headers(HEADERS)
+    end
   end
 
   def perform_request
     light = Stoplight(@inbox_url) do
-      build_request.perform do |response|
-        raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response)
+      request_pool.with(@host) do |http_client|
+        build_request(http_client).perform do |response|
+          raise Mastodon::UnexpectedResponseError, response unless response_successful?(response) || response_error_unsalvageable?(response)
+
+          @performed = true
+        end
       end
     end
 
@@ -46,15 +55,11 @@ class ActivityPub::DeliveryWorker
          .run
   end
 
-  def response_successful?(response)
-    (200...300).cover?(response.code)
-  end
-
-  def response_error_unsalvageable?(response)
-    (400...500).cover?(response.code) && ![401, 408, 429].include?(response.code)
-  end
-
   def failure_tracker
     @failure_tracker ||= DeliveryFailureTracker.new(@inbox_url)
   end
+
+  def request_pool
+    RequestPool.current
+  end
 end
diff --git a/app/workers/activitypub/move_distribution_worker.rb b/app/workers/activitypub/move_distribution_worker.rb
new file mode 100644
index 000000000..bf1c0e7ae
--- /dev/null
+++ b/app/workers/activitypub/move_distribution_worker.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+class ActivityPub::MoveDistributionWorker
+  include Sidekiq::Worker
+  include Payloadable
+
+  sidekiq_options queue: 'push'
+
+  def perform(migration_id)
+    @migration = AccountMigration.find(migration_id)
+    @account   = @migration.account
+
+    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+      [signed_payload, @account.id, inbox_url]
+    end
+
+    ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
+      [signed_payload, @account.id, inbox_url]
+    end
+  rescue ActiveRecord::RecordNotFound
+    true
+  end
+
+  private
+
+  def inboxes
+    @inboxes ||= @migration.account.followers.inboxes
+  end
+
+  def signed_payload
+    @signed_payload ||= Oj.dump(serialize_payload(@migration, ActivityPub::MoveSerializer, signer: @account))
+  end
+end
diff --git a/app/workers/admin/suspension_worker.rb b/app/workers/admin/suspension_worker.rb
index ae8b24d8c..83c815efd 100644
--- a/app/workers/admin/suspension_worker.rb
+++ b/app/workers/admin/suspension_worker.rb
@@ -6,6 +6,6 @@ class Admin::SuspensionWorker
   sidekiq_options queue: 'pull'
 
   def perform(account_id, remove_user = false)
-    SuspendAccountService.new.call(Account.find(account_id), including_user: remove_user)
+    SuspendAccountService.new.call(Account.find(account_id), reserve_username: true, reserve_email: !remove_user)
   end
 end
diff --git a/app/workers/after_remote_follow_request_worker.rb b/app/workers/after_remote_follow_request_worker.rb
index 84eb6ade2..ce9c65834 100644
--- a/app/workers/after_remote_follow_request_worker.rb
+++ b/app/workers/after_remote_follow_request_worker.rb
@@ -5,27 +5,5 @@ class AfterRemoteFollowRequestWorker
 
   sidekiq_options queue: 'pull', retry: 5
 
-  attr_reader :follow_request
-
-  def perform(follow_request_id)
-    @follow_request = FollowRequest.find(follow_request_id)
-    process_follow_service if processing_required?
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-
-  private
-
-  def process_follow_service
-    follow_request.destroy
-    FollowService.new.call(follow_request.account, updated_account.acct)
-  end
-
-  def processing_required?
-    !updated_account.nil? && !updated_account.locked?
-  end
-
-  def updated_account
-    @_updated_account ||= FetchRemoteAccountService.new.call(follow_request.target_account.remote_url)
-  end
+  def perform(follow_request_id); end
 end
diff --git a/app/workers/after_remote_follow_worker.rb b/app/workers/after_remote_follow_worker.rb
index edab83f85..d9719f2bf 100644
--- a/app/workers/after_remote_follow_worker.rb
+++ b/app/workers/after_remote_follow_worker.rb
@@ -5,27 +5,5 @@ class AfterRemoteFollowWorker
 
   sidekiq_options queue: 'pull', retry: 5
 
-  attr_reader :follow
-
-  def perform(follow_id)
-    @follow = Follow.find(follow_id)
-    process_follow_service if processing_required?
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-
-  private
-
-  def process_follow_service
-    follow.destroy
-    FollowService.new.call(follow.account, updated_account.acct)
-  end
-
-  def updated_account
-    @_updated_account ||= FetchRemoteAccountService.new.call(follow.target_account.remote_url)
-  end
-
-  def processing_required?
-    !updated_account.nil? && updated_account.locked?
-  end
+  def perform(follow_id); end
 end
diff --git a/app/workers/domain_block_worker.rb b/app/workers/domain_block_worker.rb
index 884477829..35518d6b5 100644
--- a/app/workers/domain_block_worker.rb
+++ b/app/workers/domain_block_worker.rb
@@ -3,8 +3,8 @@
 class DomainBlockWorker
   include Sidekiq::Worker
 
-  def perform(domain_block_id)
-    BlockDomainService.new.call(DomainBlock.find(domain_block_id))
+  def perform(domain_block_id, update = false)
+    BlockDomainService.new.call(DomainBlock.find(domain_block_id), update)
   rescue ActiveRecord::RecordNotFound
     true
   end
diff --git a/app/workers/feed_insert_worker.rb b/app/workers/feed_insert_worker.rb
index 1ae3c877b..546f5c0c2 100644
--- a/app/workers/feed_insert_worker.rb
+++ b/app/workers/feed_insert_worker.rb
@@ -13,6 +13,8 @@ class FeedInsertWorker
     when :list
       @list     = List.find(id)
       @follower = @list.account
+    when :direct
+      @account  = Account.find(id)
     end
 
     check_and_insert
@@ -29,7 +31,12 @@ class FeedInsertWorker
   def feed_filtered?
     # Note: Lists are a variation of home, so the filtering rules
     # of home apply to both
-    FeedManager.instance.filter?(:home, @status, @follower.id)
+    case @type
+    when :home, :list
+      FeedManager.instance.filter?(:home, @status, @follower.id)
+    when :direct
+      FeedManager.instance.filter?(:direct, @status, @account.id)
+    end
   end
 
   def perform_push
@@ -38,6 +45,8 @@ class FeedInsertWorker
       FeedManager.instance.push_to_home(@follower, @status)
     when :list
       FeedManager.instance.push_to_list(@list, @status)
+    when :direct
+      FeedManager.instance.push_to_direct(@account, @status)
     end
   end
 end
diff --git a/app/workers/maintenance/destroy_media_worker.rb b/app/workers/maintenance/destroy_media_worker.rb
deleted file mode 100644
index cde33d6d7..000000000
--- a/app/workers/maintenance/destroy_media_worker.rb
+++ /dev/null
@@ -1,14 +0,0 @@
-# frozen_string_literal: true
-
-class Maintenance::DestroyMediaWorker
-  include Sidekiq::Worker
-
-  sidekiq_options queue: 'pull'
-
-  def perform(media_attachment_id)
-    media = media_attachment_id.is_a?(MediaAttachment) ? media_attachment_id : MediaAttachment.find(media_attachment_id)
-    media.destroy
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-end
diff --git a/app/workers/maintenance/redownload_account_media_worker.rb b/app/workers/maintenance/redownload_account_media_worker.rb
deleted file mode 100644
index 6afbe6e19..000000000
--- a/app/workers/maintenance/redownload_account_media_worker.rb
+++ /dev/null
@@ -1,16 +0,0 @@
-# frozen_string_literal: true
-
-class Maintenance::RedownloadAccountMediaWorker
-  include Sidekiq::Worker
-
-  sidekiq_options queue: 'pull', retry: false
-
-  def perform(account_id)
-    account = account_id.is_a?(Account) ? account_id : Account.find(account_id)
-    account.reset_avatar!
-    account.reset_header!
-    account.save
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-end
diff --git a/app/workers/maintenance/uncache_media_worker.rb b/app/workers/maintenance/uncache_media_worker.rb
deleted file mode 100644
index 4bc62ef75..000000000
--- a/app/workers/maintenance/uncache_media_worker.rb
+++ /dev/null
@@ -1,18 +0,0 @@
-# frozen_string_literal: true
-
-class Maintenance::UncacheMediaWorker
-  include Sidekiq::Worker
-
-  sidekiq_options queue: 'pull'
-
-  def perform(media_attachment_id)
-    media = media_attachment_id.is_a?(MediaAttachment) ? media_attachment_id : MediaAttachment.find(media_attachment_id)
-
-    return if media.file.blank?
-
-    media.file.destroy
-    media.save
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-end
diff --git a/app/workers/move_worker.rb b/app/workers/move_worker.rb
new file mode 100644
index 000000000..595730226
--- /dev/null
+++ b/app/workers/move_worker.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+class MoveWorker
+  include Sidekiq::Worker
+
+  def perform(source_account_id, target_account_id)
+    @source_account = Account.find(source_account_id)
+    @target_account = Account.find(target_account_id)
+
+    if @target_account.local? && @source_account.local?
+      rewrite_follows!
+    else
+      queue_follow_unfollows!
+    end
+  rescue ActiveRecord::RecordNotFound
+    true
+  end
+
+  private
+
+  def rewrite_follows!
+    @source_account.passive_relationships
+                   .where(account: Account.local)
+                   .where.not(account: @target_account.followers.local)
+                   .where.not(account_id: @target_account.id)
+                   .in_batches
+                   .update_all(target_account_id: @target_account.id)
+  end
+
+  def queue_follow_unfollows!
+    bypass_locked = @target_account.local?
+
+    @source_account.followers.local.select(:id).find_in_batches do |accounts|
+      UnfollowFollowWorker.push_bulk(accounts.map(&:id)) { |follower_id| [follower_id, @source_account.id, @target_account.id, bypass_locked] }
+    end
+  end
+end
diff --git a/app/workers/notification_worker.rb b/app/workers/notification_worker.rb
index da1d6ab45..1c0f001cf 100644
--- a/app/workers/notification_worker.rb
+++ b/app/workers/notification_worker.rb
@@ -5,7 +5,5 @@ class NotificationWorker
 
   sidekiq_options queue: 'push', retry: 5
 
-  def perform(xml, source_account_id, target_account_id)
-    SendInteractionService.new.call(xml, Account.find(source_account_id), Account.find(target_account_id))
-  end
+  def perform(xml, source_account_id, target_account_id); end
 end
diff --git a/app/workers/processing_worker.rb b/app/workers/processing_worker.rb
index 978c3aba2..cf3bd8397 100644
--- a/app/workers/processing_worker.rb
+++ b/app/workers/processing_worker.rb
@@ -5,7 +5,5 @@ class ProcessingWorker
 
   sidekiq_options backtrace: true
 
-  def perform(account_id, body)
-    ProcessFeedService.new.call(body, Account.find(account_id), override_timestamps: true)
-  end
+  def perform(account_id, body); end
 end
diff --git a/app/workers/pubsubhubbub/confirmation_worker.rb b/app/workers/pubsubhubbub/confirmation_worker.rb
index c0e7b677e..783a8c95f 100644
--- a/app/workers/pubsubhubbub/confirmation_worker.rb
+++ b/app/workers/pubsubhubbub/confirmation_worker.rb
@@ -2,81 +2,8 @@
 
 class Pubsubhubbub::ConfirmationWorker
   include Sidekiq::Worker
-  include RoutingHelper
 
   sidekiq_options queue: 'push', retry: false
 
-  attr_reader :subscription, :mode, :secret, :lease_seconds
-
-  def perform(subscription_id, mode, secret = nil, lease_seconds = nil)
-    @subscription = Subscription.find(subscription_id)
-    @mode = mode
-    @secret = secret
-    @lease_seconds = lease_seconds
-    process_confirmation
-  end
-
-  private
-
-  def process_confirmation
-    prepare_subscription
-
-    callback_get_with_params
-    logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{@callback_response_body}"
-
-    update_subscription
-  end
-
-  def update_subscription
-    if successful_subscribe?
-      subscription.save!
-    elsif successful_unsubscribe?
-      subscription.destroy!
-    end
-  end
-
-  def successful_subscribe?
-    subscribing? && response_matches_challenge?
-  end
-
-  def successful_unsubscribe?
-    (unsubscribing? && response_matches_challenge?) || !subscription.confirmed?
-  end
-
-  def response_matches_challenge?
-    @callback_response_body == challenge
-  end
-
-  def subscribing?
-    mode == 'subscribe'
-  end
-
-  def unsubscribing?
-    mode == 'unsubscribe'
-  end
-
-  def callback_get_with_params
-    Request.new(:get, subscription.callback_url, params: callback_params).perform do |response|
-      @callback_response_body = response.body_with_limit
-    end
-  end
-
-  def callback_params
-    {
-      'hub.topic': account_url(subscription.account, format: :atom),
-      'hub.mode': mode,
-      'hub.challenge': challenge,
-      'hub.lease_seconds': subscription.lease_seconds,
-    }
-  end
-
-  def prepare_subscription
-    subscription.secret = secret
-    subscription.lease_seconds = lease_seconds
-    subscription.confirmed = true
-  end
-
-  def challenge
-    @_challenge ||= SecureRandom.hex
-  end
+  def perform(subscription_id, mode, secret = nil, lease_seconds = nil); end
 end
diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb
index 619bfa48a..1260060bd 100644
--- a/app/workers/pubsubhubbub/delivery_worker.rb
+++ b/app/workers/pubsubhubbub/delivery_worker.rb
@@ -2,80 +2,8 @@
 
 class Pubsubhubbub::DeliveryWorker
   include Sidekiq::Worker
-  include RoutingHelper
 
   sidekiq_options queue: 'push', retry: 3, dead: false
 
-  sidekiq_retry_in do |count|
-    5 * (count + 1)
-  end
-
-  attr_reader :subscription, :payload
-
-  def perform(subscription_id, payload)
-    @subscription = Subscription.find(subscription_id)
-    @payload = payload
-    process_delivery unless blocked_domain?
-  rescue => e
-    raise e.class, "Delivery failed for #{subscription&.callback_url}: #{e.message}", e.backtrace[0]
-  end
-
-  private
-
-  def process_delivery
-    callback_post_payload do |payload_delivery|
-      raise Mastodon::UnexpectedResponseError, payload_delivery unless response_successful? payload_delivery
-    end
-
-    subscription.touch(:last_successful_delivery_at)
-  end
-
-  def callback_post_payload(&block)
-    request = Request.new(:post, subscription.callback_url, body: payload)
-    request.add_headers(headers)
-    request.perform(&block)
-  end
-
-  def blocked_domain?
-    DomainBlock.blocked?(host)
-  end
-
-  def host
-    Addressable::URI.parse(subscription.callback_url).normalized_host
-  end
-
-  def headers
-    {
-      'Content-Type' => 'application/atom+xml',
-      'Link' => link_header,
-    }.merge(signature_headers.to_h)
-  end
-
-  def link_header
-    LinkHeader.new([hub_link_header, self_link_header]).to_s
-  end
-
-  def hub_link_header
-    [api_push_url, [%w(rel hub)]]
-  end
-
-  def self_link_header
-    [account_url(subscription.account, format: :atom), [%w(rel self)]]
-  end
-
-  def signature_headers
-    { 'X-Hub-Signature' => payload_signature } if subscription.secret?
-  end
-
-  def payload_signature
-    "sha1=#{hmac_payload_digest}"
-  end
-
-  def hmac_payload_digest
-    OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret, payload)
-  end
-
-  def response_successful?(payload_delivery)
-    payload_delivery.code > 199 && payload_delivery.code < 300
-  end
+  def perform(subscription_id, payload); end
 end
diff --git a/app/workers/pubsubhubbub/distribution_worker.rb b/app/workers/pubsubhubbub/distribution_worker.rb
index fed5e917d..75bac5d6f 100644
--- a/app/workers/pubsubhubbub/distribution_worker.rb
+++ b/app/workers/pubsubhubbub/distribution_worker.rb
@@ -5,28 +5,5 @@ class Pubsubhubbub::DistributionWorker
 
   sidekiq_options queue: 'push'
 
-  def perform(stream_entry_ids)
-    stream_entries = StreamEntry.where(id: stream_entry_ids).includes(:status).reject { |e| e.status.nil? || e.status.hidden? }
-
-    return if stream_entries.empty?
-
-    @account       = stream_entries.first.account
-    @subscriptions = active_subscriptions.to_a
-
-    distribute_public!(stream_entries)
-  end
-
-  private
-
-  def distribute_public!(stream_entries)
-    @payload = OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.feed(@account, stream_entries))
-
-    Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription_id|
-      [subscription_id, @payload]
-    end
-  end
-
-  def active_subscriptions
-    Subscription.where(account: @account).active.pluck(:id)
-  end
+  def perform(stream_entry_ids); end
 end
diff --git a/app/workers/pubsubhubbub/raw_distribution_worker.rb b/app/workers/pubsubhubbub/raw_distribution_worker.rb
index 16962a623..ece9c80ac 100644
--- a/app/workers/pubsubhubbub/raw_distribution_worker.rb
+++ b/app/workers/pubsubhubbub/raw_distribution_worker.rb
@@ -5,18 +5,5 @@ class Pubsubhubbub::RawDistributionWorker
 
   sidekiq_options queue: 'push'
 
-  def perform(xml, source_account_id)
-    @account       = Account.find(source_account_id)
-    @subscriptions = active_subscriptions.to_a
-
-    Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription|
-      [subscription.id, xml]
-    end
-  end
-
-  private
-
-  def active_subscriptions
-    Subscription.where(account: @account).active.select('id, callback_url, domain')
-  end
+  def perform(xml, source_account_id); end
 end
diff --git a/app/workers/pubsubhubbub/subscribe_worker.rb b/app/workers/pubsubhubbub/subscribe_worker.rb
index 2e176d1c1..b861b5e67 100644
--- a/app/workers/pubsubhubbub/subscribe_worker.rb
+++ b/app/workers/pubsubhubbub/subscribe_worker.rb
@@ -5,30 +5,5 @@ class Pubsubhubbub::SubscribeWorker
 
   sidekiq_options queue: 'push', retry: 10, unique: :until_executed, dead: false
 
-  sidekiq_retry_in do |count|
-    case count
-    when 0
-      30.minutes.seconds
-    when 1
-      2.hours.seconds
-    when 2
-      12.hours.seconds
-    else
-      24.hours.seconds * (count - 2)
-    end
-  end
-
-  sidekiq_retries_exhausted do |msg, _e|
-    account = Account.find(msg['args'].first)
-    Sidekiq.logger.error "PuSH subscription attempts for #{account.acct} exhausted. Unsubscribing"
-    ::UnsubscribeService.new.call(account)
-  end
-
-  def perform(account_id)
-    account = Account.find(account_id)
-    logger.debug "PuSH re-subscribing to #{account.acct}"
-    ::SubscribeService.new.call(account)
-  rescue => e
-    raise e.class, "Subscribe failed for #{account&.acct}: #{e.message}", e.backtrace[0]
-  end
+  def perform(account_id); end
 end
diff --git a/app/workers/pubsubhubbub/unsubscribe_worker.rb b/app/workers/pubsubhubbub/unsubscribe_worker.rb
index a271715b7..0c1c263f6 100644
--- a/app/workers/pubsubhubbub/unsubscribe_worker.rb
+++ b/app/workers/pubsubhubbub/unsubscribe_worker.rb
@@ -5,11 +5,5 @@ class Pubsubhubbub::UnsubscribeWorker
 
   sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false
 
-  def perform(account_id)
-    account = Account.find(account_id)
-    logger.debug "PuSH unsubscribing from #{account.acct}"
-    ::UnsubscribeService.new.call(account)
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
+  def perform(account_id); end
 end
diff --git a/app/workers/redownload_media_worker.rb b/app/workers/redownload_media_worker.rb
new file mode 100644
index 000000000..98e995918
--- /dev/null
+++ b/app/workers/redownload_media_worker.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+class RedownloadMediaWorker
+  include Sidekiq::Worker
+  include ExponentialBackoff
+
+  sidekiq_options queue: 'pull', retry: 3
+
+  def perform(id)
+    media_attachment = MediaAttachment.find(id)
+
+    return if media_attachment.remote_url.blank?
+
+    media_attachment.reset_file!
+    media_attachment.save
+  rescue ActiveRecord::RecordNotFound
+    true
+  end
+end
diff --git a/app/workers/remote_profile_update_worker.rb b/app/workers/remote_profile_update_worker.rb
index 03585ad2d..01e8daf8f 100644
--- a/app/workers/remote_profile_update_worker.rb
+++ b/app/workers/remote_profile_update_worker.rb
@@ -5,9 +5,5 @@ class RemoteProfileUpdateWorker
 
   sidekiq_options queue: 'pull'
 
-  def perform(account_id, body, resubscribe)
-    UpdateRemoteProfileService.new.call(body, Account.find(account_id), resubscribe)
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
+  def perform(account_id, body, resubscribe); end
 end
diff --git a/app/workers/removal_worker.rb b/app/workers/removal_worker.rb
index 19a660dd3..2a1eaa89b 100644
--- a/app/workers/removal_worker.rb
+++ b/app/workers/removal_worker.rb
@@ -3,8 +3,8 @@
 class RemovalWorker
   include Sidekiq::Worker
 
-  def perform(status_id)
-    RemoveStatusService.new.call(Status.find(status_id))
+  def perform(status_id, options = {})
+    RemoveStatusService.new.call(Status.with_discarded.find(status_id), **options.symbolize_keys)
   rescue ActiveRecord::RecordNotFound
     true
   end
diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb
index d37d40432..10200b06c 100644
--- a/app/workers/salmon_worker.rb
+++ b/app/workers/salmon_worker.rb
@@ -5,9 +5,5 @@ class SalmonWorker
 
   sidekiq_options backtrace: true
 
-  def perform(account_id, body)
-    ProcessInteractionService.new.call(body, Account.find(account_id))
-  rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound
-    true
-  end
+  def perform(account_id, body); end
 end
diff --git a/app/workers/scheduler/feed_cleanup_scheduler.rb b/app/workers/scheduler/feed_cleanup_scheduler.rb
index bf5e20757..4933f1753 100644
--- a/app/workers/scheduler/feed_cleanup_scheduler.rb
+++ b/app/workers/scheduler/feed_cleanup_scheduler.rb
@@ -9,6 +9,7 @@ class Scheduler::FeedCleanupScheduler
   def perform
     clean_home_feeds!
     clean_list_feeds!
+    clean_direct_feeds!
   end
 
   private
@@ -21,6 +22,10 @@ class Scheduler::FeedCleanupScheduler
     clean_feeds!(inactive_list_ids, :list)
   end
 
+  def clean_direct_feeds!
+    clean_feeds!(inactive_account_ids, :direct)
+  end
+
   def clean_feeds!(ids, type)
     reblogged_id_sets = {}
 
diff --git a/app/workers/scheduler/ip_cleanup_scheduler.rb b/app/workers/scheduler/ip_cleanup_scheduler.rb
index 42620332e..4f44078d8 100644
--- a/app/workers/scheduler/ip_cleanup_scheduler.rb
+++ b/app/workers/scheduler/ip_cleanup_scheduler.rb
@@ -9,7 +9,7 @@ class Scheduler::IpCleanupScheduler
 
   def perform
     time_ago = RETENTION_PERIOD.ago
-    SessionActivation.where('updated_at < ?', time_ago).destroy_all
-    User.where('last_sign_in_at < ?', time_ago).update_all(last_sign_in_ip: nil)
+    SessionActivation.where('updated_at < ?', time_ago).in_batches.destroy_all
+    User.where('last_sign_in_at < ?', time_ago).where.not(last_sign_in_ip: nil).in_batches.update_all(last_sign_in_ip: nil)
   end
 end
diff --git a/app/workers/scheduler/subscriptions_cleanup_scheduler.rb b/app/workers/scheduler/subscriptions_cleanup_scheduler.rb
index 5fba120f6..75fe681a9 100644
--- a/app/workers/scheduler/subscriptions_cleanup_scheduler.rb
+++ b/app/workers/scheduler/subscriptions_cleanup_scheduler.rb
@@ -5,7 +5,5 @@ class Scheduler::SubscriptionsCleanupScheduler
 
   sidekiq_options unique: :until_executed, retry: 0
 
-  def perform
-    Subscription.expired.in_batches.delete_all
-  end
+  def perform; end
 end
diff --git a/app/workers/scheduler/subscriptions_scheduler.rb b/app/workers/scheduler/subscriptions_scheduler.rb
index d5873bccb..6903cadc7 100644
--- a/app/workers/scheduler/subscriptions_scheduler.rb
+++ b/app/workers/scheduler/subscriptions_scheduler.rb
@@ -5,13 +5,5 @@ class Scheduler::SubscriptionsScheduler
 
   sidekiq_options unique: :until_executed, retry: 0
 
-  def perform
-    Pubsubhubbub::SubscribeWorker.push_bulk(expiring_accounts.pluck(:id))
-  end
-
-  private
-
-  def expiring_accounts
-    Account.expiring(1.day.from_now).partitioned
-  end
+  def perform; end
 end
diff --git a/app/workers/scheduler/trending_tags_scheduler.rb b/app/workers/scheduler/trending_tags_scheduler.rb
new file mode 100644
index 000000000..77f0d5747
--- /dev/null
+++ b/app/workers/scheduler/trending_tags_scheduler.rb
@@ -0,0 +1,11 @@
+# frozen_string_literal: true
+
+class Scheduler::TrendingTagsScheduler
+  include Sidekiq::Worker
+
+  sidekiq_options unique: :until_executed, retry: 0
+
+  def perform
+    TrendingTags.update! if Setting.trends
+  end
+end
diff --git a/app/workers/unfollow_follow_worker.rb b/app/workers/unfollow_follow_worker.rb
index 50d3bf034..b6e665a41 100644
--- a/app/workers/unfollow_follow_worker.rb
+++ b/app/workers/unfollow_follow_worker.rb
@@ -5,13 +5,16 @@ class UnfollowFollowWorker
 
   sidekiq_options queue: 'pull'
 
-  def perform(follower_account_id, old_target_account_id, new_target_account_id)
+  def perform(follower_account_id, old_target_account_id, new_target_account_id, bypass_locked = false)
     follower_account   = Account.find(follower_account_id)
     old_target_account = Account.find(old_target_account_id)
     new_target_account = Account.find(new_target_account_id)
 
-    FollowService.new.call(follower_account, new_target_account)
-    UnfollowService.new.call(follower_account, old_target_account)
+    follow = follower_account.active_relationships.find_by(target_account: old_target_account)
+    reblogs = follow&.show_reblogs?
+
+    FollowService.new.call(follower_account, new_target_account, reblogs: reblogs, bypass_locked: bypass_locked)
+    UnfollowService.new.call(follower_account, old_target_account, skip_unmerge: true)
   rescue ActiveRecord::RecordNotFound, Mastodon::NotPermittedError
     true
   end
diff --git a/app/workers/web/push_notification_worker.rb b/app/workers/web/push_notification_worker.rb
index 8e8a35973..46aeaa30b 100644
--- a/app/workers/web/push_notification_worker.rb
+++ b/app/workers/web/push_notification_worker.rb
@@ -3,7 +3,7 @@
 class Web::PushNotificationWorker
   include Sidekiq::Worker
 
-  sidekiq_options backtrace: true
+  sidekiq_options backtrace: true, retry: 5
 
   def perform(subscription_id, notification_id)
     subscription = ::Web::PushSubscription.find(subscription_id)
@@ -11,7 +11,13 @@ class Web::PushNotificationWorker
 
     subscription.push(notification) unless notification.activity.nil?
   rescue Webpush::ResponseError => e
-    subscription.destroy! if (400..499).cover?(e.response.code.to_i)
+    code = e.response.code.to_i
+
+    if (400..499).cover?(code) && ![408, 429].include?(code)
+      subscription.destroy!
+    else
+      raise e
+    end
   rescue ActiveRecord::RecordNotFound
     true
   end