diff options
author | Eugen Rochko <eugen@zeonfederated.com> | 2017-08-13 00:44:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-08-13 00:44:41 +0200 |
commit | b7370ac8baa643d93ea727699b3b11f9d3a55bea (patch) | |
tree | 869a8c2d44f78d96255ae0bf20a84c150ca23702 /app/services | |
parent | ccdd5a9576819cdc95946d98fea0e3c8bbd1d626 (diff) |
ActivityPub delivery (#4566)
* Deliver ActivityPub Like * Deliver ActivityPub Undo-Like * Deliver ActivityPub Create/Announce activities * Deliver ActivityPub creates from mentions * Deliver ActivityPub Block/Undo-Block * Deliver ActivityPub Accept/Reject-Follow * Deliver ActivityPub Undo-Follow * Deliver ActivityPub Follow * Deliver ActivityPub Delete activities Incidentally fix #889 * Adjust BatchedRemoveStatusService for ActivityPub * Add tests for ActivityPub workers * Add tests for FollowService * Add tests for FavouriteService, UnfollowService and PostStatusService * Add tests for ReblogService, BlockService, UnblockService, ProcessMentionsService * Add tests for AuthorizeFollowService, RejectFollowService, RemoveStatusService * Add tests for BatchedRemoveStatusService * Deliver updates to a local account to ActivityPub followers * Minor adjustments
Diffstat (limited to 'app/services')
-rw-r--r-- | app/services/authorize_follow_service.rb | 19 | ||||
-rw-r--r-- | app/services/batched_remove_status_service.rb | 43 | ||||
-rw-r--r-- | app/services/block_service.rb | 19 | ||||
-rw-r--r-- | app/services/favourite_service.rb | 28 | ||||
-rw-r--r-- | app/services/follow_service.rb | 14 | ||||
-rw-r--r-- | app/services/post_status_service.rb | 1 | ||||
-rw-r--r-- | app/services/process_mentions_service.rb | 28 | ||||
-rw-r--r-- | app/services/reblog_service.rb | 28 | ||||
-rw-r--r-- | app/services/reject_follow_service.rb | 19 | ||||
-rw-r--r-- | app/services/remove_status_service.rb | 49 | ||||
-rw-r--r-- | app/services/unblock_service.rb | 19 | ||||
-rw-r--r-- | app/services/unfavourite_service.rb | 22 | ||||
-rw-r--r-- | app/services/unfollow_service.rb | 19 |
13 files changed, 267 insertions, 41 deletions
diff --git a/app/services/authorize_follow_service.rb b/app/services/authorize_follow_service.rb index 41815a393..db35b6030 100644 --- a/app/services/authorize_follow_service.rb +++ b/app/services/authorize_follow_service.rb @@ -4,11 +4,28 @@ class AuthorizeFollowService < BaseService def call(source_account, target_account) follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account) follow_request.authorize! - NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local? + create_notification(follow_request) unless source_account.local? + follow_request end private + def create_notification(follow_request) + if follow_request.account.ostatus? + NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id) + elsif follow_request.account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), follow_request.target_account_id, follow_request.account.inbox_url) + end + end + + def build_json(follow_request) + ActiveModelSerializers::SerializableResource.new( + follow_request, + serializer: ActivityPub::AcceptFollowSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def build_xml(follow_request) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.authorize_follow_request_salmon(follow_request)) end diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb index ab810c628..e6c8c9208 100644 --- a/app/services/batched_remove_status_service.rb +++ b/app/services/batched_remove_status_service.rb @@ -15,9 +15,11 @@ class BatchedRemoveStatusService < BaseService @mentions = statuses.map { |s| [s.id, s.mentions.includes(:account).to_a] }.to_h @tags = statuses.map { |s| [s.id, s.tags.pluck(:name)] }.to_h - @stream_entry_batches = [] - @salmon_batches = [] - @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h + @stream_entry_batches = [] + @salmon_batches = [] + @activity_json_batches = [] + @json_payloads = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h + @activity_json = {} # Ensure that rendered XML reflects destroyed state Status.where(id: statuses.map(&:id)).in_batches.destroy_all @@ -27,7 +29,11 @@ class BatchedRemoveStatusService < BaseService account = account_statuses.first.account unpush_from_home_timelines(account_statuses) - batch_stream_entries(account_statuses) if account.local? + + if account.local? + batch_stream_entries(account_statuses) + batch_activity_json(account, account_statuses) + end end # Cannot be batched @@ -38,6 +44,7 @@ class BatchedRemoveStatusService < BaseService Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch } NotificationWorker.push_bulk(@salmon_batches) { |batch| batch } + ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch } end private @@ -50,6 +57,22 @@ class BatchedRemoveStatusService < BaseService end end + def batch_activity_json(account, statuses) + account.followers.inboxes.each do |inbox_url| + statuses.each do |status| + @activity_json_batches << [build_json(status), account.id, inbox_url] + end + end + + statuses.each do |status| + other_recipients = (status.mentions + status.reblogs).map(&:account).reject(&:local?).select(&:activitypub?).uniq(&:id) + + other_recipients.each do |target_account| + @activity_json_batches << [build_json(status), account.id, target_account.inbox_url] + end + end + end + def unpush_from_home_timelines(statuses) account = statuses.first.account recipients = account.followers.local.pluck(:id) @@ -79,7 +102,7 @@ class BatchedRemoveStatusService < BaseService return if @mentions[status.id].empty? payload = stream_entry_to_xml(status.stream_entry.reload) - recipients = @mentions[status.id].map(&:account).reject(&:local?).uniq(&:domain).map(&:id) + recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id) recipients.each do |recipient_id| @salmon_batches << [payload, status.account_id, recipient_id] @@ -111,4 +134,14 @@ class BatchedRemoveStatusService < BaseService def redis Redis.current end + + def build_json(status) + return @activity_json[status.id] if @activity_json.key?(status.id) + + @activity_json[status.id] = ActiveModelSerializers::SerializableResource.new( + status, + serializer: ActivityPub::DeleteSerializer, + adapter: ActivityPub::Adapter + ).to_json + end end diff --git a/app/services/block_service.rb b/app/services/block_service.rb index 5d7bf6a3b..f2253226b 100644 --- a/app/services/block_service.rb +++ b/app/services/block_service.rb @@ -12,11 +12,28 @@ class BlockService < BaseService block = account.block!(target_account) BlockWorker.perform_async(account.id, target_account.id) - NotificationWorker.perform_async(build_xml(block), account.id, target_account.id) unless target_account.local? + create_notification(block) unless target_account.local? + block end private + def create_notification(block) + if block.target_account.ostatus? + NotificationWorker.perform_async(build_xml(block), block.account_id, block.target_account_id) + elsif block.target_account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(block), block.account_id, block.target_account.inbox_url) + end + end + + def build_json(block) + ActiveModelSerializers::SerializableResource.new( + block, + serializer: ActivityPub::BlockSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def build_xml(block) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.block_salmon(block)) end diff --git a/app/services/favourite_service.rb b/app/services/favourite_service.rb index 291f9e56e..4aa935170 100644 --- a/app/services/favourite_service.rb +++ b/app/services/favourite_service.rb @@ -15,18 +15,32 @@ class FavouriteService < BaseService return favourite unless favourite.nil? favourite = Favourite.create!(account: account, status: status) - - if status.local? - NotifyService.new.call(favourite.status.account, favourite) - else - NotificationWorker.perform_async(build_xml(favourite), account.id, status.account_id) - end - + create_notification(favourite) favourite end private + def create_notification(favourite) + status = favourite.status + + if status.account.local? + NotifyService.new.call(status.account, favourite) + elsif status.account.ostatus? + NotificationWorker.perform_async(build_xml(favourite), favourite.account_id, status.account_id) + elsif status.account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(favourite), favourite.account_id, status.account.inbox_url) + end + end + + def build_json(favourite) + ActiveModelSerializers::SerializableResource.new( + favourite, + serializer: ActivityPub::LikeSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def build_xml(favourite) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.favourite_salmon(favourite)) end diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb index 3155feaa4..2be625cd8 100644 --- a/app/services/follow_service.rb +++ b/app/services/follow_service.rb @@ -14,7 +14,7 @@ class FollowService < BaseService return if source_account.following?(target_account) - if target_account.locked? + if target_account.locked? || target_account.activitypub? request_follow(source_account, target_account) else direct_follow(source_account, target_account) @@ -28,9 +28,11 @@ class FollowService < BaseService if target_account.local? NotifyService.new.call(target_account, follow_request) - else + elsif target_account.ostatus? NotificationWorker.perform_async(build_follow_request_xml(follow_request), source_account.id, target_account.id) AfterRemoteFollowRequestWorker.perform_async(follow_request.id) + elsif target_account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), source_account.id, target_account.inbox_url) end follow_request @@ -63,4 +65,12 @@ class FollowService < BaseService def build_follow_xml(follow) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.follow_salmon(follow)) end + + def build_json(follow_request) + ActiveModelSerializers::SerializableResource.new( + follow_request, + serializer: ActivityPub::FollowSerializer, + adapter: ActivityPub::Adapter + ).to_json + end end diff --git a/app/services/post_status_service.rb b/app/services/post_status_service.rb index 951a38e19..5ff93f21e 100644 --- a/app/services/post_status_service.rb +++ b/app/services/post_status_service.rb @@ -39,6 +39,7 @@ class PostStatusService < BaseService LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text? DistributionWorker.perform_async(status.id) Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id) + ActivityPub::DistributionWorker.perform_async(status.id) if options[:idempotency].present? redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id) diff --git a/app/services/process_mentions_service.rb b/app/services/process_mentions_service.rb index 438033d22..407fa8c18 100644 --- a/app/services/process_mentions_service.rb +++ b/app/services/process_mentions_service.rb @@ -28,18 +28,32 @@ class ProcessMentionsService < BaseService end status.mentions.includes(:account).each do |mention| - mentioned_account = mention.account - - if mentioned_account.local? - NotifyService.new.call(mentioned_account, mention) - else - NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id) - end + create_notification(status, mention) end end private + def create_notification(status, mention) + mentioned_account = mention.account + + if mentioned_account.local? + NotifyService.new.call(mentioned_account, mention) + elsif mentioned_account.ostatus? + NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id) + elsif mentioned_account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(mention.status), mention.status.account_id, mentioned_account.inbox_url) + end + end + + def build_json(status) + ActiveModelSerializers::SerializableResource.new( + status, + serializer: ActivityPub::ActivitySerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def follow_remote_account_service @follow_remote_account_service ||= ResolveRemoteAccountService.new end diff --git a/app/services/reblog_service.rb b/app/services/reblog_service.rb index ba24b1f9d..7f886af7c 100644 --- a/app/services/reblog_service.rb +++ b/app/services/reblog_service.rb @@ -21,13 +21,31 @@ class ReblogService < BaseService DistributionWorker.perform_async(reblog.id) Pubsubhubbub::DistributionWorker.perform_async(reblog.stream_entry.id) + ActivityPub::DistributionWorker.perform_async(reblog.id) - if reblogged_status.local? - NotifyService.new.call(reblog.reblog.account, reblog) - else - NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), account.id, reblog.reblog.account_id) + create_notification(reblog) + reblog + end + + private + + def create_notification(reblog) + reblogged_status = reblog.reblog + + if reblogged_status.account.local? + NotifyService.new.call(reblogged_status.account, reblog) + elsif reblogged_status.account.ostatus? + NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), reblog.account_id, reblogged_status.account_id) + elsif reblogged_status.account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(reblog), reblog.account_id, reblogged_status.account.inbox_url) end + end - reblog + def build_json(reblog) + ActiveModelSerializers::SerializableResource.new( + reblog, + serializer: ActivityPub::ActivitySerializer, + adapter: ActivityPub::Adapter + ).to_json end end diff --git a/app/services/reject_follow_service.rb b/app/services/reject_follow_service.rb index fd7e66c23..a91266aa4 100644 --- a/app/services/reject_follow_service.rb +++ b/app/services/reject_follow_service.rb @@ -4,11 +4,28 @@ class RejectFollowService < BaseService def call(source_account, target_account) follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account) follow_request.reject! - NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local? + create_notification(follow_request) unless source_account.local? + follow_request end private + def create_notification(follow_request) + if follow_request.account.ostatus? + NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id) + elsif follow_request.account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), follow_request.target_account_id, follow_request.account.inbox_url) + end + end + + def build_json(follow_request) + ActiveModelSerializers::SerializableResource.new( + follow_request, + serializer: ActivityPub::RejectFollowSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def build_xml(follow_request) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.reject_follow_request_salmon(follow_request)) end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index a5281f586..fcccbaa24 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -22,8 +22,10 @@ class RemoveStatusService < BaseService return unless @account.local? - remove_from_mentioned(@stream_entry.reload) - Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id) + @stream_entry = @stream_entry.reload + + remove_from_remote_followers + remove_from_remote_affected end private @@ -38,15 +40,48 @@ class RemoveStatusService < BaseService end end - def remove_from_mentioned(stream_entry) - salmon_xml = stream_entry_to_xml(stream_entry) - target_accounts = @mentions.map(&:account).reject(&:local?).uniq(&:domain) + def remove_from_remote_affected + # People who got mentioned in the status, or who + # reblogged it from someone else might not follow + # the author and wouldn't normally receive the + # delete notification - so here, we explicitly + # send it to them + + target_accounts = (@mentions.map(&:account).reject(&:local?) + @reblogs.map(&:account).reject(&:local?)).uniq(&:id) + + # Ostatus + NotificationWorker.push_bulk(target_accounts.select(&:ostatus?).uniq(&:domain)) do |target_account| + [salmon_xml, @account.id, target_account.id] + end + + # ActivityPub + ActivityPub::DeliveryWorker.push_bulk(target_accounts.select(&:activitypub?).uniq(&:inbox_url)) do |inbox_url| + [activity_json, @account.id, inbox_url] + end + end + + def remove_from_remote_followers + # OStatus + Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id) - NotificationWorker.push_bulk(target_accounts) do |target_account| - [salmon_xml, stream_entry.account_id, target_account.id] + # ActivityPub + ActivityPub::DeliveryWorker.push_bulk(@account.followers.inboxes) do |inbox_url| + [activity_json, @account.id, inbox_url] end end + def salmon_xml + @salmon_xml ||= stream_entry_to_xml(@stream_entry) + end + + def activity_json + @activity_json ||= ActiveModelSerializers::SerializableResource.new( + @status, + serializer: ActivityPub::DeleteSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def remove_reblogs # We delete reblogs of the status before the original status, # because once original status is gone, reblogs will disappear diff --git a/app/services/unblock_service.rb b/app/services/unblock_service.rb index ff15c7275..72fc5ab15 100644 --- a/app/services/unblock_service.rb +++ b/app/services/unblock_service.rb @@ -5,11 +5,28 @@ class UnblockService < BaseService return unless account.blocking?(target_account) unblock = account.unblock!(target_account) - NotificationWorker.perform_async(build_xml(unblock), account.id, target_account.id) unless target_account.local? + create_notification(unblock) unless target_account.local? + unblock end private + def create_notification(unblock) + if unblock.target_account.ostatus? + NotificationWorker.perform_async(build_xml(unblock), unblock.account_id, unblock.target_account_id) + elsif unblock.target_account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(unblock), unblock.account_id, unblock.target_account.inbox_url) + end + end + + def build_json(unblock) + ActiveModelSerializers::SerializableResource.new( + unblock, + serializer: ActivityPub::UndoBlockSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def build_xml(block) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unblock_salmon(block)) end diff --git a/app/services/unfavourite_service.rb b/app/services/unfavourite_service.rb index 564aaee46..e53798e66 100644 --- a/app/services/unfavourite_service.rb +++ b/app/services/unfavourite_service.rb @@ -4,14 +4,30 @@ class UnfavouriteService < BaseService def call(account, status) favourite = Favourite.find_by!(account: account, status: status) favourite.destroy! - - NotificationWorker.perform_async(build_xml(favourite), account.id, status.account_id) unless status.local? - + create_notification(favourite) unless status.local? favourite end private + def create_notification(favourite) + status = favourite.status + + if status.account.ostatus? + NotificationWorker.perform_async(build_xml(favourite), favourite.account_id, status.account_id) + elsif status.account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(favourite), favourite.account_id, status.account.inbox_url) + end + end + + def build_json(favourite) + ActiveModelSerializers::SerializableResource.new( + favourite, + serializer: ActivityPub::UndoLikeSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def build_xml(favourite) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unfavourite_salmon(favourite)) end diff --git a/app/services/unfollow_service.rb b/app/services/unfollow_service.rb index 388909586..10af75146 100644 --- a/app/services/unfollow_service.rb +++ b/app/services/unfollow_service.rb @@ -7,12 +7,29 @@ class UnfollowService < BaseService def call(source_account, target_account) follow = source_account.unfollow!(target_account) return unless follow - NotificationWorker.perform_async(build_xml(follow), source_account.id, target_account.id) unless target_account.local? + create_notification(follow) unless target_account.local? UnmergeWorker.perform_async(target_account.id, source_account.id) + follow end private + def create_notification(follow) + if follow.target_account.ostatus? + NotificationWorker.perform_async(build_xml(follow), follow.account_id, follow.target_account_id) + elsif follow.target_account.activitypub? + ActivityPub::DeliveryWorker.perform_async(build_json(follow), follow.account_id, follow.target_account.inbox_url) + end + end + + def build_json(follow) + ActiveModelSerializers::SerializableResource.new( + follow, + serializer: ActivityPub::UndoFollowSerializer, + adapter: ActivityPub::Adapter + ).to_json + end + def build_xml(follow) OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unfollow_salmon(follow)) end |