diff options
author | Eugen Rochko <eugen@zeonfederated.com> | 2022-04-28 17:47:34 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-28 17:47:34 +0200 |
commit | 3917353645b91dae04f7d9b81162fead6f73072a (patch) | |
tree | 202cbf93dfa378c26a291900ea65ec088ee5b5cc | |
parent | 9bf04db23acf10e05ffdf7def06c246081f8f065 (diff) |
Fix single Redis connection being used across all threads (#18135)
* Fix single Redis connection being used across all Sidekiq threads * Fix tests
44 files changed, 243 insertions, 124 deletions
diff --git a/app/controllers/admin/dashboard_controller.rb b/app/controllers/admin/dashboard_controller.rb index e376baab2..da9c6dd16 100644 --- a/app/controllers/admin/dashboard_controller.rb +++ b/app/controllers/admin/dashboard_controller.rb @@ -2,6 +2,8 @@ module Admin class DashboardController < BaseController + include Redisable + def index @system_checks = Admin::SystemCheck.perform @time_period = (29.days.ago.to_date...Time.now.utc.to_date) @@ -15,10 +17,10 @@ module Admin def redis_info @redis_info ||= begin - if Redis.current.is_a?(Redis::Namespace) - Redis.current.redis.info + if redis.is_a?(Redis::Namespace) + redis.redis.info else - Redis.current.info + redis.info end end end diff --git a/app/controllers/media_proxy_controller.rb b/app/controllers/media_proxy_controller.rb index 5596e92d1..d2a4cb207 100644 --- a/app/controllers/media_proxy_controller.rb +++ b/app/controllers/media_proxy_controller.rb @@ -3,6 +3,7 @@ class MediaProxyController < ApplicationController include RoutingHelper include Authorization + include Redisable skip_before_action :store_current_location skip_before_action :require_functional! @@ -45,7 +46,7 @@ class MediaProxyController < ApplicationController end def lock_options - { redis: Redis.current, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds } + { redis: redis, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds } end def reject_media? diff --git a/app/controllers/settings/exports_controller.rb b/app/controllers/settings/exports_controller.rb index 30138d29e..1638d3412 100644 --- a/app/controllers/settings/exports_controller.rb +++ b/app/controllers/settings/exports_controller.rb @@ -2,6 +2,7 @@ class Settings::ExportsController < Settings::BaseController include Authorization + include Redisable skip_before_action :require_functional! @@ -28,6 +29,6 @@ class Settings::ExportsController < Settings::BaseController end def lock_options - { redis: Redis.current, key: "backup:#{current_user.id}" } + { redis: redis, key: "backup:#{current_user.id}" } end end diff --git a/app/lib/access_token_extension.rb b/app/lib/access_token_extension.rb index 2cafaaa20..f51bde492 100644 --- a/app/lib/access_token_extension.rb +++ b/app/lib/access_token_extension.rb @@ -4,6 +4,8 @@ module AccessTokenExtension extend ActiveSupport::Concern included do + include Redisable + after_commit :push_to_streaming_api end @@ -16,6 +18,6 @@ module AccessTokenExtension end def push_to_streaming_api - Redis.current.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed? + redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed? end end diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb index f599e1b58..3c51a7a51 100644 --- a/app/lib/activitypub/activity.rb +++ b/app/lib/activitypub/activity.rb @@ -164,7 +164,7 @@ class ActivityPub::Activity end def lock_or_fail(key, expire_after = 15.minutes.seconds) - RedisLock.acquire({ redis: Redis.current, key: key, autorelease: expire_after }) do |lock| + RedisLock.acquire({ redis: redis, key: key, autorelease: expire_after }) do |lock| if lock.acquired? yield else diff --git a/app/lib/delivery_failure_tracker.rb b/app/lib/delivery_failure_tracker.rb index 7b800fc0b..7c4e28eb7 100644 --- a/app/lib/delivery_failure_tracker.rb +++ b/app/lib/delivery_failure_tracker.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class DeliveryFailureTracker + include Redisable + FAILURE_DAYS_THRESHOLD = 7 def initialize(url_or_host) @@ -8,21 +10,21 @@ class DeliveryFailureTracker end def track_failure! - Redis.current.sadd(exhausted_deliveries_key, today) + redis.sadd(exhausted_deliveries_key, today) UnavailableDomain.create(domain: @host) if reached_failure_threshold? end def track_success! - Redis.current.del(exhausted_deliveries_key) + redis.del(exhausted_deliveries_key) UnavailableDomain.find_by(domain: @host)&.destroy end def clear_failures! - Redis.current.del(exhausted_deliveries_key) + redis.del(exhausted_deliveries_key) end def days - Redis.current.scard(exhausted_deliveries_key) || 0 + redis.scard(exhausted_deliveries_key) || 0 end def available? @@ -30,12 +32,14 @@ class DeliveryFailureTracker end def exhausted_deliveries_days - @exhausted_deliveries_days ||= Redis.current.smembers(exhausted_deliveries_key).sort.map { |date| Date.new(date.slice(0, 4).to_i, date.slice(4, 2).to_i, date.slice(6, 2).to_i) } + @exhausted_deliveries_days ||= redis.smembers(exhausted_deliveries_key).sort.map { |date| Date.new(date.slice(0, 4).to_i, date.slice(4, 2).to_i, date.slice(6, 2).to_i) } end alias reset! track_success! class << self + include Redisable + def without_unavailable(urls) unavailable_domains_map = Rails.cache.fetch('unavailable_domains') { UnavailableDomain.pluck(:domain).index_with(true) } @@ -54,7 +58,7 @@ class DeliveryFailureTracker end def warning_domains - domains = Redis.current.keys(exhausted_deliveries_key_by('*')).map do |key| + domains = redis.keys(exhausted_deliveries_key_by('*')).map do |key| key.delete_prefix(exhausted_deliveries_key_by('')) end @@ -62,7 +66,7 @@ class DeliveryFailureTracker end def warning_domains_map - warning_domains.index_with { |domain| Redis.current.scard(exhausted_deliveries_key_by(domain)) } + warning_domains.index_with { |domain| redis.scard(exhausted_deliveries_key_by(domain)) } end private diff --git a/app/lib/redis_configuration.rb b/app/lib/redis_configuration.rb new file mode 100644 index 000000000..fc8cf2f80 --- /dev/null +++ b/app/lib/redis_configuration.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +class RedisConfiguration + class << self + def with + pool.with { |redis| yield redis } + end + + def pool + @pool ||= ConnectionPool.new(size: pool_size) { new.connection } + end + + def pool_size + if Sidekiq.server? + Sidekiq.options[:concurrency] + else + ENV['MAX_THREADS'] || 5 + end + end + end + + def connection + if namespace? + Redis::Namespace.new(namespace, redis: raw_connection) + else + raw_connection + end + end + + def namespace? + namespace.present? + end + + def namespace + ENV.fetch('REDIS_NAMESPACE', nil) + end + + def url + ENV['REDIS_URL'] + end + + private + + def raw_connection + Redis.new(url: url, driver: :hiredis) + end +end diff --git a/app/models/account_conversation.rb b/app/models/account_conversation.rb index 56fd13543..45e74bbeb 100644 --- a/app/models/account_conversation.rb +++ b/app/models/account_conversation.rb @@ -14,6 +14,8 @@ # class AccountConversation < ApplicationRecord + include Redisable + after_commit :push_to_streaming_api belongs_to :account @@ -109,7 +111,7 @@ class AccountConversation < ApplicationRecord end def subscribed_to_timeline? - Redis.current.exists?("subscribed:#{streaming_channel}") + redis.exists?("subscribed:#{streaming_channel}") end def streaming_channel diff --git a/app/models/account_suggestions/global_source.rb b/app/models/account_suggestions/global_source.rb index 7bca530d4..651041d67 100644 --- a/app/models/account_suggestions/global_source.rb +++ b/app/models/account_suggestions/global_source.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class AccountSuggestions::GlobalSource < AccountSuggestions::Source + include Redisable + def key :global end @@ -28,7 +30,7 @@ class AccountSuggestions::GlobalSource < AccountSuggestions::Source end def account_ids_for_locale(locale) - Redis.current.zrevrange("follow_recommendations:#{locale}", 0, -1).map(&:to_i) + redis.zrevrange("follow_recommendations:#{locale}", 0, -1).map(&:to_i) end def to_ordered_list_key(account) diff --git a/app/models/concerns/redisable.rb b/app/models/concerns/redisable.rb index c6cf97359..cce8efb86 100644 --- a/app/models/concerns/redisable.rb +++ b/app/models/concerns/redisable.rb @@ -6,6 +6,6 @@ module Redisable private def redis - Redis.current + Thread.current[:redis] ||= RedisConfiguration.new.connection end end diff --git a/app/models/custom_filter.rb b/app/models/custom_filter.rb index 9d0f3729b..8e3476794 100644 --- a/app/models/custom_filter.rb +++ b/app/models/custom_filter.rb @@ -24,6 +24,7 @@ class CustomFilter < ApplicationRecord ).freeze include Expireable + include Redisable belongs_to :account @@ -51,7 +52,7 @@ class CustomFilter < ApplicationRecord def remove_cache Rails.cache.delete("filters:#{account_id}") - Redis.current.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed)) + redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed)) end def context_must_be_valid diff --git a/app/models/encrypted_message.rb b/app/models/encrypted_message.rb index aa4182b4e..7b4e32283 100644 --- a/app/models/encrypted_message.rb +++ b/app/models/encrypted_message.rb @@ -19,6 +19,7 @@ class EncryptedMessage < ApplicationRecord self.inheritance_column = nil include Paginable + include Redisable scope :up_to, ->(id) { where(arel_table[:id].lteq(id)) } @@ -38,7 +39,7 @@ class EncryptedMessage < ApplicationRecord end def subscribed_to_timeline? - Redis.current.exists?("subscribed:#{streaming_channel}") + redis.exists?("subscribed:#{streaming_channel}") end def streaming_channel diff --git a/app/models/follow_recommendation_filter.rb b/app/models/follow_recommendation_filter.rb index acf03cd84..531332614 100644 --- a/app/models/follow_recommendation_filter.rb +++ b/app/models/follow_recommendation_filter.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class FollowRecommendationFilter + include Redisable + KEYS = %i( language status @@ -17,7 +19,7 @@ class FollowRecommendationFilter if params['status'] == 'suppressed' Account.joins(:follow_recommendation_suppression).order(FollowRecommendationSuppression.arel_table[:id].desc).to_a else - account_ids = Redis.current.zrevrange("follow_recommendations:#{@language}", 0, -1).map(&:to_i) + account_ids = redis.zrevrange("follow_recommendations:#{@language}", 0, -1).map(&:to_i) accounts = Account.where(id: account_ids).index_by(&:id) account_ids.map { |id| accounts[id] }.compact diff --git a/app/models/user.rb b/app/models/user.rb index 9da7b2639..ab832bcd0 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -52,6 +52,7 @@ class User < ApplicationRecord include Settings::Extend include UserRoles + include Redisable # The home and list feeds will be stored in Redis for this amount # of time, and status fan-out to followers will include only people @@ -456,7 +457,7 @@ class User < ApplicationRecord end def regenerate_feed! - RegenerationWorker.perform_async(account_id) if Redis.current.set("account:#{account_id}:regeneration", true, nx: true, ex: 1.day.seconds) + RegenerationWorker.perform_async(account_id) if redis.set("account:#{account_id}:regeneration", true, nx: true, ex: 1.day.seconds) end def needs_feed_update? diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb index ec5140720..5649153ee 100644 --- a/app/services/activitypub/process_account_service.rb +++ b/app/services/activitypub/process_account_service.rb @@ -3,6 +3,7 @@ class ActivityPub::ProcessAccountService < BaseService include JsonLdHelper include DomainControlHelper + include Redisable # Should be called with confirmed valid JSON # and WebFinger-resolved username and domain @@ -289,7 +290,7 @@ class ActivityPub::ProcessAccountService < BaseService end def lock_options - { redis: Redis.current, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds } + { redis: redis, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds } end def process_tags diff --git a/app/services/activitypub/process_status_update_service.rb b/app/services/activitypub/process_status_update_service.rb index 3d9d9cb84..fb6e44c6d 100644 --- a/app/services/activitypub/process_status_update_service.rb +++ b/app/services/activitypub/process_status_update_service.rb @@ -2,6 +2,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService include JsonLdHelper + include Redisable def call(status, json) raise ArgumentError, 'Status has unsaved changes' if status.changed? @@ -241,7 +242,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService end def lock_options - { redis: Redis.current, key: "create:#{@uri}", autorelease: 15.minutes.seconds } + { redis: redis, key: "create:#{@uri}", autorelease: 15.minutes.seconds } end def record_previous_edit! diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 76404c6b8..de5c5ebe4 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class FanOutOnWriteService < BaseService + include Redisable + # Push a status into home and mentions feeds # @param [Status] status # @param [Hash] options @@ -99,20 +101,20 @@ class FanOutOnWriteService < BaseService def broadcast_to_hashtag_streams! @status.tags.pluck(:name).each do |hashtag| - Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", anonymous_payload) - Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", anonymous_payload) if @status.local? + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", anonymous_payload) + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", anonymous_payload) if @status.local? end end def broadcast_to_public_streams! return if @status.reply? && @status.in_reply_to_account_id != @account.id - Redis.current.publish('timeline:public', anonymous_payload) - Redis.current.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload) + redis.publish('timeline:public', anonymous_payload) + redis.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload) if @status.with_media? - Redis.current.publish('timeline:public:media', anonymous_payload) - Redis.current.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload) + redis.publish('timeline:public:media', anonymous_payload) + redis.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload) end end diff --git a/app/services/fetch_link_card_service.rb b/app/services/fetch_link_card_service.rb index 9c8b5ea20..868796a6b 100644 --- a/app/services/fetch_link_card_service.rb +++ b/app/services/fetch_link_card_service.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class FetchLinkCardService < BaseService + include Redisable + URL_PATTERN = %r{ (#{Twitter::TwitterText::Regex[:valid_url_preceding_chars]}) # $1 preceding chars ( # $2 URL @@ -155,6 +157,6 @@ class FetchLinkCardService < BaseService end def lock_options - { redis: Redis.current, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds } + { redis: redis, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds } end end diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb index 61f573534..f813f06b2 100644 --- a/app/services/precompute_feed_service.rb +++ b/app/services/precompute_feed_service.rb @@ -1,9 +1,11 @@ # frozen_string_literal: true class PrecomputeFeedService < BaseService + include Redisable + def call(account) FeedManager.instance.populate_home(account) ensure - Redis.current.del("account:#{account.id}:regeneration") + redis.del("account:#{account.id}:regeneration") end end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 41730154d..dbd1f6430 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -146,6 +146,6 @@ class RemoveStatusService < BaseService end def lock_options - { redis: Redis.current, key: "distribute:#{@status.id}", autorelease: 5.minutes.seconds } + { redis: redis, key: "distribute:#{@status.id}", autorelease: 5.minutes.seconds } end end diff --git a/app/services/resolve_account_service.rb b/app/services/resolve_account_service.rb index 3a372ef2a..21332a03e 100644 --- a/app/services/resolve_account_service.rb +++ b/app/services/resolve_account_service.rb @@ -4,6 +4,7 @@ class ResolveAccountService < BaseService include JsonLdHelper include DomainControlHelper include WebfingerHelper + include Redisable # Find or create an account record for a remote user. When creating, # look up the user's webfinger and fetch ActivityPub data @@ -147,6 +148,6 @@ class ResolveAccountService < BaseService end def lock_options - { redis: Redis.current, key: "resolve:#{@username}@#{@domain}", autorelease: 15.minutes.seconds } + { redis: redis, key: "resolve:#{@username}@#{@domain}", autorelease: 15.minutes.seconds } end end diff --git a/app/services/vote_service.rb b/app/services/vote_service.rb index 19e453332..b77812970 100644 --- a/app/services/vote_service.rb +++ b/app/services/vote_service.rb @@ -3,6 +3,7 @@ class VoteService < BaseService include Authorization include Payloadable + include Redisable def call(account, poll, choices) authorize_with account, poll, :vote? @@ -77,6 +78,6 @@ class VoteService < BaseService end def lock_options - { redis: Redis.current, key: "vote:#{@poll.id}:#{@account.id}" } + { redis: redis, key: "vote:#{@poll.id}:#{@account.id}" } end end diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index 770325ccf..474b4daaf 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -2,9 +2,10 @@ class DistributionWorker include Sidekiq::Worker + include Redisable def perform(status_id, options = {}) - RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock| + RedisLock.acquire(redis: redis, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock| if lock.acquired? FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys) else diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb index 6ebb9a400..e526d2887 100644 --- a/app/workers/merge_worker.rb +++ b/app/workers/merge_worker.rb @@ -2,12 +2,13 @@ class MergeWorker include Sidekiq::Worker + include Redisable def perform(from_account_id, into_account_id) FeedManager.instance.merge_into_home(Account.find(from_account_id), Account.find(into_account_id)) rescue ActiveRecord::RecordNotFound true ensure - Redis.current.del("account:#{into_account_id}:regeneration") + redis.del("account:#{into_account_id}:regeneration") end end diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb index 7195f0ff9..bd92fe32c 100644 --- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -2,6 +2,7 @@ class Scheduler::AccountsStatusesCleanupScheduler include Sidekiq::Worker + include Redisable # This limit is mostly to be nice to the fediverse at large and not # generate too much traffic. @@ -83,14 +84,14 @@ class Scheduler::AccountsStatusesCleanupScheduler end def last_processed_id - Redis.current.get('account_statuses_cleanup_scheduler:last_account_id') + redis.get('account_statuses_cleanup_scheduler:last_account_id') end def save_last_processed_id(id) if id.nil? - Redis.current.del('account_statuses_cleanup_scheduler:last_account_id') + redis.del('account_statuses_cleanup_scheduler:last_account_id') else - Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds) + redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds) end end end diff --git a/config/application.rb b/config/application.rb index a1ba71f61..64987cfe7 100644 --- a/config/application.rb +++ b/config/application.rb @@ -35,6 +35,7 @@ require_relative '../lib/paperclip/response_with_limit_adapter' require_relative '../lib/terrapin/multi_pipe_extensions' require_relative '../lib/mastodon/snowflake' require_relative '../lib/mastodon/version' +require_relative '../lib/mastodon/rack_middleware' require_relative '../lib/devise/two_factor_ldap_authenticatable' require_relative '../lib/devise/two_factor_pam_authenticatable' require_relative '../lib/chewy/strategy/custom_sidekiq' @@ -164,6 +165,7 @@ module Mastodon config.middleware.use Rack::Attack config.middleware.use Rack::Deflater + config.middleware.use Mastodon::RackMiddleware config.to_prepare do Doorkeeper::AuthorizationsController.layout 'modal' diff --git a/config/initializers/redis.rb b/config/initializers/redis.rb deleted file mode 100644 index 7573fc9f7..000000000 --- a/config/initializers/redis.rb +++ /dev/null @@ -1,14 +0,0 @@ -# frozen_string_literal: true - -redis_connection = Redis.new( - url: ENV['REDIS_URL'], - driver: :hiredis -) - -namespace = ENV.fetch('REDIS_NAMESPACE') { nil } - -if namespace - Redis.current = Redis::Namespace.new(namespace, redis: redis_connection) -else - Redis.current = redis_connection -end diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 19a705ce8..c1327053d 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -1,12 +1,12 @@ # frozen_string_literal: true -require_relative '../../lib/sidekiq_error_handler' +require_relative '../../lib/mastodon/sidekiq_middleware' Sidekiq.configure_server do |config| config.redis = REDIS_SIDEKIQ_PARAMS config.server_middleware do |chain| - chain.add SidekiqErrorHandler + chain.add Mastodon::SidekiqMiddleware end config.server_middleware do |chain| diff --git a/lib/mastodon/feeds_cli.rb b/lib/mastodon/feeds_cli.rb index 578ea15c5..428d63a44 100644 --- a/lib/mastodon/feeds_cli.rb +++ b/lib/mastodon/feeds_cli.rb @@ -7,6 +7,7 @@ require_relative 'cli_helper' module Mastodon class FeedsCLI < Thor include CLIHelper + include Redisable def self.exit_on_failure? true @@ -51,10 +52,10 @@ module Mastodon desc 'clear', 'Remove all home and list feeds from Redis' def clear - keys = Redis.current.keys('feed:*') + keys = redis.keys('feed:*') - Redis.current.pipelined do - keys.each { |key| Redis.current.del(key) } + redis.pipelined do + keys.each { |key| redis.del(key) } end say('OK', :green) diff --git a/lib/mastodon/rack_middleware.rb b/lib/mastodon/rack_middleware.rb new file mode 100644 index 000000000..619a2c36d --- /dev/null +++ b/lib/mastodon/rack_middleware.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +class Mastodon::RackMiddleware + def initialize(app) + @app = app + end + + def call(env) + @app.call(env) + ensure + clean_up_sockets! + end + + private + + def clean_up_sockets! + clean_up_redis_socket! + clean_up_statsd_socket! + end + + def clean_up_redis_socket! + Thread.current[:redis]&.close + Thread.current[:redis] = nil + end + + def clean_up_statsd_socket! + Thread.current[:statsd_socket]&.close + Thread.current[:statsd_socket] = nil + end +end diff --git a/lib/mastodon/redis_config.rb b/lib/mastodon/redis_config.rb index 5bfd26e34..98dc4788d 100644 --- a/lib/mastodon/redis_config.rb +++ b/lib/mastodon/redis_config.rb @@ -11,13 +11,15 @@ def setup_redis_env_url(prefix = nil, defaults = true) port = ENV.fetch(prefix + 'REDIS_PORT') { 6379 if defaults } db = ENV.fetch(prefix + 'REDIS_DB') { 0 if defaults } - ENV[prefix + 'REDIS_URL'] = if [password, host, port, db].all?(&:nil?) - ENV['REDIS_URL'] - else - Addressable::URI.parse("redis://#{host}:#{port}/#{db}").tap do |uri| - uri.password = password if password.present? - end.normalize.to_str - end + ENV[prefix + 'REDIS_URL'] = begin + if [password, host, port, db].all?(&:nil?) + ENV['REDIS_URL'] + else + Addressable::URI.parse("redis://#{host}:#{port}/#{db}").tap do |uri| + uri.password = password if password.present? + end.normalize.to_str + end + end end setup_redis_env_url @@ -33,6 +35,8 @@ REDIS_CACHE_PARAMS = { url: ENV['CACHE_REDIS_URL'], expires_in: 10.minutes, namespace: cache_namespace, + pool_size: Sidekiq.server? ? Sidekiq.options[:concurrency] : Integer(ENV['MAX_THREADS'] || 5), + pool_timeout: 5, }.freeze REDIS_SIDEKIQ_PARAMS = { @@ -40,3 +44,7 @@ REDIS_SIDEKIQ_PARAMS = { url: ENV['SIDEKIQ_REDIS_URL'], namespace: sidekiq_namespace, }.freeze + +if Rails.env.test? + ENV['REDIS_NAMESPACE'] = "mastodon_test#{ENV['TEST_ENV_NUMBER']}" +end diff --git a/lib/sidekiq_error_handler.rb b/lib/mastodon/sidekiq_middleware.rb index 358afd540..7ec4097df 100644 --- a/lib/sidekiq_error_handler.rb +++ b/lib/mastodon/sidekiq_middleware.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -class SidekiqErrorHandler +class Mastodon::SidekiqMiddleware BACKTRACE_LIMIT = 3 def call(*) @@ -10,9 +10,7 @@ class SidekiqErrorHandler rescue => e limit_backtrace_and_raise(e) ensure - socket = Thread.current[:statsd_socket] - socket&.close - Thread.current[:statsd_socket] = nil + clean_up_sockets! end private @@ -21,4 +19,19 @@ class SidekiqErrorHandler exception.set_backtrace(exception.backtrace.first(BACKTRACE_LIMIT)) raise exception end + + def clean_up_sockets! + clean_up_redis_socket! + clean_up_statsd_socket! + end + + def clean_up_redis_socket! + Thread.current[:redis]&.close + Thread.current[:redis] = nil + end + + def clean_up_statsd_socket! + Thread.current[:statsd_socket]&.close + Thread.current[:statsd_socket] = nil + end end diff --git a/spec/controllers/concerns/user_tracking_concern_spec.rb b/spec/controllers/concerns/user_tracking_concern_spec.rb index 1e5620221..b2548d5c0 100644 --- a/spec/controllers/concerns/user_tracking_concern_spec.rb +++ b/spec/controllers/concerns/user_tracking_concern_spec.rb @@ -65,22 +65,22 @@ describe ApplicationController, type: :controller do get :show expect_updated_sign_in_at(user) - expect(Redis.current.get("account:#{user.account_id}:regeneration")).to eq 'true' + expect(redis.get("account:#{user.account_id}:regeneration")).to eq 'true' expect(RegenerationWorker).to have_received(:perform_async) end it 'sets the regeneration marker to expire' do allow(RegenerationWorker).to receive(:perform_async) get :show - expect(Redis.current.ttl("account:#{user.account_id}:regeneration")).to be >= 0 + expect(redis.ttl("account:#{user.account_id}:regeneration")).to be >= 0 end it 'regenerates feed when sign in is older than two weeks' do get :show expect_updated_sign_in_at(user) - expect(Redis.current.zcard(FeedManager.instance.key(:home, user.account_id))).to eq 3 - expect(Redis.current.get("account:#{user.account_id}:regeneration")).to be_nil + expect(redis.zcard(FeedManager.instance.key(:home, user.account_id))).to eq 3 + expect(redis.get("account:#{user.account_id}:regeneration")).to be_nil end end diff --git a/spec/lib/activitypub/activity/move_spec.rb b/spec/lib/activitypub/activity/move_spec.rb index 2d1d276c5..c468fdeff 100644 --- a/spec/lib/activitypub/activity/move_spec.rb +++ b/spec/lib/activitypub/activity/move_spec.rb @@ -84,9 +84,9 @@ RSpec.describe ActivityPub::Activity::Move do context 'when a Move has been recently processed' do around do |example| - Redis.current.set("move_in_progress:#{old_account.id}", true, nx: true, ex: 7.days.seconds) + redis.set("move_in_progress:#{old_account.id}", true, nx: true, ex: 7.days.seconds) example.run - Redis.current.del("move_in_progress:#{old_account.id}") + redis.del("move_in_progress:#{old_account.id}") end it 'does not set moved account on old account' do diff --git a/spec/lib/delivery_failure_tracker_spec.rb b/spec/lib/delivery_failure_tracker_spec.rb index 52a1a92f0..c8179ebd9 100644 --- a/spec/lib/delivery_failure_tracker_spec.rb +++ b/spec/lib/delivery_failure_tracker_spec.rb @@ -22,7 +22,7 @@ describe DeliveryFailureTracker do describe '#track_failure!' do it 'marks URL as unavailable after 7 days of being called' do - 6.times { |i| Redis.current.sadd('exhausted_deliveries:example.com', i) } + 6.times { |i| redis.sadd('exhausted_deliveries:example.com', i) } subject.track_failure! expect(subject.days).to eq 7 diff --git a/spec/lib/feed_manager_spec.rb b/spec/lib/feed_manager_spec.rb index 0df85e5bc..3ba8aaa9f 100644 --- a/spec/lib/feed_manager_spec.rb +++ b/spec/lib/feed_manager_spec.rb @@ -195,11 +195,11 @@ RSpec.describe FeedManager do account = Fabricate(:account) status = Fabricate(:status) members = FeedManager::MAX_ITEMS.times.map { |count| [count, count] } - Redis.current.zadd("feed:home:#{account.id}", members) + redis.zadd("feed:home:#{account.id}", members) FeedManager.instance.push_to_home(account, status) - expect(Redis.current.zcard("feed:home:#{account.id}")).to eq FeedManager::MAX_ITEMS + expect(redis.zcard("feed:home:#{account.id}")).to eq FeedManager::MAX_ITEMS end context 'reblogs' do @@ -424,7 +424,7 @@ RSpec.describe FeedManager do FeedManager.instance.merge_into_home(account, reblog.account) - expect(Redis.current.zscore("feed:home:0", reblog.id)).to eq nil + expect(redis.zscore("feed:home:0", reblog.id)).to eq nil end end @@ -440,13 +440,13 @@ RSpec.describe FeedManager do FeedManager.instance.push_to_home(receiver, status) # The reblogging status should show up under normal conditions. - expect(Redis.current.zrange("feed:home:#{receiver.id}", 0, -1)).to include(status.id.to_s) + expect(redis.zrange("feed:home:#{receiver.id}", 0, -1)).to include(status.id.to_s) FeedManager.instance.unpush_from_home(receiver, status) # Restore original status - expect(Redis.current.zrange("feed:home:#{receiver.id}", 0, -1)).to_not include(status.id.to_s) - expect(Redis.current.zrange("feed:home:#{receiver.id}", 0, -1)).to include(reblogged.id.to_s) + expect(redis.zrange("feed:home:#{receiver.id}", 0, -1)).to_not include(status.id.to_s) + expect(redis.zrange("feed:home:#{receiver.id}", 0, -1)).to include(reblogged.id.to_s) end it 'removes a reblogged status if it was only reblogged once' do @@ -456,11 +456,11 @@ RSpec.describe FeedManager do FeedManager.instance.push_to_home(receiver, status) # The reblogging status should show up under normal conditions. - expect(Redis.current.zrange("feed:home:#{receiver.id}", 0, -1)).to eq [status.id.to_s] + expect(redis.zrange("feed:home:#{receiver.id}", 0, -1)).to eq [status.id.to_s] FeedManager.instance.unpush_from_home(receiver, status) - expect(Redis.current.zrange("feed:home:#{receiver.id}", 0, -1)).to be_empty + expect(redis.zrange("feed:home:#{receiver.id}", 0, -1)).to be_empty end it 'leaves a multiply-reblogged status if another reblog was in feed' do @@ -472,13 +472,13 @@ RSpec.describe FeedManager do end # The reblogging status should show up under normal conditions. - expect(Redis.current.zrange("feed:home:#{receiver.id}", 0, -1)).to eq [reblogs.first.id.to_s] + expect(redis.zrange("feed:home:#{receiver.id}", 0, -1)).to eq [reblogs.first.id.to_s] reblogs[0...-1].each do |reblog| FeedManager.instance.unpush_from_home(receiver, reblog) end - expect(Redis.current.zrange("feed:home:#{receiver.id}", 0, -1)).to eq [reblogs.last.id.to_s] + expect(redis.zrange("feed:home:#{receiver.id}", 0, -1)).to eq [reblogs.last.id.to_s] end it 'sends push updates' do @@ -486,11 +486,11 @@ RSpec.describe FeedManager do FeedManager.instance.push_to_home(receiver, status) - allow(Redis.current).to receive_messages(publish: nil) + allow(redis).to receive_messages(publish: nil) FeedManager.instance.unpush_from_home(receiver, status) deletion = Oj.dump(event: :delete, payload: status.id.to_s) - expect(Redis.current).to have_received(:publish).with("timeline:#{receiver.id}", deletion) + expect(redis).to have_received(:publish).with("timeline:#{receiver.id}", deletion) end end @@ -508,14 +508,14 @@ RSpec.describe FeedManager do before do [status_1, status_3, status_5, status_6, status_7].each do |status| - Redis.current.zadd("feed:home:#{account.id}", status.id, status.id) + redis.zadd("feed:home:#{account.id}", status.id, status.id) end end it 'correctly cleans the home timeline' do FeedManager.instance.clear_from_home(account, target_account) - expect(Redis.current.zrange("feed:home:#{account.id}", 0, -1)).to eq [status_1.id.to_s, status_7.id.to_s] + expect(redis.zrange("feed:home:#{account.id}", 0, -1)).to eq [status_1.id.to_s, status_7.id.to_s] end end end diff --git a/spec/models/home_feed_spec.rb b/spec/models/home_feed_spec.rb index ee7a83960..80f6edbff 100644 --- a/spec/models/home_feed_spec.rb +++ b/spec/models/home_feed_spec.rb @@ -15,7 +15,7 @@ RSpec.describe HomeFeed, type: :model do context 'when feed is generated' do before do - Redis.current.zadd( + redis.zadd( FeedManager.instance.key(:home, account.id), [[4, 4], [3, 3], [2, 2], [1, 1]] ) @@ -31,7 +31,7 @@ RSpec.describe HomeFeed, type: :model do context 'when feed is being generated' do before do - Redis.current.set("account:#{account.id}:regeneration", true) + redis.set("account:#{account.id}:regeneration", true) end it 'returns nothing' do diff --git a/spec/rails_helper.rb b/spec/rails_helper.rb index 217a8f2f8..02827a388 100644 --- a/spec/rails_helper.rb +++ b/spec/rails_helper.rb @@ -13,7 +13,6 @@ Dir[Rails.root.join('spec/support/**/*.rb')].each { |f| require f } ActiveRecord::Migration.maintain_test_schema! WebMock.disable_net_connect!(allow: Chewy.settings[:host]) -Redis.current = Redis::Namespace.new("mastodon_test#{ENV['TEST_ENV_NUMBER']}", redis: Redis.current) Sidekiq::Testing.inline! Sidekiq.logger = nil @@ -44,6 +43,7 @@ RSpec.configure do |config| config.include Devise::Test::ControllerHelpers, type: :view config.include Paperclip::Shoulda::Matchers config.include ActiveSupport::Testing::TimeHelpers + config.include Redisable config.before :each, type: :feature do https = ENV['LOCAL_HTTPS'] == 'true' @@ -60,7 +60,7 @@ RSpec.configure do |config| config.after :each do Rails.cache.clear - Redis.current.del(Redis.current.keys) + redis.del(redis.keys) end end diff --git a/spec/services/after_block_service_spec.rb b/spec/services/after_block_service_spec.rb index c09425d7c..337766d06 100644 --- a/spec/services/after_block_service_spec.rb +++ b/spec/services/after_block_service_spec.rb @@ -14,7 +14,7 @@ RSpec.describe AfterBlockService, type: :service do let(:home_timeline_key) { FeedManager.instance.key(:home, account.id) } before do - Redis.current.del(home_timeline_key) + redis.del(home_timeline_key) end it "clears account's statuses" do @@ -23,7 +23,7 @@ RSpec.describe AfterBlockService, type: :service do FeedManager.instance.push_to_home(account, other_account_reblog) expect { subject }.to change { - Redis.current.zrange(home_timeline_key, 0, -1) + redis.zrange(home_timeline_key, 0, -1) }.from([status.id.to_s, other_account_status.id.to_s, other_account_reblog.id.to_s]).to([other_account_status.id.to_s]) end end @@ -33,7 +33,7 @@ RSpec.describe AfterBlockService, type: :service do let(:list_timeline_key) { FeedManager.instance.key(:list, list.id) } before do - Redis.current.del(list_timeline_key) + redis.del(list_timeline_key) end it "clears account's statuses" do @@ -42,7 +42,7 @@ RSpec.describe AfterBlockService, type: :service do FeedManager.instance.push_to_list(list, other_account_reblog) expect { subject }.to change { - Redis.current.zrange(list_timeline_key, 0, -1) + redis.zrange(list_timeline_key, 0, -1) }.from([status.id.to_s, other_account_status.id.to_s, other_account_reblog.id.to_s]).to([other_account_status.id.to_s]) end end diff --git a/spec/services/batched_remove_status_service_spec.rb b/spec/services/batched_remove_status_service_spec.rb index 8f38908cd..920edeb13 100644 --- a/spec/services/batched_remove_status_service_spec.rb +++ b/spec/services/batched_remove_status_service_spec.rb @@ -12,7 +12,7 @@ RSpec.describe BatchedRemoveStatusService, type: :service do let(:status2) { PostStatusService.new.call(alice, text: 'Another status') } before do - allow(Redis.current).to receive_messages(publish: nil) + allow(redis).to receive_messages(publish: nil) stub_request(:post, 'http://example.com/inbox').to_return(status: 200) @@ -40,11 +40,11 @@ RSpec.describe BatchedRemoveStatusService, type: :service do end it 'notifies streaming API of followers' do - expect(Redis.current).to have_received(:publish).with("timeline:#{jeff.id}", any_args).at_least(:once) + expect(redis).to have_received(:publish).with("timeline:#{jeff.id}", any_args).at_least(:once) end it 'notifies streaming API of public timeline' do - expect(Redis.current).to have_received(:publish).with('timeline:public', any_args).at_least(:once) + expect(redis).to have_received(:publish).with('timeline:public', any_args).at_least(:once) end it 'sends delete activity to followers' do diff --git a/spec/services/fan_out_on_write_service_spec.rb b/spec/services/fan_out_on_write_service_spec.rb index aaf179ce5..59e15d230 100644 --- a/spec/services/fan_out_on_write_service_spec.rb +++ b/spec/services/fan_out_on_write_service_spec.rb @@ -18,7 +18,7 @@ RSpec.describe FanOutOnWriteService, type: :service do ProcessMentionsService.new.call(status) ProcessHashtagsService.new.call(status) - allow(Redis.current).to receive(:publish) + allow(redis).to receive(:publish) subject.call(status) end @@ -40,13 +40,13 @@ RSpec.describe FanOutOnWriteService, type: :service do end it 'is broadcast to the hashtag stream' do - expect(Redis.current).to have_received(:publish).with('timeline:hashtag:hoge', anything) - expect(Redis.current).to have_received(:publish).with('timeline:hashtag:hoge:local', anything) + expect(redis).to have_received(:publish).with('timeline:hashtag:hoge', anything) + expect(redis).to have_received(:publish).with('timeline:hashtag:hoge:local', anything) end it 'is broadcast to the public stream' do - expect(Redis.current).to have_received(:publish).with('timeline:public', anything) - expect(Redis.current).to have_received(:publish).with('timeline:public:local', anything) + expect(redis).to have_received(:publish).with('timeline:public', anything) + expect(redis).to have_received(:publish).with('timeline:public:local', anything) end end @@ -66,8 +66,8 @@ RSpec.describe FanOutOnWriteService, type: :service do end it 'is not broadcast publicly' do - expect(Redis.current).to_not have_received(:publish).with('timeline:hashtag:hoge', anything) - expect(Redis.current).to_not have_received(:publish).with('timeline:public', anything) + expect(redis).to_not have_received(:publish).with('timeline:hashtag:hoge', anything) + expect(redis).to_not have_received(:publish).with('timeline:public', anything) end end @@ -84,8 +84,8 @@ RSpec.describe FanOutOnWriteService, type: :service do end it 'is not broadcast publicly' do - expect(Redis.current).to_not have_received(:publish).with('timeline:hashtag:hoge', anything) - expect(Redis.current).to_not have_received(:publish).with('timeline:public', anything) + expect(redis).to_not have_received(:publish).with('timeline:hashtag:hoge', anything) + expect(redis).to_not have_received(:publish).with('timeline:public', anything) end end @@ -105,8 +105,8 @@ RSpec.describe FanOutOnWriteService, type: :service do end it 'is not broadcast publicly' do - expect(Redis.current).to_not have_received(:publish).with('timeline:hashtag:hoge', anything) - expect(Redis.current).to_not have_received(:publish).with('timeline:public', anything) + expect(redis).to_not have_received(:publish).with('timeline:hashtag:hoge', anything) + expect(redis).to_not have_received(:publish).with('timeline:public', anything) end end end diff --git a/spec/services/mute_service_spec.rb b/spec/services/mute_service_spec.rb index bdec1c67b..57d8c41de 100644 --- a/spec/services/mute_service_spec.rb +++ b/spec/services/mute_service_spec.rb @@ -12,7 +12,7 @@ RSpec.describe MuteService, type: :service do let(:home_timeline_key) { FeedManager.instance.key(:home, account.id) } before do - Redis.current.del(home_timeline_key) + redis.del(home_timeline_key) end it "clears account's statuses" do @@ -20,7 +20,7 @@ RSpec.describe MuteService, type: :service do FeedManager.instance.push_to_home(account, other_account_status) expect { subject }.to change { - Redis.current.zrange(home_timeline_key, 0, -1) + redis.zrange(home_timeline_key, 0, -1) }.from([status.id.to_s, other_account_status.id.to_s]).to([other_account_status.id.to_s]) end end diff --git a/spec/services/precompute_feed_service_spec.rb b/spec/services/precompute_feed_service_spec.rb index 1f6b6ed88..86b93b5d2 100644 --- a/spec/services/precompute_feed_service_spec.rb +++ b/spec/services/precompute_feed_service_spec.rb @@ -13,7 +13,7 @@ RSpec.describe PrecomputeFeedService, type: :service do subject.call(account) - expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), status.id)).to be_within(0.1).of(status.id.to_f) + expect(redis.zscore(FeedManager.instance.key(:home, account.id), status.id)).to be_within(0.1).of(status.id.to_f) end it 'does not raise an error even if it could not find any status' do @@ -30,7 +30,7 @@ RSpec.describe PrecomputeFeedService, type: :service do subject.call(account) - expect(Redis.current.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq nil + expect(redis.zscore(FeedManager.instance.key(:home, account.id), reblog.id)).to eq nil end end end diff --git a/spec/workers/scheduler/feed_cleanup_scheduler_spec.rb b/spec/workers/scheduler/feed_cleanup_scheduler_spec.rb index 914eed829..82d794594 100644 --- a/spec/workers/scheduler/feed_cleanup_scheduler_spec.rb +++ b/spec/workers/scheduler/feed_cleanup_scheduler_spec.rb @@ -7,17 +7,17 @@ describe Scheduler::FeedCleanupScheduler do let!(:inactive_user) { Fabricate(:user, current_sign_in_at: 22.days.ago) } it 'clears feeds of inactives' do - Redis.current.zadd(feed_key_for(inactive_user), 1, 1) - Redis.current.zadd(feed_key_for(active_user), 1, 1) - Redis.current.zadd(feed_key_for(inactive_user, 'reblogs'), 2, 2) - Redis.current.sadd(feed_key_for(inactive_user, 'reblogs:2'), 3) + redis.zadd(feed_key_for(inactive_user), 1, 1) + redis.zadd(feed_key_for(active_user), 1, 1) + redis.zadd(feed_key_for(inactive_user, 'reblogs'), 2, 2) + redis.sadd(feed_key_for(inactive_user, 'reblogs:2'), 3) subject.perform - expect(Redis.current.zcard(feed_key_for(inactive_user))).to eq 0 - expect(Redis.current.zcard(feed_key_for(active_user))).to eq 1 - expect(Redis.current.exists?(feed_key_for(inactive_user, 'reblogs'))).to be false - expect(Redis.current.exists?(feed_key_for(inactive_user, 'reblogs:2'))).to be false + expect(redis.zcard(feed_key_for(inactive_user))).to eq 0 + expect(redis.zcard(feed_key_for(active_user))).to eq 1 + expect(redis.exists?(feed_key_for(inactive_user, 'reblogs'))).to be false + expect(redis.exists?(feed_key_for(inactive_user, 'reblogs:2'))).to be false end def feed_key_for(user, subtype = nil) |