about summary refs log tree commit diff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/activitypub/distribution_worker.rb48
-rw-r--r--app/workers/activitypub/raw_distribution_worker.rb37
-rw-r--r--app/workers/activitypub/reply_distribution_worker.rb34
-rw-r--r--app/workers/activitypub/update_distribution_worker.rb25
-rw-r--r--app/workers/distribution_worker.rb4
-rw-r--r--app/workers/feed_insert_worker.rb38
-rw-r--r--app/workers/local_notification_worker.rb2
-rw-r--r--app/workers/poll_expiration_notify_worker.rb45
-rw-r--r--app/workers/push_update_worker.rb35
9 files changed, 148 insertions, 120 deletions
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb
index 09898ca49..17c108461 100644
--- a/app/workers/activitypub/distribution_worker.rb
+++ b/app/workers/activitypub/distribution_worker.rb
@@ -1,54 +1,32 @@
 # frozen_string_literal: true
 
-class ActivityPub::DistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
+class ActivityPub::DistributionWorker < ActivityPub::RawDistributionWorker
+  # Distribute a new status or an edit of a status to all the places
+  # where the status is supposed to go or where it was interacted with
   def perform(status_id)
     @status  = Status.find(status_id)
     @account = @status.account
 
-    return if skip_distribution?
-
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }]
-    end
-
-    relay! if relayable?
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
-
-  def skip_distribution?
-    @status.direct_visibility? || @status.limited_visibility?
-  end
-
-  def relayable?
-    @status.public_visibility?
-  end
+  protected
 
   def inboxes
-    # Deliver the status to all followers.
-    # If the status is a reply to another local status, also forward it to that
-    # status' authors' followers.
-    @inboxes ||= if @status.in_reply_to_local_account? && @status.distributable?
-                   @account.followers.or(@status.thread.account.followers).inboxes
-                 else
-                   @account.followers.inboxes
-                 end
+    @inboxes ||= StatusReachFinder.new(@status).inboxes
   end
 
   def payload
-    @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account))
+    @payload ||= Oj.dump(serialize_payload(activity, ActivityPub::ActivitySerializer, signer: @account))
+  end
+
+  def activity
+    ActivityPub::ActivityPresenter.from_status(@status)
   end
 
-  def relay!
-    ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
-      [payload, @account.id, inbox_url]
-    end
+  def options
+    { synchronize_followers: @status.private_visibility? }
   end
 end
diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb
index 41e61132f..ac5eda4af 100644
--- a/app/workers/activitypub/raw_distribution_worker.rb
+++ b/app/workers/activitypub/raw_distribution_worker.rb
@@ -2,22 +2,47 @@
 
 class ActivityPub::RawDistributionWorker
   include Sidekiq::Worker
+  include Payloadable
 
   sidekiq_options queue: 'push'
 
+  # Base worker for when you want to queue up a bunch of deliveries of
+  # some payload. In this case, we have already generated JSON and
+  # we are going to distribute it to the account's followers minus
+  # the explicitly provided inboxes
   def perform(json, source_account_id, exclude_inboxes = [])
-    @account = Account.find(source_account_id)
+    @account         = Account.find(source_account_id)
+    @json            = json
+    @exclude_inboxes = exclude_inboxes
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url|
-      [json, @account.id, inbox_url]
-    end
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
+  protected
+
+  def distribute!
+    return if inboxes.empty?
+
+    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+      [payload, source_account_id, inbox_url, options]
+    end
+  end
+
+  def payload
+    @json
+  end
+
+  def source_account_id
+    @account.id
+  end
 
   def inboxes
-    @inboxes ||= @account.followers.inboxes
+    @inboxes ||= @account.followers.inboxes - @exclude_inboxes
+  end
+
+  def options
+    nil
   end
 end
