Skip to content

Commit aea982c

Browse files
author
Sylvain Lebresne
committed
Rename shutdown to closeAsync and add close/Closeable
JAVA-247 #fixes
1 parent f9c29eb commit aea982c

File tree

10 files changed

+155
-115
lines changed

10 files changed

+155
-115
lines changed

driver-core/CHANGELOG.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ CHANGELOG
55
------
66

77
- [improvement] Make most main objects interface to facilitate testing/mocking (JAVA-195)
8+
- [api] Renamed shutdown to closeAsync and ShutdownFuture to CloseFuture. Clustering
9+
and Session also now implement Closeable (JAVA-247).
810
- [bug] Fix potential thread leaks when shutting down Metrics (JAVA-232)
911
- [bug] Fix potential NPE in HostConnectionPool (JAVA-231)
1012
- [bug] Avoid NPE when node is in an unconfigured DC (JAVA-244)

driver-core/Upgrade_guide_to_2.0.rst

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ Main API changes:
4848
in the same JVM). As a result, tools that were polling JMX informations will
4949
have to be updated accordingly.
5050

51-
5. The Cluster and Session shutdown API has changed. There is now only one
52-
shutdown method that is asynchronous but return a Future on the completion
53-
of shutdown. Also, shutdown now waits for ongoing queries to complete by
54-
default (but you can force the closing of all connections if you want to).
51+
5. The Cluster and Session shutdown API has changed. There is now a closeAsync
52+
that is asynchronous but return a Future on the completion of the shutdown
53+
process. And there is also a close shortcut that does the same but block.
54+
Also, close now waits for ongoing queries to complete by default (but you
55+
can force the closing of all connections if you want to).
5556

5657
6. The QueryBuilder#raw method does not automatically add quotes anymore, but
5758
rather ouptut its result without an change (as the raw name implies). This

