Add Phase 8: Emitter — text/json/stream-json output formats

Adds emitter.rs with three output format handlers and stream-json reader thread,
ClaudePrintError enum with exit codes and JSON subtypes to error.rs,
and 13 unit tests in tests/emitter.rs covering all plan requirements.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
jedarden 2026-06-10 00:57:30 -04:00
parent c6241e37b7
commit bfb50da40c
4 changed files with 456 additions and 0 deletions

184
src/emitter.rs Normal file
View file

@ -0,0 +1,184 @@
use crate::cli::OutputFormat;
use crate::error::ClaudePrintError;
use crate::transcript::TranscriptResult;
use std::io::Write;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
/// Emit a successful response.
///
/// `text`: writes `{response_text}\n` to stdout.
/// `json`: writes a single-line JSON result object.
/// `stream-json`: no-op — the reader thread handles all output.
pub fn emit_success(
writer: &mut dyn Write,
result: &TranscriptResult,
format: &OutputFormat,
claude_version: &str,
duration_ms: u64,
) -> std::io::Result<()> {
match format {
OutputFormat::Text => {
writeln!(writer, "{}", result.text)?;
}
OutputFormat::Json => {
let obj = serde_json::json!({
"type": "result",
"subtype": "success",
"is_error": false,
"result": result.text,
"session_id": result.session_id,
"num_turns": result.num_turns as u64,
"duration_ms": duration_ms,
"cost_usd": 0,
"claude_version": claude_version,
"usage": {
"input_tokens": result.usage.input_tokens,
"output_tokens": result.usage.output_tokens,
"cache_creation_input_tokens": result.usage.cache_creation_input_tokens,
"cache_read_input_tokens": result.usage.cache_read_input_tokens,
}
});
writeln!(writer, "{}", serde_json::to_string(&obj).unwrap())?;
}
OutputFormat::StreamJson => {
// Reader thread handles all output; nothing to emit here on success.
}
}
Ok(())
}
/// Emit an error result.
///
/// `text`: message to stderr only.
/// `json`: JSON error object to stdout.
/// `stream-json` after inject: JSON error object to stdout.
/// `stream-json` before inject: message to stderr only (same as text).
pub fn emit_error(
stdout: &mut dyn Write,
stderr: &mut dyn Write,
error: &ClaudePrintError,
format: &OutputFormat,
claude_version: &str,
stream_json_after_inject: bool,
) -> std::io::Result<()> {
let write_json = match format {
OutputFormat::Json => true,
OutputFormat::StreamJson => stream_json_after_inject,
OutputFormat::Text => false,
};
if write_json {
let obj = serde_json::json!({
"type": "result",
"subtype": error.subtype(),
"is_error": true,
"error_message": error.message(),
"claude_version": claude_version,
});
writeln!(stdout, "{}", serde_json::to_string(&obj).unwrap())?;
} else {
writeln!(stderr, "error: {}", error.message())?;
}
Ok(())
}
/// Handle for the stream-json reader thread.
pub struct StreamJsonHandle {
/// Send `()` to signal "drain remaining lines then exit".
/// Drop without sending to signal "exit immediately".
pub drain_tx: mpsc::SyncSender<()>,
pub join_handle: thread::JoinHandle<()>,
}
/// Spawn a stream-json reader thread writing to stdout.
pub fn spawn_stream_json_reader(transcript_path: PathBuf, start_offset: u64) -> StreamJsonHandle {
spawn_stream_json_reader_to(transcript_path, start_offset, Box::new(std::io::stdout()))
}
/// Spawn a stream-json reader thread writing to the given writer (testable).
pub fn spawn_stream_json_reader_to(
transcript_path: PathBuf,
start_offset: u64,
writer: Box<dyn Write + Send + 'static>,
) -> StreamJsonHandle {
let (drain_tx, drain_rx) = mpsc::sync_channel(1);
let join_handle = thread::spawn(move || {
stream_json_reader_loop(transcript_path, start_offset, writer, drain_rx);
});
StreamJsonHandle {
drain_tx,
join_handle,
}
}
fn stream_json_reader_loop(
transcript_path: PathBuf,
start_offset: u64,
mut writer: Box<dyn Write + Send + 'static>,
drain_rx: mpsc::Receiver<()>,
) {
use std::fs::File;
use std::io::{BufRead, BufReader, Seek, SeekFrom};
// Open the file, waiting if it doesn't exist yet.
let file = loop {
match File::open(&transcript_path) {
Ok(f) => break f,
Err(_) => match drain_rx.try_recv() {
Ok(()) => return,
Err(mpsc::TryRecvError::Disconnected) => return,
Err(mpsc::TryRecvError::Empty) => {
thread::sleep(Duration::from_millis(5));
}
},
}
};
let mut reader = BufReader::new(file);
if reader.seek(SeekFrom::Start(start_offset)).is_err() {
let _ = drain_rx.recv();
return;
}
let mut draining = false;
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line) {
Ok(0) => {
if draining {
break;
}
match drain_rx.try_recv() {
Ok(()) => {
draining = true;
}
Err(mpsc::TryRecvError::Disconnected) => return,
Err(mpsc::TryRecvError::Empty) => {
thread::sleep(Duration::from_millis(5));
}
}
}
Ok(_) => {
let trimmed = line.trim_end_matches('\n').trim_end_matches('\r');
if !trimmed.is_empty() {
let _ = writeln!(writer, "{}", trimmed);
}
}
Err(_) => {
if draining {
break;
}
match drain_rx.try_recv() {
Ok(()) => draining = true,
Err(mpsc::TryRecvError::Disconnected) => return,
Err(mpsc::TryRecvError::Empty) => {}
}
}
}
}
}

