Skip to content

Commit

Permalink
MSE-in-Workers
Browse files Browse the repository at this point in the history
  • Loading branch information
at-ninja committed Oct 25, 2024
1 parent 7e04f7f commit 55fac9b
Show file tree
Hide file tree
Showing 13 changed files with 948 additions and 134 deletions.
1 change: 1 addition & 0 deletions cobalt/dom/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ static_library("dom") {
"media_source.cc",
"media_source.h",
"media_source_attachment.h",
"media_source_attachment_supplement.cc",
"media_source_attachment_supplement.h",
"memory_info.cc",
"memory_info.h",
Expand Down
285 changes: 251 additions & 34 deletions cobalt/dom/cross_thread_media_source_attachment.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

#include "cobalt/dom/cross_thread_media_source_attachment.h"

#include <utility>

#include "base/synchronization/lock.h"
#include "base/task/sequenced_task_runner.h"
#include "cobalt/dom/media_source.h"
#include "cobalt/dom/media_source_attachment.h"
Expand All @@ -34,98 +37,312 @@ namespace dom {
CrossThreadMediaSourceAttachment::CrossThreadMediaSourceAttachment(
scoped_refptr<MediaSource> media_source)
: media_source_(media_source),
task_runner_(base::SequencedTaskRunner::GetCurrentDefault()),
// worker_runner_(base::SingleThreadTaskRunner::GetCurrentDefault()),
worker_runner_(base::SequencedTaskRunner::GetCurrentDefault()),
recent_element_time_(0.0),
element_has_error_(false) {}
element_has_error_(false),
element_has_max_video_capabilities_(false),
have_ever_attached_(false),
have_ever_started_closing_(false) {}

void CrossThreadMediaSourceAttachment::TraceMembers(script::Tracer* tracer) {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// TODO(at-ninja): How does this need to work for cross-thread? Tracing
// should be called from both threads. Can this be forced for testing?
// Should this only trace the objects on the current thread, or should this
// go cross-thread to trace everything?

base::AutoLock auto_lock(attachment_state_lock_);
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

tracer->Trace(attached_element_);
tracer->Trace(media_source_);
tracer->Trace(attached_element_); // main thread.
tracer->Trace(media_source_); // worker thread.
}

bool CrossThreadMediaSourceAttachment::StartAttachingToMediaElement(
HTMLMediaElement* media_element) {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return false;
base::AutoLock auto_lock(attachment_state_lock_);
// Called from main thread. At this point, we can only check if
// this is not being called from the worker thread.
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

if (have_ever_attached_) {
return false;
}

DCHECK(!have_ever_started_closing_);
DCHECK(!attached_element_);

// Attach on the main thread.
bool success = media_source_->StartAttachingToMediaElement(this);
if (!success) {
return false;
}

attached_element_ = base::AsWeakPtr(media_element);
// main_runner_ = base::SingleThreadTaskRunner::GetCurrentDefault();
main_runner_ = base::SequencedTaskRunner::GetCurrentDefault();
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());

// Grab initial state from the HTMLMediaElement while on the main thread.
recent_element_time_ = media_element->current_time(NULL);
element_has_error_ = static_cast<bool>(media_element->error());
element_has_max_video_capabilities_ =
media_element->HasMaxVideoCapabilities();

DCHECK(!element_has_error_);

have_ever_attached_ = true;
return true;
}

void CrossThreadMediaSourceAttachment::CompleteAttachingToMediaElement(
::media::ChunkDemuxer* chunk_demuxer) {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
base::AutoLock auto_lock(attachment_state_lock_);
DCHECK(have_ever_attached_);
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

worker_runner_->PostTask(
FROM_HERE,
base::BindOnce(&CrossThreadMediaSourceAttachment::
CompleteAttachingToMediaElementOnWorkerThread,
this, chunk_demuxer));
}

void CrossThreadMediaSourceAttachment::
CompleteAttachingToMediaElementOnWorkerThread(
::media::ChunkDemuxer* chunk_demuxer) {
base::AutoLock auto_lock(attachment_state_lock_);
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!main_runner_->BelongsToCurrentThread());
DCHECK_NE(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());

if (have_ever_started_closing_) {
return;
}

media_source_->CompleteAttachingToMediaElement(chunk_demuxer);
}

void CrossThreadMediaSourceAttachment::Close() {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
base::AutoLock auto_lock(attachment_state_lock_);
DCHECK(have_ever_attached_);
DCHECK(!have_ever_started_closing_);
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

have_ever_started_closing_ = true;

worker_runner_->PostTask(
FROM_HERE,
base::BindOnce(&CrossThreadMediaSourceAttachment::CloseOnWorkerThread,
this));
}

void CrossThreadMediaSourceAttachment::CloseOnWorkerThread() {
base::AutoLock auto_lock(attachment_state_lock_);
DCHECK(have_ever_started_closing_);
DCHECK(attached_element_);
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!main_runner_->BelongsToCurrentThread());
DCHECK_NE(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());

media_source_->Close();
}

scoped_refptr<TimeRanges> CrossThreadMediaSourceAttachment::GetBufferedRange()
const {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return nullptr;
base::AutoLock auto_lock(attachment_state_lock_);
DCHECK(attached_element_);
DCHECK(!have_ever_started_closing_);
DCHECK(have_ever_attached_);
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

return media_source_->GetBufferedRange();
}

