feat: add meeting transcription app scaffolding
Security Scan and Upload / Security & DefectDojo Upload (pull_request) Successful in 48s
CI / Lint & Test (pull_request) Successful in 14m18s
CI / Build Linux (pull_request) Successful in 14m19s
CI / Build Windows (cross-compile) (pull_request) Failing after 19m39s

- Add Python backend structure with FastAPI for transcription/summarization
- Add React UI with audio recording, transcript, and summary views
- Configure Tauri to manage Python backend lifecycle
- Set up Windows cross-compilation with cargo-xwin
- Add Gitea CI workflow for lint, test, and multi-platform builds
- Configure ESLint, Prettier, and Vitest for code quality

Note: app scaffolding only — the Python environment and models are not yet set up.
This commit is contained in:
2026-01-21 20:18:03 -08:00
parent 96494a9997
commit 3c8a46e5a6
41 changed files with 2679 additions and 1797 deletions
+1
View File
@@ -0,0 +1 @@
"""Chronara backend - Local meeting transcription and summarization."""
+74
View File
@@ -0,0 +1,74 @@
"""Main FastAPI application for Chronara."""
import os
from pathlib import Path

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware

from .models.audio import AudioProcessor
from .models.llm import LlamaSummarizer
from .models.transcriber import WhisperXTranscriber
app = FastAPI(title="Chronara API", version="0.1.0")

