Skip to content
This repository has been archived by the owner on Jan 16, 2024. It is now read-only.

Commit

Permalink
Version 1.0 alexa-client-sdk
Browse files Browse the repository at this point in the history
Changes in this update
 - Added `AudioPlayer` capability agent.
   - Supports iHeartRadio.
 - `StateSynchronizer` has been updated to better enforce that `System.SynchronizeState`is the first Event sent on a connection to AVS.
 - Additional tests have been added to `ACL`.
 - The `Sample App` has been updated with several small fixes and improvements.
 - `ADSL` was updated such that all directives are now blocked while the handling of previous `SpeechSynthesizer.Speak`
    directives complete. Because any directive may now be blocked, the `preHandleDirective() / handleDirective()` path
    is now used for handling all directives.
 - Fixes for the following GitHub issues:
   - #44.
 - A bug causing `ACL` to not send a ping to AVS every 5 minutes, leading to periodic server disconnects, was fixed.
 - Subtle race condition issues were addressed in the `Executor` class, resolving some intermittent crashes.
  • Loading branch information
mradulan committed Aug 8, 2017
1 parent 1176875 commit d9a0cb7
Show file tree
Hide file tree
Showing 140 changed files with 7,731 additions and 1,040 deletions.
13 changes: 12 additions & 1 deletion ACL/include/ACL/AVSConnectionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <AVSCommon/SDKInterfaces/ConnectionStatusObserverInterface.h>
#include <AVSCommon/SDKInterfaces/MessageObserverInterface.h>
#include <AVSCommon/SDKInterfaces/MessageSenderInterface.h>
#include <AVSCommon/SDKInterfaces/StateSynchronizerObserverInterface.h>

