Skip to content

Commit cea0885

Browse files
authored
rotor: migrate to confluent kafka library (#1222)
* rotor: migrate to confluent kafka library
1 parent 7fc729d commit cea0885

File tree

11 files changed

+105
-164
lines changed

11 files changed

+105
-164
lines changed

pnpm-lock.yaml

Lines changed: 17 additions & 68 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/profiles/dist_package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"version": "0.0.0",
44
"dependencies": {
55
"isolated-vm": "6.0.0",
6-
"@confluentinc/kafka-javascript": "^1.4.0",
6+
"@confluentinc/kafka-javascript": "^1.4.1",
77
"@mongodb-js/zstd": "^2.0.1"
88
}
99
}

services/profiles/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"pg-cursor": "^2.11.0",
3030
"prom-client": "^15.1.3",
3131
"tslib": "^2.6.3",
32-
"@confluentinc/kafka-javascript": "^1.4.0",
32+
"@confluentinc/kafka-javascript": "^1.4.1",
3333
"@clickhouse/client": "^1.10.1"
3434
},
3535
"devDependencies": {

services/rotor/__tests__/kafka.test.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,24 @@
11
import { getLog } from "juava";
22
import { connectToKafka } from "../src/lib/kafka-config";
3-
4-
import "@sensejs/kafkajs-zstd-support";
53
import { test } from "@jest/globals";
64

75
const log = getLog("kafka-test");
86
test.skip("Kafka Test", async () => {
9-
const kafka = connectToKafka({ defaultAppId: "test", brokers: "localhost:9092" });
7+
const kafka = connectToKafka({ defaultAppId: "test", brokers: ["localhost:9092"] });
108
const consumer = kafka.consumer({
11-
groupId: "test",
12-
allowAutoTopicCreation: true,
13-
sessionTimeout: 10000,
9+
kafkaJS: {
10+
groupId: "test",
11+
allowAutoTopicCreation: true,
12+
sessionTimeout: 10000,
13+
autoCommitInterval: 10000,
14+
autoCommit: true,
15+
fromBeginning: true,
16+
},
1417
});
1518
await consumer.connect();
16-
await consumer.subscribe({ topics: ["autocommit-test"], fromBeginning: true });
19+
await consumer.subscribe({ topics: ["autocommit-test"] });
1720

18-
const producer = kafka.producer({ allowAutoTopicCreation: false });
21+
const producer = kafka.producer({ kafkaJS: { allowAutoTopicCreation: false } });
1922
await producer.connect();
2023

2124
for (let i = 0; i < 200; i++) {
@@ -30,8 +33,6 @@ test.skip("Kafka Test", async () => {
3033
}
3134

3235
await consumer.run({
33-
autoCommitInterval: 10000,
34-
autoCommit: true,
3536
partitionsConsumedConcurrently: 8,
3637
eachMessage: async ({ topic, partition, message }) => {
3738
log.atInfo().log(`${topic}:${partition}: ${message.offset} => ${message.value?.toString()}`);

services/rotor/dist_package.json

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@
44
"dependencies": {
55
"undici": "^7.11.0",
66
"isolated-vm": "6.0.0",
7-
"@sensejs/kafkajs-zstd-support": "^0.11.0",
7+
"@confluentinc/kafka-javascript": "^1.4.1",
88
"@mongodb-js/zstd": "^2.0.1"
9-
},
10-
"overrides": {
11-
"@sensejs/kafkajs-zstd-support": {
12-
"zstd-napi": "^0.0.10"
13-
}
149
}
1510
}

services/rotor/package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,14 @@
2525
"@jitsu/core-functions": "workspace:*",
2626
"@jitsu/functions-lib": "workspace:*",
2727
"@maxmind/geoip2-node": "^5.0.0",
28-
"@sensejs/kafkajs-zstd-support": "^0.11.0",
2928
"@types/pg-cursor": "^2.7.2",
3029
"dayjs": "^1.11.12",
3130
"express": "^4.21.2",
3231
"ioredis": "^5.4.1",
3332
"json5": "^2.2.3",
3433
"jsondiffpatch": "workspace:*",
3534
"juava": "workspace:*",
36-
"kafkajs": "^2.2.4",
37-
"kafkajs-snappy": "^1.1.0",
35+
"@confluentinc/kafka-javascript": "^1.4.1",
3836
"node-cache": "^5.1.2",
3937
"node-fetch-commonjs": "^3.3.2",
4038
"object-hash": "^3.0.0",

0 commit comments

Comments
 (0)