about summary refs log tree commit diff
path: root/app
diff options
context:
space:
mode:
authorEugen Rochko <eugen@zeonfederated.com>2017-08-13 00:44:41 +0200
committerGitHub <noreply@github.com>2017-08-13 00:44:41 +0200
commitb7370ac8baa643d93ea727699b3b11f9d3a55bea (patch)
tree869a8c2d44f78d96255ae0bf20a84c150ca23702 /app
parentccdd5a9576819cdc95946d98fea0e3c8bbd1d626 (diff)
ActivityPub delivery (#4566)
* Deliver ActivityPub Like

* Deliver ActivityPub Undo-Like

* Deliver ActivityPub Create/Announce activities

* Deliver ActivityPub creates from mentions

* Deliver ActivityPub Block/Undo-Block

* Deliver ActivityPub Accept/Reject-Follow

* Deliver ActivityPub Undo-Follow

* Deliver ActivityPub Follow

* Deliver ActivityPub Delete activities

Incidentally fix #889

* Adjust BatchedRemoveStatusService for ActivityPub

* Add tests for ActivityPub workers

* Add tests for FollowService

* Add tests for FavouriteService, UnfollowService and PostStatusService

* Add tests for ReblogService, BlockService, UnblockService, ProcessMentionsService

* Add tests for AuthorizeFollowService, RejectFollowService, RemoveStatusService

* Add tests for BatchedRemoveStatusService

* Deliver updates to a local account to ActivityPub followers

* Minor adjustments
Diffstat (limited to 'app')
-rw-r--r--app/controllers/api/v1/accounts/credentials_controller.rb3
-rw-r--r--app/controllers/settings/profiles_controller.rb1
-rw-r--r--app/lib/activitypub/activity.rb2
-rw-r--r--app/models/account.rb4
-rw-r--r--app/services/authorize_follow_service.rb19
-rw-r--r--app/services/batched_remove_status_service.rb43
-rw-r--r--app/services/block_service.rb19
-rw-r--r--app/services/favourite_service.rb28
-rw-r--r--app/services/follow_service.rb14
-rw-r--r--app/services/post_status_service.rb1
-rw-r--r--app/services/process_mentions_service.rb28
-rw-r--r--app/services/reblog_service.rb28
-rw-r--r--app/services/reject_follow_service.rb19
-rw-r--r--app/services/remove_status_service.rb49
-rw-r--r--app/services/unblock_service.rb19
-rw-r--r--app/services/unfavourite_service.rb22
-rw-r--r--app/services/unfollow_service.rb19
-rw-r--r--app/workers/activitypub/delivery_worker.rb37
-rw-r--r--app/workers/activitypub/distribution_worker.rb38
-rw-r--r--app/workers/activitypub/processing_worker.rb2
-rw-r--r--app/workers/activitypub/update_distribution_worker.rb31
21 files changed, 382 insertions, 44 deletions
diff --git a/app/controllers/api/v1/accounts/credentials_controller.rb b/app/controllers/api/v1/accounts/credentials_controller.rb
index 073808532..90a580c33 100644
--- a/app/controllers/api/v1/accounts/credentials_controller.rb
+++ b/app/controllers/api/v1/accounts/credentials_controller.rb
@@ -10,8 +10,9 @@ class Api::V1::Accounts::CredentialsController < Api::BaseController
   end
 
   def update
-    current_account.update!(account_params)
     @account = current_account
+    @account.update!(account_params)
+    ActivityPub::UpdateDistributionWorker.perform_async(@account.id)
     render json: @account, serializer: REST::CredentialAccountSerializer
   end
 
diff --git a/app/controllers/settings/profiles_controller.rb b/app/controllers/settings/profiles_controller.rb
index 0367e3593..c751c64ae 100644
--- a/app/controllers/settings/profiles_controller.rb
+++ b/app/controllers/settings/profiles_controller.rb
@@ -15,6 +15,7 @@ class Settings::ProfilesController < ApplicationController
 
   def update
     if @account.update(account_params)
+      ActivityPub::UpdateDistributionWorker.perform_async(@account.id)
       redirect_to settings_profile_path, notice: I18n.t('generic.changes_saved_msg')
     else
       render :show
diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb
index 5debe023a..f8de8060c 100644
--- a/app/lib/activitypub/activity.rb
+++ b/app/lib/activitypub/activity.rb
@@ -93,7 +93,7 @@ class ActivityPub::Activity
   end
 
   def distribute_to_followers(status)
-    DistributionWorker.perform_async(status.id)
+    ::DistributionWorker.perform_async(status.id)
   end
 
   def delete_arrived_first?(uri)
diff --git a/app/models/account.rb b/app/models/account.rb
index 163bd1c0e..a7264353e 100644
--- a/app/models/account.rb
+++ b/app/models/account.rb
@@ -171,6 +171,10 @@ class Account < ApplicationRecord
       reorder(nil).pluck('distinct accounts.domain')
     end
 
+    def inboxes
+      reorder(nil).where(protocol: :activitypub).pluck("distinct coalesce(nullif(accounts.shared_inbox_url, ''), accounts.inbox_url)")
+    end
+
     def triadic_closures(account, limit: 5, offset: 0)
       sql = <<-SQL.squish
         WITH first_degree AS (
diff --git a/app/services/authorize_follow_service.rb b/app/services/authorize_follow_service.rb
index 41815a393..db35b6030 100644
--- a/app/services/authorize_follow_service.rb
+++ b/app/services/authorize_follow_service.rb
@@ -4,11 +4,28 @@ class AuthorizeFollowService < BaseService
   def call(source_account, target_account)
     follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account)
     follow_request.authorize!
-    NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local?
+    create_notification(follow_request) unless source_account.local?
+    follow_request
   end
 
   private
 
+  def create_notification(follow_request)
+    if follow_request.account.ostatus?
+      NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id)
+    elsif follow_request.account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), follow_request.target_account_id, follow_request.account.inbox_url)
+    end
+  end
+
+  def build_json(follow_request)
+    ActiveModelSerializers::SerializableResource.new(
+      follow_request,
+      serializer: ActivityPub::AcceptFollowSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def build_xml(follow_request)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.authorize_follow_request_salmon(follow_request))
   end
