From 7cc6b8c270a67803f9387eeead0d3d7ac914303a Mon Sep 17 00:00:00 2001 From: Richie Foreman Date: Tue, 12 Aug 2025 14:31:59 -0400 Subject: chore(usage telemetry): Freshen up Clearcut logging (#6013) Co-authored-by: christine betts Co-authored-by: Jacob Richman Co-authored-by: matt korwel --- .../telemetry/clearcut-logger/clearcut-logger.ts | 306 ++++++++++----------- 1 file changed, 139 insertions(+), 167 deletions(-) (limited to 'packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts') diff --git a/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts b/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts index 1e67d1cf..a41f832d 100644 --- a/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts +++ b/packages/core/src/telemetry/clearcut-logger/clearcut-logger.ts @@ -4,10 +4,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -import { Buffer } from 'buffer'; -import * as https from 'https'; import { HttpsProxyAgent } from 'https-proxy-agent'; - import { StartSessionEvent, EndSessionEvent, @@ -56,19 +53,25 @@ export interface LogEventEntry { source_extension_json: string; } -export type EventValue = { +export interface EventValue { gemini_cli_key: EventMetadataKey | string; value: string; -}; +} -export type LogEvent = { - console_type: string; +export interface LogEvent { + console_type: 'GEMINI_CLI'; application: number; event_name: string; event_metadata: EventValue[][]; client_email?: string; client_install_id?: string; -}; +} + +export interface LogRequest { + log_source_name: 'CONCORD'; + request_time_ms: number; + log_event: LogEventEntry[][]; +} /** * Determine the surface that the user is currently using. Surface is effectively the @@ -89,22 +92,59 @@ function determineSurface(): string { } } +/** + * Clearcut URL to send logging events to. + */ +const CLEARCUT_URL = 'https://play.googleapis.com/log?format=json&hasfast=true'; + +/** + * Interval in which buffered events are sent to clearcut. + */ +const FLUSH_INTERVAL_MS = 1000 * 60; + +/** + * Maximum amount of events to keep in memory. Events added after this amount + * are dropped until the next flush to clearcut, which happens periodically as + * defined by {@link FLUSH_INTERVAL_MS}. + */ +const MAX_EVENTS = 1000; + +/** + * Maximum events to retry after a failed clearcut flush + */ +const MAX_RETRY_EVENTS = 100; + // Singleton class for batch posting log events to Clearcut. When a new event comes in, the elapsed time // is checked and events are flushed to Clearcut if at least a minute has passed since the last flush. export class ClearcutLogger { private static instance: ClearcutLogger; private config?: Config; + + /** + * Queue of pending events that need to be flushed to the server. New events + * are added to this queue and then flushed on demand (via `flushToClearcut`) + */ private readonly events: FixedDeque; - private last_flush_time: number = Date.now(); - private flush_interval_ms: number = 1000 * 60; // Wait at least a minute before flushing events. - private readonly max_events: number = 1000; // Maximum events to keep in memory - private readonly max_retry_events: number = 100; // Maximum failed events to retry - private flushing: boolean = false; // Prevent concurrent flush operations - private pendingFlush: boolean = false; // Track if a flush was requested during an ongoing flush + + /** + * The last time that the events were successfully flushed to the server. + */ + private lastFlushTime: number = Date.now(); + + /** + * the value is true when there is a pending flush happening. This prevents + * concurrent flush operations. + */ + private flushing: boolean = false; + + /** + * This value is true when a flush was requested during an ongoing flush. + */ + private pendingFlush: boolean = false; private constructor(config?: Config) { this.config = config; - this.events = new FixedDeque(Array, this.max_events); + this.events = new FixedDeque(Array, MAX_EVENTS); } static getInstance(config?: Config): ClearcutLogger | undefined { @@ -125,7 +165,7 @@ export class ClearcutLogger { enqueueLogEvent(event: object): void { try { // Manually handle overflow for FixedDeque, which throws when full. - const wasAtCapacity = this.events.size >= this.max_events; + const wasAtCapacity = this.events.size >= MAX_EVENTS; if (wasAtCapacity) { this.events.shift(); // Evict oldest element to make space. @@ -150,31 +190,14 @@ export class ClearcutLogger { } } - addDefaultFields(data: EventValue[]): void { - const totalAccounts = getLifetimeGoogleAccounts(); - const surface = determineSurface(); - const defaultLogMetadata = [ - { - gemini_cli_key: EventMetadataKey.GEMINI_CLI_GOOGLE_ACCOUNTS_COUNT, - value: totalAccounts.toString(), - }, - { - gemini_cli_key: EventMetadataKey.GEMINI_CLI_SURFACE, - value: surface, - }, - ]; - data.push(...defaultLogMetadata); - } - createLogEvent(name: string, data: EventValue[]): LogEvent { const email = getCachedGoogleAccount(); - // Add default fields that should exist for all logs - this.addDefaultFields(data); + data = addDefaultFields(data); const logEvent: LogEvent = { console_type: 'GEMINI_CLI', - application: 102, + application: 102, // GEMINI_CLI event_name: name, event_metadata: [data], }; @@ -190,7 +213,7 @@ export class ClearcutLogger { } flushIfNeeded(): void { - if (Date.now() - this.last_flush_time < this.flush_interval_ms) { + if (Date.now() - this.lastFlushTime < FLUSH_INTERVAL_MS) { return; } @@ -217,140 +240,67 @@ export class ClearcutLogger { const eventsToSend = this.events.toArray() as LogEventEntry[][]; this.events.clear(); - return new Promise<{ buffer: Buffer; statusCode?: number }>( - (resolve, reject) => { - const request = [ - { - log_source_name: 'CONCORD', - request_time_ms: Date.now(), - log_event: eventsToSend, - }, - ]; - const body = safeJsonStringify(request); - const options = { - hostname: 'play.googleapis.com', - path: '/log', - method: 'POST', - headers: { 'Content-Length': Buffer.byteLength(body) }, - timeout: 30000, // 30-second timeout + const request: LogRequest[] = [ + { + log_source_name: 'CONCORD', + request_time_ms: Date.now(), + log_event: eventsToSend, + }, + ]; + + let result: LogResponse = {}; + + try { + const response = await fetch(CLEARCUT_URL, { + method: 'POST', + body: safeJsonStringify(request), + headers: { + 'Content-Type': 'application/json', + }, + }); + + const responseBody = await response.text(); + + if (response.status >= 200 && response.status < 300) { + this.lastFlushTime = Date.now(); + const nextRequestWaitMs = Number(JSON.parse(responseBody)[0]); + result = { + ...result, + nextRequestWaitMs, }; - const bufs: Buffer[] = []; - const req = https.request( - { - ...options, - agent: this.getProxyAgent(), - }, - (res) => { - res.on('error', reject); // Handle stream errors - res.on('data', (buf) => bufs.push(buf)); - res.on('end', () => { - try { - const buffer = Buffer.concat(bufs); - // Check if we got a successful response - if ( - res.statusCode && - res.statusCode >= 200 && - res.statusCode < 300 - ) { - resolve({ buffer, statusCode: res.statusCode }); - } else { - // HTTP error - reject with status code for retry handling - reject( - new Error(`HTTP ${res.statusCode}: ${res.statusMessage}`), - ); - } - } catch (e) { - reject(e); - } - }); - }, - ); - req.on('error', (e) => { - // Network-level error - reject(e); - }); - req.on('timeout', () => { - if (!req.destroyed) { - req.destroy(new Error('Request timeout after 30 seconds')); - } - }); - req.end(body); - }, - ) - .then(({ buffer }) => { - try { - this.last_flush_time = Date.now(); - return this.decodeLogResponse(buffer) || {}; - } catch (error: unknown) { - console.error('Error decoding log response:', error); - return {}; - } - }) - .catch((error: unknown) => { - // Handle both network-level and HTTP-level errors + } else { if (this.config?.getDebugMode()) { - console.error('Error flushing log events:', error); + console.error( + `Error flushing log events: HTTP ${response.status}: ${response.statusText}`, + ); } // Re-queue failed events for retry this.requeueFailedEvents(eventsToSend); + } + } catch (e: unknown) { + if (this.config?.getDebugMode()) { + console.error('Error flushing log events:', e as Error); + } - // Return empty response to maintain the Promise contract - return {}; - }) - .finally(() => { - this.flushing = false; - - // If a flush was requested while we were flushing, flush again - if (this.pendingFlush) { - this.pendingFlush = false; - // Fire and forget the pending flush - this.flushToClearcut().catch((error) => { - if (this.config?.getDebugMode()) { - console.debug('Error in pending flush to Clearcut:', error); - } - }); - } - }); - } - - // Visible for testing. Decodes protobuf-encoded response from Clearcut server. - decodeLogResponse(buf: Buffer): LogResponse | undefined { - // TODO(obrienowen): return specific errors to facilitate debugging. - if (buf.length < 1) { - return undefined; - } - - // The first byte of the buffer is `field<<3 | type`. We're looking for field - // 1, with type varint, represented by type=0. If the first byte isn't 8, that - // means field 1 is missing or the message is corrupted. Either way, we return - // undefined. - if (buf.readUInt8(0) !== 8) { - return undefined; + // Re-queue failed events for retry + this.requeueFailedEvents(eventsToSend); } - let ms = BigInt(0); - let cont = true; - - // In each byte, the most significant bit is the continuation bit. If it's - // set, we keep going. The lowest 7 bits, are data bits. They are concatenated - // in reverse order to form the final number. - for (let i = 1; cont && i < buf.length; i++) { - const byte = buf.readUInt8(i); - ms |= BigInt(byte & 0x7f) << BigInt(7 * (i - 1)); - cont = (byte & 0x80) !== 0; - } + this.flushing = false; - if (cont) { - // We have fallen off the buffer without seeing a terminating byte. The - // message is corrupted. - return undefined; + // If a flush was requested while we were flushing, flush again + if (this.pendingFlush) { + this.pendingFlush = false; + // Fire and forget the pending flush + this.flushToClearcut().catch((error) => { + if (this.config?.getDebugMode()) { + console.debug('Error in pending flush to Clearcut:', error); + } + }); } - const returnVal = { - nextRequestWaitMs: Number(ms), - }; - return returnVal; + return result; } logStartSessionEvent(event: StartSessionEvent): void { @@ -752,24 +702,21 @@ export class ClearcutLogger { private requeueFailedEvents(eventsToSend: LogEventEntry[][]): void { // Add the events back to the front of the queue to be retried, but limit retry queue size - const eventsToRetry = eventsToSend.slice(-this.max_retry_events); // Keep only the most recent events + const eventsToRetry = eventsToSend.slice(-MAX_RETRY_EVENTS); // Keep only the most recent events // Log a warning if we're dropping events - if ( - eventsToSend.length > this.max_retry_events && - this.config?.getDebugMode() - ) { + if (eventsToSend.length > MAX_RETRY_EVENTS && this.config?.getDebugMode()) { console.warn( `ClearcutLogger: Dropping ${ - eventsToSend.length - this.max_retry_events + eventsToSend.length - MAX_RETRY_EVENTS } events due to retry queue limit. Total events: ${ eventsToSend.length - }, keeping: ${this.max_retry_events}`, + }, keeping: ${MAX_RETRY_EVENTS}`, ); } // Determine how many events can be re-queued - const availableSpace = this.max_events - this.events.size; + const availableSpace = MAX_EVENTS - this.events.size; const numEventsToRequeue = Math.min(eventsToRetry.length, availableSpace); if (numEventsToRequeue === 0) { @@ -792,7 +739,7 @@ export class ClearcutLogger { this.events.unshift(eventsToRequeue[i]); } // Clear any potential overflow - while (this.events.size > this.max_events) { + while (this.events.size > MAX_EVENTS) { this.events.pop(); } @@ -803,3 +750,28 @@ export class ClearcutLogger { } } } + +/** + * Adds default fields to data, and returns a new data array. This fields + * should exist on all log events. + */ +function addDefaultFields(data: EventValue[]): EventValue[] { + const totalAccounts = getLifetimeGoogleAccounts(); + const surface = determineSurface(); + const defaultLogMetadata: EventValue[] = [ + { + gemini_cli_key: EventMetadataKey.GEMINI_CLI_GOOGLE_ACCOUNTS_COUNT, + value: `${totalAccounts}`, + }, + { + gemini_cli_key: EventMetadataKey.GEMINI_CLI_SURFACE, + value: surface, + }, + ]; + return [...data, ...defaultLogMetadata]; +} + +export const TEST_ONLY = { + MAX_RETRY_EVENTS, + MAX_EVENTS, +}; -- cgit v1.2.3