Skip to content

Commit 1732899

Browse files
committed
fix(mixpanel-destination): skip events with invalid distinct id
fix(mixpanel-destination): support for multithreaded batch mode by utilising additional bulker "tablename"-s
1 parent 05882f4 commit 1732899

File tree

1 file changed

+32
-1
lines changed

1 file changed

+32
-1
lines changed

libs/core-functions/src/functions/mixpanel-destination.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,26 @@ export const specialProperties = [
2222
"unsubscribed",
2323
];
2424

25+
const invalidDistinctIds = new Set([
26+
"00000000-0000-0000-0000-000000000000",
27+
"anon",
28+
"anonymous",
29+
"nil",
30+
"none",
31+
"null",
32+
"n/a",
33+
"na",
34+
"undefined",
35+
"unknown",
36+
"<nil>",
37+
"0",
38+
"-1",
39+
"true",
40+
"false",
41+
"[]",
42+
"{}",
43+
]);
44+
2545
const CLICK_IDS = ["dclid", "fbclid", "gclid", "ko_click_id", "li_fat_id", "msclkid", "ttclid", "twclid", "wbraid"];
2646

2747
export type MixpanelRequest = {
@@ -204,6 +224,14 @@ function trackEvent(
204224
},
205225
};
206226
if (ctx["connectionOptions"]?.mode === "batch" && bulkerBase) {
227+
let tableName = "import";
228+
if (ctx["connectionOptions"]?.multithreading) {
229+
const threadsCount = ctx["connectionOptions"]?.threadsCount || 2;
230+
const thread = Math.floor(Math.random() * threadsCount);
231+
if (thread > 0) {
232+
tableName = `import_${thread + 1}`;
233+
}
234+
}
207235
const metricsMeta: Omit<MetricsMeta, "messageId"> = {
208236
workspaceId: ctx.workspace.id,
209237
streamId: ctx.source.id,
@@ -212,7 +240,7 @@ function trackEvent(
212240
functionId: "builtin.destination.bulker",
213241
};
214242
return {
215-
url: `${bulkerBase}/post/${ctx.connection.id}?tableName=import&modeOverride=batch`,
243+
url: `${bulkerBase}/post/${ctx.connection.id}?tableName=${tableName}&modeOverride=batch`,
216244
eventType,
217245
insertId,
218246
headers: {
@@ -506,6 +534,9 @@ const MixpanelDestination: JitsuFunction<AnalyticsServerEvent, MixpanelCredentia
506534
return;
507535
}
508536
const distinctId = getDistinctId(ctx, event, deviceId);
537+
if (invalidDistinctIds.has(distinctId)) {
538+
throw new Error(`Invalid distinctId '${distinctId}'. Skipping event: ${JSON.stringify(event)}`);
539+
}
509540
// no userId or email
510541
const isAnonymous = event.anonymousId && distinctId.endsWith(event.anonymousId);
511542
if (isAnonymous && !ctx.props.enableAnonymousUserProfiles) {

0 commit comments

Comments
 (0)