about summary refs log tree commit diff
path: root/app/lib/connection_pool/shared_timed_stack.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/lib/connection_pool/shared_timed_stack.rb')
-rw-r--r--app/lib/connection_pool/shared_timed_stack.rb95
1 files changed, 95 insertions, 0 deletions
diff --git a/app/lib/connection_pool/shared_timed_stack.rb b/app/lib/connection_pool/shared_timed_stack.rb
new file mode 100644
index 000000000..14a5285c4
--- /dev/null
+++ b/app/lib/connection_pool/shared_timed_stack.rb
@@ -0,0 +1,95 @@
+# frozen_string_literal: true
+
+class ConnectionPool::SharedTimedStack
+  def initialize(max = 0, &block)
+    @create_block = block
+    @max          = max
+    @created      = 0
+    @queue        = []
+    @tagged_queue = Hash.new { |hash, key| hash[key] = [] }
+    @mutex        = Mutex.new
+    @resource     = ConditionVariable.new
+  end
+
+  def push(connection)
+    @mutex.synchronize do
+      store_connection(connection)
+      @resource.broadcast
+    end
+  end
+
+  alias << push
+
+  def pop(preferred_tag, timeout = 5.0)
+    deadline = current_time + timeout
+
+    @mutex.synchronize do
+      loop do
+        return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty?
+
+        connection = try_create(preferred_tag)
+        return connection if connection
+
+        to_wait = deadline - current_time
+        raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
+
+        @resource.wait(@mutex, to_wait)
+      end
+    end
+  end
+
+  def empty?
+    size.zero?
+  end
+
+  def size
+    @mutex.synchronize do
+      @queue.size
+    end
+  end
+
+  def flush
+    @mutex.synchronize do
+      @queue.delete_if do |connection|
+        delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME)
+
+        if delete
+          @tagged_queue[connection.site].delete(connection)
+          connection.close
+          @created -= 1
+        end
+
+        delete
+      end
+    end
+  end
+
+  private
+
+  def try_create(preferred_tag)
+    if @created == @max && !@queue.empty?
+      throw_away_connection = @queue.pop
+      @tagged_queue[throw_away_connection.site].delete(throw_away_connection)
+      @create_block.call(preferred_tag)
+    elsif @created != @max
+      connection = @create_block.call(preferred_tag)
+      @created += 1
+      connection
+    end
+  end
+
+  def fetch_preferred_connection(preferred_tag)
+    connection = @tagged_queue[preferred_tag].pop
+    @queue.delete(connection)
+    connection
+  end
+
+  def current_time
+    Process.clock_gettime(Process::CLOCK_MONOTONIC)
+  end
+
+  def store_connection(connection)
+    @tagged_queue[connection.site].push(connection)
+    @queue.push(connection)
+  end
+end