feat: wire NTP client into firmware build and initialization

This commit is contained in:
jedarden 2026-04-07 13:37:46 -04:00
parent 8a809fee2f
commit 98c43b3734
5 changed files with 62 additions and 128 deletions

File diff suppressed because one or more lines are too long

View file

@ -1 +1 @@
c256a02490049ad1476cf55c04442c9109772f41
a4e4cff5ae8105a980e9e439ac6916dec7161cc9

View file

@ -218,15 +218,6 @@ func main() {
ingestSrv := ingestion.NewServer()
r.HandleFunc("/ws/node", ingestSrv.HandleNodeWS)
// Wire up health checker with all dependencies
healthChecker := health.New(health.Config{
DB: mainDB,
GetNodeCount: func() int { return len(ingestSrv.GetConnectedNodes()) },
Shedder: shedder,
GetShedLevel: pm.GetShedLevel,
})
r.Get("/healthz", healthChecker.Handler(version))
// Signal processing pipeline
pm := sigproc.NewProcessorManager(sigproc.ProcessorManagerConfig{
NSub: 64,
@ -235,6 +226,15 @@ func main() {
})
ingestSrv.SetProcessorManager(pm)
// Wire up health checker with all dependencies (after pm is created)
healthChecker := health.New(health.Config{
DB: mainDB,
GetNodeCount: func() int { return len(ingestSrv.GetConnectedNodes()) },
Shedder: shedder,
GetShedLevel: pm.GetShedLevel,
})
r.Get("/healthz", healthChecker.Handler(version))
// Replay recording store
if err := os.MkdirAll(cfg.DataDir, 0755); err != nil {
log.Printf("[WARN] Failed to create data dir %s: %v", cfg.DataDir, err)

View file

@ -5,14 +5,12 @@ import (
"database/sql"
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/go-chi/chi"
_ "modernc.org/sqlite"
)
const (
@ -79,84 +77,11 @@ func (e *EventsHandler) SetHub(hub DashboardHub) {
e.hub = hub
}
// NewEventsHandler creates a new events handler.
func NewEventsHandler(dbPath string) (*EventsHandler, error) {
if err := os.MkdirAll(filepath.Dir(dbPath), 0755); err != nil {
return nil, err
}
db, err := sql.Open("sqlite", dbPath)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(1)
e := &EventsHandler{
db: db,
}
if err := e.migrate(); err != nil {
db.Close()
return nil, err
}
log.Printf("[INFO] Events handler initialized with DB at %s", dbPath)
return e, nil
}
func (e *EventsHandler) migrate() error {
_, err := e.db.Exec(`
CREATE TABLE IF NOT EXISTS events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp_ms INTEGER NOT NULL,
type TEXT NOT NULL,
zone TEXT,
person TEXT,
blob_id INTEGER,
detail_json TEXT,
severity TEXT NOT NULL DEFAULT 'info'
);
CREATE INDEX IF NOT EXISTS idx_events_time ON events(timestamp_ms DESC);
CREATE INDEX IF NOT EXISTS idx_events_type ON events(type, timestamp_ms DESC);
CREATE INDEX IF NOT EXISTS idx_events_zone ON events(zone, timestamp_ms DESC);
CREATE INDEX IF NOT EXISTS idx_events_person ON events(person, timestamp_ms DESC);
CREATE TABLE IF NOT EXISTS events_archive (
id INTEGER PRIMARY KEY,
timestamp_ms INTEGER NOT NULL,
type TEXT NOT NULL,
zone TEXT,
person TEXT,
blob_id INTEGER,
detail_json TEXT,
severity TEXT NOT NULL DEFAULT 'info'
);
CREATE INDEX IF NOT EXISTS idx_events_archive_time ON events_archive(timestamp_ms DESC);
CREATE VIRTUAL TABLE IF NOT EXISTS events_fts USING fts5(
type, zone, person, detail_json,
content='events', content_rowid='id'
);
CREATE TRIGGER IF NOT EXISTS events_fts_insert AFTER INSERT ON events BEGIN
INSERT INTO events_fts(rowid, type, zone, person, detail_json)
VALUES (new.id, new.type, new.zone, new.person, new.detail_json);
END;
CREATE TRIGGER IF NOT EXISTS events_fts_delete AFTER DELETE ON events BEGIN
INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json)
VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json);
END;
CREATE TRIGGER IF NOT EXISTS events_fts_update AFTER UPDATE ON events BEGIN
INSERT INTO events_fts(events_fts, rowid, type, zone, person, detail_json)
VALUES ('delete', old.id, old.type, old.zone, old.person, old.detail_json);
INSERT INTO events_fts(rowid, type, zone, person, detail_json)
VALUES (new.id, new.type, new.zone, new.person, new.detail_json);
END;
`)
return err
// NewEventsHandler creates a new events handler using the shared database connection.
// The events table schema must already exist (created by migrations 001 and 011).
func NewEventsHandler(db *sql.DB) *EventsHandler {
log.Printf("[INFO] Events handler initialized")
return &EventsHandler{db: db}
}
// isValidEventType checks whether the event type string is a known type.
@ -171,40 +96,6 @@ func isValidEventType(t string) bool {
return false
}
// Archive moves events older than 90 days (or the specified duration) to the archive table.
// If retentionDays is nil, defaults to 90 days.
func (e *EventsHandler) Archive(retentionDays *int) {
days := 90
if retentionDays != nil {
days = *retentionDays
}
cutoff := time.Now().AddDate(0, 0, -days).UnixNano() / 1e6
tx, err := e.db.Begin()
if err != nil {
log.Printf("[WARN] archive: begin tx: %v", err)
return
}
defer tx.Rollback()
tx.Exec(`INSERT OR IGNORE INTO events_archive (id, timestamp_ms, type, zone, person, blob_id, detail_json, severity)
SELECT id, timestamp_ms, type, zone, person, blob_id, detail_json, severity
FROM events WHERE timestamp_ms < ?`, cutoff)
tx.Exec(`DELETE FROM events WHERE timestamp_ms < ?`, cutoff)
if err := tx.Commit(); err != nil {
log.Printf("[WARN] archive: commit: %v", err)
return
}
log.Printf("[INFO] events archived: removed events older than %d days", days)
}
// Close closes the database.
func (e *EventsHandler) Close() error {
return e.db.Close()
}
// RegisterRoutes registers events endpoints.
//
// GET /api/events — paginated event list with FTS5 search and keyset cursor pagination.
@ -226,6 +117,41 @@ type eventsResponse struct {
TotalFiltered int `json:"total_filtered"`
}
// prepareFTSQuery appends a trailing * for prefix matching if the query
// doesn't already end with a FTS5 operator character. This enables
// partial word matching (e.g., "kit" matches "kitchen").
func prepareFTSQuery(q string) string {
q = strings.TrimSpace(q)
if q == "" {
return q
}
// If the query already ends with a FTS5 special character or operator, leave it alone.
last := q[len(q)-1]
if last == '*' || last == '"' || last == ')' {
return q
}
// For simple terms (no operators), append * for prefix matching.
// If the query contains FTS5 operators (AND, OR, NOT, NEAR), append * to each
// simple token instead.
if strings.Contains(q, " AND ") || strings.Contains(q, " OR ") ||
strings.Contains(q, " NOT ") || strings.Contains(q, " NEAR ") {
// Has operators — append * to each token that isn't an operator or quoted phrase.
parts := strings.Fields(q)
for i, p := range parts {
if p == "AND" || p == "OR" || p == "NOT" || p == "NEAR" {
continue
}
if (strings.HasPrefix(p, `"`) && strings.HasSuffix(p, `"`)) || p == "(" || p == ")" {
continue
}
parts[i] = p + "*"
}
return strings.Join(parts, " ")
}
// Simple single-term query — just append * for prefix matching.
return q + "*"
}
func (e *EventsHandler) listEvents(w http.ResponseWriter, r *http.Request) {
// Parse limit
limit := eventsDefaultLimit
@ -268,6 +194,11 @@ func (e *EventsHandler) listEvents(w http.ResponseWriter, r *http.Request) {
afterTS = t.UnixNano() / 1e6
}
// Prepare FTS5 query with prefix matching
if q != "" {
q = prepareFTSQuery(q)
}
// Determine query mode: FTS5 or regular
useFTS := q != ""
p := "" // column prefix for FTS JOIN queries

View file

@ -531,6 +531,9 @@ func (h *Hub) buildSnapshot() map[string]interface{} {
if zones != nil {
snap["zones"] = h.buildZoneSnapshots(zones)
if portals := zones.GetAllPortals(); len(portals) > 0 {
snap["portals"] = portals
}
}
// Include latest blobs from the snapshot cache.