diff --git a/app/services/batched_remove_status_service.rb b/app/services/batched_remove_status_service.rb
index ab810c628..e6c8c9208 100644
--- a/app/services/batched_remove_status_service.rb
+++ b/app/services/batched_remove_status_service.rb
@@ -15,9 +15,11 @@ class BatchedRemoveStatusService < BaseService
     @mentions = statuses.map { |s| [s.id, s.mentions.includes(:account).to_a] }.to_h
     @tags     = statuses.map { |s| [s.id, s.tags.pluck(:name)] }.to_h
 
-    @stream_entry_batches = []
-    @salmon_batches       = []
-    @json_payloads        = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
+    @stream_entry_batches  = []
+    @salmon_batches        = []
+    @activity_json_batches = []
+    @json_payloads         = statuses.map { |s| [s.id, Oj.dump(event: :delete, payload: s.id)] }.to_h
+    @activity_json         = {}
 
     # Ensure that rendered XML reflects destroyed state
     Status.where(id: statuses.map(&:id)).in_batches.destroy_all
@@ -27,7 +29,11 @@ class BatchedRemoveStatusService < BaseService
       account = account_statuses.first.account
 
       unpush_from_home_timelines(account_statuses)
-      batch_stream_entries(account_statuses) if account.local?
+
+      if account.local?
+        batch_stream_entries(account_statuses)
+        batch_activity_json(account, account_statuses)
+      end
     end
 
     # Cannot be batched
@@ -38,6 +44,7 @@ class BatchedRemoveStatusService < BaseService
 
     Pubsubhubbub::DistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
     NotificationWorker.push_bulk(@salmon_batches) { |batch| batch }
