From fe3b43f4bb10148ce60ea3f4f27c045ed27b0a7c Mon Sep 17 00:00:00 2001 From: Yoshida Hiroshi Date: Thu, 28 Nov 2024 16:43:54 +0900 Subject: [PATCH] =?UTF-8?q?-=20Read=20=E6=99=82=E3=81=AE=E5=87=A6=E7=90=86?= =?UTF-8?q?=E3=82=92=20channel=20=E3=81=AB=E7=BD=AE=E3=81=8D=E6=8F=9B?= =?UTF-8?q?=E3=81=88=E3=81=A6=E5=87=A6=E7=90=86=E3=82=92=E4=B8=AD=E6=96=AD?= =?UTF-8?q?=E3=81=A7=E3=81=8D=E3=82=8B=E3=82=88=E3=81=86=E3=81=AB=E3=81=99?= =?UTF-8?q?=E3=82=8B=20-=20opus=20=E3=81=8B=E3=82=89=20ogg=20=E3=81=B8?= =?UTF-8?q?=E3=81=AE=E5=A4=89=E6=8F=9B=E5=87=A6=E7=90=86=E3=82=92=E5=85=B1?= =?UTF-8?q?=E9=80=9A=E5=8C=96=E3=81=99=E3=82=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- amazon_transcribe_handler.go | 22 +---- handler.go | 163 +++++++++++++++++++++++++++-------- speech_to_text_handler.go | 20 +---- 3 files changed, 131 insertions(+), 74 deletions(-) diff --git a/amazon_transcribe_handler.go b/amazon_transcribe_handler.go index 0093266..4244f56 100644 --- a/amazon_transcribe_handler.go +++ b/amazon_transcribe_handler.go @@ -95,27 +95,10 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int { return h.RetryCount } -func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { +func (h *AmazonTranscribeHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) { at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount)) - oggReader, oggWriter := io.Pipe() - go func() { - defer oggWriter.Close() - if err := opus2ogg(ctx, reader, oggWriter, h.SampleRate, h.ChannelCount, h.Config); err != nil { - if !errors.Is(err, io.EOF) { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - } - - oggWriter.CloseWithError(err) - return - } - }() - - stream, err := at.Start(ctx, oggReader) + stream, err := at.Start(ctx, packetReader) if err != nil { return nil, err } @@ -205,7 +188,6 @@ func (h *AmazonTranscribeHandler) Handle(ctx context.Context, reader io.Reader) w.CloseWithError(err) return } - w.Close() }() diff --git a/handler.go b/handler.go index 3cb3a60..525e1d0 100644 --- a/handler.go +++ b/handler.go @@ -123,9 +123,12 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.AudioStreamingHeader { r = readPacketWithHeader(opusReader) } else { + // ヘッダー処理なし r = opusReader } + opusCh := readOpus(ctx, r) + serviceHandler, err := getServiceHandler(serviceType, *s.config, h.SoraChannelID, h.SoraConnectionID, sampleRate, channelCount, languageCode, onResultFunc) if err != nil { zlog.Error(). @@ -149,7 +152,11 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte serviceHandlerCtx, cancelServiceHandler := context.WithCancel(ctx) defer cancelServiceHandler() - reader, err := serviceHandler.Handle(serviceHandlerCtx, r) + oggCh := opus2ogg2(serviceHandlerCtx, opusCh, sampleRate, channelCount, *s.config) + + packetReader := readOgg(serviceHandlerCtx, oggCh) + + reader, err := serviceHandler.Handle(serviceHandlerCtx, packetReader) if err != nil { zlog.Error(). Err(err). @@ -161,25 +168,22 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte if s.config.MaxRetry > serviceHandler.GetRetryCount() { serviceHandler.UpdateRetryCount() - // 切断検知のために、クライアントから送られてくるパケットは受信し続ける - packetDiscardCtx, cancelPacketDiscard := context.WithCancel(serviceHandlerCtx) - defer cancelPacketDiscard() - - errCh := make(chan error) - go discardPacket(packetDiscardCtx, r, errCh) - - // 連続のリトライを避けるために少し待つ retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond) + + retry: select { case <-retryTimer.C: retryTimer.Stop() - cancelPacketDiscard() // リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする continue - case err := <-errCh: + case _, ok := <-oggCh: + if ok { + // エラー、または、リトライのタイマーが発火するま繰り返す + goto retry + } retryTimer.Stop() // リトライする前にクライアントとの接続でエラーが発生した場合は終了する - return err + return fmt.Errorf("retry error") } } } @@ -331,24 +335,6 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte } } -func discardPacket(ctx context.Context, r io.Reader, errCh chan error) { - defer close(errCh) - - // サービス側には接続していないため、パケットは破棄する - buf := make([]byte, HeaderLength+MaxPayloadLength) - for { - select { - case <-ctx.Done(): - return - default: - if _, err := r.Read(buf); err != nil { - errCh <- err - return - } - } - } -} - func readPacketWithHeader(reader io.Reader) io.Reader { r, w := io.Pipe() @@ -430,6 +416,117 @@ func readPacketWithHeader(reader io.Reader) io.Reader { return r } +func readOpus(ctx context.Context, reader io.Reader) chan []byte { + opusCh := make(chan []byte) + + go func() { + defer close(opusCh) + + for { + select { + case <-ctx.Done(): + return + default: + buf := make([]byte, FrameSize) + n, err := reader.Read(buf) + if err != nil { + return + } + + if n > 0 { + opusCh <- buf[:n] + } + } + } + }() + + return opusCh +} + +func readOgg(ctx context.Context, oggCh chan []byte) io.Reader { + pr, pw := io.Pipe() + + go func() { + defer pw.Close() + for { + select { + case <-ctx.Done(): + pw.CloseWithError(ctx.Err()) + return + case buf, ok := <-oggCh: + if !ok { + pw.CloseWithError(fmt.Errorf("channel closed")) + return + } + + if _, err := pw.Write(buf); err != nil { + pw.CloseWithError(err) + return + } + } + } + }() + + return pr +} + +func opus2ogg2(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) chan []byte { + oggReader, oggWriter := io.Pipe() + oggCh := make(chan []byte) + + go func() { + defer close(oggCh) + + for { + buf := make([]byte, FrameSize) + n, err := oggReader.Read(buf) + if err != nil { + oggWriter.CloseWithError(err) + return + } + if n > 0 { + oggCh <- buf[:n] + } + } + }() + + go func() { + o, err := NewWith(oggWriter, sampleRate, channelCount) + if err != nil { + oggWriter.CloseWithError(err) + return + } + defer o.Close() + + for { + select { + case <-ctx.Done(): + oggWriter.CloseWithError(ctx.Err()) + return + case buf, ok := <-opusCh: + if !ok { + oggWriter.CloseWithError(fmt.Errorf("channel closed")) + return + } + + opus := codecs.OpusPacket{} + _, err := opus.Unmarshal(buf) + if err != nil { + oggWriter.CloseWithError(err) + return + } + + if err := o.Write(&opus); err != nil { + oggWriter.CloseWithError(err) + return + } + } + } + }() + + return oggCh +} + func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error { o, err := NewWith(oggWriter, sampleRate, channelCount) if err != nil { @@ -470,12 +567,6 @@ func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sa return nil } - if !ok { - if w, ok := oggWriter.(*io.PipeWriter); ok { - w.CloseWithError(err) - } - } - opus := codecs.OpusPacket{} _, err := opus.Unmarshal(buf) if err != nil { diff --git a/speech_to_text_handler.go b/speech_to_text_handler.go index 0a55b9d..00b3ccb 100644 --- a/speech_to_text_handler.go +++ b/speech_to_text_handler.go @@ -91,26 +91,10 @@ func (h *SpeechToTextHandler) ResetRetryCount() int { return h.RetryCount } -func (h *SpeechToTextHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) { +func (h *SpeechToTextHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) { stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount)) - oggReader, oggWriter := io.Pipe() - go func() { - defer oggWriter.Close() - if err := opus2ogg(ctx, reader, oggWriter, h.SampleRate, h.ChannelCount, h.Config); err != nil { - if !errors.Is(err, io.EOF) { - zlog.Error(). - Err(err). - Str("channel_id", h.ChannelID). - Str("connection_id", h.ConnectionID). - Send() - } - oggWriter.CloseWithError(err) - return - } - }() - - stream, err := stt.Start(ctx, oggReader) + stream, err := stt.Start(ctx, packetReader) if err != nil { return nil, err }