1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
|
From: "Alexander J. Maidak" <amaidak@equinix.com>
https://github.com/ged/ruby-pg/pull/619
---
lib/pg/connection.rb | 16 ++++++++++-
spec/helpers.rb | 13 +++++++++
spec/pg/connection_spec.rb | 57 +++++++++++++++++++++++++-------------
3 files changed, 65 insertions(+), 21 deletions(-)
diff --git a/lib/pg/connection.rb b/lib/pg/connection.rb
index 2c9ecd8..572a2bf 100644
--- a/lib/pg/connection.rb
+++ b/lib/pg/connection.rb
@@ -680,6 +680,7 @@ class PG::Connection
host_count = conninfo_hash[:host].to_s.count(",") + 1
stop_time = timeo * host_count + Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
+ connection_errors = []
poll_status = PG::PGRES_POLLING_WRITING
until poll_status == PG::PGRES_POLLING_OK ||
@@ -720,7 +721,13 @@ class PG::Connection
else
connhost = "at \"#{host}\", port #{port}"
end
- raise PG::ConnectionBad.new("connection to server #{connhost} failed: timeout expired", connection: self)
+ connection_errors << "connection to server #{connhost} failed: timeout expired"
+ if connection_errors.count < host_count.to_i
+ new_conninfo_hash = rotate_hosts(conninfo_hash.compact)
+ send(:reset_start2, self.class.send(:parse_connect_args, new_conninfo_hash))
+ else
+ raise PG::ConnectionBad.new(connection_errors.join("\n"), connection: self)
+ end
end
# Check to see if it's finished or failed yet
@@ -733,6 +740,13 @@ class PG::Connection
raise PG::ConnectionBad.new(msg, connection: self)
end
end
+
+ private def rotate_hosts(conninfo_hash)
+ conninfo_hash[:host] = conninfo_hash[:host].split(",").rotate.join(",") if conninfo_hash[:host]
+ conninfo_hash[:port] = conninfo_hash[:port].split(",").rotate.join(",") if conninfo_hash[:port]
+ conninfo_hash[:hostaddr] = conninfo_hash[:hostaddr].split(",").rotate.join(",") if conninfo_hash[:hostaddr]
+ conninfo_hash
+ end
end
include Pollable
diff --git a/spec/helpers.rb b/spec/helpers.rb
index 7214ec1..bd546f5 100644
--- a/spec/helpers.rb
+++ b/spec/helpers.rb
@@ -475,6 +475,19 @@ EOT
end
end
+ class ListenSocket
+ attr_reader :port
+ def initialize(host = 'localhost', accept: true)
+ TCPServer.open( host, 0 ) do |serv|
+ if accept
+ Thread.new { begin loop do serv.accept end rescue nil end }
+ end
+ @port = serv.local_address.ip_port
+ yield self
+ end
+ end
+ end
+
def check_for_lingering_connections( conn )
conn.exec( "SELECT * FROM pg_stat_activity" ) do |res|
conns = res.find_all {|row| row['pid'].to_i != conn.backend_pid && ["client backend", nil].include?(row["backend_type"]) }
diff --git a/spec/pg/connection_spec.rb b/spec/pg/connection_spec.rb
index 63d3585..8a5645a 100644
--- a/spec/pg/connection_spec.rb
+++ b/spec/pg/connection_spec.rb
@@ -369,24 +369,38 @@ describe PG::Connection do
end
end
- it "times out after connect_timeout seconds" do
- TCPServer.open( 'localhost', 54320 ) do |serv|
+ it "times out after 2 * connect_timeout seconds on two connections" do
+ PG::TestingHelpers::ListenSocket.new do |sock|
start_time = Time.now
expect {
described_class.connect(
- host: 'localhost',
- port: 54320,
- connect_timeout: 1,
- dbname: "test")
+ host: 'localhost,localhost',
+ port: sock.port,
+ connect_timeout: 1,
+ dbname: "test")
}.to raise_error do |error|
expect( error ).to be_an( PG::ConnectionBad )
- expect( error.message ).to match( /timeout expired/ )
+ expect( error.message ).to match( /timeout expired.*timeout expired/m )
if PG.library_version >= 120000
- expect( error.message ).to match( /\"localhost\"/ )
- expect( error.message ).to match( /port 54320/ )
+ expect( error.message ).to match( /\"localhost\".*\"localhost\"/m )
+ expect( error.message ).to match( /port #{sock.port}/ )
end
end
+ expect( Time.now - start_time ).to be_between(1.9, 10).inclusive
+ end
+ end
+
+ it "succeeds with second host after connect_timeout" do
+ PG::TestingHelpers::ListenSocket.new do |sock|
+ start_time = Time.now
+ conn = described_class.connect(
+ host: 'localhost,localhost,localhost',
+ port: "#{sock.port},#{@port},#{sock.port}",
+ connect_timeout: 1,
+ dbname: "test")
+
+ expect( conn.port ).to eq( @port )
expect( Time.now - start_time ).to be_between(0.9, 10).inclusive
end
end
@@ -768,7 +782,8 @@ describe PG::Connection do
end
it "raises proper error when sending fails" do
- conn = described_class.connect_start( '127.0.0.1', 54320, "", "", "me", "xxxx", "somedb" )
+ sock = PG::TestingHelpers::ListenSocket.new('127.0.0.1', accept: false){ }
+ conn = described_class.connect_start( '127.0.0.1', sock.port, "", "", "me", "xxxx", "somedb" )
expect{ conn.exec 'SELECT 1' }.to raise_error(PG::UnableToSend, /no connection/){|err| expect(err).to have_attributes(connection: conn) }
end
@@ -1650,11 +1665,12 @@ describe PG::Connection do
it "handles server close while asynchronous connect" do
- serv = TCPServer.new( '127.0.0.1', 54320 )
- conn = described_class.connect_start( '127.0.0.1', 54320, "", "", "me", "xxxx", "somedb" )
- expect( [PG::PGRES_POLLING_WRITING, PG::CONNECTION_OK] ).to include conn.connect_poll
- select( nil, [conn.socket_io], nil, 0.2 )
- serv.close
+ conn = nil
+ PG::TestingHelpers::ListenSocket.new('127.0.0.1', accept: false)do |sock|
+ conn = described_class.connect_start( '127.0.0.1', sock.port, "", "", "me", "xxxx", "somedb" )
+ expect( [PG::PGRES_POLLING_WRITING, PG::CONNECTION_OK] ).to include conn.connect_poll
+ select( nil, [conn.socket_io], nil, 0.2 )
+ end
if conn.connect_poll == PG::PGRES_POLLING_READING
select( [conn.socket_io], nil, nil, 0.2 )
end
@@ -1778,12 +1794,13 @@ describe PG::Connection do
end
it "consume_input should raise ConnectionBad for a closed connection" do
- serv = TCPServer.new( '127.0.0.1', 54320 )
- conn = described_class.connect_start( '127.0.0.1', 54320, "", "", "me", "xxxx", "somedb" )
- while [PG::CONNECTION_STARTED, PG::CONNECTION_MADE].include?(conn.connect_poll)
- sleep 0.1
+ conn = nil
+ PG::TestingHelpers::ListenSocket.new '127.0.0.1', accept: false do |sock|
+ conn = described_class.connect_start( '127.0.0.1', sock.port, "", "", "me", "xxxx", "somedb" )
+ while [PG::CONNECTION_STARTED, PG::CONNECTION_MADE].include?(conn.connect_poll)
+ sleep 0.1
+ end
end
- serv.close
expect{ conn.consume_input }.to raise_error(PG::ConnectionBad, /server closed the connection unexpectedly/){|err| expect(err).to have_attributes(connection: conn) }
expect{ conn.consume_input }.to raise_error(PG::ConnectionBad, /can't get socket descriptor|connection not open/){|err| expect(err).to have_attributes(connection: conn) }
end
--
2.47.1
|