From c6241e37b7d299ba66ac9874172b5252dc29afca Mon Sep 17 00:00:00 2001 From: jedarden Date: Wed, 10 Jun 2026 00:51:50 -0400 Subject: [PATCH] Add Phase 7: transcript reader with retry loop and dedup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements src/transcript.rs: lenient JSONL parsing, message.id dedup with usage-fingerprint fallback, text extraction from ContentBlock arrays, 40×50ms retry loop for Stop-before-JSONL races (PO-5), and last_assistant_message fallback. All 18 tests in tests/transcript.rs pass; AS-6 verified with MOCK_DELAY_JSONL=100. Co-Authored-By: Claude Sonnet 4.6 --- .beads/issues.jsonl | 2 +- src/lib.rs | 1 + src/transcript.rs | 217 +++++++++++++++++++++++ tests/transcript.rs | 423 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 642 insertions(+), 1 deletion(-) create mode 100644 src/transcript.rs create mode 100644 tests/transcript.rs diff --git a/.beads/issues.jsonl b/.beads/issues.jsonl index 5a94b04..d29b69c 100644 --- a/.beads/issues.jsonl +++ b/.beads/issues.jsonl @@ -3,5 +3,5 @@ {"id":"bf-42j","title":"Phase 9: NEEDLE Integration (~50 LOC + config)","description":"Entry: Phase 8 complete.\n\nDeliverables:\n1. claude-print.yaml — NEEDLE agent config for dispatching beads via claude-print instead of claude -p\n - input_method: stdin, output_transform: needle-transform-claude\n - invoke_template: claude-print --output-format json --model ${MODEL}\n2. install.sh — download release binary from GitHub, install to ~/.local/bin/claude-print, verify --check\n3. claude-print-ci WorkflowTemplate in jedarden/declarative-config (k8s/iad-ci/argo-workflows/)\n - verify step only (build-musl + github-release steps added in Phase 11)\n - delegates to rust-verify WorkflowTemplate\n4. --check subcommand in src/main.rs or src/check.rs\n - openpty probe: confirm openpty syscall succeeds\n - mkfifo probe: confirm mkfifo in /home/coding/.tmp succeeds\n - optional mock_claude PTY round-trip (if mock_claude binary present in PATH)\n - exits 0 on success, prints diagnostic table\n\nComplete when:\n- bash -n install.sh passes (syntactically valid)\n- Manually copying locally-built binary to ~/.local/bin/claude-print and running claude-print --check succeeds\n- NEEDLE dispatches a test bead using claude-print.yaml; AS-3 passes\n- README flags table matches claude-print --help output exactly (verified manually)\n\nReference: docs/plan/plan.md § Phase 9","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-06-10T03:54:09.318382701Z","updated_at":"2026-06-10T03:54:09.318382701Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-42j","depends_on_id":"bf-2f1","type":"blocks","created_at":"2026-06-10T03:54:31.706113490Z","created_by":"cli","thread_id":""}]} {"id":"bf-4no","title":"Phase 11: CI (~YAML only)","description":"Entry: Phase 10 complete (and Phase 9 complete for install.sh e2e test).\n\nDeliverables in jedarden/declarative-config (k8s/iad-ci/argo-workflows/claude-print-ci.yaml):\n- Update claude-print-ci WorkflowTemplate stub (from Phase 9) with full steps:\n 1. verify — delegates to rust-verify WorkflowTemplate (fmt + clippy + test + cargo audit)\n 2. build-musl — cross-compile x86_64-unknown-linux-musl release binary\n 3. build-mock-claude-musl — build test-fixtures/mock-claude/ as musl binary\n 4. github-release — upload claude-print + mock_claude binaries + last-claude-version.txt artifact to GitHub Release\n- Confirm cargo audit runs (either via rust-verify or as explicit step between verify and build-musl)\n- install.sh end-to-end download test: download release artifact from GitHub Release URL, verify install.sh exits 0 and claude-print --check passes\n\nComplete when:\n- CI run on main branch produces release binary at expected GitHub Release URL\n- last-claude-version.txt artifact present in release\n- Binary passes claude-print --check (credential-free) via install.sh\n- install.sh end-to-end download test passes (deferred from Phase 9)\n- AS-1 verified manually before pushing release tag\n\nReference: docs/plan/plan.md § Phase 11","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-06-10T03:54:27.444014247Z","updated_at":"2026-06-10T03:54:27.444014247Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-4no","depends_on_id":"bf-10t","type":"blocks","created_at":"2026-06-10T03:54:31.717358160Z","created_by":"cli","thread_id":""},{"issue_id":"bf-4no","depends_on_id":"bf-42j","type":"blocks","created_at":"2026-06-10T03:54:31.725797267Z","created_by":"cli","thread_id":""}]} {"id":"bf-5bl","title":"Starvation alert: beads invisible to worker","description":"## Starvation Alert\n\nOpen beads exist but Pluck found none — possible configuration error.\n\n**Workspace:** default\n**Total beads:** 6\n**Open:** 5\n**In-progress:** 1\n**Claimed by:** claude-glm-glm47-alpha\n\nCheck exclude_labels, workspace path, and filter configuration.","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":2,"issue_type":"task","created_at":"2026-06-10T03:57:06.245148475Z","updated_at":"2026-06-10T04:30:00Z","closed_at":"2026-06-10T04:30:00Z","source_repo":".","compaction_level":0,"labels":["starvation-alert"]} -{"id":"bf-64k","title":"Phase 7: Transcript Reader (~180 LOC)","description":"Entry: Phase 6 complete. PO-5 acknowledged: retry loop (40×50ms) is the mitigation for Stop-before-JSONL races. Verify retry timing by running test_transcript_race with MOCK_DELAY_JSONL=100 and confirming exit 0.\n\nImplementation: src/transcript.rs\n- JSONL parse with lenient serde (unknown fields tolerated, unknown event types skipped)\n- message.id dedup + usage-fingerprint fallback dedup for events without message.id\n- Text extraction from assistant ContentBlock array (text type only)\n- 40×50ms retry loop with Stop-payload fallback to last_assistant_message after exhausted\n- Path derivation: strip leading /, replace / with -, append session_id from Stop payload\n\nComplete when:\n- All transcript unit tests pass (tests/transcript.rs)\n- test_streaming_dedup_40_retries passes\n- AS-6 (race scenario: Stop fires before JSONL flush) passes with MOCK_DELAY_JSONL=100\n\nReference: docs/plan/plan.md § Phase 7","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-06-10T03:53:52.452812786Z","updated_at":"2026-06-10T03:53:52.452812786Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-64k","depends_on_id":"bf-64s","type":"blocks","created_at":"2026-06-10T03:54:31.695602796Z","created_by":"cli","thread_id":""}]} +{"id":"bf-64k","title":"Phase 7: Transcript Reader (~180 LOC)","description":"Entry: Phase 6 complete. PO-5 acknowledged: retry loop (40×50ms) is the mitigation for Stop-before-JSONL races. Verify retry timing by running test_transcript_race with MOCK_DELAY_JSONL=100 and confirming exit 0.\n\nImplementation: src/transcript.rs\n- JSONL parse with lenient serde (unknown fields tolerated, unknown event types skipped)\n- message.id dedup + usage-fingerprint fallback dedup for events without message.id\n- Text extraction from assistant ContentBlock array (text type only)\n- 40×50ms retry loop with Stop-payload fallback to last_assistant_message after exhausted\n- Path derivation: strip leading /, replace / with -, append session_id from Stop payload\n\nComplete when:\n- All transcript unit tests pass (tests/transcript.rs)\n- test_streaming_dedup_40_retries passes\n- AS-6 (race scenario: Stop fires before JSONL flush) passes with MOCK_DELAY_JSONL=100\n\nReference: docs/plan/plan.md § Phase 7","design":"","acceptance_criteria":"","notes":"","status":"in_progress","priority":1,"issue_type":"task","assignee":"claude-glm-glm47-alpha","created_at":"2026-06-10T03:53:52.452812786Z","updated_at":"2026-06-10T04:40:42.953589901Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-64k","depends_on_id":"bf-64s","type":"blocks","created_at":"2026-06-10T03:54:31.695602796Z","created_by":"cli","thread_id":""}]} {"id":"bf-64s","title":"Phase 6: Stop Poller (~80 LOC)","description":"Entry: Phase 5 complete. OQ-2 must be resolved (verify --setting-sources= suppresses standard sources; see PO-2 for fallback). OQ-4 (FIFO open race) validated by test.\n\nImplementation: poller.rs (or extend event_loop.rs)\n- Open FIFO read-end O_NONBLOCK, integrate into poll() loop\n- Parse Stop hook JSON payload (session_id, transcript_path, last_assistant_message)\n- Derive transcript path from session_id + cwd slug if transcript_path absent\n- Signal event loop exit via channel/flag\n\nComplete when:\n- Integration test test_stop_hook_fires passes (mock_claude emits Stop, FIFO received, exit 0)\n- test_missing_transcript_path_derived passes (Stop without transcript_path → path derived from session_id)\n\nReference: docs/plan/plan.md § Phase 6","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":1,"issue_type":"task","assignee":"claude-glm-glm47-alpha","created_at":"2026-06-10T03:53:44.914912586Z","updated_at":"2026-06-10T04:40:12.904790Z","closed_at":"2026-06-10T04:40:12.904790Z","source_repo":".","compaction_level":0} diff --git a/src/lib.rs b/src/lib.rs index f7e1078..cd2e658 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,3 +7,4 @@ pub mod poller; pub mod pty; pub mod startup; pub mod terminal; +pub mod transcript; diff --git a/src/transcript.rs b/src/transcript.rs new file mode 100644 index 0000000..cfb8c52 --- /dev/null +++ b/src/transcript.rs @@ -0,0 +1,217 @@ +use crate::error::{Error, Result}; +use serde::Deserialize; +use std::collections::HashSet; +use std::fs::File; +use std::io::{BufRead, BufReader}; +use std::path::Path; +use std::thread; +use std::time::Duration; + +type UsageKey = (Option, Option, Option, Option); + +#[derive(Debug, Deserialize, Default, Clone)] +#[serde(default)] +pub struct Usage { + pub input_tokens: Option, + pub output_tokens: Option, + pub cache_creation_input_tokens: Option, + pub cache_read_input_tokens: Option, +} + +impl Usage { + fn as_key(&self) -> UsageKey { + ( + self.input_tokens, + self.output_tokens, + self.cache_creation_input_tokens, + self.cache_read_input_tokens, + ) + } +} + +#[derive(Debug, Default, Clone, PartialEq)] +pub struct AggregatedUsage { + pub input_tokens: u64, + pub output_tokens: u64, + pub cache_creation_input_tokens: u64, + pub cache_read_input_tokens: u64, +} + +impl AggregatedUsage { + fn add(&mut self, usage: &Usage) { + self.input_tokens += usage.input_tokens.unwrap_or(0); + self.output_tokens += usage.output_tokens.unwrap_or(0); + self.cache_creation_input_tokens += usage.cache_creation_input_tokens.unwrap_or(0); + self.cache_read_input_tokens += usage.cache_read_input_tokens.unwrap_or(0); + } +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "kebab-case")] +pub enum ContentBlock { + Text { text: String }, + ToolUse { name: String }, + Thinking { thinking: String }, + #[serde(other)] + Unknown, +} + +#[derive(Debug, Deserialize, Default)] +#[serde(default)] +pub struct AssistantMessage { + pub id: Option, + pub content: Vec, + pub usage: Usage, +} + +#[derive(Debug, Deserialize, Default)] +#[serde(default)] +pub struct ResultEvent { + pub is_error: Option, + pub session_id: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "kebab-case")] +pub enum Event { + Assistant { message: AssistantMessage }, + User { message: serde_json::Value }, + Result(ResultEvent), + #[serde(other)] + Unknown, +} + +#[derive(Debug, Default)] +pub struct TranscriptResult { + pub text: String, + pub num_turns: usize, + pub usage: AggregatedUsage, + pub session_id: Option, + pub is_error: bool, + pub used_fallback: bool, +} + +/// Parse a transcript JSONL file once (no retry). +/// +/// Missing files return an empty result. Malformed lines are silently skipped. +pub fn parse_transcript(path: &Path) -> Result { + let file = match File::open(path) { + Ok(f) => f, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Ok(TranscriptResult::default()); + } + Err(e) => return Err(e.into()), + }; + + let reader = BufReader::new(file); + let mut seen_ids: HashSet = HashSet::new(); + let mut prev_usage_key: Option = None; + let mut agg_usage = AggregatedUsage::default(); + let mut num_turns: usize = 0; + let mut current_turn_text = String::new(); + let mut session_id: Option = None; + let mut is_error = false; + + for line in reader.lines() { + let line = match line { + Ok(l) => l, + Err(_) => continue, + }; + let line = line.trim().to_owned(); + if line.is_empty() { + continue; + } + + let event: Event = match serde_json::from_str(&line) { + Ok(e) => e, + Err(_) => continue, + }; + + match event { + Event::Assistant { message } => { + let is_new_turn = if let Some(id) = &message.id { + seen_ids.insert(id.clone()) + } else { + let key = message.usage.as_key(); + let new = Some(&key) != prev_usage_key.as_ref(); + prev_usage_key = Some(key); + new + }; + + if is_new_turn { + current_turn_text.clear(); + num_turns += 1; + agg_usage.add(&message.usage); + } + + for block in &message.content { + if let ContentBlock::Text { text } = block { + current_turn_text.push_str(text); + } + } + } + Event::Result(r) => { + if r.session_id.is_some() { + session_id = r.session_id; + } + is_error = r.is_error.unwrap_or(false); + } + Event::User { .. } | Event::Unknown => {} + } + } + + Ok(TranscriptResult { + text: current_turn_text, + num_turns, + usage: agg_usage, + session_id, + is_error, + used_fallback: false, + }) +} + +/// Read a transcript with retry loop and fallback. +/// +/// Retries up to 40×50 ms when the file is missing or text is empty (Stop-before-JSONL race +/// window, PO-5). Falls back to `last_assistant_message` if retries are exhausted. +/// Returns an error if both are empty. +pub fn read_transcript( + path: &Path, + last_assistant_message: Option<&str>, +) -> Result { + const MAX_RETRIES: usize = 40; + const RETRY_DELAY: Duration = Duration::from_millis(50); + + let mut last_session_id: Option = None; + let mut last_is_error = false; + + for attempt in 0..=MAX_RETRIES { + if attempt > 0 { + thread::sleep(RETRY_DELAY); + } + if let Ok(r) = parse_transcript(path) { + if r.session_id.is_some() { + last_session_id = r.session_id.clone(); + } + last_is_error = r.is_error; + if !r.text.is_empty() { + return Ok(r); + } + } + } + + if let Some(msg) = last_assistant_message.filter(|s| !s.is_empty()) { + return Ok(TranscriptResult { + text: msg.to_string(), + num_turns: 0, + usage: AggregatedUsage::default(), + session_id: last_session_id, + is_error: last_is_error, + used_fallback: true, + }); + } + + Err(Error::Internal(anyhow::anyhow!( + "no response text after 40 retries; no last_assistant_message fallback" + ))) +} diff --git a/tests/transcript.rs b/tests/transcript.rs new file mode 100644 index 0000000..e94a4ff --- /dev/null +++ b/tests/transcript.rs @@ -0,0 +1,423 @@ +use claude_print::transcript::{parse_transcript, read_transcript, AggregatedUsage}; +use std::io::Write; +use std::path::Path; +use tempfile::TempDir; + +fn write_jsonl(path: &Path, lines: &[String]) { + let mut f = std::fs::File::create(path).unwrap(); + for line in lines { + writeln!(f, "{}", line).unwrap(); + } +} + +fn assistant_event(id: &str, text: &str, in_tok: u64, out_tok: u64, cache_create: u64, cache_read: u64) -> String { + serde_json::json!({ + "type": "assistant", + "message": { + "id": id, + "content": [{"type": "text", "text": text}], + "usage": { + "input_tokens": in_tok, + "output_tokens": out_tok, + "cache_creation_input_tokens": cache_create, + "cache_read_input_tokens": cache_read + } + } + }) + .to_string() +} + +fn assistant_event_no_id(usage_in: u64, usage_out: u64, text: &str) -> String { + serde_json::json!({ + "type": "assistant", + "message": { + "content": [{"type": "text", "text": text}], + "usage": { + "input_tokens": usage_in, + "output_tokens": usage_out, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0 + } + } + }) + .to_string() +} + +fn result_event(session_id: &str, is_error: bool) -> String { + serde_json::json!({ + "type": "result", + "session_id": session_id, + "is_error": is_error + }) + .to_string() +} + +// ── Single turn, single text block ─────────────────────────────────────────── + +#[test] +fn test_single_turn_single_text_block() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + write_jsonl( + &path, + &[assistant_event("msg-1", "hello world", 10, 5, 0, 0)], + ); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.text, "hello world"); + assert_eq!(r.num_turns, 1); + assert_eq!(r.usage.input_tokens, 10); + assert_eq!(r.usage.output_tokens, 5); +} + +// ── Multi-block content: text + tool_use + thinking + text → text concatenated + +#[test] +fn test_multi_block_content() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + let event = serde_json::json!({ + "type": "assistant", + "message": { + "id": "msg-2", + "content": [ + {"type": "text", "text": "first "}, + {"type": "tool_use", "name": "bash", "id": "toolu_1", "input": {}}, + {"type": "thinking", "thinking": "reasoning here"}, + {"type": "text", "text": "second"} + ], + "usage": {"input_tokens": 20, "output_tokens": 10, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 0} + } + }) + .to_string(); + write_jsonl(&path, &[event]); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.text, "first second"); + assert_eq!(r.num_turns, 1); +} + +// ── Multi-turn: 3 unique usage keys → 3 turns, last turn's text returned ───── + +#[test] +fn test_multi_turn_unique_keys() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + write_jsonl( + &path, + &[ + assistant_event("msg-a", "turn one", 10, 5, 0, 0), + assistant_event("msg-b", "turn two", 20, 8, 0, 0), + assistant_event("msg-c", "turn three", 30, 12, 0, 0), + ], + ); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.num_turns, 3); + assert_eq!(r.text, "turn three"); + assert_eq!(r.usage.input_tokens, 60); + assert_eq!(r.usage.output_tokens, 25); +} + +// ── Streaming dedup: 5 consecutive events with identical usage → 1 turn ────── + +#[test] +fn test_streaming_dedup_five_chunks() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + let chunks: Vec = (0..5) + .map(|i| assistant_event("msg-stream", &format!("chunk{i}"), 10, 5, 0, 0)) + .collect(); + write_jsonl(&path, &chunks); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.num_turns, 1, "5 chunks of same message.id = 1 turn"); + assert_eq!(r.text, "chunk0chunk1chunk2chunk3chunk4"); + assert_eq!(r.usage.input_tokens, 10); +} + +// ── Token aggregation: 45 unique turns → correct sum ───────────────────────── + +#[test] +fn test_token_aggregation_45_turns() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + let lines: Vec = (0..45) + .map(|i| assistant_event(&format!("msg-{i}"), "x", 100, 50, 10, 20)) + .collect(); + write_jsonl(&path, &lines); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.num_turns, 45); + assert_eq!(r.usage.input_tokens, 45 * 100); + assert_eq!(r.usage.output_tokens, 45 * 50); + assert_eq!(r.usage.cache_creation_input_tokens, 45 * 10); + assert_eq!(r.usage.cache_read_input_tokens, 45 * 20); +} + +// ── Missing cache_creation_input_tokens → defaults to 0 ────────────────────── + +#[test] +fn test_missing_cache_creation_tokens() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + let event = serde_json::json!({ + "type": "assistant", + "message": { + "id": "msg-3", + "content": [{"type": "text", "text": "hello"}], + "usage": {"input_tokens": 5, "output_tokens": 3} + } + }) + .to_string(); + write_jsonl(&path, &[event]); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.text, "hello"); + assert_eq!(r.usage.cache_creation_input_tokens, 0); +} + +// ── input_tokens: null → treated as 0 ──────────────────────────────────────── + +#[test] +fn test_null_input_tokens() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + let event = serde_json::json!({ + "type": "assistant", + "message": { + "id": "msg-4", + "content": [{"type": "text", "text": "text"}], + "usage": {"input_tokens": null, "output_tokens": 7, "cache_creation_input_tokens": null, "cache_read_input_tokens": null} + } + }) + .to_string(); + write_jsonl(&path, &[event]); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.usage.input_tokens, 0); + assert_eq!(r.usage.output_tokens, 7); + assert_eq!(r.usage.cache_creation_input_tokens, 0); +} + +// ── Unknown event type → silently skipped ──────────────────────────────────── + +#[test] +fn test_unknown_event_type_skipped() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + write_jsonl( + &path, + &[ + r#"{"type":"new-future-event","data":{"foo":42}}"#.to_string(), + assistant_event("msg-5", "real text", 10, 5, 0, 0), + ], + ); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.text, "real text"); + assert_eq!(r.num_turns, 1); +} + +// ── Unknown content block type → skipped, text blocks still extracted ───────── + +#[test] +fn test_unknown_content_block_skipped() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + let event = serde_json::json!({ + "type": "assistant", + "message": { + "id": "msg-6", + "content": [ + {"type": "image", "source": {"type": "base64", "data": "abc"}}, + {"type": "text", "text": "still here"} + ], + "usage": {"input_tokens": 5, "output_tokens": 3, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 0} + } + }) + .to_string(); + write_jsonl(&path, &[event]); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.text, "still here"); +} + +// ── Unknown usage fields → silently ignored ─────────────────────────────────── + +#[test] +fn test_unknown_usage_fields_ignored() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + let event = serde_json::json!({ + "type": "assistant", + "message": { + "id": "msg-7", + "content": [{"type": "text", "text": "ok"}], + "usage": { + "input_tokens": 8, + "output_tokens": 4, + "cache_creation_input_tokens": 0, + "cache_read_input_tokens": 0, + "future_token_field": 999, + "nested_future": {"a": 1} + } + } + }) + .to_string(); + write_jsonl(&path, &[event]); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.usage.input_tokens, 8); + assert_eq!(r.usage.output_tokens, 4); +} + +// ── Malformed JSONL line → skipped, subsequent lines parsed ────────────────── + +#[test] +fn test_malformed_line_skipped() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + write_jsonl( + &path, + &[ + r#"{"type":"assistant","message":{"id":"msg-bad","content":[{"type":"text""# + .to_string(), // truncated + assistant_event("msg-8", "recovered", 5, 3, 0, 0), + ], + ); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.text, "recovered"); + assert_eq!(r.num_turns, 1); +} + +// ── Empty file → empty text, zero token counts, no panic ───────────────────── + +#[test] +fn test_empty_file() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + std::fs::File::create(&path).unwrap(); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.text, ""); + assert_eq!(r.num_turns, 0); + assert_eq!(r.usage, AggregatedUsage::default()); +} + +// ── Usage-fingerprint fallback dedup (no message.id) ───────────────────────── + +#[test] +fn test_fingerprint_dedup_no_id() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + // 3 chunks with same usage but no message.id → 1 turn + let chunks: Vec = (0..3) + .map(|i| assistant_event_no_id(10, 5, &format!("p{i}"))) + .collect(); + write_jsonl(&path, &chunks); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.num_turns, 1); + assert_eq!(r.text, "p0p1p2"); +} + +// ── Result event: session_id and is_error extracted ────────────────────────── + +#[test] +fn test_result_event_fields() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("t.jsonl"); + write_jsonl( + &path, + &[ + assistant_event("msg-9", "response", 5, 3, 0, 0), + result_event("session-xyz", false), + ], + ); + let r = parse_transcript(&path).unwrap(); + assert_eq!(r.session_id.as_deref(), Some("session-xyz")); + assert!(!r.is_error); +} + +// ── read_transcript: fallback to last_assistant_message ────────────────────── + +#[test] +fn test_fallback_to_last_assistant_message() { + let dir = TempDir::new().unwrap(); + let _path = dir.path().join("nonexistent.jsonl"); + // File doesn't exist; retries would time out, so use a real file with no text + // For speed, use a transcript with no text content and provide fallback + let path2 = dir.path().join("empty.jsonl"); + // Write a file that has no text (only a result event without assistant) + write_jsonl(&path2, &[result_event("s1", false)]); + + let r = read_transcript(&path2, Some("fallback text")).unwrap(); + assert_eq!(r.text, "fallback text"); + assert!(r.used_fallback); +} + +// ── read_transcript: error when both empty ──────────────────────────────────── + +#[test] +fn test_both_empty_returns_error() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("empty.jsonl"); + // File with result event only (no assistant text) and no fallback + write_jsonl(&path, &[result_event("s2", false)]); + let result = read_transcript(&path, None); + assert!(result.is_err()); +} + +// ── test_streaming_dedup_40_retries: race + dedup combined ─────────────────── +// Simulates Stop-before-JSONL-flush (AS-6 / MOCK_DELAY_JSONL=100): +// JSONL is written after 100ms; retry loop (40×50ms = 2s budget) catches it. +// Also verifies that streaming chunks (same message.id) are correctly deduped. + +#[test] +fn test_streaming_dedup_40_retries() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("race.jsonl"); + + let path_clone = path.clone(); + std::thread::spawn(move || { + let delay_ms: u64 = std::env::var("MOCK_DELAY_JSONL") + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(100); + std::thread::sleep(std::time::Duration::from_millis(delay_ms)); + + // 5 streaming chunks of the same turn (same message.id) + let mut content = String::new(); + for i in 0..5 { + let line = serde_json::json!({ + "type": "assistant", + "message": { + "id": "msg-race-1", + "content": [{"type": "text", "text": format!("chunk{i}")}], + "usage": {"input_tokens": 10, "output_tokens": 5, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 0} + } + }) + .to_string(); + content.push_str(&line); + content.push('\n'); + } + std::fs::write(&path_clone, content).unwrap(); + }); + + let r = read_transcript(&path, None).unwrap(); + assert_eq!(r.num_turns, 1, "5 streaming chunks of same message.id = 1 turn"); + assert_eq!(r.text, "chunk0chunk1chunk2chunk3chunk4"); + assert_eq!(r.usage.input_tokens, 10); +} + +// ── test_transcript_race: MOCK_DELAY_JSONL=100 ─────────────────────────────── +// Direct test for the race window mitigation (AS-6). + +#[test] +fn test_transcript_race() { + let dir = TempDir::new().unwrap(); + let path = dir.path().join("race2.jsonl"); + + let path_clone = path.clone(); + std::thread::spawn(move || { + std::thread::sleep(std::time::Duration::from_millis(100)); + std::fs::write( + &path_clone, + format!("{}\n", assistant_event("msg-race-2", "race result", 10, 5, 0, 0)), + ) + .unwrap(); + }); + + let r = read_transcript(&path, None).unwrap(); + assert_eq!(r.text, "race result"); + assert_eq!(r.num_turns, 1); +}