Skip to content

Commit faaff56

Browse files
authored
[FLINK-33187] Use hashcode to deduplicate scaling events
1 parent cc680e1 commit faaff56

File tree

14 files changed

+791
-237
lines changed

14 files changed

+791
-237
lines changed

docs/layouts/shortcodes/generated/auto_scaler_configuration.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@
117117
<td>Enable vertex scaling execution by the autoscaler. If disabled, the autoscaler will only collect metrics and evaluate the suggested parallelism for each vertex but will not upgrade the jobs.</td>
118118
</tr>
119119
<tr>
120-
<td><h5>job.autoscaler.scaling.report.interval</h5></td>
120+
<td><h5>job.autoscaler.scaling.event.interval</h5></td>
121121
<td style="word-wrap: break-word;">30 min</td>
122122
<td>Duration</td>
123123
<td>Time interval to resend the identical event</td>

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
4040
import static org.apache.flink.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
4141
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
42+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
4243
import static org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
4344
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
4445
import static org.apache.flink.autoscaler.config.AutoScalerOptions.VERTEX_MIN_PARALLELISM;
@@ -219,7 +220,7 @@ private boolean detectIneffectiveScaleUp(
219220
INEFFECTIVE_SCALING,
220221
message,
221222
null,
222-
null);
223+
conf.get(SCALING_EVENT_INTERVAL));
223224

224225
if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) {
225226
LOG.warn(

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java

+7-42
Original file line numberDiff line numberDiff line change
@@ -38,23 +38,15 @@
3838
import java.util.SortedMap;
3939

4040
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_ENABLED;
41+
import static org.apache.flink.autoscaler.config.AutoScalerOptions.SCALING_EVENT_INTERVAL;
4142
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.addToScalingHistoryAndStore;
4243
import static org.apache.flink.autoscaler.metrics.ScalingHistoryUtils.getTrimmedScalingHistory;
43-
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
4444
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_DOWN_RATE_THRESHOLD;
4545
import static org.apache.flink.autoscaler.metrics.ScalingMetric.SCALE_UP_RATE_THRESHOLD;
46-
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
4746
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
4847

4948
/** Class responsible for executing scaling decisions. */
5049
public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
51-
public static final String SCALING_SUMMARY_ENTRY =
52-
" Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
53-
public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED =
54-
"Recommended parallelism change:";
55-
public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
56-
@VisibleForTesting static final String SCALING_REPORT_REASON = "ScalingReport";
57-
5850
private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);
5951

6052
private final JobVertexScaler<KEY, Context> jobVertexScaler;
@@ -100,18 +92,11 @@ public boolean scaleResource(
10092

10193
updateRecommendedParallelism(evaluatedMetrics, scalingSummaries);
10294

103-
var scalingEnabled = conf.get(SCALING_ENABLED);
104-
105-
var scalingReport = scalingReport(scalingSummaries, scalingEnabled);
106-
autoScalerEventHandler.handleEvent(
107-
context,
108-
AutoScalerEventHandler.Type.Normal,
109-
SCALING_REPORT_REASON,
110-
scalingReport,
111-
"ScalingExecutor",
112-
scalingEnabled ? null : conf.get(AutoScalerOptions.SCALING_REPORT_INTERVAL));
95+
var scaleEnabled = conf.get(SCALING_ENABLED);
96+
autoScalerEventHandler.handleScalingEvent(
97+
context, scalingSummaries, scaleEnabled, conf.get(SCALING_EVENT_INTERVAL));
11398

114-
if (!scalingEnabled) {
99+
if (!scaleEnabled) {
115100
return false;
116101
}
117102

@@ -136,27 +121,6 @@ private void updateRecommendedParallelism(
136121
scalingSummary.getNewParallelism())));
137122
}
138123

