Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("clu")
public class Cluster extends CustomResource<Void, Void> implements Namespaced {}
public class Cluster extends CustomResource<ClusterSpec, Void> implements Namespaced {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.baseapi.primarytosecondary;

public class ClusterSpec {

private String clusterValue;

public String getClusterValue() {
return clusterValue;
}

public void setClusterValue(String clusterValue) {
this.clusterValue = clusterValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("cjo")
public class Job extends CustomResource<JobSpec, Void> implements Namespaced {}
public class Job extends CustomResource<JobSpec, JobStatus> implements Namespaced {}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* needed, and to show the use cases when some mechanisms would not work without that. It's not
* intended to be a reusable code as it is, rather serves for deeper understanding of the problem.
*/
@ControllerConfiguration()
@ControllerConfiguration
public class JobReconciler implements Reconciler<Job> {

private static final String JOB_CLUSTER_INDEX = "job-cluster-index";
Expand All @@ -38,26 +38,37 @@ public JobReconciler(boolean addPrimaryToSecondaryMapper) {

@Override
public UpdateControl<Job> reconcile(Job resource, Context<Job> context) {

Cluster cluster;
if (!getResourceDirectlyFromCache) {
// this is only possible when there is primary to secondary mapper
context
.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
cluster =
context
.getSecondaryResource(Cluster.class)
.orElseThrow(() -> new IllegalStateException("Secondary resource should be present"));
} else {
// reading the resource from cache as alternative, works without primary to secondary mapper
var informerEventSource =
(InformerEventSource<Cluster, Job>)
context.eventSourceRetriever().getEventSourceFor(Cluster.class);
informerEventSource
.get(
new ResourceID(
resource.getSpec().getClusterName(), resource.getMetadata().getNamespace()))
.orElseThrow(
() -> new IllegalStateException("Secondary resource cannot be read from cache"));
cluster =
informerEventSource
.get(
new ResourceID(
resource.getSpec().getClusterName(), resource.getMetadata().getNamespace()))
.orElseThrow(
() -> new IllegalStateException("Secondary resource cannot be read from cache"));
}
if (resource.getStatus() == null) {
resource.setStatus(new JobStatus());
}
numberOfExecutions.addAndGet(1);
return UpdateControl.noUpdate();
// copy a value to job status, to we can test triggering
if (!cluster.getSpec().getClusterValue().equals(resource.getStatus().getValueFromCluster())) {
resource.getStatus().setValueFromCluster(cluster.getSpec().getClusterValue());
return UpdateControl.patchStatus(resource);
} else {
return UpdateControl.noUpdate();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.baseapi.primarytosecondary;

public class JobStatus {

private String valueFromCluster;

public String getValueFromCluster() {
return valueFromCluster;
}

public void setValueFromCluster(String valueFromCluster) {
this.valueFromCluster = valueFromCluster;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,50 @@ class PrimaryToSecondaryIT {
public static final String CLUSTER_NAME = "cluster1";
public static final int MIN_DELAY = 150;

public static final String CLUSTER_VALUE = "clusterValue";
public static final String JOB_1 = "job1";
public static final String CHANGED_VALUE = "CHANGED_VALUE";

@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withAdditionalCustomResourceDefinition(Cluster.class)
.withReconciler(new JobReconciler())
.build();

@Test
void readsSecondaryInManyToOneCases() throws InterruptedException {
operator.create(cluster());
var cluster = extension.create(cluster());
Thread.sleep(MIN_DELAY);
operator.create(job());
extension.create(job());

await()
.pollDelay(Duration.ofMillis(300))
.untilAsserted(
() -> {
assertThat(extension.getReconcilerOfType(JobReconciler.class).getNumberOfExecutions())
.isEqualTo(1);
var job = extension.get(Job.class, JOB_1);
assertThat(job.getStatus()).isNotNull();
assertThat(job.getStatus().getValueFromCluster()).isEqualTo(CLUSTER_VALUE);
});

cluster.getSpec().setClusterValue(CHANGED_VALUE);
extension.replace(cluster);

// cluster change triggers job reconciliations
await()
.pollDelay(Duration.ofMillis(300))
.untilAsserted(
() ->
assertThat(
operator.getReconcilerOfType(JobReconciler.class).getNumberOfExecutions())
.isEqualTo(1));
() -> {
var job = extension.get(Job.class, JOB_1);
assertThat(job.getStatus().getValueFromCluster()).isEqualTo(CHANGED_VALUE);
});
}

public static Job job() {
var job = new Job();
job.setMetadata(new ObjectMetaBuilder().withName("job1").build());
job.setMetadata(new ObjectMetaBuilder().withName(JOB_1).build());
job.setSpec(new JobSpec());
job.getSpec().setClusterName(CLUSTER_NAME);
return job;
Expand All @@ -49,6 +68,8 @@ public static Job job() {
public static Cluster cluster() {
Cluster cluster = new Cluster();
cluster.setMetadata(new ObjectMetaBuilder().withName(CLUSTER_NAME).build());
cluster.setSpec(new ClusterSpec());
cluster.getSpec().setClusterValue(CLUSTER_VALUE);
return cluster;
}
}