+    ActivityPub::DeliveryWorker.push_bulk(@activity_json_batches) { |batch| batch }
   end
 
   private
@@ -50,6 +57,22 @@ class BatchedRemoveStatusService < BaseService
     end
   end
 
+  def batch_activity_json(account, statuses)
+    account.followers.inboxes.each do |inbox_url|
+      statuses.each do |status|
+        @activity_json_batches << [build_json(status), account.id, inbox_url]
+      end
+    end
+
+    statuses.each do |status|
+      other_recipients = (status.mentions + status.reblogs).map(&:account).reject(&:local?).select(&:activitypub?).uniq(&:id)
+
+      other_recipients.each do |target_account|
+        @activity_json_batches << [build_json(status), account.id, target_account.inbox_url]
+      end
+    end
+  end
+
   def unpush_from_home_timelines(statuses)
     account    = statuses.first.account
     recipients = account.followers.local.pluck(:id)
@@ -79,7 +102,7 @@ class BatchedRemoveStatusService < BaseService
     return if @mentions[status.id].empty?
 
     payload    = stream_entry_to_xml(status.stream_entry.reload)
-    recipients = @mentions[status.id].map(&:account).reject(&:local?).uniq(&:domain).map(&:id)
+    recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id)
 
     recipients.each do |recipient_id|
       @salmon_batches << [payload, status.account_id, recipient_id]
@@ -111,4 +134,14 @@ class BatchedRemoveStatusService < BaseService
   def redis
     Redis.current
   end
+
+  def build_json(status)
+    return @activity_json[status.id] if @activity_json.key?(status.id)
+
+    @activity_json[status.id] = ActiveModelSerializers::SerializableResource.new(
+      status,
+      serializer: ActivityPub::DeleteSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
 end
diff --git a/app/services/block_service.rb b/app/services/block_service.rb
index 5d7bf6a3b..f2253226b 100644
--- a/app/services/block_service.rb
+++ b/app/services/block_service.rb
@@ -12,11 +12,28 @@ class BlockService < BaseService
     block = account.block!(target_account)
 
     BlockWorker.perform_async(account.id, target_account.id)
-    NotificationWorker.perform_async(build_xml(block), account.id, target_account.id) unless target_account.local?
+    create_notification(block) unless target_account.local?
+    block
   end
 
   private
 
+  def create_notification(block)
+    if block.target_account.ostatus?
+      NotificationWorker.perform_async(build_xml(block), block.account_id, block.target_account_id)
+    elsif block.target_account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(block), block.account_id, block.target_account.inbox_url)
+    end
+  end
+
+  def build_json(block)
+    ActiveModelSerializers::SerializableResource.new(
+      block,
+      serializer: ActivityPub::BlockSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def build_xml(block)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.block_salmon(block))
   end
diff --git a/app/services/favourite_service.rb b/app/services/favourite_service.rb
index 291f9e56e..4aa935170 100644
--- a/app/services/favourite_service.rb
+++ b/app/services/favourite_service.rb
@@ -15,18 +15,32 @@ class FavouriteService < BaseService
     return favourite unless favourite.nil?
 
     favourite = Favourite.create!(account: account, status: status)
-
-    if status.local?
-      NotifyService.new.call(favourite.status.account, favourite)
-    else
-      NotificationWorker.perform_async(build_xml(favourite), account.id, status.account_id)
-    end
-
+    create_notification(favourite)
     favourite
   end
 
   private
 
+  def create_notification(favourite)
+    status = favourite.status
+
+    if status.account.local?
+      NotifyService.new.call(status.account, favourite)
+    elsif status.account.ostatus?
+      NotificationWorker.perform_async(build_xml(favourite), favourite.account_id, status.account_id)
+    elsif status.account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(favourite), favourite.account_id, status.account.inbox_url)
+    end
+  end
+
+  def build_json(favourite)
+    ActiveModelSerializers::SerializableResource.new(
+      favourite,
+      serializer: ActivityPub::LikeSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def build_xml(favourite)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.favourite_salmon(favourite))
   end
