diff --git a/src/emitter.rs b/src/emitter.rs new file mode 100644 index 0000000..f5d67ed --- /dev/null +++ b/src/emitter.rs @@ -0,0 +1,184 @@ +use crate::cli::OutputFormat; +use crate::error::ClaudePrintError; +use crate::transcript::TranscriptResult; +use std::io::Write; +use std::path::PathBuf; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; + +/// Emit a successful response. +/// +/// `text`: writes `{response_text}\n` to stdout. +/// `json`: writes a single-line JSON result object. +/// `stream-json`: no-op — the reader thread handles all output. +pub fn emit_success( + writer: &mut dyn Write, + result: &TranscriptResult, + format: &OutputFormat, + claude_version: &str, + duration_ms: u64, +) -> std::io::Result<()> { + match format { + OutputFormat::Text => { + writeln!(writer, "{}", result.text)?; + } + OutputFormat::Json => { + let obj = serde_json::json!({ + "type": "result", + "subtype": "success", + "is_error": false, + "result": result.text, + "session_id": result.session_id, + "num_turns": result.num_turns as u64, + "duration_ms": duration_ms, + "cost_usd": 0, + "claude_version": claude_version, + "usage": { + "input_tokens": result.usage.input_tokens, + "output_tokens": result.usage.output_tokens, + "cache_creation_input_tokens": result.usage.cache_creation_input_tokens, + "cache_read_input_tokens": result.usage.cache_read_input_tokens, + } + }); + writeln!(writer, "{}", serde_json::to_string(&obj).unwrap())?; + } + OutputFormat::StreamJson => { + // Reader thread handles all output; nothing to emit here on success. + } + } + Ok(()) +} + +/// Emit an error result. +/// +/// `text`: message to stderr only. +/// `json`: JSON error object to stdout. +/// `stream-json` after inject: JSON error object to stdout. +/// `stream-json` before inject: message to stderr only (same as text). +pub fn emit_error( + stdout: &mut dyn Write, + stderr: &mut dyn Write, + error: &ClaudePrintError, + format: &OutputFormat, + claude_version: &str, + stream_json_after_inject: bool, +) -> std::io::Result<()> { + let write_json = match format { + OutputFormat::Json => true, + OutputFormat::StreamJson => stream_json_after_inject, + OutputFormat::Text => false, + }; + + if write_json { + let obj = serde_json::json!({ + "type": "result", + "subtype": error.subtype(), + "is_error": true, + "error_message": error.message(), + "claude_version": claude_version, + }); + writeln!(stdout, "{}", serde_json::to_string(&obj).unwrap())?; + } else { + writeln!(stderr, "error: {}", error.message())?; + } + Ok(()) +} + +/// Handle for the stream-json reader thread. +pub struct StreamJsonHandle { + /// Send `()` to signal "drain remaining lines then exit". + /// Drop without sending to signal "exit immediately". + pub drain_tx: mpsc::SyncSender<()>, + pub join_handle: thread::JoinHandle<()>, +} + +/// Spawn a stream-json reader thread writing to stdout. +pub fn spawn_stream_json_reader(transcript_path: PathBuf, start_offset: u64) -> StreamJsonHandle { + spawn_stream_json_reader_to(transcript_path, start_offset, Box::new(std::io::stdout())) +} + +/// Spawn a stream-json reader thread writing to the given writer (testable). +pub fn spawn_stream_json_reader_to( + transcript_path: PathBuf, + start_offset: u64, + writer: Box, +) -> StreamJsonHandle { + let (drain_tx, drain_rx) = mpsc::sync_channel(1); + let join_handle = thread::spawn(move || { + stream_json_reader_loop(transcript_path, start_offset, writer, drain_rx); + }); + StreamJsonHandle { + drain_tx, + join_handle, + } +} + +fn stream_json_reader_loop( + transcript_path: PathBuf, + start_offset: u64, + mut writer: Box, + drain_rx: mpsc::Receiver<()>, +) { + use std::fs::File; + use std::io::{BufRead, BufReader, Seek, SeekFrom}; + + // Open the file, waiting if it doesn't exist yet. + let file = loop { + match File::open(&transcript_path) { + Ok(f) => break f, + Err(_) => match drain_rx.try_recv() { + Ok(()) => return, + Err(mpsc::TryRecvError::Disconnected) => return, + Err(mpsc::TryRecvError::Empty) => { + thread::sleep(Duration::from_millis(5)); + } + }, + } + }; + + let mut reader = BufReader::new(file); + if reader.seek(SeekFrom::Start(start_offset)).is_err() { + let _ = drain_rx.recv(); + return; + } + + let mut draining = false; + let mut line = String::new(); + + loop { + line.clear(); + match reader.read_line(&mut line) { + Ok(0) => { + if draining { + break; + } + match drain_rx.try_recv() { + Ok(()) => { + draining = true; + } + Err(mpsc::TryRecvError::Disconnected) => return, + Err(mpsc::TryRecvError::Empty) => { + thread::sleep(Duration::from_millis(5)); + } + } + } + Ok(_) => { + let trimmed = line.trim_end_matches('\n').trim_end_matches('\r'); + if !trimmed.is_empty() { + let _ = writeln!(writer, "{}", trimmed); + } + } + Err(_) => { + if draining { + break; + } + match drain_rx.try_recv() { + Ok(()) => draining = true, + Err(mpsc::TryRecvError::Disconnected) => return, + Err(mpsc::TryRecvError::Empty) => {} + } + } + } + } +} diff --git a/src/error.rs b/src/error.rs index 697165b..d86038b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -13,3 +13,41 @@ pub enum Error { } pub type Result = std::result::Result; + +/// User-facing error type with exit code and JSON subtype mapping. +#[derive(Debug)] +pub enum ClaudePrintError { + Setup(String), // exit 2 + Timeout, // exit 124 + Interrupted, // exit 130 + AssistantError(String), // exit 1 +} + +impl ClaudePrintError { + pub fn exit_code(&self) -> i32 { + match self { + ClaudePrintError::Setup(_) => 2, + ClaudePrintError::Timeout => 124, + ClaudePrintError::Interrupted => 130, + ClaudePrintError::AssistantError(_) => 1, + } + } + + pub fn subtype(&self) -> &'static str { + match self { + ClaudePrintError::Setup(_) => "internal_error", + ClaudePrintError::Timeout => "timeout", + ClaudePrintError::Interrupted => "interrupted", + ClaudePrintError::AssistantError(_) => "assistant_error", + } + } + + pub fn message(&self) -> &str { + match self { + ClaudePrintError::Setup(m) => m, + ClaudePrintError::Timeout => "operation timed out", + ClaudePrintError::Interrupted => "interrupted by signal", + ClaudePrintError::AssistantError(m) => m, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index cd2e658..056d6a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ pub mod cli; pub mod config; +pub mod emitter; pub mod error; pub mod event_loop; pub mod hook; diff --git a/tests/emitter.rs b/tests/emitter.rs new file mode 100644 index 0000000..25563c4 --- /dev/null +++ b/tests/emitter.rs @@ -0,0 +1,233 @@ +use claude_print::cli::OutputFormat; +use claude_print::emitter::{emit_error, emit_success, spawn_stream_json_reader_to}; +use claude_print::error::ClaudePrintError; +use claude_print::transcript::{AggregatedUsage, TranscriptResult}; +use std::io::Write; +use std::sync::{Arc, Mutex}; +use tempfile::TempDir; + +fn make_result(text: &str) -> TranscriptResult { + TranscriptResult { + text: text.to_string(), + num_turns: 2, + usage: AggregatedUsage { + input_tokens: 100, + output_tokens: 50, + cache_creation_input_tokens: 10, + cache_read_input_tokens: 5, + }, + session_id: Some("test-session-id".to_string()), + is_error: false, + used_fallback: false, + } +} + +struct CaptureWriter(Arc>>); + +impl Write for CaptureWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.lock().unwrap().extend_from_slice(buf); + Ok(buf.len()) + } + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +fn capture() -> (Arc>>, CaptureWriter) { + let buf = Arc::new(Mutex::new(Vec::new())); + let writer = CaptureWriter(Arc::clone(&buf)); + (buf, writer) +} + +// ── text format ────────────────────────────────────────────────────────────── + +#[test] +fn test_text_correct_string_trailing_newline() { + let result = make_result("hello world"); + let (buf, mut writer) = capture(); + emit_success(&mut writer, &result, &OutputFormat::Text, "2.1.168", 0).unwrap(); + let output = buf.lock().unwrap().clone(); + assert_eq!(output, b"hello world\n"); +} + +#[test] +fn test_text_no_extra_whitespace() { + let result = make_result("response"); + let (buf, mut writer) = capture(); + emit_success(&mut writer, &result, &OutputFormat::Text, "1.0", 0).unwrap(); + let output = buf.lock().unwrap(); + let s = std::str::from_utf8(&output).unwrap(); + assert_eq!(s.trim_end_matches('\n'), "response"); + assert!(s.ends_with('\n')); + assert!(!s.starts_with(' ')); +} + +// ── json format ────────────────────────────────────────────────────────────── + +#[test] +fn test_json_valid_with_required_fields() { + let result = make_result("the answer"); + let (buf, mut writer) = capture(); + emit_success(&mut writer, &result, &OutputFormat::Json, "2.1.168", 4200).unwrap(); + let output = buf.lock().unwrap().clone(); + let v: serde_json::Value = serde_json::from_slice(&output).unwrap(); + + assert_eq!(v["type"], "result"); + assert_eq!(v["subtype"], "success"); + assert_eq!(v["is_error"], false); + assert_eq!(v["result"], "the answer"); + assert!(v.get("session_id").is_some()); + assert!(v.get("num_turns").is_some()); + assert!(v.get("duration_ms").is_some()); + assert!(v.get("cost_usd").is_some()); + assert!(v.get("usage").is_some()); + assert!(v.get("claude_version").is_some()); +} + +#[test] +fn test_json_claude_version_included() { + let result = make_result("text"); + let (buf, mut writer) = capture(); + emit_success(&mut writer, &result, &OutputFormat::Json, "2.1.168", 0).unwrap(); + let output = buf.lock().unwrap().clone(); + let v: serde_json::Value = serde_json::from_slice(&output).unwrap(); + assert_eq!(v["claude_version"], "2.1.168"); +} + +#[test] +fn test_json_usage_fields_are_integers() { + let result = make_result("text"); + let (buf, mut writer) = capture(); + emit_success(&mut writer, &result, &OutputFormat::Json, "1.0", 0).unwrap(); + let output = buf.lock().unwrap().clone(); + let v: serde_json::Value = serde_json::from_slice(&output).unwrap(); + let usage = &v["usage"]; + assert!(usage["input_tokens"].is_u64(), "input_tokens must be integer"); + assert!(usage["output_tokens"].is_u64(), "output_tokens must be integer"); + assert!(usage["cache_creation_input_tokens"].is_u64()); + assert!(usage["cache_read_input_tokens"].is_u64()); +} + +// ── error result ───────────────────────────────────────────────────────────── + +#[test] +fn test_error_result_is_error_true_and_subtype() { + let err = ClaudePrintError::Timeout; + let (out_buf, mut stdout) = capture(); + let (_, mut stderr) = capture(); + emit_error(&mut stdout, &mut stderr, &err, &OutputFormat::Json, "1.0", false).unwrap(); + let output = out_buf.lock().unwrap().clone(); + let v: serde_json::Value = serde_json::from_slice(&output).unwrap(); + assert_eq!(v["is_error"], true); + assert_eq!(v["subtype"], "timeout"); +} + +#[test] +fn test_error_exit_code_nonzero() { + assert_ne!(ClaudePrintError::Setup("x".to_string()).exit_code(), 0); + assert_ne!(ClaudePrintError::Timeout.exit_code(), 0); + assert_ne!(ClaudePrintError::Interrupted.exit_code(), 0); + assert_ne!(ClaudePrintError::AssistantError("x".to_string()).exit_code(), 0); +} + +#[test] +fn test_error_subtypes() { + assert_eq!(ClaudePrintError::Setup("x".to_string()).subtype(), "internal_error"); + assert_eq!(ClaudePrintError::Timeout.subtype(), "timeout"); + assert_eq!(ClaudePrintError::Interrupted.subtype(), "interrupted"); + assert_eq!(ClaudePrintError::AssistantError("x".to_string()).subtype(), "assistant_error"); +} + +#[test] +fn test_error_exit_codes() { + assert_eq!(ClaudePrintError::Setup("x".to_string()).exit_code(), 2); + assert_eq!(ClaudePrintError::Timeout.exit_code(), 124); + assert_eq!(ClaudePrintError::Interrupted.exit_code(), 130); + assert_eq!(ClaudePrintError::AssistantError("x".to_string()).exit_code(), 1); +} + +#[test] +fn test_text_error_goes_to_stderr_not_stdout() { + let err = ClaudePrintError::Setup("missing binary".to_string()); + let (out_buf, mut stdout) = capture(); + let (err_buf, mut stderr) = capture(); + emit_error(&mut stdout, &mut stderr, &err, &OutputFormat::Text, "1.0", false).unwrap(); + assert!(out_buf.lock().unwrap().is_empty(), "text error must not write to stdout"); + assert!(!err_buf.lock().unwrap().is_empty(), "text error must write to stderr"); +} + +// ── zero token counts ───────────────────────────────────────────────────────── + +#[test] +fn test_zero_token_counts_when_fallback() { + let result = TranscriptResult { + text: "fallback text".to_string(), + num_turns: 0, + usage: AggregatedUsage::default(), + session_id: None, + is_error: false, + used_fallback: true, + }; + let (buf, mut writer) = capture(); + emit_success(&mut writer, &result, &OutputFormat::Json, "1.0", 0).unwrap(); + let output = buf.lock().unwrap().clone(); + let v: serde_json::Value = serde_json::from_slice(&output).unwrap(); + let usage = &v["usage"]; + assert!(usage.get("input_tokens").is_some(), "usage must be present"); + assert_eq!(usage["input_tokens"], 0); + assert_eq!(usage["output_tokens"], 0); + assert_eq!(usage["cache_creation_input_tokens"], 0); + assert_eq!(usage["cache_read_input_tokens"], 0); +} + +// ── stream-json ─────────────────────────────────────────────────────────────── + +#[test] +fn test_stream_json_each_line_parses_as_json() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("transcript.jsonl"); + + let lines = vec![ + r#"{"type":"assistant","message":{"id":"msg-1","content":[{"type":"text","text":"hi"}],"usage":{"input_tokens":10,"output_tokens":5}}}"#, + r#"{"type":"result","is_error":false,"session_id":"abc123"}"#, + ]; + { + let mut f = std::fs::File::create(&path).unwrap(); + for line in &lines { + writeln!(f, "{}", line).unwrap(); + } + } + + let output_buf = Arc::new(Mutex::new(Vec::new())); + let writer = Box::new(CaptureWriter(Arc::clone(&output_buf))); + + let handle = spawn_stream_json_reader_to(path, 0, writer); + handle.drain_tx.send(()).unwrap(); + handle.join_handle.join().unwrap(); + + let output = output_buf.lock().unwrap().clone(); + let text = std::str::from_utf8(&output).unwrap(); + let output_lines: Vec<&str> = text.lines().filter(|l| !l.is_empty()).collect(); + + assert_eq!(output_lines.len(), lines.len(), "should forward all lines"); + for line in &output_lines { + let _: serde_json::Value = serde_json::from_str(line) + .unwrap_or_else(|_| panic!("line is not valid JSON: {line}")); + } +} + +#[test] +fn test_stream_json_disconnect_exits_immediately() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("transcript.jsonl"); + std::fs::write(&path, b"").unwrap(); + + let output_buf = Arc::new(Mutex::new(Vec::new())); + let writer = Box::new(CaptureWriter(Arc::clone(&output_buf))); + + let handle = spawn_stream_json_reader_to(path, 0, writer); + // Drop drain_tx without sending — thread should exit immediately + drop(handle.drain_tx); + handle.join_handle.join().unwrap(); // must not hang +}