driver-core/src/main/java/com/datastax/driver/core/ShutdownFuture.java renamed to driver-core/src/main/java/com/datastax/driver/core/CloseFuture.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,14 @@
3131
* Note that this class implements <a href="http://code.google.com/p/guava-libraries/">Guava</a>'s {@code
3232
* ListenableFuture} and can so be used with Guava's future utilities.
3333
*/
34-
public abstract class ShutdownFuture extends AbstractFuture<Void> {
34+
public abstract class CloseFuture extends AbstractFuture<Void> {
3535

36-
ShutdownFuture() {}
36+
CloseFuture() {}
3737

38-
static ShutdownFuture immediateFuture() {
39-
ShutdownFuture future = new ShutdownFuture() {
38+
static CloseFuture immediateFuture() {
39+
CloseFuture future = new CloseFuture() {
4040
@Override
41-
public ShutdownFuture force() {
41+
public CloseFuture force() {
4242
return this;
4343
}
4444
};
@@ -47,26 +47,27 @@ public ShutdownFuture force() {
4747
}
4848

4949
/**
50-
* Try to force the completion of the shutdown this a future of.
50+
* Try to force the completion of the shutdown this is a future of.
5151
* <p>
52-
* This method will do its best to expedite the shutdown process. In particular, all connection
53-
* will be closed right away, even if there is ongoing queries at the time of the call
54-
* to this method.
52+
* This method will do its best to expedite the shutdown process. In
53+
* particular, all connections will be closed right away, even if there is
54+
* ongoing queries at the time this method is called.
5555
* <p>
56-
* Note that this method does not block. The completion of this method does not imply that the
57-
* shutdown process and you still need to wait on this future to ensure that, though calling
58-
* this method does ensure said future will return in a timely way.
56+
* Note that this method does not block. The completion of this method does
57+
* not imply the shutdown process is done, you still need to wait on this
58+
* future to ensure that, but calling this method will ensure said
59+
* future will return in a timely way.
5960
*
60-
* @return this {@code ShutdownFuture}.
61+
* @return this {@code CloseFuture}.
6162
*/
62-
public abstract ShutdownFuture force();
63+
public abstract CloseFuture force();
6364

6465
// Internal utility for cases where we want to build a future that wait on other ones
65-
static class Forwarding extends ShutdownFuture {
66+
static class Forwarding extends CloseFuture {
6667

67-
private final List<ShutdownFuture> futures;
68+
private final List<CloseFuture> futures;
6869

69-
Forwarding(List<ShutdownFuture> futures) {
70+
Forwarding(List<CloseFuture> futures) {
7071
this.futures = futures;
7172

7273
Futures.addCallback(Futures.allAsList(futures), new FutureCallback<List<Void>>() {
@@ -81,8 +82,8 @@ public void onSuccess(List<Void> v) {
8182
}
8283

8384
@Override
84-
public ShutdownFuture force() {
85-
for (ShutdownFuture future : futures)
85+
public CloseFuture force() {
86+
for (CloseFuture future : futures)
8687
future.force();
8788
return this;
8889
}

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package com.datastax.driver.core;
1717

18+
import java.io.Closeable;
1819
import java.net.InetAddress;
1920
import java.net.UnknownHostException;
2021
import java.util.*;
@@ -55,7 +56,7 @@
5556
* the nodes currently in the cluster as well as new nodes joining the cluster
5657
* subsequently.
5758
*/
58-
public class Cluster {
59+
public class Cluster implements Closeable {
5960

6061
private static final Logger logger = LoggerFactory.getLogger(Cluster.class);
6162

@@ -322,24 +323,40 @@ public Cluster unregister(LatencyTracker tracker) {
322323

323324
/**
324325
* Initiates a shutdown of this cluster instance.
325-
*
326+
* <p>
326327
* This method is asynchronous and return a future on the completion
327328
* of the shutdown process. As soon a the cluster is shutdown, no
328329
* new request will be accepted, but already submitted queries are
329-
* allowed to complete. Shutdown closes all connections from all
330+
* allowed to complete. This method closes all connections from all
330331
* sessions and reclaims all ressources used by this Cluster
331332
* instance.
332333
* <p>
333334
* If for some reason you wish to expedite this process, the
334-
* {@link ShutdownFuture#force} can be called on the result future.
335+
* {@link CloseFuture#force} can be called on the result future.
335336
* <p>
336-
* This method has no particular effect if the cluster was already shut
337-
* down (in which case the returned future will return immediately).
337+
* This method has no particular effect if the cluster was already closed
338+
* (in which case the returned future will return immediately).
338339
*
339-
* @return a future on the completion of the shtudown process.
340+
* @return a future on the completion of the shutdown process.
341+
*/
342+
public CloseFuture closeAsync() {
343+
return manager.close();
344+
}
345+
346+
/**
347+
* Initiates a shutdown of this cluster instance and blocks until
348+
* that shutdown completes.
349+
* <p>
350+
* This method is a shortcut for {@code closeAsync().get()}.
340351
*/
341-
public ShutdownFuture shutdown() {
342-
return manager.shutdown();
352+
public void close() {
353+
try {
354+
closeAsync().get();
355+
} catch (ExecutionException e) {
356+
throw DefaultResultSetFuture.extractCauseFromExecutionException(e);
357+
} catch (InterruptedException e) {
358+
Thread.currentThread().interrupt();
359+
}
343360
}
344361

345362
/**
@@ -842,7 +859,7 @@ class Manager implements Host.StateListener, Connection.DefaultResponseHandler {
842859
// An executor for tasks that migth block some time, like creating new connection, but are generally not too critical.
843860
final ListeningExecutorService blockingTasksExecutor;
844861

845-
final AtomicReference<ShutdownFuture> shutdownFuture = new AtomicReference<ShutdownFuture>();
862+
final AtomicReference<CloseFuture> closeFuture = new AtomicReference<CloseFuture>();
846863

847864
// All the queries that have been prepared (we keep them so we can re-prepared them when a node fail or a
848865
// new one join the cluster).
@@ -899,7 +916,7 @@ private void init() {
899916
try {
900917
controlConnection.connect();
901918
} catch (NoHostAvailableException e) {
902-
shutdown();
919+
close();
903920
throw e;
904921
}
905922
}
@@ -930,13 +947,13 @@ void reportLatency(Host host, long latencyNanos) {
930947
}
931948
}
932949

933-
boolean isShutdown() {
934-
return shutdownFuture.get() != null;
950+
boolean isClosed() {
951+
return closeFuture.get() != null;
935952
}
936953

937-
private ShutdownFuture shutdown() {
954+
private CloseFuture close() {
938955

939-
ShutdownFuture future = shutdownFuture.get();
956+
CloseFuture future = closeFuture.get();
940957
if (future != null)
941958
return future;
942959

@@ -953,24 +970,24 @@ private ShutdownFuture shutdown() {
953970
metrics.shutdown();
954971

955972
// Then we shutdown all connections
956-
List<ShutdownFuture> futures = new ArrayList<ShutdownFuture>(sessions.size() + 1);
957-
futures.add(controlConnection.shutdown());
973+
List<CloseFuture> futures = new ArrayList<CloseFuture>(sessions.size() + 1);
974+
futures.add(controlConnection.closeAsync());
958975
for (Session session : sessions)
959-
futures.add(session.shutdown());
976+
futures.add(session.closeAsync());
960977

961-
future = new ClusterShutdownFuture(futures);
978+
future = new ClusterCloseFuture(futures);
962979

963980
// The rest will happen asynchonrously, when all connections are successfully closed
964-
return shutdownFuture.compareAndSet(null, future)
981+
return closeFuture.compareAndSet(null, future)
965982
? future
966-
: shutdownFuture.get(); // We raced, it's ok, return the future that was actually set
983+
: closeFuture.get(); // We raced, it's ok, return the future that was actually set
967984
}
968985

969986
@Override
970987
public void onUp(final Host host) {
971988
logger.trace("Host {} is UP", host);
972989

973-
if (isShutdown())
990+
if (isClosed())
974991
return;
975992

976993
if (host.isUp())
@@ -1043,7 +1060,7 @@ public void onDown(final Host host) {
10431060
public void onDown(final Host host, final boolean isHostAddition) {
10441061
logger.trace("Host {} is DOWN", host);
10451062

1046-
if (isShutdown())
1063+
if (isClosed())
10471064
return;
10481065

10491066
// Note: we don't want to skip that method if !host.isUp() because we set isUp
@@ -1104,7 +1121,7 @@ protected boolean onUnknownException(Exception e, long nextDelayMs) {
11041121
public void onAdd(final Host host) {
11051122
logger.trace("Adding new host {}", host);
11061123

1107-
if (isShutdown())
1124+
if (isClosed())
11081125
return;
11091126

11101127
try {
@@ -1154,7 +1171,7 @@ public void onFailure(Throwable t) {
11541171

11551172
@Override
11561173
public void onRemove(Host host) {
1157-
if (isShutdown())
1174+
if (isClosed())
11581175
return;
11591176

11601177
host.setDown();
@@ -1267,7 +1284,7 @@ private void prepareAllQueries(Host host) throws InterruptedException {
12671284
}
12681285
}
12691286
} finally {
1270-
connection.close();
1287+
connection.closeAsync();
12711288
}
12721289
} catch (ConnectionException e) {
12731290
// Ignore, not a big deal
@@ -1419,14 +1436,14 @@ private int delayForEvent(ProtocolEvent event) {
14191436
return 0;
14201437
}
14211438

1422-
private class ClusterShutdownFuture extends ShutdownFuture.Forwarding {
1439+
private class ClusterCloseFuture extends CloseFuture.Forwarding {
14231440

1424-
ClusterShutdownFuture(List<ShutdownFuture> futures) {
1441+
ClusterCloseFuture(List<CloseFuture> futures) {
14251442
super(futures);
14261443
}
14271444

14281445
@Override
1429-
public ShutdownFuture force() {
1446+
public CloseFuture force() {
14301447
reconnectionExecutor.shutdownNow();
14311448
scheduledTasksExecutor.shutdownNow();
14321449
executor.shutdownNow();

driver-core/src/main/java/com/datastax/driver/core/Connection.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class Connection {
6969
private volatile boolean isDefunct;
7070
private volatile ConnectionException exception;
7171

72-
private final AtomicReference<ConnectionShutdownFuture> shutdownFuture = new AtomicReference<ConnectionShutdownFuture>();
72+
private final AtomicReference<ConnectionCloseFuture> closeFuture = new AtomicReference<ConnectionCloseFuture>();
7373

7474
/**
7575
* Create a new connection to a Cassandra node.
@@ -193,7 +193,7 @@ ConnectionException defunct(ConnectionException e) {
193193
exception = e;
194194
isDefunct = true;
195195
dispatcher.errorOutAllHandler(e);
196-
close();
196+
closeAsync();
197197
return e;
198198
}
199199

@@ -311,16 +311,16 @@ public void operationComplete(ChannelFuture writeFuture) {
311311
}
312312

313313
public boolean isClosed() {
314-
return shutdownFuture.get() != null;
314+
return closeFuture.get() != null;
315315
}
316316

317-
public ShutdownFuture close() {
317+
public CloseFuture closeAsync() {
318318

319-
ConnectionShutdownFuture future = new ConnectionShutdownFuture();
320-
if (!shutdownFuture.compareAndSet(null, future))
319+
ConnectionCloseFuture future = new ConnectionCloseFuture();
320+
if (!closeFuture.compareAndSet(null, future))
321321
{
322-
// Shutdown had already been called, return the existing future
323-
return shutdownFuture.get();
322+
// close had already been called, return the existing future
323+
return closeFuture.get();
324324
}
325325

326326
logger.trace("[{}] closing connection", name);
@@ -510,10 +510,10 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
510510
handler.cancelTimeout();
511511
handler.callback.onSet(Connection.this, response, System.nanoTime() - handler.startTime);
512512

513-
// If we happen to be shutdown and we're the last outstanding request, we need to signal the shutdown future
513+
// If we happen to be closed and we're the last outstanding request, we need to signal the close future
514514
// (note: this is racy as the signaling can be called more than once, but that's not a problem)
515515
if (isClosed() && pending.isEmpty())
516-
shutdownFuture.get().force();
516+
closeFuture.get().force();
517517
}
518518
}
519519

@@ -563,10 +563,10 @@ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) {
563563
}
564564
}
565565

566-
private class ConnectionShutdownFuture extends ShutdownFuture {
566+
private class ConnectionCloseFuture extends CloseFuture {
567567

568568
@Override
569-
public ConnectionShutdownFuture force() {
569+
public ConnectionCloseFuture force() {
570570
// Note: we must not call releaseExternalResources on the bootstrap, because this shutdown the executors, which are shared
571571

572572
// This method can be thrown during Connection ctor, at which point channel is not yet set. This is ok.
@@ -584,9 +584,9 @@ public ConnectionShutdownFuture force() {
584584
future.addListener(new ChannelFutureListener() {
585585
public void operationComplete(ChannelFuture future) {
586586
if (future.getCause() != null)
587-
ConnectionShutdownFuture.this.setException(future.getCause());
587+
ConnectionCloseFuture.this.setException(future.getCause());
588588
else
589-
ConnectionShutdownFuture.this.set(null);
589+
ConnectionCloseFuture.this.set(null);
590590
}
591591
});
592592
return this;

driver-core/src/main/java/com/datastax/driver/core/ControlConnection.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,12 +78,12 @@ public void connect() {
7878
setNewConnection(reconnectInternal());
7979
}
8080

81-
public ShutdownFuture shutdown() {
81+
public CloseFuture closeAsync() {
8282
// We don't have to be fancy here. We just set a flag so that we stop trying to reconnect (and thus change the
8383
// connection used) and shutdown the current one.
8484
isShutdown = true;
8585
Connection connection = connectionRef.get();
86-
return connection == null ? ShutdownFuture.immediateFuture() : connection.close();
86+
return connection == null ? CloseFuture.immediateFuture() : connection.closeAsync();
8787
}
8888

8989
private void reconnect() {
@@ -146,7 +146,7 @@ private void setNewConnection(Connection newConnection) {
146146
logger.debug("[Control connection] Successfully connected to {}", newConnection.address);
147147
Connection old = connectionRef.getAndSet(newConnection);
148148
if (old != null && !old.isClosed())
149-
old.close();
149+
old.closeAsync();
150150
}
151151

152152
private Connection reconnectInternal() {

0 commit comments

Comments
 (0)