Skip to content

Commit 598c5cb

Browse files
authored
JAVA-1388: Add dynamic port discovery for system.peers_v2 (apache#1065)
1 parent ce29eea commit 598c5cb

File tree

12 files changed

+755
-166
lines changed

12 files changed

+755
-166
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
- [bug] JAVA-1928: Fix GuavaCompatibility for Guava 26.
1818
- [bug] JAVA-1935: Add null check in QueryConsistencyException.getHost.
1919
- [improvement] JAVA-1771: Send driver name and version in STARTUP message.
20+
- [improvement] JAVA-1388: Add dynamic port discovery for system.peers\_v2.
2021

2122
Merged from 3.5.x:
2223

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1686,11 +1686,15 @@ ReconnectionPolicy reconnectionPolicy() {
16861686
return configuration.getPolicies().getReconnectionPolicy();
16871687
}
16881688

1689+
InetSocketAddress translateAddress(InetSocketAddress address) {
1690+
InetSocketAddress translated =
1691+
configuration.getPolicies().getAddressTranslator().translate(address);
1692+
return translated == null ? address : translated;
1693+
}
1694+
16891695
InetSocketAddress translateAddress(InetAddress address) {
16901696
InetSocketAddress sa = new InetSocketAddress(address, connectionFactory.getPort());
1691-
InetSocketAddress translated =
1692-
configuration.getPolicies().getAddressTranslator().translate(sa);
1693-
return translated == null ? sa : translated;
1697+
return translateAddress(sa);
16941698
}
16951699

16961700
private Session newSession() {

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 187 additions & 83 deletions
Large diffs are not rendered by default.

driver-core/src/main/java/com/datastax/driver/core/Host.java

Lines changed: 52 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ public class Host {
4444
// We use that internally because
4545
// that's the 'peer' in the 'System.peers' table and avoids querying the full peers table in
4646
// ControlConnection.refreshNodeInfo.
47-
private volatile InetAddress broadcastAddress;
47+
private volatile InetSocketAddress broadcastSocketAddress;
4848

4949
// The listen_address as known by Cassandra.
5050
// This is usually the same as broadcast_address unless
5151
// specified otherwise in cassandra.yaml file.
52-
private volatile InetAddress listenAddress;
52+
private volatile InetSocketAddress listenSocketAddress;
5353

5454
private volatile UUID hostId;
5555

@@ -118,12 +118,12 @@ void setVersion(String cassandraVersion) {
118118
this.cassandraVersion = versionNumber;
119119
}
120120

121-
void setBroadcastAddress(InetAddress broadcastAddress) {
122-
this.broadcastAddress = broadcastAddress;
121+
void setBroadcastSocketAddress(InetSocketAddress broadcastAddress) {
122+
this.broadcastSocketAddress = broadcastAddress;
123123
}
124124

125-
void setListenAddress(InetAddress listenAddress) {
126-
this.listenAddress = listenAddress;
125+
void setListenSocketAddress(InetSocketAddress listenAddress) {
126+
this.listenSocketAddress = listenAddress;
127127
}
128128

129129
void setDseVersion(String dseVersion) {
@@ -194,12 +194,30 @@ public InetSocketAddress getSocketAddress() {
194194
}
195195

196196
/**
197-
* Returns the node broadcast address (that is, the IP by which it should be contacted by other
198-
* peers in the cluster), if known.
197+
* Returns the node broadcast address, if known. Otherwise {@code null}.
198+
*
199+
* <p>This is a shortcut for {@code getBroadcastSocketAddress().getAddress()}.
200+
*
201+
* @return the node broadcast address, if known. Otherwise {@code null}.
202+
* @see #getBroadcastSocketAddress()
203+
* @see <a
204+
* href="https://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html">The
205+
* cassandra.yaml configuration file</a>
206+
*/
207+
public InetAddress getBroadcastAddress() {
208+
return broadcastSocketAddress != null ? broadcastSocketAddress.getAddress() : null;
209+
}
210+
211+
/**
212+
* Returns the node broadcast address (that is, the address by which it should be contacted by
213+
* other peers in the cluster), if known. Otherwise {@code null}.
214+
*
215+
* <p>Note that the port of the returned address will be 0 for versions of Cassandra older than
216+
* 4.0.
199217
*
200218
* <p>This corresponds to the {@code broadcast_address} cassandra.yaml file setting and is by
201-
* default the same as {@link #getListenAddress()}, unless specified otherwise in cassandra.yaml.
202-
* <em>This is NOT the address clients should use to contact this node</em>.
219+
* default the same as {@link #getListenSocketAddress()}, unless specified otherwise in
220+
* cassandra.yaml. <em>This is NOT the address clients should use to contact this node</em>.
203221
*
204222
* <p>This information is always available for peer hosts. For the control host, it's only
205223
* available if CASSANDRA-9436 is fixed on the server side (Cassandra versions >= 2.0.16, 2.1.6,
@@ -212,13 +230,31 @@ public InetSocketAddress getSocketAddress() {
212230
* href="https://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html">The
213231
* cassandra.yaml configuration file</a>
214232
*/
215-
public InetAddress getBroadcastAddress() {
216-
return broadcastAddress;
233+
public InetSocketAddress getBroadcastSocketAddress() {
234+
return broadcastSocketAddress;
217235
}
218236

219237
/**
220-
* Returns the node listen address (that is, the IP the node uses to contact other peers in the
221-
* cluster), if known.
238+
* Returns the node listen address, if known. Otherwise {@code null}.
239+
*
240+
* <p>This is a shortcut for {@code getListenSocketAddress().getAddress()}.
241+
*
242+
* @return the node listen address, if known. Otherwise {@code null}.
243+
* @see #getListenSocketAddress()
244+
* @see <a
245+
* href="https://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html">The
246+
* cassandra.yaml configuration file</a>
247+
*/
248+
public InetAddress getListenAddress() {
249+
return listenSocketAddress != null ? listenSocketAddress.getAddress() : null;
250+
}
251+
252+
/**
253+
* Returns the node listen address (that is, the address the node uses to contact other peers in
254+
* the cluster), if known. Otherwise {@code null}.
255+
*
256+
* <p>Note that the port of the returned address will be 0 for versions of Cassandra older than
257+
* 4.0.
222258
*
223259
* <p>This corresponds to the {@code listen_address} cassandra.yaml file setting. <em>This is NOT
224260
* the address clients should use to contact this node</em>.
@@ -234,8 +270,8 @@ public InetAddress getBroadcastAddress() {
234270
* href="https://docs.datastax.com/en/cassandra/2.1/cassandra/configuration/configCassandra_yaml_r.html">The
235271
* cassandra.yaml configuration file</a>
236272
*/
237-
public InetAddress getListenAddress() {
238-
return listenAddress;
273+
public InetSocketAddress getListenSocketAddress() {
274+
return listenSocketAddress;
239275
}
240276

241277
/**

driver-core/src/main/java/com/datastax/driver/core/utils/MoreFutures.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
*/
1616
package com.datastax.driver.core.utils;
1717

18+
import com.datastax.driver.core.GuavaCompatibility;
1819
import com.google.common.util.concurrent.FutureCallback;
1920
import com.google.common.util.concurrent.Futures;
2021
import com.google.common.util.concurrent.ListenableFuture;
22+
import com.google.common.util.concurrent.SettableFuture;
2123

2224
/** Helpers to work with Guava's {@link ListenableFuture}. */
2325
public class MoreFutures {
@@ -39,4 +41,28 @@ public void onSuccess(V result) {
3941
/* nothing */
4042
}
4143
}
44+
45+
/**
46+
* Configures a {@link SettableFuture} to propagate the result of a future.
47+
*
48+
* @param settable future to be propagated to
49+
* @param future future to propagate
50+
* @param <T>
51+
*/
52+
public static <T> void propagateFuture(
53+
final SettableFuture<T> settable, ListenableFuture<T> future) {
54+
GuavaCompatibility.INSTANCE.addCallback(
55+
future,
56+
new FutureCallback<T>() {
57+
@Override
58+
public void onSuccess(T result) {
59+
settable.set(result);
60+
}
61+
62+
@Override
63+
public void onFailure(Throwable t) {
64+
settable.setException(t);
65+
}
66+
});
67+
}
4268
}

driver-core/src/test/java/com/datastax/driver/core/ClusterInitTest.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.scassandra.Scassandra;
4545
import org.scassandra.http.client.PrimingClient;
4646
import org.scassandra.http.client.PrimingRequest;
47+
import org.scassandra.http.client.Result;
4748
import org.slf4j.Logger;
4849
import org.slf4j.LoggerFactory;
4950
import org.testng.annotations.Test;
@@ -310,5 +311,12 @@ private void primePeerRows(Scassandra scassandra, List<FakeHost> otherHosts)
310311
.withQuery("SELECT * FROM system.peers")
311312
.withThen(then().withRows(rows).withColumnTypes(ScassandraCluster.SELECT_PEERS))
312313
.build());
314+
315+
// prime invalid for peers_v2 so peers table is used.
316+
primingClient.prime(
317+
PrimingRequest.queryBuilder()
318+
.withQuery("SELECT * FROM system.peers_v2")
319+
.withThen(then().withResult(Result.invalid))
320+
.build());
313321
}
314322
}

0 commit comments

Comments
 (0)