diff options
Diffstat (limited to 'app/workers')
5 files changed, 59 insertions, 15 deletions
diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb index cef595319..37e316354 100644 --- a/app/workers/activitypub/processing_worker.rb +++ b/app/workers/activitypub/processing_worker.rb @@ -6,7 +6,10 @@ class ActivityPub::ProcessingWorker sidekiq_options backtrace: true, retry: 8 def perform(account_id, body, delivered_to_account_id = nil) - ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id), override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true) + account = Account.find_by(id: account_id) + return if account.nil? + + ActivityPub::ProcessCollectionService.new.call(body, account, override_timestamps: true, delivered_to_account_id: delivered_to_account_id, delivery: true) rescue ActiveRecord::RecordInvalid => e Rails.logger.debug "Error processing incoming ActivityPub object: #{e}" end diff --git a/app/workers/activitypub/status_update_distribution_worker.rb b/app/workers/activitypub/status_update_distribution_worker.rb new file mode 100644 index 000000000..a79ede2bf --- /dev/null +++ b/app/workers/activitypub/status_update_distribution_worker.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +class ActivityPub::StatusUpdateDistributionWorker < ActivityPub::DistributionWorker + # Distribute an profile update to servers that might have a copy + # of the account in question + def perform(status_id, options = {}) + @options = options.with_indifferent_access + @status = Status.find(status_id) + @account = @status.account + + distribute! + rescue ActiveRecord::RecordNotFound + true + end + + protected + + def activity + ActivityPub::ActivityPresenter.new( + id: [ActivityPub::TagManager.instance.uri_for(@status), '#updates/', @status.edited_at.to_i].join, + type: 'Update', + actor: ActivityPub::TagManager.instance.uri_for(@status.account), + published: @status.edited_at, + to: ActivityPub::TagManager.instance.to(@status), + cc: ActivityPub::TagManager.instance.cc(@status), + virtual_object: @status + ) + end +end diff --git a/app/workers/local_notification_worker.rb b/app/workers/local_notification_worker.rb index a22e2834d..749a54b73 100644 --- a/app/workers/local_notification_worker.rb +++ b/app/workers/local_notification_worker.rb @@ -12,7 +12,14 @@ class LocalNotificationWorker activity = activity_class_name.constantize.find(activity_id) end - return if Notification.where(account: receiver, activity: activity).any? + # For most notification types, only one notification should exist, and the older one is + # preferred. For updates, such as when a status is edited, the new notification + # should replace the previous ones. + if type == 'update' + Notification.where(account: receiver, activity: activity, type: 'update').in_batches.delete_all + elsif Notification.where(account: receiver, activity: activity, type: type).any? + return + end NotifyService.new.call(receiver, type || activity_class_name.underscore, activity) rescue ActiveRecord::RecordNotFound diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb index f42d4bca6..7195f0ff9 100644 --- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -66,7 +66,7 @@ class Scheduler::AccountsStatusesCleanupScheduler end def compute_budget - threads = Sidekiq::ProcessSet.new.filter { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum + threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum [PER_THREAD_BUDGET * threads, MAX_BUDGET].min end diff --git a/app/workers/scheduler/follow_recommendations_scheduler.rb b/app/workers/scheduler/follow_recommendations_scheduler.rb index effc63e59..084619cbd 100644 --- a/app/workers/scheduler/follow_recommendations_scheduler.rb +++ b/app/workers/scheduler/follow_recommendations_scheduler.rb @@ -16,28 +16,33 @@ class Scheduler::FollowRecommendationsScheduler AccountSummary.refresh FollowRecommendation.refresh - fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE).index_by(&:account_id) + fallback_recommendations = FollowRecommendation.order(rank: :desc).limit(SET_SIZE) - I18n.available_locales.each do |locale| + I18n.available_locales.map { |locale| locale.to_s.split(/[_-]/).first }.uniq.each do |locale| recommendations = begin if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist - FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).index_by(&:account_id) + FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.account_id, recommendation.rank] } else - {} + [] end end # Use language-agnostic results if there are not enough language-specific ones - missing = SET_SIZE - recommendations.keys.size + missing = SET_SIZE - recommendations.size + + if missing.positive? && fallback_recommendations.size.positive? + max_fallback_rank = fallback_recommendations.first.rank || 0 + + # Language-specific results should be above language-agnostic ones, + # otherwise language-agnostic ones will always overshadow them + recommendations.map! { |(account_id, rank)| [account_id, rank + max_fallback_rank] } - if missing.positive? added = 0 - # Avoid duplicate results - fallback_recommendations.each_value do |recommendation| - next if recommendations.key?(recommendation.account_id) + fallback_recommendations.each do |recommendation| + next if recommendations.any? { |(account_id, _)| account_id == recommendation.account_id } - recommendations[recommendation.account_id] = recommendation + recommendations << [recommendation.account_id, recommendation.rank] added += 1 break if added >= missing @@ -47,8 +52,8 @@ class Scheduler::FollowRecommendationsScheduler redis.pipelined do redis.del(key(locale)) - recommendations.each_value do |recommendation| - redis.zadd(key(locale), recommendation.rank, recommendation.account_id) + recommendations.each do |(account_id, rank)| + redis.zadd(key(locale), rank, account_id) end end end |