about summary refs log tree commit diff
path: root/app/lib/connection_pool/shared_timed_stack.rb
blob: 14a5285c45753280f0380ad9cbc79efb303e48b9 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# frozen_string_literal: true

class ConnectionPool::SharedTimedStack
  def initialize(max = 0, &block)
    @create_block = block
    @max          = max
    @created      = 0
    @queue        = []
    @tagged_queue = Hash.new { |hash, key| hash[key] = [] }
    @mutex        = Mutex.new
    @resource     = ConditionVariable.new
  end

  def push(connection)
    @mutex.synchronize do
      store_connection(connection)
      @resource.broadcast
    end
  end

  alias << push

  def pop(preferred_tag, timeout = 5.0)
    deadline = current_time + timeout

    @mutex.synchronize do
      loop do
        return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty?

        connection = try_create(preferred_tag)
        return connection if connection

        to_wait = deadline - current_time
        raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0

        @resource.wait(@mutex, to_wait)
      end
    end
  end

  def empty?
    size.zero?
  end

  def size
    @mutex.synchronize do
      @queue.size
    end
  end

  def flush
    @mutex.synchronize do
      @queue.delete_if do |connection|
        delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME)

        if delete
          @tagged_queue[connection.site].delete(connection)
          connection.close
          @created -= 1
        end

        delete
      end
    end
  end

  private

  def try_create(preferred_tag)
    if @created == @max && !@queue.empty?
      throw_away_connection = @queue.pop
      @tagged_queue[throw_away_connection.site].delete(throw_away_connection)
      @create_block.call(preferred_tag)
    elsif @created != @max
      connection = @create_block.call(preferred_tag)
      @created += 1
      connection
    end
  end

  def fetch_preferred_connection(preferred_tag)
    connection = @tagged_queue[preferred_tag].pop
    @queue.delete(connection)
    connection
  end

  def current_time
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def store_connection(connection)
    @tagged_queue[connection.site].push(connection)
    @queue.push(connection)
  end
end