Skip to content

Commit 384cc54

Browse files
authored
Add core coordination algorithm for cluster state publishing (#32171)
Adds the core coordination algorithm that guarantees safety for cluster state publishing. This is a straightforward port of the transition rules from the formal model at https://github.com/elastic/elasticsearch-formal-models/blob/master/ZenWithTerms/tla/ZenWithTerms.tla
1 parent ad78f73 commit 384cc54

13 files changed

+2090
-2
lines changed

server/src/main/java/org/elasticsearch/ElasticsearchException.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.action.support.replication.ReplicationOperation;
2323
import org.elasticsearch.cluster.action.shard.ShardStateAction;
24+
import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
2425
import org.elasticsearch.common.CheckedFunction;
2526
import org.elasticsearch.common.Nullable;
2627
import org.elasticsearch.common.ParseField;
@@ -1024,8 +1025,10 @@ private enum ElasticsearchExceptionHandle {
10241025
UNKNOWN_NAMED_OBJECT_EXCEPTION(org.elasticsearch.common.xcontent.UnknownNamedObjectException.class,
10251026
org.elasticsearch.common.xcontent.UnknownNamedObjectException::new, 148, Version.V_5_2_0),
10261027
TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
1027-
MultiBucketConsumerService.TooManyBucketsException::new, 149,
1028-
Version.V_7_0_0_alpha1);
1028+
MultiBucketConsumerService.TooManyBucketsException::new, 149, Version.V_7_0_0_alpha1),
1029+
COORDINATION_STATE_REJECTED_EXCEPTION(CoordinationStateRejectedException.class,
1030+
CoordinationStateRejectedException::new, 150, Version.V_7_0_0_alpha1);
1031+
10291032

10301033
final Class<? extends ElasticsearchException> exceptionClass;
10311034
final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.coordination;
20+
21+
import org.elasticsearch.cluster.node.DiscoveryNode;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
25+
import java.io.IOException;
26+
27+
/**
28+
* A master node sends this request to its peers to inform them that it could commit the
29+
* cluster state with the given term and version. Peers that have accepted the given cluster
30+
* state will then consider it as committed and proceed to apply the state locally.
31+
*/
32+
public class ApplyCommitRequest extends TermVersionRequest {
33+
34+
public ApplyCommitRequest(DiscoveryNode sourceNode, long term, long version) {
35+
super(sourceNode, term, version);
36+
}
37+
38+
public ApplyCommitRequest(StreamInput in) throws IOException {
39+
super(in);
40+
}
41+
42+
@Override
43+
public void writeTo(StreamOutput out) throws IOException {
44+
super.writeTo(out);
45+
}
46+
47+
@Override
48+
public String toString() {
49+
return "ApplyCommitRequest{" +
50+
"term=" + term +
51+
", version=" + version +
52+
'}';
53+
}
54+
}

server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java

