Skip to content

Commit

Permalink
Solved problems with track sequence
Browse files Browse the repository at this point in the history
  • Loading branch information
lanstat committed Aug 28, 2023
1 parent 6829edc commit a12ffe1
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 51 deletions.
3 changes: 3 additions & 0 deletions src/EventType.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#define RESOURCE_TYPE_STREAMING 1
#define RESOURCE_TYPE_CACHE 2

#define TRANSFER_MODE_HEADER_SOON_AS_POSSIBLE 0
#define TRANSFER_MODE_WAIT_UNTIL_COMPLETE 1

#define EMPTY_BUFFER 12345678
#define END_BUFFER 12345679

Expand Down
155 changes: 113 additions & 42 deletions src/HLSStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
#define CHANNEL_INNER_TAG 0x05
#define SEGMENT_INNER_PLAYLIST_TAG 0x06

#define REMAINING_FETCHES 5
#define REMAINING_FETCHES 20
#define MAX_TRACKS_PER_PLAYLIST 4

HLSStream::HLSStream() {}

Expand All @@ -34,6 +35,7 @@ bool HLSStream::HandleExistsResource(struct Request *entry) {
mux->tag = CHANNEL_LIST_TAG;
} else if (Utils::EndsWith(path, "/index.m3u8")) {
mux->tag = CHANNEL_TAG;
mux->transfer_mode = TRANSFER_MODE_WAIT_UNTIL_COMPLETE;
mux->in_memory = true;
} else if (Utils::EndsWith(path, ".m3u8")) {
mux->tag = SEGMENT_PLAYLIST_TAG;
Expand All @@ -47,7 +49,7 @@ bool HLSStream::HandleExistsResource(struct Request *entry) {
struct SegmentPlaylist *playlist = playlists_.at(entry->resource_id);
playlist->remaining_fetches = REMAINING_FETCHES;
if (mux->is_completed) {
GeneratePlaylist(entry->resource_id, mux);
GenerateSegmentClientList(entry->resource_id, mux);
}
}
}
Expand All @@ -57,13 +59,11 @@ bool HLSStream::HandleExistsResource(struct Request *entry) {
int HLSStream::NotifyCacheCompleted(uint64_t resource_id, struct iovec *buffer,
int size) {
auto mux = GetResource(resource_id);
if (mux->tag == CHANNEL_TAG) {
ProcessChannel(mux, buffer, size, 0);
} else if (mux->tag == CHANNEL_INNER_TAG) {
uint64_t parent_id = segments_.at(resource_id);
ProcessChannel(mux, buffer, size, parent_id);
RemoveSegment(resource_id);
} else if (mux->tag == SEGMENT_INNER_PLAYLIST_TAG) {
if (mux->tag == SEGMENT_TAG) {
return Stream::NotifyCacheCompleted(resource_id, buffer, size);
}

if (mux->tag == SEGMENT_INNER_PLAYLIST_TAG) {
uint64_t parent_id = segments_.at(resource_id);
bool proceed = ProcessSegmentList(parent_id, buffer, size);
RemoveSegment(resource_id);
Expand All @@ -73,16 +73,35 @@ int HLSStream::NotifyCacheCompleted(uint64_t resource_id, struct iovec *buffer,
resource_id = parent_id;
size = 0;
auto mux = GetResource(resource_id);
GeneratePlaylist(resource_id, mux);
GenerateSegmentClientList(resource_id, mux);
} else {
return 1;
}
}
return Stream::NotifyCacheCompleted(resource_id, buffer, size);

struct iovec *new_buffer = new struct iovec[size];
if (mux->tag == CHANNEL_TAG) {
ProcessChannel(mux, buffer, size, 0, new_buffer);
} else if (mux->tag == CHANNEL_INNER_TAG) {
uint64_t parent_id = segments_.at(resource_id);
ProcessChannel(mux, buffer, size, parent_id, new_buffer);
RemoveSegment(resource_id);
}
int response = Stream::NotifyCacheCompleted(resource_id, new_buffer, size);

for (int i = 0; i < size; i++) {
if (new_buffer[i].iov_len > 0) {
free(new_buffer[i].iov_base);
}
}

delete[] new_buffer;

return response;
}

void HLSStream::ProcessChannel(struct Mux *mux, struct iovec *buffer, int size,
uint64_t parent_id) {
uint64_t parent_id, struct iovec *new_buffer) {
struct Request *channel = mux->requests[0];

for (int i = 0; i < size; i++) {
Expand All @@ -107,10 +126,11 @@ void HLSStream::ProcessChannel(struct Mux *mux, struct iovec *buffer, int size,
if (parent_id == 0) {
resource_id = Helpers::GetResourceId(path.c_str());
if (playlists_.find(resource_id) == playlists_.end()) {
CreatePlaylist(resource_id, mux->url);
CreatePlaylist(resource_id, mux->resource_id, mux->url);
}
}
RequestFile(resource_id, url, SEGMENT_INNER_PLAYLIST_TAG, 100);
break;
}
}
}
Expand All @@ -122,9 +142,13 @@ void HLSStream::ProcessChannel(struct Mux *mux, struct iovec *buffer, int size,
pos = content.find(Settings::Proxy);
}

memset(buffer[i].iov_base, 0, buffer[i].iov_len);
buffer[i].iov_len = content.length();
memcpy(buffer[i].iov_base, (void *)content.c_str(), content.length());
new_buffer[i].iov_len = content.length();
new_buffer[i].iov_base = malloc(new_buffer[i].iov_len);
memset(new_buffer[i].iov_base, 0, new_buffer[i].iov_len);
memcpy(new_buffer[i].iov_base, (void *)content.c_str(),
content.length());

UpdateHeader(mux, content.length());
}
}
}
Expand Down Expand Up @@ -173,47 +197,54 @@ bool HLSStream::ProcessSegmentList(uint64_t parent_id, struct iovec *buffer,
tmp = tmp.replace(tmp.begin(), tmp.begin() + 7, "/");
Log(__FILE__, __LINE__) << "Found TS: " << tmp;

int pos = url.find(Settings::Proxy);
if (pos != std::string::npos) {
url.replace(pos, Settings::Proxy.length(), Settings::BaseUrl);
}

std::string block = line + "\n" + url + "\n";
playlist->track_stamps.push_back(epoch_ms + timeout);
playlist->track_uids.push_back(uid);
playlist->track_urls.push_back(block);
playlist->sequence++;
}
}
}
}