139-
private static String scalingReport(
140-
Map<JobVertexID, ScalingSummary> scalingSummaries, boolean scalingEnabled) {
141-
StringBuilder sb =
142-
new StringBuilder(
143-
scalingEnabled
144-
? SCALING_SUMMARY_HEADER_SCALING_ENABLED
145-
: SCALING_SUMMARY_HEADER_SCALING_DISABLED);
146-
scalingSummaries.forEach(
147-
(v, s) ->
148-
sb.append(
149-
String.format(
150-
SCALING_SUMMARY_ENTRY,
151-
v,
152-
s.getCurrentParallelism(),
153-
s.getNewParallelism(),
154-
s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
155-
s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
156-
s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
157-
return sb.toString();
158-
}
159-
160124
protected static boolean allVerticesWithinUtilizationTarget(
161125
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
162126
Map<JobVertexID, ScalingSummary> scalingSummaries) {
@@ -190,7 +154,8 @@ protected static boolean allVerticesWithinUtilizationTarget(
190154
return true;
191155
}
192156

193-
private Map<JobVertexID, ScalingSummary> computeScalingSummary(
157+
@VisibleForTesting
158+
Map<JobVertexID, ScalingSummary> computeScalingSummary(
194159
Context context,
195160
Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
196161
Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,11 @@ private static ConfigOptions.OptionBuilder autoScalerConfig(String key) {
231231
.withDescription(
232232
"A (semicolon-separated) list of vertex ids in hexstring for which to disable scaling. Caution: For non-sink vertices this will still scale their downstream operators until https://issues.apache.org/jira/browse/FLINK-31215 is implemented.");
233233

234-
public static final ConfigOption<Duration> SCALING_REPORT_INTERVAL =
235-
autoScalerConfig("scaling.report.interval")
234+
public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
235+
autoScalerConfig("scaling.event.interval")
236236
.durationType()
237237
.defaultValue(Duration.ofSeconds(1800))
238+
.withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval"))
238239
.withDescription("Time interval to resend the identical event");
239240

240241
public static final ConfigOption<Duration> FLINK_CLIENT_TIMEOUT =

flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java

+58-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,17 @@
1919

2020
import org.apache.flink.annotation.Experimental;
2121
import org.apache.flink.autoscaler.JobAutoScalerContext;
22+
import org.apache.flink.autoscaler.ScalingSummary;
23+
import org.apache.flink.runtime.jobgraph.JobVertexID;
2224

2325
import javax.annotation.Nullable;
2426

2527
import java.time.Duration;
28+
import java.util.Map;
29+
30+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
31+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
32+
import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
2633

2734
/**
2835
* Handler for autoscaler events.
@@ -32,12 +39,17 @@
3239
*/
3340
@Experimental
3441
public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
42+
String SCALING_SUMMARY_ENTRY =
43+
" Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
44+
String SCALING_SUMMARY_HEADER_SCALING_DISABLED = "Recommended parallelism change:";
45+
String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
46+
String SCALING_REPORT_REASON = "ScalingReport";
47+
String SCALING_REPORT_KEY = "ScalingExecutor";
3548

3649
/**
3750
* Handle the event.
3851
*
39-
* @param interval When interval is great than 0, events that repeat within the interval will be
40-
* ignored.
52+
* @param interval Define the interval to suppress duplicate events. No dedupe if null.
4153
*/
4254
void handleEvent(
4355
Context context,
@@ -47,6 +59,50 @@ void handleEvent(
4759
@Nullable String messageKey,
4860
@Nullable Duration interval);
4961

62+
/**
63+
* Handle scaling reports.
64+
*
65+
* @param interval Define the interval to suppress duplicate events.
66+
* @param scaled Whether AutoScaler actually scaled the Flink job or just generate advice for
67+
* scaling.
68+
*/
69+
default void handleScalingEvent(
70+
Context context,
71+
Map<JobVertexID, ScalingSummary> scalingSummaries,
72+
boolean scaled,
73+
Duration interval) {
74+
// Provide default implementation without proper deduplication
75+
var scalingReport = scalingReport(scalingSummaries, scaled);
76+
handleEvent(
77+
context,
78+
Type.Normal,
79+
SCALING_REPORT_REASON,
80+
scalingReport,
81+
SCALING_REPORT_KEY,
82+
interval);
83+
}
84+
85+
static String scalingReport(
86+
Map<JobVertexID, ScalingSummary> scalingSummaries, boolean scalingEnabled) {
87+
StringBuilder sb =
88+
new StringBuilder(
89+
scalingEnabled
90+
? SCALING_SUMMARY_HEADER_SCALING_ENABLED
91+
: SCALING_SUMMARY_HEADER_SCALING_DISABLED);
92+
scalingSummaries.forEach(
93+
(v, s) ->
94+
sb.append(
95+
String.format(
96+
SCALING_SUMMARY_ENTRY,
97+
v,
98+
s.getCurrentParallelism(),
99+
s.getNewParallelism(),
100+
s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(),
101+
s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(),
102+
s.getMetrics().get(TARGET_DATA_RATE).getAverage())));
103+
return sb.toString();
104+
}
105+
50106
/** The type of the events. */
51107
enum Type {
52108
Normal,

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java

+33
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,39 @@ public void testSendingIneffectiveScalingEvents() {
358358
assertThat(event.getMessage())
359359
.isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
360360
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
361+
assertEquals(1, event.getCount());
362+
363+
// Repeat ineffective scale with default interval, no event is triggered
364+
assertEquals(
365+
20,
366+
vertexScaler.computeScaleTargetParallelism(
367+
context, jobVertexID, evaluated, history));
368+
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
369+
assertEquals(0, eventCollector.events.size());
370+
371+
// Repeat ineffective scale with postive interval, no event is triggered
372+
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofSeconds(1800));
373+
assertEquals(
374+
20,
375+
vertexScaler.computeScaleTargetParallelism(
376+
context, jobVertexID, evaluated, history));
377+
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
378+
assertEquals(0, eventCollector.events.size());
379+
380+
// Ineffective scale with interval set to 0, an event is triggered
381+
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
382+
assertEquals(
383+
20,
384+
vertexScaler.computeScaleTargetParallelism(
385+
context, jobVertexID, evaluated, history));
386+
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
387+
assertEquals(1, eventCollector.events.size());
388+
event = eventCollector.events.poll();
389+
assertThat(event).isNotNull();
390+
assertThat(event.getMessage())
391+
.isEqualTo(String.format(INEFFECTIVE_MESSAGE_FORMAT, jobVertexID));
392+
assertThat(event.getReason()).isEqualTo(INEFFECTIVE_SCALING);
393+
assertEquals(2, event.getCount());
361394
}
362395

363396
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@
3737
import java.util.Map;
3838
import java.util.stream.Collectors;
3939

40-
import static org.apache.flink.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY;
4140
import static org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
41+
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_REPORT_REASON;
42+
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY;
43+
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_DISABLED;
44+
import static org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_ENABLED;
4245
import static org.assertj.core.api.Assertions.assertThat;
4346
import static org.junit.jupiter.api.Assertions.assertEquals;
4447
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -151,19 +154,20 @@ public void testVertexesExclusionForScaling() throws Exception {
151154

152155
@ParameterizedTest
153156
@ValueSource(booleans = {true, false})
154-
public void testScalingEventsWith0Interval(boolean scalingEnabled) throws Exception {
157+
public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) throws Exception {
155158
testScalingEvents(scalingEnabled, Duration.ofSeconds(0));
156159
}
157160

158161
@ParameterizedTest
159162
@ValueSource(booleans = {true, false})
160-
public void testScalingEventsWithInterval(boolean scalingEnabled) throws Exception {
163+
public void testScalingEventsWithIntervalConfig(boolean scalingEnabled) throws Exception {
161164
testScalingEvents(scalingEnabled, Duration.ofSeconds(1800));
162165
}
163166

164167
@ParameterizedTest
165168
@ValueSource(booleans = {true, false})
166-
public void testScalingEventsWithDefaultInterval(boolean scalingEnabled) throws Exception {
169+
public void testScalingEventsWithDefaultIntervalConfig(boolean scalingEnabled)
170+
throws Exception {
167171
testScalingEvents(scalingEnabled, null);
168172
}
169173

@@ -175,17 +179,13 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
175179
var metrics = Map.of(jobVertexID, evaluated(1, 110, 100));
176180

177181
if (interval != null) {
178-
conf.set(AutoScalerOptions.SCALING_REPORT_INTERVAL, interval);
182+
conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, interval);
179183
}
180184

181185
assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
182186
assertEquals(scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
183187

184-
int expectedSize =
185-
(interval == null || (!interval.isNegative() && !interval.isZero()))
186-
&& !scalingEnabled
187-
? 1
188-
: 2;
188+
int expectedSize = (interval == null || interval.toMillis() > 0) && !scalingEnabled ? 1 : 2;
189189
assertEquals(expectedSize, eventCollector.events.size());
190190

191191
TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> event;
@@ -208,9 +208,9 @@ private void testScalingEvents(boolean scalingEnabled, Duration interval) throws
208208
event.getMessage()
209209
.contains(
210210
scalingEnabled
211-
? ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED
212-
: ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED));
213-
assertEquals(ScalingExecutor.SCALING_REPORT_REASON, event.getReason());
211+
? SCALING_SUMMARY_HEADER_SCALING_ENABLED
212+
: SCALING_SUMMARY_HEADER_SCALING_DISABLED));
213+
assertEquals(SCALING_REPORT_REASON, event.getReason());
214214

215215
metrics = Map.of(jobVertexID, evaluated(1, 110, 101));
216216

0 commit comments

Comments
 (0)