diff --git a/app/workers/activitypub/reply_distribution_worker.rb b/app/workers/activitypub/reply_distribution_worker.rb
deleted file mode 100644
index d4d0148ac..000000000
--- a/app/workers/activitypub/reply_distribution_worker.rb
+++ /dev/null
@@ -1,34 +0,0 @@
-# frozen_string_literal: true
-
-# Obsolete but kept around to make sure existing jobs do not fail after upgrade.
-# Should be removed in a subsequent release.
-
-class ActivityPub::ReplyDistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
-  def perform(status_id)
-    @status  = Status.find(status_id)
-    @account = @status.thread&.account
-
-    return unless @account.present? && @status.distributable?
-
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [payload, @status.account_id, inbox_url]
-    end
-  rescue ActiveRecord::RecordNotFound
-    true
-  end
-
-  private
-
-  def inboxes
-    @inboxes ||= @account.followers.inboxes
-  end
-
-  def payload
-    @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account))
-  end
-end
diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb
index 3a207f071..81fde63b8 100644
--- a/app/workers/activitypub/update_distribution_worker.rb
+++ b/app/workers/activitypub/update_distribution_worker.rb
@@ -1,33 +1,24 @@
 # frozen_string_literal: true
 
-class ActivityPub::UpdateDistributionWorker
-  include Sidekiq::Worker
-  include Payloadable
-
-  sidekiq_options queue: 'push'
-
+class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker
+  # Distribute an profile update to servers that might have a copy
+  # of the account in question
   def perform(account_id, options = {})
     @options = options.with_indifferent_access
     @account = Account.find(account_id)
 
-    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
-      [signed_payload, @account.id, inbox_url]
-    end
-
-    ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
-      [signed_payload, @account.id, inbox_url]
-    end
+    distribute!
   rescue ActiveRecord::RecordNotFound
     true
   end
 
-  private
+  protected
 
   def inboxes
-    @inboxes ||= @account.followers.inboxes
+    @inboxes ||= AccountReachFinder.new(@account).inboxes
   end
 
-  def signed_payload
-    @signed_payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
+  def payload
+    @payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
   end
 end
diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb
index e85cd7e95..770325ccf 100644
--- a/app/workers/distribution_worker.rb
+++ b/app/workers/distribution_worker.rb
@@ -3,10 +3,10 @@
 class DistributionWorker
   include Sidekiq::Worker
 
-  def perform(status_id)
+  def perform(status_id, options = {})
     RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock|
       if lock.acquired?
-        FanOutOnWriteService.new.call(Status.find(status_id))
+        FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
       else
         raise Mastodon::RaceConditionError
       end
diff --git a/app/workers/feed_insert_worker.rb b/app/workers/feed_insert_worker.rb
index 45e6bb88d..043e50885 100644
--- a/app/workers/feed_insert_worker.rb
+++ b/app/workers/feed_insert_worker.rb
@@ -3,9 +3,10 @@
 class FeedInsertWorker
   include Sidekiq::Worker
 
-  def perform(status_id, id, type = :home)
-    @type     = type.to_sym
-    @status   = Status.find(status_id)
+  def perform(status_id, id, type = :home, options = {})
+    @type      = type.to_sym
+    @status    = Status.find(status_id)
+    @options   = options.symbolize_keys
 
     case @type
     when :home
@@ -25,10 +26,12 @@ class FeedInsertWorker
   private
 
   def check_and_insert
-    return if feed_filtered?
-
-    perform_push
-    perform_notify if notify?
+    if feed_filtered?
+      perform_unpush if update?
+    else
+      perform_push
+      perform_notify if notify?
+    end
   end
 
   def feed_filtered?
@@ -51,15 +54,30 @@ class FeedInsertWorker
   def perform_push
     case @type
     when :home
-      FeedManager.instance.push_to_home(@follower, @status)
+      FeedManager.instance.push_to_home(@follower, @status, update: update?)
     when :list
-      FeedManager.instance.push_to_list(@list, @status)
+      FeedManager.instance.push_to_list(@list, @status, update: update?)
     when :direct
-      FeedManager.instance.push_to_direct(@account, @status)
+      FeedManager.instance.push_to_direct(@account, @status, update: update?)
+    end
+  end
+
+  def perform_unpush
+    case @type
+    when :home
+      FeedManager.instance.unpush_from_home(@follower, @status, update: true)
+    when :list
+      FeedManager.instance.unpush_from_list(@list, @status, update: true)
+    when :direct
+      FeedManager.instance.unpush_from_direct(@account, @status, update: true)
     end
   end
 
   def perform_notify
     NotifyService.new.call(@follower, :status, @status)
   end
