about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/activitypub/processing_worker.rb5
-rw-r--r--app/workers/activitypub/status_update_distribution_worker.rb29
-rw-r--r--app/workers/local_notification_worker.rb9
-rw-r--r--app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb2
-rw-r--r--app/workers/scheduler/follow_recommendations_scheduler.rb29
5 files changed, 59 insertions, 15 deletions
diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb
index cef595319..37e316354 100644
--- a/app/workers/activitypub/processing_worker.rb
+++ b/app/workers/activitypub/processing_worker.rb
@@ -6,7 +6,10 @@ class ActivityPub::ProcessingWorker
   sidekiq_options backtrace: true, retry: 8
 
   def perform(account_id, body, delivered_to_account_id = nil)
-    ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id), override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true)
+    account = Account.find_by(id: account_id)
+    return if account.nil?
+
+    ActivityPub::ProcessCollectionService.new.call(body, account, override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true)
   rescue ActiveRecord::RecordInvalid => e
     Rails.logger.debug "Error processing incoming ActivityPub object: #{e}"
   end
diff --git a/app/workers/activitypub/status_update_distribution_worker.rb b/app/workers/activitypub/status_update_distribution_worker.rb
new file mode 100644
index 000000000..a79ede2bf
--- /dev/null
+++ b/app/workers/activitypub/status_update_distribution_worker.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+class ActivityPub::StatusUpdateDistributionWorker < ActivityPub::DistributionWorker
+  # Distribute an profile update to servers that might have a copy
+  # of the account in question
+  def perform(status_id, options = {})
+    @options = options.with_indifferent_access
+    @status  = Status.find(status_id)
+    @account = @status.account
+
+    distribute!
+  rescue ActiveRecord::RecordNotFound
+    true
+  end
+
+  protected
+
+  def activity
+    ActivityPub::ActivityPresenter.new(
+      id: [ActivityPub::TagManager.instance.uri_for(@status), '#updates/', @status.edited_at.to_i].join,
+      type: 'Update',
+      actor: ActivityPub::TagManager.instance.uri_for(@status.account),
+      published: @status.edited_at,
+      to: ActivityPub::TagManager.instance.to(@status),
+      cc: ActivityPub::TagManager.instance.cc(@status),
+      virtual_object: @status
+    )
+  end
+end
diff --git a/app/workers/local_notification_worker.rb b/app/workers/local_notification_worker.rb
index a22e2834d..749a54b73 100644
--- a/app/workers/local_notification_worker.rb
+++ b/app/workers/local_notification_worker.rb
@@ -12,7 +12,14 @@ class LocalNotificationWorker
       activity = activity_class_name.constantize.find(activity_id)
     end
 
-    return if Notification.where(account: receiver, activity: activity).any?
+    # For most notification types, only one notification should exist, and the older one is
+    # preferred. For updates, such as when a status is edited, the new notification
+    # should replace the previous ones.
+    if type == 'update'
+      Notification.where(account: receiver, activity: activity, type: 'update').in_batches.delete_all
+    elsif Notification.where(account: receiver, activity: activity, type: type).any?
+      return
+    end
 
     NotifyService.new.call(receiver, type || activity_class_name.underscore, activity)
   rescue ActiveRecord::RecordNotFound
diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
index f42d4bca6..7195f0ff9 100644
--- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
+++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb
@@ -66,7 +66,7 @@ class Scheduler::AccountsStatusesCleanupScheduler
   end
 
   def compute_budget
-    threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
+    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
     [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
   end
 
diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb
index effc63e59..084619cbd 100644
--- a/app/workers/scheduler/follow_recommendations_scheduler.rb
+++ b/app/workers/scheduler/follow_recommendations_scheduler.rb
@@ -16,28 +16,33 @@ class Scheduler::FollowRecommendationsScheduler
     AccountSummary.refresh
     FollowRecommendation.refresh
 
-    fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE).index_by(&:account_id)
+    fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE)
 
-    I18n.available_locales.each do |locale|
+    I18n.available_locales.map { |locale| locale.to_s.split(/[_-]/).first }.uniq.each do |locale|
       recommendations = begin
         if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
-          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).index_by(&:account_id)
+          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.account_id, recommendation.rank] }
         else
-          {}
+          []
         end
       end
 
       # Use language-agnostic results if there are not enough language-specific ones
-      missing = SET_SIZE - recommendations.keys.size
+      missing = SET_SIZE - recommendations.size
+
+      if missing.positive? && fallback_recommendations.size.positive?
+        max_fallback_rank = fallback_recommendations.first.rank || 0
+
+        # Language-specific results should be above language-agnostic ones,
+        # otherwise language-agnostic ones will always overshadow them
+        recommendations.map! { |(account_id, rank)| [account_id, rank + max_fallback_rank] }
 
-      if missing.positive?
         added = 0
 
-        # Avoid duplicate results
-        fallback_recommendations.each_value do |recommendation|
-          next if recommendations.key?(recommendation.account_id)
+        fallback_recommendations.each do |recommendation|
+          next if recommendations.any? { |(account_id, _)| account_id == recommendation.account_id }
 
-          recommendations[recommendation.account_id] = recommendation
+          recommendations << [recommendation.account_id, recommendation.rank]
           added += 1
 
           break if added >= missing
@@ -47,8 +52,8 @@ class Scheduler::FollowRecommendationsScheduler
       redis.pipelined do
         redis.del(key(locale))
 
-        recommendations.each_value do |recommendation|
-          redis.zadd(key(locale), recommendation.rank, recommendation.account_id)
+        recommendations.each do |(account_id, rank)|
+          redis.zadd(key(locale), rank, account_id)
         end
       end
     end