From 5b95be1c42ba69c9a3a79cfa990c80a5f2debfc6 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 19:45:18 +0200 Subject: Replace calls to FeedManager#inline_render and #broadcast --- app/services/fan_out_on_write_service.rb | 10 +++++----- app/services/notify_service.rb | 2 +- app/services/remove_status_service.rb | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) (limited to 'app/services') diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 106d257ba..c63fcc1fe 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -50,22 +50,22 @@ class FanOutOnWriteService < BaseService end def render_anonymous_payload(status) - @payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) + @payload = InlineRenderer.render(status, nil, 'api/v1/statuses/show') end def deliver_to_hashtags(status) Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - FeedManager.instance.broadcast("hashtag:#{hashtag}", event: 'update', payload: @payload) - FeedManager.instance.broadcast("hashtag:#{hashtag}:local", event: 'update', payload: @payload) if status.account.local? + Redis.current.publish("hashtag:#{hashtag}", Oj.dump(event: :update, payload: @payload)) + Redis.current.publish("hashtag:#{hashtag}:local", Oj.dump(event: :update, payload: @payload)) if status.account.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - FeedManager.instance.broadcast(:public, event: 'update', payload: @payload) - FeedManager.instance.broadcast('public:local', event: 'update', payload: @payload) if status.account.local? + Redis.current.publish('public', Oj.dump(event: 'update', payload: @payload)) + Redis.current.publish('public:local', Oj.dump(event: 'update', payload: @payload)) if status.account.local? end end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index 24486f220..62508a049 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -50,7 +50,7 @@ class NotifyService < BaseService def create_notification @notification.save! return unless @notification.browserable? - FeedManager.instance.broadcast(@recipient.id, event: 'notification', payload: FeedManager.instance.inline_render(@recipient, 'api/v1/notifications/show', @notification)) + Redis.current.publish(@recipient.id, Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) end def send_email diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index cf1f432e4..e19fdd030 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -65,17 +65,17 @@ class RemoveStatusService < BaseService redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id) end - FeedManager.instance.broadcast(receiver.id, event: 'delete', payload: status.id) + Redis.current.publish(receiver.id, Oj.dump(event: :delete, payload: status.id)) end def remove_from_hashtags(status) status.tags.each do |tag| - FeedManager.instance.broadcast("hashtag:#{tag.name}", event: 'delete', payload: status.id) + Redis.current.publish("hashtag:#{tag.name}", Oj.dump(event: :delete, payload: status.id)) end end def remove_from_public(status) - FeedManager.instance.broadcast(:public, event: 'delete', payload: status.id) + Redis.current.publish('public', Oj.dump(event: :delete, payload: status.id)) end def redis -- cgit From 5442083b3c44c731679fc489568bf7f70a807a39 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 21:41:50 +0200 Subject: Split SalmonWorker into smaller parts, move profile updating into another job --- app/services/follow_remote_account_service.rb | 16 ++++++---------- app/services/process_feed_service.rb | 6 +++--- app/services/process_interaction_service.rb | 10 +++------- app/workers/admin/suspension_worker.rb | 2 ++ app/workers/application_worker.rb | 2 ++ app/workers/distribution_worker.rb | 5 +---- app/workers/remote_profile_update_worker.rb | 20 ++++++++++++++++++++ app/workers/salmon_worker.rb | 2 +- spec/services/process_feed_service_spec.rb | 1 + 9 files changed, 39 insertions(+), 25 deletions(-) create mode 100644 app/workers/remote_profile_update_worker.rb (limited to 'app/services') diff --git a/app/services/follow_remote_account_service.rb b/app/services/follow_remote_account_service.rb index b39eafc70..936953429 100644 --- a/app/services/follow_remote_account_service.rb +++ b/app/services/follow_remote_account_service.rb @@ -45,13 +45,13 @@ class FollowRemoteAccountService < BaseService account.suspended = true if domain_block && domain_block.suspend? account.silenced = true if domain_block && domain_block.silence? - xml = get_feed(account.remote_url) - hubs = get_hubs(xml) + body, xml = get_feed(account.remote_url) + hubs = get_hubs(xml) account.uri = get_account_uri(xml) account.hub_url = hubs.first.attribute('href').value - get_profile(xml, account) + get_profile(body, account) account.save! account @@ -61,7 +61,7 @@ class FollowRemoteAccountService < BaseService def get_feed(url) response = http_client.get(Addressable::URI.parse(url)) - Nokogiri::XML(response) + [response.to_s, Nokogiri::XML(response)] end def get_hubs(xml) @@ -82,12 +82,8 @@ class FollowRemoteAccountService < BaseService author_uri.content end - def get_profile(xml, account) - update_remote_profile_service.call(xml.at_xpath('/xmlns:feed'), account) - end - - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new + def get_profile(body, account) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), false) end def http_client diff --git a/app/services/process_feed_service.rb b/app/services/process_feed_service.rb index 69911abc5..cf2f7a826 100644 --- a/app/services/process_feed_service.rb +++ b/app/services/process_feed_service.rb @@ -5,15 +5,15 @@ class ProcessFeedService < BaseService xml = Nokogiri::XML(body) xml.encoding = 'utf-8' - update_author(xml, account) + update_author(body, xml, account) process_entries(xml, account) end private - def update_author(xml, account) + def update_author(body, xml, account) return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil? - UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS), account, true) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true) end def process_entries(xml, account) diff --git a/app/services/process_interaction_service.rb b/app/services/process_interaction_service.rb index d5f7b4b3c..805ca5a27 100644 --- a/app/services/process_interaction_service.rb +++ b/app/services/process_interaction_service.rb @@ -24,7 +24,7 @@ class ProcessInteractionService < BaseService return if account.suspended? if salmon.verify(envelope, account.keypair) - update_remote_profile_service.call(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS), account, true) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true) case verb(xml) when :follow @@ -114,7 +114,7 @@ class ProcessInteractionService < BaseService return if status.nil? - remove_status_service.call(status) if account.id == status.account_id + RemovalWorker.perform_async(status.id) if account.id == status.account_id end def favourite!(xml, from_account) @@ -130,7 +130,7 @@ class ProcessInteractionService < BaseService end def add_post!(body, account) - process_feed_service.call(body, account) + ProcessingWorker.perform_async(account.id, body.force_encoding('UTF-8')) end def status(xml) @@ -153,10 +153,6 @@ class ProcessInteractionService < BaseService @process_feed_service ||= ProcessFeedService.new end - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new - end - def remove_status_service @remove_status_service ||= RemoveStatusService.new end diff --git a/app/workers/admin/suspension_worker.rb b/app/workers/admin/suspension_worker.rb index 38761f3b9..7ef2b35ec 100644 --- a/app/workers/admin/suspension_worker.rb +++ b/app/workers/admin/suspension_worker.rb @@ -3,6 +3,8 @@ class Admin::SuspensionWorker include Sidekiq::Worker + sidekiq_options queue: 'pull' + def perform(account_id) SuspendAccountService.new.call(Account.find(account_id)) end diff --git a/app/workers/application_worker.rb b/app/workers/application_worker.rb index f2d7c1062..436f24763 100644 --- a/app/workers/application_worker.rb +++ b/app/workers/application_worker.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + class ApplicationWorker def info(message) Rails.logger.info("#{self.class.name} - #{message}") diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index 9a2867ea6..f7953689b 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -4,10 +4,7 @@ class DistributionWorker < ApplicationWorker include Sidekiq::Worker def perform(status_id) - status = Status.find(status_id) - - FanOutOnWriteService.new.call(status) - WarmCacheService.new.call(status) + FanOutOnWriteService.new.call(Status.find(status_id)) rescue ActiveRecord::RecordNotFound info("Couldn't find the status") end diff --git a/app/workers/remote_profile_update_worker.rb b/app/workers/remote_profile_update_worker.rb new file mode 100644 index 000000000..b91dc3466 --- /dev/null +++ b/app/workers/remote_profile_update_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +class RemoteProfileUpdateWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull' + + def perform(account_id, body, resubscribe) + account = Account.find(account_id) + + xml = Nokogiri::XML(body) + xml.encoding = 'utf-8' + + author_container = xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS) || xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS) + + UpdateRemoteProfileService.new.call(author_container, account, resubscribe) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb index fc95ce47f..d37d40432 100644 --- a/app/workers/salmon_worker.rb +++ b/app/workers/salmon_worker.rb @@ -7,7 +7,7 @@ class SalmonWorker def perform(account_id, body) ProcessInteractionService.new.call(body, Account.find(account_id)) - rescue ActiveRecord::RecordNotFound + rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound true end end diff --git a/spec/services/process_feed_service_spec.rb b/spec/services/process_feed_service_spec.rb index 5e57d823b..b15284fee 100644 --- a/spec/services/process_feed_service_spec.rb +++ b/spec/services/process_feed_service_spec.rb @@ -16,6 +16,7 @@ RSpec.describe ProcessFeedService do end it 'updates remote user\'s account information' do + account.reload expect(account.display_name).to eq '::1' expect(account).to have_attached_file(:avatar) end -- cgit From dbd529109ea95df43aefa514c312d7397e7fc713 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 6 Apr 2017 02:26:59 +0200 Subject: Fix notifications delivered to wrong pubsub channel, optimized RemoveStatusService, slightly optimized FanOutOnWriteService again --- app/services/fan_out_on_write_service.rb | 9 +++++---- app/services/notify_service.rb | 2 +- app/services/remove_status_service.rb | 28 +++++++++++++++------------- 3 files changed, 21 insertions(+), 18 deletions(-) (limited to 'app/services') diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index c63fcc1fe..a25c20c93 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -51,21 +51,22 @@ class FanOutOnWriteService < BaseService def render_anonymous_payload(status) @payload = InlineRenderer.render(status, nil, 'api/v1/statuses/show') + @payload = Oj.dump(event: :update, payload: @payload) end def deliver_to_hashtags(status) Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - Redis.current.publish("hashtag:#{hashtag}", Oj.dump(event: :update, payload: @payload)) - Redis.current.publish("hashtag:#{hashtag}:local", Oj.dump(event: :update, payload: @payload)) if status.account.local? + Redis.current.publish("hashtag:#{hashtag}", @payload) + Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - Redis.current.publish('public', Oj.dump(event: 'update', payload: @payload)) - Redis.current.publish('public:local', Oj.dump(event: 'update', payload: @payload)) if status.account.local? + Redis.current.publish('public', @payload) + Redis.current.publish('public:local', @payload) if status.local? end end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index 62508a049..ffeee5fcf 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -50,7 +50,7 @@ class NotifyService < BaseService def create_notification @notification.save! return unless @notification.browserable? - Redis.current.publish(@recipient.id, Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) + Redis.current.publish("timeline:#{@recipient.id}", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) end def send_email diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index e19fdd030..60ce9987c 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -4,6 +4,8 @@ class RemoveStatusService < BaseService include StreamEntryRenderer def call(status) + @payload = Oj.dump(event: :delete, payload: status.id) + remove_from_self(status) if status.account.local? remove_from_followers(status) remove_from_mentioned(status) @@ -25,25 +27,23 @@ class RemoveStatusService < BaseService end def remove_from_followers(status) - status.account.followers.each do |follower| - next unless follower.local? + status.account.followers.where(domain: nil).each do |follower| unpush(:home, follower, status) end end def remove_from_mentioned(status) + return unless status.local? notified_domains = [] status.mentions.each do |mention| mentioned_account = mention.account - if mentioned_account.local? - unpush(:mentions, mentioned_account, status) - else - next if notified_domains.include?(mentioned_account.domain) - notified_domains << mentioned_account.domain - send_delete_salmon(mentioned_account, status) - end + next if mentioned_account.local? + next if notified_domains.include?(mentioned_account.domain) + + notified_domains << mentioned_account.domain + send_delete_salmon(mentioned_account, status) end end @@ -65,17 +65,19 @@ class RemoveStatusService < BaseService redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id) end - Redis.current.publish(receiver.id, Oj.dump(event: :delete, payload: status.id)) + Redis.current.publish("timeline:#{receiver.id}", @payload) end def remove_from_hashtags(status) - status.tags.each do |tag| - Redis.current.publish("hashtag:#{tag.name}", Oj.dump(event: :delete, payload: status.id)) + status.tags.pluck(:name) do |hashtag| + Redis.current.publish("hashtag:#{hashtag}", @payload) + Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? end end def remove_from_public(status) - Redis.current.publish('public', Oj.dump(event: :delete, payload: status.id)) + Redis.current.publish('public', @payload) + Redis.current.publish('public:local', @payload) if status.local? end def redis -- cgit From 51d7caaf1994ee3cb531b64841e12d129dded298 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 6 Apr 2017 04:03:23 +0200 Subject: Fix wrong pubsub channel on public timelines --- app/services/fan_out_on_write_service.rb | 8 ++++---- app/services/remove_status_service.rb | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'app/services') diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index a25c20c93..19eedc0a7 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -58,15 +58,15 @@ class FanOutOnWriteService < BaseService Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - Redis.current.publish("hashtag:#{hashtag}", @payload) - Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? + Redis.current.publish("timeline:hashtag:#{hashtag}", @payload) + Redis.current.publish("timeline:hashtag:#{hashtag}:local", @payload) if status.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - Redis.current.publish('public', @payload) - Redis.current.publish('public:local', @payload) if status.local? + Redis.current.publish('timeline:public', @payload) + Redis.current.publish('timeline:public:local', @payload) if status.local? end end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 60ce9987c..50bb7fc97 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -70,14 +70,14 @@ class RemoveStatusService < BaseService def remove_from_hashtags(status) status.tags.pluck(:name) do |hashtag| - Redis.current.publish("hashtag:#{hashtag}", @payload) - Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? + Redis.current.publish("timeline:hashtag:#{hashtag}", @payload) + Redis.current.publish("timeline:hashtag:#{hashtag}:local", @payload) if status.local? end end def remove_from_public(status) - Redis.current.publish('public', @payload) - Redis.current.publish('public:local', @payload) if status.local? + Redis.current.publish('timeline:public', @payload) + Redis.current.publish('timeline:public:local', @payload) if status.local? end def redis -- cgit