Skip to content

Commit

Permalink
Add options to control stream tuning parameters; clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
djp952 committed Jul 24, 2017
1 parent bc26b20 commit a6dd587
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 82 deletions.
10 changes: 7 additions & 3 deletions pvr.hdhomerundvr/changelog.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
v1.2.0 (2017.07.xx)
v1.2.0 (2017.07.23)
- Update libhdhomerun library to version 2017.06.12
- Fix disabling of seek for Live TV when "Stream LiveTV channels directly from tuner device(s)" is enabled
- Use internal HTTP stream implementation for Recorded TV streams as well as tuner-direct Live TV streams
- Detect and remove SCTE Program Information Message data when combined with the Program Map Table (PMT) data in the same packet
- Remove custom addon callback implementations in favor of the standard ones provided in Kodi source code
- Update MPEG-TS stream implementation to support Recorded TV as well as tuner-direct Live TV streams
- Detect and remove MPEG-TS SCTE PIM (0xC0) table when inserted before the PMT (0x02) table in the same PSI packet
- Add "DVR stream read operation timeout (milliseconds)" advanced option
- Add "DVR stream read operation minimum size" advanced option
- Add "DVR stream ring buffer size" advanced option

v1.1.1 (2017.07.07)
- Allow for 30-second and 1-minute local network discovery intervals (devices, lineups, recordings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ msgctxt "#30113"
msgid "Startup discovery task delay (seconds)"
msgstr ""

msgctxt "#30114"
msgid "DVR stream read operation minimum size"
msgstr ""

msgctxt "#30115"
msgid "DVR stream ring buffer size"
msgstr ""

msgctxt "#30116"
msgid "DVR stream read operation timeout (milliseconds)"
msgstr ""

msgctxt "#30201"
msgid "5 Minutes"
msgstr ""
Expand Down Expand Up @@ -148,6 +160,50 @@ msgctxt "#30217"
msgid "1 Minute"
msgstr ""

msgctxt "#30218"
msgid "None"
msgstr ""

msgctxt "#30219"
msgid "1 KiB"
msgstr ""

msgctxt "#30220"
msgid "2 KiB"
msgstr ""

msgctxt "#30221"
msgid "4 KiB"
msgstr ""

msgctxt "#30222"
msgid "8 KiB"
msgstr ""

msgctxt "#30223"
msgid "16 KiB"
msgstr ""

msgctxt "#30224"
msgid "1 MiB"
msgstr ""

msgctxt "#30225"
msgid "2 MiB"
msgstr ""

msgctxt "#30226"
msgid "4 MiB"
msgstr ""

msgctxt "#30227"
msgid "8 MiB"
msgstr ""

msgctxt "#30228"
msgid "16 MiB"
msgstr ""

msgctxt "#30301"
msgid "Delete episode"
msgstr ""
Expand Down
3 changes: 3 additions & 0 deletions pvr.hdhomerundvr/resources/settings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
<category label="30002">
<setting id="use_direct_tuning" label="30111" type="bool" default="false"/>
<setting id="startup_discovery_task_delay" label="30113" type="slider" default="3" range="1,1,10" option="int"/>
<setting id="stream_read_timeout" label="30116" type="slider" default="2500" range="500,500,5000" option="int"/>
<setting id="stream_read_minimum_byte_count" label="30114" type="enum" lvalues="30218|30219|30220|30221|30222|30223" default="1"/>
<setting id="stream_ring_buffer_size" label="30115" type="enum" lvalues="30224|30225|30226|30227|30228" default="2"/>
</category>

</settings>
178 changes: 116 additions & 62 deletions src/dvrstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,23 @@ inline uint32_t read_be32(uint8_t const* ptr)
}

//---------------------------------------------------------------------------
// dvrstream Constructor
// dvrstream Constructor (private)
//
// Arguments:
//
// buffersize - Size in bytes of the stream ring buffer
// url - URL of the file stream to be opened
// url - URL of the stream to be opened
// buffersize - Ring buffer size, in bytes
// readmincount - Minimum bytes to return from a read operation
// readtimeout - Read operation timeout, in millseconds

