
Commit d6d1f8b

Revert "Revert "fix: remove flushbatch""
This reverts commit df81605.
1 parent: df81605

File tree

1 file changed: +31 -72 lines


src/store/reducers/query/query.ts

@@ -142,7 +142,7 @@ const slice = createSlice({
             state.result.queryId = chunk.meta.query_id;
             state.result.data.traceId = chunk.meta.trace_id;
         },
-        addStreamingChunks: (state, action: PayloadAction<StreamDataChunk[]>) => {
+        addStreamingChunk: (state, action: PayloadAction<StreamDataChunk>) => {
             if (!state.result) {
                 return;
             }
@@ -151,7 +151,6 @@ const slice = createSlice({
                 state.result.data = prepareQueryData(null);
             }

-            // Initialize speed metrics if not present
             if (!state.result.speedMetrics) {
                 state.result.speedMetrics = {
                     rowsPerSecond: 0,
@@ -161,45 +160,27 @@ const slice = createSlice({
             }

             const currentTime = Date.now();
-            let totalNewRows = 0;
-
-            const mergedStreamDataChunks = new Map<number, StreamDataChunk>();
-            for (const chunk of action.payload) {
-                const currentMergedChunk = mergedStreamDataChunks.get(chunk.meta.result_index);
-                const chunkRowCount = (chunk.result.rows || []).length;
-                totalNewRows += chunkRowCount;
-
-                if (currentMergedChunk) {
-                    if (!currentMergedChunk.result.rows) {
-                        currentMergedChunk.result.rows = [];
-                    }
-                    for (const row of chunk.result.rows || []) {
-                        currentMergedChunk.result.rows.push(row);
-                    }
-                } else {
-                    mergedStreamDataChunks.set(chunk.meta.result_index, chunk);
-                }
-            }
+            const chunk = action.payload;

             // Update speed metrics
             const metrics = state.result.speedMetrics;
             metrics.recentChunks.push({
                 timestamp: currentTime,
-                rowCount: totalNewRows,
+                rowCount: chunk.result.rows?.length || 0,
             });

             // Keep only chunks from the last 5 seconds
             const WINDOW_SIZE = 5000; // 5 seconds in milliseconds
             metrics.recentChunks = metrics.recentChunks.filter(
-                (chunk) => currentTime - chunk.timestamp <= WINDOW_SIZE,
+                (_chunk) => currentTime - _chunk.timestamp <= WINDOW_SIZE,
             );

             // Calculate moving average
             if (metrics.recentChunks.length > 0) {
                 const oldestChunkTime = metrics.recentChunks[0].timestamp;
                 const timeWindow = (currentTime - oldestChunkTime) / 1000; // Convert to seconds
                 const totalRows = metrics.recentChunks.reduce(
-                    (sum, chunk) => sum + chunk.rowCount,
+                    (sum, _chunk) => sum + _chunk.rowCount,
                     0,
                 );
                 metrics.rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0;
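
For reference, the 5-second moving average in the hunk above reduces to the standalone sketch below; ChunkSample and rowsPerSecond are illustrative names, not the store's actual types.

    interface ChunkSample {
        timestamp: number; // ms since epoch
        rowCount: number;
    }

    const WINDOW_SIZE = 5000; // same 5-second window as the reducer

    function rowsPerSecond(samples: ChunkSample[], currentTime: number): number {
        // Keep only samples from the last 5 seconds, as the reducer's filter does.
        const recent = samples.filter((s) => currentTime - s.timestamp <= WINDOW_SIZE);
        if (recent.length === 0) {
            return 0;
        }
        const timeWindow = (currentTime - recent[0].timestamp) / 1000; // seconds
        const totalRows = recent.reduce((sum, s) => sum + s.rowCount, 0);
        return timeWindow > 0 ? totalRows / timeWindow : 0;
    }

    // e.g. two chunks of 500 rows arriving 1s apart, measured at the second chunk:
    // rowsPerSecond([{timestamp: 0, rowCount: 500}, {timestamp: 1000, rowCount: 500}], 1000) === 1000
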
@@ -210,40 +191,38 @@ const slice = createSlice({
             if (!state.result.data.resultSets) {
                 state.result.data.resultSets = [];
             }
+            const resultIndex = chunk.meta.result_index;

-            for (const [resultIndex, chunk] of mergedStreamDataChunks.entries()) {
-                const {columns, rows} = chunk.result;
+            const {columns, rows} = chunk.result;

-                if (!state.result.data.resultSets[resultIndex]) {
-                    state.result.data.resultSets[resultIndex] = {
-                        columns: [],
-                        result: [],
-                    };
-                }
+            if (!state.result.data.resultSets[resultIndex]) {
+                state.result.data.resultSets[resultIndex] = {
+                    columns: [],
+                    result: [],
+                };
+            }

-                if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
-                    state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
-                    for (const column of columns) {
-                        state.result.data.resultSets[resultIndex].columns?.push(column);
-                    }
+            if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
+                state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
+                for (const column of columns) {
+                    state.result.data.resultSets[resultIndex].columns?.push(column);
                 }
+            }

-                const indexedRows = rows || [];
-                const startIndex =
-                    state.result?.data?.resultSets?.[resultIndex].result?.length || 1;
+            const indexedRows = rows || [];
+            const startIndex = state.result?.data?.resultSets?.[resultIndex].result?.length || 1;

-                indexedRows.forEach((row, index) => {
-                    row.unshift(startIndex + index);
-                });
+            indexedRows.forEach((row, index) => {
+                row.unshift(startIndex + index);
+            });

-                const formattedRows = parseResult(
-                    indexedRows,
-                    state.result.data.resultSets[resultIndex].columns || [],
-                );
+            const formattedRows = parseResult(
+                indexedRows,
+                state.result.data.resultSets[resultIndex].columns || [],
+            );

-                for (const row of formattedRows) {
-                    state.result.data.resultSets[resultIndex].result?.push(row);
-                }
+            for (const row of formattedRows) {
+                state.result.data.resultSets[resultIndex].result?.push(row);
             }
         },
         setStreamQueryResponse: (state, action: PayloadAction<QueryResponseChunk>) => {
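
The row-indexing step kept by this hunk can be read in isolation as the sketch below; indexRows and its parameters are illustrative names.

    function indexRows(rows: unknown[][], existingRowCount: number): unknown[][] {
        // `|| 1` keeps the numbering 1-based when the result set is still empty.
        const startIndex = existingRowCount || 1;
        rows.forEach((row, index) => {
            row.unshift(startIndex + index); // prepend the INDEX_COLUMN value
        });
        return rows;
    }

    // indexRows([['a'], ['b']], 0) -> [[1, 'a'], [2, 'b']]
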
@@ -302,7 +281,7 @@ export const {
     goToNextQuery,
     setTenantPath,
     setQueryHistoryFilter,
-    addStreamingChunks,
+    addStreamingChunk,
     setStreamQueryResponse,
     setStreamSession,
 } = slice.actions;
@@ -348,17 +327,6 @@ export const queryApi = api.injectEndpoints({
                 );

                 try {
-                    let streamDataChunkBatch: StreamDataChunk[] = [];
-                    let batchTimeout: number | null = null;
-
-                    const flushBatch = () => {
-                        if (streamDataChunkBatch.length > 0) {
-                            dispatch(addStreamingChunks(streamDataChunkBatch));
-                            streamDataChunkBatch = [];
-                        }
-                        batchTimeout = null;
-                    };
-
                     await window.api.streaming.streamQuery(
                         {
                             query,
@@ -390,20 +358,11 @@ export const queryApi = api.injectEndpoints({
                                 dispatch(setStreamSession(chunk));
                             },
                             onStreamDataChunk: (chunk) => {
-                                streamDataChunkBatch.push(chunk);
-                                if (!batchTimeout) {
-                                    batchTimeout = window.requestAnimationFrame(flushBatch);
-                                }
+                                dispatch(addStreamingChunk(chunk));
                             },
                         },
                     );

-                    // Flush any remaining chunks
-                    if (batchTimeout) {
-                        window.cancelAnimationFrame(batchTimeout);
-                        flushBatch();
-                    }
-
                     return {data: null};
                 } catch (error) {
                     const state = getState() as RootState;
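
Taken together, the last two hunks replace requestAnimationFrame-based batching with a direct dispatch per chunk. A minimal before/after sketch, assuming stand-ins for the file's dispatch, action creators, and StreamDataChunk type:

    type StreamDataChunk = {meta: {result_index: number}; result: {rows?: unknown[][]}};
    declare function dispatch(action: unknown): void;
    declare function addStreamingChunk(chunk: StreamDataChunk): unknown;
    declare function addStreamingChunks(chunks: StreamDataChunk[]): unknown;

    // Before: chunks were buffered and flushed at most once per animation frame.
    let streamDataChunkBatch: StreamDataChunk[] = [];
    let batchTimeout: number | null = null;

    const flushBatch = () => {
        if (streamDataChunkBatch.length > 0) {
            dispatch(addStreamingChunks(streamDataChunkBatch));
            streamDataChunkBatch = [];
        }
        batchTimeout = null;
    };

    const onChunkBatched = (chunk: StreamDataChunk) => {
        streamDataChunkBatch.push(chunk);
        if (!batchTimeout) {
            batchTimeout = window.requestAnimationFrame(flushBatch);
        }
    };

    // After: each chunk is dispatched as soon as it arrives.
    const onChunkDirect = (chunk: StreamDataChunk) => {
        dispatch(addStreamingChunk(chunk));
    };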
