Add Phase 8: Emitter — text/json/stream-json output formats
Adds emitter.rs with three output format handlers and stream-json reader thread, ClaudePrintError enum with exit codes and JSON subtypes to error.rs, and 13 unit tests in tests/emitter.rs covering all plan requirements. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
c6241e37b7
commit
bfb50da40c
4 changed files with 456 additions and 0 deletions
184
src/emitter.rs
Normal file
184
src/emitter.rs
Normal file
|
|
@ -0,0 +1,184 @@
|
|||
use crate::cli::OutputFormat;
|
||||
use crate::error::ClaudePrintError;
|
||||
use crate::transcript::TranscriptResult;
|
||||
use std::io::Write;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
/// Emit a successful response.
|
||||
///
|
||||
/// `text`: writes `{response_text}\n` to stdout.
|
||||
/// `json`: writes a single-line JSON result object.
|
||||
/// `stream-json`: no-op — the reader thread handles all output.
|
||||
pub fn emit_success(
|
||||
writer: &mut dyn Write,
|
||||
result: &TranscriptResult,
|
||||
format: &OutputFormat,
|
||||
claude_version: &str,
|
||||
duration_ms: u64,
|
||||
) -> std::io::Result<()> {
|
||||
match format {
|
||||
OutputFormat::Text => {
|
||||
writeln!(writer, "{}", result.text)?;
|
||||
}
|
||||
OutputFormat::Json => {
|
||||
let obj = serde_json::json!({
|
||||
"type": "result",
|
||||
"subtype": "success",
|
||||
"is_error": false,
|
||||
"result": result.text,
|
||||
"session_id": result.session_id,
|
||||
"num_turns": result.num_turns as u64,
|
||||
"duration_ms": duration_ms,
|
||||
"cost_usd": 0,
|
||||
"claude_version": claude_version,
|
||||
"usage": {
|
||||
"input_tokens": result.usage.input_tokens,
|
||||
"output_tokens": result.usage.output_tokens,
|
||||
"cache_creation_input_tokens": result.usage.cache_creation_input_tokens,
|
||||
"cache_read_input_tokens": result.usage.cache_read_input_tokens,
|
||||
}
|
||||
});
|
||||
writeln!(writer, "{}", serde_json::to_string(&obj).unwrap())?;
|
||||
}
|
||||
OutputFormat::StreamJson => {
|
||||
// Reader thread handles all output; nothing to emit here on success.
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Emit an error result.
|
||||
///
|
||||
/// `text`: message to stderr only.
|
||||
/// `json`: JSON error object to stdout.
|
||||
/// `stream-json` after inject: JSON error object to stdout.
|
||||
/// `stream-json` before inject: message to stderr only (same as text).
|
||||
pub fn emit_error(
|
||||
stdout: &mut dyn Write,
|
||||
stderr: &mut dyn Write,
|
||||
error: &ClaudePrintError,
|
||||
format: &OutputFormat,
|
||||
claude_version: &str,
|
||||
stream_json_after_inject: bool,
|
||||
) -> std::io::Result<()> {
|
||||
let write_json = match format {
|
||||
OutputFormat::Json => true,
|
||||
OutputFormat::StreamJson => stream_json_after_inject,
|
||||
OutputFormat::Text => false,
|
||||
};
|
||||
|
||||
if write_json {
|
||||
let obj = serde_json::json!({
|
||||
"type": "result",
|
||||
"subtype": error.subtype(),
|
||||
"is_error": true,
|
||||
"error_message": error.message(),
|
||||
"claude_version": claude_version,
|
||||
});
|
||||
writeln!(stdout, "{}", serde_json::to_string(&obj).unwrap())?;
|
||||
} else {
|
||||
writeln!(stderr, "error: {}", error.message())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Handle for the stream-json reader thread.
|
||||
pub struct StreamJsonHandle {
|
||||
/// Send `()` to signal "drain remaining lines then exit".
|
||||
/// Drop without sending to signal "exit immediately".
|
||||
pub drain_tx: mpsc::SyncSender<()>,
|
||||
pub join_handle: thread::JoinHandle<()>,
|
||||
}
|
||||
|
||||
/// Spawn a stream-json reader thread writing to stdout.
|
||||
pub fn spawn_stream_json_reader(transcript_path: PathBuf, start_offset: u64) -> StreamJsonHandle {
|
||||
spawn_stream_json_reader_to(transcript_path, start_offset, Box::new(std::io::stdout()))
|
||||
}
|
||||
|
||||
/// Spawn a stream-json reader thread writing to the given writer (testable).
|
||||
pub fn spawn_stream_json_reader_to(
|
||||
transcript_path: PathBuf,
|
||||
start_offset: u64,
|
||||
writer: Box<dyn Write + Send + 'static>,
|
||||
) -> StreamJsonHandle {
|
||||
let (drain_tx, drain_rx) = mpsc::sync_channel(1);
|
||||
let join_handle = thread::spawn(move || {
|
||||
stream_json_reader_loop(transcript_path, start_offset, writer, drain_rx);
|
||||
});
|
||||
StreamJsonHandle {
|
||||
drain_tx,
|
||||
join_handle,
|
||||
}
|
||||
}
|
||||
|
||||
fn stream_json_reader_loop(
|
||||
transcript_path: PathBuf,
|
||||
start_offset: u64,
|
||||
mut writer: Box<dyn Write + Send + 'static>,
|
||||
drain_rx: mpsc::Receiver<()>,
|
||||
) {
|
||||
use std::fs::File;
|
||||
use std::io::{BufRead, BufReader, Seek, SeekFrom};
|
||||
|
||||
// Open the file, waiting if it doesn't exist yet.
|
||||
let file = loop {
|
||||
match File::open(&transcript_path) {
|
||||
Ok(f) => break f,
|
||||
Err(_) => match drain_rx.try_recv() {
|
||||
Ok(()) => return,
|
||||
Err(mpsc::TryRecvError::Disconnected) => return,
|
||||
Err(mpsc::TryRecvError::Empty) => {
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
},
|
||||
}
|
||||
};
|
||||
|
||||
let mut reader = BufReader::new(file);
|
||||
if reader.seek(SeekFrom::Start(start_offset)).is_err() {
|
||||
let _ = drain_rx.recv();
|
||||
return;
|
||||
}
|
||||
|
||||
let mut draining = false;
|
||||
let mut line = String::new();
|
||||
|
||||
loop {
|
||||
line.clear();
|
||||
match reader.read_line(&mut line) {
|
||||
Ok(0) => {
|
||||
if draining {
|
||||
break;
|
||||
}
|
||||
match drain_rx.try_recv() {
|
||||
Ok(()) => {
|
||||
draining = true;
|
||||
}
|
||||
Err(mpsc::TryRecvError::Disconnected) => return,
|
||||
Err(mpsc::TryRecvError::Empty) => {
|
||||
thread::sleep(Duration::from_millis(5));
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(_) => {
|
||||
let trimmed = line.trim_end_matches('\n').trim_end_matches('\r');
|
||||
if !trimmed.is_empty() {
|
||||
let _ = writeln!(writer, "{}", trimmed);
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
if draining {
|
||||
break;
|
||||
}
|
||||
match drain_rx.try_recv() {
|
||||
Ok(()) => draining = true,
|
||||
Err(mpsc::TryRecvError::Disconnected) => return,
|
||||
Err(mpsc::TryRecvError::Empty) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
38
src/error.rs
38
src/error.rs
|
|
@ -13,3 +13,41 @@ pub enum Error {
|
|||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
/// User-facing error type with exit code and JSON subtype mapping.
|
||||
#[derive(Debug)]
|
||||
pub enum ClaudePrintError {
|
||||
Setup(String), // exit 2
|
||||
Timeout, // exit 124
|
||||
Interrupted, // exit 130
|
||||
AssistantError(String), // exit 1
|
||||
}
|
||||
|
||||
impl ClaudePrintError {
|
||||
pub fn exit_code(&self) -> i32 {
|
||||
match self {
|
||||
ClaudePrintError::Setup(_) => 2,
|
||||
ClaudePrintError::Timeout => 124,
|
||||
ClaudePrintError::Interrupted => 130,
|
||||
ClaudePrintError::AssistantError(_) => 1,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subtype(&self) -> &'static str {
|
||||
match self {
|
||||
ClaudePrintError::Setup(_) => "internal_error",
|
||||
ClaudePrintError::Timeout => "timeout",
|
||||
ClaudePrintError::Interrupted => "interrupted",
|
||||
ClaudePrintError::AssistantError(_) => "assistant_error",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn message(&self) -> &str {
|
||||
match self {
|
||||
ClaudePrintError::Setup(m) => m,
|
||||
ClaudePrintError::Timeout => "operation timed out",
|
||||
ClaudePrintError::Interrupted => "interrupted by signal",
|
||||
ClaudePrintError::AssistantError(m) => m,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
pub mod cli;
|
||||
pub mod config;
|
||||
pub mod emitter;
|
||||
pub mod error;
|
||||
pub mod event_loop;
|
||||
pub mod hook;
|
||||
|
|
|
|||
233
tests/emitter.rs
Normal file
233
tests/emitter.rs
Normal file
|
|
@ -0,0 +1,233 @@
|
|||
use claude_print::cli::OutputFormat;
|
||||
use claude_print::emitter::{emit_error, emit_success, spawn_stream_json_reader_to};
|
||||
use claude_print::error::ClaudePrintError;
|
||||
use claude_print::transcript::{AggregatedUsage, TranscriptResult};
|
||||
use std::io::Write;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn make_result(text: &str) -> TranscriptResult {
|
||||
TranscriptResult {
|
||||
text: text.to_string(),
|
||||
num_turns: 2,
|
||||
usage: AggregatedUsage {
|
||||
input_tokens: 100,
|
||||
output_tokens: 50,
|
||||
cache_creation_input_tokens: 10,
|
||||
cache_read_input_tokens: 5,
|
||||
},
|
||||
session_id: Some("test-session-id".to_string()),
|
||||
is_error: false,
|
||||
used_fallback: false,
|
||||
}
|
||||
}
|
||||
|
||||
struct CaptureWriter(Arc<Mutex<Vec<u8>>>);
|
||||
|
||||
impl Write for CaptureWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.0.lock().unwrap().extend_from_slice(buf);
|
||||
Ok(buf.len())
|
||||
}
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn capture() -> (Arc<Mutex<Vec<u8>>>, CaptureWriter) {
|
||||
let buf = Arc::new(Mutex::new(Vec::new()));
|
||||
let writer = CaptureWriter(Arc::clone(&buf));
|
||||
(buf, writer)
|
||||
}
|
||||
|
||||
// ── text format ──────────────────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn test_text_correct_string_trailing_newline() {
|
||||
let result = make_result("hello world");
|
||||
let (buf, mut writer) = capture();
|
||||
emit_success(&mut writer, &result, &OutputFormat::Text, "2.1.168", 0).unwrap();
|
||||
let output = buf.lock().unwrap().clone();
|
||||
assert_eq!(output, b"hello world\n");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_text_no_extra_whitespace() {
|
||||
let result = make_result("response");
|
||||
let (buf, mut writer) = capture();
|
||||
emit_success(&mut writer, &result, &OutputFormat::Text, "1.0", 0).unwrap();
|
||||
let output = buf.lock().unwrap();
|
||||
let s = std::str::from_utf8(&output).unwrap();
|
||||
assert_eq!(s.trim_end_matches('\n'), "response");
|
||||
assert!(s.ends_with('\n'));
|
||||
assert!(!s.starts_with(' '));
|
||||
}
|
||||
|
||||
// ── json format ──────────────────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn test_json_valid_with_required_fields() {
|
||||
let result = make_result("the answer");
|
||||
let (buf, mut writer) = capture();
|
||||
emit_success(&mut writer, &result, &OutputFormat::Json, "2.1.168", 4200).unwrap();
|
||||
let output = buf.lock().unwrap().clone();
|
||||
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
|
||||
|
||||
assert_eq!(v["type"], "result");
|
||||
assert_eq!(v["subtype"], "success");
|
||||
assert_eq!(v["is_error"], false);
|
||||
assert_eq!(v["result"], "the answer");
|
||||
assert!(v.get("session_id").is_some());
|
||||
assert!(v.get("num_turns").is_some());
|
||||
assert!(v.get("duration_ms").is_some());
|
||||
assert!(v.get("cost_usd").is_some());
|
||||
assert!(v.get("usage").is_some());
|
||||
assert!(v.get("claude_version").is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_claude_version_included() {
|
||||
let result = make_result("text");
|
||||
let (buf, mut writer) = capture();
|
||||
emit_success(&mut writer, &result, &OutputFormat::Json, "2.1.168", 0).unwrap();
|
||||
let output = buf.lock().unwrap().clone();
|
||||
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
|
||||
assert_eq!(v["claude_version"], "2.1.168");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_json_usage_fields_are_integers() {
|
||||
let result = make_result("text");
|
||||
let (buf, mut writer) = capture();
|
||||
emit_success(&mut writer, &result, &OutputFormat::Json, "1.0", 0).unwrap();
|
||||
let output = buf.lock().unwrap().clone();
|
||||
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
|
||||
let usage = &v["usage"];
|
||||
assert!(usage["input_tokens"].is_u64(), "input_tokens must be integer");
|
||||
assert!(usage["output_tokens"].is_u64(), "output_tokens must be integer");
|
||||
assert!(usage["cache_creation_input_tokens"].is_u64());
|
||||
assert!(usage["cache_read_input_tokens"].is_u64());
|
||||
}
|
||||
|
||||
// ── error result ─────────────────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn test_error_result_is_error_true_and_subtype() {
|
||||
let err = ClaudePrintError::Timeout;
|
||||
let (out_buf, mut stdout) = capture();
|
||||
let (_, mut stderr) = capture();
|
||||
emit_error(&mut stdout, &mut stderr, &err, &OutputFormat::Json, "1.0", false).unwrap();
|
||||
let output = out_buf.lock().unwrap().clone();
|
||||
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
|
||||
assert_eq!(v["is_error"], true);
|
||||
assert_eq!(v["subtype"], "timeout");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_error_exit_code_nonzero() {
|
||||
assert_ne!(ClaudePrintError::Setup("x".to_string()).exit_code(), 0);
|
||||
assert_ne!(ClaudePrintError::Timeout.exit_code(), 0);
|
||||
assert_ne!(ClaudePrintError::Interrupted.exit_code(), 0);
|
||||
assert_ne!(ClaudePrintError::AssistantError("x".to_string()).exit_code(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_error_subtypes() {
|
||||
assert_eq!(ClaudePrintError::Setup("x".to_string()).subtype(), "internal_error");
|
||||
assert_eq!(ClaudePrintError::Timeout.subtype(), "timeout");
|
||||
assert_eq!(ClaudePrintError::Interrupted.subtype(), "interrupted");
|
||||
assert_eq!(ClaudePrintError::AssistantError("x".to_string()).subtype(), "assistant_error");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_error_exit_codes() {
|
||||
assert_eq!(ClaudePrintError::Setup("x".to_string()).exit_code(), 2);
|
||||
assert_eq!(ClaudePrintError::Timeout.exit_code(), 124);
|
||||
assert_eq!(ClaudePrintError::Interrupted.exit_code(), 130);
|
||||
assert_eq!(ClaudePrintError::AssistantError("x".to_string()).exit_code(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_text_error_goes_to_stderr_not_stdout() {
|
||||
let err = ClaudePrintError::Setup("missing binary".to_string());
|
||||
let (out_buf, mut stdout) = capture();
|
||||
let (err_buf, mut stderr) = capture();
|
||||
emit_error(&mut stdout, &mut stderr, &err, &OutputFormat::Text, "1.0", false).unwrap();
|
||||
assert!(out_buf.lock().unwrap().is_empty(), "text error must not write to stdout");
|
||||
assert!(!err_buf.lock().unwrap().is_empty(), "text error must write to stderr");
|
||||
}
|
||||
|
||||
// ── zero token counts ─────────────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn test_zero_token_counts_when_fallback() {
|
||||
let result = TranscriptResult {
|
||||
text: "fallback text".to_string(),
|
||||
num_turns: 0,
|
||||
usage: AggregatedUsage::default(),
|
||||
session_id: None,
|
||||
is_error: false,
|
||||
used_fallback: true,
|
||||
};
|
||||
let (buf, mut writer) = capture();
|
||||
emit_success(&mut writer, &result, &OutputFormat::Json, "1.0", 0).unwrap();
|
||||
let output = buf.lock().unwrap().clone();
|
||||
let v: serde_json::Value = serde_json::from_slice(&output).unwrap();
|
||||
let usage = &v["usage"];
|
||||
assert!(usage.get("input_tokens").is_some(), "usage must be present");
|
||||
assert_eq!(usage["input_tokens"], 0);
|
||||
assert_eq!(usage["output_tokens"], 0);
|
||||
assert_eq!(usage["cache_creation_input_tokens"], 0);
|
||||
assert_eq!(usage["cache_read_input_tokens"], 0);
|
||||
}
|
||||
|
||||
// ── stream-json ───────────────────────────────────────────────────────────────
|
||||
|
||||
#[test]
|
||||
fn test_stream_json_each_line_parses_as_json() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("transcript.jsonl");
|
||||
|
||||
let lines = vec![
|
||||
r#"{"type":"assistant","message":{"id":"msg-1","content":[{"type":"text","text":"hi"}],"usage":{"input_tokens":10,"output_tokens":5}}}"#,
|
||||
r#"{"type":"result","is_error":false,"session_id":"abc123"}"#,
|
||||
];
|
||||
{
|
||||
let mut f = std::fs::File::create(&path).unwrap();
|
||||
for line in &lines {
|
||||
writeln!(f, "{}", line).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
let output_buf = Arc::new(Mutex::new(Vec::new()));
|
||||
let writer = Box::new(CaptureWriter(Arc::clone(&output_buf)));
|
||||
|
||||
let handle = spawn_stream_json_reader_to(path, 0, writer);
|
||||
handle.drain_tx.send(()).unwrap();
|
||||
handle.join_handle.join().unwrap();
|
||||
|
||||
let output = output_buf.lock().unwrap().clone();
|
||||
let text = std::str::from_utf8(&output).unwrap();
|
||||
let output_lines: Vec<&str> = text.lines().filter(|l| !l.is_empty()).collect();
|
||||
|
||||
assert_eq!(output_lines.len(), lines.len(), "should forward all lines");
|
||||
for line in &output_lines {
|
||||
let _: serde_json::Value = serde_json::from_str(line)
|
||||
.unwrap_or_else(|_| panic!("line is not valid JSON: {line}"));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_stream_json_disconnect_exits_immediately() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("transcript.jsonl");
|
||||
std::fs::write(&path, b"").unwrap();
|
||||
|
||||
let output_buf = Arc::new(Mutex::new(Vec::new()));
|
||||
let writer = Box::new(CaptureWriter(Arc::clone(&output_buf)));
|
||||
|
||||
let handle = spawn_stream_json_reader_to(path, 0, writer);
|
||||
// Drop drain_tx without sending — thread should exit immediately
|
||||
drop(handle.drain_tx);
|
||||
handle.join_handle.join().unwrap(); // must not hang
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue