From b7c1b12367b307d07303ce99f2c27bf255ecd56a Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Mon, 3 Apr 2017 18:55:06 +0200 Subject: Make default admin UI page reports. Add admin UI for creating a domain block --- app/workers/domain_block_worker.rb | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 app/workers/domain_block_worker.rb (limited to 'app/workers') diff --git a/app/workers/domain_block_worker.rb b/app/workers/domain_block_worker.rb new file mode 100644 index 000000000..884477829 --- /dev/null +++ b/app/workers/domain_block_worker.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +class DomainBlockWorker + include Sidekiq::Worker + + def perform(domain_block_id) + BlockDomainService.new.call(DomainBlock.find(domain_block_id)) + rescue ActiveRecord::RecordNotFound + true + end +end -- cgit From f722bd2387df9163760014e9555928ec487ae95f Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 4 Apr 2017 00:53:20 +0200 Subject: Separate background jobs into different queues. ATTENTION: new queue "pull" must be added to the Sidekiq invokation in your systemd file The pull queue will handle link crawling, thread resolving, and OStatus processing. Such tasks are more likely to hang for a longer time (due to network requests) so it is more sensible to not make the "in-house" tasks wait for them. --- app/workers/after_remote_follow_request_worker.rb | 2 +- app/workers/after_remote_follow_worker.rb | 2 +- app/workers/import_worker.rb | 2 +- app/workers/link_crawl_worker.rb | 2 +- app/workers/merge_worker.rb | 2 ++ app/workers/notification_worker.rb | 2 +- app/workers/processing_worker.rb | 2 +- app/workers/regeneration_worker.rb | 2 ++ app/workers/salmon_worker.rb | 2 +- app/workers/thread_resolve_worker.rb | 2 +- app/workers/unmerge_worker.rb | 2 ++ docker-compose.yml | 2 +- docs/Running-Mastodon/Production-guide.md | 2 +- 13 files changed, 16 insertions(+), 10 deletions(-) (limited to 'app/workers') diff --git a/app/workers/after_remote_follow_request_worker.rb b/app/workers/after_remote_follow_request_worker.rb index f1d6869cc..1f2db3061 100644 --- a/app/workers/after_remote_follow_request_worker.rb +++ b/app/workers/after_remote_follow_request_worker.rb @@ -3,7 +3,7 @@ class AfterRemoteFollowRequestWorker include Sidekiq::Worker - sidekiq_options retry: 5 + sidekiq_options queue: 'pull', retry: 5 def perform(follow_request_id) follow_request = FollowRequest.find(follow_request_id) diff --git a/app/workers/after_remote_follow_worker.rb b/app/workers/after_remote_follow_worker.rb index 0d04456a9..bdd2c2a91 100644 --- a/app/workers/after_remote_follow_worker.rb +++ b/app/workers/after_remote_follow_worker.rb @@ -3,7 +3,7 @@ class AfterRemoteFollowWorker include Sidekiq::Worker - sidekiq_options retry: 5 + sidekiq_options queue: 'pull', retry: 5 def perform(follow_id) follow = Follow.find(follow_id) diff --git a/app/workers/import_worker.rb b/app/workers/import_worker.rb index a3ae2a85a..7cf29fb53 100644 --- a/app/workers/import_worker.rb +++ b/app/workers/import_worker.rb @@ -5,7 +5,7 @@ require 'csv' class ImportWorker include Sidekiq::Worker - sidekiq_options retry: false + sidekiq_options queue: 'pull', retry: false def perform(import_id) import = Import.find(import_id) diff --git a/app/workers/link_crawl_worker.rb b/app/workers/link_crawl_worker.rb index af3394b8b..834b0088b 100644 --- a/app/workers/link_crawl_worker.rb +++ b/app/workers/link_crawl_worker.rb @@ -3,7 +3,7 @@ class LinkCrawlWorker include Sidekiq::Worker - sidekiq_options retry: false + sidekiq_options queue: 'pull', retry: false def perform(status_id) FetchLinkCardService.new.call(Status.find(status_id)) diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb index 0f288f43f..d745cb99c 100644 --- a/app/workers/merge_worker.rb +++ b/app/workers/merge_worker.rb @@ -3,6 +3,8 @@ class MergeWorker include Sidekiq::Worker + sidekiq_options queue: 'pull' + def perform(from_account_id, into_account_id) FeedManager.instance.merge_into_timeline(Account.find(from_account_id), Account.find(into_account_id)) end diff --git a/app/workers/notification_worker.rb b/app/workers/notification_worker.rb index 1a2faefd8..da1d6ab45 100644 --- a/app/workers/notification_worker.rb +++ b/app/workers/notification_worker.rb @@ -3,7 +3,7 @@ class NotificationWorker include Sidekiq::Worker - sidekiq_options retry: 5 + sidekiq_options queue: 'push', retry: 5 def perform(xml, source_account_id, target_account_id) SendInteractionService.new.call(xml, Account.find(source_account_id), Account.find(target_account_id)) diff --git a/app/workers/processing_worker.rb b/app/workers/processing_worker.rb index 5df404bcc..4a467d924 100644 --- a/app/workers/processing_worker.rb +++ b/app/workers/processing_worker.rb @@ -3,7 +3,7 @@ class ProcessingWorker include Sidekiq::Worker - sidekiq_options backtrace: true + sidekiq_options queue: 'pull', backtrace: true def perform(account_id, body) ProcessFeedService.new.call(body, Account.find(account_id)) diff --git a/app/workers/regeneration_worker.rb b/app/workers/regeneration_worker.rb index 3aece0ba2..289b63d84 100644 --- a/app/workers/regeneration_worker.rb +++ b/app/workers/regeneration_worker.rb @@ -3,6 +3,8 @@ class RegenerationWorker include Sidekiq::Worker + sidekiq_options queue: 'pull', backtrace: true + def perform(account_id, timeline_type) PrecomputeFeedService.new.call(timeline_type, Account.find(account_id)) end diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb index fc95ce47f..2888b574b 100644 --- a/app/workers/salmon_worker.rb +++ b/app/workers/salmon_worker.rb @@ -3,7 +3,7 @@ class SalmonWorker include Sidekiq::Worker - sidekiq_options backtrace: true + sidekiq_options queue: 'pull', backtrace: true def perform(account_id, body) ProcessInteractionService.new.call(body, Account.find(account_id)) diff --git a/app/workers/thread_resolve_worker.rb b/app/workers/thread_resolve_worker.rb index 593edd032..38287e8e6 100644 --- a/app/workers/thread_resolve_worker.rb +++ b/app/workers/thread_resolve_worker.rb @@ -3,7 +3,7 @@ class ThreadResolveWorker include Sidekiq::Worker - sidekiq_options retry: false + sidekiq_options queue: 'pull', retry: false def perform(child_status_id, parent_url) child_status = Status.find(child_status_id) diff --git a/app/workers/unmerge_worker.rb b/app/workers/unmerge_worker.rb index dbf7243de..ea6aacebf 100644 --- a/app/workers/unmerge_worker.rb +++ b/app/workers/unmerge_worker.rb @@ -3,6 +3,8 @@ class UnmergeWorker include Sidekiq::Worker + sidekiq_options queue: 'pull' + def perform(from_account_id, into_account_id) FeedManager.instance.unmerge_from_timeline(Account.find(from_account_id), Account.find(into_account_id)) end diff --git a/docker-compose.yml b/docker-compose.yml index 68c8ef960..d6ba66dde 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -33,7 +33,7 @@ services: restart: always build: . env_file: .env.production - command: bundle exec sidekiq -q default -q mailers -q push + command: bundle exec sidekiq -q default -q mailers -q pull -q push depends_on: - db - redis diff --git a/docs/Running-Mastodon/Production-guide.md b/docs/Running-Mastodon/Production-guide.md index f0dd7bd2b..469fefa94 100644 --- a/docs/Running-Mastodon/Production-guide.md +++ b/docs/Running-Mastodon/Production-guide.md @@ -180,7 +180,7 @@ User=mastodon WorkingDirectory=/home/mastodon/live Environment="RAILS_ENV=production" Environment="DB_POOL=5" -ExecStart=/home/mastodon/.rbenv/shims/bundle exec sidekiq -c 5 -q default -q mailers -q push +ExecStart=/home/mastodon/.rbenv/shims/bundle exec sidekiq -c 5 -q default -q mailers -q pull -q push TimeoutSec=15 Restart=always -- cgit From b510a56c0c3d8c1a48bb295a85b688af94466723 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 4 Apr 2017 02:00:10 +0200 Subject: Only call regeneration worker after first login after a 14 day break --- app/controllers/application_controller.rb | 9 ++++++++- app/controllers/oauth/authorizations_controller.rb | 7 +++++++ app/models/feed.rb | 12 ++---------- app/workers/regeneration_worker.rb | 4 ++-- 4 files changed, 19 insertions(+), 13 deletions(-) (limited to 'app/workers') diff --git a/app/controllers/application_controller.rb b/app/controllers/application_controller.rb index ef9364897..c06142fd4 100644 --- a/app/controllers/application_controller.rb +++ b/app/controllers/application_controller.rb @@ -39,7 +39,14 @@ class ApplicationController < ActionController::Base end def set_user_activity - current_user.touch(:current_sign_in_at) if !current_user.nil? && (current_user.current_sign_in_at.nil? || current_user.current_sign_in_at < 24.hours.ago) + return unless !current_user.nil? && (current_user.current_sign_in_at.nil? || current_user.current_sign_in_at < 24.hours.ago) + + # Mark user as signed-in today + current_user.update_tracked_fields(request) + + # If the sign in is after a two week break, we need to regenerate their feed + RegenerationWorker.perform_async(current_user.account_id) if current_user.last_sign_in_at < 14.days.ago + return end def check_suspension diff --git a/app/controllers/oauth/authorizations_controller.rb b/app/controllers/oauth/authorizations_controller.rb index feaad04f6..7c25266d8 100644 --- a/app/controllers/oauth/authorizations_controller.rb +++ b/app/controllers/oauth/authorizations_controller.rb @@ -3,6 +3,7 @@ class Oauth::AuthorizationsController < Doorkeeper::AuthorizationsController skip_before_action :authenticate_resource_owner! + before_action :set_locale before_action :store_current_location before_action :authenticate_resource_owner! @@ -11,4 +12,10 @@ class Oauth::AuthorizationsController < Doorkeeper::AuthorizationsController def store_current_location store_location_for(:user, request.url) end + + def set_locale + I18n.locale = current_user.try(:locale) || I18n.default_locale + rescue I18n::InvalidLocale + I18n.locale = I18n.default_locale + end end diff --git a/app/models/feed.rb b/app/models/feed.rb index 5e1905e15..3cbc160a0 100644 --- a/app/models/feed.rb +++ b/app/models/feed.rb @@ -10,17 +10,9 @@ class Feed max_id = '+inf' if max_id.blank? since_id = '-inf' if since_id.blank? unhydrated = redis.zrevrangebyscore(key, "(#{max_id}", "(#{since_id}", limit: [0, limit], with_scores: true).map(&:last).map(&:to_i) + status_map = Status.where(id: unhydrated).cache_ids.map { |s| [s.id, s] }.to_h - # If we're after most recent items and none are there, we need to precompute the feed - if unhydrated.empty? && max_id == '+inf' && since_id == '-inf' - RegenerationWorker.perform_async(@account.id, @type) - @statuses = Status.send("as_#{@type}_timeline", @account).cache_ids.paginate_by_max_id(limit, nil, nil) - else - status_map = Status.where(id: unhydrated).cache_ids.map { |s| [s.id, s] }.to_h - @statuses = unhydrated.map { |id| status_map[id] }.compact - end - - @statuses + unhydrated.map { |id| status_map[id] }.compact end private diff --git a/app/workers/regeneration_worker.rb b/app/workers/regeneration_worker.rb index 289b63d84..82665b581 100644 --- a/app/workers/regeneration_worker.rb +++ b/app/workers/regeneration_worker.rb @@ -5,7 +5,7 @@ class RegenerationWorker sidekiq_options queue: 'pull', backtrace: true - def perform(account_id, timeline_type) - PrecomputeFeedService.new.call(timeline_type, Account.find(account_id)) + def perform(account_id, _ = :home) + PrecomputeFeedService.new.call(:home, Account.find(account_id)) end end -- cgit From b21f7c28f6832817d5de616ab0c4c2d3c28d90b0 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 4 Apr 2017 13:02:49 +0200 Subject: Move OStatus processing back into default queue --- app/workers/processing_worker.rb | 2 +- app/workers/salmon_worker.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'app/workers') diff --git a/app/workers/processing_worker.rb b/app/workers/processing_worker.rb index 4a467d924..5df404bcc 100644 --- a/app/workers/processing_worker.rb +++ b/app/workers/processing_worker.rb @@ -3,7 +3,7 @@ class ProcessingWorker include Sidekiq::Worker - sidekiq_options queue: 'pull', backtrace: true + sidekiq_options backtrace: true def perform(account_id, body) ProcessFeedService.new.call(body, Account.find(account_id)) diff --git a/app/workers/salmon_worker.rb b/app/workers/salmon_worker.rb index 2888b574b..fc95ce47f 100644 --- a/app/workers/salmon_worker.rb +++ b/app/workers/salmon_worker.rb @@ -3,7 +3,7 @@ class SalmonWorker include Sidekiq::Worker - sidekiq_options queue: 'pull', backtrace: true + sidekiq_options backtrace: true def perform(account_id, body) ProcessInteractionService.new.call(body, Account.find(account_id)) -- cgit From 82aaedec467815c2947a11651d5216bb88ce4038 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 4 Apr 2017 13:58:34 +0200 Subject: Reduce number of items in feeds, optimize regeneration worker slightly, make regeneration worker unique, (only schedule/execute once at a time) --- Gemfile | 2 ++ Gemfile.lock | 9 +++++++++ app/lib/feed_manager.rb | 6 +++--- app/services/precompute_feed_service.rb | 8 +++++--- app/workers/regeneration_worker.rb | 2 +- 5 files changed, 20 insertions(+), 7 deletions(-) (limited to 'app/workers') diff --git a/Gemfile b/Gemfile index cb9824131..41c636904 100644 --- a/Gemfile +++ b/Gemfile @@ -46,6 +46,8 @@ gem 'will_paginate' gem 'rack-attack' gem 'rack-cors', require: 'rack/cors' gem 'sidekiq' +gem 'sidekiq-unique-jobs' +gem 'sidekiq-merger' gem 'rails-settings-cached' gem 'simple-navigation' gem 'statsd-instrument' diff --git a/Gemfile.lock b/Gemfile.lock index 6e3115249..27de1bee0 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -387,6 +387,13 @@ GEM connection_pool (~> 2.2, >= 2.2.0) rack-protection (>= 1.5.0) redis (~> 3.2, >= 3.2.1) + sidekiq-merger (0.0.11) + activesupport (>= 3.2, < 6) + concurrent-ruby (~> 1.0) + sidekiq (>= 3.4, < 5) + sidekiq-unique-jobs (4.0.18) + sidekiq (>= 2.6) + thor simple-navigation (4.0.3) activesupport (>= 2.3.2) simple_form (3.2.1) @@ -510,6 +517,8 @@ DEPENDENCIES sass-rails (~> 5.0) sdoc (~> 0.4.0) sidekiq + sidekiq-merger + sidekiq-unique-jobs simple-navigation simple_form simplecov diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 919bc3df9..a2efcce10 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -5,7 +5,7 @@ require 'singleton' class FeedManager include Singleton - MAX_ITEMS = 800 + MAX_ITEMS = 400 def key(type, id) "feed:#{type}:#{id}" @@ -50,9 +50,9 @@ class FeedManager def merge_into_timeline(from_account, into_account) timeline_key = key(:home, into_account.id) - query = from_account.statuses.limit(MAX_ITEMS) + query = from_account.statuses.limit(FeedManager::MAX_ITEMS / 4) - if redis.zcard(timeline_key) >= FeedManager::MAX_ITEMS + if redis.zcard(timeline_key) >= FeedManager::MAX_ITEMS / 4 oldest_home_score = redis.zrange(timeline_key, 0, 0, with_scores: true)&.first&.last&.to_i || 0 query = query.where('id > ?', oldest_home_score) end diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb index e1ec56e8d..a57c401d0 100644 --- a/app/services/precompute_feed_service.rb +++ b/app/services/precompute_feed_service.rb @@ -5,9 +5,11 @@ class PrecomputeFeedService < BaseService # @param [Symbol] type :home or :mentions # @param [Account] account def call(_, account) - Status.as_home_timeline(account).limit(FeedManager::MAX_ITEMS).each do |status| - next if status.direct_visibility? || FeedManager.instance.filter?(:home, status, account) - redis.zadd(FeedManager.instance.key(:home, account.id), status.id, status.reblog? ? status.reblog_of_id : status.id) + redis.pipelined do + Status.as_home_timeline(account).limit(FeedManager::MAX_ITEMS / 4).each do |status| + next if status.direct_visibility? || FeedManager.instance.filter?(:home, status, account) + redis.zadd(FeedManager.instance.key(:home, account.id), status.id, status.reblog? ? status.reblog_of_id : status.id) + end end end diff --git a/app/workers/regeneration_worker.rb b/app/workers/regeneration_worker.rb index 82665b581..da8b845f6 100644 --- a/app/workers/regeneration_worker.rb +++ b/app/workers/regeneration_worker.rb @@ -3,7 +3,7 @@ class RegenerationWorker include Sidekiq::Worker - sidekiq_options queue: 'pull', backtrace: true + sidekiq_options queue: 'pull', backtrace: true, unique: :until_executed def perform(account_id, _ = :home) PrecomputeFeedService.new.call(:home, Account.find(account_id)) -- cgit From 6fd865c0004efbf11ee87c06fea9f48af567fabe Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Tue, 4 Apr 2017 19:21:37 +0200 Subject: Spawn FeedInsertWorker to deliver status into personal feed --- app/lib/feed_manager.rb | 32 ++++++++++++++++---------------- app/services/fan_out_on_write_service.rb | 13 ++++++------- app/services/notify_service.rb | 2 +- app/services/precompute_feed_service.rb | 2 +- app/workers/feed_insert_worker.rb | 15 +++++++++++++++ 5 files changed, 39 insertions(+), 25 deletions(-) create mode 100644 app/workers/feed_insert_worker.rb (limited to 'app/workers') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index a2efcce10..28e712704 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -11,11 +11,11 @@ class FeedManager "feed:#{type}:#{id}" end - def filter?(timeline_type, status, receiver) + def filter?(timeline_type, status, receiver_id) if timeline_type == :home - filter_from_home?(status, receiver) + filter_from_home?(status, receiver_id) elsif timeline_type == :mentions - filter_from_mentions?(status, receiver) + filter_from_mentions?(status, receiver_id) else false end @@ -91,39 +91,39 @@ class FeedManager Redis.current end - def filter_from_home?(status, receiver) + def filter_from_home?(status, receiver_id) return true if status.reply? && status.in_reply_to_id.nil? check_for_mutes = [status.account_id] check_for_mutes.concat([status.reblog.account_id]) if status.reblog? - return true if receiver.muting?(check_for_mutes) + return true if Mute.where(account_id: receiver_id, target_account_id: check_for_mutes).any? check_for_blocks = status.mentions.map(&:account_id) check_for_blocks.concat([status.reblog.account_id]) if status.reblog? - return true if receiver.blocking?(check_for_blocks) + return true if Block.where(account_id: receiver_id, target_account_id: check_for_blocks).any? - if status.reply? && !status.in_reply_to_account_id.nil? # Filter out if it's a reply - should_filter = !receiver.following?(status.in_reply_to_account) # and I'm not following the person it's a reply to - should_filter &&= !(receiver.id == status.in_reply_to_account_id) # and it's not a reply to me - should_filter &&= !(status.account_id == status.in_reply_to_account_id) # and it's not a self-reply + if status.reply? && !status.in_reply_to_account_id.nil? # Filter out if it's a reply + should_filter = !Follow.where(account_id: receiver_id, target_account_id: status.in_reply_to_account_id).exists? # and I'm not following the person it's a reply to + should_filter &&= !(receiver_id == status.in_reply_to_account_id) # and it's not a reply to me + should_filter &&= !(status.account_id == status.in_reply_to_account_id) # and it's not a self-reply return should_filter - elsif status.reblog? # Filter out a reblog - return status.reblog.account.blocking?(receiver) # or if the author of the reblogged status is blocking me + elsif status.reblog? # Filter out a reblog + return Block.where(account_id: status.reblog.account_id, target_account_id: receiver_id).exists? # or if the author of the reblogged status is blocking me end false end - def filter_from_mentions?(status, receiver) + def filter_from_mentions?(status, receiver_id) check_for_blocks = [status.account_id] check_for_blocks.concat(status.mentions.select('account_id').map(&:account_id)) check_for_blocks.concat([status.in_reply_to_account]) if status.reply? && !status.in_reply_to_account_id.nil? - should_filter = receiver.id == status.account_id # Filter if I'm mentioning myself - should_filter ||= receiver.blocking?(check_for_blocks) # or it's from someone I blocked, in reply to someone I blocked, or mentioning someone I blocked - should_filter ||= (status.account.silenced? && !receiver.following?(status.account)) # of if the account is silenced and I'm not following them + should_filter = receiver_id == status.account_id # Filter if I'm mentioning myself + should_filter ||= Block.where(account_id: receiver_id, target_account_id: check_for_blocks).any? # or it's from someone I blocked, in reply to someone I blocked, or mentioning someone I blocked + should_filter ||= (status.account.silenced? && !Follow.where(account_id: receiver_id, target_account_id: status.account_id).exists?) # of if the account is silenced and I'm not following them should_filter end diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index df404cbef..42222c25b 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -33,9 +33,8 @@ class FanOutOnWriteService < BaseService def deliver_to_followers(status) Rails.logger.debug "Delivering status #{status.id} to followers" - status.account.followers.where(domain: nil).joins(:user).where('users.current_sign_in_at > ?', 14.days.ago).find_each do |follower| - next if FeedManager.instance.filter?(:home, status, follower) - FeedManager.instance.push(:home, follower, status) + status.account.followers.where(domain: nil).joins(:user).where('users.current_sign_in_at > ?', 14.days.ago).select(:id).find_each do |follower| + FeedInsertWorker.perform_async(status.id, follower.id) end end @@ -44,7 +43,7 @@ class FanOutOnWriteService < BaseService status.mentions.includes(:account).each do |mention| mentioned_account = mention.account - next if !mentioned_account.local? || !mentioned_account.following?(status.account) || FeedManager.instance.filter?(:home, status, mentioned_account) + next if !mentioned_account.local? || !mentioned_account.following?(status.account) || FeedManager.instance.filter?(:home, status, mention.account_id) FeedManager.instance.push(:home, mentioned_account, status) end end @@ -54,9 +53,9 @@ class FanOutOnWriteService < BaseService payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) - status.tags.find_each do |tag| - FeedManager.instance.broadcast("hashtag:#{tag.name}", event: 'update', payload: payload) - FeedManager.instance.broadcast("hashtag:#{tag.name}:local", event: 'update', payload: payload) if status.account.local? + status.tags.pluck(:name).each do |hashtag| + FeedManager.instance.broadcast("hashtag:#{hashtag}", event: 'update', payload: payload) + FeedManager.instance.broadcast("hashtag:#{hashtag}:local", event: 'update', payload: payload) if status.account.local? end end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index 942cd9d21..24486f220 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -17,7 +17,7 @@ class NotifyService < BaseService private def blocked_mention? - FeedManager.instance.filter?(:mentions, @notification.mention.status, @recipient) + FeedManager.instance.filter?(:mentions, @notification.mention.status, @recipient.id) end def blocked_favourite? diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb index a57c401d0..07dcb81da 100644 --- a/app/services/precompute_feed_service.rb +++ b/app/services/precompute_feed_service.rb @@ -7,7 +7,7 @@ class PrecomputeFeedService < BaseService def call(_, account) redis.pipelined do Status.as_home_timeline(account).limit(FeedManager::MAX_ITEMS / 4).each do |status| - next if status.direct_visibility? || FeedManager.instance.filter?(:home, status, account) + next if status.direct_visibility? || FeedManager.instance.filter?(:home, status, account.id) redis.zadd(FeedManager.instance.key(:home, account.id), status.id, status.reblog? ? status.reblog_of_id : status.id) end end diff --git a/app/workers/feed_insert_worker.rb b/app/workers/feed_insert_worker.rb new file mode 100644 index 000000000..a58dfaa74 --- /dev/null +++ b/app/workers/feed_insert_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class FeedInsertWorker + include Sidekiq::Worker + + def perform(status_id, follower_id) + status = Status.find(status_id) + follower = Account.find(follower_id) + + return if FeedManager.instance.filter?(:home, status, follower.id) + FeedManager.instance.push(:home, follower, status) + rescue ActiveRecord::RecordNotFound + true + end +end -- cgit From bda37489ac5c14d18b1bb4290f2a2931dc8728c9 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 02:32:18 +0200 Subject: Remove PuSH subscriptions when delivery is answered with a 4xx error --- app/workers/pubsubhubbub/delivery_worker.rb | 1 + 1 file changed, 1 insertion(+) (limited to 'app/workers') diff --git a/app/workers/pubsubhubbub/delivery_worker.rb b/app/workers/pubsubhubbub/delivery_worker.rb index 15005bc80..466def3a8 100644 --- a/app/workers/pubsubhubbub/delivery_worker.rb +++ b/app/workers/pubsubhubbub/delivery_worker.rb @@ -22,6 +22,7 @@ class Pubsubhubbub::DeliveryWorker .headers(headers) .post(subscription.callback_url, body: payload) + return subscription.destroy! if response.code > 299 && response.code < 500 && response.code != 429 # HTTP 4xx means error is not temporary, except for 429 (throttling) raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response.code > 199 && response.code < 300 subscription.touch(:last_successful_delivery_at) -- cgit From 1e96ce378e2aa35ed7287a4a88e5165c2ee20101 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:16:53 -0700 Subject: By pushing this into a worker we can reduce the amount of time the feed manager using workers eat up a connection --- app/lib/feed_manager.rb | 2 +- app/workers/push_update_worker.rb | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 app/workers/push_update_worker.rb (limited to 'app/workers') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 2cca1cefe..075f86c2d 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -34,7 +34,7 @@ class FeedManager trim(timeline_type, account.id) end - broadcast(account.id, event: 'update', payload: inline_render(account, 'api/v1/statuses/show', status)) + PushUpdateWorker.perform_async(timeline_type, account.id, status.id) end def broadcast(timeline_id, options = {}) diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb new file mode 100644 index 000000000..3d398b5ac --- /dev/null +++ b/app/workers/push_update_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class PushUpdateWorker + include Sidekiq::Worker + + def perform(timeline, account_id, status_id) + account = Account.find(account_id) + status = Status.find(status_id) + message = inline_render(account, 'api/v1/statuses/show', status) + + broadcast(account_id, type: 'update', timeline: timeline, message: message) + end +end -- cgit From 96ef9338208e09cbc52a49a3d7171d877eab3c43 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:36:03 -0700 Subject: Replacing the broadcast method with the one defined in the feed manager --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 3d398b5ac..5b5e9f68a 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -8,6 +8,6 @@ class PushUpdateWorker status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - broadcast(account_id, type: 'update', timeline: timeline, message: message) + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 0069c01285d7fc6b97220fd678f6e5b82f301b1a Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:39:14 -0700 Subject: Moving the queue_at into the worker --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 5b5e9f68a..6512e13ad 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -7,7 +7,7 @@ class PushUpdateWorker account = Account.find(account_id) status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - + queue_at = (Time.now.to_f * 1000.0).to_i ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 220051b8b2d09a741f5edadd34e21115c5938bf0 Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:48:22 -0700 Subject: I don't actually think we need that. --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 6512e13ad..5b5e9f68a 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -7,7 +7,7 @@ class PushUpdateWorker account = Account.find(account_id) status = Status.find(status_id) message = inline_render(account, 'api/v1/statuses/show', status) - queue_at = (Time.now.to_f * 1000.0).to_i + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) end end -- cgit From 9638894233d31368733574217e4d173e4cd5d13c Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 20:51:18 -0700 Subject: Moving in the inline render --- app/workers/push_update_worker.rb | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 5b5e9f68a..fef75d909 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -6,8 +6,14 @@ class PushUpdateWorker def perform(timeline, account_id, status_id) account = Account.find(account_id) status = Status.find(status_id) - message = inline_render(account, 'api/v1/statuses/show', status) + message = Rabl::Renderer.new( + 'api/v1/statuses/show', + status, + view_path: 'app/views', + format: :json, + scope: InlineRablScope.new(account) + ) - ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message) + ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message.render) end end -- cgit From 22dcadedb495d2e1279b834a624710d34daee6ad Mon Sep 17 00:00:00 2001 From: Kurtis Rainbolt-Greene Date: Tue, 4 Apr 2017 21:14:37 -0700 Subject: We're going to want these nice helper methods, lets share them with a parent class that matches Rails 5 practices (application level abstraction) --- app/workers/application_worker.rb | 5 +++++ app/workers/distribution_worker.rb | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 app/workers/application_worker.rb (limited to 'app/workers') diff --git a/app/workers/application_worker.rb b/app/workers/application_worker.rb new file mode 100644 index 000000000..f2d7c1062 --- /dev/null +++ b/app/workers/application_worker.rb @@ -0,0 +1,5 @@ +class ApplicationWorker + def info(message) + Rails.logger.info("#{self.class.name} - #{message}") + end +end diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index f4e738d80..9a2867ea6 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -class DistributionWorker +class DistributionWorker < ApplicationWorker include Sidekiq::Worker def perform(status_id) @@ -9,6 +9,6 @@ class DistributionWorker FanOutOnWriteService.new.call(status) WarmCacheService.new.call(status) rescue ActiveRecord::RecordNotFound - true + info("Couldn't find the status") end end -- cgit From 220bc48e8e9c4b8cebd98537233998f34d768347 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 14:26:17 +0200 Subject: Only render public payload once in FanOutOnWrite --- app/services/fan_out_on_write_service.rb | 17 +++++++++-------- app/workers/after_remote_follow_request_worker.rb | 2 ++ app/workers/after_remote_follow_worker.rb | 2 ++ 3 files changed, 13 insertions(+), 8 deletions(-) (limited to 'app/workers') diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 42222c25b..106d257ba 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -16,6 +16,7 @@ class FanOutOnWriteService < BaseService return if status.account.silenced? || !status.public_visibility? || status.reblog? + render_anonymous_payload(status) deliver_to_hashtags(status) return if status.reply? && status.in_reply_to_account_id != status.account_id @@ -48,23 +49,23 @@ class FanOutOnWriteService < BaseService end end + def render_anonymous_payload(status) + @payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) + end + def deliver_to_hashtags(status) Rails.logger.debug "Delivering status #{status.id} to hashtags" - payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) - status.tags.pluck(:name).each do |hashtag| - FeedManager.instance.broadcast("hashtag:#{hashtag}", event: 'update', payload: payload) - FeedManager.instance.broadcast("hashtag:#{hashtag}:local", event: 'update', payload: payload) if status.account.local? + FeedManager.instance.broadcast("hashtag:#{hashtag}", event: 'update', payload: @payload) + FeedManager.instance.broadcast("hashtag:#{hashtag}:local", event: 'update', payload: @payload) if status.account.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) - - FeedManager.instance.broadcast(:public, event: 'update', payload: payload) - FeedManager.instance.broadcast('public:local', event: 'update', payload: payload) if status.account.local? + FeedManager.instance.broadcast(:public, event: 'update', payload: @payload) + FeedManager.instance.broadcast('public:local', event: 'update', payload: @payload) if status.account.local? end end diff --git a/app/workers/after_remote_follow_request_worker.rb b/app/workers/after_remote_follow_request_worker.rb index 1f2db3061..928069211 100644 --- a/app/workers/after_remote_follow_request_worker.rb +++ b/app/workers/after_remote_follow_request_worker.rb @@ -13,5 +13,7 @@ class AfterRemoteFollowRequestWorker follow_request.destroy FollowService.new.call(follow_request.account, updated_account.acct) + rescue ActiveRecord::RecordNotFound + true end end diff --git a/app/workers/after_remote_follow_worker.rb b/app/workers/after_remote_follow_worker.rb index bdd2c2a91..d12fa3454 100644 --- a/app/workers/after_remote_follow_worker.rb +++ b/app/workers/after_remote_follow_worker.rb @@ -13,5 +13,7 @@ class AfterRemoteFollowWorker follow.destroy FollowService.new.call(follow.account, updated_account.acct) + rescue ActiveRecord::RecordNotFound + true end end -- cgit From 1b8c244dff84ae981d89a1672a9db06f08cf405e Mon Sep 17 00:00:00 2001 From: Eugen Date: Wed, 5 Apr 2017 18:48:41 +0200 Subject: Add proper message to PushUpdateWorker, use redis directly --- app/workers/push_update_worker.rb | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index fef75d909..9d16c20bf 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -5,7 +5,8 @@ class PushUpdateWorker def perform(timeline, account_id, status_id) account = Account.find(account_id) - status = Status.find(status_id) + status = Status.find(status_id) + message = Rabl::Renderer.new( 'api/v1/statuses/show', status, @@ -14,6 +15,8 @@ class PushUpdateWorker scope: InlineRablScope.new(account) ) - ActionCable.server.broadcast("timeline:#{account_id}", type: 'update', timeline: timeline, message: message.render) + Redis.current.publish("timeline:#{timeline_id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + rescue ActiveRecord::RecordNotFound + true end end -- cgit From c9ebd5d19fccaabd1192f5e61537251c2c2d782e Mon Sep 17 00:00:00 2001 From: Eugen Date: Wed, 5 Apr 2017 18:58:32 +0200 Subject: Fix wrong variable used in publish channel --- app/workers/push_update_worker.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'app/workers') diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 9d16c20bf..166a9b449 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -15,7 +15,7 @@ class PushUpdateWorker scope: InlineRablScope.new(account) ) - Redis.current.publish("timeline:#{timeline_id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + Redis.current.publish("timeline:#{account.id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) rescue ActiveRecord::RecordNotFound true end -- cgit From 5b95be1c42ba69c9a3a79cfa990c80a5f2debfc6 Mon Sep 17 00:00:00 2001 From: Eugen Rochko Date: Wed, 5 Apr 2017 19:45:18 +0200 Subject: Replace calls to FeedManager#inline_render and #broadcast --- app/lib/feed_manager.rb | 2 +- app/lib/inline_renderer.rb | 13 +++++++++++++ app/services/fan_out_on_write_service.rb | 10 +++++----- app/services/notify_service.rb | 2 +- app/services/remove_status_service.rb | 6 +++--- app/workers/push_update_worker.rb | 13 +++---------- 6 files changed, 26 insertions(+), 20 deletions(-) create mode 100644 app/lib/inline_renderer.rb (limited to 'app/workers') diff --git a/app/lib/feed_manager.rb b/app/lib/feed_manager.rb index 87865bfdc..58d9fb1fc 100644 --- a/app/lib/feed_manager.rb +++ b/app/lib/feed_manager.rb @@ -34,7 +34,7 @@ class FeedManager trim(timeline_type, account.id) end - PushUpdateWorker.perform_async(timeline_type, account.id, status.id) + PushUpdateWorker.perform_async(account.id, status.id) end def trim(type, account_id) diff --git a/app/lib/inline_renderer.rb b/app/lib/inline_renderer.rb new file mode 100644 index 000000000..8e04ad1d5 --- /dev/null +++ b/app/lib/inline_renderer.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class InlineRenderer + def self.render(status, current_account, template) + Rabl::Renderer.new( + template, + status, + view_path: 'app/views', + format: :json, + scope: InlineRablScope.new(current_account) + ).render + end +end diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 106d257ba..c63fcc1fe 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -50,22 +50,22 @@ class FanOutOnWriteService < BaseService end def render_anonymous_payload(status) - @payload = FeedManager.instance.inline_render(nil, 'api/v1/statuses/show', status) + @payload = InlineRenderer.render(status, nil, 'api/v1/statuses/show') end def deliver_to_hashtags(status) Rails.logger.debug "Delivering status #{status.id} to hashtags" status.tags.pluck(:name).each do |hashtag| - FeedManager.instance.broadcast("hashtag:#{hashtag}", event: 'update', payload: @payload) - FeedManager.instance.broadcast("hashtag:#{hashtag}:local", event: 'update', payload: @payload) if status.account.local? + Redis.current.publish("hashtag:#{hashtag}", Oj.dump(event: :update, payload: @payload)) + Redis.current.publish("hashtag:#{hashtag}:local", Oj.dump(event: :update, payload: @payload)) if status.account.local? end end def deliver_to_public(status) Rails.logger.debug "Delivering status #{status.id} to public timeline" - FeedManager.instance.broadcast(:public, event: 'update', payload: @payload) - FeedManager.instance.broadcast('public:local', event: 'update', payload: @payload) if status.account.local? + Redis.current.publish('public', Oj.dump(event: 'update', payload: @payload)) + Redis.current.publish('public:local', Oj.dump(event: 'update', payload: @payload)) if status.account.local? end end diff --git a/app/services/notify_service.rb b/app/services/notify_service.rb index 24486f220..62508a049 100644 --- a/app/services/notify_service.rb +++ b/app/services/notify_service.rb @@ -50,7 +50,7 @@ class NotifyService < BaseService def create_notification @notification.save! return unless @notification.browserable? - FeedManager.instance.broadcast(@recipient.id, event: 'notification', payload: FeedManager.instance.inline_render(@recipient, 'api/v1/notifications/show', @notification)) + Redis.current.publish(@recipient.id, Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, 'api/v1/notifications/show'))) end def send_email diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index cf1f432e4..e19fdd030 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -65,17 +65,17 @@ class RemoveStatusService < BaseService redis.zremrangebyscore(FeedManager.instance.key(type, receiver.id), status.id, status.id) end - FeedManager.instance.broadcast(receiver.id, event: 'delete', payload: status.id) + Redis.current.publish(receiver.id, Oj.dump(event: :delete, payload: status.id)) end def remove_from_hashtags(status) status.tags.each do |tag| - FeedManager.instance.broadcast("hashtag:#{tag.name}", event: 'delete', payload: status.id) + Redis.current.publish("hashtag:#{tag.name}", Oj.dump(event: :delete, payload: status.id)) end end def remove_from_public(status) - FeedManager.instance.broadcast(:public, event: 'delete', payload: status.id) + Redis.current.publish('public', Oj.dump(event: :delete, payload: status.id)) end def redis diff --git a/app/workers/push_update_worker.rb b/app/workers/push_update_worker.rb index 166a9b449..fbcdcf634 100644 --- a/app/workers/push_update_worker.rb +++ b/app/workers/push_update_worker.rb @@ -3,19 +3,12 @@ class PushUpdateWorker include Sidekiq::Worker - def perform(timeline, account_id, status_id) + def perform(account_id, status_id) account = Account.find(account_id) status = Status.find(status_id) - - message = Rabl::Renderer.new( - 'api/v1/statuses/show', - status, - view_path: 'app/views', - format: :json, - scope: InlineRablScope.new(account) - ) + message = InlineRenderer.render(status, account, 'api/v1/statuses/show') - Redis.current.publish("timeline:#{account.id}", Oj.dump({ event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i })) + Redis.current.publish("timeline:#{account.id}", Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i)) rescue ActiveRecord::RecordNotFound true end -- cgit