Skip to content

Commit 3380386

Browse files
committed
JAVA-1877: Use a separate reconnection schedule for the control connection
1 parent 63706fa commit 3380386

File tree

11 files changed

+94
-38
lines changed

11 files changed

+94
-38
lines changed

changelog/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 4.0.0-beta1 (in progress)
66

7+
- [improvement] JAVA-1877: Use a separate reconnection schedule for the control connection
78
- [improvement] JAVA-1763: Generate a binary tarball as part of the build process
89
- [improvement] JAVA-1884: Add additional methods from TypeToken to GenericType
910
- [improvement] JAVA-1883: Use custom queue implementation for LBP's query plan

core/src/main/java/com/datastax/oss/driver/api/core/connection/ReconnectionPolicy.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,41 @@
1515
*/
1616
package com.datastax.oss.driver.api.core.connection;
1717

18+
import com.datastax.oss.driver.api.core.metadata.Node;
1819
import edu.umd.cs.findbugs.annotations.NonNull;
1920
import java.time.Duration;
2021

2122
/**
22-
* Decides how often the driver tries to re-establish lost connections to a node.
23+
* Decides how often the driver tries to re-establish lost connections.
2324
*
24-
* <p>Each time a connection to a node is lost, a {@link #newSchedule() new schedule} instance gets
25-
* created. Then {@link ReconnectionSchedule#nextDelay()} will be called each time the driver needs
26-
* to schedule the next connection attempt. When the node is back to its required number of
27-
* connections, the schedule will be reset (that is, the next failure will create a fresh schedule
28-
* instance).
25+
* <p>Each time a reconnection starts, a new {@link ReconnectionSchedule} instance gets created.
26+
* Then {@link ReconnectionSchedule#nextDelay()} will be called each time the driver needs to
27+
* program the next connection attempt. When the reconnection succeeds, the schedule is discarded,
28+
* and the next reconnection will start from a fresh instance.
29+
*
30+
* <p>There are two types of reconnection:
31+
*
32+
* <ul>
33+
* <li>{@linkplain #newNodeSchedule(Node) for regular node connections}: when the connection pool
34+
* for a node does not have its configured number of connections (see {@code
35+
* advanced.connection.pool.*.size} in the configuration), a reconnection starts for that
36+
* pool.
37+
* <li>{@linkplain #newControlConnectionSchedule() for the control connection}: when the control
38+
* node goes down, a reconnection starts to find another node to replace it.
39+
* </ul>
40+
*
41+
* This interface defines separate methods for those two cases, but implementations are free to
42+
* delegate to the same method internally if the same type of schedule can be used.
2943
*/
3044
public interface ReconnectionPolicy extends AutoCloseable {
3145

32-
/** Creates a new {@linkplain ReconnectionSchedule schedule}. */
46+
/** Creates a new schedule for the given node. */
47+
@NonNull
48+
ReconnectionSchedule newNodeSchedule(@NonNull Node node);
49+
50+
/** Creates a new schedule for the control connection. */
3351
@NonNull
34-
ReconnectionSchedule newSchedule();
52+
ReconnectionSchedule newControlConnectionSchedule();
3553

3654
/** Called when the cluster that this policy is associated with closes. */
3755
@Override

core/src/main/java/com/datastax/oss/driver/internal/core/connection/ConstantReconnectionPolicy.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,23 @@
1919
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
2020
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
2121
import com.datastax.oss.driver.api.core.context.DriverContext;
22+
import com.datastax.oss.driver.api.core.metadata.Node;
2223
import edu.umd.cs.findbugs.annotations.NonNull;
2324
import java.time.Duration;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2427

2528
/** A reconnection policy that waits a constant time between each reconnection attempt. */
2629
public class ConstantReconnectionPolicy implements ReconnectionPolicy {
2730

31+
private static final Logger LOG = LoggerFactory.getLogger(ConstantReconnectionPolicy.class);
32+
33+
private final String logPrefix;
2834
private final ReconnectionSchedule schedule;
2935

3036
/** Builds a new instance. */
3137
public ConstantReconnectionPolicy(DriverContext context) {
38+
this.logPrefix = context.sessionName();
3239
DriverConfigProfile config = context.config().getDefaultProfile();
3340
Duration delay = config.getDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY);
3441
if (delay.isNegative()) {
@@ -44,7 +51,15 @@ public ConstantReconnectionPolicy(DriverContext context) {
4451

4552
@NonNull
4653
@Override
47-
public ReconnectionSchedule newSchedule() {
54+
public ReconnectionSchedule newNodeSchedule(@NonNull Node node) {
55+
LOG.debug("[{}] Creating new schedule for {}", logPrefix, node);
56+
return schedule;
57+
}
58+
59+
@NonNull
60+
@Override
61+
public ReconnectionSchedule newControlConnectionSchedule() {
62+
LOG.debug("[{}] Creating new schedule for the control connection", logPrefix);
4863
return schedule;
4964
}
5065

core/src/main/java/com/datastax/oss/driver/internal/core/connection/ExponentialReconnectionPolicy.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,24 +19,36 @@
1919
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
2020
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
2121
import com.datastax.oss.driver.api.core.context.DriverContext;
22+
import com.datastax.oss.driver.api.core.metadata.Node;
2223
import com.datastax.oss.driver.shaded.guava.common.base.Preconditions;
2324
import edu.umd.cs.findbugs.annotations.NonNull;
2425
import java.time.Duration;
2526
import net.jcip.annotations.ThreadSafe;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
2629

2730
/**
2831
* A reconnection policy that waits exponentially longer between each reconnection attempt (but
2932
* keeps a constant delay once a maximum delay is reached).
33+
*
34+
* <p>It uses the same schedule implementation for individual nodes or the control connection:
35+
* reconnection attempt {@code i} will be tried {@code Math.min(2^(i-1) * getBaseDelayMs(),
36+
* getMaxDelayMs())} milliseconds after the previous one.
3037
*/
3138
@ThreadSafe
3239
public class ExponentialReconnectionPolicy implements ReconnectionPolicy {
3340

41+
private static final Logger LOG = LoggerFactory.getLogger(ExponentialReconnectionPolicy.class);
42+
43+
private final String logPrefix;
3444
private final long baseDelayMs;
3545
private final long maxDelayMs;
3646
private final long maxAttempts;
3747

3848
/** Builds a new instance. */
3949
public ExponentialReconnectionPolicy(DriverContext context) {
50+
this.logPrefix = context.sessionName();
51+
4052
DriverConfigProfile config = context.config().getDefaultProfile();
4153

4254
this.baseDelayMs = config.getDuration(DefaultDriverOption.RECONNECTION_BASE_DELAY).toMillis();
@@ -84,17 +96,17 @@ public long getMaxDelayMs() {
8496
return maxDelayMs;
8597
}
8698

87-
/**
88-
* A new schedule that used an exponentially growing delay between reconnection attempts.
89-
*
90-
* <p>For this schedule, reconnection attempt {@code i} will be tried {@code Math.min(2^(i-1) *
91-
* getBaseDelayMs(), getMaxDelayMs())} milliseconds after the previous one.
92-
*
93-
* @return the newly created schedule.
94-
*/
9599
@NonNull
96100
@Override
97-
public ReconnectionSchedule newSchedule() {
101+
public ReconnectionSchedule newNodeSchedule(@NonNull Node node) {
102+
LOG.debug("[{}] Creating new schedule for {}", logPrefix, node);
103+
return new ExponentialSchedule();
104+
}
105+
106+
@NonNull
107+
@Override
108+
public ReconnectionSchedule newControlConnectionSchedule() {
109+
LOG.debug("[{}] Creating new schedule for the control connection", logPrefix);
98110
return new ExponentialSchedule();
99111
}
100112

core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.datastax.oss.driver.api.core.AllNodesFailedException;
1919
import com.datastax.oss.driver.api.core.AsyncAutoCloseable;
20+
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
2021
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
2122
import com.datastax.oss.driver.api.core.metadata.Node;
2223
import com.datastax.oss.driver.api.core.metadata.NodeState;
@@ -224,8 +225,13 @@ private class SingleThreaded {
224225

225226
private SingleThreaded(InternalDriverContext context) {
226227
this.context = context;
228+
ReconnectionPolicy reconnectionPolicy = context.reconnectionPolicy();
227229
this.reconnection =
228-
new Reconnection(logPrefix, adminExecutor, context.reconnectionPolicy(), this::reconnect);
230+
new Reconnection(
231+
logPrefix,
232+
adminExecutor,
233+
reconnectionPolicy::newControlConnectionSchedule,
234+
this::reconnect);
229235
// In "reconnect-on-init" mode, handle cancellation of the initFuture by user code
230236
CompletableFutures.whenCancelled(
231237
this.initFuture,

core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.datastax.oss.driver.api.core.auth.AuthenticationException;
2323
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
2424
import com.datastax.oss.driver.api.core.config.DriverConfig;
25+
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
2526
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
2627
import com.datastax.oss.driver.api.core.metadata.Node;
2728
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
@@ -234,11 +235,12 @@ private SingleThreaded(
234235
this.wantedCount = getConfiguredSize(distance);
235236
this.channelFactory = context.channelFactory();
236237
this.eventBus = context.eventBus();
238+
ReconnectionPolicy reconnectionPolicy = context.reconnectionPolicy();
237239
this.reconnection =
238240
new Reconnection(
239241
logPrefix,
240242
adminExecutor,
241-
context.reconnectionPolicy(),
243+
() -> reconnectionPolicy.newNodeSchedule(node),
242244
this::addMissingChannels,
243245
() -> eventBus.fire(ChannelEvent.reconnectionStarted(node)),
244246
() -> eventBus.fire(ChannelEvent.reconnectionStopped(node)));

core/src/main/java/com/datastax/oss/driver/internal/core/util/concurrent/Reconnection.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package com.datastax.oss.driver.internal.core.util.concurrent;
1717

18-
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
18+
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy.ReconnectionSchedule;
1919
import com.datastax.oss.driver.internal.core.util.Loggers;
2020
import io.netty.util.concurrent.EventExecutor;
2121
import io.netty.util.concurrent.Future;
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.CancellationException;
2626
import java.util.concurrent.CompletionStage;
2727
import java.util.concurrent.TimeUnit;
28+
import java.util.function.Supplier;
2829
import net.jcip.annotations.NotThreadSafe;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -50,13 +51,13 @@ private enum State {
5051

5152
private final String logPrefix;
5253
private final EventExecutor executor;
53-
private final ReconnectionPolicy reconnectionPolicy;
54+
private final Supplier<ReconnectionSchedule> scheduleSupplier;
5455
private final Callable<CompletionStage<Boolean>> reconnectionTask;
5556
private final Runnable onStart;
5657
private final Runnable onStop;
5758

5859
private State state = State.STOPPED;
59-
private ReconnectionPolicy.ReconnectionSchedule reconnectionSchedule;
60+
private ReconnectionSchedule reconnectionSchedule;
6061
private ScheduledFuture<CompletionStage<Boolean>> nextAttempt;
6162

6263
/**
@@ -66,13 +67,13 @@ private enum State {
6667
public Reconnection(
6768
String logPrefix,
6869
EventExecutor executor,
69-
ReconnectionPolicy reconnectionPolicy,
70+
Supplier<ReconnectionSchedule> scheduleSupplier,
7071
Callable<CompletionStage<Boolean>> reconnectionTask,
7172
Runnable onStart,
7273
Runnable onStop) {
7374
this.logPrefix = logPrefix;
7475
this.executor = executor;
75-
this.reconnectionPolicy = reconnectionPolicy;
76+
this.scheduleSupplier = scheduleSupplier;
7677
this.reconnectionTask = reconnectionTask;
7778
this.onStart = onStart;
7879
this.onStop = onStop;
@@ -81,9 +82,9 @@ public Reconnection(
8182
public Reconnection(
8283
String logPrefix,
8384
EventExecutor executor,
84-
ReconnectionPolicy reconnectionPolicy,
85+
Supplier<ReconnectionSchedule> scheduleSupplier,
8586
Callable<CompletionStage<Boolean>> reconnectionTask) {
86-
this(logPrefix, executor, reconnectionPolicy, reconnectionTask, () -> {}, () -> {});
87+
this(logPrefix, executor, scheduleSupplier, reconnectionTask, () -> {}, () -> {});
8788
}
8889

8990
/**
@@ -108,7 +109,7 @@ public void start() {
108109
state = State.ATTEMPT_IN_PROGRESS;
109110
break;
110111
case STOPPED:
111-
reconnectionSchedule = reconnectionPolicy.newSchedule();
112+
reconnectionSchedule = scheduleSupplier.get();
112113
onStart.run();
113114
scheduleNextAttempt();
114115
break;
@@ -180,7 +181,7 @@ private void scheduleNextAttempt() {
180181
assert executor.inEventLoop();
181182
state = State.SCHEDULED;
182183
if (reconnectionSchedule == null) { // happens if reconnectNow() while we were stopped
183-
reconnectionSchedule = reconnectionPolicy.newSchedule();
184+
reconnectionSchedule = scheduleSupplier.get();
184185
}
185186
Duration nextInterval = reconnectionSchedule.nextDelay();
186187
LOG.debug("[{}] Scheduling next reconnection in {}", logPrefix, nextInterval);

core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@
1616
package com.datastax.oss.driver.internal.core.control;
1717

1818
import static com.datastax.oss.driver.Assertions.assertThat;
19+
import static org.mockito.ArgumentMatchers.any;
1920
import static org.mockito.Mockito.never;
2021

2122
import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance;
23+
import com.datastax.oss.driver.api.core.metadata.Node;
2224
import com.datastax.oss.driver.api.core.metadata.NodeState;
2325
import com.datastax.oss.driver.internal.core.channel.ChannelEvent;
2426
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
@@ -105,7 +107,7 @@ public void should_init_with_second_contact_point_if_first_one_fails() {
105107
Mockito.verify(eventBus).fire(ChannelEvent.controlConnectionFailed(node1));
106108
Mockito.verify(eventBus).fire(ChannelEvent.channelOpened(node2));
107109
// each attempt tries all nodes, so there is no reconnection
108-
Mockito.verify(reconnectionPolicy, never()).newSchedule();
110+
Mockito.verify(reconnectionPolicy, never()).newNodeSchedule(any(Node.class));
109111

110112
factoryHelper.verifyNoMoreCalls();
111113
}
@@ -130,7 +132,7 @@ public void should_fail_to_init_if_all_contact_points_fail() {
130132
Mockito.verify(eventBus).fire(ChannelEvent.controlConnectionFailed(node1));
131133
Mockito.verify(eventBus).fire(ChannelEvent.controlConnectionFailed(node2));
132134
// no reconnections at init
133-
Mockito.verify(reconnectionPolicy, never()).newSchedule();
135+
Mockito.verify(reconnectionPolicy, never()).newNodeSchedule(any(Node.class));
134136

135137
factoryHelper.verifyNoMoreCalls();
136138
}

core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ public void setup() {
9595
});
9696

9797
Mockito.when(context.reconnectionPolicy()).thenReturn(reconnectionPolicy);
98-
Mockito.when(reconnectionPolicy.newSchedule()).thenReturn(reconnectionSchedule);
98+
Mockito.when(reconnectionPolicy.newControlConnectionSchedule())
99+
.thenReturn(reconnectionSchedule);
99100
// By default, set a large reconnection delay. Tests that care about reconnection will override
100101
// it.
101102
Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofDays(1));

core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolTestBase.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.datastax.oss.driver.api.core.config.DriverConfig;
2323
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
2424
import com.datastax.oss.driver.api.core.connection.ReconnectionPolicy;
25+
import com.datastax.oss.driver.api.core.metadata.Node;
2526
import com.datastax.oss.driver.internal.core.channel.ChannelFactory;
2627
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
2728
import com.datastax.oss.driver.internal.core.context.EventBus;
@@ -76,7 +77,8 @@ public void setup() {
7677
Mockito.when(context.channelFactory()).thenReturn(channelFactory);
7778

7879
Mockito.when(context.reconnectionPolicy()).thenReturn(reconnectionPolicy);
79-
Mockito.when(reconnectionPolicy.newSchedule()).thenReturn(reconnectionSchedule);
80+
Mockito.when(reconnectionPolicy.newNodeSchedule(any(Node.class)))
81+
.thenReturn(reconnectionSchedule);
8082
// By default, set a large reconnection delay. Tests that care about reconnection will override
8183
// it.
8284
Mockito.when(reconnectionSchedule.nextDelay()).thenReturn(Duration.ofDays(1));

0 commit comments

Comments
 (0)