Skip to content

Commit

Permalink
WIP: deprecate SbSocket APIs
Browse files Browse the repository at this point in the history
approach 0

Reuse tcp_socket_starboard and message_pump_starboard.

b/302741384
  • Loading branch information
maxz-lab committed Aug 15, 2024
1 parent 2a9a01c commit 495064e
Show file tree
Hide file tree
Showing 29 changed files with 1,720 additions and 69 deletions.
115 changes: 114 additions & 1 deletion base/message_loop/message_pump_io_starboard.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@

#include "base/message_loop/message_pump_io_starboard.h"

#if SB_API_VERSION >= 16
#include <sys/socket.h>
#include <sys/stat.h>
#include <netinet/in.h>
#endif

#include "base/auto_reset.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
Expand All @@ -23,57 +29,97 @@
#include "starboard/common/socket.h"
#include "starboard/socket_waiter.h"



namespace base {

MessagePumpIOStarboard::SocketWatcher::SocketWatcher(const Location& from_here)
: created_from_location_(from_here),
interests_(kSbSocketWaiterInterestNone),
#if SB_API_VERSION >= 16
socket_(-1),
#else
socket_(kSbSocketInvalid),
#endif
pump_(nullptr),
watcher_(nullptr),
weak_factory_(this) {}

MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() {
#if SB_API_VERSION >= 16
if (socket_ >= 0) {
#else
if (SbSocketIsValid(socket_)) {
#endif
StopWatchingSocket();
}
}

bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() {
watcher_ = nullptr;
interests_ = kSbSocketWaiterInterestNone;
#if SB_API_VERSION >= 16
if (socket_ < 0) {
#else
if (!SbSocketIsValid(socket_)) {
#endif
pump_ = nullptr;
// If this watcher is not watching anything, no-op and return success.
return true;
}

#if SB_API_VERSION >= 16
int socket = Release();
bool result = true;
if (socket >= 0) {
#else
SbSocket socket = Release();
bool result = true;
if (SbSocketIsValid(socket)) {
#endif
DCHECK(pump_);
result = pump_->StopWatching(socket);
}
pump_ = nullptr;
return result;
}

#if SB_API_VERSION >= 16
void MessagePumpIOStarboard::SocketWatcher::Init(int socket,
bool persistent) {
DCHECK(socket >= 0);
DCHECK(socket_ < 0);
#else
void MessagePumpIOStarboard::SocketWatcher::Init(SbSocket socket,
bool persistent) {
DCHECK(socket);
DCHECK(!socket_);
#endif

socket_ = socket;
persistent_ = persistent;
}

#if SB_API_VERSION >= 16
int MessagePumpIOStarboard::SocketWatcher::Release() {
int socket = socket_;
socket_ = -1;
return socket;
}
#else
SbSocket MessagePumpIOStarboard::SocketWatcher::Release() {
SbSocket socket = socket_;
socket_ = kSbSocketInvalid;
return socket;
}
#endif

void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToRead(
#if SB_API_VERSION >= 16
int socket,
#else
SbSocket socket,
#endif
MessagePumpIOStarboard* pump) {
if (!watcher_)
return;
Expand All @@ -83,7 +129,11 @@ void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToRead(
}

void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToWrite(
#if SB_API_VERSION >= 16
int socket,
#else
SbSocket socket,
#endif
MessagePumpIOStarboard* pump) {
if (!watcher_)
return;
Expand All @@ -102,12 +152,21 @@ MessagePumpIOStarboard::~MessagePumpIOStarboard() {
SbSocketWaiterDestroy(waiter_);
}

#if SB_API_VERSION >= 16
bool MessagePumpIOStarboard::Watch(int socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(socket >= 0);
#else
bool MessagePumpIOStarboard::Watch(SbSocket socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate) {
DCHECK(SbSocketIsValid(socket));
#endif
DCHECK(controller);
DCHECK(delegate);
DCHECK(mode == WATCH_READ || mode == WATCH_WRITE || mode == WATCH_READ_WRITE);
Expand All @@ -123,8 +182,14 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
interests |= kSbSocketWaiterInterestWrite;
}

#if SB_API_VERSION >= 16
int old_socket = controller->Release();
if (old_socket >= 0)
#else
SbSocket old_socket = controller->Release();
if (SbSocketIsValid(old_socket)) {
if (SbSocketIsValid(old_socket))
#endif
{
// It's illegal to use this function to listen on 2 separate fds with the
// same |controller|.
if (old_socket != socket) {
Expand All @@ -141,14 +206,25 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
interests |= old_interest_mask;

// Must disarm the event before we can reuse it.
#if SB_API_VERSION >= 16
SbPosixSocketWaiterRemove(waiter_, old_socket);
#else
SbSocketWaiterRemove(waiter_, old_socket);
#endif // SB_API_VERSION >= 16
}

// Set current interest mask and waiter for this event.
#if SB_API_VERSION >= 16
if (!SbPosixSocketWaiterAdd(waiter_, socket, controller,
OnPosixSocketWaiterNotification, interests, persistent)){
return false;
}
#else
if (!SbSocketWaiterAdd(waiter_, socket, controller,
OnSocketWaiterNotification, interests, persistent)) {
return false;
}
#endif // SB_API_VERSION >= 16

controller->Init(socket, persistent);
controller->set_watcher(delegate);
Expand All @@ -157,9 +233,15 @@ bool MessagePumpIOStarboard::Watch(SbSocket socket,
return true;
}

#if SB_API_VERSION >= 16
bool MessagePumpIOStarboard::StopWatching(int socket) {
return SbPosixSocketWaiterRemove(waiter_, socket);
}
#else
bool MessagePumpIOStarboard::StopWatching(SbSocket socket) {
return SbSocketWaiterRemove(waiter_, socket);
}
#endif // SB_API_VERSION >= 16

void MessagePumpIOStarboard::AddIOObserver(IOObserver* obs) {
io_observers_.AddObserver(obs);
Expand Down Expand Up @@ -252,6 +334,36 @@ void MessagePumpIOStarboard::DidProcessIOEvent() {
}
}

#if SB_API_VERSION >= 16

// static
void MessagePumpIOStarboard::OnPosixSocketWaiterNotification(SbSocketWaiter waiter,
int socket,
void* context,
int ready_interests) {
base::WeakPtr<SocketWatcher> controller =
static_cast<SocketWatcher*>(context)->weak_factory_.GetWeakPtr();
DCHECK(controller.get());

MessagePumpIOStarboard* pump = controller->pump();
pump->processed_io_events_ = true;

// If not persistent, the watch has been released at this point.
if (!controller->persistent()) {
controller->Release();
}

if (ready_interests & kSbSocketWaiterInterestWrite) {
controller->OnSocketReadyToWrite(socket, pump);
}

// Check |controller| in case it's been deleted previously.
if (controller.get() && ready_interests & kSbSocketWaiterInterestRead) {
controller->OnSocketReadyToRead(socket, pump);
}
}

#else
// static
void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter,
SbSocket socket,
Expand Down Expand Up @@ -279,4 +391,5 @@ void MessagePumpIOStarboard::OnSocketWaiterNotification(SbSocketWaiter waiter,
}
}

#endif // SB_API_VERSION >= 16
} // namespace base
39 changes: 39 additions & 0 deletions base/message_loop/message_pump_io_starboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef BASE_MESSAGE_PUMP_IO_STARBOARD_H_
#define BASE_MESSAGE_PUMP_IO_STARBOARD_H_

#include "starboard/configuration.h"

#include "base/compiler_specific.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_pump.h"
Expand Down Expand Up @@ -49,8 +51,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
public:
// These methods are called from MessageLoop::Run when a socket can be
// interacted with without blocking.
#if SB_API_VERSION >= 16
virtual void OnSocketReadyToRead(int socket) {}
virtual void OnSocketReadyToWrite(int socket) {}
#else
virtual void OnSocketReadyToRead(SbSocket socket) {}
virtual void OnSocketReadyToWrite(SbSocket socket) {}
#endif

protected:
virtual ~Watcher() {}
Expand Down Expand Up @@ -79,8 +86,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
friend class MessagePumpIOStarboardTest;

// Called by MessagePumpIOStarboard.
#if SB_API_VERSION >= 16
void Init(int socket, bool persistent);
int Release();
#else
void Init(SbSocket socket, bool persistent);
SbSocket Release();
#endif

int interests() const { return interests_; }
void set_interests(int interests) { interests_ = interests; }
Expand All @@ -90,12 +102,21 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

void set_watcher(Watcher* watcher) { watcher_ = watcher; }

#if SB_API_VERSION >= 16
void OnSocketReadyToRead(int socket, MessagePumpIOStarboard* pump);
void OnSocketReadyToWrite(int socket, MessagePumpIOStarboard* pump);
#else
void OnSocketReadyToRead(SbSocket socket, MessagePumpIOStarboard* pump);
void OnSocketReadyToWrite(SbSocket socket, MessagePumpIOStarboard* pump);
#endif

const Location created_from_location_;
int interests_;
#if SB_API_VERSION >= 16
int socket_;
#else
SbSocket socket_;
#endif
bool persistent_;
MessagePumpIOStarboard* pump_;
Watcher* watcher_;
Expand Down Expand Up @@ -123,6 +144,16 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {
// If an error occurs while calling this method in a cumulative fashion, the
// event previously attached to |controller| is aborted. Returns true on
// success. Must be called on the same thread the message_pump is running on.
#if SB_API_VERSION >= 16
bool Watch(int socket,
bool persistent,
int mode,
SocketWatcher* controller,
Watcher* delegate);

// Stops watching the socket.
bool StopWatching(int socket);
#else
bool Watch(SbSocket socket,
bool persistent,
int mode,
Expand All @@ -131,6 +162,7 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Stops watching the socket.
bool StopWatching(SbSocket socket);
#endif

void AddIOObserver(IOObserver* obs);
void RemoveIOObserver(IOObserver* obs);
Expand All @@ -149,10 +181,17 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump {

// Called by SbSocketWaiter to tell us a registered socket can be read and/or
// written to.
#if SB_API_VERSION >= 16
static void OnPosixSocketWaiterNotification(SbSocketWaiter waiter,
int socket,
void* context,
int ready_interests);
#else
static void OnSocketWaiterNotification(SbSocketWaiter waiter,
SbSocket socket,
void* context,
int ready_interests);
#endif

bool should_quit() const { return !keep_running_; }

Expand Down
Loading

0 comments on commit 495064e

Please sign in to comment.