responder/pkg/audiocap/audiocap.go
2026-04-09 10:21:20 +02:00

336 lines
7.2 KiB
Go

package audiocap
import (
"context"
"fmt"
"io"
"log"
"os/exec"
"os/signal"
"strings"
"sync"
"syscall"
"time"
speech "cloud.google.com/go/speech/apiv2"
"cloud.google.com/go/speech/apiv2/speechpb"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Audio capture settings. parec records mono signed 16-bit little-endian PCM,
// i.e. 2 bytes per sample.
const (
	sampleRate  = 16000
	chunkBytes  = sampleRate * 2 / 10 // 100 ms of mono s16le audio (3200 bytes)
	audioDevice = "@DEFAULT_MONITOR@" // PulseAudio special name for the default sink's monitor source
)

// Google Cloud Speech-to-Text v2 settings.
const (
	streamMaxAge = 4 * time.Minute // rotate streams before Google's 5-minute max age
	location     = "europe-west4"
	languageCode = "en-US"
	endpoint     = location + "-speech.googleapis.com:443" // regional endpoint for location
)
// responder is the package-wide Responder assigned by Start; final
// transcripts are handed to it from the speech receive goroutines.
var responder *Responder

// Close releases the Responder's genai client. It returns nil without doing
// anything when genaiClient is nil.
func (r *Responder) Close() error {
	if r.genaiClient == nil {
		return nil
	}
	return r.genaiClient.Close()
}
// Start runs the live-transcription pipeline until SIGINT/SIGTERM arrives or
// the captured audio ends. It sets up the virtual microphone, the TTS client
// that plays into it, the Speech-to-Text client and the package-level
// responder, then streams the captured audio to the recognizer.
//
// Any failure is fatal. The work lives in runPipeline, which returns errors
// instead of exiting. log.Fatal calls os.Exit, which skips deferred calls, so
// exiting from inside the pipeline would leave resources such as the virtual
// microphone (mic.Teardown) behind.
func Start() {
	if err := runPipeline(); err != nil {
		log.Fatal(err)
	}
	log.Println("👋 Bye")
}

// runPipeline performs the setup and streaming for Start and returns the first
// error it encounters, wrapped with the failing step. Every resource is
// released by defer in reverse order of creation. A streaming error that
// occurs after a shutdown signal has cancelled ctx is not reported.
func runPipeline() error {
	// Google Cloud project that owns the default ("_") recognizer.
	const projectID = "cheater-492707"

	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	mic, err := Setup()
	if err != nil {
		return fmt.Errorf("virtmic: %w", err)
	}
	defer mic.Teardown()
	log.Printf("%s🎤 Virtmic ready: select '%s' as input in your call app%s",
		Yellow, SourceName, Reset)

	// TTS client that plays into the virtual microphone's sink.
	ttsClient, err := New(ctx, mic.SinkForPlayback())
	if err != nil {
		return fmt.Errorf("tts: %w", err)
	}
	defer ttsClient.Close()

	client, err := speech.NewClient(ctx, option.WithEndpoint(endpoint))
	if err != nil {
		return fmt.Errorf("speech client: %w", err)
	}
	defer client.Close()

	// Assigns the package-level responder (not :=), which handleTranscript reads.
	responder, err = NewResponder(ctx, ttsClient)
	if err != nil {
		return fmt.Errorf("responder: %w", err)
	}
	defer responder.Close()
	go responder.ResponseDaemon(ctx)

	audio, err := startCapture(ctx)
	if err != nil {
		return fmt.Errorf("capture: %w", err)
	}
	defer audio.Close()

	recognizer := fmt.Sprintf("projects/%s/locations/%s/recognizers/_", projectID, location)
	log.Printf("🎙 Listening on %s → %s", audioDevice, recognizer)
	if err := run(ctx, client, recognizer, audio); err != nil && ctx.Err() == nil {
		return fmt.Errorf("run: %w", err)
	}
	return nil
}
// startCapture spawns parec to record audioDevice as raw mono s16le PCM at
// sampleRate Hz and returns a reader over parec's stdout. parec's stderr is
// forwarded to the standard logger.
//
// The returned ReadCloser owns the child process. Close closes the pipe, kills
// parec if it is still running and reaps it with Wait, so no zombie process or
// stderr-copy goroutine is left behind. exec.CommandContext also kills parec
// when ctx is cancelled.
func startCapture(ctx context.Context) (io.ReadCloser, error) {
	cmd := exec.CommandContext(ctx, "parec",
		"-d", audioDevice,
		"--format=s16le",
		fmt.Sprintf("--rate=%d", sampleRate),
		"--channels=1",
		"--latency-msec=20",
	)
	cmd.Stderr = log.Writer()
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		return nil, fmt.Errorf("parec stdout pipe: %w", err)
	}
	if err := cmd.Start(); err != nil {
		return nil, fmt.Errorf("start parec: %w", err)
	}
	return &captureProcess{ReadCloser: stdout, cmd: cmd}, nil
}

