Add Phase 7: transcript reader with retry loop and dedup

Implements src/transcript.rs: lenient JSONL parsing, message.id dedup
with usage-fingerprint fallback, text extraction from ContentBlock arrays,
40×50ms retry loop for Stop-before-JSONL races (PO-5), and last_assistant_message
fallback. All 18 tests in tests/transcript.rs pass; AS-6 verified with
MOCK_DELAY_JSONL=100.
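
The dedup rule described above can be sketched in isolation. This is a minimal, std-only illustration and not the code in src/transcript.rs: the `Chunk` struct and `count_turns` function are hypothetical names, and the usage fingerprint is reduced to an (input, output) token pair for brevity.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified stand-in for one assistant event in the transcript.
struct Chunk<'a> {
    id: Option<&'a str>, // message.id, when present
    usage: (u64, u64),   // (input_tokens, output_tokens) fingerprint
}

/// Count distinct turns: dedup by message.id when available, otherwise
/// treat consecutive events with an identical usage fingerprint as one turn.
fn count_turns(chunks: &[Chunk<'_>]) -> usize {
    let mut seen_ids: HashSet<&str> = HashSet::new();
    let mut prev_usage: Option<(u64, u64)> = None;
    let mut turns = 0;
    for chunk in chunks {
        let is_new = match chunk.id {
            // HashSet::insert returns true only the first time an id is seen.
            Some(id) => seen_ids.insert(id),
            None => {
                let changed = prev_usage != Some(chunk.usage);
                prev_usage = Some(chunk.usage);
                changed
            }
        };
        if is_new {
            turns += 1;
        }
    }
    turns
}
```

Under this sketch, two streamed chunks sharing one message.id count as a single turn, and id-less chunks only start a new turn when their fingerprint differs from the previous id-less chunk.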

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
jedarden 2026-06-10 00:51:50 -04:00
parent 343fa3898c
commit c6241e37b7
4 changed files with 642 additions and 1 deletions

@@ -3,5 +3,5 @@
{"id":"bf-42j","title":"Phase 9: NEEDLE Integration (~50 LOC + config)","description":"Entry: Phase 8 complete.\n\nDeliverables:\n1. claude-print.yaml — NEEDLE agent config for dispatching beads via claude-print instead of claude -p\n - input_method: stdin, output_transform: needle-transform-claude\n - invoke_template: claude-print --output-format json --model ${MODEL}\n2. install.sh — download release binary from GitHub, install to ~/.local/bin/claude-print, verify --check\n3. claude-print-ci WorkflowTemplate in jedarden/declarative-config (k8s/iad-ci/argo-workflows/)\n - verify step only (build-musl + github-release steps added in Phase 11)\n - delegates to rust-verify WorkflowTemplate\n4. --check subcommand in src/main.rs or src/check.rs\n - openpty probe: confirm openpty syscall succeeds\n - mkfifo probe: confirm mkfifo in /home/coding/.tmp succeeds\n - optional mock_claude PTY round-trip (if mock_claude binary present in PATH)\n - exits 0 on success, prints diagnostic table\n\nComplete when:\n- bash -n install.sh passes (syntactically valid)\n- Manually copying locally-built binary to ~/.local/bin/claude-print and running claude-print --check succeeds\n- NEEDLE dispatches a test bead using claude-print.yaml; AS-3 passes\n- README flags table matches claude-print --help output exactly (verified manually)\n\nReference: docs/plan/plan.md § Phase 9","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-06-10T03:54:09.318382701Z","updated_at":"2026-06-10T03:54:09.318382701Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-42j","depends_on_id":"bf-2f1","type":"blocks","created_at":"2026-06-10T03:54:31.706113490Z","created_by":"cli","thread_id":""}]}
{"id":"bf-4no","title":"Phase 11: CI (~YAML only)","description":"Entry: Phase 10 complete (and Phase 9 complete for install.sh e2e test).\n\nDeliverables in jedarden/declarative-config (k8s/iad-ci/argo-workflows/claude-print-ci.yaml):\n- Update claude-print-ci WorkflowTemplate stub (from Phase 9) with full steps:\n 1. verify — delegates to rust-verify WorkflowTemplate (fmt + clippy + test + cargo audit)\n 2. build-musl — cross-compile x86_64-unknown-linux-musl release binary\n 3. build-mock-claude-musl — build test-fixtures/mock-claude/ as musl binary\n 4. github-release — upload claude-print + mock_claude binaries + last-claude-version.txt artifact to GitHub Release\n- Confirm cargo audit runs (either via rust-verify or as explicit step between verify and build-musl)\n- install.sh end-to-end download test: download release artifact from GitHub Release URL, verify install.sh exits 0 and claude-print --check passes\n\nComplete when:\n- CI run on main branch produces release binary at expected GitHub Release URL\n- last-claude-version.txt artifact present in release\n- Binary passes claude-print --check (credential-free) via install.sh\n- install.sh end-to-end download test passes (deferred from Phase 9)\n- AS-1 verified manually before pushing release tag\n\nReference: docs/plan/plan.md § Phase 11","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-06-10T03:54:27.444014247Z","updated_at":"2026-06-10T03:54:27.444014247Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-4no","depends_on_id":"bf-10t","type":"blocks","created_at":"2026-06-10T03:54:31.717358160Z","created_by":"cli","thread_id":""},{"issue_id":"bf-4no","depends_on_id":"bf-42j","type":"blocks","created_at":"2026-06-10T03:54:31.725797267Z","created_by":"cli","thread_id":""}]}
{"id":"bf-5bl","title":"Starvation alert: beads invisible to worker","description":"## Starvation Alert\n\nOpen beads exist but Pluck found none — possible configuration error.\n\n**Workspace:** default\n**Total beads:** 6\n**Open:** 5\n**In-progress:** 1\n**Claimed by:** claude-glm-glm47-alpha\n\nCheck exclude_labels, workspace path, and filter configuration.","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":2,"issue_type":"task","created_at":"2026-06-10T03:57:06.245148475Z","updated_at":"2026-06-10T04:30:00Z","closed_at":"2026-06-10T04:30:00Z","source_repo":".","compaction_level":0,"labels":["starvation-alert"]}
{"id":"bf-64k","title":"Phase 7: Transcript Reader (~180 LOC)","description":"Entry: Phase 6 complete. PO-5 acknowledged: retry loop (40×50ms) is the mitigation for Stop-before-JSONL races. Verify retry timing by running test_transcript_race with MOCK_DELAY_JSONL=100 and confirming exit 0.\n\nImplementation: src/transcript.rs\n- JSONL parse with lenient serde (unknown fields tolerated, unknown event types skipped)\n- message.id dedup + usage-fingerprint fallback dedup for events without message.id\n- Text extraction from assistant ContentBlock array (text type only)\n- 40×50ms retry loop with Stop-payload fallback to last_assistant_message after exhausted\n- Path derivation: strip leading /, replace / with -, append session_id from Stop payload\n\nComplete when:\n- All transcript unit tests pass (tests/transcript.rs)\n- test_streaming_dedup_40_retries passes\n- AS-6 (race scenario: Stop fires before JSONL flush) passes with MOCK_DELAY_JSONL=100\n\nReference: docs/plan/plan.md § Phase 7","design":"","acceptance_criteria":"","notes":"","status":"open","priority":1,"issue_type":"task","created_at":"2026-06-10T03:53:52.452812786Z","updated_at":"2026-06-10T03:53:52.452812786Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-64k","depends_on_id":"bf-64s","type":"blocks","created_at":"2026-06-10T03:54:31.695602796Z","created_by":"cli","thread_id":""}]}
{"id":"bf-64k","title":"Phase 7: Transcript Reader (~180 LOC)","description":"Entry: Phase 6 complete. PO-5 acknowledged: retry loop (40×50ms) is the mitigation for Stop-before-JSONL races. Verify retry timing by running test_transcript_race with MOCK_DELAY_JSONL=100 and confirming exit 0.\n\nImplementation: src/transcript.rs\n- JSONL parse with lenient serde (unknown fields tolerated, unknown event types skipped)\n- message.id dedup + usage-fingerprint fallback dedup for events without message.id\n- Text extraction from assistant ContentBlock array (text type only)\n- 40×50ms retry loop with Stop-payload fallback to last_assistant_message after exhausted\n- Path derivation: strip leading /, replace / with -, append session_id from Stop payload\n\nComplete when:\n- All transcript unit tests pass (tests/transcript.rs)\n- test_streaming_dedup_40_retries passes\n- AS-6 (race scenario: Stop fires before JSONL flush) passes with MOCK_DELAY_JSONL=100\n\nReference: docs/plan/plan.md § Phase 7","design":"","acceptance_criteria":"","notes":"","status":"in_progress","priority":1,"issue_type":"task","assignee":"claude-glm-glm47-alpha","created_at":"2026-06-10T03:53:52.452812786Z","updated_at":"2026-06-10T04:40:42.953589901Z","source_repo":".","compaction_level":0,"dependencies":[{"issue_id":"bf-64k","depends_on_id":"bf-64s","type":"blocks","created_at":"2026-06-10T03:54:31.695602796Z","created_by":"cli","thread_id":""}]}
{"id":"bf-64s","title":"Phase 6: Stop Poller (~80 LOC)","description":"Entry: Phase 5 complete. OQ-2 must be resolved (verify --setting-sources= suppresses standard sources; see PO-2 for fallback). OQ-4 (FIFO open race) validated by test.\n\nImplementation: poller.rs (or extend event_loop.rs)\n- Open FIFO read-end O_NONBLOCK, integrate into poll() loop\n- Parse Stop hook JSON payload (session_id, transcript_path, last_assistant_message)\n- Derive transcript path from session_id + cwd slug if transcript_path absent\n- Signal event loop exit via channel/flag\n\nComplete when:\n- Integration test test_stop_hook_fires passes (mock_claude emits Stop, FIFO received, exit 0)\n- test_missing_transcript_path_derived passes (Stop without transcript_path → path derived from session_id)\n\nReference: docs/plan/plan.md § Phase 6","design":"","acceptance_criteria":"","notes":"","status":"closed","priority":1,"issue_type":"task","assignee":"claude-glm-glm47-alpha","created_at":"2026-06-10T03:53:44.914912586Z","updated_at":"2026-06-10T04:40:12.904790Z","closed_at":"2026-06-10T04:40:12.904790Z","source_repo":".","compaction_level":0}

@@ -7,3 +7,4 @@ pub mod poller;
pub mod pty;
pub mod startup;
pub mod terminal;
pub mod transcript;

src/transcript.rs (new file, 217 lines)

@@ -0,0 +1,217 @@
use crate::error::{Error, Result};
use serde::Deserialize;
use std::collections::HashSet;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use std::thread;
use std::time::Duration;

type UsageKey = (Option<u64>, Option<u64>, Option<u64>, Option<u64>);

#[derive(Debug, Deserialize, Default, Clone)]
#[serde(default)]
pub struct Usage {
    pub input_tokens: Option<u64>,
    pub output_tokens: Option<u64>,
    pub cache_creation_input_tokens: Option<u64>,
    pub cache_read_input_tokens: Option<u64>,
}

impl Usage {
    fn as_key(&self) -> UsageKey {
        (
            self.input_tokens,
            self.output_tokens,
            self.cache_creation_input_tokens,
            self.cache_read_input_tokens,
        )
    }
}

#[derive(Debug, Default, Clone, PartialEq)]
pub struct AggregatedUsage {
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub cache_creation_input_tokens: u64,
    pub cache_read_input_tokens: u64,
}

impl AggregatedUsage {
    fn add(&mut self, usage: &Usage) {
        self.input_tokens += usage.input_tokens.unwrap_or(0);
        self.output_tokens += usage.output_tokens.unwrap_or(0);
        self.cache_creation_input_tokens += usage.cache_creation_input_tokens.unwrap_or(0);
        self.cache_read_input_tokens += usage.cache_read_input_tokens.unwrap_or(0);
    }
}

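// Block tags in the transcript use snake_case (e.g. "tool_use"). With
// kebab-case the ToolUse variant would expect "tool-use" and never match,
// so such blocks would silently fall through to Unknown.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentBlock {
    Text { text: String },
    ToolUse { name: String },
    Thinking { thinking: String },
    #[serde(other)]
    Unknown,
}
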
#[derive(Debug, Deserialize, Default)]
#[serde(default)]
pub struct AssistantMessage {
    pub id: Option<String>,
    pub content: Vec<ContentBlock>,
    pub usage: Usage,
}

#[derive(Debug, Deserialize, Default)]
#[serde(default)]
pub struct ResultEvent {
    pub is_error: Option<bool>,
    pub session_id: Option<String>,
}

#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "kebab-case")]
pub enum Event {
    Assistant { message: AssistantMessage },
    User { message: serde_json::Value },
    Result(ResultEvent),
    #[serde(other)]
    Unknown,
}

#[derive(Debug, Default)]
pub struct TranscriptResult {
    pub text: String,
    pub num_turns: usize,
    pub usage: AggregatedUsage,
    pub session_id: Option<String>,
    pub is_error: bool,
    pub used_fallback: bool,
}

/// Parse a transcript JSONL file once (no retry).
///
/// Missing files return an empty result. Malformed lines are silently skipped.
pub fn parse_transcript(path: &Path) -> Result<TranscriptResult> {
    let file = match File::open(path) {
        Ok(f) => f,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            return Ok(TranscriptResult::default());
        }
        Err(e) => return Err(e.into()),
    };
    let reader = BufReader::new(file);

    let mut seen_ids: HashSet<String> = HashSet::new();
    let mut prev_usage_key: Option<UsageKey> = None;
    let mut agg_usage = AggregatedUsage::default();
    let mut num_turns: usize = 0;
    let mut current_turn_text = String::new();
    let mut session_id: Option<String> = None;
    let mut is_error = false;

    for line in reader.lines() {
        let line = match line {
            Ok(l) => l,
            Err(_) => continue,
        };
        let line = line.trim();
        if line.is_empty() {
            continue;
        }
        // Lenient parse: a line that fails to deserialize is skipped.
        let event: Event = match serde_json::from_str(line) {
            Ok(e) => e,
            Err(_) => continue,
        };
        match event {
            Event::Assistant { message } => {
                // Dedup by message.id; without an id, fall back to comparing
                // the usage fingerprint against the previous id-less event.
                let is_new_turn = if let Some(id) = &message.id {
                    seen_ids.insert(id.clone())
                } else {
                    let key = message.usage.as_key();
                    let new = Some(&key) != prev_usage_key.as_ref();
                    prev_usage_key = Some(key);
                    new
                };
                if is_new_turn {
                    // Only the last turn's text is returned, and usage is
                    // counted once per turn.
                    current_turn_text.clear();
                    num_turns += 1;
                    agg_usage.add(&message.usage);
                }
                for block in &message.content {
                    if let ContentBlock::Text { text } = block {
                        current_turn_text.push_str(text);
                    }
                }
            }
            Event::Result(r) => {
                if r.session_id.is_some() {
                    session_id = r.session_id;
                }
                is_error = r.is_error.unwrap_or(false);
            }
            Event::User { .. } | Event::Unknown => {}
        }
    }

    Ok(TranscriptResult {
        text: current_turn_text,
        num_turns,
        usage: agg_usage,
        session_id,
        is_error,
        used_fallback: false,
    })
}

/// Read a transcript with retry loop and fallback.
///
/// Retries up to 40×50 ms when the file is missing or text is empty (Stop-before-JSONL race
/// window, PO-5). Falls back to `last_assistant_message` if retries are exhausted.
/// Returns an error if both are empty.
pub fn read_transcript(
    path: &Path,
    last_assistant_message: Option<&str>,
) -> Result<TranscriptResult> {
    const MAX_RETRIES: usize = 40;
    const RETRY_DELAY: Duration = Duration::from_millis(50);

    let mut last_session_id: Option<String> = None;
    let mut last_is_error = false;

    // One initial attempt plus up to MAX_RETRIES retries, sleeping before each retry.
    for attempt in 0..=MAX_RETRIES {
        if attempt > 0 {
            thread::sleep(RETRY_DELAY);
        }
        if let Ok(r) = parse_transcript(path) {
            if r.session_id.is_some() {
                last_session_id = r.session_id.clone();
            }
            last_is_error = r.is_error;
            if !r.text.is_empty() {
                return Ok(r);
            }
        }
    }

    if let Some(msg) = last_assistant_message.filter(|s| !s.is_empty()) {
        return Ok(TranscriptResult {
            text: msg.to_string(),
            num_turns: 0,
            usage: AggregatedUsage::default(),
            session_id: last_session_id,
            is_error: last_is_error,
            used_fallback: true,
        });
    }

    Err(Error::Internal(anyhow::anyhow!(
        "no response text after 40 retries; no last_assistant_message fallback"
    )))
}

tests/transcript.rs (new file, 423 lines)

@@ -0,0 +1,423 @@
use claude_print::transcript::{parse_transcript, read_transcript, AggregatedUsage};
use std::io::Write;
use std::path::Path;
use tempfile::TempDir;

fn write_jsonl(path: &Path, lines: &[String]) {
    let mut f = std::fs::File::create(path).unwrap();
    for line in lines {
        writeln!(f, "{}", line).unwrap();
    }
}

fn assistant_event(
    id: &str,
    text: &str,
    in_tok: u64,
    out_tok: u64,
    cache_create: u64,
    cache_read: u64,
) -> String {
    serde_json::json!({
        "type": "assistant",
        "message": {
            "id": id,
            "content": [{"type": "text", "text": text}],
            "usage": {
                "input_tokens": in_tok,
                "output_tokens": out_tok,
                "cache_creation_input_tokens": cache_create,
                "cache_read_input_tokens": cache_read
            }
        }
    })
    .to_string()
}

fn assistant_event_no_id(usage_in: u64, usage_out: u64, text: &str) -> String {
    serde_json::json!({
        "type": "assistant",
        "message": {
            "content": [{"type": "text", "text": text}],
            "usage": {
                "input_tokens": usage_in,
                "output_tokens": usage_out,
                "cache_creation_input_tokens": 0,
                "cache_read_input_tokens": 0
            }
        }
    })
    .to_string()
}

fn result_event(session_id: &str, is_error: bool) -> String {
    serde_json::json!({
        "type": "result",
        "session_id": session_id,
        "is_error": is_error
    })
    .to_string()
}

// ── Single turn, single text block ───────────────────────────────────────────
#[test]
fn test_single_turn_single_text_block() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    write_jsonl(
        &path,
        &[assistant_event("msg-1", "hello world", 10, 5, 0, 0)],
    );
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.text, "hello world");
    assert_eq!(r.num_turns, 1);
    assert_eq!(r.usage.input_tokens, 10);
    assert_eq!(r.usage.output_tokens, 5);
}

// ── Multi-block content: text + tool_use + thinking + text → text concatenated
#[test]
fn test_multi_block_content() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    let event = serde_json::json!({
        "type": "assistant",
        "message": {
            "id": "msg-2",
            "content": [
                {"type": "text", "text": "first "},
                {"type": "tool_use", "name": "bash", "id": "toolu_1", "input": {}},
                {"type": "thinking", "thinking": "reasoning here"},
                {"type": "text", "text": "second"}
            ],
            "usage": {"input_tokens": 20, "output_tokens": 10, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 0}
        }
    })
    .to_string();
    write_jsonl(&path, &[event]);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.text, "first second");
    assert_eq!(r.num_turns, 1);
}

// ── Multi-turn: 3 unique message ids → 3 turns, last turn's text returned ────
#[test]
fn test_multi_turn_unique_keys() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    write_jsonl(
        &path,
        &[
            assistant_event("msg-a", "turn one", 10, 5, 0, 0),
            assistant_event("msg-b", "turn two", 20, 8, 0, 0),
            assistant_event("msg-c", "turn three", 30, 12, 0, 0),
        ],
    );
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.num_turns, 3);
    assert_eq!(r.text, "turn three");
    assert_eq!(r.usage.input_tokens, 60);
    assert_eq!(r.usage.output_tokens, 25);
}

// ── Streaming dedup: 5 consecutive events with the same message.id → 1 turn ──
#[test]
fn test_streaming_dedup_five_chunks() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    let chunks: Vec<String> = (0..5)
        .map(|i| assistant_event("msg-stream", &format!("chunk{i}"), 10, 5, 0, 0))
        .collect();
    write_jsonl(&path, &chunks);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.num_turns, 1, "5 chunks of same message.id = 1 turn");
    assert_eq!(r.text, "chunk0chunk1chunk2chunk3chunk4");
    assert_eq!(r.usage.input_tokens, 10);
}

// ── Token aggregation: 45 unique turns → correct sum ─────────────────────────
#[test]
fn test_token_aggregation_45_turns() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    let lines: Vec<String> = (0..45)
        .map(|i| assistant_event(&format!("msg-{i}"), "x", 100, 50, 10, 20))
        .collect();
    write_jsonl(&path, &lines);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.num_turns, 45);
    assert_eq!(r.usage.input_tokens, 45 * 100);
    assert_eq!(r.usage.output_tokens, 45 * 50);
    assert_eq!(r.usage.cache_creation_input_tokens, 45 * 10);
    assert_eq!(r.usage.cache_read_input_tokens, 45 * 20);
}

// ── Missing cache_creation_input_tokens → defaults to 0 ──────────────────────
#[test]
fn test_missing_cache_creation_tokens() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    let event = serde_json::json!({
        "type": "assistant",
        "message": {
            "id": "msg-3",
            "content": [{"type": "text", "text": "hello"}],
            "usage": {"input_tokens": 5, "output_tokens": 3}
        }
    })
    .to_string();
    write_jsonl(&path, &[event]);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.text, "hello");
    assert_eq!(r.usage.cache_creation_input_tokens, 0);
}

// ── input_tokens: null → treated as 0 ────────────────────────────────────────
#[test]
fn test_null_input_tokens() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    let event = serde_json::json!({
        "type": "assistant",
        "message": {
            "id": "msg-4",
            "content": [{"type": "text", "text": "text"}],
            "usage": {"input_tokens": null, "output_tokens": 7, "cache_creation_input_tokens": null, "cache_read_input_tokens": null}
        }
    })
    .to_string();
    write_jsonl(&path, &[event]);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.usage.input_tokens, 0);
    assert_eq!(r.usage.output_tokens, 7);
    assert_eq!(r.usage.cache_creation_input_tokens, 0);
}

// ── Unknown event type → silently skipped ────────────────────────────────────
#[test]
fn test_unknown_event_type_skipped() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    write_jsonl(
        &path,
        &[
            r#"{"type":"new-future-event","data":{"foo":42}}"#.to_string(),
            assistant_event("msg-5", "real text", 10, 5, 0, 0),
        ],
    );
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.text, "real text");
    assert_eq!(r.num_turns, 1);
}

// ── Unknown content block type → skipped, text blocks still extracted ─────────
#[test]
fn test_unknown_content_block_skipped() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    let event = serde_json::json!({
        "type": "assistant",
        "message": {
            "id": "msg-6",
            "content": [
                {"type": "image", "source": {"type": "base64", "data": "abc"}},
                {"type": "text", "text": "still here"}
            ],
            "usage": {"input_tokens": 5, "output_tokens": 3, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 0}
        }
    })
    .to_string();
    write_jsonl(&path, &[event]);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.text, "still here");
}

// ── Unknown usage fields → silently ignored ───────────────────────────────────
#[test]
fn test_unknown_usage_fields_ignored() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    let event = serde_json::json!({
        "type": "assistant",
        "message": {
            "id": "msg-7",
            "content": [{"type": "text", "text": "ok"}],
            "usage": {
                "input_tokens": 8,
                "output_tokens": 4,
                "cache_creation_input_tokens": 0,
                "cache_read_input_tokens": 0,
                "future_token_field": 999,
                "nested_future": {"a": 1}
            }
        }
    })
    .to_string();
    write_jsonl(&path, &[event]);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.usage.input_tokens, 8);
    assert_eq!(r.usage.output_tokens, 4);
}

// ── Malformed JSONL line → skipped, subsequent lines parsed ──────────────────
#[test]
fn test_malformed_line_skipped() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    write_jsonl(
        &path,
        &[
            r#"{"type":"assistant","message":{"id":"msg-bad","content":[{"type":"text""#
                .to_string(), // truncated
            assistant_event("msg-8", "recovered", 5, 3, 0, 0),
        ],
    );
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.text, "recovered");
    assert_eq!(r.num_turns, 1);
}

// ── Empty file → empty text, zero token counts, no panic ─────────────────────
#[test]
fn test_empty_file() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    std::fs::File::create(&path).unwrap();
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.text, "");
    assert_eq!(r.num_turns, 0);
    assert_eq!(r.usage, AggregatedUsage::default());
}

// ── Usage-fingerprint fallback dedup (no message.id) ─────────────────────────
#[test]
fn test_fingerprint_dedup_no_id() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    // 3 chunks with same usage but no message.id → 1 turn
    let chunks: Vec<String> = (0..3)
        .map(|i| assistant_event_no_id(10, 5, &format!("p{i}")))
        .collect();
    write_jsonl(&path, &chunks);
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.num_turns, 1);
    assert_eq!(r.text, "p0p1p2");
}

// ── Result event: session_id and is_error extracted ──────────────────────────
#[test]
fn test_result_event_fields() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("t.jsonl");
    write_jsonl(
        &path,
        &[
            assistant_event("msg-9", "response", 5, 3, 0, 0),
            result_event("session-xyz", false),
        ],
    );
    let r = parse_transcript(&path).unwrap();
    assert_eq!(r.session_id.as_deref(), Some("session-xyz"));
    assert!(!r.is_error);
}

// ── read_transcript: fallback to last_assistant_message ──────────────────────
#[test]
fn test_fallback_to_last_assistant_message() {
    let dir = TempDir::new().unwrap();
    // A transcript holding only a result event never yields assistant text, so
    // read_transcript exhausts its full retry budget (40×50ms, about 2s) before
    // returning the fallback.
    let path = dir.path().join("empty.jsonl");
    write_jsonl(&path, &[result_event("s1", false)]);
    let r = read_transcript(&path, Some("fallback text")).unwrap();
    assert_eq!(r.text, "fallback text");
    assert!(r.used_fallback);
}

// ── read_transcript: error when both empty ────────────────────────────────────
#[test]
fn test_both_empty_returns_error() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("empty.jsonl");
    // File with result event only (no assistant text) and no fallback
    write_jsonl(&path, &[result_event("s2", false)]);
    let result = read_transcript(&path, None);
    assert!(result.is_err());
}

// ── test_streaming_dedup_40_retries: race + dedup combined ───────────────────
// Simulates Stop-before-JSONL-flush (AS-6 / MOCK_DELAY_JSONL=100):
// JSONL is written after 100ms; retry loop (40×50ms = 2s budget) catches it.
// Also verifies that streaming chunks (same message.id) are correctly deduped.
#[test]
fn test_streaming_dedup_40_retries() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("race.jsonl");
    let path_clone = path.clone();
    std::thread::spawn(move || {
        // Delay defaults to 100ms when MOCK_DELAY_JSONL is unset or unparsable.
        let delay_ms: u64 = std::env::var("MOCK_DELAY_JSONL")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(100);
        std::thread::sleep(std::time::Duration::from_millis(delay_ms));
        // 5 streaming chunks of the same turn (same message.id)
        let mut content = String::new();
        for i in 0..5 {
            let line = serde_json::json!({
                "type": "assistant",
                "message": {
                    "id": "msg-race-1",
                    "content": [{"type": "text", "text": format!("chunk{i}")}],
                    "usage": {"input_tokens": 10, "output_tokens": 5, "cache_creation_input_tokens": 0, "cache_read_input_tokens": 0}
                }
            })
            .to_string();
            content.push_str(&line);
            content.push('\n');
        }
        std::fs::write(&path_clone, content).unwrap();
    });
    let r = read_transcript(&path, None).unwrap();
    assert_eq!(r.num_turns, 1, "5 streaming chunks of same message.id = 1 turn");
    assert_eq!(r.text, "chunk0chunk1chunk2chunk3chunk4");
    assert_eq!(r.usage.input_tokens, 10);
}

// ── test_transcript_race: MOCK_DELAY_JSONL=100 ───────────────────────────────
// Direct test for the race window mitigation (AS-6). The 100ms delay is
// hard-coded here rather than read from the MOCK_DELAY_JSONL variable.
#[test]
fn test_transcript_race() {
    let dir = TempDir::new().unwrap();
    let path = dir.path().join("race2.jsonl");
    let path_clone = path.clone();
    std::thread::spawn(move || {
        std::thread::sleep(std::time::Duration::from_millis(100));
        std::fs::write(
            &path_clone,
            format!("{}\n", assistant_event("msg-race-2", "race result", 10, 5, 0, 0)),
        )
        .unwrap();
    });
    let r = read_transcript(&path, None).unwrap();
    assert_eq!(r.text, "race result");
    assert_eq!(r.num_turns, 1);
}
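
The bounded retry used for the Stop-before-JSONL race can also be expressed as a small generic helper. The following is a hedged, std-only sketch of that pattern, not code from this commit: `retry_until_some` is a hypothetical name, and the closure stands in for "parse the transcript and return its text if non-empty".

```rust
use std::thread;
use std::time::Duration;

/// Call `attempt` once, then up to `max_retries` more times, sleeping `delay`
/// before each retry. Returns the first `Some` value, or `None` if every
/// attempt came back empty.
fn retry_until_some<T>(
    max_retries: usize,
    delay: Duration,
    mut attempt: impl FnMut() -> Option<T>,
) -> Option<T> {
    for n in 0..=max_retries {
        if n > 0 {
            thread::sleep(delay);
        }
        if let Some(value) = attempt() {
            return Some(value);
        }
    }
    None
}
```

With `max_retries = 40` and a 50 ms delay this gives the same worst-case wait of roughly 2 s as the loop in `read_transcript`; on `None`, a caller would then try the `last_assistant_message` fallback.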