+
+  def update?
+    @options[:update]
+  end
 end
diff --git a/app/workers/local_notification_worker.rb b/app/workers/local_notification_worker.rb
index 6b08ca6fc..a22e2834d 100644
--- a/app/workers/local_notification_worker.rb
+++ b/app/workers/local_notification_worker.rb
@@ -12,6 +12,8 @@ class LocalNotificationWorker
       activity = activity_class_name.constantize.find(activity_id)
     end
 
+    return if Notification.where(account: receiver, activity: activity).any?
+
     NotifyService.new.call(receiver, type || activity_class_name.underscore, activity)
   rescue ActiveRecord::RecordNotFound
     true
diff --git a/app/workers/poll_expiration_notify_worker.rb b/app/workers/poll_expiration_notify_worker.rb
index f0191d479..7613ed5f1 100644
--- a/app/workers/poll_expiration_notify_worker.rb
+++ b/app/workers/poll_expiration_notify_worker.rb
@@ -6,19 +6,44 @@ class PollExpirationNotifyWorker
   sidekiq_options lock: :until_executed
 
   def perform(poll_id)
-    poll = Poll.find(poll_id)
+    @poll = Poll.find(poll_id)
 
-    # Notify poll owner and remote voters
-    if poll.local?
-      ActivityPub::DistributePollUpdateWorker.perform_async(poll.status.id)
-      NotifyService.new.call(poll.account, :poll, poll)
-    end
+    return if does_not_expire?
+    requeue! && return if not_due_yet?
 
-    # Notify local voters
-    poll.votes.includes(:account).group(:account_id).select(:account_id).map(&:account).select(&:local?).each do |account|
-      NotifyService.new.call(account, :poll, poll)
-    end
+    notify_remote_voters_and_owner! if @poll.local?
+    notify_local_voters!
   rescue ActiveRecord::RecordNotFound
     true
   end
+
+  def self.remove_from_scheduled(poll_id)
+    queue = Sidekiq::ScheduledSet.new
+    queue.select { |scheduled| scheduled.klass == name && scheduled.args[0] == poll_id }.map(&:delete)
+  end
+
+  private
+
+  def does_not_expire?
+    @poll.expires_at.nil?
+  end
+
+  def not_due_yet?
+    @poll.expires_at.present? && !@poll.expired?
+  end
+
+  def requeue!
+    PollExpirationNotifyWorker.perform_at(@poll.expires_at + 5.minutes, @poll.id)
+  end
+
+  def notify_remote_voters_and_owner!
+    ActivityPub::DistributePollUpdateWorker.perform_async(@poll.status.id)
+    NotifyService.new.call(@poll.account, :poll, @poll)
+  end
+
+  def notify_local_voters!
+    @poll.voters.merge(Account.local).find_each do |account|
+      NotifyService.new.call(account, :poll, @poll)
+    end
+  end
 end
diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb
index d76d73d96..ae444cfde 100644
--- a/app/workers/push_update_worker.rb
+++ b/app/workers/push_update_worker.rb
@@ -2,15 +2,38 @@
 
 class PushUpdateWorker
   include Sidekiq::Worker
+  include Redisable
 
-  def perform(account_id, status_id, timeline_id = nil)
-    account     = Account.find(account_id)
-    status      = Status.find(status_id)
-    message     = InlineRenderer.render(status, account, :status)
-    timeline_id = "timeline:#{account.id}" if timeline_id.nil?
+  def perform(account_id, status_id, timeline_id = nil, options = {})
+    @account     = Account.find(account_id)
+    @status      = Status.find(status_id)
+    @timeline_id = timeline_id || "timeline:#{account.id}"
+    @options     = options.symbolize_keys
 
-    Redis.current.publish(timeline_id, Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i))
+    publish!
   rescue ActiveRecord::RecordNotFound
     true
   end
+
+  private
+
+  def payload
+    InlineRenderer.render(@status, @account, :status)
+  end
+
+  def message
+    Oj.dump(
+      event: update? ? :'status.update' : :update,
+      payload: payload,
+      queued_at: (Time.now.to_f * 1000.0).to_i
+    )
+  end
+
+  def publish!
+    redis.publish(@timeline_id, message)
+  end
+
+  def update?
+    @options[:update]
+  end
 end