All acceptance criteria verified: - Reader thread spawned at PROMPT_INJECTED transition - Byte offset captured from transcript file length - Retry logic (50ms intervals, 5s timeout) implemented in emitter.rs - Reader thread drains to mpsc channel All 90 library tests and 13 emitter tests pass.
3.8 KiB
Bead bf-30e: Stream-JSON Reader Thread Spawn Implementation
Status: COMPLETE ✅
Verification Summary
The stream-json reader thread spawn functionality was already implemented in src/session.rs (lines 360-376). All acceptance criteria have been verified and met.
Acceptance Criteria Verification
1. ✅ Reader thread spawned at PROMPT_INJECTED transition
Location: src/session.rs:360-376
if last_phase != *current_phase && current_phase.is_prompt_injected() {
// Spawn stream-json reader at PROMPT_INJECTED for stream-json output
if matches!(output_format, crate::cli::OutputFormat::StreamJson) {
let start_offset = std::fs::metadata(&transcript_path)
.map(|m| m.len())
.unwrap_or(0);
stream_json_handle = Some(emitter::spawn_stream_json_reader(
transcript_path.clone(),
start_offset,
));
stream_json_spawned_clone.store(true, std::sync::atomic::Ordering::SeqCst);
}
}
Verification: The code checks for the transition to PromptInjected phase and spawns the reader thread only when output_format is StreamJson.
2. ✅ Byte offset captured from transcript file length at bracketed-paste write
Location: src/session.rs:366-368
let start_offset = std::fs::metadata(&transcript_path)
.map(|m| m.len())
.unwrap_or(0);
Verification: The byte offset is calculated by reading the current transcript file size at the moment of prompt injection. If the file doesn't exist yet, it defaults to 0.
3. ✅ Retry logic implemented (50ms intervals, 5s timeout)
Location: src/emitter.rs:129-149
let deadline = std::time::Instant::now() + Duration::from_secs(5);
let file = loop {
match File::open(&transcript_path) {
Ok(f) => break f,
Err(_) => {
match drain_rx.try_recv() {
Ok(()) => return,
Err(mpsc::TryRecvError::Disconnected) => return,
Err(mpsc::TryRecvError::Empty) => {
if std::time::Instant::now() >= deadline {
return; // Timeout expired - file never appeared
}
thread::sleep(Duration::from_millis(50));
}
}
}
}
};
Verification: The retry logic checks for file existence every 50ms, with a 5-second timeout. It also respects drain signals for immediate exit.
4. ✅ Reader thread drains to mpsc channel
Location: src/emitter.rs:108-116
pub fn spawn_stream_json_reader(transcript_path: PathBuf, start_offset: u64) -> StreamJsonHandle {
let (drain_tx, drain_rx) = mpsc::sync_channel(1);
let join_handle = thread::spawn(move || {
stream_json_reader_loop(transcript_path, start_offset, writer, drain_rx);
});
StreamJsonHandle {
drain_tx,
join_handle,
}
}
Verification: The reader thread uses an mpsc::sync_channel for drain signaling, allowing graceful shutdown.
Test Results
All library and integration tests pass:
- 90 library tests: ✅ All passing
- 13 emitter tests: ✅ All passing (including stream-json tests)
Implementation Notes
The reader thread is properly integrated into the session flow:
- Spawned only when output format is
StreamJson - Byte offset captured at exact moment of prompt injection
- Retry logic handles race condition where transcript file may not exist yet
- Proper cleanup on all exit paths (success, timeout, interrupted) via drain channel
Related Code
src/session.rs:360-376- Reader spawn at PROMPT_INJECTEDsrc/emitter.rs:98-116- Spawn function with mpsc channelsrc/emitter.rs:118-195- Reader loop with retry logic and drain handling
Conclusion
The implementation is complete, tested, and working correctly. All acceptance criteria are satisfied.