package audiocap import ( "context" "fmt" "io" "log" "os/exec" "os/signal" "strings" "sync" "syscall" "time" speech "cloud.google.com/go/speech/apiv2" "cloud.google.com/go/speech/apiv2/speechpb" "google.golang.org/api/option" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) const ( sampleRate = 16000 chunkBytes = 3200 // 100ms @ 16kHz mono s16le streamMaxAge = 4 * time.Minute // Googles max age is 5 min audioDevice = "@DEFAULT_MONITOR@" location = "europe-west4" languageCode = "en-US" endpoint = "europe-west4-speech.googleapis.com:443" ) var responder *Responder func (r *Responder) Close() error { if r.genaiClient != nil { return r.genaiClient.Close() } return nil } func Start() { ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer cancel() mic, err := Setup() if err != nil { log.Fatalf("virtmic: %v", err) } defer mic.Teardown() log.Printf("%sšŸŽ¤ Virtmic ready: select '%s' as input in your call app%s", Yellow, SourceName, Reset) // 2. TTS client ttsClient, err := New(ctx, mic.SinkForPlayback()) if err != nil { log.Fatalf("tts: %v", err) } defer ttsClient.Close() client, err := speech.NewClient(ctx, option.WithEndpoint(endpoint)) if err != nil { log.Fatalf("speech client: %v", err) } defer client.Close() responder, err = NewResponder(ctx, ttsClient) if err != nil { log.Fatalf("responder: %v", err) } defer responder.Close() go responder.ResponseDaemon(ctx) audio, err := startCapture(ctx) if err != nil { log.Fatalf("capture: %v", err) } defer audio.Close() recognizer := fmt.Sprintf("projects/%s/locations/%s/recognizers/_", "cheater-492707", location) log.Printf("šŸŽ™ Listening on %s → %s", audioDevice, recognizer) if err := run(ctx, client, recognizer, audio); err != nil && ctx.Err() == nil { log.Fatalf("run: %v", err) } log.Println("šŸ‘‹ Bye") } // startCapture spawns parec and returns its stdout as raw PCM. func startCapture(ctx context.Context) (io.ReadCloser, error) { cmd := exec.CommandContext(ctx, "parec", "-d", audioDevice, "--format=s16le", "--rate=16000", "--channels=1", "--latency-msec=20", ) stdout, err := cmd.StdoutPipe() if err != nil { return nil, err } cmd.Stderr = log.Writer() if err := cmd.Start(); err != nil { return nil, err } return stdout, nil } // run reads audio into a channel and dispatches it to rotating streams. func run(ctx context.Context, client *speech.Client, recognizer string, audio io.Reader) error { audioCh := make(chan []byte, 32) // Audio pump — never blocks on stream send go func() { defer close(audioCh) buf := make([]byte, chunkBytes) for { if ctx.Err() != nil { return } n, err := io.ReadFull(audio, buf) if err != nil && err != io.ErrUnexpectedEOF { log.Printf("āŒ audio read: %v", err) return } if n == 0 { continue } chunk := make([]byte, n) copy(chunk, buf[:n]) select { case audioCh <- chunk: case <-ctx.Done(): return } } }() var ( mu sync.Mutex current *activeStream rotateAt = time.Now().Add(streamMaxAge) ) openNew := func() error { s, err := newStream(ctx, client, recognizer) if err != nil { return err } mu.Lock() old := current current = s rotateAt = time.Now().Add(streamMaxAge) mu.Unlock() if old != nil { go old.close() } return nil } if err := openNew(); err != nil { return fmt.Errorf("open initial stream: %w", err) } for { select { case <-ctx.Done(): mu.Lock() if current != nil { current.close() } mu.Unlock() return ctx.Err() case chunk, ok := <-audioCh: if !ok { return nil } mu.Lock() needRotate := time.Now().After(rotateAt) mu.Unlock() if needRotate { log.Println("šŸ”„ Rotating stream") if err := openNew(); err != nil { log.Printf("āŒ rotate: %v", err) } } mu.Lock() s := current mu.Unlock() if err := s.send(chunk); err != nil { log.Printf("āš ļø send: %v — reopening", err) if err := openNew(); err != nil { log.Printf("āŒ reopen: %v", err) } } } } } // activeStream wraps a v2 streaming session and dedupes transcript output. type activeStream struct { stream speechpb.Speech_StreamingRecognizeClient cancel context.CancelFunc mu sync.Mutex closed bool lastInterim string // last printed interim, used to compute deltas } func newStream(parent context.Context, client *speech.Client, recognizer string) (*activeStream, error) { ctx, cancel := context.WithCancel(parent) stream, err := client.StreamingRecognize(ctx) if err != nil { cancel() return nil, err } configReq := &speechpb.StreamingRecognizeRequest{ Recognizer: recognizer, StreamingRequest: &speechpb.StreamingRecognizeRequest_StreamingConfig{ StreamingConfig: &speechpb.StreamingRecognitionConfig{ Config: &speechpb.RecognitionConfig{ DecodingConfig: &speechpb.RecognitionConfig_ExplicitDecodingConfig{ ExplicitDecodingConfig: &speechpb.ExplicitDecodingConfig{ Encoding: speechpb.ExplicitDecodingConfig_LINEAR16, SampleRateHertz: sampleRate, AudioChannelCount: 1, }, }, Model: "chirp_2", LanguageCodes: []string{languageCode}, Features: &speechpb.RecognitionFeatures{ EnableAutomaticPunctuation: true, }, }, StreamingFeatures: &speechpb.StreamingRecognitionFeatures{ InterimResults: true, }, }, }, } if err := stream.Send(configReq); err != nil { cancel() return nil, err } as := &activeStream{stream: stream, cancel: cancel} // Receive loop go func() { defer cancel() for { resp, err := stream.Recv() if err == io.EOF { return } if err != nil { if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled { return } log.Printf("āŒ recv: %v", err) return } for _, result := range resp.Results { if len(result.Alternatives) == 0 { continue } as.handleTranscript(result.Alternatives[0].Transcript, result.IsFinal) } } }() return as, nil } func (s *activeStream) send(chunk []byte) error { s.mu.Lock() defer s.mu.Unlock() if s.closed { return fmt.Errorf("stream closed") } return s.stream.Send(&speechpb.StreamingRecognizeRequest{ StreamingRequest: &speechpb.StreamingRecognizeRequest_Audio{ Audio: chunk, }, }) } func (s *activeStream) close() { s.mu.Lock() if s.closed { s.mu.Unlock() return } s.closed = true s.mu.Unlock() _ = s.stream.CloseSend() time.AfterFunc(2*time.Second, s.cancel) } // handleTranscript prints only the new part of a growing interim transcript. func (s *activeStream) handleTranscript(text string, final bool) { s.mu.Lock() defer s.mu.Unlock() if final { delta := strings.TrimSpace(strings.TrimPrefix(text, s.lastInterim)) if delta != "" { fmt.Printf("%sāœ“ %s%s\n", Red, delta, Reset) } s.lastInterim = "" responder.GetResponse(text) return } if after, ok := strings.CutPrefix(text, s.lastInterim); ok { delta := strings.TrimSpace(after) if delta != "" { fmt.Printf("… %s\n", delta) } } else { // Model revised earlier text — reprint full line fmt.Printf("… %s\n", text) } s.lastInterim = text }