diff options
Diffstat (limited to 'app')
-rw-r--r-- | app/controllers/activitypub/inboxes_controller.rb | 8 | ||||
-rw-r--r-- | app/lib/connection_pool/shared_connection_pool.rb | 63 | ||||
-rw-r--r-- | app/lib/connection_pool/shared_timed_stack.rb | 95 | ||||
-rw-r--r-- | app/lib/request.rb | 51 | ||||
-rw-r--r-- | app/lib/request_pool.rb | 114 | ||||
-rw-r--r-- | app/services/resolve_url_service.rb | 2 | ||||
-rw-r--r-- | app/workers/activitypub/delivery_worker.rb | 4 |
7 files changed, 319 insertions, 18 deletions
diff --git a/app/controllers/activitypub/inboxes_controller.rb b/app/controllers/activitypub/inboxes_controller.rb index 7cfd9a25e..469c61de9 100644 --- a/app/controllers/activitypub/inboxes_controller.rb +++ b/app/controllers/activitypub/inboxes_controller.rb @@ -43,10 +43,10 @@ class ActivityPub::InboxesController < Api::BaseController end def upgrade_account - if signed_request_account.ostatus? - signed_request_account.update(last_webfingered_at: nil) - ResolveAccountWorker.perform_async(signed_request_account.acct) - end +# if signed_request_account.ostatus? +# signed_request_account.update(last_webfingered_at: nil) +# ResolveAccountWorker.perform_async(signed_request_account.acct) +# end DeliveryFailureTracker.track_inverse_success!(signed_request_account) end diff --git a/app/lib/connection_pool/shared_connection_pool.rb b/app/lib/connection_pool/shared_connection_pool.rb new file mode 100644 index 000000000..2865a4108 --- /dev/null +++ b/app/lib/connection_pool/shared_connection_pool.rb @@ -0,0 +1,63 @@ +# frozen_string_literal: true + +require 'connection_pool' +require_relative './shared_timed_stack' + +class ConnectionPool::SharedConnectionPool < ConnectionPool + def initialize(options = {}, &block) + super(options, &block) + + @available = ConnectionPool::SharedTimedStack.new(@size, &block) + end + + delegate :size, :flush, to: :@available + + def with(preferred_tag, options = {}) + Thread.handle_interrupt(Exception => :never) do + conn = checkout(preferred_tag, options) + + begin + Thread.handle_interrupt(Exception => :immediate) do + yield conn + end + ensure + checkin(preferred_tag) + end + end + end + + def checkout(preferred_tag, options = {}) + if ::Thread.current[key(preferred_tag)] + ::Thread.current[key_count(preferred_tag)] += 1 + ::Thread.current[key(preferred_tag)] + else + ::Thread.current[key_count(preferred_tag)] = 1 + ::Thread.current[key(preferred_tag)] = @available.pop(preferred_tag, options[:timeout] || @timeout) + end + end + + def checkin(preferred_tag) + if ::Thread.current[key(preferred_tag)] + if ::Thread.current[key_count(preferred_tag)] == 1 + @available.push(::Thread.current[key(preferred_tag)]) + ::Thread.current[key(preferred_tag)] = nil + else + ::Thread.current[key_count(preferred_tag)] -= 1 + end + else + raise ConnectionPool::Error, 'no connections are checked out' + end + + nil + end + + private + + def key(tag) + :"#{@key}-#{tag}" + end + + def key_count(tag) + :"#{@key_count}-#{tag}" + end +end diff --git a/app/lib/connection_pool/shared_timed_stack.rb b/app/lib/connection_pool/shared_timed_stack.rb new file mode 100644 index 000000000..14a5285c4 --- /dev/null +++ b/app/lib/connection_pool/shared_timed_stack.rb @@ -0,0 +1,95 @@ +# frozen_string_literal: true + +class ConnectionPool::SharedTimedStack + def initialize(max = 0, &block) + @create_block = block + @max = max + @created = 0 + @queue = [] + @tagged_queue = Hash.new { |hash, key| hash[key] = [] } + @mutex = Mutex.new + @resource = ConditionVariable.new + end + + def push(connection) + @mutex.synchronize do + store_connection(connection) + @resource.broadcast + end + end + + alias << push + + def pop(preferred_tag, timeout = 5.0) + deadline = current_time + timeout + + @mutex.synchronize do + loop do + return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty? + + connection = try_create(preferred_tag) + return connection if connection + + to_wait = deadline - current_time + raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 + + @resource.wait(@mutex, to_wait) + end + end + end + + def empty? + size.zero? + end + + def size + @mutex.synchronize do + @queue.size + end + end + + def flush + @mutex.synchronize do + @queue.delete_if do |connection| + delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME) + + if delete + @tagged_queue[connection.site].delete(connection) + connection.close + @created -= 1 + end + + delete + end + end + end + + private + + def try_create(preferred_tag) + if @created == @max && !@queue.empty? + throw_away_connection = @queue.pop + @tagged_queue[throw_away_connection.site].delete(throw_away_connection) + @create_block.call(preferred_tag) + elsif @created != @max + connection = @create_block.call(preferred_tag) + @created += 1 + connection + end + end + + def fetch_preferred_connection(preferred_tag) + connection = @tagged_queue[preferred_tag].pop + @queue.delete(connection) + connection + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def store_connection(connection) + @tagged_queue[connection.site].push(connection) + @queue.push(connection) + end +end diff --git a/app/lib/request.rb b/app/lib/request.rb index 7a8141c7e..5186bb5a3 100644 --- a/app/lib/request.rb +++ b/app/lib/request.rb @@ -16,17 +16,17 @@ end class Request REQUEST_TARGET = '(request-target)' - + TIMEOUT = { connect: 5, read: 10, write: 10 }.freeze include RoutingHelper def initialize(verb, url, **options) raise ArgumentError if url.blank? - @verb = verb - @url = Addressable::URI.parse(url).normalize - @options = options.merge(use_proxy? ? Rails.configuration.x.http_client_proxy : { socket_class: Socket }) - @headers = {} - + @verb = verb + @url = Addressable::URI.parse(url).normalize + @http_client = options.delete(:http_client) + @options = options.merge(use_proxy? ? Rails.configuration.x.http_client_proxy : { socket_class: Socket }) + @headers = {} raise Mastodon::HostValidationError, 'Instance does not support hidden service connections' if block_hidden_service? set_common_headers! @@ -50,15 +50,17 @@ class Request def perform begin - response = http_client.headers(headers).public_send(@verb, @url.to_s, @options) + response = http_client.public_send(@verb, @url.to_s, @options.merge(headers: headers)) rescue => e raise e.class, "#{e.message} on #{@url}", e.backtrace end begin - yield response.extend(ClientLimit) if block_given? + response = response.extend(ClientLimit) + response.body_with_limit if http_client.persistent? + yield response if block_given? ensure - http_client.close + http_client.close unless http_client.persistent? end end @@ -76,6 +78,9 @@ class Request %w(http https).include?(parsed_url.scheme) && parsed_url.host.present? end + def http_client + HTTP.use(:auto_inflate).timeout(:per_operation, TIMEOUT.dup).follow(max_hops: 2) + end end private @@ -125,7 +130,7 @@ class Request end def http_client - @http_client ||= HTTP.use(:auto_inflate).timeout(:per_operation, timeout).follow(max_hops: 3) + @http_client ||= Request.http_client end def use_proxy? @@ -169,6 +174,7 @@ class Request return super(host, *args) if thru_hidden_service?(host) outer_e = nil + port = args.first Resolv::DNS.open do |dns| dns.timeouts = 5 @@ -179,10 +185,29 @@ class Request addresses.each do |address| begin raise Mastodon::HostValidationError if PrivateAddressCheck.private_address?(IPAddr.new(address.to_s)) - - ::Timeout.timeout(time_slot, HTTP::TimeoutError) do - return super(address.to_s, *args) + sock = ::Socket.new(::Socket::AF_INET, ::Socket::SOCK_STREAM, 0) + sockaddr = ::Socket.pack_sockaddr_in(port, address.to_s) + + sock.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1) + + begin + sock.connect_nonblock(sockaddr) + rescue IO::WaitWritable + if IO.select(nil, [sock], nil, Request::TIMEOUT[:connect]) + begin + sock.connect_nonblock(sockaddr) + rescue Errno::EISCONN + # Yippee! + rescue + sock.close + raise + end + else + sock.close + raise HTTP::TimeoutError, "Connect timed out after #{Request::TIMEOUT[:connect]} seconds" + end end + return sock rescue => e outer_e = e end diff --git a/app/lib/request_pool.rb b/app/lib/request_pool.rb new file mode 100644 index 000000000..e5899a79a --- /dev/null +++ b/app/lib/request_pool.rb @@ -0,0 +1,114 @@ +# frozen_string_literal: true + +require_relative './connection_pool/shared_connection_pool' + +class RequestPool + def self.current + @current ||= RequestPool.new + end + + class Reaper + attr_reader :pool, :frequency + + def initialize(pool, frequency) + @pool = pool + @frequency = frequency + end + + def run + return unless frequency&.positive? + + Thread.new(frequency, pool) do |t, p| + loop do + sleep t + p.flush + end + end + end + end + + MAX_IDLE_TIME = 30 + WAIT_TIMEOUT = 5 + MAX_POOL_SIZE = ENV.fetch('MAX_REQUEST_POOL_SIZE', 512).to_i + + class Connection + attr_reader :site, :last_used_at, :created_at, :in_use, :dead, :fresh + + def initialize(site) + @site = site + @http_client = http_client + @last_used_at = nil + @created_at = current_time + @dead = false + @fresh = true + end + + def use + @last_used_at = current_time + @in_use = true + + retries = 0 + + begin + yield @http_client + rescue HTTP::ConnectionError + # It's possible the connection was closed, so let's + # try re-opening it once + + close + + if @fresh || retries.positive? + raise + else + @http_client = http_client + retries += 1 + retry + end + rescue StandardError + # If this connection raises errors of any kind, it's + # better if it gets reaped as soon as possible + + close + @dead = true + raise + end + ensure + @fresh = false + @in_use = false + end + + def seconds_idle + current_time - (@last_used_at || @created_at) + end + + def close + @http_client.close + end + + private + + def http_client + Request.http_client.persistent(@site, timeout: MAX_IDLE_TIME) + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + end + + def initialize + @pool = ConnectionPool::SharedConnectionPool.new(size: MAX_POOL_SIZE, timeout: WAIT_TIMEOUT) { |site| Connection.new(site) } + @reaper = Reaper.new(self, 30) + @reaper.run + end + + def with(site, &block) + @pool.with(site) do |connection| + ActiveSupport::Notifications.instrument('with.request_pool', miss: connection.fresh, host: connection.site) do + connection.use(&block) + end + end + end + + delegate :size, :flush, to: :@pool +end diff --git a/app/services/resolve_url_service.rb b/app/services/resolve_url_service.rb index bab420945..65b6ffd61 100644 --- a/app/services/resolve_url_service.rb +++ b/app/services/resolve_url_service.rb @@ -21,7 +21,7 @@ class ResolveURLService < BaseService if equals_or_includes_any?(type, ActivityPub::FetchRemoteAccountService::SUPPORTED_TYPES) FetchRemoteAccountService.new.call(resource_url, body) elsif equals_or_includes_any?(type, ActivityPub::Activity::Create::SUPPORTED_TYPES + ActivityPub::Activity::Create::CONVERTED_TYPES) - status = FetchRemoteStatusService.new.call(atom_url, body, requested: true) + status = FetchRemoteStatusService.new.call(resource_url, body, requested: true) authorize_with @on_behalf_of, status, :show? unless status.nil? status end diff --git a/app/workers/activitypub/delivery_worker.rb b/app/workers/activitypub/delivery_worker.rb index 03d2c1187..8524039f9 100644 --- a/app/workers/activitypub/delivery_worker.rb +++ b/app/workers/activitypub/delivery_worker.rb @@ -66,4 +66,8 @@ class ActivityPub::DeliveryWorker def failure_tracker @failure_tracker ||= DeliveryFailureTracker.new(@inbox_url) end + + def request_pool + RequestPool.current + end end |