Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions api/workspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
package api

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"

"github.com/codesphere-cloud/cs-go/api/errors"
"github.com/codesphere-cloud/cs-go/api/openapi_client"
Expand Down Expand Up @@ -210,3 +217,97 @@
_, err := req.Execute()
return errors.FormatAPIError(err)
}

// logEntry represents a single log line from the SSE stream.
type logEntry struct {
Timestamp string `json:"timestamp"`
Kind string `json:"kind"`
Data string `json:"data"`
}

// StreamLogs connects to the Codesphere SSE log endpoint and writes parsed
// log entries to the provided writer until the context is cancelled or the
// stream ends. This is used during pipeline execution to provide real-time
// log output.
func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error {
endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step)

req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
if err != nil {
return fmt.Errorf("failed to construct log stream request: %w", err)
}

req.Header.Set("Accept", "text/event-stream")

// Set auth from the client's context token
if token, ok := ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
req.Header.Set("Authorization", "Bearer "+token)
} else if token, ok := c.ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
req.Header.Set("Authorization", "Bearer "+token)
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
// Context cancellation is expected when the stage finishes
if ctx.Err() != nil {
return nil
}
return fmt.Errorf("failed to connect to log stream: %w", err)
}
defer func() { _ = resp.Body.Close() }()

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("log stream responded with status %d", resp.StatusCode)
}

reader := bufio.NewReader(resp.Body)

for {
// Check if context is done
select {
case <-ctx.Done():
return nil
default:
}

// Parse one SSE event
var eventData string
for {
line, err := reader.ReadString('\n')
if err != nil {
if ctx.Err() != nil || err == io.EOF {
return nil
}
return fmt.Errorf("failed to read log stream: %w", err)
}

line = strings.TrimSpace(line)

if strings.HasPrefix(line, "data:") {
data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
if eventData != "" {
eventData += "\n" + data
} else {
eventData = data
}
} else if line == "" && eventData != "" {
// Empty line marks end of SSE event
break
}
}

// Parse and print log entries
var entries []logEntry
if err := json.Unmarshal([]byte(eventData), &entries); err != nil {
// Skip unparseable events (e.g. error responses)
log.Printf("⚠ log stream: %s", eventData)
eventData = ""
continue
}

for _, entry := range entries {
fmt.Fprintf(w, "%s | %s\n", entry.Timestamp, entry.Data)

Check failure on line 309 in api/workspace.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `fmt.Fprintf` is not checked (errcheck)
}
eventData = ""
}
}
Loading