Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +44,8 @@ public class AnalyticsClient {
private static final Charset ENCODING = StandardCharsets.UTF_8;
private Gson gsonInstance;
private static final String instanceId = UUID.randomUUID().toString();
private static final int WAIT_FOR_THREAD_COMPLETE_S = 5;
private static final int TERMINATION_TIMEOUT_S = 1;

static {
Map<String, String> library = new LinkedHashMap<>();
Expand All @@ -67,6 +71,7 @@ public class AnalyticsClient {
private final ScheduledExecutorService flushScheduler;
private final AtomicBoolean isShutDown;
private final String writeKey;
private volatile Future<?> looperFuture;

public static AnalyticsClient create(
HttpUrl uploadUrl,
Expand Down Expand Up @@ -130,7 +135,9 @@ public AnalyticsClient(

this.currentQueueSizeInBytes = 0;

if (!isShutDown.get()) looperExecutor.submit(new Looper());
if (!isShutDown.get()) {
this.looperFuture = looperExecutor.submit(new Looper());
}

flushScheduler = Executors.newScheduledThreadPool(1, threadFactory);
flushScheduler.scheduleAtFixedRate(
Expand Down Expand Up @@ -218,6 +225,8 @@ public void shutdown() {
// we can shutdown the flush scheduler without worrying
flushScheduler.shutdownNow();

// Wait for the looper to complete processing before shutting down executors
waitForLooperCompletion();
shutdownAndWait(looperExecutor, "looper");
shutdownAndWait(networkExecutor, "network");

Expand All @@ -226,19 +235,81 @@ public void shutdown() {
}
}

/**
* Wait for the looper to complete processing all messages before proceeding with shutdown. This
* prevents the race condition where the network executor is shut down before the looper finishes
* submitting all batches.
*/
private void waitForLooperCompletion() {
if (looperFuture != null) {
try {
// Wait for the looper to complete processing the STOP message and finish
// Use a reasonable timeout to avoid hanging indefinitely
looperFuture.get(WAIT_FOR_THREAD_COMPLETE_S, TimeUnit.SECONDS);
log.print(VERBOSE, "Looper completed successfully.");
} catch (Exception e) {
log.print(ERROR, e, "Error waiting for looper to complete.");
// Cancel the looper if it's taking too long or if there's an error
if (!looperFuture.isDone()) {
looperFuture.cancel(true);
log.print(VERBOSE, "Looper was cancelled due to timeout or error.");
}
}
}
}

public void shutdownAndWait(ExecutorService executor, String name) {
boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper");
try {
executor.shutdown();
final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS);

log.print(
VERBOSE,
"%s executor %s.",
name,
executorTerminated ? "terminated normally" : "timed out");
boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
if (terminated) {
log.print(VERBOSE, "%s executor terminated normally.", name);
return;
}
if (isLooperExecutor) { // Handle looper - network should finish on its own
// not terminated within timeout -> force shutdown
log.print(
VERBOSE,
"%s did not terminate in %d seconds; requesting shutdownNow().",
name,
TERMINATION_TIMEOUT_S);
List<Runnable> dropped = executor.shutdownNow(); // interrupts running tasks
log.print(
VERBOSE,
"%s shutdownNow returned %d queued tasks that never started.",
name,
dropped.size());

// optional short wait to give interrupted tasks a chance to exit
boolean terminatedAfterForce =
executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS);
log.print(
VERBOSE,
"%s executor %s after shutdownNow().",
name,
terminatedAfterForce ? "terminated" : "still running (did not terminate)");

if (!terminatedAfterForce) {
// final warning — investigate tasks that ignore interrupts
log.print(
ERROR,
"%s executor still did not terminate; tasks may be ignoring interrupts.",
name);
}
}
} catch (InterruptedException e) {
// Preserve interrupt status and attempt forceful shutdown
log.print(ERROR, e, "Interrupted while stopping %s executor.", name);
Thread.currentThread().interrupt();
if (isLooperExecutor) {
List<Runnable> dropped = executor.shutdownNow();
log.print(
VERBOSE,
"%s shutdownNow invoked after interrupt; %d tasks returned.",
name,
dropped.size());
}
}
}

Expand Down Expand Up @@ -299,8 +370,22 @@ public void run() {
"Batching %s message(s) into batch %s.",
batch.batch().size(),
batch.sequence());
networkExecutor.submit(
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
try {
networkExecutor.submit(
BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries));
} catch (RejectedExecutionException e) {
log.print(
ERROR,
e,
"Failed to submit batch %s to network executor during shutdown. Batch will be lost.",
batch.sequence());
// Notify callbacks about the failure
for (Message msg : batch.batch()) {
for (Callback callback : callbacks) {
callback.failure(msg, e);
}
}
}
Comment on lines +373 to +388
Copy link

Copilot AI Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new RejectedExecutionException handling lacks test coverage. Consider adding a test that verifies:

  1. The error is properly logged when batch submission fails during shutdown
  2. Callbacks are notified with failure() for each message in the rejected batch
  3. The looper continues processing after a rejection

This ensures that messages aren't silently lost without callback notification.

Copilot uses AI. Check for mistakes.

currentBatchSize.set(0);
messages.clear();
Expand Down