From ea62997fc970f084aac6382149cac4f80d355e36 Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Wed, 17 Jan 2018 12:32:20 +0100 Subject: [PATCH 001/317] Small fixes & improvements in documentation (#934) * Update example to match declared interface * swap calls to match commentary * explicitly mention Object Mapper in top-level document * uniform style for "user-defined" objects --- manual/README.md | 5 +++++ manual/object_mapper/creating/README.md | 8 ++++---- manual/object_mapper/using/README.md | 2 +- manual/statements/prepared/README.md | 6 +++--- 4 files changed, 13 insertions(+), 8 deletions(-) diff --git a/manual/README.md b/manual/README.md index 0e8e3c32632..4565da7bd96 100644 --- a/manual/README.md +++ b/manual/README.md @@ -277,6 +277,11 @@ for (ColumnDefinitions.Definition definition : row.getColumnDefinitions()) { } ``` +### Object mapping + +Besides explicit work with queries and rows, you can also use +[Object Mapper](object_mapper/) to simplify retrieval & store of your data. + ### More information diff --git a/manual/object_mapper/creating/README.md b/manual/object_mapper/creating/README.md index b5ba962556c..8292e1224a0 100644 --- a/manual/object_mapper/creating/README.md +++ b/manual/object_mapper/creating/README.md @@ -221,7 +221,7 @@ declaration. [@Computed][computed] can be used on properties that are the result of a computation on the Cassandra side, typically a function call. Native -functions in Cassandra like `writetime()` or [User Defined Functions] are +functions in Cassandra like `writetime()` or [User-Defined Functions] are supported. ```java @@ -249,7 +249,7 @@ Support in [accessors](../using/#accessors) is planned for a future version (see [JAVA-832](https://datastax-oss.atlassian.net/browse/JAVA-832)). 
-[User Defined Functions]:http://www.planetcassandra.org/blog/user-defined-functions-in-cassandra-3-0/ +[User-Defined Functions]:http://www.planetcassandra.org/blog/user-defined-functions-in-cassandra-3-0/ [computed]:http://docs.datastax.com/en/drivers/java/3.3/com/datastax/driver/mapping/annotations/Computed.html #### Transient properties @@ -263,7 +263,7 @@ it should be placed on either the field declaration or the property getter metho ### Mapping User Types -[User Defined Types] can also be mapped by using [@UDT][udt]: +[User-Defined Types] can also be mapped by using [@UDT][udt]: ``` CREATE TYPE address (street text, zip_code int); @@ -321,7 +321,7 @@ public class Company { This also works with UDTs inside collections or other UDTs, with any arbitrary nesting level. -[User Defined Types]: ../../udts/ +[User-Defined Types]: ../../udts/ [udt]:http://docs.datastax.com/en/drivers/java/3.3/com/datastax/driver/mapping/annotations/UDT.html [field]:http://docs.datastax.com/en/drivers/java/3.3/com/datastax/driver/mapping/annotations/Field.html diff --git a/manual/object_mapper/using/README.md b/manual/object_mapper/using/README.md index 137cfe5c7ad..6be548a9db8 100644 --- a/manual/object_mapper/using/README.md +++ b/manual/object_mapper/using/README.md @@ -202,7 +202,7 @@ implementation for it: ```java UserAccessor userAccessor = manager.createAccessor(UserAccessor.class); -User user = userAccessor.getOne(uuid); +Result users = userAccessor.getAll(); ``` Like mappers, accessors are cached at the manager level and thus, are diff --git a/manual/statements/prepared/README.md b/manual/statements/prepared/README.md index 4fe8c9558bc..b544d3a97a8 100644 --- a/manual/statements/prepared/README.md +++ b/manual/statements/prepared/README.md @@ -122,10 +122,10 @@ BoundStatement bound = ps1.bind() // Using the unset method to unset previously set value. 
// Positional setter: -bound.unset("description"); +bound.unset(1); // Named setter: -bound.unset(1); +bound.unset("description"); ``` A bound statement also has getters to retrieve the values. Note that @@ -264,4 +264,4 @@ This will be addressed in a future release of both Cassandra and the driver. Fo [execute]: http://docs.datastax.com/en/drivers/java/3.3/com/datastax/driver/core/Session.html#execute-com.datastax.driver.core.Statement- [executeAsync]: http://docs.datastax.com/en/drivers/java/3.3/com/datastax/driver/core/Session.html#executeAsync-com.datastax.driver.core.Statement- [CASSANDRA-10786]: https://issues.apache.org/jira/browse/CASSANDRA-10786 -[JAVA-1196]: https://datastax-oss.atlassian.net/browse/JAVA-1196 \ No newline at end of file +[JAVA-1196]: https://datastax-oss.atlassian.net/browse/JAVA-1196 From 9a8a23fdc5e60f12a719ac490e6ec2fc001b1b4a Mon Sep 17 00:00:00 2001 From: Alexandre Dutra Date: Wed, 28 Feb 2018 14:42:55 +0100 Subject: [PATCH 002/317] JAVA-1448: TokenAwarePolicy should respect child policy ordering (#956) --- changelog/README.md | 5 + .../core/policies/TokenAwarePolicy.java | 221 ++++++++++---- .../core/policies/TokenAwarePolicyTest.java | 277 +++++++++++------- upgrade_guide/README.md | 14 + 4 files changed, 349 insertions(+), 168 deletions(-) diff --git a/changelog/README.md b/changelog/README.md index 8b0c2e215a5..76e180e70f7 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -1,5 +1,10 @@ ## Changelog +### 3.5.0 (in progress) + +- [improvement] JAVA-1448: TokenAwarePolicy should respect child policy ordering. + + ### 3.4.0 - [improvement] JAVA-1671: Remove unnecessary test on prepared statement metadata. 
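The swapped comments fixed above hinge on `unset(1)` and `unset("description")` resolving to the same underlying value slot. A minimal, hypothetical model (this is not the driver's actual `BoundStatement` class) of how a named setter can delegate to the positional one:

```java
import java.util.HashMap;
import java.util.Map;

public class BoundValues {

    private final Object[] values;
    private final Map<String, Integer> indexByName = new HashMap<String, Integer>();

    BoundValues(String... names) {
        values = new Object[names.length];
        for (int i = 0; i < names.length; i++) {
            indexByName.put(names[i], i);
        }
    }

    void set(int i, Object v) { values[i] = v; }              // positional setter
    void unset(int i) { values[i] = null; }                   // positional unset
    void unset(String name) { unset(indexByName.get(name)); } // named unset delegates
    boolean isSet(int i) { return values[i] != null; }

    public static void main(String[] args) {
        BoundValues bound = new BoundValues("sku", "description");
        bound.set(1, "a thing");
        bound.unset("description");         // named: resolves to index 1
        System.out.println(bound.isSet(1)); // prints false, same effect as bound.unset(1)
    }
}
```

In the real driver an unset value is an explicit protocol-level "unset" marker rather than a `null`; this toy only illustrates the name-to-index resolution shared by the two overloads.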
diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java index ce3ce756f9c..5e97afb27e3 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/TokenAwarePolicy.java @@ -23,31 +23,70 @@ import java.util.*; /** - * A wrapper load balancing policy that add token awareness to a child policy. + * A wrapper load balancing policy that adds token awareness to a child policy. *

* This policy encapsulates another policy. The resulting policy works in * the following way: *

    *
  • the {@code distance} method is inherited from the child policy.
  • - *
  • the iterator return by the {@code newQueryPlan} method will first - * return the {@code LOCAL} replicas for the query (based on {@link Statement#getRoutingKey}) - * if possible (i.e. if the query {@code getRoutingKey} method - * doesn't return {@code null} and if {@link Metadata#getReplicas} - * returns a non empty set of replicas for that partition key). If no - * local replica can be either found or successfully contacted, the rest - * of the query plan will fallback to one of the child policy.
  • + *
  • the iterator returned by the {@code newQueryPlan} method will first
+ * return the {@link HostDistance#LOCAL LOCAL} replicas for the query
+ * if possible (i.e. if the query's
+ * {@linkplain Statement#getRoutingKey(ProtocolVersion, CodecRegistry) routing key}
+ * is not {@code null} and if the
+ * {@linkplain Metadata#getReplicas(String, ByteBuffer) set of replicas}
+ * for that partition key is not empty). If no local replica can be either found
+ * or successfully contacted, the rest of the query plan will fall back
+ * to the child policy's plan.
  • *
+ * The exact order in which local replicas are returned is dictated by the + * {@linkplain ReplicaOrdering strategy} provided at instantiation. *

- * Do note that only replica for which the child policy {@code distance} - * method returns {@code HostDistance.LOCAL} will be considered having + * Do note that only replicas for which the child policy's + * {@linkplain LoadBalancingPolicy#distance(Host) distance} + * method returns {@link HostDistance#LOCAL LOCAL} will be considered having * priority. For example, if you wrap {@link DCAwareRoundRobinPolicy} with this * token aware policy, replicas from remote data centers may only be - * returned after all the host of the local data center. + * returned after all the hosts of the local data center. */ public class TokenAwarePolicy implements ChainableLoadBalancingPolicy { + /** + * Strategies for replica ordering. + */ + public enum ReplicaOrdering { + + /** + * Order replicas by token ring topology, i.e. always return the "primary" replica first, + * then the second, etc., according to the placement of replicas around the token ring. + *

+ * This strategy is the only one guaranteed to order replicas in a deterministic and + * constant way. This increases the effectiveness of server-side row caching (especially + * at consistency level ONE), but is more heavily impacted by hotspots, since the primary + * replica is always tried first. + */ + TOPOLOGICAL, + + /** + * Return replicas in a different, random order for each query plan. This is the default strategy. + *

+ * This strategy fans out writes and thus can alleviate hotspots caused by "fat" partitions, + * but its randomness makes server-side caching less efficient. + */ + RANDOM, + + /** + * Return the replicas in the exact same order in which they appear in the child + * policy's query plan. + *

+ * This is the only strategy that fully respects the child policy's replica ordering. + * Use it when it is important to keep that order intact (e.g. when using the {@link LatencyAwarePolicy}). + */ + NEUTRAL + } + private final LoadBalancingPolicy childPolicy; - private final boolean shuffleReplicas; + private final ReplicaOrdering replicaOrdering; private volatile Metadata clusterMetadata; private volatile ProtocolVersion protocolVersion; private volatile CodecRegistry codecRegistry; @@ -56,28 +95,36 @@ public class TokenAwarePolicy implements ChainableLoadBalancingPolicy { * Creates a new {@code TokenAware} policy. * * @param childPolicy the load balancing policy to wrap with token awareness. - * @param shuffleReplicas whether to shuffle the replicas returned by {@code getRoutingKey}. - * Note that setting this parameter to {@code true} might decrease the - * effectiveness of caching (especially at consistency level ONE), since - * the same row will be retrieved from any replica (instead of only the - * "primary" replica without shuffling). - * On the other hand, shuffling will better distribute writes, and can - * alleviate hotspots caused by "fat" partitions. + * @param replicaOrdering the strategy to use to order replicas. */ - public TokenAwarePolicy(LoadBalancingPolicy childPolicy, boolean shuffleReplicas) { + public TokenAwarePolicy(LoadBalancingPolicy childPolicy, ReplicaOrdering replicaOrdering) { this.childPolicy = childPolicy; - this.shuffleReplicas = shuffleReplicas; + this.replicaOrdering = replicaOrdering; } /** - * Creates a new {@code TokenAware} policy with shuffling of replicas. + * Creates a new {@code TokenAware} policy. * - * @param childPolicy the load balancing policy to wrap with token - * awareness. - * @see #TokenAwarePolicy(LoadBalancingPolicy, boolean) + * @param childPolicy the load balancing policy to wrap with token awareness. + * @param shuffleReplicas whether or not to shuffle the replicas. 
+ * If {@code true}, then the {@link ReplicaOrdering#RANDOM RANDOM} strategy will be used, + * otherwise the {@link ReplicaOrdering#TOPOLOGICAL TOPOLOGICAL} one will be used. + * @deprecated Use {@link #TokenAwarePolicy(LoadBalancingPolicy, ReplicaOrdering)} instead. + * This constructor will be removed in the next major release. + */ + @SuppressWarnings("DeprecatedIsStillUsed") + @Deprecated + public TokenAwarePolicy(LoadBalancingPolicy childPolicy, boolean shuffleReplicas) { + this(childPolicy, shuffleReplicas ? ReplicaOrdering.RANDOM : ReplicaOrdering.TOPOLOGICAL); + } + + /** + * Creates a new {@code TokenAware} policy with {@link ReplicaOrdering#RANDOM RANDOM} replica ordering. + * + * @param childPolicy the load balancing policy to wrap with token awareness. */ public TokenAwarePolicy(LoadBalancingPolicy childPolicy) { - this(childPolicy, true); + this(childPolicy, ReplicaOrdering.RANDOM); } @Override @@ -94,10 +141,9 @@ public void init(Cluster cluster, Collection hosts) { } /** - * Return the HostDistance for the provided host. - * - * @param host the host of which to return the distance of. - * @return the HostDistance to {@code host} as returned by the wrapped policy. + * {@inheritDoc} + *

+ * This implementation always returns distances as reported by the wrapped policy. */ @Override public HostDistance distance(Host host) { @@ -105,15 +151,15 @@ public HostDistance distance(Host host) { } /** - * Returns the hosts to use for a new query. + * {@inheritDoc} *

- * The returned plan will first return replicas (whose {@code HostDistance} - * for the child policy is {@code LOCAL}) for the query if it can determine - * them (i.e. mainly if {@code statement.getRoutingKey()} is not {@code null}). - * Following what it will return the plan of the child policy. - * - * @param statement the query for which to build the plan. - * @return the new query plan. + * The returned plan will first return local replicas for the query (i.e. + * replicas whose {@linkplain HostDistance distance} according to the child policy is {@code LOCAL}), + * if it can determine them (i.e. mainly if the statement's + * {@linkplain Statement#getRoutingKey(ProtocolVersion, CodecRegistry) routing key} + * is not {@code null}), and ordered according to the {@linkplain ReplicaOrdering ordering strategy} + * specified at instantiation; following what it will return the rest of the child policy's + * original query plan. */ @Override public Iterator newQueryPlan(final String loggedKeyspace, final Statement statement) { @@ -130,39 +176,86 @@ public Iterator newQueryPlan(final String loggedKeyspace, final Statement if (replicas.isEmpty()) return childPolicy.newQueryPlan(loggedKeyspace, statement); - final Iterator iter; - if (shuffleReplicas) { - List l = Lists.newArrayList(replicas); - Collections.shuffle(l); - iter = l.iterator(); - } else { - iter = replicas.iterator(); - } + if (replicaOrdering == ReplicaOrdering.NEUTRAL) { - return new AbstractIterator() { + final Iterator childIterator = childPolicy.newQueryPlan(keyspace, statement); - private Iterator childIterator; + return new AbstractIterator() { - @Override - protected Host computeNext() { - while (iter.hasNext()) { - Host host = iter.next(); - if (host.isUp() && childPolicy.distance(host) == HostDistance.LOCAL) - return host; - } + private List nonReplicas; + private Iterator nonReplicasIterator; + + @Override + protected Host computeNext() { + + while (childIterator.hasNext()) { + + Host host = 
childIterator.next(); + + if (host.isUp() && replicas.contains(host) && childPolicy.distance(host) == HostDistance.LOCAL) { + // UP replicas should be prioritized, retaining order from childPolicy + return host; + } else { + // save for later + if (nonReplicas == null) + nonReplicas = new ArrayList(); + nonReplicas.add(host); + } - if (childIterator == null) - childIterator = childPolicy.newQueryPlan(loggedKeyspace, statement); + } - while (childIterator.hasNext()) { - Host host = childIterator.next(); - // Skip it if it was already a local replica - if (!replicas.contains(host) || childPolicy.distance(host) != HostDistance.LOCAL) - return host; + // This should only engage if all local replicas are DOWN + if (nonReplicas != null) { + + if (nonReplicasIterator == null) + nonReplicasIterator = nonReplicas.iterator(); + + if (nonReplicasIterator.hasNext()) + return nonReplicasIterator.next(); + } + + return endOfData(); } - return endOfData(); + }; + + } else { + + final Iterator replicasIterator; + + if (replicaOrdering == ReplicaOrdering.RANDOM) { + List replicasList = Lists.newArrayList(replicas); + Collections.shuffle(replicasList); + replicasIterator = replicasList.iterator(); + } else { + replicasIterator = replicas.iterator(); } - }; + + return new AbstractIterator() { + + private Iterator childIterator; + + @Override + protected Host computeNext() { + while (replicasIterator.hasNext()) { + Host host = replicasIterator.next(); + if (host.isUp() && childPolicy.distance(host) == HostDistance.LOCAL) + return host; + } + + if (childIterator == null) + childIterator = childPolicy.newQueryPlan(loggedKeyspace, statement); + + while (childIterator.hasNext()) { + Host host = childIterator.next(); + // Skip it if it was already a local replica + if (!replicas.contains(host) || childPolicy.distance(host) != HostDistance.LOCAL) + return host; + } + return endOfData(); + } + }; + } + } @Override diff --git 
a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java index 1464187ba41..d3ca3912425 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/policies/TokenAwarePolicyTest.java @@ -18,38 +18,105 @@ import com.datastax.driver.core.*; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; +import java.nio.ByteBuffer; +import java.util.Iterator; +import java.util.List; +import org.assertj.core.util.Sets; import org.testng.annotations.BeforeMethod; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; -import java.nio.ByteBuffer; -import java.util.List; - import static com.datastax.driver.core.Assertions.assertThat; -import static com.datastax.driver.core.CreateCCM.TestMode.PER_METHOD; import static com.datastax.driver.core.TestUtils.CREATE_KEYSPACE_SIMPLE_FORMAT; import static com.datastax.driver.core.TestUtils.nonQuietClusterCloseOptions; - -@CreateCCM(PER_METHOD) -@CCMConfig(createCcm = false) -public class TokenAwarePolicyTest extends CCMTestsSupport { - - QueryTracker queryTracker; - - @BeforeMethod(groups = "short") - public void setUp() { - queryTracker = new QueryTracker(); +import static com.datastax.driver.core.policies.TokenAwarePolicy.ReplicaOrdering.NEUTRAL; +import static com.datastax.driver.core.policies.TokenAwarePolicy.ReplicaOrdering.RANDOM; +import static com.datastax.driver.core.policies.TokenAwarePolicy.ReplicaOrdering.TOPOLOGICAL; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TokenAwarePolicyTest { + + private ByteBuffer routingKey = ByteBuffer.wrap(new byte[]{1, 2, 3, 4}); + + private RegularStatement statement = new SimpleStatement("irrelevant") + .setRoutingKey(routingKey); + + 
private Host host1 = mock(Host.class); + private Host host2 = mock(Host.class); + private Host host3 = mock(Host.class); + private Host host4 = mock(Host.class); + + private LoadBalancingPolicy childPolicy; + private Cluster cluster; + + @BeforeMethod(groups = "unit") + public void initMocks() { + CodecRegistry codecRegistry = new CodecRegistry(); + cluster = mock(Cluster.class); + Configuration configuration = mock(Configuration.class); + ProtocolOptions protocolOptions = mock(ProtocolOptions.class); + Metadata metadata = mock(Metadata.class); + childPolicy = mock(LoadBalancingPolicy.class); + when(cluster.getConfiguration()).thenReturn(configuration); + when(configuration.getCodecRegistry()).thenReturn(codecRegistry); + when(configuration.getProtocolOptions()).thenReturn(protocolOptions); + when(protocolOptions.getProtocolVersion()).thenReturn(ProtocolVersion.NEWEST_SUPPORTED); + when(cluster.getMetadata()).thenReturn(metadata); + when(metadata.getReplicas(Metadata.quote("keyspace"), routingKey)) + .thenReturn(Sets.newLinkedHashSet(host1, host2)); + when(childPolicy.newQueryPlan("keyspace", statement)).thenReturn( + Sets.newLinkedHashSet(host4, host3, host2, host1).iterator()); + when(childPolicy.distance(any(Host.class))).thenReturn(HostDistance.LOCAL); + when(host1.isUp()).thenReturn(true); + when(host2.isUp()).thenReturn(true); + when(host3.isUp()).thenReturn(true); + when(host4.isUp()).thenReturn(true); } @DataProvider(name = "shuffleProvider") public Object[][] shuffleProvider() { return new Object[][]{ - {true}, - {false}, - {null} + {TokenAwarePolicy.ReplicaOrdering.TOPOLOGICAL}, + {TokenAwarePolicy.ReplicaOrdering.RANDOM}, + {TokenAwarePolicy.ReplicaOrdering.NEUTRAL} }; } + @Test(groups = "unit") + public void should_respect_topological_order() { + // given + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, TOPOLOGICAL); + policy.init(cluster, null); + // when + Iterator queryPlan = policy.newQueryPlan("keyspace", statement); + // then + 
assertThat(queryPlan).containsExactly(host1, host2, host4, host3); + } + + @Test(groups = "unit") + public void should_respect_child_policy_order() { + // given + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, NEUTRAL); + policy.init(cluster, null); + // when + Iterator queryPlan = policy.newQueryPlan("keyspace", statement); + // then + assertThat(queryPlan).containsExactly(host2, host1, host4, host3); + } + + @Test(groups = "unit") + public void should_create_random_order() { + // given + TokenAwarePolicy policy = new TokenAwarePolicy(childPolicy, RANDOM); + policy.init(cluster, null); + // when + Iterator queryPlan = policy.newQueryPlan("keyspace", statement); + // then + assertThat(queryPlan).containsOnlyOnce(host1, host2, host3, host4).endsWith(host4, host3); + } + /** * Ensures that {@link TokenAwarePolicy} will shuffle discovered replicas depending on the value of shuffleReplicas * used when constructing with {@link TokenAwarePolicy#TokenAwarePolicy(LoadBalancingPolicy, boolean)} and that if not @@ -58,20 +125,14 @@ public Object[][] shuffleProvider() { * @test_category load_balancing:token_aware */ @Test(groups = "short", dataProvider = "shuffleProvider") - public void should_shuffle_replicas_based_on_configuration(Boolean shuffleReplicas) { + public void should_order_replicas_based_on_configuration(TokenAwarePolicy.ReplicaOrdering ordering) { // given: an 8 node cluster using TokenAwarePolicy and some shuffle replica configuration with a keyspace with replication factor of 3. 
ScassandraCluster sCluster = ScassandraCluster.builder() .withNodes(8) .withSimpleKeyspace("keyspace", 3) .build(); - LoadBalancingPolicy loadBalancingPolicy; - if (shuffleReplicas == null) { - loadBalancingPolicy = new TokenAwarePolicy(new RoundRobinPolicy()); - shuffleReplicas = true; - } else { - loadBalancingPolicy = new TokenAwarePolicy(new RoundRobinPolicy(), shuffleReplicas); - } + LoadBalancingPolicy loadBalancingPolicy = new TokenAwarePolicy(new SortingLoadBalancingPolicy(), ordering); Cluster cluster = Cluster.builder() .addContactPoints(sCluster.address(1).getAddress()) @@ -83,11 +144,9 @@ public void should_shuffle_replicas_based_on_configuration(Boolean shuffleReplic try { sCluster.init(); - Session session = cluster.connect(); - // given: A routing key that falls in the token range of node 6. - // Encodes into murmur hash '4874351301193663061' which should belong be owned by node 6 with replicas 7 and 8. + // Encodes into murmur hash '4874351301193663061' which should be owned by node 6 with replicas 7 and 8. ByteBuffer routingKey = TypeCodec.varchar().serialize("This is some sample text", ProtocolVersion.NEWEST_SUPPORTED); // then: The replicas resolved from the cluster metadata must match node 6 and its replicas. @@ -103,26 +162,17 @@ public void should_shuffle_replicas_based_on_configuration(Boolean shuffleReplic statement.setRoutingKey(routingKey); statement.setKeyspace("keyspace"); - boolean shuffledAtLeastOnce = false; - for (int i = 0; i < 1024; i++) { - List queryPlan = Lists.newArrayList(loadBalancingPolicy.newQueryPlan(null, statement)); - assertThat(queryPlan).containsOnlyElementsOf(cluster.getMetadata().getAllHosts()); - - List firstThree = queryPlan.subList(0, 3); - // then: if shuffle replicas was used or using default, the first three hosts returned should be 6,7,8 in any order. - // if shuffle replicas was not used, the first three hosts returned should be 6,7,8 in that order. 
- if (shuffleReplicas) { - assertThat(firstThree).containsOnlyElementsOf(replicas); - if (!firstThree.equals(replicas)) { - shuffledAtLeastOnce = true; - } - } else { - assertThat(firstThree).containsExactlyElementsOf(replicas); - } - } + List queryPlan = Lists.newArrayList(loadBalancingPolicy.newQueryPlan(null, statement)); + assertThat(queryPlan).containsOnlyElementsOf(cluster.getMetadata().getAllHosts()); - // then: given 1024 query plans, the replicas should be shuffled at least once. - assertThat(shuffledAtLeastOnce).isEqualTo(shuffleReplicas); + List firstThree = queryPlan.subList(0, 3); + // then: if ordering is RANDOM, the first three hosts returned should be 6,7,8 in any order. + // if ordering is TOPOLOGICAL or NEUTRAL, the first three hosts returned should be 6,7,8 in that order. + if (ordering == RANDOM) { + assertThat(firstThree).containsOnlyElementsOf(replicas); + } else { + assertThat(firstThree).containsExactlyElementsOf(replicas); + } } finally { cluster.close(); sCluster.stop(); @@ -156,12 +206,13 @@ public void should_choose_proper_host_based_on_routing_key() { Session session = cluster.connect(); - // Encodes into murmur hash '4557949199137838892' which should belong be owned by node 3. + // Encodes into murmur hash '4557949199137838892' which should be owned by node 3. ByteBuffer routingKey = TypeCodec.varchar().serialize("should_choose_proper_host_based_on_routing_key", ProtocolVersion.NEWEST_SUPPORTED); SimpleStatement statement = new SimpleStatement("select * from table where k=5") .setRoutingKey(routingKey) .setKeyspace("keyspace"); + QueryTracker queryTracker = new QueryTracker(); queryTracker.query(session, 10, statement); // then: The host having that token should be queried. 
@@ -210,6 +261,7 @@ public void should_choose_host_in_local_dc_when_using_network_topology_strategy_ .setRoutingKey(routingKey) .setKeyspace("keyspace"); + QueryTracker queryTracker = new QueryTracker(); queryTracker.query(session, 10, statement); // then: The local replica (2:1) should be queried and never the remote one. @@ -240,7 +292,7 @@ public void should_use_other_nodes_when_replicas_having_token_are_down() { .withPort(sCluster.getBinaryPort()) .withNettyOptions(nonQuietClusterCloseOptions) // Don't shuffle replicas just to keep test deterministic. - .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy(), false)) + .withLoadBalancingPolicy(new TokenAwarePolicy(new SortingLoadBalancingPolicy(), NEUTRAL)) .build(); try { @@ -255,58 +307,60 @@ public void should_use_other_nodes_when_replicas_having_token_are_down() { .setRoutingKey(routingKey) .setKeyspace("keyspace"); + QueryTracker queryTracker = new QueryTracker(); queryTracker.query(session, 10, statement); - // then: The node that is the primary for that key's hash is chosen. - queryTracker.assertQueried(sCluster, 1, 1, 0); + // then: primary replica is 4, secondary is 1; since the child policy returns [1,2,3,4], the + // TAP reorders the plan to [1,4,2,3]. Only 1 should be queried + queryTracker.assertQueried(sCluster, 1, 1, 10); queryTracker.assertQueried(sCluster, 1, 2, 0); queryTracker.assertQueried(sCluster, 1, 3, 0); - queryTracker.assertQueried(sCluster, 1, 4, 10); + queryTracker.assertQueried(sCluster, 1, 4, 0); - // when: The primary node owning that key goes down and a query is made. + // when: The secondary node owning that key (1) goes down and a query is made. queryTracker.reset(); - sCluster.stop(cluster, 4); + sCluster.stop(cluster, 1); queryTracker.query(session, 10, statement); - // then: The next replica having that data should be chosen (node 1). 
- queryTracker.assertQueried(sCluster, 1, 1, 10); + // then: The next replica having that data should be chosen (node 4 - primary replica). + queryTracker.assertQueried(sCluster, 1, 1, 0); queryTracker.assertQueried(sCluster, 1, 2, 0); queryTracker.assertQueried(sCluster, 1, 3, 0); - queryTracker.assertQueried(sCluster, 1, 4, 0); + queryTracker.assertQueried(sCluster, 1, 4, 10); // when: All nodes having that token are down and a query is made. queryTracker.reset(); - sCluster.stop(cluster, 1); + sCluster.stop(cluster, 4); queryTracker.query(session, 10, statement); // then: The remaining nodes which are non-replicas of that token should be used - // delegating to the child policy (RoundRobin). + // delegating to the child policy. queryTracker.assertQueried(sCluster, 1, 1, 0); - queryTracker.assertQueried(sCluster, 1, 2, 5); - queryTracker.assertQueried(sCluster, 1, 3, 5); + queryTracker.assertQueried(sCluster, 1, 2, 10); + queryTracker.assertQueried(sCluster, 1, 3, 0); queryTracker.assertQueried(sCluster, 1, 4, 0); - // when: A replica having that key becomes up and a query is made. + // when: A replica having that key (4) becomes up and a query is made. queryTracker.reset(); - sCluster.start(cluster, 1); + sCluster.start(cluster, 4); queryTracker.query(session, 10, statement); // then: The newly up replica should be queried. - queryTracker.assertQueried(sCluster, 1, 1, 10); + queryTracker.assertQueried(sCluster, 1, 1, 0); queryTracker.assertQueried(sCluster, 1, 2, 0); queryTracker.assertQueried(sCluster, 1, 3, 0); - queryTracker.assertQueried(sCluster, 1, 4, 0); + queryTracker.assertQueried(sCluster, 1, 4, 10); - // when: The primary replicas becomes up and a query is made. + // when: The other replica becomes up and a query is made. queryTracker.reset(); - sCluster.start(cluster, 4); + sCluster.start(cluster, 1); queryTracker.query(session, 10, statement); - // then: The primary replica which is now up should be queried. 
- queryTracker.assertQueried(sCluster, 1, 1, 0); + // then: The secondary replica (1) which is now up should be queried. + queryTracker.assertQueried(sCluster, 1, 1, 10); queryTracker.assertQueried(sCluster, 1, 2, 0); queryTracker.assertQueried(sCluster, 1, 3, 0); - queryTracker.assertQueried(sCluster, 1, 4, 10); + queryTracker.assertQueried(sCluster, 1, 4, 0); } finally { cluster.close(); sCluster.stop(); @@ -333,7 +387,7 @@ public void should_use_provided_routing_key_boundstatement() { .withPort(sCluster.getBinaryPort()) .withNettyOptions(nonQuietClusterCloseOptions) // Don't shuffle replicas just to keep test deterministic. - .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy(), false)) + .withLoadBalancingPolicy(new TokenAwarePolicy(new SortingLoadBalancingPolicy(), NEUTRAL)) .build(); try { @@ -351,6 +405,7 @@ public void should_use_provided_routing_key_boundstatement() { ByteBuffer routingKey = TypeCodec.bigint().serialize(33L, ProtocolVersion.NEWEST_SUPPORTED); bs.setRoutingKey(routingKey); + QueryTracker queryTracker = new QueryTracker(); queryTracker.query(session, 10, bs); // Expect only node 3 to have been queried, give it has ownership of that partition @@ -391,43 +446,57 @@ public void should_use_provided_routing_key_boundstatement() { * @test_category load_balancing:token_aware * @jira_ticket JAVA-123 (to ensure routing key buffers are not destroyed). */ - @CCMConfig(createCcm = true, numberOfNodes = 3, createCluster = false) @Test(groups = "long") public void should_properly_generate_and_use_routing_key_for_composite_partition_key() { // given: a 3 node cluster with a keyspace with RF 1. 
- Cluster cluster = register(Cluster.builder() + CCMBridge ccm = CCMBridge.builder() + .withNodes(3) + .build(); + + ccm.start(); + + Cluster cluster = Cluster.builder() + .addContactPoints(ccm.addressOfNode(1).getAddress()) + .withPort(ccm.getBinaryPort()) + .withNettyOptions(nonQuietClusterCloseOptions) .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())) - .addContactPoints(getContactPoints().get(0)) - .withPort(ccm().getBinaryPort()) - .build()); - Session session = cluster.connect(); - - String table = "composite"; - String ks = TestUtils.generateIdentifier("ks_"); - session.execute(String.format(CREATE_KEYSPACE_SIMPLE_FORMAT, ks, 1)); - session.execute("USE " + ks); - session.execute(String.format("CREATE TABLE %s (k1 int, k2 int, i int, PRIMARY KEY ((k1, k2)))", table)); - - // (1,2) resolves to token '4881097376275569167' which belongs to node 1 so all queries should go to that node. - PreparedStatement insertPs = session.prepare("INSERT INTO " + table + "(k1, k2, i) VALUES (?, ?, ?)"); - BoundStatement insertBs = insertPs.bind(1, 2, 3); - - PreparedStatement selectPs = session.prepare("SELECT * FROM " + table + " WHERE k1=? and k2=?"); - BoundStatement selectBs = selectPs.bind(1, 2); - - // when: executing a prepared statement with a composite partition key. - // then: should query the correct node (1) in for both insert and select queries. 
- for (int i = 0; i < 10; i++) { - ResultSet rs = session.execute(insertBs); - assertThat(rs.getExecutionInfo().getQueriedHost()).isEqualTo(TestUtils.findHost(cluster, 1)); - - rs = session.execute(selectBs); - assertThat(rs.getExecutionInfo().getQueriedHost()).isEqualTo(TestUtils.findHost(cluster, 1)); - assertThat(rs.isExhausted()).isFalse(); - Row r = rs.one(); - assertThat(rs.isExhausted()).isTrue(); - - assertThat(r.getInt("i")).isEqualTo(3); + .build(); + + try { + + Session session = cluster.connect(); + + String ks = TestUtils.generateIdentifier("ks_"); + session.execute(String.format(CREATE_KEYSPACE_SIMPLE_FORMAT, ks, 1)); + session.execute("USE " + ks); + session.execute("CREATE TABLE composite (k1 int, k2 int, i int, PRIMARY KEY ((k1, k2)))"); + + // (1,2) resolves to token '4881097376275569167' which belongs to node 1 so all queries should go to that node. + PreparedStatement insertPs = session.prepare("INSERT INTO composite(k1, k2, i) VALUES (?, ?, ?)"); + BoundStatement insertBs = insertPs.bind(1, 2, 3); + + PreparedStatement selectPs = session.prepare("SELECT * FROM composite WHERE k1=? and k2=?"); + BoundStatement selectBs = selectPs.bind(1, 2); + + // when: executing a prepared statement with a composite partition key. + // then: should query the correct node (1) for both insert and select queries. 
+ Host host1 = TestUtils.findHost(cluster, 1); + for (int i = 0; i < 10; i++) { + ResultSet rs = session.execute(insertBs); + assertThat(rs.getExecutionInfo().getQueriedHost()).isEqualTo(host1); + + rs = session.execute(selectBs); + assertThat(rs.getExecutionInfo().getQueriedHost()).isEqualTo(host1); + assertThat(rs.isExhausted()).isFalse(); + Row r = rs.one(); + assertThat(rs.isExhausted()).isTrue(); + + assertThat(r.getInt("i")).isEqualTo(3); + } + } finally { + cluster.close(); + ccm.remove(); } } + } diff --git a/upgrade_guide/README.md b/upgrade_guide/README.md index b278a1491d5..83c2d02a5ef 100644 --- a/upgrade_guide/README.md +++ b/upgrade_guide/README.md @@ -3,6 +3,20 @@ The purpose of this guide is to detail changes made by successive versions of the Java driver. +### 3.5.0 + +The `TokenAwarePolicy` now has a new constructor that takes a `ReplicaOrdering` +argument. One of the advantages of this feature is the new `NEUTRAL` ordering +strategy, which honors its child policy's ordering, i.e., replicas +are returned in the same relative order as in the child policy's query plan. +As an example, if the child policy returns the plan [A, B, C, D], and the replicas +for the query being routed are [D, A, B], then the token aware policy would return +the plan [A, B, D, C]. + +As a consequence, the constructor taking a boolean parameter `shuffleReplicas` +is now deprecated and will be removed in the next major release. 
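
The `NEUTRAL` merge described in the paragraph above can be sketched in plain Java. This is an illustrative sketch only — the driver performs the merge internally inside `TokenAwarePolicy`, and the class and method names below are hypothetical:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of NEUTRAL replica ordering: replicas are moved to the front of the
// query plan, but keep the relative order they had in the child policy's plan.
public class NeutralOrderingSketch {

    static List<String> neutralPlan(List<String> childPlan, Set<String> replicas) {
        List<String> plan = new ArrayList<String>();
        // First pass: the replicas, in child-plan order...
        for (String host : childPlan) {
            if (replicas.contains(host)) {
                plan.add(host);
            }
        }
        // ...then the remaining hosts, also in child-plan order.
        for (String host : childPlan) {
            if (!replicas.contains(host)) {
                plan.add(host);
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        // The example from the text: child plan [A, B, C, D], replicas [D, A, B].
        List<String> childPlan = Arrays.asList("A", "B", "C", "D");
        Set<String> replicas = new LinkedHashSet<String>(Arrays.asList("D", "A", "B"));
        System.out.println(neutralPlan(childPlan, replicas)); // [A, B, D, C]
    }
}
```

Note how the replica `D` does not jump ahead of `A` and `B`, because the child policy ordered it last among the three.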
+ + ### 3.4.0 `QueryBuilder` methods `in`, `lt`, `lte`, `eq`, `gt`, and `gte` now accept From 640f3a12e6a3c50fc529f269e9039c733268dffa Mon Sep 17 00:00:00 2001 From: Alex Ott Date: Wed, 28 Feb 2018 15:03:31 +0100 Subject: [PATCH 003/317] Fix eclipse problems (#954) * fix missing groupId that prevent from correct import into Eclipse * ignore plugins for that there is no m2e connectors --- pom.xml | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/pom.xml b/pom.xml index 3f4c5c92a98..07a031e7d10 100644 --- a/pom.xml +++ b/pom.xml @@ -843,6 +843,7 @@ limitations under the License. + org.apache.maven.plugins maven-jar-plugin [2.2,) @@ -853,6 +854,32 @@ limitations under the License. + + + org.codehaus.mojo + clirr-maven-plugin + [2.7,) + + check + + + + + + + + + org.codehaus.gmaven + gmaven-plugin + [1.5,) + + testCompile + + + + + + From 56e800938e5ed3d9ca556899f2353b734d10661e Mon Sep 17 00:00:00 2001 From: Andrew Tolbert Date: Wed, 28 Feb 2018 08:06:21 -0600 Subject: [PATCH 004/317] JAVA-1751: Include defaultTimestamp length in encodedSize for >= V3 (#950) * JAVA-1751: Include defaultTimestamp length in encodedSize for >= V3 QueryProtocolOptions.encodedSize previously incorrectly only accounted for defaultTimestamp length if protocol version was equal to V3. It should be included for all protocol versions >= V3. 
* JAVA-1770: Fix message size when using Custom Payload --- changelog/README.md | 4 +++- .../main/java/com/datastax/driver/core/CBUtil.java | 11 +++-------- .../java/com/datastax/driver/core/Message.java | 6 ++++++ .../java/com/datastax/driver/core/Requests.java | 14 +++++++------- .../datastax/driver/core/CustomPayloadTest.java | 4 ++-- 5 files changed, 21 insertions(+), 18 deletions(-) diff --git a/changelog/README.md b/changelog/README.md index 76e180e70f7..4e9604cdf08 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -1,8 +1,10 @@ ## Changelog -### 3.5.0 (in progress) +### 3.5.0 (In progress) - [improvement] JAVA-1448: TokenAwarePolicy should respect child policy ordering. +- [bug] JAVA-1751: Include defaultTimestamp length in encodedSize for protocol version >= 3. +- [bug] JAVA-1770: Fix message size when using Custom Payload. ### 3.4.0 diff --git a/driver-core/src/main/java/com/datastax/driver/core/CBUtil.java b/driver-core/src/main/java/com/datastax/driver/core/CBUtil.java index c252f8d23c2..7996baa892c 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/CBUtil.java +++ b/driver-core/src/main/java/com/datastax/driver/core/CBUtil.java @@ -113,22 +113,17 @@ public static byte[] readBytes(ByteBuf cb) { } } - public static void writeBytes(byte[] bytes, ByteBuf cb) { + public static void writeShortBytes(byte[] bytes, ByteBuf cb) { cb.writeShort(bytes.length); cb.writeBytes(bytes); } - public static void writeBytes(ByteBuffer bytes, ByteBuf cb) { - cb.writeShort(bytes.remaining()); - cb.writeBytes(bytes.duplicate()); - } - - public static int sizeOfBytes(byte[] bytes) { + public static int sizeOfShortBytes(byte[] bytes) { return 2 + bytes.length; } public static int sizeOfBytes(ByteBuffer bytes) { - return 2 + bytes.remaining(); + return 4 + bytes.remaining(); } public static Map readBytesMap(ByteBuf cb) { diff --git a/driver-core/src/main/java/com/datastax/driver/core/Message.java 
b/driver-core/src/main/java/com/datastax/driver/core/Message.java index b70772aaec3..a78a9d61d83 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Message.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Message.java @@ -342,6 +342,12 @@ protected void encode(ChannelHandlerContext ctx, Request request, List o } coder.encode(request, body, protocolVersion); + if (body.capacity() != messageSize) { + logger.warn("Detected buffer resizing while encoding {} message ({} => {}), " + + "this is a driver bug " + + "(ultimately it does not affect the query, but leads to a small inefficiency)", + request.type, messageSize, body.capacity()); + } out.add(Frame.create(protocolVersion, request.type.opcode, request.getStreamId(), flags, body)); } } diff --git a/driver-core/src/main/java/com/datastax/driver/core/Requests.java b/driver-core/src/main/java/com/datastax/driver/core/Requests.java index 7eeb1e46aba..c42f02b8c16 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/Requests.java +++ b/driver-core/src/main/java/com/datastax/driver/core/Requests.java @@ -187,17 +187,17 @@ static class Execute extends Message.Request { static final Message.Coder coder = new Message.Coder() { @Override public void encode(Execute msg, ByteBuf dest, ProtocolVersion version) { - CBUtil.writeBytes(msg.statementId.bytes, dest); + CBUtil.writeShortBytes(msg.statementId.bytes, dest); if (ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(version)) - CBUtil.writeBytes(msg.resultMetadataId.bytes, dest); + CBUtil.writeShortBytes(msg.resultMetadataId.bytes, dest); msg.options.encode(dest, version); } @Override public int encodedSize(Execute msg, ProtocolVersion version) { - int size = CBUtil.sizeOfBytes(msg.statementId.bytes); + int size = CBUtil.sizeOfShortBytes(msg.statementId.bytes); if (ProtocolFeature.PREPARED_METADATA_CHANGES.isSupportedBy(version)) - size += CBUtil.sizeOfBytes(msg.resultMetadataId.bytes); + size += 
CBUtil.sizeOfShortBytes(msg.resultMetadataId.bytes); size += msg.options.encodedSize(version); return size; } @@ -402,7 +402,7 @@ int encodedSize(ProtocolVersion version) { size += CBUtil.sizeOfValue(pagingState); if (flags.contains(QueryFlag.SERIAL_CONSISTENCY)) size += CBUtil.sizeOfConsistencyLevel(serialConsistency); - if (version == ProtocolVersion.V3 && flags.contains(QueryFlag.DEFAULT_TIMESTAMP)) + if (version.compareTo(ProtocolVersion.V3) >= 0 && flags.contains(QueryFlag.DEFAULT_TIMESTAMP)) size += 8; return size; default: @@ -434,7 +434,7 @@ public void encode(Batch msg, ByteBuf dest, ProtocolVersion version) { if (q instanceof String) CBUtil.writeLongString((String) q, dest); else - CBUtil.writeBytes(((MD5Digest) q).bytes, dest); + CBUtil.writeShortBytes(((MD5Digest) q).bytes, dest); CBUtil.writeValueList(msg.values.get(i), dest); } @@ -449,7 +449,7 @@ public int encodedSize(Batch msg, ProtocolVersion version) { Object q = msg.queryOrIdList.get(i); size += 1 + (q instanceof String ? 
CBUtil.sizeOfLongString((String) q) - : CBUtil.sizeOfBytes(((MD5Digest) q).bytes)); + : CBUtil.sizeOfShortBytes(((MD5Digest) q).bytes)); size += CBUtil.sizeOfValueList(msg.values.get(i)); } diff --git a/driver-core/src/test/java/com/datastax/driver/core/CustomPayloadTest.java b/driver-core/src/test/java/com/datastax/driver/core/CustomPayloadTest.java index 926578a6d5c..b840aa695c7 100644 --- a/driver-core/src/test/java/com/datastax/driver/core/CustomPayloadTest.java +++ b/driver-core/src/test/java/com/datastax/driver/core/CustomPayloadTest.java @@ -258,8 +258,8 @@ public void should_print_log_message_when_level_trace() throws Exception { session().execute(statement); String logs = appender.waitAndGet(10000); assertThat(logs) - .contains("Sending payload: {k1:0x010203, k2:0x040506} (20 bytes total)") - .contains("Received payload: {k1:0x010203, k2:0x040506} (20 bytes total)"); + .contains("Sending payload: {k1:0x010203, k2:0x040506} (24 bytes total)") + .contains("Received payload: {k1:0x010203, k2:0x040506} (24 bytes total)"); } finally { logger.setLevel(null); logger.removeAppender(appender); From 2e20c9da61a6953c4dcc0dc9fcd4219f399874c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Chantepie?= Date: Wed, 28 Feb 2018 15:06:41 +0100 Subject: [PATCH 005/317] Add equals/hashCode to QueryOptions (#843) --- .../datastax/driver/core/QueryOptions.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java index 8c4a9b4f56b..b22e109a7b3 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java +++ b/driver-core/src/main/java/com/datastax/driver/core/QueryOptions.java @@ -17,6 +17,7 @@ import com.datastax.driver.core.exceptions.UnsupportedFeatureException; import com.datastax.driver.core.utils.MoreFutures; +import com.datastax.driver.core.utils.MoreObjects; import 
com.google.common.util.concurrent.Futures; /** @@ -484,4 +485,38 @@ public int getMaxPendingRefreshNodeRequests() { return maxPendingRefreshNodeRequests; } + public boolean equals(Object that) { + if (that == null || !(that instanceof QueryOptions)) { + return false; + } + + QueryOptions other = (QueryOptions) that; + + return (this.consistency.equals(other.consistency) && + this.serialConsistency.equals(other.serialConsistency) && + this.fetchSize == other.fetchSize && + this.defaultIdempotence == other.defaultIdempotence && + this.metadataEnabled == other.metadataEnabled && + this.maxPendingRefreshNodeListRequests == other.maxPendingRefreshNodeListRequests && + this.maxPendingRefreshNodeRequests == other.maxPendingRefreshNodeRequests && + this.maxPendingRefreshSchemaRequests == other.maxPendingRefreshSchemaRequests && + this.refreshNodeListIntervalMillis == other.refreshNodeListIntervalMillis && + this.refreshNodeIntervalMillis == other.refreshNodeIntervalMillis && + this.refreshSchemaIntervalMillis == other.refreshSchemaIntervalMillis && + this.reprepareOnUp == other.reprepareOnUp && + this.prepareOnAllHosts == other.prepareOnAllHosts + ); + } + + public int hashCode() { + return MoreObjects. 
+ hashCode(consistency, serialConsistency, fetchSize, + defaultIdempotence, metadataEnabled, + maxPendingRefreshNodeListRequests, + maxPendingRefreshNodeRequests, + maxPendingRefreshSchemaRequests, + refreshNodeListIntervalMillis, refreshNodeIntervalMillis, + refreshSchemaIntervalMillis, reprepareOnUp, + prepareOnAllHosts); + } } From e27acfcf8db59eeaf0d13623c877349d2e03630d Mon Sep 17 00:00:00 2001 From: Andrew Tolbert Date: Wed, 28 Feb 2018 08:07:22 -0600 Subject: [PATCH 006/317] JAVA-1760: Add metrics documentation (#958) --- changelog/README.md | 1 + manual/metrics/README.md | 157 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 150 insertions(+), 8 deletions(-) diff --git a/changelog/README.md b/changelog/README.md index 4e9604cdf08..0a6ea0dccbf 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -5,6 +5,7 @@ - [improvement] JAVA-1448: TokenAwarePolicy should respect child policy ordering. - [bug] JAVA-1751: Include defaultTimestamp length in encodedSize for protocol version >= 3. - [bug] JAVA-1770: Fix message size when using Custom Payload. +- [documentation] JAVA-1760: Add metrics documentation. ### 3.4.0 diff --git a/manual/metrics/README.md b/manual/metrics/README.md index 78cad50c1e4..78c42af86cf 100644 --- a/manual/metrics/README.md +++ b/manual/metrics/README.md @@ -1,12 +1,153 @@ ## Metrics -*Coming soon... In the meantime, see the javadoc for [Metrics].* +The driver exposes measurements of its internal behavior through the popular [Dropwizard Metrics] +library. Developers can access these metrics and choose to export them to a monitoring tool. - +The driver depends on Metrics 3.2.x, but is compatible with newer versions of Dropwizard Metrics. +For using Metrics 4.x with the driver, see [Metrics 4 Compatibility](#metrics-4-compatibility). -[Metrics]: http://docs.datastax.com/en/drivers/java/3.4/com/datastax/driver/core/Metrics.html \ No newline at end of file +### Structure + +Metric names are path-like, dot-separated strings. 
Metrics are measured at the `Cluster`-level, +thus the driver prefixes them with the name of the `Cluster` they are associated with (see [withClusterName] +for how to configure this), suffixed by `-metrics`. For example: + +``` +cluster1-metrics.connected-to +cluster1-metrics.connection-errors +... +``` + +### Configuration + +By default, metrics are enabled and exposed via JMX as [MXBeans]. + +Some users may find that they don't want the driver to record and expose metrics. To disable +metrics collection, use the [withoutMetrics] builder method, i.e.: + +```java +Cluster cluster = Cluster.builder() + .withoutMetrics() + .build(); +``` + +Note that if you decide to disable metrics, you may also consider excluding metrics as a dependency. +To do this in a maven project: + +```xml +<dependency> + <groupId>com.datastax.cassandra</groupId> + <artifactId>cassandra-driver-core</artifactId> + <version>3.4.0</version> + <exclusions> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </exclusion> + </exclusions> +</dependency> +``` + +Alternatively, one may not want to expose metrics using JMX. Disabling JMX reporting is as simple +as using the [withoutJMXReporting] builder method, i.e.: + +```java +Cluster cluster = Cluster.builder() + .withoutJMXReporting() + .build(); +``` + +### Accessing Cluster Metrics + +`Cluster` metrics may be accessed via the [getMetrics] method. The [Metrics] class offers +direct access to all metrics recorded for the `Cluster` via getter methods. Refer to +the [Metrics javadoc][Metrics] for more details about the metrics offered. + +It is very common for applications to record their own metrics. You can add all metrics +recorded for a `Cluster` to your application's [MetricRegistry] in the following manner: + +```java +MetricRegistry myRegistry = new MetricRegistry(); +myRegistry.registerAll(cluster.getMetrics().getRegistry()); +``` + +### Registering a Custom Reporter + +Dropwizard Metrics offers a variety of [Reporters] for exporting metrics. To enable reporting, +access the `Cluster`'s metrics via the [getMetrics] method. 
For example, to enable CSV reporting +every 30 seconds: + +```java +import com.codahale.metrics.*; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +Metrics metrics = cluster.getMetrics(); + +CsvReporter csvReporter = CsvReporter.forRegistry(metrics.getRegistry()) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(new File(".")); + +csvReporter.start(30, TimeUnit.SECONDS); +``` + +### Metrics 4 Compatibility + +While the driver depends on Metrics 3.2.x, it also works with Metrics 4, with some caveats. + +In Metrics 4, JMX reporting was moved to a separate module, `metrics-jmx`. Because of this, you are +likely to encounter the following exception at runtime when initializing a `Cluster`: + +``` +Exception in thread "main" java.lang.NoClassDefFoundError: com/codahale/metrics/JmxReporter + at com.datastax.driver.core.Metrics.<init>(Metrics.java:103) + at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1402) + at com.datastax.driver.core.Cluster.init(Cluster.java:159) + at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:330) + at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:305) + at com.datastax.durationtest.core.DurationTest.createSessions(DurationTest.java:360) + .... +Caused by: java.lang.ClassNotFoundException: com.codahale.metrics.JmxReporter + at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582) + at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:185) + at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:496) + ... 8 more +``` + +To fix this, use [withoutJMXReporting] when constructing your `Cluster`. 
If you still desire JMX +reporting, add `metrics-jmx` as a dependency: + +```xml +<dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-jmx</artifactId> + <version>4.0.2</version> +</dependency> +``` + +Then create your `Cluster` and `JmxReporter` in the following manner: + +```java +Cluster cluster = Cluster.builder() + .withoutJMXReporting() + .build(); + +JmxReporter reporter = + JmxReporter.forRegistry(cluster.getMetrics().getRegistry()) + .inDomain(cluster.getClusterName() + "-metrics") + .build(); + +reporter.start(); +``` + +[Dropwizard Metrics]: http://metrics.dropwizard.io/3.2.2/manual/index.html +[Reporters]: http://metrics.dropwizard.io/3.2.2/manual/core.html#reporters +[MetricRegistry]: http://metrics.dropwizard.io/3.2.2/apidocs/com/codahale/metrics/MetricRegistry.html +[MXBeans]: https://docs.oracle.com/javase/tutorial/jmx/mbeans/mxbeans.html +[withClusterName]: https://docs.datastax.com/en/drivers/java/3.4/com/datastax/driver/core/Cluster.Builder.html#withClusterName-java.lang.String- +[withoutMetrics]: https://docs.datastax.com/en/drivers/java/3.4/com/datastax/driver/core/Cluster.Builder.html#withoutMetrics-- +[withoutJMXReporting]: https://docs.datastax.com/en/drivers/java/3.4/com/datastax/driver/core/Cluster.Builder.html#withoutJMXReporting-- +[getMetrics]: https://docs.datastax.com/en/drivers/java/3.4/com/datastax/driver/core/Cluster.html#getMetrics-- +[Metrics]: http://docs.datastax.com/en/drivers/java/3.4/com/datastax/driver/core/Metrics.html From 5b5f7b1b035cd22b5536f8efe41ad67bf8543cfb Mon Sep 17 00:00:00 2001 From: Andrew Tolbert Date: Wed, 28 Feb 2018 08:10:12 -0600 Subject: [PATCH 007/317] JAVA-1765: Update dependencies to latest patch versions (#959) --- changelog/README.md | 1 + driver-core/pom.xml | 4 +-- .../datastax/driver/osgi/BundleOptions.java | 2 +- manual/compression/README.md | 6 ++-- manual/speculative_execution/README.md | 2 +- manual/ssl/README.md | 2 +- pom.xml | 29 ++++++++++--------- 7 files changed, 24 insertions(+), 22 deletions(-) diff --git a/changelog/README.md 
b/changelog/README.md index 0a6ea0dccbf..5568d69f151 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -6,6 +6,7 @@ - [bug] JAVA-1751: Include defaultTimestamp length in encodedSize for protocol version >= 3. - [bug] JAVA-1770: Fix message size when using Custom Payload. - [documentation] JAVA-1760: Add metrics documentation. +- [improvement] JAVA-1765: Update dependencies to latest patch versions. ### 3.4.0 diff --git a/driver-core/pom.xml b/driver-core/pom.xml index b38c8338152..d17c42bbd53 100644 --- a/driver-core/pom.xml +++ b/driver-core/pom.xml @@ -74,8 +74,8 @@ - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> <optional>true</optional> diff --git a/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java b/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java index e47b2caebcb..c3a93c9a9ed 100644 --- a/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java +++ b/driver-tests/osgi/src/test/java/com/datastax/driver/osgi/BundleOptions.java @@ -76,7 +76,7 @@ public static CompositeOption lz4Bundle() { public Option[] getOptions() { return options( systemProperty("cassandra.compression").value(ProtocolOptions.Compression.LZ4.name()), - mavenBundle("net.jpountz.lz4", "lz4", getVersion("lz4.version")) + mavenBundle("org.lz4", "lz4-java", getVersion("lz4.version")) ); } }; diff --git a/manual/compression/README.md b/manual/compression/README.md index 1c2b73d080b..c6cd8c885df 100644 --- a/manual/compression/README.md +++ b/manual/compression/README.md @@ -25,9 +25,9 @@ Maven dependency: ```xml - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> - <version>1.3.0</version> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>1.4.1</version> ``` diff --git a/manual/speculative_execution/README.md b/manual/speculative_execution/README.md index 38af43c2d7f..1160f8be2a2 100644 --- a/manual/speculative_execution/README.md +++ b/manual/speculative_execution/README.md @@ -117,7 +117,7 @@ explicitly depend on it: <groupId>org.hdrhistogram</groupId> <artifactId>HdrHistogram</artifactId> - <version>2.1.9</version> + <version>2.1.10</version> ``` diff --git a/manual/ssl/README.md 
b/manual/ssl/README.md index 1e8244d037e..72f70fceb90 100644 --- a/manual/ssl/README.md +++ b/manual/ssl/README.md @@ -153,7 +153,7 @@ add it to your dependencies. There are known runtime incompatibilities between newer versions of netty-tcnative and the version of netty that the driver uses. For best -results, use version 2.0.1.Final. +results, use version 2.0.7.Final. Using netty-tcnative requires JDK 1.7 or above and requires the presence of OpenSSL on the system. It will not fall back to the JDK implementation. diff --git a/pom.xml b/pom.xml index 07a031e7d10..aa08d7c463f 100644 --- a/pom.xml +++ b/pom.xml @@ -45,25 +45,26 @@ UTF-8 UTF-8 - 3.11.1 + 3.11.2 1.6 1.2.17 1.7.25 1.7.25 19.0 - 4.0.47.Final + 4.0.56.Final + 2.0.7.Final 3.2.2 1.1.2.6 - 1.3.0 - 2.1.9 - 2.8.8 + 1.4.1 + 2.1.10 + 2.8.11 - 2.7.9.2 - 2.9.1 + 2.7.9.3 + 2.9.9 1.0 1.0.4 - 2.0.7 - 3.0.27 + 2.0.9 + 3.0.44 2.4.7 2.0.1 2.23.1 @@ -165,8 +166,8 @@ - net.jpountz.lz4 - lz4 + org.lz4 + lz4-java ${lz4.version} @@ -341,7 +342,7 @@ io.netty netty-tcnative - 2.0.1.Final + ${netty-tcnative.version} ${os.detected.classifier} @@ -515,8 +516,8 @@ ${snappy.version} - net.jpountz.lz4 - lz4 + org.lz4 + lz4-java ${lz4.version} From f5d44d3fea093a5c28d59ba0f587e9044e161813 Mon Sep 17 00:00:00 2001 From: Alexandre Dutra Date: Wed, 28 Feb 2018 15:12:08 +0100 Subject: [PATCH 008/317] JAVA-1752: Deprecate DowngradingConsistencyRetryPolicy (#962) --- changelog/README.md | 1 + .../exceptions/QueryConsistencyException.java | 2 +- .../core/exceptions/ReadTimeoutException.java | 4 +- .../DowngradingConsistencyRetryPolicy.java | 18 +- ...ConsistencyRetryPolicyIntegrationTest.java | 1 + .../examples/retry/DowngradingRetry.java | 473 ++++++++++++++++++ upgrade_guide/README.md | 59 ++- 7 files changed, 539 insertions(+), 19 deletions(-) create mode 100644 driver-examples/src/main/java/com/datastax/driver/examples/retry/DowngradingRetry.java diff --git a/changelog/README.md b/changelog/README.md index 5568d69f151..24d4e4e912c 
100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -7,6 +7,7 @@ - [bug] JAVA-1770: Fix message size when using Custom Payload. - [documentation] JAVA-1760: Add metrics documentation. - [improvement] JAVA-1765: Update dependencies to latest patch versions. +- [improvement] JAVA-1752: Deprecate DowngradingConsistencyRetryPolicy. ### 3.4.0 diff --git a/driver-core/src/main/java/com/datastax/driver/core/exceptions/QueryConsistencyException.java b/driver-core/src/main/java/com/datastax/driver/core/exceptions/QueryConsistencyException.java index 8e03844d8b7..434358c6ecd 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/exceptions/QueryConsistencyException.java +++ b/driver-core/src/main/java/com/datastax/driver/core/exceptions/QueryConsistencyException.java @@ -65,7 +65,7 @@ public ConsistencyLevel getConsistencyLevel() { } /** - * The number of replica that had acknowledged/responded to the operation + * The number of replicas that had acknowledged/responded to the operation * before it failed. * * @return the number of replica that had acknowledged/responded the diff --git a/driver-core/src/main/java/com/datastax/driver/core/exceptions/ReadTimeoutException.java b/driver-core/src/main/java/com/datastax/driver/core/exceptions/ReadTimeoutException.java index f3de64a9e7a..09885c16b04 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/exceptions/ReadTimeoutException.java +++ b/driver-core/src/main/java/com/datastax/driver/core/exceptions/ReadTimeoutException.java @@ -66,8 +66,8 @@ else if (!dataPresent) *

* During reads, Cassandra doesn't request data from every replica to * minimize internal network traffic. Instead, some replicas are only asked - * for a checksum of the data. A read timeout may occurred even if enough - * replicas have responded to fulfill the consistency level if only checksum + * for a checksum of the data. A read timeout may have occurred even if enough + * replicas have responded to fulfill the consistency level, if only checksum * responses have been received. This method allows to detect that case. * * @return whether the data was amongst the received replica responses. diff --git a/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java b/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java index 597ed878864..f10610917d4 100644 --- a/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java +++ b/driver-core/src/main/java/com/datastax/driver/core/policies/DowngradingConsistencyRetryPolicy.java @@ -63,7 +63,7 @@ * on the information the Cassandra coordinator node returns, retrying the * operation with the initially requested consistency has a chance to * succeed, do it. Otherwise, if based on this information we know the - * initially requested consistency level cannot be achieve currently, then: + * initially requested consistency level cannot be achieved currently, then: *