#include "ACL/Transport/MessageRouterInterface.h"
#include "ACL/Transport/MessageRouterObserverInterface.h"
Expand Down Expand Up @@ -69,6 +70,8 @@ namespace acl {
class AVSConnectionManager :
public avsCommon::sdkInterfaces::MessageSenderInterface,
public avsCommon::sdkInterfaces::AVSEndpointAssignerInterface,
/* TODO: ACSDK-421: Remove the implementation of StateSynchronizerObserverInterface */
public avsCommon::sdkInterfaces::StateSynchronizerObserverInterface,
public MessageRouterObserverInterface {
public:
/**
Expand All @@ -77,8 +80,9 @@ class AVSConnectionManager :
* @param messageRouter The entity which handles sending and receiving of AVS messages.
* @param isEnabled The enablement setting. If true, then the created object will attempt to connect to AVS.
* @param connectionStatusObservers An optional set of observers which will be notified when the connection status
* changes.
* changes. The observers cannot be a nullptr.
* @param messageObservers An optional set of observer which will be sent messages that arrive from AVS.
* The observers cannot be a nullptr.
* @return The created AVSConnectionManager object.
*/
static std::shared_ptr<AVSConnectionManager> create(
Expand Down Expand Up @@ -160,6 +164,9 @@ class AVSConnectionManager :

void sendMessage(std::shared_ptr<avsCommon::avs::MessageRequest> request) override;

/* TODO: ACSDK-421: Remove the implementation of StateSynchronizerObserverInterface */
void onStateChanged(avsCommon::sdkInterfaces::StateSynchronizerObserverInterface::State newState) override;

/**
* @note Set the URL endpoint for the AVS connection. Calling this function with a new value will cause the
* current active connection to be closed, and a new one opened to the new endpoint.
Expand Down Expand Up @@ -194,6 +201,10 @@ class AVSConnectionManager :
/// Internal state to indicate if the Connection object is enabled for making an AVS connection.
std::atomic<bool> m_isEnabled;

/* TODO: ACSDK-421: Remove the implementation of StateSynchronizerObserverInterface */
/// Internal object that flags if @c StateSynchronizer had sent the initial event successfully.
std::atomic<bool> m_isSynchronized;

/// Set of observers to notify when the connection status changes. @c m_connectionStatusObserverMutex must be
/// acquired before access.
std::unordered_set<std::shared_ptr<avsCommon::sdkInterfaces::ConnectionStatusObserverInterface>>
Expand Down
3 changes: 2 additions & 1 deletion ACL/include/ACL/Transport/MessageRouter.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class MessageRouter: public MessageRouterInterface, public TransportObserverInte

ConnectionStatus getConnectionStatus() override;

void send(std::shared_ptr<avsCommon::avs::MessageRequest> request) override;
// TODO: ACSDK-421: Revert this to use send().
void sendMessage(std::shared_ptr<avsCommon::avs::MessageRequest> request) override;

void setAVSEndpoint(const std::string& avsEndpoint) override;

Expand Down
15 changes: 4 additions & 11 deletions ACL/include/ACL/Transport/MessageRouterInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

#include "AVSCommon/Utils/Threading/Executor.h"
#include "AVSCommon/AVS/MessageRequest.h"
// TODO: ACSDK-421: Revert this to implement send().
#include "AVSCommon/SDKInterfaces/MessageSenderInterface.h"

#include "ACL/Transport/MessageRouterObserverInterface.h"
#include "ACL/Transport/TransportInterface.h"
Expand All @@ -37,7 +39,8 @@ namespace acl {
*
* Implementations of this class are required to be thread-safe.
*/
class MessageRouterInterface {
// TODO: ACSDK-421: Remove the inheritance from MessageSenderInterface.
class MessageRouterInterface : public avsCommon::sdkInterfaces::MessageSenderInterface {
public:
/// Alias to a connection status and changed reason pair.
using ConnectionStatus = std::pair<avsCommon::sdkInterfaces::ConnectionStatusObserverInterface::Status,
Expand All @@ -63,16 +66,6 @@ class MessageRouterInterface {
*/
virtual ConnectionStatus getConnectionStatus() = 0;

/**
* Send a message to AVS.
* If the underlying implementation is not connected, or is in the process of changing its connection state,
* this function should do nothing and return false.
* Otherwise it should enqueue the message to be sent, and return true.
* @param request The message to be sent to AVS.
* @return Whether the underlying implementation successfully enqueued the message to be sent.
*/
virtual void send(std::shared_ptr<avsCommon::avs::MessageRequest> request) = 0;

/**
* Set the URL endpoint for the AVS connection. Calling this function with a new value will cause the
* current active connection to be closed, and a new one opened to the new endpoint.
Expand Down
38 changes: 33 additions & 5 deletions ACL/src/AVSConnectionManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,33 @@ AVSConnectionManager::create(std::shared_ptr<MessageRouterInterface> messageRout
std::unordered_set<std::shared_ptr<ConnectionStatusObserverInterface>> connectionStatusObservers,
std::unordered_set<std::shared_ptr<avsCommon::sdkInterfaces::MessageObserverInterface>> messageObservers) {
if (!avsCommon::avs::initialization::AlexaClientSDKInit::isInitialized()) {
ACSDK_ERROR(LX("createFailed").d("reason", "uninitialziedAlexaClientSdk").d("return", "nullPtr"));
ACSDK_ERROR(LX("createFailed").d("reason", "uninitialziedAlexaClientSdk").d("return", "nullptr"));
return nullptr;
}

if (!messageRouter) {
ACSDK_ERROR(LX("createFailed").d("reason", "nullMessageRouter").d("return", "nullptr"));
return nullptr;
}

for (auto observer : connectionStatusObservers) {
if (!observer) {
ACSDK_ERROR(LX("createFailed").d("reason", "nullConnectionStatusObserver").d("return", "nullptr"));
return nullptr;
}
}

for(auto observer : messageObservers) {
if (!observer) {
ACSDK_ERROR(LX("createFailed").d("reason", "nullMessageObserver").d("return", "nullptr"));
return nullptr;
}
}

auto connectionManager = std::shared_ptr<AVSConnectionManager>(
new AVSConnectionManager(messageRouter, connectionStatusObservers, messageObservers));

if (messageRouter) {
messageRouter->setObserver(connectionManager);
}
messageRouter->setObserver(connectionManager);

if (isEnabled) {
connectionManager->enable();
Expand All @@ -66,6 +83,7 @@ AVSConnectionManager::AVSConnectionManager(
std::unordered_set<std::shared_ptr<ConnectionStatusObserverInterface>> connectionStatusObservers,
std::unordered_set<std::shared_ptr<MessageObserverInterface>> messageObservers)
: m_isEnabled{false},
m_isSynchronized{false},
m_connectionStatusObservers{connectionStatusObservers},
m_messageObservers{messageObservers},
m_messageRouter{messageRouter} {
Expand Down Expand Up @@ -93,7 +111,13 @@ void AVSConnectionManager::reconnect() {
}

void AVSConnectionManager::sendMessage(std::shared_ptr<avsCommon::avs::MessageRequest> request) {
m_messageRouter->send(request);
// TODO: ACSDK-421: Implement synchronized state check at a lower level.
if (m_isSynchronized) {
m_messageRouter->sendMessage(request);
} else {
ACSDK_DEBUG(LX("sendMessageNotSuccessful").d("reason", "notSynchronized"));
request->onSendCompleted(avsCommon::avs::MessageRequest::Status::NOT_SYNCHRONIZED);
}
}

bool AVSConnectionManager::isConnected() const {
Expand Down Expand Up @@ -167,6 +191,10 @@ void AVSConnectionManager::onConnectionStatusChanged(
}
}

void AVSConnectionManager::onStateChanged(StateSynchronizerObserverInterface::State newState) {
m_isSynchronized = (StateSynchronizerObserverInterface::State::SYNCHRONIZED == newState);
}

void AVSConnectionManager::receive(const std::string & contextId, const std::string & message) {
std::unique_lock<std::mutex> lock{m_messageOberverMutex};
std::unordered_set<std::shared_ptr<avsCommon::sdkInterfaces::MessageObserverInterface>> observers{m_messageObservers};
Expand Down
18 changes: 15 additions & 3 deletions ACL/src/Transport/HTTP2Stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ using namespace alexaClientSDK::avsCommon::utils;
using namespace avsCommon::avs;
using namespace avsCommon::avs::attachment;


/// String to identify log entries originating from this file.
static const std::string TAG("HTTP2Stream");

Expand Down Expand Up @@ -209,20 +210,24 @@ bool HTTP2Stream::initPost(const std::string& url, const std::string& authToken,
}

if (!m_transfer.setPostContent(METADATA_FIELD_NAME, requestPayload)) {
ACSDK_ERROR(LX("initPostFailed").d("reason", "setPostContentFailed"));
return false;
}

if (!m_transfer.setReadCallback(HTTP2Stream::readCallback, this)) {
ACSDK_ERROR(LX("initPostFailed").d("reason", "setReadCallbackFailed"));
return false;
}

if (request->getAttachmentReader()) {
if (!m_transfer.setPostStream(ATTACHMENT_FIELD_NAME, this)) {
ACSDK_ERROR(LX("initPostFailed").d("reason", "setPostStreamFailed"));
return false;
}
}

if (!m_transfer.setTransferType(CurlEasyHandleWrapper::TransferType::kPOST)) {
ACSDK_ERROR(LX("initPostFailed").d("reason", "setTransferTypeFailed"));
return false;
}

Expand Down Expand Up @@ -262,6 +267,10 @@ size_t HTTP2Stream::writeCallback(char *data, size_t size, size_t nmemb, void *u
}

size_t HTTP2Stream::headerCallback(char *data, size_t size, size_t nmemb, void *user) {
if (!user) {
ACSDK_ERROR(LX("headerCallbackFailed").d("reason","nullUser"));
return 0;
}
size_t headerLength = size * nmemb;
std::string header(data, headerLength);
#ifdef DEBUG
Expand All @@ -285,9 +294,14 @@ size_t HTTP2Stream::headerCallback(char *data, size_t size, size_t nmemb, void *
}

size_t HTTP2Stream::readCallback(char *data, size_t size, size_t nmemb, void *userData) {
if (!userData) {
ACSDK_ERROR(LX("readCallbackFailed").d("reason","nullUserData"));
return 0;
}

HTTP2Stream *stream = static_cast<HTTP2Stream*>(userData);
stream->m_timeOfLastTransfer = getNow();

stream->m_timeOfLastTransfer = getNow();
auto attachmentReader = stream->m_currentRequest->getAttachmentReader();

// This is ok - it means there's no attachment to send. Return 0 so libcurl can complete the stream to AVS.
Expand All @@ -301,7 +315,6 @@ size_t HTTP2Stream::readCallback(char *data, size_t size, size_t nmemb, void *us
auto bytesRead = attachmentReader->read(data, maxBytesToRead, &readStatus);

switch (readStatus) {

// The good cases.
case AttachmentReader::ReadStatus::OK:
case AttachmentReader::ReadStatus::OK_WOULDBLOCK:
Expand All @@ -318,7 +331,6 @@ size_t HTTP2Stream::readCallback(char *data, size_t size, size_t nmemb, void *us
case AttachmentReader::ReadStatus::ERROR_INTERNAL:
return CURL_READFUNC_ABORT;
}

// The attachment has no more data right now, but is still readable.
if (0 == bytesRead) {
stream->setPaused(true);
Expand Down
10 changes: 6 additions & 4 deletions ACL/src/Transport/HTTP2Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,9 @@ void HTTP2Transport::networkLoop() {
* While the connection is alive we should have at least 1 transfer active (the downchannel).
*/
int numTransfersLeft = 1;
int timeouts = 0;
while (numTransfersLeft && !isStopping()) {

int numTransfersUpdated = 0;
int timeouts = 0;

CURLMcode ret = curl_multi_perform(m_multi->handle, &numTransfersLeft);
if (CURLM_CALL_MULTI_PERFORM == ret) {
continue;
Expand Down Expand Up @@ -389,6 +387,7 @@ void HTTP2Transport::networkLoop() {

//TODO: ACSDK-69 replace timeout with signal fd
//TODO: ACSDK-281 - investigate the timeout values and performance consequences for curl_multi_wait.
int numTransfersUpdated = 0;
ret = curl_multi_wait(m_multi->handle, NULL, 0, multiWaitTimeoutMs, &numTransfersUpdated);
if (ret != CURLM_OK) {
ACSDK_ERROR(LX("networkLoopStopping")
Expand Down Expand Up @@ -487,7 +486,6 @@ void HTTP2Transport::networkLoop() {
bool HTTP2Transport::establishConnection() {
// Set numTransferLeft to 1 because the downchannel stream has been added already.
int numTransfersLeft = 1;
int numTransfersUpdated = 0;

/*
* Calls curl_multi_perform until downchannel stream receives an HTTP2 response code. If the downchannel stream
Expand Down Expand Up @@ -526,6 +524,7 @@ bool HTTP2Transport::establishConnection() {
setIsStopping(ConnectionStatusObserverInterface::ChangedReason::INTERNAL_ERROR);
}
// wait for activity on the downchannel stream, kinda like poll()
int numTransfersUpdated = 0;
ret = curl_multi_wait(m_multi->handle, NULL, 0 , WAIT_FOR_ACTIVITY_TIMEOUT_MS, &numTransfersUpdated);
if (ret != CURLM_OK) {
ACSDK_ERROR(LX("establishConnectionFailed")
Expand Down Expand Up @@ -648,6 +647,8 @@ void HTTP2Transport::processNextOutgoingMessage() {
}

bool HTTP2Transport::sendPing() {
ACSDK_DEBUG(LX("sendPing").d("pingStream", m_pingStream.get()));

if (m_pingStream) {
return true;
}
Expand Down Expand Up @@ -687,6 +688,7 @@ bool HTTP2Transport::sendPing() {
}

void HTTP2Transport::handlePingResponse() {
ACSDK_DEBUG(LX("handlePingResponse"));
if (HTTP2Stream::HTTPResponseCodes::SUCCESS_NO_CONTENT != m_pingStream->getResponseCode()) {
ACSDK_ERROR(LX("pingFailed")
.d("responseCode", m_pingStream->getResponseCode()));
Expand Down
3 changes: 2 additions & 1 deletion ACL/src/Transport/MessageRouter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ void MessageRouter::disable() {
disconnectAllTransportsLocked(lock, ConnectionStatusObserverInterface::ChangedReason::ACL_CLIENT_REQUEST);
}

void MessageRouter::send(std::shared_ptr<MessageRequest> request) {
// TODO: ACSDK-421: Revert this to use send().
void MessageRouter::sendMessage(std::shared_ptr<MessageRequest> request) {
if (!request) {
ACSDK_ERROR(LX("sendFailed").d("reason", "nullRequest"));
return;
Expand Down
Loading

0 comments on commit d9a0cb7

Please sign in to comment.