diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/activitypub/distribution_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/import/relationship_worker.rb | 6 | ||||
-rw-r--r-- | app/workers/merge_worker.rb | 4 | ||||
-rw-r--r-- | app/workers/redownload_media_worker.rb | 11 | ||||
-rw-r--r-- | app/workers/scheduler/follow_recommendations_scheduler.rb | 62 | ||||
-rw-r--r-- | app/workers/thread_resolve_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/web/push_notification_worker.rb | 65 |
7 files changed, 135 insertions, 17 deletions
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb index 9b4814644..09898ca49 100644 --- a/app/workers/activitypub/distribution_worker.rb +++ b/app/workers/activitypub/distribution_worker.rb @@ -35,7 +35,7 @@ class ActivityPub::DistributionWorker # Deliver the status to all followers. # If the status is a reply to another local status, also forward it to that # status' authors' followers. - @inboxes ||= if @status.reply? && @status.thread.account.local? && @status.distributable? + @inboxes ||= if @status.in_reply_to_local_account? && @status.distributable? @account.followers.or(@status.thread.account.followers).inboxes else @account.followers.inboxes diff --git a/app/workers/import/relationship_worker.rb b/app/workers/import/relationship_worker.rb index 4a7100435..6791b15c3 100644 --- a/app/workers/import/relationship_worker.rb +++ b/app/workers/import/relationship_worker.rb @@ -5,7 +5,7 @@ class Import::RelationshipWorker sidekiq_options queue: 'pull', retry: 8, dead: false - def perform(account_id, target_account_uri, relationship, options = {}) + def perform(account_id, target_account_uri, relationship, options) from_account = Account.find(account_id) target_domain = domain(target_account_uri) target_account = stoplight_wrap_request(target_domain) { ResolveAccountService.new.call(target_account_uri, { check_delivery_availability: true }) } @@ -16,7 +16,7 @@ class Import::RelationshipWorker case relationship when 'follow' begin - FollowService.new.call(from_account, target_account, options) + FollowService.new.call(from_account, target_account, **options) rescue ActiveRecord::RecordInvalid raise if FollowLimitValidator.limit_for_account(from_account) < from_account.following_count end @@ -27,7 +27,7 @@ class Import::RelationshipWorker when 'unblock' UnblockService.new.call(from_account, target_account) when 'mute' - MuteService.new.call(from_account, target_account, options) + MuteService.new.call(from_account, target_account, **options) when 'unmute' UnmuteService.new.call(from_account, target_account) end diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb index 74ef7d4da..6ebb9a400 100644 --- a/app/workers/merge_worker.rb +++ b/app/workers/merge_worker.rb @@ -3,11 +3,11 @@ class MergeWorker include Sidekiq::Worker - sidekiq_options queue: 'pull' - def perform(from_account_id, into_account_id) FeedManager.instance.merge_into_home(Account.find(from_account_id), Account.find(into_account_id)) rescue ActiveRecord::RecordNotFound true + ensure + Redis.current.del("account:#{into_account_id}:regeneration") end end diff --git a/app/workers/redownload_media_worker.rb b/app/workers/redownload_media_worker.rb index 0638cd0f0..343caa32c 100644 --- a/app/workers/redownload_media_worker.rb +++ b/app/workers/redownload_media_worker.rb @@ -3,6 +3,7 @@ class RedownloadMediaWorker include Sidekiq::Worker include ExponentialBackoff + include JsonLdHelper sidekiq_options queue: 'pull', retry: 3 @@ -15,6 +16,14 @@ class RedownloadMediaWorker media_attachment.download_thumbnail! media_attachment.save rescue ActiveRecord::RecordNotFound - true + # Do nothing + rescue Mastodon::UnexpectedResponseError => e + response = e.response + + if response_error_unsalvageable?(response) + # Give up + else + raise e + end end end diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb new file mode 100644 index 000000000..cb1e15961 --- /dev/null +++ b/app/workers/scheduler/follow_recommendations_scheduler.rb @@ -0,0 +1,62 @@ +# frozen_string_literal: true + +class Scheduler::FollowRecommendationsScheduler + include Sidekiq::Worker + include Redisable + + sidekiq_options retry: 0 + + # The maximum number of accounts that can be requested in one page from the + # API is 80, and the suggestions API does not allow pagination. This number + # leaves some room for accounts being filtered during live access + SET_SIZE = 100 + + def perform + # Maintaining a materialized view speeds-up subsequent queries significantly + AccountSummary.refresh + FollowRecommendation.refresh + + fallback_recommendations = FollowRecommendation.limit(SET_SIZE).index_by(&:account_id) + + I18n.available_locales.each do |locale| + recommendations = begin + if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist + FollowRecommendation.localized(locale).limit(SET_SIZE).index_by(&:account_id) + else + {} + end + end + + # Use language-agnostic results if there are not enough language-specific ones + missing = SET_SIZE - recommendations.keys.size + + if missing.positive? + added = 0 + + # Avoid duplicate results + fallback_recommendations.each_value do |recommendation| + next if recommendations.key?(recommendation.account_id) + + recommendations[recommendation.account_id] = recommendation + added += 1 + + break if added >= missing + end + end + + redis.pipelined do + redis.del(key(locale)) + + recommendations.each_value do |recommendation| + redis.zadd(key(locale), recommendation.rank, recommendation.account_id) + end + end + end + end + + private + + def key(locale) + "follow_recommendations:#{locale}" + end +end diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index 8bba9ca75..1b77dfdd9 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -14,5 +14,7 @@ class ThreadResolveWorker child_status.thread = parent_status child_status.save! + rescue ActiveRecord::RecordNotFound + true end end diff --git a/app/workers/web/push_notification_worker.rb b/app/workers/web/push_notification_worker.rb index 46aeaa30b..57f5b5c22 100644 --- a/app/workers/web/push_notification_worker.rb +++ b/app/workers/web/push_notification_worker.rb @@ -3,22 +3,67 @@ class Web::PushNotificationWorker include Sidekiq::Worker - sidekiq_options backtrace: true, retry: 5 + sidekiq_options queue: 'push', retry: 5 + + TTL = 48.hours.to_s + URGENCY = 'normal' def perform(subscription_id, notification_id) - subscription = ::Web::PushSubscription.find(subscription_id) - notification = Notification.find(notification_id) + @subscription = Web::PushSubscription.find(subscription_id) + @notification = Notification.find(notification_id) + + # Polymorphically associated activity could have been deleted + # in the meantime, so we have to double-check before proceeding + return unless @notification.activity.present? && @subscription.pushable?(@notification) + + payload = @subscription.encrypt(push_notification_json) - subscription.push(notification) unless notification.activity.nil? - rescue Webpush::ResponseError => e - code = e.response.code.to_i + request_pool.with(@subscription.audience) do |http_client| + request = Request.new(:post, @subscription.endpoint, body: payload.fetch(:ciphertext), http_client: http_client) - if (400..499).cover?(code) && ![408, 429].include?(code) - subscription.destroy! - else - raise e + request.add_headers( + 'Content-Type' => 'application/octet-stream', + 'Ttl' => TTL, + 'Urgency' => URGENCY, + 'Content-Encoding' => 'aesgcm', + 'Encryption' => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}", + 'Crypto-Key' => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}", + 'Authorization' => @subscription.authorization_header + ) + + request.perform do |response| + # If the server responds with an error in the 4xx range + # that isn't about rate-limiting or timeouts, we can + # assume that the subscription is invalid or expired + # and must be removed + + if (400..499).cover?(response.code) && ![408, 429].include?(response.code) + @subscription.destroy! + elsif !(200...300).cover?(response.code) + raise Mastodon::UnexpectedResponseError, response + end + end end rescue ActiveRecord::RecordNotFound true end + + private + + def push_notification_json + json = I18n.with_locale(@subscription.locale || I18n.default_locale) do + ActiveModelSerializers::SerializableResource.new( + @notification, + serializer: Web::NotificationSerializer, + scope: @subscription, + scope_name: :current_push_subscription + ).as_json + end + + Oj.dump(json) + end + + def request_pool + RequestPool.current + end end |