dvrstream::dvrstream(size_t buffersize, char const* url) : m_buffersize(align::up(buffersize, 65536))
dvrstream::dvrstream(char const* url, size_t buffersize, size_t readmincount, unsigned int readtimeout) :
m_readmincount(std::max(align::down(readmincount, MPEGTS_PACKET_LENGTH), MPEGTS_PACKET_LENGTH)),
m_readtimeout(std::max(1U, readtimeout)), m_buffersize(align::up(buffersize, 65536))
{
// m_readmincount is aligned downward to a mpeg-ts packet boundary with a minimum of one packet
// m_readtimeout has a minimum value of one millisecond
// m_buffersize is aligned upward to a 64KiB boundary

if(url == nullptr) throw std::invalid_argument("url");

// Allocate the ring buffer using the 64KiB upward-aligned buffer size
Expand Down Expand Up @@ -188,6 +196,68 @@ void dvrstream::close(void)
m_worker.join(); // Wait for it to actually stop
}

//---------------------------------------------------------------------------
// dvrstream::create (static)
//
// Factory method, creates a new dvrstream instance
//
// Arguments:
//
// url - URL of the stream to be opened

std::unique_ptr<dvrstream> dvrstream::create(char const* url)
{
return create(url, DEFAULT_RINGBUFFER_SIZE, DEFAULT_READ_MINCOUNT, DEFAULT_READ_TIMEOUT_MS);
}

//---------------------------------------------------------------------------
// dvrstream::create (static)
//
// Factory method, creates a new dvrstream instance
//
// Arguments:
//
// url - URL of the stream to be opened
// buffersize - Ring buffer size, in bytes

std::unique_ptr<dvrstream> dvrstream::create(char const* url, size_t buffersize)
{
return create(url, buffersize, DEFAULT_READ_MINCOUNT, DEFAULT_READ_TIMEOUT_MS);
}

//---------------------------------------------------------------------------
// dvrstream::create (static)
//
// Factory method, creates a new dvrstream instance
//
// Arguments:
//
// url - URL of the stream to be opened
// buffersize - Ring buffer size, in bytes
// readmincount - Minimum bytes to return from a read operation

std::unique_ptr<dvrstream> dvrstream::create(char const* url, size_t buffersize, size_t readmincount)
{
return create(url, buffersize, readmincount, DEFAULT_READ_TIMEOUT_MS);
}

//---------------------------------------------------------------------------
// dvrstream::create (static)
//
// Factory method, creates a new dvrstream instance
//
// Arguments:
//
// url - URL of the stream to be opened
// buffersize - Ring buffer size, in bytes
// readmincount - Minimum bytes to return from a read operation
// readtimeout - Read operation timeout, in millseconds

std::unique_ptr<dvrstream> dvrstream::create(char const* url, size_t buffersize, size_t readmincount, unsigned int readtimeout)
{
return std::unique_ptr<dvrstream>(new dvrstream(url, buffersize, readmincount, readtimeout));
}

