From a9b4de5c3ab4ffa60e1547ef9ef8c3c81a429767 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Tue, 7 Jan 2025 12:20:24 -0800 Subject: [PATCH] Move update command into its own TU and start adding subcommands support. --- sippy/rtp_proxy/session/main.go | 29 ++--- sippy/rtp_proxy/session/side.go | 137 ++++++++++------------- sippy/rtp_proxy/session/update.go | 179 ++++++++++++++++++++++++++++++ 3 files changed, 247 insertions(+), 98 deletions(-) create mode 100644 sippy/rtp_proxy/session/update.go diff --git a/sippy/rtp_proxy/session/main.go b/sippy/rtp_proxy/session/main.go index 7ffecd2..329ed45 100644 --- a/sippy/rtp_proxy/session/main.go +++ b/sippy/rtp_proxy/session/main.go @@ -59,23 +59,12 @@ type Rtp_proxy_session struct { rtpp_wi chan *rtpp_cmd } -type rtpproxy_update_result struct { - rtpproxy_address string - rtpproxy_port string - family string - sendonly bool -} - type rtpp_cmd struct { cmd string cb func(string) rtp_proxy_client sippy_types.RtpProxyClient } -func (self *rtpproxy_update_result) Address() string { - return self.rtpproxy_address -} - func NewRtp_proxy_session(config sippy_conf.Config, rtp_proxy_clients []sippy_types.RtpProxyClient, call_id, from_tag, to_tag, notify_socket, notify_tag string, session_lock sync.Locker) (*Rtp_proxy_session, error) { self := &Rtp_proxy_session{ notify_socket : notify_socket, @@ -91,8 +80,6 @@ func NewRtp_proxy_session(config sippy_conf.Config, rtp_proxy_clients []sippy_ty } self.caller.otherside = &self.callee self.callee.otherside = &self.caller - self.caller.owner = self - self.callee.owner = self self.caller.session_exists = false self.callee.session_exists = false online_clients := []sippy_types.RtpProxyClient{} @@ -140,7 +127,7 @@ func NewRtp_proxy_session(config sippy_conf.Config, rtp_proxy_clients []sippy_ty result_callback(result) */ func (self *Rtp_proxy_session) PlayCaller(prompt_name string, times int/*= 1*/, result_callback func(string)/*= nil*/, index int /*= 0*/) { - self.caller._play(prompt_name, times, result_callback, index) + self.caller._play(prompt_name, times, result_callback, index, self) } func (self *Rtp_proxy_session) send_command(cmd string, cb func(string)) { @@ -175,12 +162,14 @@ func (self *Rtp_proxy_session) cmd_done(res string) { } func (self *Rtp_proxy_session) StopPlayCaller(result_callback func(string)/*= nil*/, index int/*= 0*/) { - self.caller._stop_play(result_callback, index) + self.caller._stop_play(result_callback, index, self) } func (self *Rtp_proxy_session) StartRecording(rname/*= nil*/ string, result_callback func(string)/*= nil*/, index int/*= 0*/) { if ! self.caller.session_exists { - self.caller.update("0.0.0.0", "0", func(*rtpproxy_update_result) { self._start_recording(rname, result_callback, index) }, "", index, "IP4") + up_cb := func(*UpdateResult, *Rtp_proxy_session, sippy_types.SipHandlingError) { self._start_recording(rname, result_callback, index) } + up := NewUpdateParams(self, index, up_cb) + self.caller.update(up) return } self._start_recording(rname, result_callback, index) @@ -220,12 +209,12 @@ func (self *Rtp_proxy_session) Delete() { self._rtp_proxy_client = nil } -func (self *Rtp_proxy_session) OnCallerSdpChange(sdp_body sippy_types.MsgBody, result_callback func(sippy_types.MsgBody)) error { - return self.caller._on_sdp_change(sdp_body, result_callback) +func (self *Rtp_proxy_session) OnCallerSdpChange(sdp_body sippy_types.MsgBody, result_callback sippy_types.OnDelayedCB) error { + return self.caller._on_sdp_change(self, sdp_body, result_callback) } -func (self *Rtp_proxy_session) OnCalleeSdpChange(sdp_body sippy_types.MsgBody, result_callback func(sippy_types.MsgBody)) error { - return self.callee._on_sdp_change(sdp_body, result_callback) +func (self *Rtp_proxy_session) OnCalleeSdpChange(sdp_body sippy_types.MsgBody, result_callback sippy_types.OnDelayedCB) error { + return self.callee._on_sdp_change(self, sdp_body, result_callback) } func rtp_proxy_session_destructor(self *Rtp_proxy_session) { diff --git a/sippy/rtp_proxy/session/side.go b/sippy/rtp_proxy/session/side.go index c657c1e..57efa49 100644 --- a/sippy/rtp_proxy/session/side.go +++ b/sippy/rtp_proxy/session/side.go @@ -28,7 +28,6 @@ package rtp_proxy_session import ( - "math" "strconv" "strings" "sync/atomic" @@ -40,7 +39,6 @@ import ( type _rtpps_side struct { otherside *_rtpps_side - owner *Rtp_proxy_session session_exists bool laddress string raddress *sippy_net.HostPort @@ -51,106 +49,76 @@ type _rtpps_side struct { to_tag string } -func (self *_rtpps_side) _play(prompt_name string, times int, result_callback func(string), index int) { +func (self *_rtpps_side) _play(prompt_name string, times int, result_callback func(string), index int, rtpps *Rtp_proxy_session) { if ! self.session_exists { return } if ! self.otherside.session_exists { - self.otherside.update("0.0.0.0", "0", func(*rtpproxy_update_result) { self.__play(prompt_name, times, result_callback, index) }, "", index, "IP4") + up_cb := func(*UpdateResult, *Rtp_proxy_session, sippy_types.SipHandlingError) { self.__play(prompt_name, times, result_callback, index, rtpps) } + up := NewUpdateParams(rtpps, index, up_cb) + self.otherside.update(up) return } - self.__play(prompt_name, times, result_callback, index) + self.__play(prompt_name, times, result_callback, index, rtpps) } -func (self *_rtpps_side) __play(prompt_name string, times int, result_callback func(string), index int) { - command := "P" + strconv.Itoa(times) + " " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + prompt_name + " " + self.codecs + " " + self.from_tag + " " + self.to_tag - self.owner.send_command(command, func(r string) { self.owner.command_result(r, result_callback) }) +func (self *_rtpps_side) __play(prompt_name string, times int, result_callback func(string), index int, rtpps *Rtp_proxy_session) { + command := "P" + strconv.Itoa(times) + " " + rtpps.call_id + "-" + strconv.Itoa(index) + " " + prompt_name + " " + self.codecs + " " + self.from_tag + " " + self.to_tag + rtpps.send_command(command, func(r string) { rtpps.command_result(r, result_callback) }) } -func (self *_rtpps_side) update(remote_ip string, remote_port string, result_callback func(*rtpproxy_update_result), options/*= ""*/ string, index /*= 0*/int, atype /*= "IP4"*/string) { +func max(a, b int) int { + if a >= b {return a} + return b +} + +func (self *_rtpps_side) update(up *UpdateParams) { var sbind_supported, is_local, tnot_supported bool var err error command := "U" - self.owner.max_index = int(math.Max(float64(self.owner.max_index), float64(index))) - if sbind_supported, err = self.owner.SBindSupported(); err != nil { + up.rtpps.max_index = max(up.rtpps.max_index, up.index) + if sbind_supported, err = up.rtpps.SBindSupported(); err != nil { return } - if is_local, err = self.owner.IsLocal(); err != nil { + if is_local, err = up.rtpps.IsLocal(); err != nil { return } - if tnot_supported, err = self.owner.TNotSupported(); err != nil { + if tnot_supported, err = up.rtpps.TNotSupported(); err != nil { return } if sbind_supported { if self.raddress != nil { - //if self.owner.IsLocal() && atype == "IP4" { + //if self.owner.IsLocal() && up.atype == "IP4" { // options += "L" + self.laddress //} else if ! self.owner.IsLocal() { // options += "R" + self.raddress.Host.String() //} - options += "R" + self.raddress.Host.String() + up.options += "R" + self.raddress.Host.String() } else if self.laddress != "" && is_local { - options += "L" + self.laddress + up.options += "L" + self.laddress } } - command += options + command += up.options if self.otherside.session_exists { - command += " " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + remote_ip + " " + remote_port + " " + self.from_tag + " " + self.to_tag + command += " " + up.rtpps.call_id + "-" + strconv.Itoa(up.index) + " " + up.remote_ip + " " + up.remote_port + " " + self.from_tag + " " + self.to_tag } else { - command += " " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + remote_ip + " " + remote_port + " " + self.from_tag + command += " " + up.rtpps.call_id + "-" + strconv.Itoa(up.index) + " " + up.remote_ip + " " + up.remote_port + " " + self.from_tag } - if self.owner.notify_socket != "" && index == 0 && tnot_supported { - command += " " + self.owner.notify_socket + " " + self.owner.notify_tag + if up.rtpps.notify_socket != "" && up.index == 0 && tnot_supported { + command += " " + up.rtpps.notify_socket + " " + up.rtpps.notify_tag } - self.owner.send_command(command, func(r string) { self.update_result(r, remote_ip, atype, result_callback) }) + up.rtpps.send_command(command, func(r string) { self.update_result(r, up) }) } -func (self *_rtpps_side) update_result(result, remote_ip, atype string, result_callback func(*rtpproxy_update_result)) { +func (self *_rtpps_side) update_result(result string, up *UpdateParams) { //print "%s.update_result(%s)" % (id(self), result) //result_callback, face, callback_parameters = args self.session_exists = true - if result == "" { - result_callback(nil) - return - } - t1 := strings.Fields(result) - if t1[0][0] == 'E' { - result_callback(nil) - return - } - rtpproxy_port, err := strconv.Atoi(t1[0]) - if err != nil || rtpproxy_port == 0 { - result_callback(nil) - return - } - family := "IP4" - rtpproxy_address := "" - if len(t1) > 1 { - rtpproxy_address = t1[1] - if len(t1) > 2 && t1[2] == "6" { - family = "IP6" - } - } else { - if rtpproxy_address, err = self.owner.GetProxyAddress(); err != nil { - return - } - } - sendonly := false - if atype == "IP4" && remote_ip == "0.0.0.0" { - sendonly = true - } else if atype == "IP6" && remote_ip == "::" { - sendonly = true - } - result_callback(&rtpproxy_update_result{ - rtpproxy_address : rtpproxy_address, - rtpproxy_port : t1[0], - family : family, - sendonly : sendonly, - }) + up.ProcessRtppResult(result) } -func (self *_rtpps_side) _on_sdp_change(sdp_body sippy_types.MsgBody, result_callback func(sippy_types.MsgBody)) error { +func (self *_rtpps_side) _on_sdp_change(rtpps *Rtp_proxy_session, sdp_body sippy_types.MsgBody, result_callback sippy_types.OnDelayedCB) error { parsed_body, err := sdp_body.GetSdp() if err != nil { return err @@ -165,7 +133,7 @@ func (self *_rtpps_side) _on_sdp_change(sdp_body sippy_types.MsgBody, result_cal } if len(sects) == 0 { sdp_body.SetNeedsUpdate(false) - result_callback(sdp_body) + result_callback(sdp_body, nil) return nil } formats := sects[0].GetMHeader().GetFormats() @@ -180,24 +148,37 @@ func (self *_rtpps_side) _on_sdp_change(sdp_body sippy_types.MsgBody, result_cal if sect.GetCHeader().GetAType() == "IP6" { sect_options = "6" + options } - self.update(sect.GetCHeader().GetAddr(), sect.GetMHeader().GetPort(), - func (res *rtpproxy_update_result) { self._sdp_change_finish(res, sdp_body, parsed_body, sect, §ions_left, result_callback) }, - sect_options, i, sect.GetCHeader().GetAType()) + up_cb := func (ur *UpdateResult, rtpps *Rtp_proxy_session, ex sippy_types.SipHandlingError) { self._sdp_change_finish(sdp_body, parsed_body, sect, §ions_left, result_callback, ur, rtpps, ex) } + up := NewUpdateParams(rtpps, i, up_cb) + up.remote_ip = sect.GetCHeader().GetAddr() + up.remote_port = sect.GetMHeader().GetPort() + up.atype = sect.GetCHeader().GetAType() + up.options = sect_options + + self.update(up) } return nil } -func (self *_rtpps_side) _sdp_change_finish(cb_args *rtpproxy_update_result, sdp_body sippy_types.MsgBody, parsed_body sippy_types.Sdp, sect *sippy_sdp.SdpMediaDescription, sections_left *int64, result_callback func(sippy_types.MsgBody)) { - if cb_args != nil { +func (self *_rtpps_side) _sdp_change_finish(sdp_body sippy_types.MsgBody, parsed_body sippy_types.Sdp, sect *sippy_sdp.SdpMediaDescription, sections_left *int64, result_callback sippy_types.OnDelayedCB, ur *UpdateResult, rtpps *Rtp_proxy_session, ex sippy_types.SipHandlingError) { + if ! sdp_body.NeedsUpdate() { + return + } + if ex != nil { + sdp_body.SetNeedsUpdate(false) + result_callback(nil, ex) + return + } + if ur != nil { if self.after_sdp_change != nil { - self.after_sdp_change(cb_args) + self.after_sdp_change(ur) } - sect.GetCHeader().SetAType(cb_args.family) - sect.GetCHeader().SetAddr(cb_args.rtpproxy_address) + sect.GetCHeader().SetAType(ur.family) + sect.GetCHeader().SetAddr(ur.rtpproxy_address) if sect.GetMHeader().GetPort() != "0" { - sect.GetMHeader().SetPort(cb_args.rtpproxy_port) + sect.GetMHeader().SetPort(strconv.Itoa(ur.rtpproxy_port)) } - if cb_args.sendonly { + if ur.sendonly { sect.RemoveAHeader("sendrecv") if ! sect.HasAHeader([]string{ "recvonly", "sendonly", "inactive" }) { sect.AddHeader("a", "sendonly") @@ -212,7 +193,7 @@ func (self *_rtpps_side) _sdp_change_finish(cb_args *rtpproxy_update_result, sdp // more work is in progress return } - if self.owner.insert_nortpp { + if rtpps.insert_nortpp { parsed_body.AppendAHeader("nortpproxy=yes") } sdp_body.SetNeedsUpdate(false) @@ -228,13 +209,13 @@ func (self *_rtpps_side) _sdp_change_finish(cb_args *rtpproxy_update_result, sdp origin.SetAddress("192.0.2.1") origin.SetAddressType("IP4") origin.SetNetworkType("IN") - result_callback(sdp_body) + result_callback(sdp_body, nil) } -func (self *_rtpps_side) _stop_play(cb func(string), index int) { +func (self *_rtpps_side) _stop_play(cb func(string), index int, rtpps *Rtp_proxy_session) { if ! self.otherside.session_exists { return } - command := "S " + self.owner.call_id + "-" + strconv.Itoa(index) + " " + self.from_tag + " " + self.to_tag - self.owner.send_command(command, func(r string) { self.owner.command_result(r, cb) }) + command := "S " + rtpps.call_id + "-" + strconv.Itoa(index) + " " + self.from_tag + " " + self.to_tag + rtpps.send_command(command, func(r string) { rtpps.command_result(r, cb) }) } diff --git a/sippy/rtp_proxy/session/update.go b/sippy/rtp_proxy/session/update.go new file mode 100644 index 0000000..695b408 --- /dev/null +++ b/sippy/rtp_proxy/session/update.go @@ -0,0 +1,179 @@ +// Copyright (c) 2003-2005 Maxim Sobolev. All rights reserved. +// Copyright (c) 2006-2025 Sippy Software, Inc. All rights reserved. +// Copyright (c) 2016 Andriy Pylypenko. All rights reserved. +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, +// are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation and/or +// other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +// ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package rtp_proxy_session + +import ( + "strings" + "strconv" + + "github.com/sippy/go-b2bua/sippy/types" + "github.com/sippy/go-b2bua/sippy/exceptions" +) + +type UpdateParams struct { + rtpps *Rtp_proxy_session + remote_ip string + remote_port string + result_callback func(*UpdateResult, *Rtp_proxy_session, sippy_types.SipHandlingError) + options string + index int + atype string + subcommands []*Subcommand +} + +type UpdateResult struct { + rtpproxy_address string + rtpproxy_port int + family string + sendonly bool +} + +type Subcommand struct { + commands []string + handleResults func([]string, *UpdateResult) (sippy_types.SipHandlingError) +} + +func NewUpdateParams(rtpps *Rtp_proxy_session, index int, result_cb func(*UpdateResult, *Rtp_proxy_session, sippy_types.SipHandlingError)) *UpdateParams { + return &UpdateParams{ + rtpps: rtpps, + index: index, + result_callback: result_cb, + remote_ip: "0.0.0.0", + remote_port: "0", + atype: "IP4", + subcommands: []*Subcommand{}, + } +} + +func (up *UpdateParams) ProcessRtppResult(result string) *UpdateResult { + if result == "" { + ex := sippy_exceptions.NewRtpProxyError("RTPProxy errored") + up.result_callback(nil, up.rtpps, ex) + return nil + } + + result = strings.TrimSpace(result) + t0 := strings.SplitN(result, "&&", 2) + t1 := strings.Fields(t0[0]) + + if len(t1) == 0 || t1[0][0] == 'E' { + ex := sippy_exceptions.NewRtpProxyError("RTPProxy errored: " + t1[0]) + up.result_callback(nil, up.rtpps, ex) + return nil + } + + ur := &UpdateResult{} + + if len(up.subcommands) > 0 { + subcRess := []string{} + if len(t0) > 1 { + subcRess = strings.Split(t0[1], "&&") + for i := range subcRess { + subcRess[i] = strings.TrimSpace(subcRess[i]) + } + } + + actual := len(subcRess) + expected := 0 + for _, subc := range up.subcommands { + expected += len(subc.commands) + } + + if actual > expected { + ex := sippy_exceptions.NewRtpProxyError("RTPProxy errored: too many results") + up.result_callback(nil, up.rtpps, ex) + return nil + } + + if actual > 0 && subcRess[len(subcRess)-1] == "-1" { + foff := len(subcRess) + for _, subc := range up.subcommands { + if foff > len(subc.commands) { + foff -= len(subc.commands) + continue + } + ex := sippy_exceptions.NewRtpProxyError("RTPProxy errored: " + subc.commands[foff-1] + ": -1") + up.result_callback(nil, up.rtpps, ex) + return nil + } + } + + if actual < expected { + extra := make([]string, expected - actual) + for i := range extra { + extra[i] = "0" + } + subcRess = append(subcRess, extra...) + } + + for _, subc := range up.subcommands { + results := subcRess[:len(subc.commands)] + if ex := subc.handleResults(results, ur); ex != nil { + up.result_callback(nil, up.rtpps, ex) + return nil + } + subcRess = subcRess[len(subc.commands):] + if len(subcRess) == 0 { + break + } + } + } + + var err error + ur.rtpproxy_port, err = strconv.Atoi(t1[0]) + if err != nil || ur.rtpproxy_port == 0 { + ex := sippy_exceptions.NewRtpProxyError("RTPProxy errored: bad port") + up.result_callback(nil, up.rtpps, ex) + return nil + } + + ur.family = "IP4" + if len(t1) > 1 { + ur.rtpproxy_address = t1[1] + if len(t1) > 2 && t1[2] == "6" { + ur.family = "IP6" + } + } else { + ur.rtpproxy_address = up.rtpps._rtp_proxy_client.GetProxyAddress() + } + + if up.atype == "IP4" && up.remote_ip == "0.0.0.0" { + ur.sendonly = true + } else if up.atype == "IP6" && up.remote_ip == "::" { + ur.sendonly = true + } else { + ur.sendonly = false + } + + up.result_callback(ur, up.rtpps, nil) + return ur +} + +func (self *UpdateResult) Address() string { + return self.rtpproxy_address +}