about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
authorStarfall <us@starfall.systems>2021-05-11 11:19:04 -0500
committerStarfall <us@starfall.systems>2021-05-11 11:19:04 -0500
commitd56731a0b9d73c48bbfbced8732e25587ba892a4 (patch)
treed3830ce2e0292ce07336496e40882c222f455a33 /app/workers
parent459a36ab7303db4ee59945b4b2121b25cc86eb38 (diff)
parentffc3f8eebe134ca9b18af73aa29eaa1627082e40 (diff)
Merge branch 'glitch'
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/activitypub/distribution_worker.rb2
-rw-r--r--app/workers/import/relationship_worker.rb6
-rw-r--r--app/workers/merge_worker.rb4
-rw-r--r--app/workers/redownload_media_worker.rb11
-rw-r--r--app/workers/scheduler/follow_recommendations_scheduler.rb62
-rw-r--r--app/workers/thread_resolve_worker.rb2
-rw-r--r--app/workers/web/push_notification_worker.rb65
7 files changed, 135 insertions, 17 deletions
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb
index 9b4814644..09898ca49 100644
--- a/app/workers/activitypub/distribution_worker.rb
+++ b/app/workers/activitypub/distribution_worker.rb
@@ -35,7 +35,7 @@ class ActivityPub::DistributionWorker
     # Deliver the status to all followers.
     # If the status is a reply to another local status, also forward it to that
     # status' authors' followers.
-    @inboxes ||= if @status.reply? && @status.thread.account.local? && @status.distributable?
+    @inboxes ||= if @status.in_reply_to_local_account? && @status.distributable?
                    @account.followers.or(@status.thread.account.followers).inboxes
                  else
                    @account.followers.inboxes
diff --git a/app/workers/import/relationship_worker.rb b/app/workers/import/relationship_worker.rb
index 4a7100435..6791b15c3 100644
--- a/app/workers/import/relationship_worker.rb
+++ b/app/workers/import/relationship_worker.rb
@@ -5,7 +5,7 @@ class Import::RelationshipWorker
 
   sidekiq_options queue: 'pull', retry: 8, dead: false
 
-  def perform(account_id, target_account_uri, relationship, options = {})
+  def perform(account_id, target_account_uri, relationship, options)
     from_account   = Account.find(account_id)
     target_domain  = domain(target_account_uri)
     target_account = stoplight_wrap_request(target_domain) { ResolveAccountService.new.call(target_account_uri, { check_delivery_availability: true }) }
@@ -16,7 +16,7 @@ class Import::RelationshipWorker
     case relationship
     when 'follow'
       begin
-        FollowService.new.call(from_account, target_account, options)
+        FollowService.new.call(from_account, target_account, **options)
       rescue ActiveRecord::RecordInvalid
         raise if FollowLimitValidator.limit_for_account(from_account) < from_account.following_count
       end
@@ -27,7 +27,7 @@ class Import::RelationshipWorker
     when 'unblock'
       UnblockService.new.call(from_account, target_account)
     when 'mute'
-      MuteService.new.call(from_account, target_account, options)
+      MuteService.new.call(from_account, target_account, **options)
     when 'unmute'
       UnmuteService.new.call(from_account, target_account)
     end
diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb
index 74ef7d4da..6ebb9a400 100644
--- a/app/workers/merge_worker.rb
+++ b/app/workers/merge_worker.rb
@@ -3,11 +3,11 @@
 class MergeWorker
   include Sidekiq::Worker
 
-  sidekiq_options queue: 'pull'
-
   def perform(from_account_id, into_account_id)
     FeedManager.instance.merge_into_home(Account.find(from_account_id), Account.find(into_account_id))
   rescue ActiveRecord::RecordNotFound
     true
+  ensure
+    Redis.current.del("account:#{into_account_id}:regeneration")
   end
 end
diff --git a/app/workers/redownload_media_worker.rb b/app/workers/redownload_media_worker.rb
index 0638cd0f0..343caa32c 100644
--- a/app/workers/redownload_media_worker.rb
+++ b/app/workers/redownload_media_worker.rb
@@ -3,6 +3,7 @@
 class RedownloadMediaWorker
   include Sidekiq::Worker
   include ExponentialBackoff
+  include JsonLdHelper
 
   sidekiq_options queue: 'pull', retry: 3
 
@@ -15,6 +16,14 @@ class RedownloadMediaWorker
     media_attachment.download_thumbnail!
     media_attachment.save
   rescue ActiveRecord::RecordNotFound
-    true
+    # Do nothing
+  rescue Mastodon::UnexpectedResponseError => e
+    response = e.response
+
+    if response_error_unsalvageable?(response)
+      # Give up
+    else
+      raise e
+    end
   end
 end
diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb
new file mode 100644
index 000000000..cb1e15961
--- /dev/null
+++ b/app/workers/scheduler/follow_recommendations_scheduler.rb
@@ -0,0 +1,62 @@
+# frozen_string_literal: true
+
+class Scheduler::FollowRecommendationsScheduler
+  include Sidekiq::Worker
+  include Redisable
+
+  sidekiq_options retry: 0
+
+  # The maximum number of accounts that can be requested in one page from the
+  # API is 80, and the suggestions API does not allow pagination. This number
+  # leaves some room for accounts being filtered during live access
+  SET_SIZE = 100
+
+  def perform
+    # Maintaining a materialized view speeds-up subsequent queries significantly
+    AccountSummary.refresh
+    FollowRecommendation.refresh
+
+    fallback_recommendations = FollowRecommendation.limit(SET_SIZE).index_by(&:account_id)
+
+    I18n.available_locales.each do |locale|
+      recommendations = begin
+        if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
+          FollowRecommendation.localized(locale).limit(SET_SIZE).index_by(&:account_id)
+        else
+          {}
+        end
+      end
+
+      # Use language-agnostic results if there are not enough language-specific ones
+      missing = SET_SIZE - recommendations.keys.size
+
+      if missing.positive?
+        added = 0
+
+        # Avoid duplicate results
+        fallback_recommendations.each_value do |recommendation|
+          next if recommendations.key?(recommendation.account_id)
+
+          recommendations[recommendation.account_id] = recommendation
+          added += 1
+
+          break if added >= missing
+        end
+      end
+
+      redis.pipelined do
+        redis.del(key(locale))
+
+        recommendations.each_value do |recommendation|
+          redis.zadd(key(locale), recommendation.rank, recommendation.account_id)
+        end
+      end
+    end
+  end
+
+  private
+
+  def key(locale)
+    "follow_recommendations:#{locale}"
+  end
+end
diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb
index 8bba9ca75..1b77dfdd9 100644
--- a/app/workers/thread_resolve_worker.rb
+++ b/app/workers/thread_resolve_worker.rb
@@ -14,5 +14,7 @@ class ThreadResolveWorker
 
     child_status.thread = parent_status
     child_status.save!
+  rescue ActiveRecord::RecordNotFound
+    true
   end
 end
diff --git a/app/workers/web/push_notification_worker.rb b/app/workers/web/push_notification_worker.rb
index 46aeaa30b..57f5b5c22 100644
--- a/app/workers/web/push_notification_worker.rb
+++ b/app/workers/web/push_notification_worker.rb
@@ -3,22 +3,67 @@
 class Web::PushNotificationWorker
   include Sidekiq::Worker
 
-  sidekiq_options backtrace: true, retry: 5
+  sidekiq_options queue: 'push', retry: 5
+
+  TTL     = 48.hours.to_s
+  URGENCY = 'normal'
 
   def perform(subscription_id, notification_id)
-    subscription = ::Web::PushSubscription.find(subscription_id)
-    notification = Notification.find(notification_id)
+    @subscription = Web::PushSubscription.find(subscription_id)
+    @notification = Notification.find(notification_id)
+
+    # Polymorphically associated activity could have been deleted
+    # in the meantime, so we have to double-check before proceeding
+    return unless @notification.activity.present? && @subscription.pushable?(@notification)
+
+    payload = @subscription.encrypt(push_notification_json)
 
-    subscription.push(notification) unless notification.activity.nil?
-  rescue Webpush::ResponseError => e
-    code = e.response.code.to_i
+    request_pool.with(@subscription.audience) do |http_client|
+      request = Request.new(:post, @subscription.endpoint, body: payload.fetch(:ciphertext), http_client: http_client)
 
-    if (400..499).cover?(code) && ![408, 429].include?(code)
-      subscription.destroy!
-    else
-      raise e
+      request.add_headers(
+        'Content-Type'     => 'application/octet-stream',
+        'Ttl'              => TTL,
+        'Urgency'          => URGENCY,
+        'Content-Encoding' => 'aesgcm',
+        'Encryption'       => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}",
+        'Crypto-Key'       => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}",
+        'Authorization'    => @subscription.authorization_header
+      )
+
+      request.perform do |response|
+        # If the server responds with an error in the 4xx range
+        # that isn't about rate-limiting or timeouts, we can
+        # assume that the subscription is invalid or expired
+        # and must be removed
+
+        if (400..499).cover?(response.code) && ![408, 429].include?(response.code)
+          @subscription.destroy!
+        elsif !(200...300).cover?(response.code)
+          raise Mastodon::UnexpectedResponseError, response
+        end
+      end
     end
   rescue ActiveRecord::RecordNotFound
     true
   end
+
+  private
+
+  def push_notification_json
+    json = I18n.with_locale(@subscription.locale || I18n.default_locale) do
+      ActiveModelSerializers::SerializableResource.new(
+        @notification,
+        serializer: Web::NotificationSerializer,
+        scope: @subscription,
+        scope_name: :current_push_subscription
+      ).as_json
+    end
+
+    Oj.dump(json)
+  end
+
+  def request_pool
+    RequestPool.current
+  end
 end