Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix live crash #232

Merged
merged 5 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions wiliwili/include/api/live/danmaku_live.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
#include <atomic>
#include <thread>
#include <mutex>
#include <functional>

#include <borealis.hpp>
#include <borealis/core/singleton.hpp>
#include "mongoose.h"

typedef void (*on_message_func_t)(const std::string &);
class LiveDanmaku : public brls::Singleton<LiveDanmaku> {
public:
int room_id;
Expand All @@ -26,8 +26,8 @@ class LiveDanmaku : public brls::Singleton<LiveDanmaku> {
void send_heartbeat();
void send_text_message(const std::string &message);

void setonMessage(std::function<void(std::string &&)> func);
std::function<void(std::string &&)> onMessage;
void setonMessage(on_message_func_t func);
on_message_func_t onMessage = nullptr;

void set_wait_time(size_t time);
size_t wait_time = 800;
Expand Down
29 changes: 13 additions & 16 deletions wiliwili/source/activity/live_player_activity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ static void process_danmaku(const std::vector<LiveDanmakuItem>& danmaku_list) {
// danmaku_t_free(dan);
}

static void onDanmakuReceived(std::string&& message) {
const std::string& msg = message;
static void onDanmakuReceived(const std::string& msg) {
std::vector<uint8_t> payload(msg.begin(), msg.end());
std::vector<std::string> messages = parse_packet(payload);

Expand All @@ -62,7 +61,7 @@ static void onDanmakuReceived(std::string&& message) {
}

static void showDialog(const std::string& msg, const std::string& pic,
bool forceQuit) {
bool forceQuit) {
brls::Dialog* dialog;
if (pic.empty()) {
dialog = new brls::Dialog(msg);
Expand Down Expand Up @@ -90,26 +89,25 @@ static void showDialog(const std::string& msg, const std::string& pic,
dialog->open();
}


LiveActivity::LiveActivity(const bilibili::LiveVideoResult& live)
: liveData(live) {
brls::Logger::debug("LiveActivity: create: {}", live.roomid);
LiveDanmaku::instance().setonMessage(onDanmakuReceived);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

问题就在这,原来是先this->setCommonData(),再注册函数,但是这样连接的过快的话LiveDanmaku里面的onMessage是空的

this->setCommonData();
GA("open_live", {{"id", std::to_string(live.roomid)}})
GA("open_live", {{"live_id", std::to_string(live.roomid)}})
LiveDanmaku::instance().setonMessage(onDanmakuReceived);
}

LiveActivity::LiveActivity(int roomid, const std::string& name,
const std::string& views) {
brls::Logger::debug("LiveActivity: create: {}", roomid);
LiveDanmaku::instance().setonMessage(onDanmakuReceived);
this->liveData.roomid = roomid;
this->liveData.title = name;
this->liveData.watched_show.text_large = views;
this->setCommonData();
GA("open_live", {{"id", std::to_string(roomid)}})
GA("open_live", {{"live_id", std::to_string(roomid)}})
LiveDanmaku::instance().setonMessage(onDanmakuReceived);
}

void LiveActivity::setCommonData() {
Expand Down Expand Up @@ -261,10 +259,9 @@ void LiveActivity::onLiveData(const bilibili::LiveRoomPlayInfo& result) {
if (result.is_locked) {
brls::Logger::error("LiveActivity: live {} is locked", result.room_id);
this->video->showOSD(false);
showDialog(
fmt::format("这个房间已经被封禁(至 {})!(╯°口°)╯(┴—┴",
wiliwili::sec2FullDate(result.lock_till)),
"pictures/room-block.png", true);
showDialog(fmt::format("这个房间已经被封禁(至 {})!(╯°口°)╯(┴—┴",
wiliwili::sec2FullDate(result.lock_till)),
"pictures/room-block.png", true);
return;
}
// 0: 未开播 1: 直播中 2: 轮播中
Expand Down Expand Up @@ -306,17 +303,17 @@ void LiveActivity::onError(const std::string& error) {
}

void LiveActivity::onNeedPay(const std::string& msg, const std::string& link,
const std::string& startTime,
const std::string& endTime) {
const std::string& startTime,
const std::string& endTime) {
if (link.empty()) {
showDialog(msg, "", true);
return;
}

auto box = new brls::Box();
auto img = new QRImage();
auto label = new brls::Label();
auto header = new brls::Label();
auto box = new brls::Box();
auto img = new QRImage();
auto label = new brls::Label();
auto header = new brls::Label();
auto subtitle = new brls::Label();
header->setFontSize(24);
header->setMargins(10, 0, 20, 0);
Expand Down
46 changes: 16 additions & 30 deletions wiliwili/source/api/danmaku_live.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

#include <cstddef>
#include <ctime>
#include <iostream>
#include <queue>
#include <condition_variable>
#include <string>
Expand All @@ -36,8 +35,9 @@ static void get_live_s(int room_id) {
json _json;
try {
_json = json::parse(res.text);
} catch (const std::exception &e) {
std::cout << "getDanmuInfo json parse error" << std::endl;
} catch (...) {
brls::Logger::error("getDanmuInfo json parse error");
return;
}
if (_json["code"].get<int>() == 0) {
// url = "ws://" +
Expand All @@ -62,23 +62,13 @@ static void heartbeat_timer(void *param) {
}
}

typedef struct task {
// 函数
const std::function<void(std::string &&)> onMessage;
// 参数
std::string arg;
// 优先级,暂时都设置0
int priority;
} task;

static std::queue<task> task_q;
static std::queue<std::string> msg_q;
static std::condition_variable cv;
static std::mutex task_mutex;
static std::mutex msg_q_mutex;

static void add_task(const std::function<void(std::string &&)> &func,
std::string &&a) {
std::lock_guard<std::mutex> lock(task_mutex);
task_q.emplace(task{func, a, 0});
static void add_msg(std::string &&a) {
std::lock_guard<std::mutex> lock(msg_q_mutex);
msg_q.emplace(std::move(a));
cv.notify_one();
}

Expand Down Expand Up @@ -148,16 +138,15 @@ void LiveDanmaku::connect(int room_id, int64_t uid) {

task_thread = std::thread([this]() {
while (true) {
std::unique_lock<std::mutex> lock(task_mutex);
cv.wait(lock, [this] {
return !task_q.empty() or !this->is_connected();
});
std::unique_lock<std::mutex> lock(msg_q_mutex);
cv.wait(lock,
[this] { return !msg_q.empty() or !this->is_connected(); });
if (!this->is_connected()) break;
auto task = task_q.front();
task_q.pop();
auto msg = std::move(msg_q.front());
msg_q.pop();
lock.unlock();

task.onMessage(std::move(task.arg));
this->onMessage(msg);
}
});

Expand Down Expand Up @@ -247,14 +236,11 @@ static void mongoose_event_handler(struct mg_connection *nc, int ev,
} else if (ev == MG_EV_WS_MSG) {
MG_DEBUG(("%p %s", nc->fd, (char *)ev_data));
struct mg_ws_message *wm = (struct mg_ws_message *)ev_data;
add_task(liveDanmaku->onMessage,
std::string(wm->data.ptr, wm->data.len));
add_msg(std::string(wm->data.ptr, wm->data.len));
} else if (ev == MG_EV_CLOSE) {
MG_DEBUG(("%p %s", nc->fd, (char *)ev_data));
liveDanmaku->ms_ev_ok.store(false, std::memory_order_release);
}
}

void LiveDanmaku::setonMessage(std::function<void(std::string &&)> func) {
onMessage = func;
}
void LiveDanmaku::setonMessage(on_message_func_t func) { onMessage = func; }
10 changes: 6 additions & 4 deletions wiliwili/source/api/util/extract_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,20 +71,22 @@ std::vector<live_t> extract_messages(const std::vector<std::string> &messages) {
std::vector<live_t> live_messages;
live_messages.reserve(messages.size() / 5);

for (auto &message : messages) {
for (const auto &message : messages) {
nlohmann::json json_message;

try {
json_message = nlohmann::json::parse(message);
} catch (nlohmann::json::parse_error &e) {
} catch (const std::exception &e) {
continue;
} catch (...) {
continue;
}

auto it = json_message.find("cmd");

if (it == json_message.end()) continue;

if (it->get<std::string>() == "WATCHED_CHANGE") {
if (it->get_ref<std::string &>() == "WATCHED_CHANGE") {
if (json_message["data"]["num"].is_number()) continue;

auto num = json_message["data"]["num"].get<int>();
Expand All @@ -99,7 +101,7 @@ std::vector<live_t> extract_messages(const std::vector<std::string> &messages) {
wc->num = num;
live_messages.emplace_back(live_t{watched_change, wc});

} else if (it->get<std::string>() == "DANMU_MSG") {
} else if (it->get_ref<std::string &>() == "DANMU_MSG") {
auto &info = json_message["info"];

if (!info.is_array() || info.size() != 17) continue;
Expand Down