diff options
Diffstat (limited to 'packages/core/src')
| -rw-r--r-- | packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts | 126 | ||||
| -rw-r--r-- | packages/core/src/utils/retry.test.ts | 7 | ||||
| -rw-r--r-- | packages/core/src/utils/retry.ts | 4 |
3 files changed, 76 insertions, 61 deletions
diff --git a/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts b/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts index d36a16b5..fd5a9ab2 100644 --- a/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts +++ b/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts @@ -22,12 +22,13 @@ import { } from '../types.js'; import { EventMetadataKey } from './event-metadata-key.js'; import { Config } from '../../config/config.js'; -import { getInstallationId } from '../../utils/user_id.js'; +import { safeJsonStringify } from '../../utils/safeJsonStringify.js'; import { getCachedGoogleAccount, getLifetimeGoogleAccounts, } from '../../utils/user_account.js'; -import { safeJsonStringify } from '../../utils/safeJsonStringify.js'; +import { HttpError, retryWithBackoff } from '../../utils/retry.js'; +import { getInstallationId } from '../../utils/user_id.js'; const start_session_event_name = 'start_session'; const new_prompt_event_name = 'new_prompt'; @@ -113,66 +114,81 @@ export class ClearcutLogger { }); } - flushToClearcut(): Promise<LogResponse> { + async flushToClearcut(): Promise<LogResponse> { if (this.config?.getDebugMode()) { console.log('Flushing log events to Clearcut.'); } const eventsToSend = [...this.events]; - this.events.length = 0; + if (eventsToSend.length === 0) { + return {}; + } - return new Promise<Buffer>((resolve, reject) => { - const request = [ - { - log_source_name: 'CONCORD', - request_time_ms: Date.now(), - log_event: eventsToSend, - }, - ]; - const body = safeJsonStringify(request); - const options = { - hostname: 'play.googleapis.com', - path: '/log', - method: 'POST', - headers: { 'Content-Length': Buffer.byteLength(body) }, - }; - const bufs: Buffer[] = []; - const req = https.request( - { - ...options, - agent: this.getProxyAgent(), - }, - (res) => { - res.on('data', (buf) => bufs.push(buf)); - res.on('end', () => { - resolve(Buffer.concat(bufs)); - }); - }, - ); - req.on('error', (e) => { - if (this.config?.getDebugMode()) { - console.log('Clearcut POST request error: ', e); - } - // Add the events back to the front of the queue to be retried. - this.events.unshift(...eventsToSend); - reject(e); + const flushFn = () => + new Promise<Buffer>((resolve, reject) => { + const request = [ + { + log_source_name: 'CONCORD', + request_time_ms: Date.now(), + log_event: eventsToSend, + }, + ]; + const body = safeJsonStringify(request); + const options = { + hostname: 'play.googleapis.com', + path: '/log', + method: 'POST', + headers: { 'Content-Length': Buffer.byteLength(body) }, + }; + const bufs: Buffer[] = []; + const req = https.request( + { + ...options, + agent: this.getProxyAgent(), + }, + (res) => { + if ( + res.statusCode && + (res.statusCode < 200 || res.statusCode >= 300) + ) { + const err: HttpError = new Error( + `Request failed with status ${res.statusCode}`, + ); + err.status = res.statusCode; + res.resume(); + return reject(err); + } + res.on('data', (buf) => bufs.push(buf)); + res.on('end', () => resolve(Buffer.concat(bufs))); + }, + ); + req.on('error', reject); + req.end(body); }); - req.end(body); - }) - .then((buf: Buffer) => { - try { - this.last_flush_time = Date.now(); - return this.decodeLogResponse(buf) || {}; - } catch (error: unknown) { - console.error('Error flushing log events:', error); - return {}; - } - }) - .catch((error: unknown) => { - // Handle all errors to prevent unhandled promise rejections - console.error('Error flushing log events:', error); - // Return empty response to maintain the Promise<LogResponse> contract - return {}; + + try { + const responseBuffer = await retryWithBackoff(flushFn, { + maxAttempts: 3, + initialDelayMs: 200, + shouldRetry: (err: unknown) => { + if (!(err instanceof Error)) return false; + const status = (err as HttpError).status as number | undefined; + // If status is not available, it's likely a network error + if (status === undefined) return true; + + // Retry on 429 (Too many Requests) and 5xx server errors. + return status === 429 || (status >= 500 && status < 600); + }, }); + + this.events.splice(0, eventsToSend.length); + this.last_flush_time = Date.now(); + return this.decodeLogResponse(responseBuffer) || {}; + } catch (error) { + if (this.config?.getDebugMode()) { + console.error('Clearcut flush failed after multiple retries.', error); + } + return {}; + } } // Visible for testing. Decodes protobuf-encoded response from Clearcut server. diff --git a/packages/core/src/utils/retry.test.ts b/packages/core/src/utils/retry.test.ts index f84d2004..196e7341 100644 --- a/packages/core/src/utils/retry.test.ts +++ b/packages/core/src/utils/retry.test.ts @@ -6,14 +6,9 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; -import { retryWithBackoff } from './retry.js'; +import { retryWithBackoff, HttpError } from './retry.js'; import { setSimulate429 } from './testUtils.js'; -// Define an interface for the error with a status property -interface HttpError extends Error { - status?: number; -} - // Helper to create a mock function that fails a certain number of times const createFailingFunction = ( failures: number, diff --git a/packages/core/src/utils/retry.ts b/packages/core/src/utils/retry.ts index b29bf7df..81300882 100644 --- a/packages/core/src/utils/retry.ts +++ b/packages/core/src/utils/retry.ts @@ -10,6 +10,10 @@ import { isGenericQuotaExceededError, } from './quotaErrorDetection.js'; +export interface HttpError extends Error { + status?: number; +} + export interface RetryOptions { maxAttempts: number; initialDelayMs: number; |