diff --git a/app/services/follow_service.rb b/app/services/follow_service.rb
index 3155feaa4..2be625cd8 100644
--- a/app/services/follow_service.rb
+++ b/app/services/follow_service.rb
@@ -14,7 +14,7 @@ class FollowService < BaseService
 
     return if source_account.following?(target_account)
 
-    if target_account.locked?
+    if target_account.locked? || target_account.activitypub?
       request_follow(source_account, target_account)
     else
       direct_follow(source_account, target_account)
@@ -28,9 +28,11 @@ class FollowService < BaseService
 
     if target_account.local?
       NotifyService.new.call(target_account, follow_request)
-    else
+    elsif target_account.ostatus?
       NotificationWorker.perform_async(build_follow_request_xml(follow_request), source_account.id, target_account.id)
       AfterRemoteFollowRequestWorker.perform_async(follow_request.id)
+    elsif target_account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), source_account.id, target_account.inbox_url)
     end
 
     follow_request
@@ -63,4 +65,12 @@ class FollowService < BaseService
   def build_follow_xml(follow)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.follow_salmon(follow))
   end
+
+  def build_json(follow_request)
+    ActiveModelSerializers::SerializableResource.new(
+      follow_request,
+      serializer: ActivityPub::FollowSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
 end
diff --git a/app/services/post_status_service.rb b/app/services/post_status_service.rb
index 951a38e19..5ff93f21e 100644
--- a/app/services/post_status_service.rb
+++ b/app/services/post_status_service.rb
@@ -39,6 +39,7 @@ class PostStatusService < BaseService
     LinkCrawlWorker.perform_async(status.id) unless status.spoiler_text?
     DistributionWorker.perform_async(status.id)
     Pubsubhubbub::DistributionWorker.perform_async(status.stream_entry.id)
+    ActivityPub::DistributionWorker.perform_async(status.id)
 
     if options[:idempotency].present?
       redis.setex("idempotency:status:#{account.id}:#{options[:idempotency]}", 3_600, status.id)
diff --git a/app/services/process_mentions_service.rb b/app/services/process_mentions_service.rb
index 438033d22..407fa8c18 100644
--- a/app/services/process_mentions_service.rb
+++ b/app/services/process_mentions_service.rb
@@ -28,18 +28,32 @@ class ProcessMentionsService < BaseService
     end
 
     status.mentions.includes(:account).each do |mention|
-      mentioned_account = mention.account
-
-      if mentioned_account.local?
-        NotifyService.new.call(mentioned_account, mention)
-      else
-        NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
-      end
+      create_notification(status, mention)
     end
   end
 
   private
 
+  def create_notification(status, mention)
+    mentioned_account = mention.account
+
+    if mentioned_account.local?
+      NotifyService.new.call(mentioned_account, mention)
+    elsif mentioned_account.ostatus?
+      NotificationWorker.perform_async(stream_entry_to_xml(status.stream_entry), status.account_id, mentioned_account.id)
+    elsif mentioned_account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(mention.status), mention.status.account_id, mentioned_account.inbox_url)
+    end
+  end
+
+  def build_json(status)
+    ActiveModelSerializers::SerializableResource.new(
+      status,
+      serializer: ActivityPub::ActivitySerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def follow_remote_account_service
     @follow_remote_account_service ||= ResolveRemoteAccountService.new
   end
diff --git a/app/services/reblog_service.rb b/app/services/reblog_service.rb
index ba24b1f9d..7f886af7c 100644
--- a/app/services/reblog_service.rb
+++ b/app/services/reblog_service.rb
@@ -21,13 +21,31 @@ class ReblogService < BaseService
 
     DistributionWorker.perform_async(reblog.id)
     Pubsubhubbub::DistributionWorker.perform_async(reblog.stream_entry.id)
+    ActivityPub::DistributionWorker.perform_async(reblog.id)
 
-    if reblogged_status.local?
-      NotifyService.new.call(reblog.reblog.account, reblog)
-    else
-      NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), account.id, reblog.reblog.account_id)
+    create_notification(reblog)
+    reblog
+  end
+
+  private
+
+  def create_notification(reblog)
+    reblogged_status = reblog.reblog
+
+    if reblogged_status.account.local?
+      NotifyService.new.call(reblogged_status.account, reblog)
+    elsif reblogged_status.account.ostatus?
+      NotificationWorker.perform_async(stream_entry_to_xml(reblog.stream_entry), reblog.account_id, reblogged_status.account_id)
+    elsif reblogged_status.account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(reblog), reblog.account_id, reblogged_status.account.inbox_url)
     end