View file

@ -13,3 +13,41 @@ pub enum Error {
}
pub type Result<T> = std::result::Result<T, Error>;
/// User-facing error type with exit code and JSON subtype mapping.
#[derive(Debug)]
pub enum ClaudePrintError {
Setup(String), // exit 2
Timeout, // exit 124
Interrupted, // exit 130
AssistantError(String), // exit 1
}
impl ClaudePrintError {
pub fn exit_code(&self) -> i32 {
match self {
ClaudePrintError::Setup(_) => 2,
ClaudePrintError::Timeout => 124,
ClaudePrintError::Interrupted => 130,
ClaudePrintError::AssistantError(_) => 1,
}
}
pub fn subtype(&self) -> &'static str {
match self {
ClaudePrintError::Setup(_) => "internal_error",
ClaudePrintError::Timeout => "timeout",
ClaudePrintError::Interrupted => "interrupted",
ClaudePrintError::AssistantError(_) => "assistant_error",
}
}
pub fn message(&self) -> &str {
match self {
ClaudePrintError::Setup(m) => m,
ClaudePrintError::Timeout => "operation timed out",
ClaudePrintError::Interrupted => "interrupted by signal",
ClaudePrintError::AssistantError(m) => m,
}
}
}

View file

@ -1,5 +1,6 @@
pub mod cli;
pub mod config;
pub mod emitter;
pub mod error;
pub mod event_loop;
pub mod hook;

233
tests/emitter.rs Normal file
View file

