diff --git a/Sources/RTMP/RTMPConnection.swift b/Sources/RTMP/RTMPConnection.swift index bd0ff7c14..4eb6fdd45 100644 --- a/Sources/RTMP/RTMPConnection.swift +++ b/Sources/RTMP/RTMPConnection.swift @@ -177,6 +177,10 @@ open class RTMPConnection: EventDispatcher { open var totalBytesOut:Int64 { return socket.totalBytesOut } + /// The statistics of total dropped bytes. + open var totalBytesDropped:Int64 { + return socket.totalBytesDropped + } /// The statistics of total RTMPStream counts. open var totalStreamsCount:Int { return streams.count diff --git a/Sources/RTMP/RTMPSocket.swift b/Sources/RTMP/RTMPSocket.swift index 1e8d55a83..fe80ea849 100644 --- a/Sources/RTMP/RTMPSocket.swift +++ b/Sources/RTMP/RTMPSocket.swift @@ -8,6 +8,7 @@ protocol RTMPSocketCompatible: class { var chunkSizeS:Int { get set } var totalBytesIn:Int64 { get } var totalBytesOut:Int64 { get } + var totalBytesDropped:Int64 { get } var queueBytesOut:Int64 { get } var inputBuffer:Data { get set } var securityLevel:StreamSocketSecurityLevel { get set } @@ -54,6 +55,7 @@ final class RTMPSocket: NetSocket, RTMPSocketCompatible { delegate?.didSetTotalBytesIn(totalBytesIn) } } + fileprivate(set) var totalBytesDropped:Int64 = 0 override var connected:Bool { didSet { diff --git a/Sources/RTMP/RTMPTSocket.swift b/Sources/RTMP/RTMPTSocket.swift index 5b36bf17a..0bd610dd1 100644 --- a/Sources/RTMP/RTMPTSocket.swift +++ b/Sources/RTMP/RTMPTSocket.swift @@ -38,6 +38,7 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { fileprivate(set) var totalBytesIn:Int64 = 0 fileprivate(set) var totalBytesOut:Int64 = 0 + fileprivate(set) var totalBytesDropped:Int64 = 0 fileprivate(set) var queueBytesOut:Int64 = 0 fileprivate var timer:Timer? { didSet { @@ -62,11 +63,15 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { private var connectionID:String? private var isRequesting:Bool = false private var outputBuffer:Data = Data() + private var outputBufferChunks:[Data] = [] private var lastResponse:Date = Date() - private var lastRequestPathComponent:String? - private var lastRequestData:Data? private var isRetryingRequest:Bool = true + private let maxChunks = 20 + + private var requestTask:URLSessionUploadTask? + private var requestTaskTimer:Timer? + override init() { super.init() } @@ -85,6 +90,24 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { timer = Timer(timeInterval: 0.1, target: self, selector: #selector(RTMPTSocket.on(timer:)), userInfo: nil, repeats: true) } + private func addChunk(bytes:[UInt8]) { + self.outputBufferChunks.append(Data(bytes)) + if (self.outputBufferChunks.count > maxChunks) { + OSAtomicAdd64(Int64(self.outputBufferChunks.first!.count), &self.totalBytesDropped) + OSAtomicAdd64(-Int64(self.outputBufferChunks.first!.count), &self.queueBytesOut) + self.outputBufferChunks.removeFirst() + } + } + + private func getBuffer() -> Data { + var result:Data = Data() + for chunk in self.outputBufferChunks { + result += chunk; + } + return result + } + + @discardableResult func doOutput(chunk:RTMPChunk, locked:UnsafeMutablePointer? = nil) -> Int { var bytes:[UInt8] = [] @@ -94,10 +117,11 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { } outputQueue.sync { - self.outputBuffer.append(contentsOf: bytes) + OSAtomicAdd64(Int64(bytes.count), &self.queueBytesOut) + addChunk(bytes: bytes) if !self.isRequesting { - self.doOutput(data: self.outputBuffer) - self.outputBuffer.removeAll() + self.doOutput(data: getBuffer(), isChunk: true) + self.outputBufferChunks.removeAll() } } if (locked != nil) { @@ -130,34 +154,14 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { logger.trace("\(String(describing: data)):\(String(describing: response)):\(String(describing: error))") } + doNextRequest() + if let error:Error = error { logger.error("\(error)") - if let lastRequestPathComponent: String = self.lastRequestPathComponent, - let lastRequestData: Data = self.lastRequestData, !isRetryingRequest { - if (logger.isEnabledFor(level: .trace)) { - logger.trace("Will retry request for path=\(lastRequestPathComponent)") - } - outputQueue.sync { - isRetryingRequest = true - doRequest(lastRequestPathComponent, lastRequestData, listen) - } - } - return } - isRetryingRequest = false - - outputQueue.sync { - if (self.outputBuffer.isEmpty) { - self.isRequesting = false - } else { - self.doOutput(data: outputBuffer) - self.outputBuffer.removeAll() - } - } - guard let response:HTTPURLResponse = response as? HTTPURLResponse, let contentType:String = response.allHeaderFields["Content-Type"] as? String, @@ -195,6 +199,27 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { default: break } + + } + + private func doNextRequest() { + if (!self.connected) { + return + } + + let buffer:Data = getBuffer() + self.outputQueue.sync { + if (buffer.isEmpty) { + self.isRequesting = false + } else { + self.doOutput(data: buffer, isChunk: true) + self.outputBufferChunks.removeAll() + } + } + } + + func setTimeout(delay:TimeInterval, block:@escaping ()->Void) -> Timer { + return Timer.scheduledTimer(timeInterval: delay, target: BlockOperation(block: block), selector: #selector(Operation.main), userInfo: nil, repeats: false) } private func didIdent2(data:Data?, response:URLResponse?, error:Error?) { @@ -263,23 +288,49 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { } @discardableResult - final private func doOutput(data:Data) -> Int { + final private func doOutput(data:Data, isChunk:Bool? = false) -> Int { guard let connectionID:String = connectionID, connected else { return 0 } let index:Int64 = OSAtomicIncrement64(&self.index) - doRequest("/send/\(connectionID)/\(index)", c2packet + data, listen) + doRequest("/send/\(connectionID)/\(index)", c2packet + data, { + (receivedData:Data?, response:URLResponse?, error:Error?) in + if (isChunk ?? false) { + OSAtomicAdd64(-Int64(data.count), &self.queueBytesOut) + } + self.listen(data: receivedData, response: response, error: error) + }) c2packet.removeAll() return data.count } + private var timeSent = Date() + private func doRequest(_ pathComponent: String,_ data:Data,_ completionHandler: @escaping ((Data?, URLResponse?, Error?) -> Void)) { isRequesting = true - lastRequestPathComponent = pathComponent - lastRequestData = data request = URLRequest(url: baseURL.appendingPathComponent(pathComponent)) request.httpMethod = "POST" - session.uploadTask(with: request, from: data, completionHandler: completionHandler).resume() + requestTask = session.uploadTask(with: request, from: data, completionHandler: { + (requestData:Data?, response:URLResponse?, error:Error?) in + if let error:Error = error { + } else { + OSAtomicAdd64(Int64(data.count), &self.totalBytesOut) + } + if (self.index > 1) { + self.requestTaskTimer!.invalidate() + } + completionHandler(requestData, response, error) + }) + timeSent = Date() + requestTask!.resume() + + if (index > 1) { + requestTaskTimer = setTimeout(delay: 2, block: { + OSAtomicAdd64(Int64(data.count), &self.totalBytesDropped) + self.requestTask!.cancel() + completionHandler(nil, nil, nil) + }) + } if (logger.isEnabledFor(level: .trace)) { logger.trace("\(self.request)") } @@ -289,6 +340,5 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible { // MARK: - extension RTMPTSocket: URLSessionTaskDelegate { func urlSession(_ session: URLSession, task: URLSessionTask, didSendBodyData bytesSent: Int64, totalBytesSent: Int64, totalBytesExpectedToSend: Int64) { - OSAtomicAdd64(bytesSent, &totalBytesOut) } }