+  end
 
-    reblog
+  def build_json(reblog)
+    ActiveModelSerializers::SerializableResource.new(
+      reblog,
+      serializer: ActivityPub::ActivitySerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
   end
 end
diff --git a/app/services/reject_follow_service.rb b/app/services/reject_follow_service.rb
index fd7e66c23..a91266aa4 100644
--- a/app/services/reject_follow_service.rb
+++ b/app/services/reject_follow_service.rb
@@ -4,11 +4,28 @@ class RejectFollowService < BaseService
   def call(source_account, target_account)
     follow_request = FollowRequest.find_by!(account: source_account, target_account: target_account)
     follow_request.reject!
-    NotificationWorker.perform_async(build_xml(follow_request), target_account.id, source_account.id) unless source_account.local?
+    create_notification(follow_request) unless source_account.local?
+    follow_request
   end
 
   private
 
+  def create_notification(follow_request)
+    if follow_request.account.ostatus?
+      NotificationWorker.perform_async(build_xml(follow_request), follow_request.target_account_id, follow_request.account_id)
+    elsif follow_request.account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(follow_request), follow_request.target_account_id, follow_request.account.inbox_url)
+    end
+  end
+
+  def build_json(follow_request)
+    ActiveModelSerializers::SerializableResource.new(
+      follow_request,
+      serializer: ActivityPub::RejectFollowSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def build_xml(follow_request)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.reject_follow_request_salmon(follow_request))
   end
diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb
index a5281f586..fcccbaa24 100644
--- a/app/services/remove_status_service.rb
+++ b/app/services/remove_status_service.rb
@@ -22,8 +22,10 @@ class RemoveStatusService < BaseService
 
     return unless @account.local?
 
-    remove_from_mentioned(@stream_entry.reload)
-    Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id)
+    @stream_entry = @stream_entry.reload
+
+    remove_from_remote_followers
+    remove_from_remote_affected
   end
 
   private
@@ -38,15 +40,48 @@ class RemoveStatusService < BaseService
     end
   end
 
-  def remove_from_mentioned(stream_entry)
-    salmon_xml       = stream_entry_to_xml(stream_entry)
-    target_accounts  = @mentions.map(&:account).reject(&:local?).uniq(&:domain)
+  def remove_from_remote_affected
+    # People who got mentioned in the status, or who
+    # reblogged it from someone else might not follow
+    # the author and wouldn't normally receive the
+    # delete notification - so here, we explicitly
+    # send it to them
+
+    target_accounts = (@mentions.map(&:account).reject(&:local?) + @reblogs.map(&:account).reject(&:local?)).uniq(&:id)
+
+    # Ostatus
+    NotificationWorker.push_bulk(target_accounts.select(&:ostatus?).uniq(&:domain)) do |target_account|
+      [salmon_xml, @account.id, target_account.id]
+    end
+
+    # ActivityPub
+    ActivityPub::DeliveryWorker.push_bulk(target_accounts.select(&:activitypub?).uniq(&:inbox_url)) do |inbox_url|
+      [activity_json, @account.id, inbox_url]
+    end
+  end
+
+  def remove_from_remote_followers
+    # OStatus
+    Pubsubhubbub::DistributionWorker.perform_async(@stream_entry.id)
 
