diff options
Diffstat (limited to 'app/lib/connection_pool/shared_timed_stack.rb')
-rw-r--r-- | app/lib/connection_pool/shared_timed_stack.rb | 95 |
1 files changed, 95 insertions, 0 deletions
diff --git a/app/lib/connection_pool/shared_timed_stack.rb b/app/lib/connection_pool/shared_timed_stack.rb new file mode 100644 index 000000000..14a5285c4 --- /dev/null +++ b/app/lib/connection_pool/shared_timed_stack.rb @@ -0,0 +1,95 @@ +# frozen_string_literal: true + +class ConnectionPool::SharedTimedStack + def initialize(max = 0, &block) + @create_block = block + @max = max + @created = 0 + @queue = [] + @tagged_queue = Hash.new { |hash, key| hash[key] = [] } + @mutex = Mutex.new + @resource = ConditionVariable.new + end + + def push(connection) + @mutex.synchronize do + store_connection(connection) + @resource.broadcast + end + end + + alias << push + + def pop(preferred_tag, timeout = 5.0) + deadline = current_time + timeout + + @mutex.synchronize do + loop do + return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty? + + connection = try_create(preferred_tag) + return connection if connection + + to_wait = deadline - current_time + raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0 + + @resource.wait(@mutex, to_wait) + end + end + end + + def empty? + size.zero? + end + + def size + @mutex.synchronize do + @queue.size + end + end + + def flush + @mutex.synchronize do + @queue.delete_if do |connection| + delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME) + + if delete + @tagged_queue[connection.site].delete(connection) + connection.close + @created -= 1 + end + + delete + end + end + end + + private + + def try_create(preferred_tag) + if @created == @max && !@queue.empty? + throw_away_connection = @queue.pop + @tagged_queue[throw_away_connection.site].delete(throw_away_connection) + @create_block.call(preferred_tag) + elsif @created != @max + connection = @create_block.call(preferred_tag) + @created += 1 + connection + end + end + + def fetch_preferred_connection(preferred_tag) + connection = @tagged_queue[preferred_tag].pop + @queue.delete(connection) + connection + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def store_connection(connection) + @tagged_queue[connection.site].push(connection) + @queue.push(connection) + end +end |