// captureProcess is the stdout pipe of a running capture command; closing it
// also terminates and reaps the command.
type captureProcess struct {
	io.ReadCloser
	cmd *exec.Cmd
}

// Close closes the pipe, then kills and waits for the child process. It
// returns only the pipe-close error. Kill and Wait errors are ignored on
// purpose: the process is expected to be killed here (or to have exited
// already), and Wait reports that as a non-nil error.
func (p *captureProcess) Close() error {
	err := p.ReadCloser.Close()
	_ = p.cmd.Process.Kill()
	_ = p.cmd.Wait()
	return err
}
// run streams audio to Speech-to-Text until ctx is cancelled or audio ends.
//
// pumpAudio reads audio into a buffered channel, so slow stream operations
// (opening a new stream, a blocking Send) do not immediately stall the capture.
// The current stream is replaced ("rotated") once it is older than
// streamMaxAge, and reopened whenever a Send fails.
//
// It returns ctx.Err() when ctx is cancelled and nil when the audio source is
// exhausted. It returns an error only if the initial stream cannot be opened.
// On every return after that, the active stream is closed so the server can
// still flush final results.
func run(ctx context.Context, client *speech.Client, recognizer string, audio io.Reader) error {
	audioCh := make(chan []byte, 32)
	go pumpAudio(ctx, audio, audioCh)

	// current and rotateAt are only read and written by this goroutine
	// (openNew is called from nowhere else), so no lock is needed.
	var (
		current  *activeStream
		rotateAt time.Time
	)
	openNew := func() error {
		s, err := newStream(ctx, client, recognizer)
		if err != nil {
			return err
		}
		old := current
		current = s
		rotateAt = time.Now().Add(streamMaxAge)
		if old != nil {
			// Keep closing the old stream off the audio path.
			go old.close()
		}
		return nil
	}
	if err := openNew(); err != nil {
		return fmt.Errorf("open initial stream: %w", err)
	}
	// Closure so the stream that is current at return time is closed.
	defer func() { current.close() }()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case chunk, ok := <-audioCh:
			if !ok {
				return nil
			}
			if time.Now().After(rotateAt) {
				log.Println("🔄 Rotating stream")
				if err := openNew(); err != nil {
					log.Printf("❌ rotate: %v", err)
				}
			}
			if err := current.send(chunk); err != nil {
				log.Printf("⚠️ send: %v — reopening", err)
				if err := openNew(); err != nil {
					log.Printf("❌ reopen: %v", err)
					continue
				}
				// Resend on the fresh stream so this chunk of audio is not lost.
				if err := current.send(chunk); err != nil {
					log.Printf("⚠️ send after reopen: %v", err)
				}
			}
		}
	}
}

// pumpAudio reads chunkBytes-sized chunks from audio and sends a copy of each
// on out until ctx is cancelled or reading fails; it closes out when done.
// A short final chunk (io.ErrUnexpectedEOF) is still delivered. Read errors
// are logged only while ctx is live: after cancellation the capture pipe is
// torn down and the resulting errors are expected.
func pumpAudio(ctx context.Context, audio io.Reader, out chan<- []byte) {
	defer close(out)
	buf := make([]byte, chunkBytes)
	for ctx.Err() == nil {
		n, err := io.ReadFull(audio, buf)
		if n > 0 {
			chunk := make([]byte, n)
			copy(chunk, buf[:n])
			select {
			case out <- chunk:
			case <-ctx.Done():
				return
			}
		}
		if err != nil {
			if ctx.Err() == nil {
				log.Printf("❌ audio read: %v", err)
			}
			return
		}
	}
}
// activeStream is one Speech-to-Text v2 streaming session. It serialises audio
// sends against close and remembers the last interim transcript, so that only
// newly recognised text is printed.
type activeStream struct {
	mu          sync.Mutex // guards closed and lastInterim
	closed      bool       // set by close; send refuses to write afterwards
	lastInterim string     // last interim transcript printed, used to compute deltas

	stream speechpb.Speech_StreamingRecognizeClient // the underlying gRPC stream
	cancel context.CancelFunc                       // cancels the stream's context
}
// newStream opens a Speech-to-Text v2 streaming session on recognizer, sends
// the initial configuration request, and starts a goroutine that forwards each
// result's top alternative to handleTranscript.
//
// The session's context derives from parent. It is cancelled if opening fails,
// when the receive loop ends, and shortly after close is called.
func newStream(parent context.Context, client *speech.Client, recognizer string) (*activeStream, error) {
	ctx, cancel := context.WithCancel(parent)
	stream, err := client.StreamingRecognize(ctx)
	if err == nil {
		err = stream.Send(streamingConfigRequest(recognizer))
	}
	if err != nil {
		cancel()
		return nil, err
	}

	as := &activeStream{stream: stream, cancel: cancel}
	go as.receiveLoop()
	return as, nil
}

