Skip to content

Commit 6472dce

Browse files
authored
[FLINK-28087] Add validation for the meta.name of FlinkDeployment CR
1 parent 8d8e7a1 commit 6472dce

File tree

3 files changed

+26
-1
lines changed

3 files changed

+26
-1
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,13 @@
4848
import java.util.Map;
4949
import java.util.Optional;
5050
import java.util.Set;
51+
import java.util.regex.Matcher;
52+
import java.util.regex.Pattern;
5153

5254
/** Default validator implementation for {@link FlinkDeployment}. */
5355
public class DefaultValidator implements FlinkResourceValidator {
54-
56+
private static final Pattern DEPLOYMENT_NAME_PATTERN =
57+
Pattern.compile("[a-z]([-a-z\\d]{0,43}[a-z\\d])?");
5558
private static final String[] FORBIDDEN_CONF_KEYS =
5659
new String[] {
5760
KubernetesConfigOptions.NAMESPACE.key(), KubernetesConfigOptions.CLUSTER_ID.key()
@@ -77,6 +80,7 @@ public Optional<String> validateDeployment(FlinkDeployment deployment) {
7780
effectiveConfig.putAll(spec.getFlinkConfiguration());
7881
}
7982
return firstPresent(
83+
validateDeploymentName(deployment.getMetadata().getName()),
8084
validateFlinkVersion(spec.getFlinkVersion()),
8185
validateFlinkDeploymentConfig(effectiveConfig),
8286
validateIngress(
@@ -100,6 +104,17 @@ private static Optional<String> firstPresent(Optional<String>... errOpts) {
100104
return Optional.empty();
101105
}
102106

107+
private Optional<String> validateDeploymentName(String name) {
108+
Matcher matcher = DEPLOYMENT_NAME_PATTERN.matcher(name);
109+
if (!matcher.matches()) {
110+
return Optional.of(
111+
String.format(
112+
"The FlinkDeployment name: %s is invalid, must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123'), and the length must be no more than 45 characters.",
113+
name));
114+
}
115+
return Optional.empty();
116+
}
117+
103118
private Optional<String> validateFlinkVersion(FlinkVersion version) {
104119
if (version == null) {
105120
return Optional.of("Flink Version must be defined.");

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java

+6
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ public class DefaultValidatorTest {
6868
public void testValidationWithoutDefaultConfig() {
6969
testSuccess(dep -> {});
7070

71+
// Test meta.name
72+
testSuccess(dep -> dep.getMetadata().setName("session-cluster"));
73+
testError(
74+
dep -> dep.getMetadata().setName("session-cluster-1.13"),
75+
"The FlinkDeployment name: session-cluster-1.13 is invalid, must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123'), and the length must be no more than 45 characters.");
76+
7177
// Test job validation
7278
testError(dep -> dep.getSpec().getJob().setJarURI(null), "Jar URI must be defined");
7379
testError(

flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import com.fasterxml.jackson.databind.ObjectMapper;
3838
import com.fasterxml.jackson.databind.exc.MismatchedInputException;
3939
import io.fabric8.kubernetes.api.model.GroupVersionKind;
40+
import io.fabric8.kubernetes.api.model.ObjectMeta;
4041
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionRequest;
4142
import io.fabric8.kubernetes.api.model.admission.v1.AdmissionReview;
4243
import org.junit.jupiter.api.Assertions;
@@ -99,6 +100,9 @@ public void testHandleValidateRequestWithoutContent() {
99100
public void testHandleValidateRequestWithAdmissionReview() throws IOException {
100101
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler);
101102
final FlinkDeployment flinkDeployment = new FlinkDeployment();
103+
ObjectMeta objectMeta = new ObjectMeta();
104+
objectMeta.setName("basic-session-cluster");
105+
flinkDeployment.setMetadata(objectMeta);
102106
flinkDeployment.setSpec(new FlinkDeploymentSpec());
103107
final AdmissionRequest admissionRequest = new AdmissionRequest();
104108
admissionRequest.setOperation(CREATE.name());

0 commit comments

Comments
 (0)