//---------------------------------------------------------------------------
// dvrstream::curl_responseheaders (static, private)
//
Expand All @@ -214,7 +284,6 @@ size_t dvrstream::curl_responseheaders(char const* data, size_t size, size_t cou
dvrstream* instance = reinterpret_cast<dvrstream*>(context);

// Accept-Ranges: bytes
//
if((cb >= ACCEPT_RANGES_HEADER_LEN) && (strncmp(ACCEPT_RANGES_HEADER, data, ACCEPT_RANGES_HEADER_LEN) == 0)) {

instance->m_canseek = true; // Only care if header is present
Expand All @@ -223,7 +292,6 @@ size_t dvrstream::curl_responseheaders(char const* data, size_t size, size_t cou
// Content-Range: bytes xxxxxx-yyyyyy/zzzzzz
// Content-Range: bytes xxxxxx-yyyyyy/*
// Content-Range: bytes */zzzzzz
//
else if((cb >= CONTENT_RANGE_HEADER_LEN) && (strncmp(CONTENT_RANGE_HEADER, data, CONTENT_RANGE_HEADER_LEN) == 0)) {

unsigned long long start = 0; // Parsed range start
Expand Down Expand Up @@ -391,83 +459,78 @@ size_t dvrstream::curl_write(void const* data, size_t size, size_t count, void*

void dvrstream::filter_packets(std::unique_lock<std::mutex> const& lock, uint8_t* buffer, size_t count)
{
uint8_t* packet = buffer; // Pointer to the current packet

// The lock argument is necessary to ensure the caller owns it before proceeding
if(!lock.owns_lock()) throw string_exception(__func__, ": caller does not own the unique_lock<>");

// Iterate over all of the packets provided in the buffer
for(size_t index = 0; index < count; index++) {

// Set up the pointer to the start of the packet
packet = buffer + (index * MPEGTS_PACKET_LENGTH);
// Set up the pointer to the start of the packet and a working pointer
uint8_t* packet = buffer + (index * MPEGTS_PACKET_LENGTH);
uint8_t* current = packet;

// READ TRANSPORT STREAM HEADER
//
uint32_t ts_header = read_be32(packet);
// Read relevant values from the transport stream header
uint32_t ts_header = read_be32(current);
uint8_t sync = (ts_header & 0xFF000000) >> 24;
bool pusi = (ts_header & 0x00400000) == 0x00400000;
uint16_t pid = static_cast<uint16_t>((ts_header & 0x001FFF00) >> 8);
bool adaptation = (ts_header & 0x00000020) == 0x00000020;
bool payload = (ts_header & 0x00000010) == 0x00000010;

// CHECK SYNC BYTE
//
// Check the sync byte, should always be 0x47
assert(sync == 0x47);
if(sync != 0x47) continue;

// SKIP TO PLAYLOAD
//
packet += sizeof(uint32_t);
if(adaptation) packet += read_be8(packet);
// Skip over the header and any adaptation bytes
current += 4U;
if(adaptation) current += read_be8(current);

// PAT
//
// >> PAT
if((pid == 0x0000) && (payload)) {

// Align the payload using the pointer provided when pusi is set
if(pusi) packet += read_be8(packet) + 1;
if(pusi) current += read_be8(current) + 1U;

// Get the first and last section indices and skip to the section data
uint8_t firstsection = read_be8(packet + 6);
uint8_t lastsection = read_be8(packet + 7);
packet += 8;
uint8_t firstsection = read_be8(current + 6U);
uint8_t lastsection = read_be8(current + 7U);
current += 8U;

// Iterate over all the sections and add the PMT program ids to the set<>
for(uint8_t section = firstsection; section <= lastsection; section++) {

uint16_t pmt_program = read_be16(packet);
if(pmt_program != 0) m_pmtpids.insert(read_be16(packet + 2) & 0x1FFF);
uint16_t pmt_program = read_be16(current);
if(pmt_program != 0) m_pmtpids.insert(read_be16(current + 2U) & 0x1FFF);

packet += sizeof(uint32_t); // Move to the next section
current += 4U; // Move to the next section
}
}

// PMT
//
// >> PMT
if((pusi) && (payload) && (m_pmtpids.find(pid) != m_pmtpids.end())) {

// Get the address of the current payload pointer and align to it
uint8_t* pointer = packet;
packet += (*pointer + 1);
// Get the length of the entire payload to be sure we don't exceed it
size_t payloadlen = MPEGTS_PACKET_LENGTH - (current - packet);

uint8_t* pointer = current; // Get address of current pointer
current += (*pointer + 1U); // Align offset with the pointer

// FILTER: Skip over 0xC0 (SCTE Program Information Message) entries followed immediately
// by 0x02 (Program Map Table) entries by adjusting the payload pointer and overwriting 0xC0
if(read_be8(packet) == 0xC0) {
if(read_be8(current) == 0xC0) {

// Acquire the length of the 0xC0 entry
uint16_t length = read_be16(packet + 1) & 0x3FF;

//
// TODO: The length cannot specify a position outside of this packet
//
// Acquire the length of the 0xC0 entry, if it exceeds the length of the payload give
// up -- the + 4 bytes is for the pointer (1), the table id (1) and the length (2)
uint16_t length = read_be16(current + 1) & 0x3FF;
if((length + 4U) > payloadlen) break;

// If the 0xC0 entry is immediately followed by a 0x02 entry, adjust the payload
// pointer to align to the 0x02 entry and overwrite the 0xC0 entry with filler
if(read_be8(packet + length) == 0x02) {
if(read_be8(current + 3U + length) == 0x02) {

*pointer = 3 + static_cast<uint8_t>(length & 0xFF);
memset(packet, 0xFF, 3 + length);
// Take into account any existing pointer value when adjusting it
*pointer += (3U + static_cast<uint8_t>(length & 0xFF));
memset(current, 0xFF, 3U + length);
}
}
}
Expand Down Expand Up @@ -515,29 +578,20 @@ unsigned long long dvrstream::position(void) const
// count - Size of the destination buffer in bytes

size_t dvrstream::read(uint8_t* buffer, size_t count)
{
return read(buffer, count, DEFAULT_READ_TIMEOUT_MS);
}

//---------------------------------------------------------------------------
// dvrstream::read
//
// Reads data from the live stream
//
// Arguments:
//
// buffer - Buffer to receive the live stream data
// count - Size of the destination buffer in bytes
// timeoutms - Maximum number of milliseconds to wait before failing

size_t dvrstream::read(uint8_t* buffer, size_t count, unsigned int timeoutms)
{
size_t bytesread = 0; // Total bytes actually read
size_t head = 0; // Current head position
size_t tail = 0; // Current tail position
size_t available = 0; // Available bytes to read
bool stopped = false; // Flag if data transfer has stopped

// Verify that the minimum read count has been aligned properly during construction
assert(m_readmincount == align::down(m_readmincount, MPEGTS_PACKET_LENGTH));
assert(m_readmincount >= MPEGTS_PACKET_LENGTH);

// Verify that the read timeout is at least one millisecond
assert(m_readtimeout >= 1U);

std::unique_lock<std::mutex> lock(m_lock);

if(buffer == nullptr) throw std::invalid_argument("buffer");
Expand All @@ -546,7 +600,7 @@ size_t dvrstream::read(uint8_t* buffer, size_t count, unsigned int timeoutms)

// Wait up to the timeout for there to be at least one single full mpeg-ts packet available in
// the buffer, if there is not the condvar will be triggered on a write or a thread stop.
if(cv_wait_until_equals(m_cv, lock, timeoutms, [&]() -> bool {
if(cv_wait_until_equals(m_cv, lock, m_readtimeout, [&]() -> bool {

tail = m_buffertail.load(); // Copy the atomic<> tail position
head = m_bufferhead.load(); // Copy the atomic<> head position
Expand All @@ -556,13 +610,13 @@ size_t dvrstream::read(uint8_t* buffer, size_t count, unsigned int timeoutms)
available = (tail > head) ? (m_buffersize - tail) + head : head - tail;

// The result from the predicate is true if enough data or stopped
return ((available >= MPEGTS_PACKET_LENGTH) || (stopped));
return ((available >= m_readmincount) || (stopped));

}) == false) return 0;

// If the wait loop was broken by the worker thread stopping, make one more pass
// to ensure that no additional data was first written by the thread
if((available < MPEGTS_PACKET_LENGTH) && (stopped)) {
if((available < m_readmincount) && (stopped)) {

tail = m_buffertail.load(); // Copy the atomic<> tail position
head = m_bufferhead.load(); // Copy the atomic<> head position
Expand Down
Loading

0 comments on commit a6dd587

Please sign in to comment.