std::cout << "LAN_[" << __FILE__ << ":" << __LINE__ << "] "
<< playlist->track_uids.size() << std::endl;
if (!found) {
RequestFile(playlist->resource_id, playlist->url, CHANNEL_INNER_TAG, 500);
} else {
RequestFile(playlist->resource_id, playlist->url, CHANNEL_INNER_TAG,
timeout - 8000);
2500);
}
return true;
}

void HLSStream::CreatePlaylist(uint64_t resource_id, std::string channel_url) {
void HLSStream::CreatePlaylist(uint64_t resource_id, uint64_t channel_id,
std::string channel_url) {
auto playlist = new SegmentPlaylist();
playlist->resource_id = resource_id;
playlist->channel_id = channel_id;
playlist->url = channel_url;
playlist->remaining_fetches = REMAINING_FETCHES;
playlists_.insert(
std::pair<uint64_t, struct SegmentPlaylist *>(resource_id, playlist));

struct Mux *mux = CreateMux(std::string());
struct Mux *mux = CreateMux(resource_id, std::string());
mux->tag = SEGMENT_PLAYLIST_TAG;
mux->type = RESOURCE_TYPE_CACHE;
mux->transfer_mode = TRANSFER_MODE_WAIT_UNTIL_COMPLETE;
mux->in_memory = true;
mux->is_completed = false;

std::stringstream ss;
ss << "HTTP/1.1 200 OK\r\n"
<< "Server: Astra\r\n"
<< "Cache-Control: no-cache\r\n"
<< "Content-Length: 0\r\n"
<< "Access-Control-Allow-Origin: *\r\n"
<< "Access-Control-Allow-Methods: GET\r\n"
<< "X-Id: G2T\r\n"
<< "Access-Control-Allow-Credentials: true\r\n"
<< "Content-Type: application/vnd.apple.mpegURL\r\n"
<< "Connection: close\r\n"
Expand All @@ -234,7 +265,7 @@ void HLSStream::CreatePlaylist(uint64_t resource_id, std::string channel_url) {
void HLSStream::RequestFile(uint64_t parent_id, std::string url, int tag,
int msecs) {
uint64_t resource_id = Helpers::GetResourceId(url.c_str());
struct Mux *mux = CreateMux(std::string());
struct Mux *mux = CreateMux(resource_id, std::string());
mux->tag = tag;
mux->in_memory = true;
std::pair<uint64_t, struct Mux *> item(resource_id, mux);
Expand Down Expand Up @@ -275,17 +306,6 @@ void HLSStream::RequestFile(uint64_t parent_id, std::string url, int tag,
Helpers::SendRequestNop(ring_, inner, msecs);
}

void HLSStream::RemoveSegment(uint64_t resource_id) {
auto mux = GetResource(resource_id);
const std::vector<struct Request *> &requests = mux->requests;
for (struct Request *const c : requests) {
Utils::ReleaseRequest(c);
}
ReleaseResource(resource_id);

segments_.erase(resource_id);
}

long HLSStream::GetTicks() {
auto now = std::chrono::system_clock::now();
auto now_ms = std::chrono::time_point_cast<std::chrono::milliseconds>(now);
Expand All @@ -294,22 +314,39 @@ long HLSStream::GetTicks() {
return epoch_ms;
}

void HLSStream::GeneratePlaylist(uint64_t resource_id, struct Mux *mux) {
void HLSStream::GenerateSegmentClientList(uint64_t resource_id,
struct Mux *mux) {
// Only generate playlist if there are pending requests
if (mux->requests.empty()) {
return;
}
struct SegmentPlaylist *playlist = playlists_.at(resource_id);
long ticks = GetTicks();

std::stringstream ss;
ss << "#EXTM3U\r\n";
ss << "#EXT-X-VERSION:3\r\n";
ss << "#EXT-X-TARGETDURATION:3\r\n";
ss << "#EXT-X-MEDIA-SEQUENCE:2086\r\n";

long ticks = GetTicks();

struct SegmentPlaylist *playlist = playlists_.at(resource_id);
for (auto url : playlist->track_urls) {
ss << url;
ss << "#EXT-X-MEDIA-SEQUENCE:" << playlist->sequence << "\r\n";

// Only copy the tracks that are after the current ticks
int pivot = 0;
const std::vector<long> &stamps = playlist->track_stamps;
for (int i = 0; i < stamps.size(); i++) {
if (stamps[i] > ticks) {
pivot = i;
if (pivot > 0) {
pivot--;
}
break;
}
}
const std::vector<std::string> &urls = playlist->track_urls;
for (int i = 0; i < MAX_TRACKS_PER_PLAYLIST; i++) {
if ((pivot + i) >= urls.size()) {
break;
}
ss << urls.at(pivot + i);
}
std::string content = ss.str();
int size = content.size() + 1;
Expand All @@ -318,6 +355,8 @@ void HLSStream::GeneratePlaylist(uint64_t resource_id, struct Mux *mux) {
mux->buffer[0].iov_base = malloc(size);
memset(mux->buffer[0].iov_base, 0, size);
memcpy(mux->buffer[0].iov_base, content.c_str(), size);

UpdateHeader(mux, size);
}

void HLSStream::CreateSegment(uint64_t resource_id, uint64_t parent) {
Expand All @@ -328,6 +367,38 @@ void HLSStream::RemovePlaylist(uint64_t resource_id) {
Log(__FILE__, __LINE__, Log::kWarning)
<< "Removing playlist " << resource_id;
auto playlist = playlists_.at(resource_id);

auto channel_mux = GetResource(playlist->channel_id);
const std::vector<struct Request *> &requests = channel_mux->requests;
for (struct Request *const c : requests) {
Utils::ReleaseRequest(c);
}
ReleaseResource(playlist->channel_id);

delete playlist;
playlists_.erase(resource_id);
}

void HLSStream::RemoveSegment(uint64_t resource_id) {
auto mux = GetResource(resource_id);
const std::vector<struct Request *> &requests = mux->requests;
for (struct Request *const c : requests) {
Utils::ReleaseRequest(c);
}
ReleaseResource(resource_id);

segments_.erase(resource_id);
}

void HLSStream::UpdateHeader(struct Mux *mux, int content_length) {
std::string header((char *)mux->header.iov_base);

header = Utils::ReplaceHeaderTag(header, "Content-Length", std::to_string(content_length));
if (mux->header.iov_len > 0) {
free(mux->header.iov_base);
}
mux->header.iov_len = header.size();
mux->header.iov_base = malloc(mux->header.iov_len);
memset(mux->header.iov_base, 0, mux->header.iov_len);
memcpy(mux->header.iov_base, (void *)header.c_str(), header.size());
}
9 changes: 6 additions & 3 deletions src/HLSStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ class HLSStream : public Stream {
protected:
struct SegmentPlaylist {
uint64_t resource_id;
uint64_t channel_id;
int remaining_fetches;
int sequence;
std::string url;
std::vector<std::string> track_urls;
std::vector<uint64_t> track_uids;
Expand All @@ -27,19 +29,20 @@ class HLSStream : public Stream {
std::unordered_map<uint64_t, uint64_t> segments_;

private:
void ProcessChannel(struct Mux *mux, struct iovec *buffer, int size, uint64_t parent_id);
void ProcessChannel(struct Mux *mux, struct iovec *buffer, int size, uint64_t parent_id, struct iovec *new_buffer);
bool ProcessSegmentList(uint64_t parent_id, struct iovec *buffer, int size);
void CreatePlaylist(uint64_t resource_id, std::string channel_url);
void CreatePlaylist(uint64_t resource_id, uint64_t channel_id, std::string channel_url);
void AppendSegments();
void RequestPlaylist(uint64_t resource_id, std::string url);
void RequestFile(uint64_t parent_id, std::string url, int tag, int msecs = 0);

long GetTicks();
void GeneratePlaylist(uint64_t resource_id, struct Mux *mux);
void GenerateSegmentClientList(uint64_t resource_id, struct Mux *mux);
void CreateSegment(uint64_t resource_id, uint64_t parent);

void RemoveSegment(uint64_t resource_id);
void RemovePlaylist(uint64_t resource_id);
void UpdateHeader(struct Mux *mux, int content_length);

};
#endif
Expand Down
1 change: 0 additions & 1 deletion src/Http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ std::string Http::ProcessExternalHeader(struct Request *http) {

//tmp = Utils::ReplaceHeaderTag(tmp, "Server", "cdn/0.1.0");
// tmp = Utils::ReplaceHeaderTag(tmp, "ETag", GetEtag(http->resource_id));
tmp = Utils::RemoveHeaderTag(tmp, "Content-Length");

return tmp;
}
Expand Down
2 changes: 2 additions & 0 deletions src/Mux.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ struct Node {
};

struct Mux {
uint64_t resource_id;
std::vector<struct Request *> requests;
struct iovec header;
unsigned int pivot;
int type;
int tag;
int transfer_mode;
bool is_completed;
bool in_memory;
std::string path;
Expand Down
Loading

0 comments on commit a12ffe1

Please sign in to comment.