MediaSourceReadyState CrossThreadMediaSourceAttachment::GetReadyState() const {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return kMediaSourceReadyStateClosed;
base::AutoLock auto_lock(attachment_state_lock_);
DCHECK(attached_element_);
DCHECK(!have_ever_started_closing_);
DCHECK(have_ever_attached_);
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

return media_source_->ready_state();
}

void CrossThreadMediaSourceAttachment::NotifyDurationChanged(double duration) {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
void CrossThreadMediaSourceAttachment::NotifyDurationChanged(
double /*duration*/) {
attachment_state_lock_.AssertAcquired();
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

// No-op for cross thread MSA.
}

bool CrossThreadMediaSourceAttachment::HasMaxVideoCapabilities() const {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return false;
attachment_state_lock_.AssertAcquired();
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

return element_has_max_video_capabilities_;
}

double CrossThreadMediaSourceAttachment::GetRecentMediaTime() {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return 0;
attachment_state_lock_.AssertAcquired();
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

return recent_element_time_;
}

bool CrossThreadMediaSourceAttachment::GetElementError() {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return false;
attachment_state_lock_.AssertAcquired();
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

return element_has_error_;
}

scoped_refptr<AudioTrackList>
CrossThreadMediaSourceAttachment::CreateAudioTrackList(
script::EnvironmentSettings* settings) {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return nullptr;
}

scoped_refptr<VideoTrackList>
CrossThreadMediaSourceAttachment::CreateVideoTrackList(
script::EnvironmentSettings* settings) {
DCHECK_NE(task_runner_, base::SequencedTaskRunner::GetCurrentDefault());
NOTIMPLEMENTED();
return nullptr;
}

void CrossThreadMediaSourceAttachment::OnElementTimeUpdate(double time) {
NOTIMPLEMENTED();
base::AutoLock auto_lock(attachment_state_lock_);
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

worker_runner_->PostTask(
FROM_HERE,
base::BindOnce(
&CrossThreadMediaSourceAttachment::UpdateWorkerThreadTimeCache, this,
time));
}

void CrossThreadMediaSourceAttachment::UpdateWorkerThreadTimeCache(
double time) {
base::AutoLock auto_lock(attachment_state_lock_);
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!main_runner_->BelongsToCurrentThread());
DCHECK_NE(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());

recent_element_time_ = time;
}

void CrossThreadMediaSourceAttachment::OnElementError() {
base::AutoLock auto_lock(attachment_state_lock_);
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

worker_runner_->PostTask(
FROM_HERE,
base::BindOnce(
&CrossThreadMediaSourceAttachment::HandleElementErrorOnWorkerThread,
this));
}

void CrossThreadMediaSourceAttachment::HandleElementErrorOnWorkerThread() {
base::AutoLock auto_lock(attachment_state_lock_);
DCHECK(!element_has_error_)
<< "At most one transition to element error per attachment is expected";
NOTIMPLEMENTED();
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());

element_has_error_ = true;
}

void CrossThreadMediaSourceAttachment::OnElementHasMaxVideoCapabilitiesUpdate(
bool has_max_video_capabilities) {
base::AutoLock auto_lock(attachment_state_lock_);
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!worker_runner_->BelongsToCurrentThread());
DCHECK_NE(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

worker_runner_->PostTask(
FROM_HERE,
base::BindOnce(&CrossThreadMediaSourceAttachment::
UpdateElementMaxVideoCapabilitiesOnWorkerThread,
this, has_max_video_capabilities));
}

void CrossThreadMediaSourceAttachment::
UpdateElementMaxVideoCapabilitiesOnWorkerThread(
bool has_max_video_capabilities) {
base::AutoLock auto_lock(attachment_state_lock_);
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(main_runner_->BelongsToCurrentThread());
DCHECK_EQ(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());

element_has_max_video_capabilities_ = has_max_video_capabilities;
}

bool CrossThreadMediaSourceAttachment::FullyAttachedOrSameThread() const {
attachment_state_lock_.AssertAcquired();
DCHECK(have_ever_attached_);
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());
// DCHECK(!main_runner_->BelongsToCurrentThread());
DCHECK_NE(main_runner_, base::SequencedTaskRunner::GetCurrentDefault());

return attached_element_ && !have_ever_started_closing_;
}

bool CrossThreadMediaSourceAttachment::RunExclusively(
bool abort_if_not_fully_attached, RunExclusivelyCB cb) {
base::AutoLock auto_lock(attachment_state_lock_);

DCHECK(have_ever_attached_);
// DCHECK(worker_runner_->BelongsToCurrentThread());
DCHECK_EQ(worker_runner_, base::SequencedTaskRunner::GetCurrentDefault());

if (abort_if_not_fully_attached &&
(!attached_element_ || have_ever_started_closing_)) {
return false;
}

std::move(cb).Run();
return true;
}

void CrossThreadMediaSourceAttachment::
AssertCrossThreadMutexIsAcquiredForDebugging() {
attachment_state_lock_.AssertAcquired();
}

} // namespace dom
} // namespace cobalt
Loading

0 comments on commit 55fac9b

Please sign in to comment.