From 4fcbb1f8385dbcb0ca0e3b6127b75c9485243367 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Mon, 14 Aug 2017 21:37:21 +0200 Subject: Re-add missing transaction around status-from-OStatus creation (#4603) --- app/lib/ostatus/activity/creation.rb | 40 ++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 18 deletions(-) (limited to 'app/lib/ostatus/activity') diff --git a/app/lib/ostatus/activity/creation.rb b/app/lib/ostatus/activity/creation.rb index e22f746f2..6ec2cdd56 100644 --- a/app/lib/ostatus/activity/creation.rb +++ b/app/lib/ostatus/activity/creation.rb @@ -16,24 +16,28 @@ class OStatus::Activity::Creation < OStatus::Activity::Base return [status, false] unless status.nil? - status = Status.create!( - uri: id, - url: url, - account: @account, - reblog: reblog, - text: content, - spoiler_text: content_warning, - created_at: published, - reply: thread?, - language: content_language, - visibility: visibility_scope, - conversation: find_or_create_conversation, - thread: thread? ? find_status(thread.first) : nil - ) - - save_mentions(status) - save_hashtags(status) - save_media(status) + cached_reblog = reblog + + ApplicationRecord.transaction do + status = Status.create!( + uri: id, + url: url, + account: @account, + reblog: cached_reblog, + text: content, + spoiler_text: content_warning, + created_at: published, + reply: thread?, + language: content_language, + visibility: visibility_scope, + conversation: find_or_create_conversation, + thread: thread? ? find_status(thread.first) : nil + ) + + save_mentions(status) + save_hashtags(status) + save_media(status) + end if thread? && status.thread.nil? Rails.logger.debug "Trying to attach #{status.id} (#{id}) to #{thread.first}" -- cgit From 40c45f5dd958aa1319b4e8cb664e6b4cac029526 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Fri, 18 Aug 2017 01:03:18 +0200 Subject: Put ActivityPub alternate link into Atom, prefer it when processing Atom (#4623) --- app/lib/activitypub/tag_manager.rb | 2 +- app/lib/ostatus/activity/base.rb | 14 +++++++++++++- app/lib/ostatus/activity/creation.rb | 5 +++++ app/lib/ostatus/activity/remote.rb | 6 +++++- app/lib/ostatus/atom_serializer.rb | 2 ++ spec/lib/ostatus/atom_serializer_spec.rb | 11 ++++++----- 6 files changed, 32 insertions(+), 8 deletions(-) (limited to 'app/lib/ostatus/activity') diff --git a/app/lib/activitypub/tag_manager.rb b/app/lib/activitypub/tag_manager.rb index 96e610b6d..bd5dddcac 100644 --- a/app/lib/activitypub/tag_manager.rb +++ b/app/lib/activitypub/tag_manager.rb @@ -71,7 +71,7 @@ class ActivityPub::TagManager def local_uri?(uri) host = Addressable::URI.parse(uri).normalized_host - ::TagManager.instance.local_domain?(host) || ::TagManager.instance.web_domain?(host) + !host.nil? && (::TagManager.instance.local_domain?(host) || ::TagManager.instance.web_domain?(host)) end def uri_to_local_id(uri, param = :id) diff --git a/app/lib/ostatus/activity/base.rb b/app/lib/ostatus/activity/base.rb index e1477f0eb..da9a01759 100644 --- a/app/lib/ostatus/activity/base.rb +++ b/app/lib/ostatus/activity/base.rb @@ -29,16 +29,28 @@ class OStatus::Activity::Base end def url - link = @xml.at_xpath('./xmlns:link[@rel="alternate"]', xmlns: TagManager::XMLNS) + link = @xml.xpath('./xmlns:link[@rel="alternate"]', xmlns: TagManager::XMLNS).find { |link_candidate| link_candidate['type'] == 'text/html' } link.nil? ? nil : link['href'] end + def activitypub_uri + link = @xml.xpath('./xmlns:link[@rel="alternate"]', xmlns: TagManager::XMLNS).find { |link_candidate| ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(link_candidate['type']) } + link.nil? ? nil : link['href'] + end + + def activitypub_uri? + activitypub_uri.present? + end + private def find_status(uri) if TagManager.instance.local_id?(uri) local_id = TagManager.instance.unique_tag_to_local_id(uri, 'Status') return Status.find_by(id: local_id) + elsif ActivityPub::TagManager.instance.local_uri?(uri) + local_id = ActivityPub::TagManager.instance.uri_to_local_id(uri) + return Status.find_by(id: local_id) end Status.find_by(uri: uri) diff --git a/app/lib/ostatus/activity/creation.rb b/app/lib/ostatus/activity/creation.rb index 6ec2cdd56..12488ab31 100644 --- a/app/lib/ostatus/activity/creation.rb +++ b/app/lib/ostatus/activity/creation.rb @@ -8,6 +8,7 @@ class OStatus::Activity::Creation < OStatus::Activity::Base end return [nil, false] if @account.suspended? + return perform_via_activitypub if activitypub_uri? Rails.logger.debug "Creating remote status #{id}" @@ -52,6 +53,10 @@ class OStatus::Activity::Creation < OStatus::Activity::Base [status, true] end + def perform_via_activitypub + [find_status(activitypub_uri) || ActivityPub::FetchRemoteStatusService.new.call(activitypub_uri), false] + end + def content @xml.at_xpath('./xmlns:content', xmlns: TagManager::XMLNS).content end diff --git a/app/lib/ostatus/activity/remote.rb b/app/lib/ostatus/activity/remote.rb index ecec6886c..5b204b6d8 100644 --- a/app/lib/ostatus/activity/remote.rb +++ b/app/lib/ostatus/activity/remote.rb @@ -2,6 +2,10 @@ class OStatus::Activity::Remote < OStatus::Activity::Base def perform - find_status(id) || FetchRemoteStatusService.new.call(url) + if activitypub_uri? + find_status(activitypub_uri) || FetchRemoteStatusService.new.call(url) + else + find_status(id) || FetchRemoteStatusService.new.call(url) + end end end diff --git a/app/lib/ostatus/atom_serializer.rb b/app/lib/ostatus/atom_serializer.rb index 0d62361be..92a16d228 100644 --- a/app/lib/ostatus/atom_serializer.rb +++ b/app/lib/ostatus/atom_serializer.rb @@ -343,6 +343,8 @@ class OStatus::AtomSerializer end def serialize_status_attributes(entry, status) + append_element(entry, 'link', nil, rel: :alternate, type: 'application/activity+json', href: ActivityPub::TagManager.instance.uri_for(status)) if status.account.local? + append_element(entry, 'summary', status.spoiler_text, 'xml:lang': status.language) if status.spoiler_text? append_element(entry, 'content', Formatter.instance.format(status).to_str, type: 'html', 'xml:lang': status.language) diff --git a/spec/lib/ostatus/atom_serializer_spec.rb b/spec/lib/ostatus/atom_serializer_spec.rb index b0cb8f019..301a0ce30 100644 --- a/spec/lib/ostatus/atom_serializer_spec.rb +++ b/spec/lib/ostatus/atom_serializer_spec.rb @@ -196,7 +196,7 @@ RSpec.describe OStatus::AtomSerializer do author = OStatus::AtomSerializer.new.author(account) - link = author.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' } + link = author.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' && node[:type] == 'text/html' } expect(link[:type]).to eq 'text/html' expect(link[:rel]).to eq 'alternate' expect(link[:href]).to eq 'https://cb6e6126.ngrok.io/@username' @@ -407,6 +407,7 @@ RSpec.describe OStatus::AtomSerializer do remote_status.stream_entry.update!(created_at: '2000-01-01T00:00:00Z') entry = OStatus::AtomSerializer.new.entry(remote_status.stream_entry, true) + entry.nodes.delete_if { |node| node[:type] == 'application/activity+json' } # Remove ActivityPub link to simplify test xml = OStatus::AtomSerializer.render(entry).gsub('cb6e6126.ngrok.io', 'remote') remote_status.destroy! @@ -415,7 +416,7 @@ RSpec.describe OStatus::AtomSerializer do account = Account.create!( domain: 'remote', username: 'username', - last_webfingered_at: Time.now.utc, + last_webfingered_at: Time.now.utc ) ProcessFeedService.new.call(xml, account) @@ -529,7 +530,7 @@ RSpec.describe OStatus::AtomSerializer do entry = OStatus::AtomSerializer.new.entry(status.stream_entry) - link = entry.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' } + link = entry.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' && node[:type] == 'text/html' } expect(link[:type]).to eq 'text/html' expect(link[:href]).to eq "https://cb6e6126.ngrok.io/users/username/updates/#{status.stream_entry.id}" end @@ -642,7 +643,7 @@ RSpec.describe OStatus::AtomSerializer do feed = OStatus::AtomSerializer.new.feed(account, []) - link = feed.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' } + link = feed.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' && node[:type] == 'text/html' } expect(link[:type]).to eq 'text/html' expect(link[:href]).to eq 'https://cb6e6126.ngrok.io/@username' end @@ -1509,7 +1510,7 @@ RSpec.describe OStatus::AtomSerializer do entry = OStatus::AtomSerializer.new.object(status) - link = entry.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' } + link = entry.nodes.find { |node| node.name == 'link' && node[:rel] == 'alternate' && node[:type] == 'text/html' } expect(link[:type]).to eq 'text/html' expect(link[:href]).to eq "https://cb6e6126.ngrok.io/@username/#{status.id}" end -- cgit From 4c76402ba1d355061e7e208b7a2f8251388a38e1 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 29 Aug 2017 16:11:05 +0200 Subject: Serialize ActivityPub alternate link into OStatus deletes, handle it (#4730) Requires moving Atom rendering from DistributionWorker (where `stream_entry.status` is already nil) to inline (where `stream_entry.status.destroyed?` is true) and distributing that. Unfortunately, such XML renderings can no longer be easily chained together into one payload of n items. --- app/lib/ostatus/activity/deletion.rb | 4 +++- app/lib/ostatus/atom_serializer.rb | 3 +++ app/models/status.rb | 13 ++++++++++-- app/services/batched_remove_status_service.rb | 24 +++++++++++++--------- app/services/remove_status_service.rb | 4 +--- .../pubsubhubbub/raw_distribution_worker.rb | 22 ++++++++++++++++++++ .../services/batched_remove_status_service_spec.rb | 7 +++---- 7 files changed, 57 insertions(+), 20 deletions(-) create mode 100644 app/workers/pubsubhubbub/raw_distribution_worker.rb (limited to 'app/lib/ostatus/activity') diff --git a/app/lib/ostatus/activity/deletion.rb b/app/lib/ostatus/activity/deletion.rb index 860faf501..c98f5ee0a 100644 --- a/app/lib/ostatus/activity/deletion.rb +++ b/app/lib/ostatus/activity/deletion.rb @@ -3,7 +3,9 @@ class OStatus::Activity::Deletion < OStatus::Activity::Base def perform Rails.logger.debug "Deleting remote status #{id}" - status = Status.find_by(uri: id, account: @account) + + status = Status.find_by(uri: id, account: @account) + status ||= Status.find_by(uri: activitypub_uri, account: @account) if activitypub_uri? if status.nil? redis.setex("delete_upon_arrival:#{@account.id}:#{id}", 6 * 3_600, id) diff --git a/app/lib/ostatus/atom_serializer.rb b/app/lib/ostatus/atom_serializer.rb index 92a16d228..81fae4140 100644 --- a/app/lib/ostatus/atom_serializer.rb +++ b/app/lib/ostatus/atom_serializer.rb @@ -79,6 +79,9 @@ class OStatus::AtomSerializer if stream_entry.status.nil? append_element(entry, 'content', 'Deleted status') + elsif stream_entry.status.destroyed? + append_element(entry, 'content', 'Deleted status') + append_element(entry, 'link', nil, rel: :alternate, type: 'application/activity+json', href: ActivityPub::TagManager.instance.uri_for(stream_entry.status)) if stream_entry.account.local? else serialize_status_attributes(entry, stream_entry.status) end diff --git a/app/models/status.rb b/app/models/status.rb index 3dc83ad1f..abd902cd7 100644 --- a/app/models/status.rb +++ b/app/models/status.rb @@ -51,6 +51,7 @@ class Status < ApplicationRecord has_one :notification, as: :activity, dependent: :destroy has_one :preview_card, dependent: :destroy + has_one :stream_entry, as: :activity, inverse_of: :status validates :uri, uniqueness: true, unless: :local? validates :text, presence: true, unless: :reblog? @@ -90,7 +91,11 @@ class Status < ApplicationRecord end def verb - reblog? ? :share : :post + if destroyed? + :delete + else + reblog? ? :share : :post + end end def object_type @@ -110,7 +115,11 @@ class Status < ApplicationRecord end def title - reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}" + if destroyed? + "#{account.acct} deleted status" + else + reblog? ? "#{account.acct} shared a status by #{reblog.account.acct}" : "New status by #{account.acct}" + end end def hidden? diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index e9e22298d..86eaa5735 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -20,9 +20,10 @@ class BatchedRemoveStatusService < BaseService @activity_json_batches = [] @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h @activity_json = {} + @activity_xml = {} # Ensure that rendered XML reflects destroyed state - Status.where(id: statuses.map(&:id)).in_batches.destroy_all + statuses.each(&:destroy) # Batch by source account statuses.group_by(&:account_id).each do |_, account_statuses| @@ -31,7 +32,7 @@ class BatchedRemoveStatusService < BaseService unpush_from_home_timelines(account_statuses) if account.local? - batch_stream_entries(account_statuses) + batch_stream_entries(account, account_statuses) batch_activity_json(account, account_statuses) end end @@ -42,18 +43,16 @@ class BatchedRemoveStatusService < BaseService batch_salmon_slaps(status) if status.local? end - Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } + Pubsubhubbub::RawDistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } NotificationWorker.push_bulk(@salmon_batches) { |batch| batch } ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch } end private - def batch_stream_entries(statuses) - stream_entry_ids = statuses.map { |s| s.stream_entry.id } - - stream_entry_ids.each_slice(100) do |batch_of_stream_entry_ids| - @stream_entry_batches << [batch_of_stream_entry_ids] + def batch_stream_entries(account, statuses) + statuses.each do |status| + @stream_entry_batches << [build_xml(status.stream_entry), account.id] end end @@ -101,11 +100,10 @@ class BatchedRemoveStatusService < BaseService def batch_salmon_slaps(status) return if @mentions[status.id].empty? - payload = stream_entry_to_xml(status.stream_entry.reload) recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id) recipients.each do |recipient_id| - @salmon_batches << [payload, status.account_id, recipient_id] + @salmon_batches << [build_xml(status.stream_entry), status.account_id, recipient_id] end end @@ -145,6 +143,12 @@ class BatchedRemoveStatusService < BaseService ).as_json) end + def build_xml(stream_entry) + return @activity_xml[stream_entry.id] if @activity_xml.key?(stream_entry.id) + + @activity_xml[stream_entry.id] = stream_entry_to_xml(stream_entry) + end + def sign_json(status, json) Oj.dump(ActivityPub::LinkedDataSignature.new(json).sign!(status.account)) end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 7ddbd8906..83fc77043 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -22,8 +22,6 @@ class RemoveStatusService < BaseService return unless @account.local? - @stream_entry = @stream_entry.reload - remove_from_remote_followers remove_from_remote_affected end @@ -62,7 +60,7 @@ class RemoveStatusService < BaseService def remove_from_remote_followers # OStatus - Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id) + Pubsubhubbub::RawDistributionWorker.perform_async(salmon_xml, @account.id) # ActivityPub ActivityPub::DeliveryWorker.push_bulk(@account.followers.inboxes) do |inbox_url| diff --git a/app/workers/pubsubhubbub/raw_distribution_worker.rb b/app/workers/pubsubhubbub/raw_distribution_worker.rb new file mode 100644 index 000000000..16962a623 --- /dev/null +++ b/app/workers/pubsubhubbub/raw_distribution_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class Pubsubhubbub::RawDistributionWorker + include Sidekiq::Worker + + sidekiq_options queue: 'push' + + def perform(xml, source_account_id) + @account = Account.find(source_account_id) + @subscriptions = active_subscriptions.to_a + + Pubsubhubbub::DeliveryWorker.push_bulk(@subscriptions) do |subscription| + [subscription.id, xml] + end + end + + private + + def active_subscriptions + Subscription.where(account: @account).active.select('id, callback_url, domain') + end +end diff --git a/spec/services/batched_remove_status_service_spec.rb b/spec/services/batched_remove_status_service_spec.rb index 2484d4b58..b1e9ac567 100644 --- a/spec/services/batched_remove_status_service_spec.rb +++ b/spec/services/batched_remove_status_service_spec.rb @@ -48,11 +48,10 @@ RSpec.describe BatchedRemoveStatusService do expect(Redis.current).to have_received(:publish).with('timeline:public', any_args).at_least(:once) end - it 'sends PuSH update to PuSH subscribers with two payloads united' do + it 'sends PuSH update to PuSH subscribers' do expect(a_request(:post, 'http://example.com/push').with { |req| - matches = req.body.scan(TagManager::VERBS[:delete]) - matches.size == 2 - }).to have_been_made + matches = req.body.match(TagManager::VERBS[:delete]) + }).to have_been_made.at_least_once end it 'sends Salmon slap to previously mentioned users' do -- cgit From af2d22f88cf84313e11a5f0b1f3a1170f721967e Mon Sep 17 00:00:00 2001 From: unarist Date: Fri, 1 Sep 2017 19:33:02 +0900 Subject: Fallback from perform_via_activitypub on private posts (#4758) Currently, private / direct posts via OStatus from AP compatible instance will be dropped due to failing to fetch AP version. So this fallbacks to OStatus handling: * when failed to fetch ActivityPub version * when status is neither :public nor :unlisted --- app/lib/ostatus/activity/creation.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'app/lib/ostatus/activity') diff --git a/app/lib/ostatus/activity/creation.rb b/app/lib/ostatus/activity/creation.rb index 12488ab31..b28239a05 100644 --- a/app/lib/ostatus/activity/creation.rb +++ b/app/lib/ostatus/activity/creation.rb @@ -8,7 +8,11 @@ class OStatus::Activity::Creation < OStatus::Activity::Base end return [nil, false] if @account.suspended? - return perform_via_activitypub if activitypub_uri? + + if activitypub_uri? && [:public, :unlisted].include?(visibility_scope) + result = perform_via_activitypub + return result if result.first.present? + end Rails.logger.debug "Creating remote status #{id}" -- cgit From 504737e860a13f9636fc47131ff27238236e8971 Mon Sep 17 00:00:00 2001 From: unarist Date: Fri, 1 Sep 2017 20:34:04 +0900 Subject: Convert OStatus tag to ActivityPub id on in_reply_to resolution (#4756) --- app/lib/ostatus/activity/base.rb | 10 ++++++++++ app/lib/ostatus/activity/creation.rb | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) (limited to 'app/lib/ostatus/activity') diff --git a/app/lib/ostatus/activity/base.rb b/app/lib/ostatus/activity/base.rb index da9a01759..1dc7abee3 100644 --- a/app/lib/ostatus/activity/base.rb +++ b/app/lib/ostatus/activity/base.rb @@ -56,6 +56,16 @@ class OStatus::Activity::Base Status.find_by(uri: uri) end + def find_activitypub_status(uri, href) + tag_matches = /tag:([^,:]+)[^:]*:objectId=([\d]+)/.match(uri) + href_matches = %r{/users/([^/]+)}.match(href) + + unless tag_matches.nil? || href_matches.nil? + uri = "https://#{tag_matches[1]}/users/#{href_matches[1]}/statuses/#{tag_matches[2]}" + Status.find_by(uri: uri) + end + end + def redis Redis.current end diff --git a/app/lib/ostatus/activity/creation.rb b/app/lib/ostatus/activity/creation.rb index b28239a05..1a23c9efa 100644 --- a/app/lib/ostatus/activity/creation.rb +++ b/app/lib/ostatus/activity/creation.rb @@ -36,7 +36,7 @@ class OStatus::Activity::Creation < OStatus::Activity::Base language: content_language, visibility: visibility_scope, conversation: find_or_create_conversation, - thread: thread? ? find_status(thread.first) : nil + thread: thread? ? find_status(thread.first) || find_activitypub_status(thread.first, thread.second) : nil ) save_mentions(status) -- cgit