-    NotificationWorker.push_bulk(target_accounts) do |target_account|
-      [salmon_xml, stream_entry.account_id, target_account.id]
+    # ActivityPub
+    ActivityPub::DeliveryWorker.push_bulk(@account.followers.inboxes) do |inbox_url|
+      [activity_json, @account.id, inbox_url]
     end
   end
 
+  def salmon_xml
+    @salmon_xml ||= stream_entry_to_xml(@stream_entry)
+  end
+
+  def activity_json
+    @activity_json ||= ActiveModelSerializers::SerializableResource.new(
+      @status,
+      serializer: ActivityPub::DeleteSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def remove_reblogs
     # We delete reblogs of the status before the original status,
     # because once original status is gone, reblogs will disappear
diff --git a/app/services/unblock_service.rb b/app/services/unblock_service.rb
index ff15c7275..72fc5ab15 100644
--- a/app/services/unblock_service.rb
+++ b/app/services/unblock_service.rb
@@ -5,11 +5,28 @@ class UnblockService < BaseService
     return unless account.blocking?(target_account)
 
     unblock = account.unblock!(target_account)
-    NotificationWorker.perform_async(build_xml(unblock), account.id, target_account.id) unless target_account.local?
+    create_notification(unblock) unless target_account.local?
+    unblock
   end
 
   private
 
+  def create_notification(unblock)
+    if unblock.target_account.ostatus?
+      NotificationWorker.perform_async(build_xml(unblock), unblock.account_id, unblock.target_account_id)
+    elsif unblock.target_account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(unblock), unblock.account_id, unblock.target_account.inbox_url)
+    end
+  end
+
+  def build_json(unblock)
+    ActiveModelSerializers::SerializableResource.new(
+      unblock,
+      serializer: ActivityPub::UndoBlockSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def build_xml(block)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unblock_salmon(block))
   end
diff --git a/app/services/unfavourite_service.rb b/app/services/unfavourite_service.rb
index 564aaee46..e53798e66 100644
--- a/app/services/unfavourite_service.rb
+++ b/app/services/unfavourite_service.rb
@@ -4,14 +4,30 @@ class UnfavouriteService < BaseService
   def call(account, status)
     favourite = Favourite.find_by!(account: account, status: status)
     favourite.destroy!
-
-    NotificationWorker.perform_async(build_xml(favourite), account.id, status.account_id) unless status.local?
-
+    create_notification(favourite) unless status.local?
     favourite
   end
 
   private
 
+  def create_notification(favourite)
+    status = favourite.status
+
+    if status.account.ostatus?
+      NotificationWorker.perform_async(build_xml(favourite), favourite.account_id, status.account_id)
+    elsif status.account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(favourite), favourite.account_id, status.account.inbox_url)
+    end
+  end
+
+  def build_json(favourite)
+    ActiveModelSerializers::SerializableResource.new(
+      favourite,
+      serializer: ActivityPub::UndoLikeSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def build_xml(favourite)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unfavourite_salmon(favourite))
   end
diff --git a/app/services/unfollow_service.rb b/app/services/unfollow_service.rb
index 388909586..10af75146 100644
--- a/app/services/unfollow_service.rb
+++ b/app/services/unfollow_service.rb
@@ -7,12 +7,29 @@ class UnfollowService < BaseService
   def call(source_account, target_account)
     follow = source_account.unfollow!(target_account)
     return unless follow
-    NotificationWorker.perform_async(build_xml(follow), source_account.id, target_account.id) unless target_account.local?
+    create_notification(follow) unless target_account.local?
     UnmergeWorker.perform_async(target_account.id, source_account.id)
+    follow
   end
 
   private
 