// streamingConfigRequest builds the first request of a streaming session. It
// selects LINEAR16 mono audio at sampleRate Hz, the chirp_2 model,
// languageCode, automatic punctuation and interim results.
func streamingConfigRequest(recognizer string) *speechpb.StreamingRecognizeRequest {
	recognition := &speechpb.RecognitionConfig{
		DecodingConfig: &speechpb.RecognitionConfig_ExplicitDecodingConfig{
			ExplicitDecodingConfig: &speechpb.ExplicitDecodingConfig{
				Encoding:          speechpb.ExplicitDecodingConfig_LINEAR16,
				SampleRateHertz:   sampleRate,
				AudioChannelCount: 1,
			},
		},
		Model:         "chirp_2",
		LanguageCodes: []string{languageCode},
		Features:      &speechpb.RecognitionFeatures{EnableAutomaticPunctuation: true},
	}
	return &speechpb.StreamingRecognizeRequest{
		Recognizer: recognizer,
		StreamingRequest: &speechpb.StreamingRecognizeRequest_StreamingConfig{
			StreamingConfig: &speechpb.StreamingRecognitionConfig{
				Config:            recognition,
				StreamingFeatures: &speechpb.StreamingRecognitionFeatures{InterimResults: true},
			},
		},
	}
}

// receiveLoop consumes server responses until the stream reaches io.EOF, is
// cancelled, or fails, and then cancels the stream's context. Errors other
// than gRPC Canceled are logged.
func (s *activeStream) receiveLoop() {
	defer s.cancel()
	for {
		resp, err := s.stream.Recv()
		switch {
		case err == io.EOF:
			return
		case err != nil:
			if st, ok := status.FromError(err); !ok || st.Code() != codes.Canceled {
				log.Printf("❌ recv: %v", err)
			}
			return
		}
		for _, result := range resp.Results {
			if alts := result.Alternatives; len(alts) > 0 {
				s.handleTranscript(alts[0].Transcript, result.IsFinal)
			}
		}
	}
}
// send writes one chunk of raw audio to the stream. It returns an error
// without sending once close has been called. The lock is held across the
// Send, so close can never call CloseSend while a Send is in flight.
func (s *activeStream) send(chunk []byte) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.closed {
		req := &speechpb.StreamingRecognizeRequest{
			StreamingRequest: &speechpb.StreamingRecognizeRequest_Audio{Audio: chunk},
		}
		return s.stream.Send(req)
	}
	return fmt.Errorf("stream closed")
}
// close half-closes the stream (CloseSend) so the server can still deliver
// results for audio already sent. It then cancels the stream's context after
// a 2-second grace period. Only the first call has any effect.
func (s *activeStream) close() {
	s.mu.Lock()
	wasClosed := s.closed
	s.closed = true
	s.mu.Unlock()
	if wasClosed {
		return
	}

	_ = s.stream.CloseSend() // best effort: the stream may already have failed
	time.AfterFunc(2*time.Second, s.cancel)
}
// handleTranscript prints only the newly recognised part of a transcript.
//
// An interim result that extends the previous interim text prints just the
// new suffix. If the model revised earlier words, the whole interim line is
// reprinted.
//
// A final result prints its delta against the last interim, in Red and
// prefixed with ✓. It then resets the interim state and hands the complete
// transcript to the package-level responder. Empty or whitespace-only finals
// are not forwarded.
//
// The responder is called after s.mu is released. send holds the same lock
// while writing audio, so a slow GetResponse would otherwise stall this
// stream's audio path.
func (s *activeStream) handleTranscript(text string, final bool) {
	if !final {
		s.mu.Lock()
		defer s.mu.Unlock()
		if after, ok := strings.CutPrefix(text, s.lastInterim); ok {
			if delta := strings.TrimSpace(after); delta != "" {
				fmt.Printf("… %s\n", delta)
			}
		} else {
			// Model revised earlier text — reprint the full line.
			fmt.Printf("… %s\n", text)
		}
		s.lastInterim = text
		return
	}

	s.mu.Lock()
	delta := strings.TrimSpace(strings.TrimPrefix(text, s.lastInterim))
	s.lastInterim = ""
	s.mu.Unlock()

	if delta != "" {
		fmt.Printf("%s✓ %s%s\n", Red, delta, Reset)
	}
	if strings.TrimSpace(text) != "" {
		responder.GetResponse(text)
	}
}