Skip to content

Commit

Permalink
opus 取得から ogg への変換処理を分離する
Browse files Browse the repository at this point in the history
  • Loading branch information
Hexa committed Nov 29, 2024
1 parent fe3b43f commit d321c2d
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 120 deletions.
6 changes: 5 additions & 1 deletion amazon_transcribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/transcribestreamingservice"
zlog "github.com/rs/zerolog/log"
)

type AmazonTranscribe struct {
Expand All @@ -22,6 +23,7 @@ type AmazonTranscribe struct {
Region string
Debug bool
Config Config
Count int
}

func NewAmazonTranscribe(config Config, languageCode string, sampleRateHertz, audioChannelCount int64) *AmazonTranscribe {
Expand Down Expand Up @@ -89,7 +91,7 @@ func NewAmazonTranscribeClient(config Config) *transcribestreamingservice.Transc
return transcribestreamingservice.New(sess, cfg)
}

func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
func (at *AmazonTranscribe) Start(ctx context.Context, r io.ReadCloser) (*transcribestreamingservice.StartStreamTranscriptionEventStream, error) {
config := at.Config
client := NewAmazonTranscribeClient(config)
input := NewStartStreamTranscriptionInput(at)
Expand Down Expand Up @@ -117,9 +119,11 @@ func (at *AmazonTranscribe) Start(ctx context.Context, r io.Reader) (*transcribe
stream := resp.GetStream()

go func() {
defer r.Close()
defer stream.Close()

if err := transcribestreamingservice.StreamAudioFromReader(ctx, stream, FrameSize, r); err != nil {
zlog.Error().Err(err).Send()
return
}
}()
Expand Down
4 changes: 3 additions & 1 deletion amazon_transcribe_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,11 @@ func (h *AmazonTranscribeHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *AmazonTranscribeHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) {
func (h *AmazonTranscribeHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
at := NewAmazonTranscribe(h.Config, h.LanguageCode, int64(h.SampleRate), int64(h.ChannelCount))

packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config)

stream, err := at.Start(ctx, packetReader)
if err != nil {
return nil, err
Expand Down
160 changes: 46 additions & 114 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,7 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
serviceHandlerCtx, cancelServiceHandler := context.WithCancel(ctx)
defer cancelServiceHandler()

oggCh := opus2ogg2(serviceHandlerCtx, opusCh, sampleRate, channelCount, *s.config)

packetReader := readOgg(serviceHandlerCtx, oggCh)

reader, err := serviceHandler.Handle(serviceHandlerCtx, packetReader)
reader, err := serviceHandler.Handle(serviceHandlerCtx, opusCh)
if err != nil {
zlog.Error().
Err(err).
Expand All @@ -168,22 +164,34 @@ func (s *Server) createSpeechHandler(serviceType string, onResultFunc func(conte
if s.config.MaxRetry > serviceHandler.GetRetryCount() {
serviceHandler.UpdateRetryCount()

// リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする
retryTimer := time.NewTimer(time.Duration(s.config.RetryIntervalMs) * time.Millisecond)

retry:
select {
case <-retryTimer.C:
retryTimer.Stop()
// リトライ対象のエラーのため、クライアントとの接続は切らずにリトライする
zlog.Info().
Err(err).
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Msg("retry")
cancelServiceHandler()
continue
case _, ok := <-oggCh:
case _, ok := <-opusCh:
if ok {
// エラー、または、リトライのタイマーが発火するま繰り返す
// channel が閉じるか、または、リトライのタイマーが発火するまで繰り返す
goto retry
}
retryTimer.Stop()
zlog.Info().
Err(err).
Str("channel_id", h.SoraChannelID).
Str("connection_id", h.SoraConnectionID).
Msg("retry interrupted")
cancelServiceHandler()
// リトライする前にクライアントとの接続でエラーが発生した場合は終了する
return fmt.Errorf("retry error")
return fmt.Errorf("%s", "retry interrupted")
}
}
}
Expand Down Expand Up @@ -443,52 +451,8 @@ func readOpus(ctx context.Context, reader io.Reader) chan []byte {
return opusCh
}

func readOgg(ctx context.Context, oggCh chan []byte) io.Reader {
pr, pw := io.Pipe()

go func() {
defer pw.Close()
for {
select {
case <-ctx.Done():
pw.CloseWithError(ctx.Err())
return
case buf, ok := <-oggCh:
if !ok {
pw.CloseWithError(fmt.Errorf("channel closed"))
return
}

if _, err := pw.Write(buf); err != nil {
pw.CloseWithError(err)
return
}
}
}
}()

return pr
}

func opus2ogg2(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) chan []byte {
func opus2ogg(ctx context.Context, opusCh chan []byte, sampleRate uint32, channelCount uint16, c Config) io.ReadCloser {
oggReader, oggWriter := io.Pipe()
oggCh := make(chan []byte)

go func() {
defer close(oggCh)

for {
buf := make([]byte, FrameSize)
n, err := oggReader.Read(buf)
if err != nil {
oggWriter.CloseWithError(err)
return
}
if n > 0 {
oggCh <- buf[:n]
}
}
}()

go func() {
o, err := NewWith(oggWriter, sampleRate, channelCount)
Expand Down Expand Up @@ -524,66 +488,7 @@ func opus2ogg2(ctx context.Context, opusCh chan []byte, sampleRate uint32, chann
}
}()

return oggCh
}

func opus2ogg(ctx context.Context, opusReader io.Reader, oggWriter io.Writer, sampleRate uint32, channelCount uint16, c Config) error {
o, err := NewWith(oggWriter, sampleRate, channelCount)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
return err
}
defer o.Close()

ch := make(chan []byte)

go func() {
defer close(ch)

for {
buf := make([]byte, FrameSize)
n, err := opusReader.Read(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
return
}

if n > 0 {
ch <- buf[:n]
}
}
}()

for {
select {
case <-ctx.Done():
return ctx.Err()
case buf, ok := <-ch:
if !ok {
return nil
}

opus := codecs.OpusPacket{}
_, err := opus.Unmarshal(buf)
if err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
return err
}

if err := o.Write(&opus); err != nil {
if w, ok := oggWriter.(*io.PipeWriter); ok {
w.CloseWithError(err)
}
return err
}
}
}
return oggReader
}

type opusRequest struct {
Expand Down Expand Up @@ -685,3 +590,30 @@ func silentPacket(audioStreamingHeader bool) []byte {

return packet
}

func channelToIOReadCloser(ctx context.Context, ch chan []byte) io.ReadCloser {
r, w := io.Pipe()

go func() {
defer w.Close()

for {
select {
case <-ctx.Done():
w.CloseWithError(ctx.Err())
return
case buf, ok := <-ch:
if !ok {
w.CloseWithError(fmt.Errorf("channel closed"))
return
}
if _, err := w.Write(buf); err != nil {
w.CloseWithError(err)
return
}
}
}
}()

return r
}
4 changes: 3 additions & 1 deletion packet_dump_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ func (h *PacketDumpHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *PacketDumpHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
func (h *PacketDumpHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
c := h.Config
filename := c.DumpFile
channelID := h.ChannelID
connectionID := h.ConnectionID

r, w := io.Pipe()

reader := channelToIOReadCloser(ctx, opusCh)

go func() {
f, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion service_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
)

type serviceHandlerInterface interface {
Handle(context.Context, io.Reader) (*io.PipeReader, error)
Handle(context.Context, chan []byte) (*io.PipeReader, error)
UpdateRetryCount() int
GetRetryCount() int
ResetRetryCount() int
Expand Down
4 changes: 3 additions & 1 deletion speech_to_text_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,11 @@ func (h *SpeechToTextHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *SpeechToTextHandler) Handle(ctx context.Context, packetReader io.Reader) (*io.PipeReader, error) {
func (h *SpeechToTextHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
stt := NewSpeechToText(h.Config, h.LanguageCode, int32(h.SampleRate), int32(h.ChannelCount))

packetReader := opus2ogg(ctx, opusCh, h.SampleRate, h.ChannelCount, h.Config)

stream, err := stt.Start(ctx, packetReader)
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion test_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,11 @@ func (h *TestHandler) ResetRetryCount() int {
return h.RetryCount
}

func (h *TestHandler) Handle(ctx context.Context, reader io.Reader) (*io.PipeReader, error) {
func (h *TestHandler) Handle(ctx context.Context, opusCh chan []byte) (*io.PipeReader, error) {
r, w := io.Pipe()

reader := channelToIOReadCloser(ctx, opusCh)

go func() {
encoder := json.NewEncoder(w)

Expand Down

0 comments on commit d321c2d

Please sign in to comment.