diff options
author | Eugen Rochko <eugen@zeonfederated.com> | 2022-04-28 17:47:34 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-28 17:47:34 +0200 |
commit | 3917353645b91dae04f7d9b81162fead6f73072a (patch) | |
tree | 202cbf93dfa378c26a291900ea65ec088ee5b5cc /app | |
parent | 9bf04db23acf10e05ffdf7def06c246081f8f065 (diff) |
Fix single Redis connection being used across all threads (#18135)
* Fix single Redis connection being used across all Sidekiq threads * Fix tests
Diffstat (limited to 'app')
25 files changed, 118 insertions, 39 deletions
diff --git a/app/controllers/admin/dashboard_controller.rb b/app/controllers/admin/dashboard_controller.rb index e376baab2..da9c6dd16 100644 --- a/app/controllers/admin/dashboard_controller.rb +++ b/app/controllers/admin/dashboard_controller.rb @@ -2,6 +2,8 @@ module Admin class DashboardController < BaseController + include Redisable + def index @system_checks = Admin::SystemCheck.perform @time_period = (29.days.ago.to_date...Time.now.utc.to_date) @@ -15,10 +17,10 @@ module Admin def redis_info @redis_info ||= begin - if Redis.current.is_a?(Redis::Namespace) - Redis.current.redis.info + if redis.is_a?(Redis::Namespace) + redis.redis.info else - Redis.current.info + redis.info end end end diff --git a/app/controllers/media_proxy_controller.rb b/app/controllers/media_proxy_controller.rb index 5596e92d1..d2a4cb207 100644 --- a/app/controllers/media_proxy_controller.rb +++ b/app/controllers/media_proxy_controller.rb @@ -3,6 +3,7 @@ class MediaProxyController < ApplicationController include RoutingHelper include Authorization + include Redisable skip_before_action :store_current_location skip_before_action :require_functional! @@ -45,7 +46,7 @@ class MediaProxyController < ApplicationController end def lock_options - { redis: Redis.current, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds } + { redis: redis, key: "media_download:#{params[:id]}", autorelease: 15.minutes.seconds } end def reject_media? diff --git a/app/controllers/settings/exports_controller.rb b/app/controllers/settings/exports_controller.rb index 30138d29e..1638d3412 100644 --- a/app/controllers/settings/exports_controller.rb +++ b/app/controllers/settings/exports_controller.rb @@ -2,6 +2,7 @@ class Settings::ExportsController < Settings::BaseController include Authorization + include Redisable skip_before_action :require_functional! @@ -28,6 +29,6 @@ class Settings::ExportsController < Settings::BaseController end def lock_options - { redis: Redis.current, key: "backup:#{current_user.id}" } + { redis: redis, key: "backup:#{current_user.id}" } end end diff --git a/app/lib/access_token_extension.rb b/app/lib/access_token_extension.rb index 2cafaaa20..f51bde492 100644 --- a/app/lib/access_token_extension.rb +++ b/app/lib/access_token_extension.rb @@ -4,6 +4,8 @@ module AccessTokenExtension extend ActiveSupport::Concern included do + include Redisable + after_commit :push_to_streaming_api end @@ -16,6 +18,6 @@ module AccessTokenExtension end def push_to_streaming_api - Redis.current.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed? + redis.publish("timeline:access_token:#{id}", Oj.dump(event: :kill)) if revoked? || destroyed? end end diff --git a/app/lib/activitypub/activity.rb b/app/lib/activitypub/activity.rb index f599e1b58..3c51a7a51 100644 --- a/app/lib/activitypub/activity.rb +++ b/app/lib/activitypub/activity.rb @@ -164,7 +164,7 @@ class ActivityPub::Activity end def lock_or_fail(key, expire_after = 15.minutes.seconds) - RedisLock.acquire({ redis: Redis.current, key: key, autorelease: expire_after }) do |lock| + RedisLock.acquire({ redis: redis, key: key, autorelease: expire_after }) do |lock| if lock.acquired? yield else diff --git a/app/lib/delivery_failure_tracker.rb b/app/lib/delivery_failure_tracker.rb index 7b800fc0b..7c4e28eb7 100644 --- a/app/lib/delivery_failure_tracker.rb +++ b/app/lib/delivery_failure_tracker.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class DeliveryFailureTracker + include Redisable + FAILURE_DAYS_THRESHOLD = 7 def initialize(url_or_host) @@ -8,21 +10,21 @@ class DeliveryFailureTracker end def track_failure! - Redis.current.sadd(exhausted_deliveries_key, today) + redis.sadd(exhausted_deliveries_key, today) UnavailableDomain.create(domain: @host) if reached_failure_threshold? end def track_success! - Redis.current.del(exhausted_deliveries_key) + redis.del(exhausted_deliveries_key) UnavailableDomain.find_by(domain: @host)&.destroy end def clear_failures! - Redis.current.del(exhausted_deliveries_key) + redis.del(exhausted_deliveries_key) end def days - Redis.current.scard(exhausted_deliveries_key) || 0 + redis.scard(exhausted_deliveries_key) || 0 end def available? @@ -30,12 +32,14 @@ class DeliveryFailureTracker end def exhausted_deliveries_days - @exhausted_deliveries_days ||= Redis.current.smembers(exhausted_deliveries_key).sort.map { |date| Date.new(date.slice(0, 4).to_i, date.slice(4, 2).to_i, date.slice(6, 2).to_i) } + @exhausted_deliveries_days ||= redis.smembers(exhausted_deliveries_key).sort.map { |date| Date.new(date.slice(0, 4).to_i, date.slice(4, 2).to_i, date.slice(6, 2).to_i) } end alias reset! track_success! class << self + include Redisable + def without_unavailable(urls) unavailable_domains_map = Rails.cache.fetch('unavailable_domains') { UnavailableDomain.pluck(:domain).index_with(true) } @@ -54,7 +58,7 @@ class DeliveryFailureTracker end def warning_domains - domains = Redis.current.keys(exhausted_deliveries_key_by('*')).map do |key| + domains = redis.keys(exhausted_deliveries_key_by('*')).map do |key| key.delete_prefix(exhausted_deliveries_key_by('')) end @@ -62,7 +66,7 @@ class DeliveryFailureTracker end def warning_domains_map - warning_domains.index_with { |domain| Redis.current.scard(exhausted_deliveries_key_by(domain)) } + warning_domains.index_with { |domain| redis.scard(exhausted_deliveries_key_by(domain)) } end private diff --git a/app/lib/redis_configuration.rb b/app/lib/redis_configuration.rb new file mode 100644 index 000000000..fc8cf2f80 --- /dev/null +++ b/app/lib/redis_configuration.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true + +class RedisConfiguration + class << self + def with + pool.with { |redis| yield redis } + end + + def pool + @pool ||= ConnectionPool.new(size: pool_size) { new.connection } + end + + def pool_size + if Sidekiq.server? + Sidekiq.options[:concurrency] + else + ENV['MAX_THREADS'] || 5 + end + end + end + + def connection + if namespace? + Redis::Namespace.new(namespace, redis: raw_connection) + else + raw_connection + end + end + + def namespace? + namespace.present? + end + + def namespace + ENV.fetch('REDIS_NAMESPACE', nil) + end + + def url + ENV['REDIS_URL'] + end + + private + + def raw_connection + Redis.new(url: url, driver: :hiredis) + end +end diff --git a/app/models/account_conversation.rb b/app/models/account_conversation.rb index 56fd13543..45e74bbeb 100644 --- a/app/models/account_conversation.rb +++ b/app/models/account_conversation.rb @@ -14,6 +14,8 @@ # class AccountConversation < ApplicationRecord + include Redisable + after_commit :push_to_streaming_api belongs_to :account @@ -109,7 +111,7 @@ class AccountConversation < ApplicationRecord end def subscribed_to_timeline? - Redis.current.exists?("subscribed:#{streaming_channel}") + redis.exists?("subscribed:#{streaming_channel}") end def streaming_channel diff --git a/app/models/account_suggestions/global_source.rb b/app/models/account_suggestions/global_source.rb index 7bca530d4..651041d67 100644 --- a/app/models/account_suggestions/global_source.rb +++ b/app/models/account_suggestions/global_source.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class AccountSuggestions::GlobalSource < AccountSuggestions::Source + include Redisable + def key :global end @@ -28,7 +30,7 @@ class AccountSuggestions::GlobalSource < AccountSuggestions::Source end def account_ids_for_locale(locale) - Redis.current.zrevrange("follow_recommendations:#{locale}", 0, -1).map(&:to_i) + redis.zrevrange("follow_recommendations:#{locale}", 0, -1).map(&:to_i) end def to_ordered_list_key(account) diff --git a/app/models/concerns/redisable.rb b/app/models/concerns/redisable.rb index c6cf97359..cce8efb86 100644 --- a/app/models/concerns/redisable.rb +++ b/app/models/concerns/redisable.rb @@ -6,6 +6,6 @@ module Redisable private def redis - Redis.current + Thread.current[:redis] ||= RedisConfiguration.new.connection end end diff --git a/app/models/custom_filter.rb b/app/models/custom_filter.rb index 9d0f3729b..8e3476794 100644 --- a/app/models/custom_filter.rb +++ b/app/models/custom_filter.rb @@ -24,6 +24,7 @@ class CustomFilter < ApplicationRecord ).freeze include Expireable + include Redisable belongs_to :account @@ -51,7 +52,7 @@ class CustomFilter < ApplicationRecord def remove_cache Rails.cache.delete("filters:#{account_id}") - Redis.current.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed)) + redis.publish("timeline:#{account_id}", Oj.dump(event: :filters_changed)) end def context_must_be_valid diff --git a/app/models/encrypted_message.rb b/app/models/encrypted_message.rb index aa4182b4e..7b4e32283 100644 --- a/app/models/encrypted_message.rb +++ b/app/models/encrypted_message.rb @@ -19,6 +19,7 @@ class EncryptedMessage < ApplicationRecord self.inheritance_column = nil include Paginable + include Redisable scope :up_to, ->(id) { where(arel_table[:id].lteq(id)) } @@ -38,7 +39,7 @@ class EncryptedMessage < ApplicationRecord end def subscribed_to_timeline? - Redis.current.exists?("subscribed:#{streaming_channel}") + redis.exists?("subscribed:#{streaming_channel}") end def streaming_channel diff --git a/app/models/follow_recommendation_filter.rb b/app/models/follow_recommendation_filter.rb index acf03cd84..531332614 100644 --- a/app/models/follow_recommendation_filter.rb +++ b/app/models/follow_recommendation_filter.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class FollowRecommendationFilter + include Redisable + KEYS = %i( language status @@ -17,7 +19,7 @@ class FollowRecommendationFilter if params['status'] == 'suppressed' Account.joins(:follow_recommendation_suppression).order(FollowRecommendationSuppression.arel_table[:id].desc).to_a else - account_ids = Redis.current.zrevrange("follow_recommendations:#{@language}", 0, -1).map(&:to_i) + account_ids = redis.zrevrange("follow_recommendations:#{@language}", 0, -1).map(&:to_i) accounts = Account.where(id: account_ids).index_by(&:id) account_ids.map { |id| accounts[id] }.compact diff --git a/app/models/user.rb b/app/models/user.rb index 9da7b2639..ab832bcd0 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -52,6 +52,7 @@ class User < ApplicationRecord include Settings::Extend include UserRoles + include Redisable # The home and list feeds will be stored in Redis for this amount # of time, and status fan-out to followers will include only people @@ -456,7 +457,7 @@ class User < ApplicationRecord end def regenerate_feed! - RegenerationWorker.perform_async(account_id) if Redis.current.set("account:#{account_id}:regeneration", true, nx: true, ex: 1.day.seconds) + RegenerationWorker.perform_async(account_id) if redis.set("account:#{account_id}:regeneration", true, nx: true, ex: 1.day.seconds) end def needs_feed_update? diff --git a/app/services/activitypub/process_account_service.rb b/app/services/activitypub/process_account_service.rb index ec5140720..5649153ee 100644 --- a/app/services/activitypub/process_account_service.rb +++ b/app/services/activitypub/process_account_service.rb @@ -3,6 +3,7 @@ class ActivityPub::ProcessAccountService < BaseService include JsonLdHelper include DomainControlHelper + include Redisable # Should be called with confirmed valid JSON # and WebFinger-resolved username and domain @@ -289,7 +290,7 @@ class ActivityPub::ProcessAccountService < BaseService end def lock_options - { redis: Redis.current, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds } + { redis: redis, key: "process_account:#{@uri}", autorelease: 15.minutes.seconds } end def process_tags diff --git a/app/services/activitypub/process_status_update_service.rb b/app/services/activitypub/process_status_update_service.rb index 3d9d9cb84..fb6e44c6d 100644 --- a/app/services/activitypub/process_status_update_service.rb +++ b/app/services/activitypub/process_status_update_service.rb @@ -2,6 +2,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService include JsonLdHelper + include Redisable def call(status, json) raise ArgumentError, 'Status has unsaved changes' if status.changed? @@ -241,7 +242,7 @@ class ActivityPub::ProcessStatusUpdateService < BaseService end def lock_options - { redis: Redis.current, key: "create:#{@uri}", autorelease: 15.minutes.seconds } + { redis: redis, key: "create:#{@uri}", autorelease: 15.minutes.seconds } end def record_previous_edit! diff --git a/app/services/fan_out_on_write_service.rb b/app/services/fan_out_on_write_service.rb index 76404c6b8..de5c5ebe4 100644 --- a/app/services/fan_out_on_write_service.rb +++ b/app/services/fan_out_on_write_service.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class FanOutOnWriteService < BaseService + include Redisable + # Push a status into home and mentions feeds # @param [Status] status # @param [Hash] options @@ -99,20 +101,20 @@ class FanOutOnWriteService < BaseService def broadcast_to_hashtag_streams! @status.tags.pluck(:name).each do |hashtag| - Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", anonymous_payload) - Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", anonymous_payload) if @status.local? + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", anonymous_payload) + redis.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", anonymous_payload) if @status.local? end end def broadcast_to_public_streams! return if @status.reply? && @status.in_reply_to_account_id != @account.id - Redis.current.publish('timeline:public', anonymous_payload) - Redis.current.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload) + redis.publish('timeline:public', anonymous_payload) + redis.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload) if @status.with_media? - Redis.current.publish('timeline:public:media', anonymous_payload) - Redis.current.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload) + redis.publish('timeline:public:media', anonymous_payload) + redis.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload) end end diff --git a/app/services/fetch_link_card_service.rb b/app/services/fetch_link_card_service.rb index 9c8b5ea20..868796a6b 100644 --- a/app/services/fetch_link_card_service.rb +++ b/app/services/fetch_link_card_service.rb @@ -1,6 +1,8 @@ # frozen_string_literal: true class FetchLinkCardService < BaseService + include Redisable + URL_PATTERN = %r{ (#{Twitter::TwitterText::Regex[:valid_url_preceding_chars]}) # $1 preceding chars ( # $2 URL @@ -155,6 +157,6 @@ class FetchLinkCardService < BaseService end def lock_options - { redis: Redis.current, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds } + { redis: redis, key: "fetch:#{@original_url}", autorelease: 15.minutes.seconds } end end diff --git a/app/services/precompute_feed_service.rb b/app/services/precompute_feed_service.rb index 61f573534..f813f06b2 100644 --- a/app/services/precompute_feed_service.rb +++ b/app/services/precompute_feed_service.rb @@ -1,9 +1,11 @@ # frozen_string_literal: true class PrecomputeFeedService < BaseService + include Redisable + def call(account) FeedManager.instance.populate_home(account) ensure - Redis.current.del("account:#{account.id}:regeneration") + redis.del("account:#{account.id}:regeneration") end end diff --git a/app/services/remove_status_service.rb b/app/services/remove_status_service.rb index 41730154d..dbd1f6430 100644 --- a/app/services/remove_status_service.rb +++ b/app/services/remove_status_service.rb @@ -146,6 +146,6 @@ class RemoveStatusService < BaseService end def lock_options - { redis: Redis.current, key: "distribute:#{@status.id}", autorelease: 5.minutes.seconds } + { redis: redis, key: "distribute:#{@status.id}", autorelease: 5.minutes.seconds } end end diff --git a/app/services/resolve_account_service.rb b/app/services/resolve_account_service.rb index 3a372ef2a..21332a03e 100644 --- a/app/services/resolve_account_service.rb +++ b/app/services/resolve_account_service.rb @@ -4,6 +4,7 @@ class ResolveAccountService < BaseService include JsonLdHelper include DomainControlHelper include WebfingerHelper + include Redisable # Find or create an account record for a remote user. When creating, # look up the user's webfinger and fetch ActivityPub data @@ -147,6 +148,6 @@ class ResolveAccountService < BaseService end def lock_options - { redis: Redis.current, key: "resolve:#{@username}@#{@domain}", autorelease: 15.minutes.seconds } + { redis: redis, key: "resolve:#{@username}@#{@domain}", autorelease: 15.minutes.seconds } end end diff --git a/app/services/vote_service.rb b/app/services/vote_service.rb index 19e453332..b77812970 100644 --- a/app/services/vote_service.rb +++ b/app/services/vote_service.rb @@ -3,6 +3,7 @@ class VoteService < BaseService include Authorization include Payloadable + include Redisable def call(account, poll, choices) authorize_with account, poll, :vote? @@ -77,6 +78,6 @@ class VoteService < BaseService end def lock_options - { redis: Redis.current, key: "vote:#{@poll.id}:#{@account.id}" } + { redis: redis, key: "vote:#{@poll.id}:#{@account.id}" } end end diff --git a/app/workers/distribution_worker.rb b/app/workers/distribution_worker.rb index 770325ccf..474b4daaf 100644 --- a/app/workers/distribution_worker.rb +++ b/app/workers/distribution_worker.rb @@ -2,9 +2,10 @@ class DistributionWorker include Sidekiq::Worker + include Redisable def perform(status_id, options = {}) - RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock| + RedisLock.acquire(redis: redis, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock| if lock.acquired? FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys) else diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb index 6ebb9a400..e526d2887 100644 --- a/app/workers/merge_worker.rb +++ b/app/workers/merge_worker.rb @@ -2,12 +2,13 @@ class MergeWorker include Sidekiq::Worker + include Redisable def perform(from_account_id, into_account_id) FeedManager.instance.merge_into_home(Account.find(from_account_id), Account.find(into_account_id)) rescue ActiveRecord::RecordNotFound true ensure - Redis.current.del("account:#{into_account_id}:regeneration") + redis.del("account:#{into_account_id}:regeneration") end end diff --git a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb index 7195f0ff9..bd92fe32c 100644 --- a/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb +++ b/app/workers/scheduler/accounts_statuses_cleanup_scheduler.rb @@ -2,6 +2,7 @@ class Scheduler::AccountsStatusesCleanupScheduler include Sidekiq::Worker + include Redisable # This limit is mostly to be nice to the fediverse at large and not # generate too much traffic. @@ -83,14 +84,14 @@ class Scheduler::AccountsStatusesCleanupScheduler end def last_processed_id - Redis.current.get('account_statuses_cleanup_scheduler:last_account_id') + redis.get('account_statuses_cleanup_scheduler:last_account_id') end def save_last_processed_id(id) if id.nil? - Redis.current.del('account_statuses_cleanup_scheduler:last_account_id') + redis.del('account_statuses_cleanup_scheduler:last_account_id') else - Redis.current.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds) + redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds) end end end |