From ab850f0a1c458bd41f7120c3a56a4b14d403de93 Mon Sep 17 00:00:00 2001 From: Stefan Faistenauer Date: Tue, 3 Feb 2026 17:20:40 +0100 Subject: [PATCH 1/2] refactor save results (#286) * refactoring of the save results flow * linting * fix tests --- packages/core/api/ee/run-results.ts | 79 +++++ packages/core/ee/run-results-service.test.ts | 323 +++++++++++++++++++ packages/core/ee/run-results-service.ts | 93 ++++++ packages/core/filestore/file-service.ts | 7 + packages/core/runs/run-lifecycle.ts | 71 +++- packages/shared/types.ts | 12 + packages/web/next-env.d.ts | 2 +- 7 files changed, 584 insertions(+), 3 deletions(-) create mode 100644 packages/core/api/ee/run-results.ts create mode 100644 packages/core/ee/run-results-service.test.ts create mode 100644 packages/core/ee/run-results-service.ts diff --git a/packages/core/api/ee/run-results.ts b/packages/core/api/ee/run-results.ts new file mode 100644 index 00000000..36cceef9 --- /dev/null +++ b/packages/core/api/ee/run-results.ts @@ -0,0 +1,79 @@ +import { StoredRunResults } from "@superglue/shared"; +import { getRunResultsService } from "../../ee/run-results-service.js"; +import { registerApiModule } from "../registry.js"; +import { addTraceHeader, sendError } from "../response-helpers.js"; +import type { AuthenticatedFastifyRequest, RouteHandler } from "../types.js"; +import { isFileStorageAvailable } from "../../filestore/file-service.js"; + +// GET /runs/:runId/results - Fetch stored run results from S3 +const getRunResults: RouteHandler = async (request, reply) => { + const authReq = request as AuthenticatedFastifyRequest; + const params = request.params as { runId: string }; + + // Check if S3 infrastructure is available + if (!isFileStorageAvailable()) { + return sendError( + reply, + 503, + "Run results storage is not enabled. Configure file storage to enable this feature.", + ); + } + + // Get the run to find the storage URI + const run = await authReq.datastore.getRun({ + id: params.runId, + orgId: authReq.authInfo.orgId, + }); + + if (!run) { + return sendError(reply, 404, "Run not found"); + } + + if (!run.resultStorageUri) { + return addTraceHeader(reply, authReq.traceId).send({ + success: true, + data: null, + message: "No stored results available for this run", + }); + } + + // Fetch results from S3 + const metadata = { orgId: authReq.authInfo.orgId, traceId: authReq.traceId }; + const runResultsService = getRunResultsService(); + const results = await runResultsService.getResults(run.resultStorageUri, metadata); + + if (!results) { + return addTraceHeader(reply, authReq.traceId).send({ + success: true, + data: null, + message: "Results file not found or corrupted", + }); + } + + // Convert storedAt to ISO string for JSON response + const response: Omit & { storedAt: string } = { + ...results, + storedAt: results.storedAt instanceof Date ? results.storedAt.toISOString() : results.storedAt, + }; + + return addTraceHeader(reply, authReq.traceId).send({ + success: true, + data: response, + }); +}; + +registerApiModule({ + name: "run-results", + routes: [ + { + method: "GET", + path: "/runs/:runId/results", + handler: getRunResults, + permissions: { + type: "read", + resource: "run", + allowRestricted: true, + }, + }, + ], +}); diff --git a/packages/core/ee/run-results-service.test.ts b/packages/core/ee/run-results-service.test.ts new file mode 100644 index 00000000..c3d21fd0 --- /dev/null +++ b/packages/core/ee/run-results-service.test.ts @@ -0,0 +1,323 @@ +import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; +import { gzip } from "zlib"; +import { promisify } from "util"; +import { + RunResultsService, + generateRunResultsUri, + getRunResultsService, +} from "./run-results-service.js"; +import type { StoredRunResults } from "@superglue/shared"; + +const gzipAsync = promisify(gzip); + +// Mock the file service (but not isFileStorageAvailable - we test the real implementation) +vi.mock("../filestore/file-service.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getFileService: vi.fn(), + }; +}); + +// Mock the logs +vi.mock("../utils/logs.js", () => ({ + logMessage: vi.fn(), +})); + +import { getFileService, isFileStorageAvailable } from "../filestore/file-service.js"; + +describe("RunResultsService", () => { + let service: RunResultsService; + let mockFileService: { + uploadFile: ReturnType; + downloadFile: ReturnType; + deleteFile: ReturnType; + }; + + beforeEach(() => { + service = new RunResultsService(); + mockFileService = { + uploadFile: vi.fn().mockResolvedValue(undefined), + downloadFile: vi.fn(), + deleteFile: vi.fn().mockResolvedValue(undefined), + }; + vi.mocked(getFileService).mockReturnValue(mockFileService as any); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + describe("storeResults", () => { + it("should gzip and upload results to S3", async () => { + const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; + const results = { + runId: "run-123", + success: true, + data: { result: "test" }, + stepResults: [{ stepId: "step-1", success: true, data: { foo: "bar" } }], + toolPayload: { input: "value" }, + }; + const metadata = { orgId: "test-org" }; + + await service.storeResults(storageUri, results, metadata); + + expect(mockFileService.uploadFile).toHaveBeenCalledTimes(1); + expect(mockFileService.uploadFile).toHaveBeenCalledWith( + storageUri, + expect.any(Buffer), + metadata, + { contentType: "application/gzip" }, + ); + + // Verify the uploaded data is valid gzipped JSON with storedAt added + const uploadedBuffer = mockFileService.uploadFile.mock.calls[0][1] as Buffer; + expect(uploadedBuffer).toBeInstanceOf(Buffer); + }); + + it("should add storedAt timestamp to results", async () => { + const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; + const results = { + runId: "run-123", + success: true, + data: null, + stepResults: [], + toolPayload: {}, + }; + + // Capture the uploaded buffer to verify storedAt + let capturedBuffer: Buffer | null = null; + mockFileService.uploadFile.mockImplementation(async (uri: string, buffer: Buffer) => { + capturedBuffer = buffer; + }); + + const beforeStore = new Date(); + await service.storeResults(storageUri, results, { orgId: "test-org" }); + const afterStore = new Date(); + + // Decompress and parse to verify storedAt + const { gunzip } = await import("zlib"); + const gunzipAsync = promisify(gunzip); + const decompressed = await gunzipAsync(capturedBuffer!); + const parsed = JSON.parse(decompressed.toString("utf-8")); + + expect(parsed.storedAt).toBeDefined(); + const storedAt = new Date(parsed.storedAt); + expect(storedAt.getTime()).toBeGreaterThanOrEqual(beforeStore.getTime()); + expect(storedAt.getTime()).toBeLessThanOrEqual(afterStore.getTime()); + }); + }); + + describe("getResults", () => { + it("should download and decompress results from S3", async () => { + const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; + const storedData: StoredRunResults = { + runId: "run-123", + success: true, + data: { result: "test" }, + stepResults: [{ stepId: "step-1", success: true, data: { foo: "bar" } }], + toolPayload: { input: "value" }, + storedAt: new Date("2024-01-15T10:00:00Z"), + }; + + // Compress the data as it would be stored + const compressed = await gzipAsync(Buffer.from(JSON.stringify(storedData), "utf-8")); + mockFileService.downloadFile.mockResolvedValue(compressed); + + const result = await service.getResults(storageUri, { orgId: "test-org" }); + + expect(mockFileService.downloadFile).toHaveBeenCalledWith(storageUri, { orgId: "test-org" }); + expect(result).not.toBeNull(); + expect(result!.runId).toBe("run-123"); + expect(result!.success).toBe(true); + expect(result!.data).toEqual({ result: "test" }); + expect(result!.stepResults).toHaveLength(1); + expect(result!.toolPayload).toEqual({ input: "value" }); + }); + + it("should parse storedAt string back to Date", async () => { + const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; + const storedData = { + runId: "run-123", + success: true, + data: null, + stepResults: [], + toolPayload: {}, + storedAt: "2024-01-15T10:00:00.000Z", // String as stored in JSON + }; + + const compressed = await gzipAsync(Buffer.from(JSON.stringify(storedData), "utf-8")); + mockFileService.downloadFile.mockResolvedValue(compressed); + + const result = await service.getResults(storageUri, { orgId: "test-org" }); + + expect(result!.storedAt).toBeInstanceOf(Date); + expect(result!.storedAt.toISOString()).toBe("2024-01-15T10:00:00.000Z"); + }); + + it("should return null when file not found", async () => { + const storageUri = "s3://bucket/org/run-results/missing.json.gz"; + mockFileService.downloadFile.mockRejectedValue( + new Error("NoSuchKey: The specified key does not exist"), + ); + + const result = await service.getResults(storageUri, { orgId: "test-org" }); + + expect(result).toBeNull(); + }); + + it("should return null when file is corrupted", async () => { + const storageUri = "s3://bucket/org/run-results/corrupted.json.gz"; + // Return invalid gzip data + mockFileService.downloadFile.mockResolvedValue(Buffer.from("not gzipped data")); + + const result = await service.getResults(storageUri, { orgId: "test-org" }); + + expect(result).toBeNull(); + }); + + it("should return null when JSON is invalid", async () => { + const storageUri = "s3://bucket/org/run-results/bad-json.json.gz"; + // Return valid gzip but invalid JSON + const compressed = await gzipAsync(Buffer.from("{ invalid json }", "utf-8")); + mockFileService.downloadFile.mockResolvedValue(compressed); + + const result = await service.getResults(storageUri, { orgId: "test-org" }); + + expect(result).toBeNull(); + }); + }); + + describe("deleteResults", () => { + it("should delete file from S3", async () => { + const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; + + await service.deleteResults(storageUri, { orgId: "test-org" }); + + expect(mockFileService.deleteFile).toHaveBeenCalledWith(storageUri, { orgId: "test-org" }); + }); + + it("should handle delete errors gracefully", async () => { + const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; + mockFileService.deleteFile.mockRejectedValue(new Error("Access denied")); + + // Should not throw + await expect( + service.deleteResults(storageUri, { orgId: "test-org" }), + ).resolves.toBeUndefined(); + }); + }); + + describe("gzip/gunzip roundtrip", () => { + it("should preserve all data through store/retrieve cycle", async () => { + const storageUri = "s3://bucket/org/run-results/roundtrip.json.gz"; + const originalResults = { + runId: "roundtrip-test", + success: false, + data: { + nested: { deeply: { value: 123 } }, + array: [1, 2, 3], + nullValue: null, + unicode: "日本語テスト 🎉", + }, + stepResults: [ + { stepId: "step-1", success: true, data: { a: 1 } }, + { stepId: "step-2", success: false, data: null, error: "Something failed" }, + ], + toolPayload: { query: "test", options: { limit: 10 } }, + error: "Overall error message", + }; + + // Capture uploaded data + let uploadedBuffer: Buffer | null = null; + mockFileService.uploadFile.mockImplementation(async (_uri: string, buffer: Buffer) => { + uploadedBuffer = buffer; + }); + + // Store + await service.storeResults(storageUri, originalResults, { orgId: "test-org" }); + + // Simulate retrieval by returning the captured buffer + mockFileService.downloadFile.mockResolvedValue(uploadedBuffer!); + + // Retrieve + const retrieved = await service.getResults(storageUri, { orgId: "test-org" }); + + // Verify all fields preserved (except storedAt which is added) + expect(retrieved).not.toBeNull(); + expect(retrieved!.runId).toBe(originalResults.runId); + expect(retrieved!.success).toBe(originalResults.success); + expect(retrieved!.data).toEqual(originalResults.data); + expect(retrieved!.stepResults).toEqual(originalResults.stepResults); + expect(retrieved!.toolPayload).toEqual(originalResults.toolPayload); + expect(retrieved!.error).toBe(originalResults.error); + expect(retrieved!.storedAt).toBeInstanceOf(Date); + }); + }); +}); + +describe("generateRunResultsUri", () => { + const originalEnv = process.env.AWS_BUCKET_NAME; + + afterEach(() => { + if (originalEnv !== undefined) { + process.env.AWS_BUCKET_NAME = originalEnv; + } else { + delete process.env.AWS_BUCKET_NAME; + } + }); + + it("should generate correct S3 URI when bucket is configured", () => { + process.env.AWS_BUCKET_NAME = "my-bucket"; + + const uri = generateRunResultsUri("run-123", "org-456"); + + expect(uri).toBe("s3://my-bucket/org-456/run-results/run-123.json.gz"); + }); + + it("should return null when bucket is not configured", () => { + delete process.env.AWS_BUCKET_NAME; + + const uri = generateRunResultsUri("run-123", "org-456"); + + expect(uri).toBeNull(); + }); + + it("should handle empty org ID", () => { + process.env.AWS_BUCKET_NAME = "my-bucket"; + + const uri = generateRunResultsUri("run-123", ""); + + expect(uri).toBe("s3://my-bucket//run-results/run-123.json.gz"); + }); +}); + +describe("isFileStorageAvailable", () => { + const originalEnv = process.env.AWS_BUCKET_NAME; + + afterEach(() => { + if (originalEnv !== undefined) { + process.env.AWS_BUCKET_NAME = originalEnv; + } else { + delete process.env.AWS_BUCKET_NAME; + } + }); + + it("should return true when AWS_BUCKET_NAME is set", () => { + process.env.AWS_BUCKET_NAME = "my-bucket"; + + expect(isFileStorageAvailable()).toBe(true); + }); + + it("should return false when AWS_BUCKET_NAME is not set", () => { + delete process.env.AWS_BUCKET_NAME; + + expect(isFileStorageAvailable()).toBe(false); + }); + + it("should return false when AWS_BUCKET_NAME is empty string", () => { + process.env.AWS_BUCKET_NAME = ""; + + expect(isFileStorageAvailable()).toBe(false); + }); +}); diff --git a/packages/core/ee/run-results-service.ts b/packages/core/ee/run-results-service.ts new file mode 100644 index 00000000..a83b2a99 --- /dev/null +++ b/packages/core/ee/run-results-service.ts @@ -0,0 +1,93 @@ +import { gzip, gunzip } from "zlib"; +import { promisify } from "util"; +import { getFileService } from "../filestore/file-service.js"; +import { ServiceMetadata, StoredRunResults } from "@superglue/shared"; +import { logMessage } from "../utils/logs.js"; + +const gzipAsync = promisify(gzip); +const gunzipAsync = promisify(gunzip); + +/** + * Generate a deterministic storage URI for run results. + * This allows us to know the URI upfront before uploading. + */ +export function generateRunResultsUri(runId: string, orgId: string): string | null { + const bucketName = process.env.AWS_BUCKET_NAME; + if (!bucketName) return null; + return `s3://${bucketName}/${orgId}/run-results/${runId}.json.gz`; +} + +export class RunResultsService { + /** + * Store run results to S3 (gzipped) at the given URI + */ + async storeResults( + storageUri: string, + results: Omit, + metadata: ServiceMetadata, + ): Promise { + const fileService = getFileService(); + + const payload: StoredRunResults = { + ...results, + storedAt: new Date(), + }; + + const json = JSON.stringify(payload); + const compressed = await gzipAsync(Buffer.from(json, "utf-8")); + + await fileService.uploadFile(storageUri, compressed, metadata, { + contentType: "application/gzip", + }); + + logMessage("debug", `Stored run results to ${storageUri}`, metadata); + } + + /** + * Retrieve run results from S3 + */ + async getResults( + storageUri: string, + metadata: ServiceMetadata, + ): Promise { + try { + const fileService = getFileService(); + const compressed = await fileService.downloadFile(storageUri, metadata); + const json = await gunzipAsync(compressed); + const results = JSON.parse(json.toString("utf-8")) as StoredRunResults; + + // Parse storedAt back to Date + if (results.storedAt && typeof results.storedAt === "string") { + results.storedAt = new Date(results.storedAt); + } + + return results; + } catch (error) { + logMessage("warn", `Failed to retrieve run results from ${storageUri}: ${error}`, metadata); + return null; + } + } + + /** + * Delete stored results + */ + async deleteResults(storageUri: string, metadata: ServiceMetadata): Promise { + try { + const fileService = getFileService(); + await fileService.deleteFile(storageUri, metadata); + logMessage("debug", `Deleted run results at ${storageUri}`, metadata); + } catch (error) { + logMessage("warn", `Failed to delete run results from ${storageUri}: ${error}`, metadata); + } + } +} + +// Singleton +let _runResultsService: RunResultsService | null = null; + +export function getRunResultsService(): RunResultsService { + if (!_runResultsService) { + _runResultsService = new RunResultsService(); + } + return _runResultsService; +} diff --git a/packages/core/filestore/file-service.ts b/packages/core/filestore/file-service.ts index 96e287ed..ad6dccd3 100644 --- a/packages/core/filestore/file-service.ts +++ b/packages/core/filestore/file-service.ts @@ -70,3 +70,10 @@ export function getFileService(): FileService { } return _fileService; } + +/** + * Check if cloud file storage is available (e.g., S3 bucket configured) + */ +export function isFileStorageAvailable(): boolean { + return !!process.env.AWS_BUCKET_NAME; +} diff --git a/packages/core/runs/run-lifecycle.ts b/packages/core/runs/run-lifecycle.ts index d0dabe56..f1796597 100644 --- a/packages/core/runs/run-lifecycle.ts +++ b/packages/core/runs/run-lifecycle.ts @@ -3,9 +3,17 @@ * Context is passed through (not stored) for stateless operation. */ -import type { RequestOptions, RequestSource, Tool, ToolStepResult } from "@superglue/shared"; -import { RunStatus, RequestSource as RSrc } from "@superglue/shared"; +import type { + RequestOptions, + RequestSource, + StoredRunResults, + Tool, + ToolStepResult, +} from "@superglue/shared"; +import { RunStatus, RequestSource as RSrc, sampleResultObject } from "@superglue/shared"; import type { DataStore } from "../datastore/types.js"; +import { generateRunResultsUri, getRunResultsService } from "../ee/run-results-service.js"; +import { isFileStorageAvailable } from "../filestore/file-service.js"; import { NotificationService } from "../notifications/index.js"; import { logMessage } from "../utils/logs.js"; @@ -116,6 +124,17 @@ export class RunLifecycleManager { }, }); + // Fire-and-forget: Store run results to S3 if enabled + this.maybeStoreRunResults({ + runId: context.runId, + success: result.success, + data: result.data ?? null, + stepResults: result.stepResults ?? [], + toolPayload: result.payload ?? {}, + error: result.error, + storedAt: new Date(), + }); + // Send notification for failed runs (fire-and-forget) if (!result.success) { this.sendFailureNotification(context, result, completedAt); @@ -243,4 +262,52 @@ export class RunLifecycleManager { }) .catch((err) => logMessage("error", `Notification failed: ${err}`, this.metadata)); } + + /** + * Check if run results storage is enabled for this org (EE feature) + * Returns true if file storage is available AND org has the feature enabled + */ + private async isRunResultsStorageEnabled(): Promise { + if (!isFileStorageAvailable()) { + return false; + } + const orgSettings = await this.datastore.getOrgSettings({ orgId: this.orgId }); + return !!orgSettings?.preferences?.storeRunResults; + } + + /** + * Fire-and-forget: Check org settings and store run results to S3 if enabled + * Also updates the run with the storage URI in the database + */ + private maybeStoreRunResults(results: StoredRunResults): void { + setImmediate(async () => { + try { + const enabled = await this.isRunResultsStorageEnabled(); + if (!enabled) return; + + const storageUri = generateRunResultsUri(results.runId, this.orgId); + if (!storageUri) return; + + // Update run with storage URI + await this.datastore.updateRun({ + id: results.runId, + orgId: this.orgId, + updates: { + resultStorageUri: storageUri, + }, + }); + + // Upload to S3 with full (non-truncated) payload and result + await getRunResultsService().storeResults(storageUri, results, { orgId: this.orgId }); + + logMessage("debug", `Stored run results to S3: ${storageUri}`, this.metadata); + } catch (err) { + logMessage( + "warn", + `Failed to store run results for ${results.runId}: ${err}`, + this.metadata, + ); + } + }); + } } diff --git a/packages/shared/types.ts b/packages/shared/types.ts index c8084aa6..c5bdf515 100644 --- a/packages/shared/types.ts +++ b/packages/shared/types.ts @@ -406,6 +406,18 @@ export interface Run { requestSource?: RequestSource; traceId?: string; metadata: RunMetadata; + resultStorageUri?: string; // FileService URI where full results are stored (EE feature) +} + +// Stored run results in FileService (EE feature) +export interface StoredRunResults { + runId: string; + success: boolean; + data: any; + stepResults: ToolStepResult[]; + toolPayload: Record; + error?: string; + storedAt: Date; } export interface ApiCallArgs { diff --git a/packages/web/next-env.d.ts b/packages/web/next-env.d.ts index c4b7818f..9edff1c7 100644 --- a/packages/web/next-env.d.ts +++ b/packages/web/next-env.d.ts @@ -1,6 +1,6 @@ /// /// -import "./.next/dev/types/routes.d.ts"; +import "./.next/types/routes.d.ts"; // NOTE: This file should not be edited // see https://nextjs.org/docs/app/api-reference/config/typescript for more information. From a08437b0d5407c06d64ae6f79035f29fc177a026 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Tue, 3 Feb 2026 16:21:07 +0000 Subject: [PATCH 2/2] Remove hosted-only files from sync --- packages/core/api/ee/run-results.ts | 79 ----- packages/core/ee/run-results-service.test.ts | 323 ------------------- packages/core/ee/run-results-service.ts | 93 ------ 3 files changed, 495 deletions(-) delete mode 100644 packages/core/api/ee/run-results.ts delete mode 100644 packages/core/ee/run-results-service.test.ts delete mode 100644 packages/core/ee/run-results-service.ts diff --git a/packages/core/api/ee/run-results.ts b/packages/core/api/ee/run-results.ts deleted file mode 100644 index 36cceef9..00000000 --- a/packages/core/api/ee/run-results.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { StoredRunResults } from "@superglue/shared"; -import { getRunResultsService } from "../../ee/run-results-service.js"; -import { registerApiModule } from "../registry.js"; -import { addTraceHeader, sendError } from "../response-helpers.js"; -import type { AuthenticatedFastifyRequest, RouteHandler } from "../types.js"; -import { isFileStorageAvailable } from "../../filestore/file-service.js"; - -// GET /runs/:runId/results - Fetch stored run results from S3 -const getRunResults: RouteHandler = async (request, reply) => { - const authReq = request as AuthenticatedFastifyRequest; - const params = request.params as { runId: string }; - - // Check if S3 infrastructure is available - if (!isFileStorageAvailable()) { - return sendError( - reply, - 503, - "Run results storage is not enabled. Configure file storage to enable this feature.", - ); - } - - // Get the run to find the storage URI - const run = await authReq.datastore.getRun({ - id: params.runId, - orgId: authReq.authInfo.orgId, - }); - - if (!run) { - return sendError(reply, 404, "Run not found"); - } - - if (!run.resultStorageUri) { - return addTraceHeader(reply, authReq.traceId).send({ - success: true, - data: null, - message: "No stored results available for this run", - }); - } - - // Fetch results from S3 - const metadata = { orgId: authReq.authInfo.orgId, traceId: authReq.traceId }; - const runResultsService = getRunResultsService(); - const results = await runResultsService.getResults(run.resultStorageUri, metadata); - - if (!results) { - return addTraceHeader(reply, authReq.traceId).send({ - success: true, - data: null, - message: "Results file not found or corrupted", - }); - } - - // Convert storedAt to ISO string for JSON response - const response: Omit & { storedAt: string } = { - ...results, - storedAt: results.storedAt instanceof Date ? results.storedAt.toISOString() : results.storedAt, - }; - - return addTraceHeader(reply, authReq.traceId).send({ - success: true, - data: response, - }); -}; - -registerApiModule({ - name: "run-results", - routes: [ - { - method: "GET", - path: "/runs/:runId/results", - handler: getRunResults, - permissions: { - type: "read", - resource: "run", - allowRestricted: true, - }, - }, - ], -}); diff --git a/packages/core/ee/run-results-service.test.ts b/packages/core/ee/run-results-service.test.ts deleted file mode 100644 index c3d21fd0..00000000 --- a/packages/core/ee/run-results-service.test.ts +++ /dev/null @@ -1,323 +0,0 @@ -import { describe, expect, it, vi, beforeEach, afterEach } from "vitest"; -import { gzip } from "zlib"; -import { promisify } from "util"; -import { - RunResultsService, - generateRunResultsUri, - getRunResultsService, -} from "./run-results-service.js"; -import type { StoredRunResults } from "@superglue/shared"; - -const gzipAsync = promisify(gzip); - -// Mock the file service (but not isFileStorageAvailable - we test the real implementation) -vi.mock("../filestore/file-service.js", async (importOriginal) => { - const actual = await importOriginal(); - return { - ...actual, - getFileService: vi.fn(), - }; -}); - -// Mock the logs -vi.mock("../utils/logs.js", () => ({ - logMessage: vi.fn(), -})); - -import { getFileService, isFileStorageAvailable } from "../filestore/file-service.js"; - -describe("RunResultsService", () => { - let service: RunResultsService; - let mockFileService: { - uploadFile: ReturnType; - downloadFile: ReturnType; - deleteFile: ReturnType; - }; - - beforeEach(() => { - service = new RunResultsService(); - mockFileService = { - uploadFile: vi.fn().mockResolvedValue(undefined), - downloadFile: vi.fn(), - deleteFile: vi.fn().mockResolvedValue(undefined), - }; - vi.mocked(getFileService).mockReturnValue(mockFileService as any); - }); - - afterEach(() => { - vi.clearAllMocks(); - }); - - describe("storeResults", () => { - it("should gzip and upload results to S3", async () => { - const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; - const results = { - runId: "run-123", - success: true, - data: { result: "test" }, - stepResults: [{ stepId: "step-1", success: true, data: { foo: "bar" } }], - toolPayload: { input: "value" }, - }; - const metadata = { orgId: "test-org" }; - - await service.storeResults(storageUri, results, metadata); - - expect(mockFileService.uploadFile).toHaveBeenCalledTimes(1); - expect(mockFileService.uploadFile).toHaveBeenCalledWith( - storageUri, - expect.any(Buffer), - metadata, - { contentType: "application/gzip" }, - ); - - // Verify the uploaded data is valid gzipped JSON with storedAt added - const uploadedBuffer = mockFileService.uploadFile.mock.calls[0][1] as Buffer; - expect(uploadedBuffer).toBeInstanceOf(Buffer); - }); - - it("should add storedAt timestamp to results", async () => { - const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; - const results = { - runId: "run-123", - success: true, - data: null, - stepResults: [], - toolPayload: {}, - }; - - // Capture the uploaded buffer to verify storedAt - let capturedBuffer: Buffer | null = null; - mockFileService.uploadFile.mockImplementation(async (uri: string, buffer: Buffer) => { - capturedBuffer = buffer; - }); - - const beforeStore = new Date(); - await service.storeResults(storageUri, results, { orgId: "test-org" }); - const afterStore = new Date(); - - // Decompress and parse to verify storedAt - const { gunzip } = await import("zlib"); - const gunzipAsync = promisify(gunzip); - const decompressed = await gunzipAsync(capturedBuffer!); - const parsed = JSON.parse(decompressed.toString("utf-8")); - - expect(parsed.storedAt).toBeDefined(); - const storedAt = new Date(parsed.storedAt); - expect(storedAt.getTime()).toBeGreaterThanOrEqual(beforeStore.getTime()); - expect(storedAt.getTime()).toBeLessThanOrEqual(afterStore.getTime()); - }); - }); - - describe("getResults", () => { - it("should download and decompress results from S3", async () => { - const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; - const storedData: StoredRunResults = { - runId: "run-123", - success: true, - data: { result: "test" }, - stepResults: [{ stepId: "step-1", success: true, data: { foo: "bar" } }], - toolPayload: { input: "value" }, - storedAt: new Date("2024-01-15T10:00:00Z"), - }; - - // Compress the data as it would be stored - const compressed = await gzipAsync(Buffer.from(JSON.stringify(storedData), "utf-8")); - mockFileService.downloadFile.mockResolvedValue(compressed); - - const result = await service.getResults(storageUri, { orgId: "test-org" }); - - expect(mockFileService.downloadFile).toHaveBeenCalledWith(storageUri, { orgId: "test-org" }); - expect(result).not.toBeNull(); - expect(result!.runId).toBe("run-123"); - expect(result!.success).toBe(true); - expect(result!.data).toEqual({ result: "test" }); - expect(result!.stepResults).toHaveLength(1); - expect(result!.toolPayload).toEqual({ input: "value" }); - }); - - it("should parse storedAt string back to Date", async () => { - const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; - const storedData = { - runId: "run-123", - success: true, - data: null, - stepResults: [], - toolPayload: {}, - storedAt: "2024-01-15T10:00:00.000Z", // String as stored in JSON - }; - - const compressed = await gzipAsync(Buffer.from(JSON.stringify(storedData), "utf-8")); - mockFileService.downloadFile.mockResolvedValue(compressed); - - const result = await service.getResults(storageUri, { orgId: "test-org" }); - - expect(result!.storedAt).toBeInstanceOf(Date); - expect(result!.storedAt.toISOString()).toBe("2024-01-15T10:00:00.000Z"); - }); - - it("should return null when file not found", async () => { - const storageUri = "s3://bucket/org/run-results/missing.json.gz"; - mockFileService.downloadFile.mockRejectedValue( - new Error("NoSuchKey: The specified key does not exist"), - ); - - const result = await service.getResults(storageUri, { orgId: "test-org" }); - - expect(result).toBeNull(); - }); - - it("should return null when file is corrupted", async () => { - const storageUri = "s3://bucket/org/run-results/corrupted.json.gz"; - // Return invalid gzip data - mockFileService.downloadFile.mockResolvedValue(Buffer.from("not gzipped data")); - - const result = await service.getResults(storageUri, { orgId: "test-org" }); - - expect(result).toBeNull(); - }); - - it("should return null when JSON is invalid", async () => { - const storageUri = "s3://bucket/org/run-results/bad-json.json.gz"; - // Return valid gzip but invalid JSON - const compressed = await gzipAsync(Buffer.from("{ invalid json }", "utf-8")); - mockFileService.downloadFile.mockResolvedValue(compressed); - - const result = await service.getResults(storageUri, { orgId: "test-org" }); - - expect(result).toBeNull(); - }); - }); - - describe("deleteResults", () => { - it("should delete file from S3", async () => { - const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; - - await service.deleteResults(storageUri, { orgId: "test-org" }); - - expect(mockFileService.deleteFile).toHaveBeenCalledWith(storageUri, { orgId: "test-org" }); - }); - - it("should handle delete errors gracefully", async () => { - const storageUri = "s3://bucket/org/run-results/run-123.json.gz"; - mockFileService.deleteFile.mockRejectedValue(new Error("Access denied")); - - // Should not throw - await expect( - service.deleteResults(storageUri, { orgId: "test-org" }), - ).resolves.toBeUndefined(); - }); - }); - - describe("gzip/gunzip roundtrip", () => { - it("should preserve all data through store/retrieve cycle", async () => { - const storageUri = "s3://bucket/org/run-results/roundtrip.json.gz"; - const originalResults = { - runId: "roundtrip-test", - success: false, - data: { - nested: { deeply: { value: 123 } }, - array: [1, 2, 3], - nullValue: null, - unicode: "日本語テスト 🎉", - }, - stepResults: [ - { stepId: "step-1", success: true, data: { a: 1 } }, - { stepId: "step-2", success: false, data: null, error: "Something failed" }, - ], - toolPayload: { query: "test", options: { limit: 10 } }, - error: "Overall error message", - }; - - // Capture uploaded data - let uploadedBuffer: Buffer | null = null; - mockFileService.uploadFile.mockImplementation(async (_uri: string, buffer: Buffer) => { - uploadedBuffer = buffer; - }); - - // Store - await service.storeResults(storageUri, originalResults, { orgId: "test-org" }); - - // Simulate retrieval by returning the captured buffer - mockFileService.downloadFile.mockResolvedValue(uploadedBuffer!); - - // Retrieve - const retrieved = await service.getResults(storageUri, { orgId: "test-org" }); - - // Verify all fields preserved (except storedAt which is added) - expect(retrieved).not.toBeNull(); - expect(retrieved!.runId).toBe(originalResults.runId); - expect(retrieved!.success).toBe(originalResults.success); - expect(retrieved!.data).toEqual(originalResults.data); - expect(retrieved!.stepResults).toEqual(originalResults.stepResults); - expect(retrieved!.toolPayload).toEqual(originalResults.toolPayload); - expect(retrieved!.error).toBe(originalResults.error); - expect(retrieved!.storedAt).toBeInstanceOf(Date); - }); - }); -}); - -describe("generateRunResultsUri", () => { - const originalEnv = process.env.AWS_BUCKET_NAME; - - afterEach(() => { - if (originalEnv !== undefined) { - process.env.AWS_BUCKET_NAME = originalEnv; - } else { - delete process.env.AWS_BUCKET_NAME; - } - }); - - it("should generate correct S3 URI when bucket is configured", () => { - process.env.AWS_BUCKET_NAME = "my-bucket"; - - const uri = generateRunResultsUri("run-123", "org-456"); - - expect(uri).toBe("s3://my-bucket/org-456/run-results/run-123.json.gz"); - }); - - it("should return null when bucket is not configured", () => { - delete process.env.AWS_BUCKET_NAME; - - const uri = generateRunResultsUri("run-123", "org-456"); - - expect(uri).toBeNull(); - }); - - it("should handle empty org ID", () => { - process.env.AWS_BUCKET_NAME = "my-bucket"; - - const uri = generateRunResultsUri("run-123", ""); - - expect(uri).toBe("s3://my-bucket//run-results/run-123.json.gz"); - }); -}); - -describe("isFileStorageAvailable", () => { - const originalEnv = process.env.AWS_BUCKET_NAME; - - afterEach(() => { - if (originalEnv !== undefined) { - process.env.AWS_BUCKET_NAME = originalEnv; - } else { - delete process.env.AWS_BUCKET_NAME; - } - }); - - it("should return true when AWS_BUCKET_NAME is set", () => { - process.env.AWS_BUCKET_NAME = "my-bucket"; - - expect(isFileStorageAvailable()).toBe(true); - }); - - it("should return false when AWS_BUCKET_NAME is not set", () => { - delete process.env.AWS_BUCKET_NAME; - - expect(isFileStorageAvailable()).toBe(false); - }); - - it("should return false when AWS_BUCKET_NAME is empty string", () => { - process.env.AWS_BUCKET_NAME = ""; - - expect(isFileStorageAvailable()).toBe(false); - }); -}); diff --git a/packages/core/ee/run-results-service.ts b/packages/core/ee/run-results-service.ts deleted file mode 100644 index a83b2a99..00000000 --- a/packages/core/ee/run-results-service.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { gzip, gunzip } from "zlib"; -import { promisify } from "util"; -import { getFileService } from "../filestore/file-service.js"; -import { ServiceMetadata, StoredRunResults } from "@superglue/shared"; -import { logMessage } from "../utils/logs.js"; - -const gzipAsync = promisify(gzip); -const gunzipAsync = promisify(gunzip); - -/** - * Generate a deterministic storage URI for run results. - * This allows us to know the URI upfront before uploading. - */ -export function generateRunResultsUri(runId: string, orgId: string): string | null { - const bucketName = process.env.AWS_BUCKET_NAME; - if (!bucketName) return null; - return `s3://${bucketName}/${orgId}/run-results/${runId}.json.gz`; -} - -export class RunResultsService { - /** - * Store run results to S3 (gzipped) at the given URI - */ - async storeResults( - storageUri: string, - results: Omit, - metadata: ServiceMetadata, - ): Promise { - const fileService = getFileService(); - - const payload: StoredRunResults = { - ...results, - storedAt: new Date(), - }; - - const json = JSON.stringify(payload); - const compressed = await gzipAsync(Buffer.from(json, "utf-8")); - - await fileService.uploadFile(storageUri, compressed, metadata, { - contentType: "application/gzip", - }); - - logMessage("debug", `Stored run results to ${storageUri}`, metadata); - } - - /** - * Retrieve run results from S3 - */ - async getResults( - storageUri: string, - metadata: ServiceMetadata, - ): Promise { - try { - const fileService = getFileService(); - const compressed = await fileService.downloadFile(storageUri, metadata); - const json = await gunzipAsync(compressed); - const results = JSON.parse(json.toString("utf-8")) as StoredRunResults; - - // Parse storedAt back to Date - if (results.storedAt && typeof results.storedAt === "string") { - results.storedAt = new Date(results.storedAt); - } - - return results; - } catch (error) { - logMessage("warn", `Failed to retrieve run results from ${storageUri}: ${error}`, metadata); - return null; - } - } - - /** - * Delete stored results - */ - async deleteResults(storageUri: string, metadata: ServiceMetadata): Promise { - try { - const fileService = getFileService(); - await fileService.deleteFile(storageUri, metadata); - logMessage("debug", `Deleted run results at ${storageUri}`, metadata); - } catch (error) { - logMessage("warn", `Failed to delete run results from ${storageUri}: ${error}`, metadata); - } - } -} - -// Singleton -let _runResultsService: RunResultsService | null = null; - -export function getRunResultsService(): RunResultsService { - if (!_runResultsService) { - _runResultsService = new RunResultsService(); - } - return _runResultsService; -}