summaryrefslogtreecommitdiff
path: root/packages/core/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/core/src')
-rw-r--r--packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts126
-rw-r--r--packages/core/src/utils/retry.test.ts7
-rw-r--r--packages/core/src/utils/retry.ts4
3 files changed, 76 insertions, 61 deletions
diff --git a/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts b/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts
index d36a16b5..fd5a9ab2 100644
--- a/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts
+++ b/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts
@@ -22,12 +22,13 @@ import {
} from '../types.js';
import { EventMetadataKey } from './event-metadata-key.js';
import { Config } from '../../config/config.js';
-import { getInstallationId } from '../../utils/user_id.js';
+import { safeJsonStringify } from '../../utils/safeJsonStringify.js';
import {
getCachedGoogleAccount,
getLifetimeGoogleAccounts,
} from '../../utils/user_account.js';
-import { safeJsonStringify } from '../../utils/safeJsonStringify.js';
+import { HttpError, retryWithBackoff } from '../../utils/retry.js';
+import { getInstallationId } from '../../utils/user_id.js';
const start_session_event_name = 'start_session';
const new_prompt_event_name = 'new_prompt';
@@ -113,66 +114,81 @@ export class ClearcutLogger {
});
}
- flushToClearcut(): Promise<LogResponse> {
+ async flushToClearcut(): Promise<LogResponse> {
if (this.config?.getDebugMode()) {
console.log('Flushing log events to Clearcut.');
}
const eventsToSend = [...this.events];
- this.events.length = 0;
+ if (eventsToSend.length === 0) {
+ return {};
+ }
- return new Promise<Buffer>((resolve, reject) => {
- const request = [
- {
- log_source_name: 'CONCORD',
- request_time_ms: Date.now(),
- log_event: eventsToSend,
- },
- ];
- const body = safeJsonStringify(request);
- const options = {
- hostname: 'play.googleapis.com',
- path: '/log',
- method: 'POST',
- headers: { 'Content-Length': Buffer.byteLength(body) },
- };
- const bufs: Buffer[] = [];
- const req = https.request(
- {
- ...options,
- agent: this.getProxyAgent(),
- },
- (res) => {
- res.on('data', (buf) => bufs.push(buf));
- res.on('end', () => {
- resolve(Buffer.concat(bufs));
- });
- },
- );
- req.on('error', (e) => {
- if (this.config?.getDebugMode()) {
- console.log('Clearcut POST request error: ', e);
- }
- // Add the events back to the front of the queue to be retried.
- this.events.unshift(...eventsToSend);
- reject(e);
+ const flushFn = () =>
+ new Promise<Buffer>((resolve, reject) => {
+ const request = [
+ {
+ log_source_name: 'CONCORD',
+ request_time_ms: Date.now(),
+ log_event: eventsToSend,
+ },
+ ];
+ const body = safeJsonStringify(request);
+ const options = {
+ hostname: 'play.googleapis.com',
+ path: '/log',
+ method: 'POST',
+ headers: { 'Content-Length': Buffer.byteLength(body) },
+ };
+ const bufs: Buffer[] = [];
+ const req = https.request(
+ {
+ ...options,
+ agent: this.getProxyAgent(),
+ },
+ (res) => {
+ if (
+ res.statusCode &&
+ (res.statusCode < 200 || res.statusCode >= 300)
+ ) {
+ const err: HttpError = new Error(
+ `Request failed with status ${res.statusCode}`,
+ );
+ err.status = res.statusCode;
+ res.resume();
+ return reject(err);
+ }
+ res.on('data', (buf) => bufs.push(buf));
+ res.on('end', () => resolve(Buffer.concat(bufs)));
+ },
+ );
+ req.on('error', reject);
+ req.end(body);
});
- req.end(body);
- })
- .then((buf: Buffer) => {
- try {
- this.last_flush_time = Date.now();
- return this.decodeLogResponse(buf) || {};
- } catch (error: unknown) {
- console.error('Error flushing log events:', error);
- return {};
- }
- })
- .catch((error: unknown) => {
- // Handle all errors to prevent unhandled promise rejections
- console.error('Error flushing log events:', error);
- // Return empty response to maintain the Promise<LogResponse> contract
- return {};
+
+ try {
+ const responseBuffer = await retryWithBackoff(flushFn, {
+ maxAttempts: 3,
+ initialDelayMs: 200,
+ shouldRetry: (err: unknown) => {
+ if (!(err instanceof Error)) return false;
+ const status = (err as HttpError).status as number | undefined;
+ // If status is not available, it's likely a network error
+ if (status === undefined) return true;
+
+ // Retry on 429 (Too many Requests) and 5xx server errors.
+ return status === 429 || (status >= 500 && status < 600);
+ },
});
+
+ this.events.splice(0, eventsToSend.length);
+ this.last_flush_time = Date.now();
+ return this.decodeLogResponse(responseBuffer) || {};
+ } catch (error) {
+ if (this.config?.getDebugMode()) {
+ console.error('Clearcut flush failed after multiple retries.', error);
+ }
+ return {};
+ }
}
// Visible for testing. Decodes protobuf-encoded response from Clearcut server.
diff --git a/packages/core/src/utils/retry.test.ts b/packages/core/src/utils/retry.test.ts
index f84d2004..196e7341 100644
--- a/packages/core/src/utils/retry.test.ts
+++ b/packages/core/src/utils/retry.test.ts
@@ -6,14 +6,9 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
-import { retryWithBackoff } from './retry.js';
+import { retryWithBackoff, HttpError } from './retry.js';
import { setSimulate429 } from './testUtils.js';
-// Define an interface for the error with a status property
-interface HttpError extends Error {
- status?: number;
-}
-
// Helper to create a mock function that fails a certain number of times
const createFailingFunction = (
failures: number,
diff --git a/packages/core/src/utils/retry.ts b/packages/core/src/utils/retry.ts
index b29bf7df..81300882 100644
--- a/packages/core/src/utils/retry.ts
+++ b/packages/core/src/utils/retry.ts
@@ -10,6 +10,10 @@ import {
isGenericQuotaExceededError,
} from './quotaErrorDetection.js';
+export interface HttpError extends Error {
+ status?: number;
+}
+
export interface RetryOptions {
maxAttempts: number;
initialDelayMs: number;