Skip to content

Commit

Permalink
Clean up telemetry_test and fix. (#3047)
Browse files Browse the repository at this point in the history
b/330355045

The fix includes fixing the starboard implementation of `MessagePump`:
`MessagePumpUIStarboard` and `MessagePumpIOStarboard`.

(cherry picked from commit a529b2b)
  • Loading branch information
aee-google authored and anonymous1-me committed Apr 30, 2024
1 parent beaec51 commit 224b1b3
Show file tree
Hide file tree
Showing 18 changed files with 556 additions and 232 deletions.
5 changes: 5 additions & 0 deletions base/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,7 @@ component("base") {
"synchronization/condition_variable_starboard.cc",
"synchronization/lock_impl_starboard.cc",
"synchronization/waitable_event_starboard.cc",
"synchronization/waitable_event_watcher_starboard.cc",
"sys_info_starboard.cc",
"threading/platform_thread_starboard.cc",
"threading/thread_local_storage_starboard.cc",
Expand Down Expand Up @@ -3902,6 +3903,10 @@ test("base_unittests") {
sources += [ "debug/allocation_trace_unittest.cc" ]
}

if (is_starboard) {
sources += [ "message_loop/message_pump_io_starboard_unittest.cc" ]
}

if (is_ios && !is_starboard) {
sources += [
"ios/device_util_unittest.mm",
Expand Down
66 changes: 28 additions & 38 deletions base/message_loop/message_pump_io_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@

namespace base {

MessagePumpIOStarboard::SocketWatcher::SocketWatcher()
: interests_(kSbSocketWaiterInterestNone),
MessagePumpIOStarboard::SocketWatcher::SocketWatcher(const Location& from_here)
: created_from_location_(from_here),
interests_(kSbSocketWaiterInterestNone),
socket_(kSbSocketInvalid),
pump_(NULL),
watcher_(NULL),
pump_(nullptr),
watcher_(nullptr),
weak_factory_(this) {}

MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
Expand All @@ -39,7 +40,10 @@ MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
}

bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
if (!SbSocketIsValid(socket_)) {
pump_ = nullptr;
// If this watcher is not watching anything, no-op and return success.
return true;
}
Expand All @@ -50,9 +54,7 @@ bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
DCHECK(pump_);
result = pump_->StopWatching(socket);
}
pump_ = NULL;
watcher_ = NULL;
interests_ = kSbSocketWaiterInterestNone;
pump_ = nullptr;
return result;
}

Expand Down Expand Up @@ -92,7 +94,6 @@ void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToWrite(

MessagePumpIOStarboard::MessagePumpIOStarboard()
: keep_running_(true),
in_run_(false),
processed_io_events_(false),
waiter_(SbSocketWaiterCreate()) {}

Expand Down Expand Up @@ -170,16 +171,14 @@ void MessagePumpIOStarboard::RemoveIOObserver(IOObserver* obs) {

// Reentrant!
void MessagePumpIOStarboard::Run(Delegate* delegate) {
AutoReset<bool> auto_reset_in_run(&in_run_, true);
AutoReset<bool> auto_reset_keep_running(&keep_running_, true);

for (;;) {
Delegate::NextWorkInfo next_work_info = delegate->DoWork();
bool has_more_immediate_work = next_work_info.is_immediate();
if (!keep_running_)
break;
bool immediate_work_available = next_work_info.is_immediate();

if (has_more_immediate_work)
continue;
if (should_quit())
break;

// NOTE: We need to have a wake-up pending any time there is work queued,
// and the MessageLoop only wakes up the pump when the work queue goes from
Expand All @@ -193,44 +192,37 @@ void MessagePumpIOStarboard::Run(Delegate* delegate) {
// loop and call delegate->DoWork() before we decide to block.
SbSocketWaiterResult result = SbSocketWaiterWaitTimed(waiter_, 0);
DCHECK_NE(kSbSocketWaiterResultInvalid, result);
has_more_immediate_work |=
(result == kSbSocketWaiterResultWokenUp) || processed_io_events_;

bool attempt_more_work =
(result == kSbSocketWaiterResultWokenUp) || immediate_work_available || processed_io_events_;
processed_io_events_ = false;
if (!keep_running_)

if (should_quit())
break;

if (has_more_immediate_work)
if (attempt_more_work)
continue;

has_more_immediate_work = delegate->DoIdleWork();
if (!keep_running_)
attempt_more_work = delegate->DoIdleWork();

if (should_quit())
break;

if (has_more_immediate_work)
if (attempt_more_work)
continue;

if (!next_work_info.delayed_run_time.is_null()) {
delayed_work_time_ = next_work_info.delayed_run_time;
}
if (delayed_work_time_.is_null()) {
if (next_work_info.delayed_run_time.is_max()) {
SbSocketWaiterWait(waiter_);
} else {
TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
if (delay > TimeDelta()) {
SbSocketWaiterWaitTimed(waiter_, delay.InMicroseconds());
} else {
// It looks like delayed_work_time_ indicates a time in the past, so we
// need to call DoDelayedWork now.
delayed_work_time_ = TimeTicks();
}
SbSocketWaiterWaitTimed(waiter_, next_work_info.remaining_delay().InMicroseconds());
}
}

keep_running_ = true;
if (should_quit())
break;
}
}

void MessagePumpIOStarboard::Quit() {
DCHECK(in_run_);
// Tell both the SbObjectWaiter and Run that they should break out of their
// loops.
keep_running_ = false;
Expand All @@ -246,8 +238,6 @@ void MessagePumpIOStarboard::ScheduleDelayedWork(
// We know that we can't be blocked on Wait right now since this method can
// only be called on the same thread as Run, so we only need to update our
// record of how long to sleep when we do sleep.
delayed_work_time_ = next_work_info.delayed_run_time;
ScheduleWork();
}

void MessagePumpIOStarboard::WillProcessIOEvent() {
Expand Down
24 changes: 11 additions & 13 deletions base/message_loop/message_pump_io_starboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,12 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
// Object returned by WatchSocket to manage further watching.
class SocketWatcher {
public:
SocketWatcher();
SocketWatcher(const Location& from_here);
~SocketWatcher(); // Implicitly calls StopWatchingSocket.

SocketWatcher(const SocketWatcher&) = delete;
SocketWatcher& operator=(const SocketWatcher&) = delete;

// NOTE: These methods aren't called StartWatching()/StopWatching() to avoid
// confusion with the win32 ObjectWatcher class.

Expand Down Expand Up @@ -90,14 +93,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
void OnSocketReadyToRead(SbSocket socket, MessagePumpIOStarboard* pump);
void OnSocketReadyToWrite(SbSocket socket, MessagePumpIOStarboard* pump);

const Location created_from_location_;
int interests_;
SbSocket socket_;
bool persistent_;
MessagePumpIOStarboard* pump_;
Watcher* watcher_;
base::WeakPtrFactory<SocketWatcher> weak_factory_;

// DISALLOW_COPY_AND_ASSIGN(SocketWatcher);
};

enum Mode {
Expand All @@ -107,6 +109,10 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
};

MessagePumpIOStarboard();
virtual ~MessagePumpIOStarboard();

MessagePumpIOStarboard(const MessagePumpIOStarboard&) = delete;
MessagePumpIOStarboard& operator=(const MessagePumpIOStarboard&) = delete;

// Have the current thread's message loop watch for a a situation in which
// reading/writing to the socket can be performed without blocking. Callers
Expand Down Expand Up @@ -135,9 +141,6 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
virtual void ScheduleWork() override;
virtual void ScheduleDelayedWork(const Delegate::NextWorkInfo& next_work_info) override;

// protected:
virtual ~MessagePumpIOStarboard();

private:
friend class MessagePumpIOStarboardTest;

Expand All @@ -151,25 +154,20 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
void* context,
int ready_interests);

bool should_quit() const { return !keep_running_; }

// This flag is set to false when Run should return.
bool keep_running_;

// This flag is set when inside Run.
bool in_run_;

// This flag is set if the Socket Waiter has processed I/O events.
bool processed_io_events_;

// The time at which we should call DoDelayedWork.
TimeTicks delayed_work_time_;

// Starboard socket waiter dispatcher. Waits for all sockets registered with
// it, and sends readiness callbacks when a socket is ready for I/O.
SbSocketWaiter waiter_;

ObserverList<IOObserver> io_observers_;
THREAD_CHECKER(watch_socket_caller_checker_);
// DISALLOW_COPY_AND_ASSIGN(MessagePumpIOStarboard);
};

using MessagePumpForIO = MessagePumpIOStarboard;
Expand Down
Loading

0 comments on commit 224b1b3

Please sign in to comment.