# Enable CORS for the Tauri frontend.
# Starlette's CORSMiddleware compares allow_origins entries for exact equality
# (only "*" is special-cased), so "http://localhost:*" would never match a real
# origin such as "http://localhost:1420".  Use a regex to accept the Tauri
# origin plus localhost on any port.
app.add_middleware(
    CORSMiddleware,
    allow_origin_regex=r"^(tauri://localhost|http://localhost(:\d+)?)$",
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize models once at import time; endpoints below share these instances.
# MODEL_DIR resolves to the repository-level "models" directory.
MODEL_DIR = Path(__file__).parent.parent.parent / "models"
transcriber = WhisperXTranscriber(model_dir=MODEL_DIR)
summarizer = LlamaSummarizer(model_dir=MODEL_DIR)
audio_processor = AudioProcessor()
@app.get("/health")
async def health_check():
    """Report API liveness and whether each local model finished loading."""
    model_status = {
        "whisper": transcriber.is_loaded,
        "llama": summarizer.is_loaded,
    }
    return {"status": "healthy", "models": model_status}
@app.websocket("/ws/transcribe")
async def transcribe_audio(websocket: WebSocket):
    """WebSocket endpoint for real-time audio transcription.

    Receives raw audio bytes per message; when a chunk contains speech,
    sends back ``{"type": "transcription", "data": {...}}`` JSON frames.
    """
    await websocket.accept()
    try:
        while True:
            # Receive one raw audio chunk from the client.
            audio_data = await websocket.receive_bytes()
            # Convert bytes to a normalized float array.
            audio_chunk = audio_processor.process_chunk(audio_data)
            # Only run the (expensive) transcriber when the chunk has speech.
            if audio_processor.has_speech(audio_chunk):
                result = await transcriber.transcribe_chunk(audio_chunk)
                if result:
                    await websocket.send_json({
                        "type": "transcription",
                        "data": result,
                    })
    except WebSocketDisconnect:
        # Normal client disconnect: the socket is already gone, so calling
        # close() here would itself raise.  Nothing to clean up.
        pass
    except Exception as e:
        # 1011 = internal error.  The previous code 1000 ("normal closure")
        # would make failures look like clean shutdowns to the client.
        await websocket.close(code=1011, reason=str(e))
@app.post("/summarize")
async def summarize_transcript(transcript: str):
    """Summarize a meeting transcript via the local Llama model.

    Returns ``{"summary": <text>}``; the summary is ``None`` when the model
    is not loaded or inference fails (see ``LlamaSummarizer.summarize``).

    NOTE(review): FastAPI treats a bare ``str`` parameter on a POST route as a
    *query* parameter, so long transcripts may exceed URL length limits —
    consider ``Body(...)`` or a request model; confirm how the frontend sends
    the transcript before changing this.
    """
    summary = await summarizer.summarize(transcript)
    return {"summary": summary}
+1
View File
@@ -0,0 +1 @@
"""Model modules for Chronara."""
+73
View File
@@ -0,0 +1,73 @@
"""Audio processing utilities."""
import io
import wave
from typing import Optional
import numpy as np
import pyaudio
class AudioProcessor:
    """Handles audio capture and processing (PyAudio capture, VAD, WAV export)."""

    def __init__(self, sample_rate: int = 16000, channels: int = 1):
        """Initialize the audio processor.

        Args:
            sample_rate: Capture rate in Hz (16 kHz suits speech models).
            channels: Number of input channels (1 = mono).
        """
        self.sample_rate = sample_rate
        self.channels = channels
        self.chunk_size = 1024
        # 16-bit signed PCM; process_chunk() relies on this when decoding bytes.
        self.format = pyaudio.paInt16
        # Initialize PyAudio (terminated in __del__).
        self.audio = pyaudio.PyAudio()
        # Audio buffer for accumulating chunks.
        self.buffer = []
        self.min_speech_duration = 0.5  # seconds

    # Annotations are quoted so the class can be defined even when type
    # checkers/import order evaluate them before pyaudio is available.
    def start_recording(self) -> "pyaudio.Stream":
        """Open and return an input stream with this processor's settings."""
        stream = self.audio.open(
            format=self.format,
            channels=self.channels,
            rate=self.sample_rate,
            input=True,
            frames_per_buffer=self.chunk_size,
        )
        return stream

    def stop_recording(self, stream: "pyaudio.Stream") -> None:
        """Stop and close an input stream returned by start_recording()."""
        stream.stop_stream()
        stream.close()

    def process_chunk(self, audio_bytes: bytes) -> np.ndarray:
        """Convert raw 16-bit PCM bytes to a float32 array in [-1, 1]."""
        audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
        # Normalize int16 full scale (32768) to [-1, 1].
        audio_float = audio_array.astype(np.float32) / 32768.0
        return audio_float

    def has_speech(self, audio_chunk: np.ndarray, energy_threshold: float = 0.01) -> bool:
        """Simple energy-based voice activity detection.

        Returns True when the chunk's RMS energy exceeds ``energy_threshold``.
        An empty chunk is never speech (avoids mean-of-empty NaN warnings).
        """
        if audio_chunk.size == 0:
            return False
        energy = np.sqrt(np.mean(audio_chunk**2))
        return bool(energy > energy_threshold)

    def save_audio(self, audio_data: bytes, filepath: str) -> None:
        """Write raw PCM bytes to ``filepath`` as a WAV file."""
        with wave.open(filepath, "wb") as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(self.audio.get_sample_size(self.format))
            wf.setframerate(self.sample_rate)
            wf.writeframes(audio_data)

    def __del__(self):
        """Release the PyAudio handle (guarded: __init__ may have failed early)."""
        if hasattr(self, "audio"):
            self.audio.terminate()
+67
View File
@@ -0,0 +1,67 @@
"""Local LLM for meeting summarization using Llama."""
from pathlib import Path
from typing import Optional
from llama_cpp import Llama
class LlamaSummarizer:
    """Handles meeting summarization using a local Llama model (llama_cpp)."""

    def __init__(self, model_dir: Path, model_size: str = "1B"):
        """Load the GGUF model from ``model_dir``.

        Failure to load is non-fatal: ``is_loaded`` stays False and
        ``summarize`` returns None, so the API can still start.
        """
        self.model_dir = model_dir
        self.is_loaded = False
        model_path = model_dir / f"llama-3.2-{model_size}-instruct-Q4_K_M.gguf"
        try:
            self.llm = Llama(
                model_path=str(model_path),
                n_ctx=8192,  # Context window
                n_threads=4,  # CPU threads
                n_gpu_layers=-1,  # Use GPU if available
                verbose=False,
            )
            self.is_loaded = True
        except Exception as e:
            print(f"Failed to load Llama model: {e}")
            self.is_loaded = False

    async def summarize(self, transcript: str) -> Optional[str]:
        """Generate a meeting summary from ``transcript``.

        Returns the summary text, or None when the model is unavailable or
        inference fails.
        """
        import asyncio  # local import: keeps this module's import block unchanged

        if not self.is_loaded:
            return None
        prompt = f"""<|begin_of_text|><|start_header_id|>system<|end_header_id|>
You are a helpful assistant that creates concise meeting summaries. Focus on:
- Key decisions made
- Action items and who owns them
- Important discussions and their outcomes
- Next steps
Keep the summary structured and easy to scan.<|eot_id|><|start_header_id|>user<|end_header_id|>
Please summarize this meeting transcript:
{transcript}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Meeting Summary:
"""
        try:
            # llama_cpp inference is synchronous and heavy; running it inline
            # in this coroutine would block the whole FastAPI event loop
            # (including the live transcription websocket), so push it onto a
            # worker thread.  NOTE(review): assumes a single in-flight
            # summarize call per Llama instance — confirm before fanning out.
            response = await asyncio.to_thread(
                self.llm,
                prompt,
                max_tokens=1024,
                temperature=0.7,
                top_p=0.9,
                stop=["<|eot_id|>", "<|end_of_text|>"],
            )
            return response["choices"][0]["text"].strip()
        except Exception as e:
            print(f"Summarization error: {e}")
            return None
+88
View File
@@ -0,0 +1,88 @@
"""WhisperX transcription with speaker diarization."""
import json
from pathlib import Path
from typing import Any, Optional
import numpy as np
import torch
import whisperx
class WhisperXTranscriber:
    """Handles audio transcription and speaker diarization using WhisperX."""

    def __init__(self, model_dir: Path, model_size: str = "base"):
        """Initialize WhisperX with locally-stored models.

        Args:
            model_dir: Root directory containing ``whisper``, ``alignment``
                and ``diarization`` model subdirectories.
            model_size: Whisper ASR model size (e.g. ``"base"``).

        Load failure is non-fatal: ``is_loaded`` stays False and
        ``transcribe_chunk`` returns None, so the API can still start.
        """
        self.model_dir = model_dir
        # Prefer GPU; int8 keeps CPU inference tractable, float16 suits CUDA.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.compute_type = "float16" if self.device == "cuda" else "int8"
        self.is_loaded = False
        try:
            # Load ASR model (downloads into model_dir/whisper if absent).
            self.model = whisperx.load_model(
                model_size,
                self.device,
                compute_type=self.compute_type,
                download_root=str(model_dir / "whisper"),
            )
            # Load alignment model (English-only here — word-level timestamps).
            self.align_model, self.align_metadata = whisperx.load_align_model(
                language_code="en",
                device=self.device,
                model_dir=str(model_dir / "alignment"),
            )
            # Load diarization pipeline.
            # NOTE(review): model_name is given a local *path* — confirm the
            # whisperx.DiarizationPipeline API accepts a path here rather than
            # a hub model identifier.
            self.diarize_model = whisperx.DiarizationPipeline(
                device=self.device,
                model_name=str(model_dir / "diarization"),
            )
            self.is_loaded = True
        except Exception as e:
            # Swallow and report via is_loaded; /health exposes this flag.
            print(f"Failed to load WhisperX models: {e}")
            self.is_loaded = False

    async def transcribe_chunk(self, audio_chunk: np.ndarray) -> Optional[dict[str, Any]]:
        """Transcribe an audio chunk with speaker diarization.

        The pipeline order is significant: transcribe → align → diarize →
        assign speakers to words.

        Args:
            audio_chunk: Audio samples — presumably mono float32 in [-1, 1]
                as produced by ``AudioProcessor.process_chunk``; confirm
                against whisperx's expected input format.

        Returns:
            ``{"segments": [{"start", "end", "text", "speaker"}, ...]}`` or
            None when models are not loaded or any stage fails.

        NOTE(review): all whisperx calls below are synchronous and heavy;
        inside this ``async def`` they block the event loop — consider a
        worker thread once model thread-safety is confirmed.
        """
        if not self.is_loaded:
            return None
        try:
            # 1) Raw ASR pass.
            result = self.model.transcribe(
                audio_chunk,
                batch_size=16,
            )
            # 2) Align whisper output to get word-level timestamps.
            result = whisperx.align(
                result["segments"],
                self.align_model,
                self.align_metadata,
                audio_chunk,
                self.device,
            )
            # 3) Diarize and attach speaker labels to the aligned words.
            diarize_segments = self.diarize_model(audio_chunk)
            result = whisperx.assign_word_speakers(diarize_segments, result)
            # 4) Flatten into the JSON shape the websocket sends to the UI.
            formatted_result = []
            for segment in result["segments"]:
                formatted_result.append({
                    "start": segment["start"],
                    "end": segment["end"],
                    "text": segment["text"],
                    # Segments without a confident speaker get "Unknown".
                    "speaker": segment.get("speaker", "Unknown"),
                })
            return {"segments": formatted_result}
        except Exception as e:
            # Best-effort per-chunk: one failed chunk must not kill the stream.
            print(f"Transcription error: {e}")
            return None