diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/activitypub/distribution_worker.rb | 48 | ||||
-rw-r--r-- | app/workers/activitypub/raw_distribution_worker.rb | 37 | ||||
-rw-r--r-- | app/workers/activitypub/reply_distribution_worker.rb | 34 | ||||
-rw-r--r-- | app/workers/activitypub/update_distribution_worker.rb | 25 | ||||
-rw-r--r-- | app/workers/distribution_worker.rb | 4 | ||||
-rw-r--r-- | app/workers/feed_insert_worker.rb | 38 | ||||
-rw-r--r-- | app/workers/local_notification_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/poll_expiration_notify_worker.rb | 45 | ||||
-rw-r--r-- | app/workers/push_update_worker.rb | 35 |
9 files changed, 148 insertions, 120 deletions
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb index 09898ca49..17c108461 100644 --- a/app/workers/activitypub/distribution_worker.rb +++ b/app/workers/activitypub/distribution_worker.rb @@ -1,54 +1,32 @@ # frozen_string_literal: true -class ActivityPub::DistributionWorker - include Sidekiq::Worker - include Payloadable - - sidekiq_options queue: 'push' - +class ActivityPub::DistributionWorker < ActivityPub::RawDistributionWorker + # Distribute a new status or an edit of a status to all the places + # where the status is supposed to go or where it was interacted with def perform(status_id) @status = Status.find(status_id) @account = @status.account - return if skip_distribution? - - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }] - end - - relay! if relayable? + distribute! rescue ActiveRecord::RecordNotFound true end - private - - def skip_distribution? - @status.direct_visibility? || @status.limited_visibility? - end - - def relayable? - @status.public_visibility? - end + protected def inboxes - # Deliver the status to all followers. - # If the status is a reply to another local status, also forward it to that - # status' authors' followers. - @inboxes ||= if @status.in_reply_to_local_account? && @status.distributable? - @account.followers.or(@status.thread.account.followers).inboxes - else - @account.followers.inboxes - end + @inboxes ||= StatusReachFinder.new(@status).inboxes end def payload - @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account)) + @payload ||= Oj.dump(serialize_payload(activity, ActivityPub::ActivitySerializer, signer: @account)) + end + + def activity + ActivityPub::ActivityPresenter.from_status(@status) end - def relay! - ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| - [payload, @account.id, inbox_url] - end + def options + { synchronize_followers: @status.private_visibility? } end end diff --git a/app/workers/activitypub/raw_distribution_worker.rb b/app/workers/activitypub/raw_distribution_worker.rb index 41e61132f..ac5eda4af 100644 --- a/app/workers/activitypub/raw_distribution_worker.rb +++ b/app/workers/activitypub/raw_distribution_worker.rb @@ -2,22 +2,47 @@ class ActivityPub::RawDistributionWorker include Sidekiq::Worker + include Payloadable sidekiq_options queue: 'push' + # Base worker for when you want to queue up a bunch of deliveries of + # some payload. In this case, we have already generated JSON and + # we are going to distribute it to the account's followers minus + # the explicitly provided inboxes def perform(json, source_account_id, exclude_inboxes = []) - @account = Account.find(source_account_id) + @account = Account.find(source_account_id) + @json = json + @exclude_inboxes = exclude_inboxes - ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url| - [json, @account.id, inbox_url] - end + distribute! rescue ActiveRecord::RecordNotFound true end - private + protected + + def distribute! + return if inboxes.empty? + + ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| + [payload, source_account_id, inbox_url, options] + end + end + + def payload + @json + end + + def source_account_id + @account.id + end def inboxes - @inboxes ||= @account.followers.inboxes + @inboxes ||= @account.followers.inboxes - @exclude_inboxes + end + + def options + nil end end diff --git a/app/workers/activitypub/reply_distribution_worker.rb b/app/workers/activitypub/reply_distribution_worker.rb deleted file mode 100644 index d4d0148ac..000000000 --- a/app/workers/activitypub/reply_distribution_worker.rb +++ /dev/null @@ -1,34 +0,0 @@ -# frozen_string_literal: true - -# Obsolete but kept around to make sure existing jobs do not fail after upgrade. -# Should be removed in a subsequent release. - -class ActivityPub::ReplyDistributionWorker - include Sidekiq::Worker - include Payloadable - - sidekiq_options queue: 'push' - - def perform(status_id) - @status = Status.find(status_id) - @account = @status.thread&.account - - return unless @account.present? && @status.distributable? - - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [payload, @status.account_id, inbox_url] - end - rescue ActiveRecord::RecordNotFound - true - end - - private - - def inboxes - @inboxes ||= @account.followers.inboxes - end - - def payload - @payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account)) - end -end diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb index 3a207f071..81fde63b8 100644 --- a/app/workers/activitypub/update_distribution_worker.rb +++ b/app/workers/activitypub/update_distribution_worker.rb @@ -1,33 +1,24 @@ # frozen_string_literal: true -class ActivityPub::UpdateDistributionWorker - include Sidekiq::Worker - include Payloadable - - sidekiq_options queue: 'push' - +class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker + # Distribute an profile update to servers that might have a copy + # of the account in question def perform(account_id, options = {}) @options = options.with_indifferent_access @account = Account.find(account_id) - ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| - [signed_payload, @account.id, inbox_url] - end - - ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| - [signed_payload, @account.id, inbox_url] - end + distribute! rescue ActiveRecord::RecordNotFound true end - private + protected def inboxes - @inboxes ||= @account.followers.inboxes + @inboxes ||= AccountReachFinder.new(@account).inboxes end - def signed_payload - @signed_payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with])) + def payload + @payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with])) end end diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index e85cd7e95..770325ccf 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -3,10 +3,10 @@ class DistributionWorker include Sidekiq::Worker - def perform(status_id) + def perform(status_id, options = {}) RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock| if lock.acquired? - FanOutOnWriteService.new.call(Status.find(status_id)) + FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys) else raise Mastodon::RaceConditionError end diff --git a/app/workers/feed_insert_worker.rb b/app/workers/feed_insert_worker.rb index 45e6bb88d..043e50885 100644 --- a/app/workers/feed_insert_worker.rb +++ b/app/workers/feed_insert_worker.rb @@ -3,9 +3,10 @@ class FeedInsertWorker include Sidekiq::Worker - def perform(status_id, id, type = :home) - @type = type.to_sym - @status = Status.find(status_id) + def perform(status_id, id, type = :home, options = {}) + @type = type.to_sym + @status = Status.find(status_id) + @options = options.symbolize_keys case @type when :home @@ -25,10 +26,12 @@ class FeedInsertWorker private def check_and_insert - return if feed_filtered? - - perform_push - perform_notify if notify? + if feed_filtered? + perform_unpush if update? + else + perform_push + perform_notify if notify? + end end def feed_filtered? @@ -51,15 +54,30 @@ class FeedInsertWorker def perform_push case @type when :home - FeedManager.instance.push_to_home(@follower, @status) + FeedManager.instance.push_to_home(@follower, @status, update: update?) when :list - FeedManager.instance.push_to_list(@list, @status) + FeedManager.instance.push_to_list(@list, @status, update: update?) when :direct - FeedManager.instance.push_to_direct(@account, @status) + FeedManager.instance.push_to_direct(@account, @status, update: update?) + end + end + + def perform_unpush + case @type + when :home + FeedManager.instance.unpush_from_home(@follower, @status, update: true) + when :list + FeedManager.instance.unpush_from_list(@list, @status, update: true) + when :direct + FeedManager.instance.unpush_from_direct(@account, @status, update: true) end end def perform_notify NotifyService.new.call(@follower, :status, @status) end + + def update? + @options[:update] + end end diff --git a/app/workers/local_notification_worker.rb b/app/workers/local_notification_worker.rb index 6b08ca6fc..a22e2834d 100644 --- a/app/workers/local_notification_worker.rb +++ b/app/workers/local_notification_worker.rb @@ -12,6 +12,8 @@ class LocalNotificationWorker activity = activity_class_name.constantize.find(activity_id) end + return if Notification.where(account: receiver, activity: activity).any? + NotifyService.new.call(receiver, type || activity_class_name.underscore, activity) rescue ActiveRecord::RecordNotFound true diff --git a/app/workers/poll_expiration_notify_worker.rb b/app/workers/poll_expiration_notify_worker.rb index f0191d479..7613ed5f1 100644 --- a/app/workers/poll_expiration_notify_worker.rb +++ b/app/workers/poll_expiration_notify_worker.rb @@ -6,19 +6,44 @@ class PollExpirationNotifyWorker sidekiq_options lock: :until_executed def perform(poll_id) - poll = Poll.find(poll_id) + @poll = Poll.find(poll_id) - # Notify poll owner and remote voters - if poll.local? - ActivityPub::DistributePollUpdateWorker.perform_async(poll.status.id) - NotifyService.new.call(poll.account, :poll, poll) - end + return if does_not_expire? + requeue! && return if not_due_yet? - # Notify local voters - poll.votes.includes(:account).group(:account_id).select(:account_id).map(&:account).select(&:local?).each do |account| - NotifyService.new.call(account, :poll, poll) - end + notify_remote_voters_and_owner! if @poll.local? + notify_local_voters! rescue ActiveRecord::RecordNotFound true end + + def self.remove_from_scheduled(poll_id) + queue = Sidekiq::ScheduledSet.new + queue.select { |scheduled| scheduled.klass == name && scheduled.args[0] == poll_id }.map(&:delete) + end + + private + + def does_not_expire? + @poll.expires_at.nil? + end + + def not_due_yet? + @poll.expires_at.present? && !@poll.expired? + end + + def requeue! + PollExpirationNotifyWorker.perform_at(@poll.expires_at + 5.minutes, @poll.id) + end + + def notify_remote_voters_and_owner! + ActivityPub::DistributePollUpdateWorker.perform_async(@poll.status.id) + NotifyService.new.call(@poll.account, :poll, @poll) + end + + def notify_local_voters! + @poll.voters.merge(Account.local).find_each do |account| + NotifyService.new.call(account, :poll, @poll) + end + end end diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index d76d73d96..ae444cfde 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -2,15 +2,38 @@ class PushUpdateWorker include Sidekiq::Worker + include Redisable - def perform(account_id, status_id, timeline_id = nil) - account = Account.find(account_id) - status = Status.find(status_id) - message = InlineRenderer.render(status, account, :status) - timeline_id = "timeline:#{account.id}" if timeline_id.nil? + def perform(account_id, status_id, timeline_id = nil, options = {}) + @account = Account.find(account_id) + @status = Status.find(status_id) + @timeline_id = timeline_id || "timeline:#{account.id}" + @options = options.symbolize_keys - Redis.current.publish(timeline_id, Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i)) + publish! rescue ActiveRecord::RecordNotFound true end + + private + + def payload + InlineRenderer.render(@status, @account, :status) + end + + def message + Oj.dump( + event: update? ? :'status.update' : :update, + payload: payload, + queued_at: (Time.now.to_f * 1000.0).to_i + ) + end + + def publish! + redis.publish(@timeline_id, message) + end + + def update? + @options[:update] + end end |