Skip to content

Commit df81605

Browse files
committed
Revert "fix: remove flushbatch"
This reverts commit 58b5823.
1 parent 58b5823 commit df81605

File tree

1 file changed

+72
-31
lines changed

1 file changed

+72
-31
lines changed

src/store/reducers/query/query.ts

+72-31
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ const slice = createSlice({
142142
state.result.queryId = chunk.meta.query_id;
143143
state.result.data.traceId = chunk.meta.trace_id;
144144
},
145-
addStreamingChunk: (state, action: PayloadAction<StreamDataChunk>) => {
145+
addStreamingChunks: (state, action: PayloadAction<StreamDataChunk[]>) => {
146146
if (!state.result) {
147147
return;
148148
}
@@ -151,6 +151,7 @@ const slice = createSlice({
151151
state.result.data = prepareQueryData(null);
152152
}
153153

154+
// Initialize speed metrics if not present
154155
if (!state.result.speedMetrics) {
155156
state.result.speedMetrics = {
156157
rowsPerSecond: 0,
@@ -160,27 +161,45 @@ const slice = createSlice({
160161
}
161162

162163
const currentTime = Date.now();
163-
const chunk = action.payload;
164+
let totalNewRows = 0;
165+
166+
const mergedStreamDataChunks = new Map<number, StreamDataChunk>();
167+
for (const chunk of action.payload) {
168+
const currentMergedChunk = mergedStreamDataChunks.get(chunk.meta.result_index);
169+
const chunkRowCount = (chunk.result.rows || []).length;
170+
totalNewRows += chunkRowCount;
171+
172+
if (currentMergedChunk) {
173+
if (!currentMergedChunk.result.rows) {
174+
currentMergedChunk.result.rows = [];
175+
}
176+
for (const row of chunk.result.rows || []) {
177+
currentMergedChunk.result.rows.push(row);
178+
}
179+
} else {
180+
mergedStreamDataChunks.set(chunk.meta.result_index, chunk);
181+
}
182+
}
164183

165184
// Update speed metrics
166185
const metrics = state.result.speedMetrics;
167186
metrics.recentChunks.push({
168187
timestamp: currentTime,
169-
rowCount: chunk.result.rows?.length || 0,
188+
rowCount: totalNewRows,
170189
});
171190

172191
// Keep only chunks from the last 5 seconds
173192
const WINDOW_SIZE = 5000; // 5 seconds in milliseconds
174193
metrics.recentChunks = metrics.recentChunks.filter(
175-
(_chunk) => currentTime - _chunk.timestamp <= WINDOW_SIZE,
194+
(chunk) => currentTime - chunk.timestamp <= WINDOW_SIZE,
176195
);
177196

178197
// Calculate moving average
179198
if (metrics.recentChunks.length > 0) {
180199
const oldestChunkTime = metrics.recentChunks[0].timestamp;
181200
const timeWindow = (currentTime - oldestChunkTime) / 1000; // Convert to seconds
182201
const totalRows = metrics.recentChunks.reduce(
183-
(sum, _chunk) => sum + _chunk.rowCount,
202+
(sum, chunk) => sum + chunk.rowCount,
184203
0,
185204
);
186205
metrics.rowsPerSecond = timeWindow > 0 ? totalRows / timeWindow : 0;
@@ -191,38 +210,40 @@ const slice = createSlice({
191210
if (!state.result.data.resultSets) {
192211
state.result.data.resultSets = [];
193212
}
194-
const resultIndex = chunk.meta.result_index;
195213

196-
const {columns, rows} = chunk.result;
214+
for (const [resultIndex, chunk] of mergedStreamDataChunks.entries()) {
215+
const {columns, rows} = chunk.result;
197216

198-
if (!state.result.data.resultSets[resultIndex]) {
199-
state.result.data.resultSets[resultIndex] = {
200-
columns: [],
201-
result: [],
202-
};
203-
}
217+
if (!state.result.data.resultSets[resultIndex]) {
218+
state.result.data.resultSets[resultIndex] = {
219+
columns: [],
220+
result: [],
221+
};
222+
}
204223

205-
if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
206-
state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
207-
for (const column of columns) {
208-
state.result.data.resultSets[resultIndex].columns?.push(column);
224+
if (columns && !state.result.data.resultSets[resultIndex].columns?.length) {
225+
state.result.data.resultSets[resultIndex].columns?.push(INDEX_COLUMN);
226+
for (const column of columns) {
227+
state.result.data.resultSets[resultIndex].columns?.push(column);
228+
}
209229
}
210-
}
211230

212-
const indexedRows = rows || [];
213-
const startIndex = state.result?.data?.resultSets?.[resultIndex].result?.length || 1;
231+
const indexedRows = rows || [];
232+
const startIndex =
233+
state.result?.data?.resultSets?.[resultIndex].result?.length || 1;
214234

215-
indexedRows.forEach((row, index) => {
216-
row.unshift(startIndex + index);
217-
});
235+
indexedRows.forEach((row, index) => {
236+
row.unshift(startIndex + index);
237+
});
218238

219-
const formattedRows = parseResult(
220-
indexedRows,
221-
state.result.data.resultSets[resultIndex].columns || [],
222-
);
239+
const formattedRows = parseResult(
240+
indexedRows,
241+
state.result.data.resultSets[resultIndex].columns || [],
242+
);
223243

224-
for (const row of formattedRows) {
225-
state.result.data.resultSets[resultIndex].result?.push(row);
244+
for (const row of formattedRows) {
245+
state.result.data.resultSets[resultIndex].result?.push(row);
246+
}
226247
}
227248
},
228249
setStreamQueryResponse: (state, action: PayloadAction<QueryResponseChunk>) => {
@@ -281,7 +302,7 @@ export const {
281302
goToNextQuery,
282303
setTenantPath,
283304
setQueryHistoryFilter,
284-
addStreamingChunk,
305+
addStreamingChunks,
285306
setStreamQueryResponse,
286307
setStreamSession,
287308
} = slice.actions;
@@ -327,6 +348,17 @@ export const queryApi = api.injectEndpoints({
327348
);
328349

329350
try {
351+
let streamDataChunkBatch: StreamDataChunk[] = [];
352+
let batchTimeout: number | null = null;
353+
354+
const flushBatch = () => {
355+
if (streamDataChunkBatch.length > 0) {
356+
dispatch(addStreamingChunks(streamDataChunkBatch));
357+
streamDataChunkBatch = [];
358+
}
359+
batchTimeout = null;
360+
};
361+
330362
await window.api.streaming.streamQuery(
331363
{
332364
query,
@@ -358,11 +390,20 @@ export const queryApi = api.injectEndpoints({
358390
dispatch(setStreamSession(chunk));
359391
},
360392
onStreamDataChunk: (chunk) => {
361-
dispatch(addStreamingChunk(chunk));
393+
streamDataChunkBatch.push(chunk);
394+
if (!batchTimeout) {
395+
batchTimeout = window.requestAnimationFrame(flushBatch);
396+
}
362397
},
363398
},
364399
);
365400

401+
// Flush any remaining chunks
402+
if (batchTimeout) {
403+
window.cancelAnimationFrame(batchTimeout);
404+
flushBatch();
405+
}
406+
366407
return {data: null};
367408
} catch (error) {
368409
const state = getState() as RootState;

0 commit comments

Comments
 (0)