Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Modify test case to update running job #124287

Merged
merged 6 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.core.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.core.ml.action.PutFilterAction;
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
import org.elasticsearch.xpack.core.ml.annotations.Annotation;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void testCondition() throws Exception {

// push the data for the first half buckets
postData(job.getId(), joinBetween(0, data.size() / 2, data));
closeJob(job.getId());
flushJob(job.getId(), true);

List<AnomalyRecord> records = getRecords(job.getId());
// remove records that are not anomalies
Expand All @@ -116,18 +117,35 @@ public void testCondition() throws Exception {
JobUpdate.Builder update = new JobUpdate.Builder(job.getId());
update.setDetectorUpdates(Arrays.asList(new JobUpdate.DetectorUpdate(0, null, Arrays.asList(newRule))));
updateJob(job.getId(), update.build());
// Wait until the notification that the job was updated is indexed
assertBusy(
() -> assertResponse(
prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
),
searchResponse -> {
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat((String) hits[0].getSourceAsMap().get("message"), containsString("Job updated: [detectors]"));
}
)
);
}

// push second half
openJob(job.getId());
postData(job.getId(), joinBetween(data.size() / 2, data.size(), data));
closeJob(job.getId());
flushJob(job.getId(), true);

GetRecordsAction.Request recordsAfterFirstHalf = new GetRecordsAction.Request(job.getId());
recordsAfterFirstHalf.setStart(String.valueOf(firstRecordTimestamp + 1));
records = getRecords(recordsAfterFirstHalf);
assertThat("records were " + records, (int) (records.stream().filter(r -> r.getProbability() < 0.01).count()), equalTo(1));
assertThat(records.get(0).getByFieldValue(), equalTo("low"));
closeJob(job.getId());
}

public void testScope() throws Exception {
Expand Down Expand Up @@ -242,7 +260,7 @@ public void testScope() throws Exception {
closeJob(job.getId());
}

public void testScopeAndCondition() throws IOException {
public void testScopeAndCondition() throws Exception {
// We have 2 IPs and they're both safe-listed.
List<String> ips = Arrays.asList("111.111.111.111", "222.222.222.222");
MlFilter safeIps = MlFilter.builder("safe_ips").setItems(ips).build();
Expand Down Expand Up @@ -298,11 +316,112 @@ public void testScopeAndCondition() throws IOException {
}

postData(job.getId(), joinBetween(0, data.size(), data));
closeJob(job.getId());
flushJob(job.getId(), true);

List<AnomalyRecord> records = getRecords(job.getId());
assertThat(records.size(), equalTo(1));
assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222"));

// Remove "111.111.111.111" from the "safe_ips" filter
List<String> addedIps = Arrays.asList();
List<String> removedIps = Arrays.asList("111.111.111.111");
PutFilterAction.Response updatedFilter = updateMlFilter("safe_ips", addedIps, removedIps);
// Wait until the notification that the filter was updated is indexed
assertBusy(
() -> assertResponse(
prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
),
searchResponse -> {
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(
(String) hits[0].getSourceAsMap().get("message"),
containsString("Filter [safe_ips] has been modified; removed items: ['111.111.111.111']")
);
}
)
);
MlFilter updatedSafeIps = MlFilter.builder("safe_ips").setItems(Arrays.asList("222.222.222.222")).build();
assertThat(updatedFilter.getFilter(), equalTo(updatedSafeIps));

data.clear();
// Now send anomalous count of 9 for 111.111.111.111
for (int i = 0; i < 9; i++) {
data.add(createIpRecord(timestamp, "111.111.111.111"));
}

// Some more normal buckets
for (int bucket = 0; bucket < 3; bucket++) {
for (String ip : ips) {
data.add(createIpRecord(timestamp, ip));
}
timestamp += TimeValue.timeValueHours(1).getMillis();
}

postData(job.getId(), joinBetween(0, data.size(), data));
flushJob(job.getId(), true);

records = getRecords(job.getId());
assertThat(records.size(), equalTo(2));
assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222"));
assertThat(records.get(1).getOverFieldValue(), equalTo("111.111.111.111"));

{
// Update detection rules such that it now applies only to actual values > 10.0
DetectionRule newRule = new DetectionRule.Builder(
Arrays.asList(new RuleCondition(RuleCondition.AppliesTo.ACTUAL, Operator.GT, 10.0))
).build();
JobUpdate.Builder update = new JobUpdate.Builder(job.getId());
update.setDetectorUpdates(Arrays.asList(new JobUpdate.DetectorUpdate(0, null, Arrays.asList(newRule))));
updateJob(job.getId(), update.build());
// Wait until the notification that the job was updated is indexed
assertBusy(
() -> assertResponse(
prepareSearch(NotificationsIndex.NOTIFICATIONS_INDEX).setSize(1)
.addSort("timestamp", SortOrder.DESC)
.setQuery(
QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery("job_id", job.getId()))
.filter(QueryBuilders.termQuery("level", "info"))
),
searchResponse -> {
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat((String) hits[0].getSourceAsMap().get("message"), containsString("Job updated: [detectors]"));
}
)
);
}

data.clear();
// Now send anomalous count of 10 for 222.222.222.222
for (int i = 0; i < 10; i++) {
data.add(createIpRecord(timestamp, "222.222.222.222"));
}

// Some more normal buckets
for (int bucket = 0; bucket < 3; bucket++) {
for (String ip : ips) {
data.add(createIpRecord(timestamp, ip));
}
timestamp += TimeValue.timeValueHours(1).getMillis();
}

postData(job.getId(), joinBetween(0, data.size(), data));

closeJob(job.getId());

// The anomalous records should not have changed.
records = getRecords(job.getId());
assertThat(records.size(), equalTo(2));
assertThat(records.get(0).getOverFieldValue(), equalTo("222.222.222.222"));
assertThat(records.get(1).getOverFieldValue(), equalTo("111.111.111.111"));

}

public void testForceTimeShiftAction() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.UpdateFilterAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
import org.elasticsearch.xpack.core.ml.inference.ModelAliasMetadata;
Expand Down Expand Up @@ -314,6 +315,13 @@ protected PutFilterAction.Response putMlFilter(MlFilter filter) {
return client().execute(PutFilterAction.INSTANCE, new PutFilterAction.Request(filter)).actionGet();
}

protected PutFilterAction.Response updateMlFilter(String filterId, List<String> addItems, List<String> removeItems) {
UpdateFilterAction.Request request = new UpdateFilterAction.Request(filterId);
request.setAddItems(addItems);
request.setRemoveItems(removeItems);
return client().execute(UpdateFilterAction.INSTANCE, request).actionGet();
}

protected static List<String> fetchAllAuditMessages(String jobId) throws Exception {
RefreshRequest refreshRequest = new RefreshRequest(NotificationsIndex.NOTIFICATIONS_INDEX);
BroadcastResponse refreshResponse = client().execute(RefreshAction.INSTANCE, refreshRequest).actionGet();
Expand Down