package proxy import ( "bufio" "bytes" "context" "encoding/base64" "fmt" "io" "net/http" "time" "chorus/pkg/seqthink/ageio" "github.com/rs/zerolog/log" ) // SSEFrame represents a single Server-Sent Event frame type SSEFrame struct { Event string `json:"event,omitempty"` Data string `json:"data"` ID string `json:"id,omitempty"` } // handleSSEEncrypted handles encrypted Server-Sent Events streaming func (s *Server) handleSSEEncrypted(w http.ResponseWriter, r *http.Request) { s.config.Metrics.IncrementRequests() startTime := time.Now() // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering // Create flusher for streaming flusher, ok := w.(http.Flusher) if !ok { log.Error().Msg("Streaming not supported") http.Error(w, "Streaming not supported", http.StatusInternalServerError) return } // Create encryptor for streaming encryptor, err := ageio.NewEncryptor(s.config.AgeRecipsPath) if err != nil { log.Error().Err(err).Msg("Failed to create encryptor") http.Error(w, "Encryption initialization failed", http.StatusInternalServerError) return } // Create context with timeout ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute) defer cancel() log.Info().Msg("Starting encrypted SSE stream") // Simulate streaming encrypted frames // In production, this would stream from MCP server frameCount := 0 ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): log.Info(). Int("frames_sent", frameCount). Dur("duration", time.Since(startTime)). Msg("SSE stream closed") return case <-ticker.C: frameCount++ // Create frame data frameData := fmt.Sprintf(`{"thought_number":%d,"thought":"Processing...","next_thought_needed":true}`, frameCount) // Encrypt frame encryptedFrame, err := encryptor.Encrypt([]byte(frameData)) if err != nil { log.Error().Err(err).Msg("Failed to encrypt SSE frame") continue } // Base64 encode for SSE transmission encodedFrame := base64.StdEncoding.EncodeToString(encryptedFrame) // Send SSE frame fmt.Fprintf(w, "event: thought\n") fmt.Fprintf(w, "data: %s\n", encodedFrame) fmt.Fprintf(w, "id: %d\n\n", frameCount) flusher.Flush() log.Debug(). Int("frame", frameCount). Int("encrypted_size", len(encryptedFrame)). Msg("Sent encrypted SSE frame") // Stop after 10 frames for demo if frameCount >= 10 { fmt.Fprintf(w, "event: done\n") fmt.Fprintf(w, "data: complete\n\n") flusher.Flush() log.Info(). Int("frames_sent", frameCount). Dur("duration", time.Since(startTime)). Msg("SSE stream completed") return } } } } // handleSSEPlaintext handles plaintext Server-Sent Events streaming func (s *Server) handleSSEPlaintext(w http.ResponseWriter, r *http.Request) { s.config.Metrics.IncrementRequests() startTime := time.Now() // Set SSE headers w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") // Create flusher for streaming flusher, ok := w.(http.Flusher) if !ok { log.Error().Msg("Streaming not supported") http.Error(w, "Streaming not supported", http.StatusInternalServerError) return } // Create context with timeout ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute) defer cancel() log.Info().Msg("Starting plaintext SSE stream") // Simulate streaming frames frameCount := 0 ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() for { select { case <-ctx.Done(): log.Info(). Int("frames_sent", frameCount). Dur("duration", time.Since(startTime)). Msg("SSE stream closed") return case <-ticker.C: frameCount++ // Create frame data frameData := fmt.Sprintf(`{"thought_number":%d,"thought":"Processing...","next_thought_needed":true}`, frameCount) // Send SSE frame fmt.Fprintf(w, "event: thought\n") fmt.Fprintf(w, "data: %s\n", frameData) fmt.Fprintf(w, "id: %d\n\n", frameCount) flusher.Flush() log.Debug(). Int("frame", frameCount). Msg("Sent plaintext SSE frame") // Stop after 10 frames for demo if frameCount >= 10 { fmt.Fprintf(w, "event: done\n") fmt.Fprintf(w, "data: complete\n\n") flusher.Flush() log.Info(). Int("frames_sent", frameCount). Dur("duration", time.Since(startTime)). Msg("SSE stream completed") return } } } } // DecryptSSEFrame decrypts a base64-encoded encrypted SSE frame func DecryptSSEFrame(encodedFrame string, identityPath string) ([]byte, error) { // Base64 decode encryptedFrame, err := base64.StdEncoding.DecodeString(encodedFrame) if err != nil { return nil, fmt.Errorf("base64 decode: %w", err) } // Create decryptor decryptor, err := ageio.NewDecryptor(identityPath) if err != nil { return nil, fmt.Errorf("create decryptor: %w", err) } // Decrypt plaintext, err := decryptor.Decrypt(encryptedFrame) if err != nil { return nil, fmt.Errorf("decrypt: %w", err) } return plaintext, nil } // ReadSSEStream reads an SSE stream and returns frames func ReadSSEStream(r io.Reader) ([]SSEFrame, error) { var frames []SSEFrame scanner := bufio.NewScanner(r) var currentFrame SSEFrame for scanner.Scan() { line := scanner.Text() if line == "" { // Empty line signals end of frame if currentFrame.Data != "" { frames = append(frames, currentFrame) currentFrame = SSEFrame{} } continue } // Parse SSE field if bytes.HasPrefix([]byte(line), []byte("event: ")) { currentFrame.Event = line[7:] } else if bytes.HasPrefix([]byte(line), []byte("data: ")) { currentFrame.Data = line[6:] } else if bytes.HasPrefix([]byte(line), []byte("id: ")) { currentFrame.ID = line[4:] } } if err := scanner.Err(); err != nil { return nil, fmt.Errorf("scan stream: %w", err) } return frames, nil }