336 lines
7.2 KiB
Go
336 lines
7.2 KiB
Go
package audiocap
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os/exec"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
speech "cloud.google.com/go/speech/apiv2"
|
|
"cloud.google.com/go/speech/apiv2/speechpb"
|
|
"google.golang.org/api/option"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
)
|
|
|
|
// Capture and recognition settings.
const (
	// sampleRate is the PCM sample rate, in Hz, declared to the recognizer.
	sampleRate = 16000

	// chunkBytes is the size of one audio chunk: 100ms of mono s16le audio
	// (2 bytes per sample) at sampleRate.
	chunkBytes = sampleRate / 10 * 2

	// streamMaxAge is how long a stream is used before it is rotated.
	// Google's max stream age is 5 min, so rotate a minute early.
	streamMaxAge = 240 * time.Second

	// audioDevice is the source handed to parec via -d.
	audioDevice = "@DEFAULT_MONITOR@"

	// location is the region used in the recognizer resource path.
	location = "europe-west4"

	// languageCode is the language requested for transcription.
	languageCode = "en-US"

	// endpoint is the regional Speech API endpoint matching location.
	endpoint = location + "-speech.googleapis.com:443"
)
|
|
|
|
// responder is the package-wide Responder. Start assigns it, and
// handleTranscript passes every final transcript to its GetResponse method.
var responder *Responder
|
|
|
|
func (r *Responder) Close() error {
|
|
if r.genaiClient != nil {
|
|
return r.genaiClient.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func Start() {
|
|
|
|
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
|
defer cancel()
|
|
|
|
mic, err := Setup()
|
|
if err != nil {
|
|
log.Fatalf("virtmic: %v", err)
|
|
}
|
|
defer mic.Teardown()
|
|
log.Printf("%s🎤 Virtmic ready: select '%s' as input in your call app%s",
|
|
Yellow, SourceName, Reset)
|
|
|
|
// 2. TTS client
|
|
ttsClient, err := New(ctx, mic.SinkForPlayback())
|
|
if err != nil {
|
|
log.Fatalf("tts: %v", err)
|
|
}
|
|
defer ttsClient.Close()
|
|
|
|
client, err := speech.NewClient(ctx, option.WithEndpoint(endpoint))
|
|
if err != nil {
|
|
log.Fatalf("speech client: %v", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
responder, err = NewResponder(ctx, ttsClient)
|
|
if err != nil {
|
|
log.Fatalf("responder: %v", err)
|
|
}
|
|
defer responder.Close()
|
|
|
|
go responder.ResponseDaemon(ctx)
|
|
|
|
audio, err := startCapture(ctx)
|
|
if err != nil {
|
|
log.Fatalf("capture: %v", err)
|
|
}
|
|
defer audio.Close()
|
|
|
|
recognizer := fmt.Sprintf("projects/%s/locations/%s/recognizers/_", "cheater-492707", location)
|
|
log.Printf("🎙 Listening on %s → %s", audioDevice, recognizer)
|
|
|
|
if err := run(ctx, client, recognizer, audio); err != nil && ctx.Err() == nil {
|
|
log.Fatalf("run: %v", err)
|
|
}
|
|
log.Println("👋 Bye")
|
|
}
|
|
|
|
// startCapture spawns parec and returns its stdout as raw PCM.
|
|
func startCapture(ctx context.Context) (io.ReadCloser, error) {
|
|
cmd := exec.CommandContext(ctx, "parec",
|
|
"-d", audioDevice,
|
|
"--format=s16le",
|
|
"--rate=16000",
|
|
"--channels=1",
|
|
"--latency-msec=20",
|
|
)
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cmd.Stderr = log.Writer()
|
|
if err := cmd.Start(); err != nil {
|
|
return nil, err
|
|
}
|
|
return stdout, nil
|
|
}
|
|
|
|
// run reads audio into a channel and dispatches it to rotating streams.
|
|
func run(ctx context.Context, client *speech.Client, recognizer string, audio io.Reader) error {
|
|
audioCh := make(chan []byte, 32)
|
|
|
|
// Audio pump — never blocks on stream send
|
|
go func() {
|
|
defer close(audioCh)
|
|
buf := make([]byte, chunkBytes)
|
|
for {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
n, err := io.ReadFull(audio, buf)
|
|
if err != nil && err != io.ErrUnexpectedEOF {
|
|
log.Printf("❌ audio read: %v", err)
|
|
return
|
|
}
|
|
if n == 0 {
|
|
continue
|
|
}
|
|
chunk := make([]byte, n)
|
|
copy(chunk, buf[:n])
|
|
select {
|
|
case audioCh <- chunk:
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
var (
|
|
mu sync.Mutex
|
|
current *activeStream
|
|
rotateAt = time.Now().Add(streamMaxAge)
|
|
)
|
|
|
|
openNew := func() error {
|
|
s, err := newStream(ctx, client, recognizer)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mu.Lock()
|
|
old := current
|
|
current = s
|
|
rotateAt = time.Now().Add(streamMaxAge)
|
|
mu.Unlock()
|
|
|
|
if old != nil {
|
|
go old.close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
if err := openNew(); err != nil {
|
|
return fmt.Errorf("open initial stream: %w", err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
mu.Lock()
|
|
if current != nil {
|
|
current.close()
|
|
}
|
|
mu.Unlock()
|
|
return ctx.Err()
|
|
|
|
case chunk, ok := <-audioCh:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
mu.Lock()
|
|
needRotate := time.Now().After(rotateAt)
|
|
mu.Unlock()
|
|
if needRotate {
|
|
log.Println("🔄 Rotating stream")
|
|
if err := openNew(); err != nil {
|
|
log.Printf("❌ rotate: %v", err)
|
|
}
|
|
}
|
|
|
|
mu.Lock()
|
|
s := current
|
|
mu.Unlock()
|
|
|
|
if err := s.send(chunk); err != nil {
|
|
log.Printf("⚠️ send: %v — reopening", err)
|
|
if err := openNew(); err != nil {
|
|
log.Printf("❌ reopen: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// activeStream wraps a v2 streaming session and dedupes transcript output.
|
|
type activeStream struct {
|
|
stream speechpb.Speech_StreamingRecognizeClient
|
|
cancel context.CancelFunc
|
|
mu sync.Mutex
|
|
closed bool
|
|
lastInterim string // last printed interim, used to compute deltas
|
|
}
|
|
|
|
func newStream(parent context.Context, client *speech.Client, recognizer string) (*activeStream, error) {
|
|
ctx, cancel := context.WithCancel(parent)
|
|
stream, err := client.StreamingRecognize(ctx)
|
|
if err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
configReq := &speechpb.StreamingRecognizeRequest{
|
|
Recognizer: recognizer,
|
|
StreamingRequest: &speechpb.StreamingRecognizeRequest_StreamingConfig{
|
|
StreamingConfig: &speechpb.StreamingRecognitionConfig{
|
|
Config: &speechpb.RecognitionConfig{
|
|
DecodingConfig: &speechpb.RecognitionConfig_ExplicitDecodingConfig{
|
|
ExplicitDecodingConfig: &speechpb.ExplicitDecodingConfig{
|
|
Encoding: speechpb.ExplicitDecodingConfig_LINEAR16,
|
|
SampleRateHertz: sampleRate,
|
|
AudioChannelCount: 1,
|
|
},
|
|
},
|
|
Model: "chirp_2",
|
|
LanguageCodes: []string{languageCode},
|
|
Features: &speechpb.RecognitionFeatures{
|
|
EnableAutomaticPunctuation: true,
|
|
},
|
|
},
|
|
StreamingFeatures: &speechpb.StreamingRecognitionFeatures{
|
|
InterimResults: true,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
if err := stream.Send(configReq); err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
as := &activeStream{stream: stream, cancel: cancel}
|
|
|
|
// Receive loop
|
|
go func() {
|
|
defer cancel()
|
|
|
|
for {
|
|
resp, err := stream.Recv()
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
if err != nil {
|
|
if st, ok := status.FromError(err); ok && st.Code() == codes.Canceled {
|
|
return
|
|
}
|
|
log.Printf("❌ recv: %v", err)
|
|
return
|
|
}
|
|
for _, result := range resp.Results {
|
|
if len(result.Alternatives) == 0 {
|
|
continue
|
|
}
|
|
as.handleTranscript(result.Alternatives[0].Transcript, result.IsFinal)
|
|
}
|
|
}
|
|
}()
|
|
|
|
return as, nil
|
|
}
|
|
|
|
func (s *activeStream) send(chunk []byte) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
if s.closed {
|
|
return fmt.Errorf("stream closed")
|
|
}
|
|
return s.stream.Send(&speechpb.StreamingRecognizeRequest{
|
|
StreamingRequest: &speechpb.StreamingRecognizeRequest_Audio{
|
|
Audio: chunk,
|
|
},
|
|
})
|
|
}
|
|
|
|
func (s *activeStream) close() {
|
|
s.mu.Lock()
|
|
if s.closed {
|
|
s.mu.Unlock()
|
|
return
|
|
}
|
|
s.closed = true
|
|
s.mu.Unlock()
|
|
|
|
_ = s.stream.CloseSend()
|
|
time.AfterFunc(2*time.Second, s.cancel)
|
|
}
|
|
|
|
// handleTranscript prints only the new part of a growing interim transcript.
|
|
func (s *activeStream) handleTranscript(text string, final bool) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if final {
|
|
delta := strings.TrimSpace(strings.TrimPrefix(text, s.lastInterim))
|
|
if delta != "" {
|
|
fmt.Printf("%s✓ %s%s\n", Red, delta, Reset)
|
|
}
|
|
s.lastInterim = ""
|
|
|
|
responder.GetResponse(text)
|
|
|
|
return
|
|
}
|
|
|
|
if after, ok := strings.CutPrefix(text, s.lastInterim); ok {
|
|
delta := strings.TrimSpace(after)
|
|
if delta != "" {
|
|
fmt.Printf("… %s\n", delta)
|
|
}
|
|
} else {
|
|
// Model revised earlier text — reprint full line
|
|
fmt.Printf("… %s\n", text)
|
|
}
|
|
s.lastInterim = text
|
|
}
|