From 1e96ce378e2aa35ed7287a4a88e5165c2ee20101 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:16:53 -0700 Subject: By pushing this into a worker we can reduce the amount of time the feed manager using workers eat up a connection --- app/lib/feed_manager.rb | 2 +- app/workers/push_update_worker.rb | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 app/workers/push_update_worker.rb (limited to 'app') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 2cca1cefe..075f86c2d 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -34,7 +34,7 @@ class FeedManager trim(timeline_type, account.id) end - broadcast(account.id, event: 'update', payload: inline_render(account, 'api/v1/statuses/show', status)) + PushUpdateWorker.perform_async(timeline_type, account.id, status.id) end def broadcast(timeline_id, options = {}) diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb new file mode 100644 index 000000000..3d398b5ac --- /dev/null +++ b/app/workers/push_update_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class PushUpdateWorker + include Sidekiq::Worker + + def perform(timeline, account_id, status_id) + account = Account.find(account_id) + status = Status.find(status_id) + message = inline_render(account, 'api/v1/statuses/show', status) + + broadcast(account_id, type: 'update', timeline: timeline, message: message) + end +end -- cgit From 96ef9338208e09cbc52a49a3d7171d877eab3c43 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:36:03 -0700 Subject: Replacing the broadcast method with the one defined in the feed manager --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 3d398b5ac..5b5e9f68a 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -8,6 +8,6 @@ class PushUpdateWorker status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - broadcast(account_id, type: 'update', timeline: timeline, message: message) + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From dc5704b0b0c8f5a58ff95d3f3c4055929c6ecfba Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:38:07 -0700 Subject: This method isn't used anymore --- app/lib/feed_manager.rb | 5 ----- 1 file changed, 5 deletions(-) (limited to 'app') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 075f86c2d..6698c78a5 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -37,11 +37,6 @@ class FeedManager PushUpdateWorker.perform_async(timeline_type, account.id, status.id) end - def broadcast(timeline_id, options = {}) - options[:queued_at] = (Time.now.to_f * 1000.0).to_i - ActionCable.server.broadcast("timeline:#{timeline_id}", options) - end - def trim(type, account_id) return unless redis.zcard(key(type, account_id)) > FeedManager::MAX_ITEMS last = redis.zrevrange(key(type, account_id), FeedManager::MAX_ITEMS - 1, FeedManager::MAX_ITEMS - 1) -- cgit From 0069c01285d7fc6b97220fd678f6e5b82f301b1a Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:39:14 -0700 Subject: Moving the queue_at into the worker --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 5b5e9f68a..6512e13ad 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -7,7 +7,7 @@ class PushUpdateWorker account = Account.find(account_id) status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - + queue_at = (Time.now.to_f * 1000.0).to_i ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 220051b8b2d09a741f5edadd34e21115c5938bf0 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:48:22 -0700 Subject: I don't actually think we need that. --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 6512e13ad..5b5e9f68a 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -7,7 +7,7 @@ class PushUpdateWorker account = Account.find(account_id) status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - queue_at = (Time.now.to_f * 1000.0).to_i + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 9638894233d31368733574217e4d173e4cd5d13c Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:51:18 -0700 Subject: Moving in the inline render --- app/workers/push_update_worker.rb | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'app') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 5b5e9f68a..fef75d909 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -6,8 +6,14 @@ class PushUpdateWorker def perform(timeline, account_id, status_id) account = Account.find(account_id) status = Status.find(status_id) - message = inline_render(account, 'api/v1/statuses/show', status) + message = Rabl::Renderer.new( + 'api/v1/statuses/show', + status, + view_path: 'app/views', + format: :json, + scope: InlineRablScope.new(account) + ) - ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message.render) end end -- cgit From 7bed4e51db18c864c36c6b48eb22c65f11c16b1c Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:51:44 -0700 Subject: Moved to the worker --- app/lib/feed_manager.rb | 4 ---- 1 file changed, 4 deletions(-) (limited to 'app') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 6698c78a5..87865bfdc 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -76,10 +76,6 @@ class FeedManager end end - def inline_render(target_account, template, object) - Rabl::Renderer.new(template, object, view_path: 'app/views', format: :json, scope: InlineRablScope.new(target_account)).render - end - private def redis -- cgit From 1b8c244dff84ae981d89a1672a9db06f08cf405e Mon Sep 17 00:00:00 2001 From: Eugen Date: Wed, 5 Apr 2017 18:48:41 +0200 Subject: Add proper message to PushUpdateWorker, use redis directly --- app/workers/push_update_worker.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'app') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index fef75d909..9d16c20bf 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -5,7 +5,8 @@ class PushUpdateWorker def perform(timeline, account_id, status_id) account = Account.find(account_id) - status = Status.find(status_id) + status = Status.find(status_id) + message = Rabl::Renderer.new( 'api/v1/statuses/show', status, @@ -14,6 +15,8 @@ class PushUpdateWorker scope: InlineRablScope.new(account) ) - ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message.render) + Redis.current.publish("timeline:#{timeline_id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + rescue ActiveRecord::RecordNotFound + true end end -- cgit From c9ebd5d19fccaabd1192f5e61537251c2c2d782e Mon Sep 17 00:00:00 2001 From: Eugen Date: Wed, 5 Apr 2017 18:58:32 +0200 Subject: Fix wrong variable used in publish channel --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 9d16c20bf..166a9b449 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -15,7 +15,7 @@ class PushUpdateWorker scope: InlineRablScope.new(account) ) - Redis.current.publish("timeline:#{timeline_id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + Redis.current.publish("timeline:#{account.id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) rescue ActiveRecord::RecordNotFound true end -- cgit From 5b95be1c42ba69c9a3a79cfa990c80a5f2debfc6 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 19:45:18 +0200 Subject: Replace calls to FeedManager#inline_render and #broadcast --- app/lib/feed_manager.rb | 2 +- app/lib/inline_renderer.rb | 13 +++++++++++++ app/services/fan_out_on_write_service.rb | 10 +++++----- app/services/notify_service.rb | 2 +- app/services/remove_status_service.rb | 6 +++--- app/workers/push_update_worker.rb | 13 +++---------- 6 files changed, 26 insertions(+), 20 deletions(-) create mode 100644 app/lib/inline_renderer.rb (limited to 'app') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 87865bfdc..58d9fb1fc 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -34,7 +34,7 @@ class FeedManager trim(timeline_type, account.id) end - PushUpdateWorker.perform_async(timeline_type, account.id, status.id) + PushUpdateWorker.perform_async(account.id, status.id) end def trim(type, account_id) diff --git a/app/lib/inline_renderer.rb b/app/lib/inline_renderer.rb new file mode 100644 index 000000000..8e04ad1d5 --- /dev/null +++ b/app/lib/inline_renderer.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class InlineRenderer + def self.render(status, current_account, template) + Rabl::Renderer.new( + template, + status, + view_path: 'app/views', + format: :json, + scope: InlineRablScope.new(current_account) + ).render + end +end diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 106d257ba..c63fcc1fe 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -50,22 +50,22 @@ class FanOutOnWriteService < BaseService end def render_anonymous_payload(status) - @payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) + @payload = InlineRenderer.render(status, nil, 'api/v1/statuses/show') end def deliver_to_hashtags(status) Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - FeedManager.instance.broadcast("hashtag:#{hashtag}", event: 'update', payload: @payload) - FeedManager.instance.broadcast("hashtag:#{hashtag}:local", event: 'update', payload: @payload) if status.account.local? + Redis.current.publish("hashtag:#{hashtag}", Oj.dump(event: :update, payload: @payload)) + Redis.current.publish("hashtag:#{hashtag}:local", Oj.dump(event: :update, payload: @payload)) if status.account.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - FeedManager.instance.broadcast(:public, event: 'update', payload: @payload) - FeedManager.instance.broadcast('public:local', event: 'update', payload: @payload) if status.account.local? + Redis.current.publish('public', Oj.dump(event: 'update', payload: @payload)) + Redis.current.publish('public:local', Oj.dump(event: 'update', payload: @payload)) if status.account.local? end end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index 24486f220..62508a049 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -50,7 +50,7 @@ class NotifyService < BaseService def create_notification @notification.save! return unless @notification.browserable? - FeedManager.instance.broadcast(@recipient.id, event: 'notification', payload: FeedManager.instance.inline_render(@recipient, 'api/v1/notifications/show', @notification)) + Redis.current.publish(@recipient.id, Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) end def send_email diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index cf1f432e4..e19fdd030 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -65,17 +65,17 @@ class RemoveStatusService < BaseService redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id) end - FeedManager.instance.broadcast(receiver.id, event: 'delete', payload: status.id) + Redis.current.publish(receiver.id, Oj.dump(event: :delete, payload: status.id)) end def remove_from_hashtags(status) status.tags.each do |tag| - FeedManager.instance.broadcast("hashtag:#{tag.name}", event: 'delete', payload: status.id) + Redis.current.publish("hashtag:#{tag.name}", Oj.dump(event: :delete, payload: status.id)) end end def remove_from_public(status) - FeedManager.instance.broadcast(:public, event: 'delete', payload: status.id) + Redis.current.publish('public', Oj.dump(event: :delete, payload: status.id)) end def redis diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 166a9b449..fbcdcf634 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -3,19 +3,12 @@ class PushUpdateWorker include Sidekiq::Worker - def perform(timeline, account_id, status_id) + def perform(account_id, status_id) account = Account.find(account_id) status = Status.find(status_id) - - message = Rabl::Renderer.new( - 'api/v1/statuses/show', - status, - view_path: 'app/views', - format: :json, - scope: InlineRablScope.new(account) - ) + message = InlineRenderer.render(status, account, 'api/v1/statuses/show') - Redis.current.publish("timeline:#{account.id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + Redis.current.publish("timeline:#{account.id}", Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i)) rescue ActiveRecord::RecordNotFound true end -- cgit From 5442083b3c44c731679fc489568bf7f70a807a39 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 21:41:50 +0200 Subject: Split SalmonWorker into smaller parts, move profile updating into another job --- app/services/follow_remote_account_service.rb | 16 ++++++---------- app/services/process_feed_service.rb | 6 +++--- app/services/process_interaction_service.rb | 10 +++------- app/workers/admin/suspension_worker.rb | 2 ++ app/workers/application_worker.rb | 2 ++ app/workers/distribution_worker.rb | 5 +---- app/workers/remote_profile_update_worker.rb | 20 ++++++++++++++++++++ app/workers/salmon_worker.rb | 2 +- spec/services/process_feed_service_spec.rb | 1 + 9 files changed, 39 insertions(+), 25 deletions(-) create mode 100644 app/workers/remote_profile_update_worker.rb (limited to 'app') diff --git a/app/services/follow_remote_account_service.rb b/app/services/follow_remote_account_service.rb index b39eafc70..936953429 100644 --- a/app/services/follow_remote_account_service.rb +++ b/app/services/follow_remote_account_service.rb @@ -45,13 +45,13 @@ class FollowRemoteAccountService < BaseService account.suspended = true if domain_block && domain_block.suspend? account.silenced = true if domain_block && domain_block.silence? - xml = get_feed(account.remote_url) - hubs = get_hubs(xml) + body, xml = get_feed(account.remote_url) + hubs = get_hubs(xml) account.uri = get_account_uri(xml) account.hub_url = hubs.first.attribute('href').value - get_profile(xml, account) + get_profile(body, account) account.save! account @@ -61,7 +61,7 @@ class FollowRemoteAccountService < BaseService def get_feed(url) response = http_client.get(Addressable::URI.parse(url)) - Nokogiri::XML(response) + [response.to_s, Nokogiri::XML(response)] end def get_hubs(xml) @@ -82,12 +82,8 @@ class FollowRemoteAccountService < BaseService author_uri.content end - def get_profile(xml, account) - update_remote_profile_service.call(xml.at_xpath('/xmlns:feed'), account) - end - - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new + def get_profile(body, account) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), false) end def http_client diff --git a/app/services/process_feed_service.rb b/app/services/process_feed_service.rb index 69911abc5..cf2f7a826 100644 --- a/app/services/process_feed_service.rb +++ b/app/services/process_feed_service.rb @@ -5,15 +5,15 @@ class ProcessFeedService < BaseService xml = Nokogiri::XML(body) xml.encoding = 'utf-8' - update_author(xml, account) + update_author(body, xml, account) process_entries(xml, account) end private - def update_author(xml, account) + def update_author(body, xml, account) return if xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS).nil? - UpdateRemoteProfileService.new.call(xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS), account, true) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true) end def process_entries(xml, account) diff --git a/app/services/process_interaction_service.rb b/app/services/process_interaction_service.rb index d5f7b4b3c..805ca5a27 100644 --- a/app/services/process_interaction_service.rb +++ b/app/services/process_interaction_service.rb @@ -24,7 +24,7 @@ class ProcessInteractionService < BaseService return if account.suspended? if salmon.verify(envelope, account.keypair) - update_remote_profile_service.call(xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS), account, true) + RemoteProfileUpdateWorker.perform_async(account.id, body.force_encoding('UTF-8'), true) case verb(xml) when :follow @@ -114,7 +114,7 @@ class ProcessInteractionService < BaseService return if status.nil? - remove_status_service.call(status) if account.id == status.account_id + RemovalWorker.perform_async(status.id) if account.id == status.account_id end def favourite!(xml, from_account) @@ -130,7 +130,7 @@ class ProcessInteractionService < BaseService end def add_post!(body, account) - process_feed_service.call(body, account) + ProcessingWorker.perform_async(account.id, body.force_encoding('UTF-8')) end def status(xml) @@ -153,10 +153,6 @@ class ProcessInteractionService < BaseService @process_feed_service ||= ProcessFeedService.new end - def update_remote_profile_service - @update_remote_profile_service ||= UpdateRemoteProfileService.new - end - def remove_status_service @remove_status_service ||= RemoveStatusService.new end diff --git a/app/workers/admin/suspension_worker.rb b/app/workers/admin/suspension_worker.rb index 38761f3b9..7ef2b35ec 100644 --- a/app/workers/admin/suspension_worker.rb +++ b/app/workers/admin/suspension_worker.rb @@ -3,6 +3,8 @@ class Admin::SuspensionWorker include Sidekiq::Worker + sidekiq_options queue: 'pull' + def perform(account_id) SuspendAccountService.new.call(Account.find(account_id)) end diff --git a/app/workers/application_worker.rb b/app/workers/application_worker.rb index f2d7c1062..436f24763 100644 --- a/app/workers/application_worker.rb +++ b/app/workers/application_worker.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + class ApplicationWorker def info(message) Rails.logger.info("#{self.class.name} - #{message}") diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index 9a2867ea6..f7953689b 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -4,10 +4,7 @@ class DistributionWorker < ApplicationWorker include Sidekiq::Worker def perform(status_id) - status = Status.find(status_id) - - FanOutOnWriteService.new.call(status) - WarmCacheService.new.call(status) + FanOutOnWriteService.new.call(Status.find(status_id)) rescue ActiveRecord::RecordNotFound info("Couldn't find the status") end diff --git a/app/workers/remote_profile_update_worker.rb b/app/workers/remote_profile_update_worker.rb new file mode 100644 index 000000000..b91dc3466 --- /dev/null +++ b/app/workers/remote_profile_update_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +class RemoteProfileUpdateWorker + include Sidekiq::Worker + + sidekiq_options queue: 'pull' + + def perform(account_id, body, resubscribe) + account = Account.find(account_id) + + xml = Nokogiri::XML(body) + xml.encoding = 'utf-8' + + author_container = xml.at_xpath('/xmlns:feed', xmlns: TagManager::XMLNS) || xml.at_xpath('/xmlns:entry', xmlns: TagManager::XMLNS) + + UpdateRemoteProfileService.new.call(author_container, account, resubscribe) + rescue ActiveRecord::RecordNotFound + true + end +end diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb index fc95ce47f..d37d40432 100644 --- a/app/workers/salmon_worker.rb +++ b/app/workers/salmon_worker.rb @@ -7,7 +7,7 @@ class SalmonWorker def perform(account_id, body) ProcessInteractionService.new.call(body, Account.find(account_id)) - rescue ActiveRecord::RecordNotFound + rescue Nokogiri::XML::XPath::SyntaxError, ActiveRecord::RecordNotFound true end end diff --git a/spec/services/process_feed_service_spec.rb b/spec/services/process_feed_service_spec.rb index 5e57d823b..b15284fee 100644 --- a/spec/services/process_feed_service_spec.rb +++ b/spec/services/process_feed_service_spec.rb @@ -16,6 +16,7 @@ RSpec.describe ProcessFeedService do end it 'updates remote user\'s account information' do + account.reload expect(account.display_name).to eq '::1' expect(account).to have_attached_file(:avatar) end -- cgit From 540d6efe88717b0da4a23b5069a3caaff5b3241a Mon Sep 17 00:00:00 2001 From: blackle Date: Wed, 5 Apr 2017 20:04:13 -0400 Subject: Catch more errors in process_follows so it doesn't fail --- app/workers/import_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app') diff --git a/app/workers/import_worker.rb b/app/workers/import_worker.rb index 7cf29fb53..d5a33cada 100644 --- a/app/workers/import_worker.rb +++ b/app/workers/import_worker.rb @@ -46,7 +46,7 @@ class ImportWorker begin FollowService.new.call(from_account, row[0]) - rescue Goldfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError + rescue Mastodon::NotPermittedError, ActiveRecord::RecordNotFound, Goldfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError next end end -- cgit From dbd529109ea95df43aefa514c312d7397e7fc713 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 6 Apr 2017 02:26:59 +0200 Subject: Fix notifications delivered to wrong pubsub channel, optimized RemoveStatusService, slightly optimized FanOutOnWriteService again --- app/services/fan_out_on_write_service.rb | 9 +++++---- app/services/notify_service.rb | 2 +- app/services/remove_status_service.rb | 28 +++++++++++++++------------- 3 files changed, 21 insertions(+), 18 deletions(-) (limited to 'app') diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index c63fcc1fe..a25c20c93 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -51,21 +51,22 @@ class FanOutOnWriteService < BaseService def render_anonymous_payload(status) @payload = InlineRenderer.render(status, nil, 'api/v1/statuses/show') + @payload = Oj.dump(event: :update, payload: @payload) end def deliver_to_hashtags(status) Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - Redis.current.publish("hashtag:#{hashtag}", Oj.dump(event: :update, payload: @payload)) - Redis.current.publish("hashtag:#{hashtag}:local", Oj.dump(event: :update, payload: @payload)) if status.account.local? + Redis.current.publish("hashtag:#{hashtag}", @payload) + Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - Redis.current.publish('public', Oj.dump(event: 'update', payload: @payload)) - Redis.current.publish('public:local', Oj.dump(event: 'update', payload: @payload)) if status.account.local? + Redis.current.publish('public', @payload) + Redis.current.publish('public:local', @payload) if status.local? end end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index 62508a049..ffeee5fcf 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -50,7 +50,7 @@ class NotifyService < BaseService def create_notification @notification.save! return unless @notification.browserable? - Redis.current.publish(@recipient.id, Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) + Redis.current.publish("timeline:#{@recipient.id}", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) end def send_email diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index e19fdd030..60ce9987c 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -4,6 +4,8 @@ class RemoveStatusService < BaseService include StreamEntryRenderer def call(status) + @payload = Oj.dump(event: :delete, payload: status.id) + remove_from_self(status) if status.account.local? remove_from_followers(status) remove_from_mentioned(status) @@ -25,25 +27,23 @@ class RemoveStatusService < BaseService end def remove_from_followers(status) - status.account.followers.each do |follower| - next unless follower.local? + status.account.followers.where(domain: nil).each do |follower| unpush(:home, follower, status) end end def remove_from_mentioned(status) + return unless status.local? notified_domains = [] status.mentions.each do |mention| mentioned_account = mention.account - if mentioned_account.local? - unpush(:mentions, mentioned_account, status) - else - next if notified_domains.include?(mentioned_account.domain) - notified_domains << mentioned_account.domain - send_delete_salmon(mentioned_account, status) - end + next if mentioned_account.local? + next if notified_domains.include?(mentioned_account.domain) + + notified_domains << mentioned_account.domain + send_delete_salmon(mentioned_account, status) end end @@ -65,17 +65,19 @@ class RemoveStatusService < BaseService redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id) end - Redis.current.publish(receiver.id, Oj.dump(event: :delete, payload: status.id)) + Redis.current.publish("timeline:#{receiver.id}", @payload) end def remove_from_hashtags(status) - status.tags.each do |tag| - Redis.current.publish("hashtag:#{tag.name}", Oj.dump(event: :delete, payload: status.id)) + status.tags.pluck(:name) do |hashtag| + Redis.current.publish("hashtag:#{hashtag}", @payload) + Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? end end def remove_from_public(status) - Redis.current.publish('public', Oj.dump(event: :delete, payload: status.id)) + Redis.current.publish('public', @payload) + Redis.current.publish('public:local', @payload) if status.local? end def redis -- cgit From 51d7caaf1994ee3cb531b64841e12d129dded298 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Thu, 6 Apr 2017 04:03:23 +0200 Subject: Fix wrong pubsub channel on public timelines --- app/services/fan_out_on_write_service.rb | 8 ++++---- app/services/remove_status_service.rb | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) (limited to 'app') diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index a25c20c93..19eedc0a7 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -58,15 +58,15 @@ class FanOutOnWriteService < BaseService Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - Redis.current.publish("hashtag:#{hashtag}", @payload) - Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? + Redis.current.publish("timeline:hashtag:#{hashtag}", @payload) + Redis.current.publish("timeline:hashtag:#{hashtag}:local", @payload) if status.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - Redis.current.publish('public', @payload) - Redis.current.publish('public:local', @payload) if status.local? + Redis.current.publish('timeline:public', @payload) + Redis.current.publish('timeline:public:local', @payload) if status.local? end end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 60ce9987c..50bb7fc97 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -70,14 +70,14 @@ class RemoveStatusService < BaseService def remove_from_hashtags(status) status.tags.pluck(:name) do |hashtag| - Redis.current.publish("hashtag:#{hashtag}", @payload) - Redis.current.publish("hashtag:#{hashtag}:local", @payload) if status.local? + Redis.current.publish("timeline:hashtag:#{hashtag}", @payload) + Redis.current.publish("timeline:hashtag:#{hashtag}:local", @payload) if status.local? end end def remove_from_public(status) - Redis.current.publish('public', @payload) - Redis.current.publish('public:local', @payload) if status.local? + Redis.current.publish('timeline:public', @payload) + Redis.current.publish('timeline:public:local', @payload) if status.local? end def redis -- cgit