From 6e9eda53319bc970b085c7c55277981320b2a835 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Mon, 21 Aug 2017 01:14:40 +0200 Subject: ActivityPub migration procedure (#4617) * ActivityPub migration procedure Once one account is detected as going from OStatus to ActivityPub, invalidate WebFinger cache for other accounts from the same domain * Unsubscribe from PuSH updates once we receive an ActivityPub payload * Re-subscribe to PuSH unless already unsubscribed, regardless of protocol --- app/workers/pubsubhubbub/unsubscribe_worker.rb | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 app/workers/pubsubhubbub/unsubscribe_worker.rb (limited to 'app/workers/pubsubhubbub') diff --git a/app/workers/pubsubhubbub/unsubscribe_worker.rb b/app/workers/pubsubhubbub/unsubscribe_worker.rb new file mode 100644 index 000000000..a271715b7 --- /dev/null +++ b/app/workers/pubsubhubbub/unsubscribe_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class Pubsubhubbub::UnsubscribeWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push', retry: false, unique: :until_executed, dead: false + + def perform(account_id) + account = Account.find(account_id) + logger.debug "PuSH unsubscribing from #{account.acct}" + ::UnsubscribeService.new.call(account) + rescue ActiveRecord::RecordNotFound + true + end +end -- cgit From cf615abbf9323f3b73681306090de48f9e13a6b9 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 24 Aug 2017 17:51:32 +0200 Subject: Add configuration to disable private status federation over PuSH (#4582) --- app/services/process_mentions_service.rb | 2 +- app/workers/pubsubhubbub/distribution_worker.rb | 2 +- config/initializers/ostatus.rb | 3 +- .../pubsubhubbub/distribution_worker_spec.rb | 64 +++++++++++++++++----- 4 files changed, 55 insertions(+), 16 deletions(-) (limited to 'app/workers/pubsubhubbub') diff --git a/app/services/process_mentions_service.rb b/app/services/process_mentions_service.rb index 407fa8c18..2b8a77147 100644 --- a/app/services/process_mentions_service.rb +++ b/app/services/process_mentions_service.rb @@ -39,7 +39,7 @@ class ProcessMentionsService < BaseService if mentioned_account.local? NotifyService.new.call(mentioned_account, mention) - elsif mentioned_account.ostatus? + elsif mentioned_account.ostatus? && (Rails.configuration.x.use_ostatus_privacy || !status.stream_entry.hidden?) NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id) elsif mentioned_account.activitypub? ActivityPub::DeliveryWorker.perform_async(build_json(mention.status), mention.status.account_id, mentioned_account.inbox_url) diff --git a/app/workers/pubsubhubbub/distribution_worker.rb b/app/workers/pubsubhubbub/distribution_worker.rb index ea246128d..2a5e60fa0 100644 --- a/app/workers/pubsubhubbub/distribution_worker.rb +++ b/app/workers/pubsubhubbub/distribution_worker.rb @@ -14,7 +14,7 @@ class Pubsubhubbub::DistributionWorker @subscriptions = active_subscriptions.to_a distribute_public!(stream_entries.reject(&:hidden?)) - distribute_hidden!(stream_entries.select(&:hidden?)) + distribute_hidden!(stream_entries.select(&:hidden?)) if Rails.configuration.x.use_ostatus_privacy end private diff --git a/config/initializers/ostatus.rb b/config/initializers/ostatus.rb index 342996dcd..a885545f8 100644 --- a/config/initializers/ostatus.rb +++ b/config/initializers/ostatus.rb @@ -5,7 +5,7 @@ host = ENV.fetch('LOCAL_DOMAIN') { "localhost:#{port}" } web_host = ENV.fetch('WEB_DOMAIN') { host } https = ENV['LOCAL_HTTPS'] == 'true' -alternate_domains = ENV.fetch('ALTERNATE_DOMAINS') { "" } +alternate_domains = ENV.fetch('ALTERNATE_DOMAINS') { '' } Rails.application.configure do config.x.local_domain = host @@ -17,6 +17,7 @@ Rails.application.configure do config.action_mailer.default_url_options = { host: web_host, protocol: https ? 'https://' : 'http://', trailing_slash: false } config.x.streaming_api_base_url = 'ws://localhost:4000' + config.x.use_ostatus_privacy = true if Rails.env.production? config.x.streaming_api_base_url = ENV.fetch('STREAMING_API_BASE_URL') { "ws#{https ? 's' : ''}://#{web_host}" } diff --git a/spec/workers/pubsubhubbub/distribution_worker_spec.rb b/spec/workers/pubsubhubbub/distribution_worker_spec.rb index 89191c084..5c22e7fa8 100644 --- a/spec/workers/pubsubhubbub/distribution_worker_spec.rb +++ b/spec/workers/pubsubhubbub/distribution_worker_spec.rb @@ -22,24 +22,62 @@ describe Pubsubhubbub::DistributionWorker do end end - describe 'with private status' do - let(:status) { Fabricate(:status, account: alice, text: 'Hello', visibility: :private) } + context 'when OStatus privacy is used' do + around do |example| + before_val = Rails.configuration.x.use_ostatus_privacy + Rails.configuration.x.use_ostatus_privacy = true + example.run + Rails.configuration.x.use_ostatus_privacy = before_val + end - it 'delivers payload only to subscriptions with followers' do - allow(Pubsubhubbub::DeliveryWorker).to receive(:push_bulk) - subject.perform(status.stream_entry.id) - expect(Pubsubhubbub::DeliveryWorker).to have_received(:push_bulk).with([subscription_with_follower]) - expect(Pubsubhubbub::DeliveryWorker).to_not have_received(:push_bulk).with([anonymous_subscription]) + describe 'with private status' do + let(:status) { Fabricate(:status, account: alice, text: 'Hello', visibility: :private) } + + it 'delivers payload only to subscriptions with followers' do + allow(Pubsubhubbub::DeliveryWorker).to receive(:push_bulk) + subject.perform(status.stream_entry.id) + expect(Pubsubhubbub::DeliveryWorker).to have_received(:push_bulk).with([subscription_with_follower]) + expect(Pubsubhubbub::DeliveryWorker).to_not have_received(:push_bulk).with([anonymous_subscription]) + end + end + + describe 'with direct status' do + let(:status) { Fabricate(:status, account: alice, text: 'Hello', visibility: :direct) } + + it 'does not deliver payload' do + allow(Pubsubhubbub::DeliveryWorker).to receive(:push_bulk) + subject.perform(status.stream_entry.id) + expect(Pubsubhubbub::DeliveryWorker).to_not have_received(:push_bulk) + end end end - describe 'with direct status' do - let(:status) { Fabricate(:status, account: alice, text: 'Hello', visibility: :direct) } + context 'when OStatus privacy is not used' do + around do |example| + before_val = Rails.configuration.x.use_ostatus_privacy + Rails.configuration.x.use_ostatus_privacy = false + example.run + Rails.configuration.x.use_ostatus_privacy = before_val + end - it 'does not deliver payload' do - allow(Pubsubhubbub::DeliveryWorker).to receive(:push_bulk) - subject.perform(status.stream_entry.id) - expect(Pubsubhubbub::DeliveryWorker).to_not have_received(:push_bulk) + describe 'with private status' do + let(:status) { Fabricate(:status, account: alice, text: 'Hello', visibility: :private) } + + it 'does not deliver anything' do + allow(Pubsubhubbub::DeliveryWorker).to receive(:push_bulk) + subject.perform(status.stream_entry.id) + expect(Pubsubhubbub::DeliveryWorker).to_not have_received(:push_bulk) + end + end + + describe 'with direct status' do + let(:status) { Fabricate(:status, account: alice, text: 'Hello', visibility: :direct) } + + it 'does not deliver payload' do + allow(Pubsubhubbub::DeliveryWorker).to receive(:push_bulk) + subject.perform(status.stream_entry.id) + expect(Pubsubhubbub::DeliveryWorker).to_not have_received(:push_bulk) + end end end end -- cgit From 4c76402ba1d355061e7e208b7a2f8251388a38e1 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 29 Aug 2017 16:11:05 +0200 Subject: Serialize ActivityPub alternate link into OStatus deletes, handle it (#4730) Requires moving Atom rendering from DistributionWorker (where `stream_entry.status` is already nil) to inline (where `stream_entry.status.destroyed?` is true) and distributing that. Unfortunately, such XML renderings can no longer be easily chained together into one payload of n items. --- app/lib/ostatus/activity/deletion.rb | 4 +++- app/lib/ostatus/atom_serializer.rb | 3 +++ app/models/status.rb | 13 ++++++++++-- app/services/batched_remove_status_service.rb | 24 +++++++++++++--------- app/services/remove_status_service.rb | 4 +--- .../pubsubhubbub/raw_distribution_worker.rb | 22 ++++++++++++++++++++ .../services/batched_remove_status_service_spec.rb | 7 +++---- 7 files changed, 57 insertions(+), 20 deletions(-) create mode 100644 app/workers/pubsubhubbub/raw_distribution_worker.rb (limited to 'app/workers/pubsubhubbub') diff --git a/app/lib/ostatus/activity/deletion.rb b/app/lib/ostatus/activity/deletion.rb index 860faf501..c98f5ee0a 100644 --- a/app/lib/ostatus/activity/deletion.rb +++ b/app/lib/ostatus/activity/deletion.rb @@ -3,7 +3,9 @@ class OStatus::Activity::Deletion < OStatus::Activity::Base def perform Rails.logger.debug "Deleting remote status #{id}" - status = Status.find_by(uri: id, account: @account) + + status = Status.find_by(uri: id, account: @account) + status ||= Status.find_by(uri: activitypub_uri, account: @account) if activitypub_uri? if status.nil? redis.setex("delete_upon_arrival:#{@account.id}:#{id}", 6 * 3_600, id) diff --git a/app/lib/ostatus/atom_serializer.rb b/app/lib/ostatus/atom_serializer.rb index 92a16d228..81fae4140 100644 --- a/app/lib/ostatus/atom_serializer.rb +++ b/app/lib/ostatus/atom_serializer.rb @@ -79,6 +79,9 @@ class OStatus::AtomSerializer if stream_entry.status.nil? append_element(entry, 'content', 'Deleted status') + elsif stream_entry.status.destroyed? + append_element(entry, 'content', 'Deleted status') + append_element(entry, 'link', nil, rel: :alternate, type: 'application/activity+json', href: ActivityPub::TagManager.instance.uri_for(stream_entry.status)) if stream_entry.account.local? else serialize_status_attributes(entry, stream_entry.status) end diff --git a/app/models/status.rb b/app/models/status.rb index 3dc83ad1f..abd902cd7 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -51,6 +51,7 @@ class Status < ApplicationRecord has_one :notification, as: :activity, dependent: :destroy has_one :preview_card, dependent: :destroy + has_one :stream_entry, as: :activity, inverse_of: :status validates :uri, uniqueness: true, unless: :local? validates :text, presence: true, unless: :reblog? @@ -90,7 +91,11 @@ class Status < ApplicationRecord end def verb - reblog? ? :share : :post + if destroyed? + :delete + else + reblog? ? :share : :post + end end def object_type @@ -110,7 +115,11 @@ class Status < ApplicationRecord end def title - reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}" + if destroyed? + "#{account.acct} deleted status" + else + reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}" + end end def hidden? diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index e9e22298d..86eaa5735 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -20,9 +20,10 @@ class BatchedRemoveStatusService < BaseService @activity_json_batches = [] @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h @activity_json = {} + @activity_xml = {} # Ensure that rendered XML reflects destroyed state - Status.where(id: statuses.map(&:id)).in_batches.destroy_all + statuses.each(&:destroy) # Batch by source account statuses.group_by(&:account_id).each do |_, account_statuses| @@ -31,7 +32,7 @@ class BatchedRemoveStatusService < BaseService unpush_from_home_timelines(account_statuses) if account.local? - batch_stream_entries(account_statuses) + batch_stream_entries(account, account_statuses) batch_activity_json(account, account_statuses) end end @@ -42,18 +43,16 @@ class BatchedRemoveStatusService < BaseService batch_salmon_slaps(status) if status.local? end - Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } + Pubsubhubbub::RawDistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } NotificationWorker.push_bulk(@salmon_batches) { |batch| batch } ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch } end private - def batch_stream_entries(statuses) - stream_entry_ids = statuses.map { |s| s.stream_entry.id } - - stream_entry_ids.each_slice(100) do |batch_of_stream_entry_ids| - @stream_entry_batches << [batch_of_stream_entry_ids] + def batch_stream_entries(account, statuses) + statuses.each do |status| + @stream_entry_batches << [build_xml(status.stream_entry), account.id] end end @@ -101,11 +100,10 @@ class BatchedRemoveStatusService < BaseService def batch_salmon_slaps(status) return if @mentions[status.id].empty? - payload = stream_entry_to_xml(status.stream_entry.reload) recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id) recipients.each do |recipient_id| - @salmon_batches << [payload, status.account_id, recipient_id] + @salmon_batches << [build_xml(status.stream_entry), status.account_id, recipient_id] end end @@ -145,6 +143,12 @@ class BatchedRemoveStatusService < BaseService ).as_json) end + def build_xml(stream_entry) + return @activity_xml[stream_entry.id] if @activity_xml.key?(stream_entry.id) + + @activity_xml[stream_entry.id] = stream_entry_to_xml(stream_entry) + end + def sign_json(status, json) Oj.dump(ActivityPub::LinkedDataSignature.new(json).sign!(status.account)) end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 7ddbd8906..83fc77043 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -22,8 +22,6 @@ class RemoveStatusService < BaseService return unless @account.local? - @stream_entry = @stream_entry.reload - remove_from_remote_followers remove_from_remote_affected end @@ -62,7 +60,7 @@ class RemoveStatusService < BaseService def remove_from_remote_followers # OStatus - Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id) + Pubsubhubbub::RawDistributionWorker.perform_async(salmon_xml, @account.id) # ActivityPub ActivityPub::DeliveryWorker.push_bulk(@account.followers.inboxes) do |inbox_url| diff --git a/app/workers/pubsubhubbub/raw_distribution_worker.rb b/app/workers/pubsubhubbub/raw_distribution_worker.rb new file mode 100644 index 000000000..16962a623 --- /dev/null +++ b/app/workers/pubsubhubbub/raw_distribution_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class Pubsubhubbub::RawDistributionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(xml, source_account_id) + @account = Account.find(source_account_id) + @subscriptions = active_subscriptions.to_a + + Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription| + [subscription.id, xml] + end + end + + private + + def active_subscriptions + Subscription.where(account: @account).active.select('id, callback_url, domain') + end +end diff --git a/spec/services/batched_remove_status_service_spec.rb b/spec/services/batched_remove_status_service_spec.rb index 2484d4b58..b1e9ac567 100644 --- a/spec/services/batched_remove_status_service_spec.rb +++ b/spec/services/batched_remove_status_service_spec.rb @@ -48,11 +48,10 @@ RSpec.describe BatchedRemoveStatusService do expect(Redis.current).to have_received(:publish).with('timeline:public', any_args).at_least(:once) end - it 'sends PuSH update to PuSH subscribers with two payloads united' do + it 'sends PuSH update to PuSH subscribers' do expect(a_request(:post, 'http://example.com/push').with { |req| - matches = req.body.scan(TagManager::VERBS[:delete]) - matches.size == 2 - }).to have_been_made + matches = req.body.match(TagManager::VERBS[:delete]) + }).to have_been_made.at_least_once end it 'sends Salmon slap to previously mentioned users' do -- cgit