From 3a2095db5dc6be067405faf9fb5b6b12a7a75487 Mon Sep 17 00:00:00 2001 From: Alex <132889147+alexvcodesphere@users.noreply.github.com> Date: Fri, 20 Feb 2026 18:21:18 +0100 Subject: [PATCH] feat(api): add StreamLogs SSE method to workspace client Adds a StreamLogs method to api.Client that connects to the Codesphere SSE log endpoint (/workspaces/{id}/logs/{stage}/{step}) and writes parsed log entries to an io.Writer. Supports context cancellation for use with concurrent pipeline polling. Signed-off-by: Alex <132889147+alexvcodesphere@users.noreply.github.com> --- api/workspace.go | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) diff --git a/api/workspace.go b/api/workspace.go index 2ec8ef1..9961c79 100644 --- a/api/workspace.go +++ b/api/workspace.go @@ -4,7 +4,14 @@ package api import ( + "bufio" + "context" + "encoding/json" "fmt" + "io" + "log" + "net/http" + "strings" "github.com/codesphere-cloud/cs-go/api/errors" "github.com/codesphere-cloud/cs-go/api/openapi_client" @@ -210,3 +217,97 @@ func (c Client) GitPull(workspaceId int, remote string, branch string) error { _, err := req.Execute() return errors.FormatAPIError(err) } + +// logEntry represents a single log line from the SSE stream. +type logEntry struct { + Timestamp string `json:"timestamp"` + Kind string `json:"kind"` + Data string `json:"data"` +} + +// StreamLogs connects to the Codesphere SSE log endpoint and writes parsed +// log entries to the provided writer until the context is cancelled or the +// stream ends. This is used during pipeline execution to provide real-time +// log output. +func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error { + endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step) + + req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + if err != nil { + return fmt.Errorf("failed to construct log stream request: %w", err) + } + + req.Header.Set("Accept", "text/event-stream") + + // Set auth from the client's context token + if token, ok := ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } else if token, ok := c.ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" { + req.Header.Set("Authorization", "Bearer "+token) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + // Context cancellation is expected when the stage finishes + if ctx.Err() != nil { + return nil + } + return fmt.Errorf("failed to connect to log stream: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("log stream responded with status %d", resp.StatusCode) + } + + reader := bufio.NewReader(resp.Body) + + for { + // Check if context is done + select { + case <-ctx.Done(): + return nil + default: + } + + // Parse one SSE event + var eventData string + for { + line, err := reader.ReadString('\n') + if err != nil { + if ctx.Err() != nil || err == io.EOF { + return nil + } + return fmt.Errorf("failed to read log stream: %w", err) + } + + line = strings.TrimSpace(line) + + if strings.HasPrefix(line, "data:") { + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + if eventData != "" { + eventData += "\n" + data + } else { + eventData = data + } + } else if line == "" && eventData != "" { + // Empty line marks end of SSE event + break + } + } + + // Parse and print log entries + var entries []logEntry + if err := json.Unmarshal([]byte(eventData), &entries); err != nil { + // Skip unparseable events (e.g. error responses) + log.Printf("⚠ log stream: %s", eventData) + eventData = "" + continue + } + + for _, entry := range entries { + fmt.Fprintf(w, "%s | %s\n", entry.Timestamp, entry.Data) + } + eventData = "" + } +}