about summary refs log tree commit diff
path: root/app/lib/connection_pool
diff options
context:
space:
mode:
authorStarfall <root@starfall.blue>2019-12-09 19:07:33 -0600
committerStarfall <root@starfall.blue>2019-12-09 19:09:31 -0600
commit6b34fcfef7566105e8d80ab5fee0a539c06cddbf (patch)
tree8fad2d47bf8be255d3c671c40cbfd04c2f55ed03 /app/lib/connection_pool
parent9fbb4af7611aa7836e65ef9f544d341423c15685 (diff)
parent246addd5b33a172600342af3fb6fb5e4c80ad95e (diff)
Merge branch 'glitch'`
Diffstat (limited to 'app/lib/connection_pool')
-rw-r--r--app/lib/connection_pool/shared_connection_pool.rb63
-rw-r--r--app/lib/connection_pool/shared_timed_stack.rb95
2 files changed, 158 insertions, 0 deletions
diff --git a/app/lib/connection_pool/shared_connection_pool.rb b/app/lib/connection_pool/shared_connection_pool.rb
new file mode 100644
index 000000000..2865a4108
--- /dev/null
+++ b/app/lib/connection_pool/shared_connection_pool.rb
@@ -0,0 +1,63 @@
+# frozen_string_literal: true
+
+require 'connection_pool'
+require_relative './shared_timed_stack'
+
+class ConnectionPool::SharedConnectionPool < ConnectionPool
+  def initialize(options = {}, &block)
+    super(options, &block)
+
+    @available = ConnectionPool::SharedTimedStack.new(@size, &block)
+  end
+
+  delegate :size, :flush, to: :@available
+
+  def with(preferred_tag, options = {})
+    Thread.handle_interrupt(Exception => :never) do
+      conn = checkout(preferred_tag, options)
+
+      begin
+        Thread.handle_interrupt(Exception => :immediate) do
+          yield conn
+        end
+      ensure
+        checkin(preferred_tag)
+      end
+    end
+  end
+
+  def checkout(preferred_tag, options = {})
+    if ::Thread.current[key(preferred_tag)]
+      ::Thread.current[key_count(preferred_tag)] += 1
+      ::Thread.current[key(preferred_tag)]
+    else
+      ::Thread.current[key_count(preferred_tag)] = 1
+      ::Thread.current[key(preferred_tag)] = @available.pop(preferred_tag, options[:timeout] || @timeout)
+    end
+  end
+
+  def checkin(preferred_tag)
+    if ::Thread.current[key(preferred_tag)]
+      if ::Thread.current[key_count(preferred_tag)] == 1
+        @available.push(::Thread.current[key(preferred_tag)])
+        ::Thread.current[key(preferred_tag)] = nil
+      else
+        ::Thread.current[key_count(preferred_tag)] -= 1
+      end
+    else
+      raise ConnectionPool::Error, 'no connections are checked out'
+    end
+
+    nil
+  end
+
+  private
+
+  def key(tag)
+    :"#{@key}-#{tag}"
+  end
+
+  def key_count(tag)
+    :"#{@key_count}-#{tag}"
+  end
+end
diff --git a/app/lib/connection_pool/shared_timed_stack.rb b/app/lib/connection_pool/shared_timed_stack.rb
new file mode 100644
index 000000000..14a5285c4
--- /dev/null
+++ b/app/lib/connection_pool/shared_timed_stack.rb
@@ -0,0 +1,95 @@
+# frozen_string_literal: true
+
+class ConnectionPool::SharedTimedStack
+  def initialize(max = 0, &block)
+    @create_block = block
+    @max          = max
+    @created      = 0
+    @queue        = []
+    @tagged_queue = Hash.new { |hash, key| hash[key] = [] }
+    @mutex        = Mutex.new
+    @resource     = ConditionVariable.new
+  end
+
+  def push(connection)
+    @mutex.synchronize do
+      store_connection(connection)
+      @resource.broadcast
+    end
+  end
+
+  alias << push
+
+  def pop(preferred_tag, timeout = 5.0)
+    deadline = current_time + timeout
+
+    @mutex.synchronize do
+      loop do
+        return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty?
+
+        connection = try_create(preferred_tag)
+        return connection if connection
+
+        to_wait = deadline - current_time
+        raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
+
+        @resource.wait(@mutex, to_wait)
+      end
+    end
+  end
+
+  def empty?
+    size.zero?
+  end
+
+  def size
+    @mutex.synchronize do
+      @queue.size
+    end
+  end
+
+  def flush
+    @mutex.synchronize do
+      @queue.delete_if do |connection|
+        delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME)
+
+        if delete
+          @tagged_queue[connection.site].delete(connection)
+          connection.close
+          @created -= 1
+        end
+
+        delete
+      end
+    end
+  end
+
+  private
+
+  def try_create(preferred_tag)
+    if @created == @max && !@queue.empty?
+      throw_away_connection = @queue.pop
+      @tagged_queue[throw_away_connection.site].delete(throw_away_connection)
+      @create_block.call(preferred_tag)
+    elsif @created != @max
+      connection = @create_block.call(preferred_tag)
+      @created += 1
+      connection
+    end
+  end
+
+  def fetch_preferred_connection(preferred_tag)
+    connection = @tagged_queue[preferred_tag].pop
+    @queue.delete(connection)
+    connection
+  end
+
+  def current_time
+    Process.clock_gettime(Process::CLOCK_MONOTONIC)
+  end
+
+  def store_connection(connection)
+    @tagged_queue[connection.site].push(connection)
+    @queue.push(connection)
+  end
+end