From 1e96ce378e2aa35ed7287a4a88e5165c2ee20101 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:16:53 -0700 Subject: By pushing this into a worker we can reduce the amount of time the feed manager using workers eat up a connection --- app/workers/push_update_worker.rb | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100644 app/workers/push_update_worker.rb (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb new file mode 100644 index 000000000..3d398b5ac --- /dev/null +++ b/app/workers/push_update_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class PushUpdateWorker + include Sidekiq::Worker + + def perform(timeline, account_id, status_id) + account = Account.find(account_id) + status = Status.find(status_id) + message = inline_render(account, 'api/v1/statuses/show', status) + + broadcast(account_id, type: 'update', timeline: timeline, message: message) + end +end -- cgit From 96ef9338208e09cbc52a49a3d7171d877eab3c43 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:36:03 -0700 Subject: Replacing the broadcast method with the one defined in the feed manager --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 3d398b5ac..5b5e9f68a 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -8,6 +8,6 @@ class PushUpdateWorker status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - broadcast(account_id, type: 'update', timeline: timeline, message: message) + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 0069c01285d7fc6b97220fd678f6e5b82f301b1a Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:39:14 -0700 Subject: Moving the queue_at into the worker --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 5b5e9f68a..6512e13ad 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -7,7 +7,7 @@ class PushUpdateWorker account = Account.find(account_id) status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - + queue_at = (Time.now.to_f * 1000.0).to_i ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 220051b8b2d09a741f5edadd34e21115c5938bf0 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:48:22 -0700 Subject: I don't actually think we need that. --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 6512e13ad..5b5e9f68a 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -7,7 +7,7 @@ class PushUpdateWorker account = Account.find(account_id) status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - queue_at = (Time.now.to_f * 1000.0).to_i + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 9638894233d31368733574217e4d173e4cd5d13c Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:51:18 -0700 Subject: Moving in the inline render --- app/workers/push_update_worker.rb | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 5b5e9f68a..fef75d909 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -6,8 +6,14 @@ class PushUpdateWorker def perform(timeline, account_id, status_id) account = Account.find(account_id) status = Status.find(status_id) - message = inline_render(account, 'api/v1/statuses/show', status) + message = Rabl::Renderer.new( + 'api/v1/statuses/show', + status, + view_path: 'app/views', + format: :json, + scope: InlineRablScope.new(account) + ) - ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message.render) end end -- cgit From 1b8c244dff84ae981d89a1672a9db06f08cf405e Mon Sep 17 00:00:00 2001 From: Eugen Date: Wed, 5 Apr 2017 18:48:41 +0200 Subject: Add proper message to PushUpdateWorker, use redis directly --- app/workers/push_update_worker.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index fef75d909..9d16c20bf 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -5,7 +5,8 @@ class PushUpdateWorker def perform(timeline, account_id, status_id) account = Account.find(account_id) - status = Status.find(status_id) + status = Status.find(status_id) + message = Rabl::Renderer.new( 'api/v1/statuses/show', status, @@ -14,6 +15,8 @@ class PushUpdateWorker scope: InlineRablScope.new(account) ) - ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message.render) + Redis.current.publish("timeline:#{timeline_id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + rescue ActiveRecord::RecordNotFound + true end end -- cgit From c9ebd5d19fccaabd1192f5e61537251c2c2d782e Mon Sep 17 00:00:00 2001 From: Eugen Date: Wed, 5 Apr 2017 18:58:32 +0200 Subject: Fix wrong variable used in publish channel --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 9d16c20bf..166a9b449 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -15,7 +15,7 @@ class PushUpdateWorker scope: InlineRablScope.new(account) ) - Redis.current.publish("timeline:#{timeline_id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + Redis.current.publish("timeline:#{account.id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) rescue ActiveRecord::RecordNotFound true end -- cgit From 5b95be1c42ba69c9a3a79cfa990c80a5f2debfc6 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 19:45:18 +0200 Subject: Replace calls to FeedManager#inline_render and #broadcast --- app/lib/feed_manager.rb | 2 +- app/lib/inline_renderer.rb | 13 +++++++++++++ app/services/fan_out_on_write_service.rb | 10 +++++----- app/services/notify_service.rb | 2 +- app/services/remove_status_service.rb | 6 +++--- app/workers/push_update_worker.rb | 13 +++---------- 6 files changed, 26 insertions(+), 20 deletions(-) create mode 100644 app/lib/inline_renderer.rb (limited to 'app/workers') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 87865bfdc..58d9fb1fc 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -34,7 +34,7 @@ class FeedManager trim(timeline_type, account.id) end - PushUpdateWorker.perform_async(timeline_type, account.id, status.id) + PushUpdateWorker.perform_async(account.id, status.id) end def trim(type, account_id) diff --git a/app/lib/inline_renderer.rb b/app/lib/inline_renderer.rb new file mode 100644 index 000000000..8e04ad1d5 --- /dev/null +++ b/app/lib/inline_renderer.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class InlineRenderer + def self.render(status, current_account, template) + Rabl::Renderer.new( + template, + status, + view_path: 'app/views', + format: :json, + scope: InlineRablScope.new(current_account) + ).render + end +end diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 106d257ba..c63fcc1fe 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -50,22 +50,22 @@ class FanOutOnWriteService < BaseService end def render_anonymous_payload(status) - @payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) + @payload = InlineRenderer.render(status, nil, 'api/v1/statuses/show') end def deliver_to_hashtags(status) Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - FeedManager.instance.broadcast("hashtag:#{hashtag}", event: 'update', payload: @payload) - FeedManager.instance.broadcast("hashtag:#{hashtag}:local", event: 'update', payload: @payload) if status.account.local? + Redis.current.publish("hashtag:#{hashtag}", Oj.dump(event: :update, payload: @payload)) + Redis.current.publish("hashtag:#{hashtag}:local", Oj.dump(event: :update, payload: @payload)) if status.account.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - FeedManager.instance.broadcast(:public, event: 'update', payload: @payload) - FeedManager.instance.broadcast('public:local', event: 'update', payload: @payload) if status.account.local? + Redis.current.publish('public', Oj.dump(event: 'update', payload: @payload)) + Redis.current.publish('public:local', Oj.dump(event: 'update', payload: @payload)) if status.account.local? end end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index 24486f220..62508a049 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -50,7 +50,7 @@ class NotifyService < BaseService def create_notification @notification.save! return unless @notification.browserable? - FeedManager.instance.broadcast(@recipient.id, event: 'notification', payload: FeedManager.instance.inline_render(@recipient, 'api/v1/notifications/show', @notification)) + Redis.current.publish(@recipient.id, Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) end def send_email diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index cf1f432e4..e19fdd030 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -65,17 +65,17 @@ class RemoveStatusService < BaseService redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id) end - FeedManager.instance.broadcast(receiver.id, event: 'delete', payload: status.id) + Redis.current.publish(receiver.id, Oj.dump(event: :delete, payload: status.id)) end def remove_from_hashtags(status) status.tags.each do |tag| - FeedManager.instance.broadcast("hashtag:#{tag.name}", event: 'delete', payload: status.id) + Redis.current.publish("hashtag:#{tag.name}", Oj.dump(event: :delete, payload: status.id)) end end def remove_from_public(status) - FeedManager.instance.broadcast(:public, event: 'delete', payload: status.id) + Redis.current.publish('public', Oj.dump(event: :delete, payload: status.id)) end def redis diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 166a9b449..fbcdcf634 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -3,19 +3,12 @@ class PushUpdateWorker include Sidekiq::Worker - def perform(timeline, account_id, status_id) + def perform(account_id, status_id) account = Account.find(account_id) status = Status.find(status_id) - - message = Rabl::Renderer.new( - 'api/v1/statuses/show', - status, - view_path: 'app/views', - format: :json, - scope: InlineRablScope.new(account) - ) + message = InlineRenderer.render(status, account, 'api/v1/statuses/show') - Redis.current.publish("timeline:#{account.id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + Redis.current.publish("timeline:#{account.id}", Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i)) rescue ActiveRecord::RecordNotFound true end -- cgit From 5442083b3c44c731679fc489568bf7f70a807a39 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 21:41:50 +0200 Subject: Split SalmonWorker into smaller parts, move profile updating into another job --- app/services/follow_remote_account_service.rb | 16 ++++++---------- app/services/process_feed_service.rb | 6 +++--- app/services/process_interaction_service.rb | 10 +++------- app/workers/admin/suspension_worker.rb | 2 ++ app/workers/application_worker.rb | 2 ++ app/workers/distribution_worker.rb | 5 +---- app/workers/remote_profile_update_worker.rb | 20 ++++++++++++++++++++ app/workers/salmon_worker.rb | 2 +- spec/services/process_feed_service_spec.rb | 1 + 9 files changed, 39 insertions(+), 25 deletions(-) create mode 100644 app/workers/remote_profile_update_worker.rb (limited to 'app/workers') diff --git a/app/services/follow_remote_account_service.rb b/app/services/follow_remote_account_service.rb index b39eafc70..936953429 100644 --- a/app/services/follow_remote_account_service.rb +++ b/app/services/follow_remote_account_service.rb @@ -45,13 +45,13 @@ class FollowRemoteAccountService < BaseService account.suspended = true if domain_block && domain_block.suspend? account.silenced = true if domain_block && domain_block.silence? - xml = get_feed(account.remote_url) - hubs = get_hubs(xml) + body, xml = get_feed(account.remote_url) + hubs = get_hubs(xml) account.uri = get_account_uri(xml) account.hub_url = hubs.first.attribute('href').value - get_profile(xml, account) + get_profile(body, account) account.save! account @@ -61,7 +61,7 @@ class FollowRemoteAccountService < BaseService def get_feed(url) response = http_client.get(Addressable::URI.parse(url)) - Nokogiri::XML(response) + [response.to_s, Nokogiri::XML(response)] end def get_hubs(xml) @@ -82,12 +82,8 @@ class FollowRemoteAccountService < BaseService author_uri.content end - def get_profile(xml, account) - update_remote_profile_service.call(xml.at_xpath('/xmlns:feed'), account) - end - - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new + def get_profile(body, account) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), false) end def http_client diff --git a/app/services/process_feed_service.rb b/app/services/process_feed_service.rb index 69911abc5..cf2f7a826 100644 --- a/app/services/process_feed_service.rb +++ b/app/services/process_feed_service.rb @@ -5,15 +5,15 @@ class ProcessFeedService < BaseService xml = Nokogiri::XML(body) xml.encoding = 'utf-8' - update_author(xml, account) + update_author(body, xml, account) process_entries(xml, account) end private - def update_author(xml, account) + def update_author(body, xml, account) return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil? - UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS), account, true) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true) end def process_entries(xml, account) diff --git a/app/services/process_interaction_service.rb b/app/services/process_interaction_service.rb index d5f7b4b3c..805ca5a27 100644 --- a/app/services/process_interaction_service.rb +++ b/app/services/process_interaction_service.rb @@ -24,7 +24,7 @@ class ProcessInteractionService < BaseService return if account.suspended? if salmon.verify(envelope, account.keypair) - update_remote_profile_service.call(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS), account, true) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true) case verb(xml) when :follow @@ -114,7 +114,7 @@ class ProcessInteractionService < BaseService return if status.nil? - remove_status_service.call(status) if account.id == status.account_id + RemovalWorker.perform_async(status.id) if account.id == status.account_id end def favourite!(xml, from_account) @@ -130,7 +130,7 @@ class ProcessInteractionService < BaseService end def add_post!(body, account) - process_feed_service.call(body, account) + ProcessingWorker.perform_async(account.id, body.force_encoding('UTF-8')) end def status(xml) @@ -153,10 +153,6 @@ class ProcessInteractionService < BaseService @process_feed_service ||= ProcessFeedService.new end - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new - end - def remove_status_service @remove_status_service ||= RemoveStatusService.new end diff --git a/app/workers/admin/suspension_worker.rb b/app/workers/admin/suspension_worker.rb index 38761f3b9..7ef2b35ec 100644 --- a/app/workers/admin/suspension_worker.rb +++ b/app/workers/admin/suspension_worker.rb @@ -3,6 +3,8 @@ class Admin::SuspensionWorker include Sidekiq::Worker + sidekiq_options queue: 'pull' + def perform(account_id) SuspendAccountService.new.call(Account.find(account_id)) end diff --git a/app/workers/application_worker.rb b/app/workers/application_worker.rb index f2d7c1062..436f24763 100644 --- a/app/workers/application_worker.rb +++ b/app/workers/application_worker.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + class ApplicationWorker def info(message) Rails.logger.info("#{self.class.name} - #{message}") diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index 9a2867ea6..f7953689b 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -4,10 +4,7 @@ class DistributionWorker < ApplicationWorker include Sidekiq::Worker def perform(status_id) - status = Status.find(status_id) - - FanOutOnWriteService.new.call(status) - WarmCacheService.new.call(status) + FanOutOnWriteService.new.call(Status.find(status_id)) rescue ActiveRecord::RecordNotFound info("Couldn't find the status") end diff --git a/app/workers/remote_profile_update_worker.rb b/app/workers/remote_profile_update_worker.rb new file mode 100644 index 000000000..b91dc3466 --- /dev/null +++ b/app/workers/remote_profile_update_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +class RemoteProfileUpdateWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull' + + def perform(account_id, body, resubscribe) + account = Account.find(account_id) + + xml = Nokogiri::XML(body) + xml.encoding = 'utf-8' + + author_container = xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS) || xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS) + + UpdateRemoteProfileService.new.call(author_container, account, resubscribe) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb index fc95ce47f..d37d40432 100644 --- a/app/workers/salmon_worker.rb +++ b/app/workers/salmon_worker.rb @@ -7,7 +7,7 @@ class SalmonWorker def perform(account_id, body) ProcessInteractionService.new.call(body, Account.find(account_id)) - rescue ActiveRecord::RecordNotFound + rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound true end end diff --git a/spec/services/process_feed_service_spec.rb b/spec/services/process_feed_service_spec.rb index 5e57d823b..b15284fee 100644 --- a/spec/services/process_feed_service_spec.rb +++ b/spec/services/process_feed_service_spec.rb @@ -16,6 +16,7 @@ RSpec.describe ProcessFeedService do end it 'updates remote user\'s account information' do + account.reload expect(account.display_name).to eq '::1' expect(account).to have_attached_file(:avatar) end -- cgit From 540d6efe88717b0da4a23b5069a3caaff5b3241a Mon Sep 17 00:00:00 2001 From: blackle Date: Wed, 5 Apr 2017 20:04:13 -0400 Subject: Catch more errors in process_follows so it doesn't fail --- app/workers/import_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/import_worker.rb b/app/workers/import_worker.rb index 7cf29fb53..d5a33cada 100644 --- a/app/workers/import_worker.rb +++ b/app/workers/import_worker.rb @@ -46,7 +46,7 @@ class ImportWorker begin FollowService.new.call(from_account, row[0]) - rescue Goldfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError + rescue Mastodon::NotPermittedError, ActiveRecord::RecordNotFound, Goldfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError next end end -- cgit