Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dropping and counting dropped bytes; continuing with the requests even after one is failed #2

Open
wants to merge 2 commits into
base: external-video-encoder
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Sources/RTMP/RTMPConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ open class RTMPConnection: EventDispatcher {
open var totalBytesOut:Int64 {
return socket.totalBytesOut
}
/// The statistics of total dropped bytes.
open var totalBytesDropped:Int64 {
return socket.totalBytesDropped
}
/// The statistics of total RTMPStream counts.
open var totalStreamsCount:Int {
return streams.count
Expand Down
2 changes: 2 additions & 0 deletions Sources/RTMP/RTMPSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ protocol RTMPSocketCompatible: class {
var chunkSizeS:Int { get set }
var totalBytesIn:Int64 { get }
var totalBytesOut:Int64 { get }
var totalBytesDropped:Int64 { get }
var queueBytesOut:Int64 { get }
var inputBuffer:Data { get set }
var securityLevel:StreamSocketSecurityLevel { get set }
Expand Down Expand Up @@ -54,6 +55,7 @@ final class RTMPSocket: NetSocket, RTMPSocketCompatible {
delegate?.didSetTotalBytesIn(totalBytesIn)
}
}
fileprivate(set) var totalBytesDropped:Int64 = 0

override var connected:Bool {
didSet {
Expand Down
116 changes: 83 additions & 33 deletions Sources/RTMP/RTMPTSocket.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {

fileprivate(set) var totalBytesIn:Int64 = 0
fileprivate(set) var totalBytesOut:Int64 = 0
fileprivate(set) var totalBytesDropped:Int64 = 0
fileprivate(set) var queueBytesOut:Int64 = 0
fileprivate var timer:Timer? {
didSet {
Expand All @@ -62,11 +63,15 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {
private var connectionID:String?
private var isRequesting:Bool = false
private var outputBuffer:Data = Data()
private var outputBufferChunks:[Data] = []
private var lastResponse:Date = Date()
private var lastRequestPathComponent:String?
private var lastRequestData:Data?
private var isRetryingRequest:Bool = true

private let maxChunks = 20

private var requestTask:URLSessionUploadTask?
private var requestTaskTimer:Timer?

override init() {
super.init()
}
Expand All @@ -85,6 +90,24 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {
timer = Timer(timeInterval: 0.1, target: self, selector: #selector(RTMPTSocket.on(timer:)), userInfo: nil, repeats: true)
}

private func addChunk(bytes:[UInt8]) {
self.outputBufferChunks.append(Data(bytes))
if (self.outputBufferChunks.count > maxChunks) {
OSAtomicAdd64(Int64(self.outputBufferChunks.first!.count), &self.totalBytesDropped)
OSAtomicAdd64(-Int64(self.outputBufferChunks.first!.count), &self.queueBytesOut)
self.outputBufferChunks.removeFirst()
}
}

private func getBuffer() -> Data {
var result:Data = Data()
for chunk in self.outputBufferChunks {
result += chunk;
}
return result
}


@discardableResult
func doOutput(chunk:RTMPChunk, locked:UnsafeMutablePointer<UInt32>? = nil) -> Int {
var bytes:[UInt8] = []
Expand All @@ -94,10 +117,11 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {
}

outputQueue.sync {
self.outputBuffer.append(contentsOf: bytes)
OSAtomicAdd64(Int64(bytes.count), &self.queueBytesOut)
addChunk(bytes: bytes)
if !self.isRequesting {
self.doOutput(data: self.outputBuffer)
self.outputBuffer.removeAll()
self.doOutput(data: getBuffer(), isChunk: true)
self.outputBufferChunks.removeAll()
}
}
if (locked != nil) {
Expand Down Expand Up @@ -130,34 +154,14 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {
logger.trace("\(String(describing: data)):\(String(describing: response)):\(String(describing: error))")
}

doNextRequest()

if let error:Error = error {
logger.error("\(error)")

if let lastRequestPathComponent: String = self.lastRequestPathComponent,
let lastRequestData: Data = self.lastRequestData, !isRetryingRequest {
if (logger.isEnabledFor(level: .trace)) {
logger.trace("Will retry request for path=\(lastRequestPathComponent)")
}
outputQueue.sync {
isRetryingRequest = true
doRequest(lastRequestPathComponent, lastRequestData, listen)
}
}

return
}

isRetryingRequest = false

outputQueue.sync {
if (self.outputBuffer.isEmpty) {
self.isRequesting = false
} else {
self.doOutput(data: outputBuffer)
self.outputBuffer.removeAll()
}
}

guard
let response:HTTPURLResponse = response as? HTTPURLResponse,
let contentType:String = response.allHeaderFields["Content-Type"] as? String,
Expand Down Expand Up @@ -195,6 +199,27 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {
default:
break
}

}

private func doNextRequest() {
if (!self.connected) {
return
}

let buffer:Data = getBuffer()
self.outputQueue.sync {
if (buffer.isEmpty) {
self.isRequesting = false
} else {
self.doOutput(data: buffer, isChunk: true)
self.outputBufferChunks.removeAll()
}
}
}

func setTimeout(delay:TimeInterval, block:@escaping ()->Void) -> Timer {
return Timer.scheduledTimer(timeInterval: delay, target: BlockOperation(block: block), selector: #selector(Operation.main), userInfo: nil, repeats: false)
}

private func didIdent2(data:Data?, response:URLResponse?, error:Error?) {
Expand Down Expand Up @@ -263,23 +288,49 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {
}

@discardableResult
final private func doOutput(data:Data) -> Int {
final private func doOutput(data:Data, isChunk:Bool? = false) -> Int {
guard let connectionID:String = connectionID, connected else {
return 0
}
let index:Int64 = OSAtomicIncrement64(&self.index)
doRequest("/send/\(connectionID)/\(index)", c2packet + data, listen)
doRequest("/send/\(connectionID)/\(index)", c2packet + data, {
(receivedData:Data?, response:URLResponse?, error:Error?) in
if (isChunk ?? false) {
OSAtomicAdd64(-Int64(data.count), &self.queueBytesOut)
}
self.listen(data: receivedData, response: response, error: error)
})
c2packet.removeAll()
return data.count
}

private var timeSent = Date()

private func doRequest(_ pathComponent: String,_ data:Data,_ completionHandler: @escaping ((Data?, URLResponse?, Error?) -> Void)) {
isRequesting = true
lastRequestPathComponent = pathComponent
lastRequestData = data
request = URLRequest(url: baseURL.appendingPathComponent(pathComponent))
request.httpMethod = "POST"
session.uploadTask(with: request, from: data, completionHandler: completionHandler).resume()
requestTask = session.uploadTask(with: request, from: data, completionHandler: {
(requestData:Data?, response:URLResponse?, error:Error?) in
if let error:Error = error {
} else {
OSAtomicAdd64(Int64(data.count), &self.totalBytesOut)
}
if (self.index > 1) {
self.requestTaskTimer!.invalidate()
}
completionHandler(requestData, response, error)
})
timeSent = Date()
requestTask!.resume()

if (index > 1) {
requestTaskTimer = setTimeout(delay: 2, block: {
OSAtomicAdd64(Int64(data.count), &self.totalBytesDropped)
self.requestTask!.cancel()
completionHandler(nil, nil, nil)
})
}
if (logger.isEnabledFor(level: .trace)) {
logger.trace("\(self.request)")
}
Expand All @@ -289,6 +340,5 @@ final class RTMPTSocket: NSObject, RTMPSocketCompatible {
// MARK: -
extension RTMPTSocket: URLSessionTaskDelegate {
func urlSession(_ session: URLSession, task: URLSessionTask, didSendBodyData bytesSent: Int64, totalBytesSent: Int64, totalBytesExpectedToSend: Int64) {
OSAtomicAdd64(bytesSent, &totalBytesOut)
}
}