@ -0,0 +1,233 @@
use claude_print::cli::OutputFormat;
use claude_print::emitter::{emit_error, emit_success, spawn_stream_json_reader_to};
use claude_print::error::ClaudePrintError;
use claude_print::transcript::{AggregatedUsage, TranscriptResult};
use std::io::Write;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;
fn make_result(text: &str) -> TranscriptResult {
TranscriptResult {
text: text.to_string(),
num_turns: 2,
usage: AggregatedUsage {
input_tokens: 100,
output_tokens: 50,
cache_creation_input_tokens: 10,
cache_read_input_tokens: 5,
},
session_id: Some("test-session-id".to_string()),
is_error: false,
used_fallback: false,
}
}
struct CaptureWriter(Arc<Mutex<Vec<u8>>>);
impl Write for CaptureWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.0.lock().unwrap().extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
fn capture() -> (Arc<Mutex<Vec<u8>>>, CaptureWriter) {
let buf = Arc::new(Mutex::new(Vec::new()));
let writer = CaptureWriter(Arc::clone(&buf));
(buf, writer)
}
// ── text format ──────────────────────────────────────────────────────────────
#[test]
fn test_text_correct_string_trailing_newline() {
let result = make_result("hello world");
let (buf, mut writer) = capture();
emit_success(&mut writer, &result, &OutputFormat::Text, "2.1.168", 0).unwrap();
let output = buf.lock().unwrap().clone();
assert_eq!(output, b"hello world\n");
}
#[test]
fn test_text_no_extra_whitespace() {
let result = make_result("response");
let (buf, mut writer) = capture();
emit_success(&mut writer, &result, &OutputFormat::Text, "1.0", 0).unwrap();
let output = buf.lock().unwrap();
let s = std::str::from_utf8(&output).unwrap();
assert_eq!(s.trim_end_matches('\n'), "response");
assert!(s.ends_with('\n'));
assert!(!s.starts_with(' '));
}
// ── json format ──────────────────────────────────────────────────────────────
#[test]
fn test_json_valid_with_required_fields() {
let result = make_result("the answer");
let (buf, mut writer) = capture();
emit_success(&mut writer, &result, &OutputFormat::Json, "2.1.168", 4200).unwrap();
let output = buf.lock().unwrap().clone();
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
assert_eq!(v["type"], "result");
assert_eq!(v["subtype"], "success");
assert_eq!(v["is_error"], false);
assert_eq!(v["result"], "the answer");
assert!(v.get("session_id").is_some());
assert!(v.get("num_turns").is_some());
assert!(v.get("duration_ms").is_some());
assert!(v.get("cost_usd").is_some());
assert!(v.get("usage").is_some());
assert!(v.get("claude_version").is_some());
}
#[test]
fn test_json_claude_version_included() {
let result = make_result("text");
let (buf, mut writer) = capture();
emit_success(&mut writer, &result, &OutputFormat::Json, "2.1.168", 0).unwrap();
let output = buf.lock().unwrap().clone();
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
assert_eq!(v["claude_version"], "2.1.168");
}
#[test]
fn test_json_usage_fields_are_integers() {
let result = make_result("text");
let (buf, mut writer) = capture();
emit_success(&mut writer, &result, &OutputFormat::Json, "1.0", 0).unwrap();
let output = buf.lock().unwrap().clone();
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
let usage = &v["usage"];
assert!(usage["input_tokens"].is_u64(), "input_tokens must be integer");
assert!(usage["output_tokens"].is_u64(), "output_tokens must be integer");
assert!(usage["cache_creation_input_tokens"].is_u64());
assert!(usage["cache_read_input_tokens"].is_u64());
}
// ── error result ─────────────────────────────────────────────────────────────
#[test]
fn test_error_result_is_error_true_and_subtype() {
let err = ClaudePrintError::Timeout;
let (out_buf, mut stdout) = capture();
let (_, mut stderr) = capture();
emit_error(&mut stdout, &mut stderr, &err, &OutputFormat::Json, "1.0", false).unwrap();
let output = out_buf.lock().unwrap().clone();
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
assert_eq!(v["is_error"], true);
assert_eq!(v["subtype"], "timeout");
}
#[test]
fn test_error_exit_code_nonzero() {
assert_ne!(ClaudePrintError::Setup("x".to_string()).exit_code(), 0);
assert_ne!(ClaudePrintError::Timeout.exit_code(), 0);
assert_ne!(ClaudePrintError::Interrupted.exit_code(), 0);
assert_ne!(ClaudePrintError::AssistantError("x".to_string()).exit_code(), 0);
}
#[test]
fn test_error_subtypes() {
assert_eq!(ClaudePrintError::Setup("x".to_string()).subtype(), "internal_error");
assert_eq!(ClaudePrintError::Timeout.subtype(), "timeout");
assert_eq!(ClaudePrintError::Interrupted.subtype(), "interrupted");
assert_eq!(ClaudePrintError::AssistantError("x".to_string()).subtype(), "assistant_error");
}
#[test]
fn test_error_exit_codes() {
assert_eq!(ClaudePrintError::Setup("x".to_string()).exit_code(), 2);
assert_eq!(ClaudePrintError::Timeout.exit_code(), 124);
assert_eq!(ClaudePrintError::Interrupted.exit_code(), 130);
assert_eq!(ClaudePrintError::AssistantError("x".to_string()).exit_code(), 1);
}
#[test]
fn test_text_error_goes_to_stderr_not_stdout() {
let err = ClaudePrintError::Setup("missing binary".to_string());
let (out_buf, mut stdout) = capture();
let (err_buf, mut stderr) = capture();
emit_error(&mut stdout, &mut stderr, &err, &OutputFormat::Text, "1.0", false).unwrap();
assert!(out_buf.lock().unwrap().is_empty(), "text error must not write to stdout");
assert!(!err_buf.lock().unwrap().is_empty(), "text error must write to stderr");
}
// ── zero token counts ─────────────────────────────────────────────────────────
#[test]
fn test_zero_token_counts_when_fallback() {
let result = TranscriptResult {
text: "fallback text".to_string(),
num_turns: 0,
usage: AggregatedUsage::default(),
session_id: None,
is_error: false,
used_fallback: true,
};
let (buf, mut writer) = capture();
emit_success(&mut writer, &result, &OutputFormat::Json, "1.0", 0).unwrap();
let output = buf.lock().unwrap().clone();
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
let usage = &v["usage"];
assert!(usage.get("input_tokens").is_some(), "usage must be present");
assert_eq!(usage["input_tokens"], 0);
assert_eq!(usage["output_tokens"], 0);
assert_eq!(usage["cache_creation_input_tokens"], 0);
assert_eq!(usage["cache_read_input_tokens"], 0);
}
// ── stream-json ───────────────────────────────────────────────────────────────
#[test]
fn test_stream_json_each_line_parses_as_json() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("transcript.jsonl");
let lines = vec![
r#"{"type":"assistant","message":{"id":"msg-1","content":[{"type":"text","text":"hi"}],"usage":{"input_tokens":10,"output_tokens":5}}}"#,
r#"{"type":"result","is_error":false,"session_id":"abc123"}"#,
];
{
let mut f = std::fs::File::create(&path).unwrap();
for line in &lines {
writeln!(f, "{}", line).unwrap();
}
}
let output_buf = Arc::new(Mutex::new(Vec::new()));
let writer = Box::new(CaptureWriter(Arc::clone(&output_buf)));
let handle = spawn_stream_json_reader_to(path, 0, writer);
handle.drain_tx.send(()).unwrap();
handle.join_handle.join().unwrap();
let output = output_buf.lock().unwrap().clone();
let text = std::str::from_utf8(&output).unwrap();
let output_lines: Vec<&str> = text.lines().filter(|l| !l.is_empty()).collect();
assert_eq!(output_lines.len(), lines.len(), "should forward all lines");
for line in &output_lines {
let _: serde_json::Value = serde_json::from_str(line)
.unwrap_or_else(|_| panic!("line is not valid JSON: {line}"));
}
}
#[test]
fn test_stream_json_disconnect_exits_immediately() {
let dir = TempDir::new().unwrap();
let path = dir.path().join("transcript.jsonl");
std::fs::write(&path, b"").unwrap();
let output_buf = Arc::new(Mutex::new(Vec::new()));
let writer = Box::new(CaptureWriter(Arc::clone(&output_buf)));
let handle = spawn_stream_json_reader_to(path, 0, writer);
// Drop drain_tx without sending — thread should exit immediately
drop(handle.drain_tx);
handle.join_handle.join().unwrap(); // must not hang
}