+485
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.cluster.coordination;
21+
22+
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
25+
import java.io.IOException;
26+
27+
/**
28+
* This exception is thrown when rejecting state transitions on the {@link CoordinationState} object,
29+
* for example when receiving a publish request with the wrong term or version.
30+
* Occurrences of this exception don't always signal failures, but can often be just caused by the
31+
* asynchronous, distributed nature of the system. They will, for example, naturally happen during
32+
* leader election, if multiple nodes are trying to become leader at the same time.
33+
*/
34+
public class CoordinationStateRejectedException extends ElasticsearchException {
35+
public CoordinationStateRejectedException(String msg, Object... args) {
36+
super(msg, args);
37+
}
38+
39+
public CoordinationStateRejectedException(StreamInput in) throws IOException {
40+
super(in);
41+
}
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.coordination;
20+
21+
import org.elasticsearch.cluster.node.DiscoveryNode;
22+
import org.elasticsearch.common.io.stream.StreamInput;
23+
import org.elasticsearch.common.io.stream.StreamOutput;
24+
import org.elasticsearch.common.io.stream.Writeable;
25+
26+
import java.io.IOException;
27+
28+
/**
29+
* Triggered by a {@link StartJoinRequest}, instances of this class represent join votes,
30+
* and have a source and target node. The source node is the node that provides the vote,
31+
* and the target node is the node for which this vote is cast. A node will only cast
32+
* a single vote per term, and this for a unique target node. The vote also carries
33+
* information about the current state of the node that provided the vote, so that
34+
* the receiver of the vote can determine if it has a more up-to-date state than the
35+
* source node.
36+
*/
37+
public class Join implements Writeable {
38+
private final DiscoveryNode sourceNode;
39+
private final DiscoveryNode targetNode;
40+
private final long term;
41+
private final long lastAcceptedTerm;
42+
private final long lastAcceptedVersion;
43+
44+
public Join(DiscoveryNode sourceNode, DiscoveryNode targetNode, long term, long lastAcceptedTerm, long lastAcceptedVersion) {
45+
assert term >= 0;
46+
assert lastAcceptedTerm >= 0;
47+
assert lastAcceptedVersion >= 0;
48+
49+
this.sourceNode = sourceNode;
50+
this.targetNode = targetNode;
51+
this.term = term;
52+
this.lastAcceptedTerm = lastAcceptedTerm;
53+
this.lastAcceptedVersion = lastAcceptedVersion;
54+
}
55+
56+
public Join(StreamInput in) throws IOException {
57+
sourceNode = new DiscoveryNode(in);
58+
targetNode = new DiscoveryNode(in);
59+
term = in.readLong();
60+
lastAcceptedTerm = in.readLong();
61+
lastAcceptedVersion = in.readLong();
62+
}
63+
64+
@Override
65+
public void writeTo(StreamOutput out) throws IOException {
66+
sourceNode.writeTo(out);
67+
targetNode.writeTo(out);
68+
out.writeLong(term);
69+
out.writeLong(lastAcceptedTerm);
70+
out.writeLong(lastAcceptedVersion);
71+
}
72+
73+
public DiscoveryNode getSourceNode() {
74+
return sourceNode;
75+
}
76+
77+
public DiscoveryNode getTargetNode() {
78+
return targetNode;
79+
}
80+
81+
public long getLastAcceptedVersion() {
82+
return lastAcceptedVersion;
83+
}
84+
85+
public long getTerm() {
86+
return term;
87+
}
88+
89+
public long getLastAcceptedTerm() {
90+
return lastAcceptedTerm;
91+
}
92+
93+
@Override
94+
public String toString() {
95+
return "Join{" +
96+
"term=" + term +
97+
", lastAcceptedTerm=" + lastAcceptedTerm +
98+
", lastAcceptedVersion=" + lastAcceptedVersion +
99+
", sourceNode=" + sourceNode +
100+
", targetNode=" + targetNode +
101+
'}';
102+
}
103+
104+
@Override
105+
public boolean equals(Object o) {
106+
if (this == o) return true;
107+
if (o == null || getClass() != o.getClass()) return false;
108+
109+
Join join = (Join) o;
110+
111+
if (sourceNode.equals(join.sourceNode) == false) return false;
112+
if (targetNode.equals(join.targetNode) == false) return false;
113+
if (lastAcceptedVersion != join.lastAcceptedVersion) return false;
114+
if (term != join.term) return false;
115+
return lastAcceptedTerm == join.lastAcceptedTerm;
116+
}
117+
118+
@Override
119+
public int hashCode() {
120+
int result = (int) (lastAcceptedVersion ^ (lastAcceptedVersion >>> 32));
121+
result = 31 * result + sourceNode.hashCode();
122+
result = 31 * result + targetNode.hashCode();
123+
result = 31 * result + (int) (term ^ (term >>> 32));
124+
result = 31 * result + (int) (lastAcceptedTerm ^ (lastAcceptedTerm >>> 32));
125+
return result;
126+
}
127+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.coordination;
20+
21+
import org.elasticsearch.cluster.ClusterState;
22+
import org.elasticsearch.cluster.node.DiscoveryNode;
23+
import org.elasticsearch.common.io.stream.StreamInput;
24+
import org.elasticsearch.common.io.stream.StreamOutput;
25+
import org.elasticsearch.transport.TransportRequest;
26+
27+
import java.io.IOException;
28+
import java.util.Objects;
29+
30+
/**
31+
* Request which is used by the master node to publish cluster state changes.
32+
*/
33+
public class PublishRequest extends TransportRequest {
34+
35+
private final ClusterState acceptedState;
36+
37+
public PublishRequest(ClusterState acceptedState) {
38+
this.acceptedState = acceptedState;
39+
}
40+
41+
public PublishRequest(StreamInput in, DiscoveryNode localNode) throws IOException {
42+
super(in);
43+
acceptedState = ClusterState.readFrom(in, localNode);
44+
}
45+
46+
@Override
47+
public void writeTo(StreamOutput out) throws IOException {
48+
super.writeTo(out);
49+
acceptedState.writeTo(out);
50+
}
51+
52+
public ClusterState getAcceptedState() {
53+
return acceptedState;
54+
}
55+
56+
@Override
57+
public boolean equals(Object o) {
58+
if (this == o) return true;
59+
if (!(o instanceof PublishRequest)) return false;
60+
61+
PublishRequest that = (PublishRequest) o;
62+
63+
return acceptedState.term() == that.acceptedState.term() &&
64+
acceptedState.version() == that.acceptedState.version();
65+
}
66+
67+
@Override
68+
public int hashCode() {
69+
return Objects.hash(acceptedState.term(), acceptedState.version());
70+
}
71+
72+
@Override
73+
public String toString() {
74+
return "PublishRequest{term=" + acceptedState.term()
75+
+ ", version=" + acceptedState.version()
76+
+ ", state=" + acceptedState + '}';
77+
}
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.cluster.coordination;
20+
21+
import org.elasticsearch.common.io.stream.StreamInput;
22+
import org.elasticsearch.common.io.stream.StreamOutput;
23+
24+
import java.io.IOException;
25+
26+
/**
27+
* Response to a {@link PublishRequest}, carrying the term and version of the request.
28+
*/
29+
public class PublishResponse extends TermVersionResponse {
30+
31+
public PublishResponse(long term, long version) {
32+
super(term, version);
33+
}
34+
35+
public PublishResponse(StreamInput in) throws IOException {
36+
super(in);
37+
}
38+
39+
@Override
40+
public void writeTo(StreamOutput out) throws IOException {
41+
super.writeTo(out);
42+
}
43+
44+
@Override
45+
public String toString() {
46+
return "PublishResponse{" +
47+
"term=" + term +
48+
", version=" + version +
49+
'}';
50+
}
51+
}

0 commit comments

Comments
 (0)