+  def create_notification(follow)
+    if follow.target_account.ostatus?
+      NotificationWorker.perform_async(build_xml(follow), follow.account_id, follow.target_account_id)
+    elsif follow.target_account.activitypub?
+      ActivityPub::DeliveryWorker.perform_async(build_json(follow), follow.account_id, follow.target_account.inbox_url)
+    end
+  end
+
+  def build_json(follow)
+    ActiveModelSerializers::SerializableResource.new(
+      follow,
+      serializer: ActivityPub::UndoFollowSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+
   def build_xml(follow)
     OStatus::AtomSerializer.render(OStatus::AtomSerializer.new.unfollow_salmon(follow))
   end
diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb
new file mode 100644
index 000000000..cd67b6710
--- /dev/null
+++ b/app/workers/activitypub/delivery_worker.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+class ActivityPub::DeliveryWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'push', retry: 5, dead: false
+
+  HEADERS = { 'Content-Type' => 'application/activity+json' }.freeze
+
+  def perform(json, source_account_id, inbox_url)
+    @json           = json
+    @source_account = Account.find(source_account_id)
+    @inbox_url      = inbox_url
+
+    perform_request
+
+    raise Mastodon::UnexpectedResponseError, @response unless response_successful?
+  rescue => e
+    raise e.class, "Delivery failed for #{inbox_url}: #{e.message}"
+  end
+
+  private
+
+  def build_request
+    request = Request.new(:post, @inbox_url, body: @json)
+    request.on_behalf_of(@source_account, :uri)
+    request.add_headers(HEADERS)
+  end
+
+  def perform_request
+    @response = build_request.perform
+  end
+
+  def response_successful?
+    @response.code > 199 && @response.code < 300
+  end
+end
diff --git a/app/workers/activitypub/distribution_worker.rb b/app/workers/activitypub/distribution_worker.rb
new file mode 100644
index 000000000..004dd25d1
--- /dev/null
+++ b/app/workers/activitypub/distribution_worker.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+class ActivityPub::DistributionWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'push'
+
+  def perform(status_id)
+    @status  = Status.find(status_id)
+    @account = @status.account
+
+    return if skip_distribution?
+
+    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+      [payload, @account.id, inbox_url]
+    end
+  rescue ActiveRecord::RecordNotFound
+    true
+  end
+
+  private
+
+  def skip_distribution?
+    @status.direct_visibility?
+  end
+
+  def inboxes
+    @inboxes ||= @account.followers.inboxes
+  end
+
+  def payload
+    @payload ||= ActiveModelSerializers::SerializableResource.new(
+      @status,
+      serializer: ActivityPub::ActivitySerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+end
diff --git a/app/workers/activitypub/processing_worker.rb b/app/workers/activitypub/processing_worker.rb
index 7656ab56a..bb9adf64b 100644
--- a/app/workers/activitypub/processing_worker.rb
+++ b/app/workers/activitypub/processing_worker.rb
@@ -6,6 +6,6 @@ class ActivityPub::ProcessingWorker
   sidekiq_options backtrace: true
 
   def perform(account_id, body)
-    ProcessCollectionService.new.call(body, Account.find(account_id))
+    ActivityPub::ProcessCollectionService.new.call(body, Account.find(account_id))
   end
 end
diff --git a/app/workers/activitypub/update_distribution_worker.rb b/app/workers/activitypub/update_distribution_worker.rb
new file mode 100644
index 000000000..f3377dcec
--- /dev/null
+++ b/app/workers/activitypub/update_distribution_worker.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+class ActivityPub::UpdateDistributionWorker
+  include Sidekiq::Worker
+
+  sidekiq_options queue: 'push'
+
+  def perform(account_id)
+    @account = Account.find(account_id)
+
+    ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
+      [payload, @account.id, inbox_url]
+    end
+  rescue ActiveRecord::RecordNotFound
+    true
+  end
+
+  private
+
+  def inboxes
+    @inboxes ||= @account.followers.inboxes
+  end
+
+  def payload
+    @payload ||= ActiveModelSerializers::SerializableResource.new(
+      @account,
+      serializer: ActivityPub::UpdateSerializer,
+      adapter: ActivityPub::Adapter
+    ).to_json
+  end
+end