diff --git a/CCDB/include/CCDB/CCDBDownloader.h b/CCDB/include/CCDB/CCDBDownloader.h index 031f656452fe8..485b7b4335147 100644 --- a/CCDB/include/CCDB/CCDBDownloader.h +++ b/CCDB/include/CCDB/CCDBDownloader.h @@ -11,6 +11,10 @@ #ifndef O2_CCDBDOWNLOADER_H_ #define O2_CCDBDOWNLOADER_H_ +#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__) +#include "MemoryResources/MemoryResources.h" +#endif + #include #include #include @@ -21,6 +25,8 @@ #include #include #include +#include +#include typedef struct uv_loop_s uv_loop_t; typedef struct uv_timer_s uv_timer_t; @@ -32,6 +38,25 @@ typedef struct uv_handle_s uv_handle_t; namespace o2::ccdb { +#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__) +struct HeaderObjectPair_t { + std::multimap header; + o2::pmr::vector* object = nullptr; + int counter = 0; +}; + +typedef struct DownloaderRequestData { + std::vector hosts; + std::string path; + long timestamp; + HeaderObjectPair_t hoPair; + std::map* headers; + + std::function localContentCallback; + bool errorflag = false; +} DownloaderRequestData; +#endif + /* Some functions below aren't member functions of CCDBDownloader because both curl and libuv require callback functions which have to be either static or non-member. Because non-static functions are used in the functions below, they must be non-member. @@ -133,6 +158,14 @@ class CCDBDownloader */ std::vector batchBlockingPerform(std::vector const& handleVector); + /** + * Schedules an asynchronous transfer but doesn't perform it. + * + * @param handle Handle to be performed on. + * @param requestCounter Counter shared by a batch of CURL handles. + */ + void asynchSchedule(CURL* handle, size_t* requestCounter); + /** * Limits the number of parallel connections. Should be used only if no transfers are happening. */ @@ -176,6 +209,14 @@ class CCDBDownloader void runLoop(bool noWait); private: + /** + * Returns a vector of possible content locations based on the redirect headers. + * + * @param baseUrl Content path. + * @param headerMap Map containing response headers. + */ + std::vector getLocations(std::string baseUrl, std::multimap* headerMap) const; + std::string mUserAgentId = "CCDBDownloader"; /** * Sets up internal UV loop. @@ -207,8 +248,7 @@ class CCDBDownloader */ enum RequestType { BLOCKING, - ASYNCHRONOUS, - ASYNCHRONOUS_WITH_CALLBACK + ASYNCHRONOUS }; /** @@ -230,20 +270,19 @@ class CCDBDownloader DataForSocket mSocketData; +#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__) /** * Structure which is stored in a easy_handle. It carries information about the request which the easy_handle is part of. - * All easy handles coming from one request have an identical PerformData structure. */ typedef struct PerformData { - std::condition_variable* cv; - bool* completionFlag; CURLcode* codeDestination; - void (*cbFun)(void*); - std::thread* cbThread; - void* cbData; size_t* requestsLeft; RequestType type; + int hostInd; + int locInd; + DownloaderRequestData* requestData; } PerformData; +#endif /** * Called by CURL in order to close a socket. It will be called by CURL even if a timeout timer closed the socket beforehand. @@ -253,6 +292,20 @@ class CCDBDownloader */ static void closesocketCallback(void* clientp, curl_socket_t item); +#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__) + // Reschedules the transfer to be performed with a different host. + void tryNewHost(PerformData* performData, CURL* easy_handle); + + // Retrieves content from either alien, cvmfs or local storage using a callback to CCDBApi. + void getLocalContent(PerformData* performData, std::string& newUrl, std::string& newLocation, bool& contentRetrieved, std::vector& locations); + + // Continues a transfer via a http redirect. + void httpRedirect(PerformData* performData, std::string& newUrl, std::string& newLocation, CURL* easy_handle); + + // Continues a transfer via a redirect. The redirect can point to a local file, alien file or a http address. + void followRedirect(PerformData* performData, CURL* easy_handle, std::vector& locations, bool& rescheduled, bool& contentRetrieved); +#endif + /** * Is used to react to polling file descriptors in poll_handle. * @@ -322,10 +375,12 @@ class CCDBDownloader */ void checkMultiInfo(); +#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__) /** * Set openSocketCallback and closeSocketCallback with appropriate arguments. Stores data inside the CURL handle. */ void setHandleOptions(CURL* handle, PerformData* data); +#endif /** * Create structure holding information about a socket including a poll handle assigned to it diff --git a/CCDB/include/CCDB/CcdbApi.h b/CCDB/include/CCDB/CcdbApi.h index e8d3a3f12d100..a906375ccadf1 100644 --- a/CCDB/include/CCDB/CcdbApi.h +++ b/CCDB/include/CCDB/CcdbApi.h @@ -24,18 +24,20 @@ #include #include #include "CCDB/CcdbObjectInfo.h" -#include "CCDB/CCDBDownloader.h" #include #include #include #if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__) #include "MemoryResources/MemoryResources.h" +#include #include #else class TJAlienCredentials; #endif +#include "CCDB/CCDBDownloader.h" + class TFile; class TGrid; @@ -342,14 +344,43 @@ class CcdbApi //: public DatabaseInterface TObject* retrieveFromTFile(std::string const& path, std::map const& metadata, long timestamp, std::map* headers, std::string const& etag, const std::string& createdNotAfter, const std::string& createdNotBefore) const; - #if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__) + typedef struct RequestContext { + o2::pmr::vector& dest; + std::string path; + std::map const& metadata; + long timestamp; + std::map& headers; + std::string etag; + std::string createdNotAfter; + std::string createdNotBefore; + bool considerSnapshot; + + RequestContext(o2::pmr::vector& d, + std::map const& m, + std::map& h) + : dest(d), metadata(m), headers(h) {} + } RequestContext; + + // Stores file associated with requestContext as a snapshot. + void saveSnapshot(RequestContext& requestContext) const; + + // Schedules download via CCDBDownloader, but doesn't perform it until mUVLoop is ran. + void scheduleDownload(RequestContext& requestContext, size_t* requestCounter) const; + + void getFromSnapshot(bool createSnapshot, std::string const& path, + long timestamp, std::map headers, + std::string& snapshotpath, o2::pmr::vector& dest, int& fromSnapshot, std::string const& etag) const; + void releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string path) const; + boost::interprocess::named_semaphore* createNamedSempahore(std::string path) const; void loadFileToMemory(o2::pmr::vector& dest, const std::string& path, std::map* localHeaders = nullptr) const; void loadFileToMemory(o2::pmr::vector& dest, std::string const& path, std::map const& metadata, long timestamp, std::map* headers, std::string const& etag, const std::string& createdNotAfter, const std::string& createdNotBefore, bool considerSnapshot = true) const; - void navigateURLsAndLoadFileToMemory(o2::pmr::vector& dest, CURL* curl_handle, std::string const& url, std::map* headers) const; + + // Loads files from alien and cvmfs into given destination. + bool loadLocalContentToMemory(o2::pmr::vector& dest, std::string& url) const; // the failure to load the file to memory is signaled by 0 size and non-0 capacity static bool isMemoryFileInvalid(const o2::pmr::vector& v) { return v.size() == 0 && v.capacity() > 0; } @@ -364,12 +395,36 @@ class CcdbApi //: public DatabaseInterface } return obj; } + + /** + * Retrieves files either as snapshot or schedules them to be downloaded via CCDBDownloader. + * + * @param requestContext Structure giving details about the transfer. + * @param fromSnapshot After navigateSourcesAndLoadFile returns signals whether file was retrieved from snapshot. + * @param requestCounter Pointer to the variable storing the number of requests to be done. + */ + void navigateSourcesAndLoadFile(RequestContext& requestContext, int& fromSnapshot, size_t* requestCounter) const; + + /** + * Retrieves files described via RequestContexts into memory. Downloads are performed in parallel via CCDBDownloader. + * + * @param requestContext Structure giving details about the transfer. + */ + void vectoredLoadFileToMemory(std::vector& requestContext) const; #endif private: // Sets the unique agent ID void setUniqueAgentID(); + /** + * Schedules download of data associated with the curl_handle. Doing that increments the requestCounter by 1. Requests are performed by running the mUVLoop + * + * @param handle CURL handle associated with the request. + * @param requestCounter Pointer to the variable storing the number of requests to be done. + */ + void asynchPerform(CURL* handle, size_t* requestCounter) const; + // internal helper function to update a CCDB file with meta information static void updateMetaInformationInLocalFile(std::string const& filename, std::map const* headers, CCDBQuery const* querysummary = nullptr); @@ -550,7 +605,7 @@ class CcdbApi //: public DatabaseInterface CURLcode CURL_perform(CURL* handle) const; mutable CCDBDownloader* mDownloader = nullptr; //! the multi-handle (async) CURL downloader - bool mIsCCDBDownloaderEnabled = false; + bool mIsCCDBDownloaderPreferred = false; /// Base URL of the CCDB (with port) std::string mUniqueAgentID{}; // Unique User-Agent ID communicated to server for logging std::string mUrl{}; diff --git a/CCDB/src/CCDBDownloader.cxx b/CCDB/src/CCDBDownloader.cxx index 2ff600c96f2f3..2f6571c992c4e 100644 --- a/CCDB/src/CCDBDownloader.cxx +++ b/CCDB/src/CCDBDownloader.cxx @@ -344,31 +344,130 @@ void CCDBDownloader::destroyCurlContext(curl_context_t* context) uv_close((uv_handle_t*)context->poll_handle, curlCloseCB); } +void CCDBDownloader::tryNewHost(PerformData* performData, CURL* easy_handle) +{ + auto requestData = performData->requestData; + std::string newUrl = requestData->hosts.at(performData->hostInd) + "/" + requestData->path + "/" + std::to_string(requestData->timestamp); + LOG(debug) << "Connecting to another host " << newUrl; + requestData->hoPair.header.clear(); + curl_easy_setopt(easy_handle, CURLOPT_URL, newUrl.c_str()); + mHandlesToBeAdded.push_back(easy_handle); +} + +void CCDBDownloader::getLocalContent(PerformData* performData, std::string& newUrl, std::string& newLocation, bool& contentRetrieved, std::vector& locations) +{ + auto requestData = performData->requestData; + newUrl = newLocation; + LOG(debug) << "Redirecting to local content " << newUrl << "\n"; + if (requestData->localContentCallback(newUrl)) { + contentRetrieved = true; + } else { + // Prepare next redirect url + newLocation = (performData->locInd < locations.size()) ? locations.at(performData->locInd) : ""; + performData->locInd++; + } +} + +void CCDBDownloader::httpRedirect(PerformData* performData, std::string& newUrl, std::string& newLocation, CURL* easy_handle) +{ + auto requestData = performData->requestData; + newUrl = requestData->hosts.at(performData->hostInd) + newLocation; + LOG(debug) << "Trying content location " << newUrl; + curl_easy_setopt(easy_handle, CURLOPT_URL, newUrl.c_str()); + mHandlesToBeAdded.push_back(easy_handle); +} + +void CCDBDownloader::followRedirect(PerformData* performData, CURL* easy_handle, std::vector& locations, bool& rescheduled, bool& contentRetrieved) +{ + std::string newLocation = locations.at(performData->locInd++); + std::string newUrl; + if (newLocation.find("alien:/", 0) != std::string::npos || newLocation.find("file:/", 0) != std::string::npos) { + getLocalContent(performData, newUrl, newLocation, contentRetrieved, locations); + } + if (!contentRetrieved && newLocation != "") { + httpRedirect(performData, newUrl, newLocation, easy_handle); + rescheduled = true; + } +} + void CCDBDownloader::transferFinished(CURL* easy_handle, CURLcode curlCode) { mHandlesInUse--; - PerformData* data; - curlEasyErrorCheck(curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &data)); + PerformData* performData; + curlEasyErrorCheck(curl_easy_getinfo(easy_handle, CURLINFO_PRIVATE, &performData)); curlMultiErrorCheck(curl_multi_remove_handle(mCurlMultiHandle, easy_handle)); - *data->codeDestination = curlCode; + *performData->codeDestination = curlCode; - // If no requests left then signal finished based on type of operation - if (--(*data->requestsLeft) == 0) { - switch (data->type) { - case BLOCKING: - break; - case ASYNCHRONOUS: - // Temporary change before asynchronous calls are reintroduced - LOG(error) << "CCDBDownloader: Illegal request type"; - break; - case ASYNCHRONOUS_WITH_CALLBACK: - // Temporary change before asynchronous calls will callbacks are reintroduced - LOG(error) << "CCDBDownloader: Illegal request type"; - break; - } + bool rescheduled = false; + bool contentRetrieved = false; + + switch (performData->type) { + case BLOCKING: { + --(*performData->requestsLeft); + } break; + case ASYNCHRONOUS: { + DownloaderRequestData* requestData = performData->requestData; + + if (requestData->headers) { + for (auto& p : requestData->hoPair.header) { + (*requestData->headers)[p.first] = p.second; + } + } + if (requestData->errorflag && requestData->headers) { + (*requestData->headers)["Error"] = "An error occurred during retrieval"; + } + + // Log that transfer finished + long httpCode; + curl_easy_getinfo(easy_handle, CURLINFO_RESPONSE_CODE, &httpCode); + char* url; + curl_easy_getinfo(easy_handle, CURLINFO_EFFECTIVE_URL, &url); + LOG(debug) << "Transfer for " << url << " finished with code " << httpCode << "\n"; + + // Get alternative locations for the same host + auto locations = getLocations(requestData->hosts.at(performData->hostInd), &(requestData->hoPair.header)); + + // React to received http code + if (404 == httpCode) { + LOG(error) << "Requested resource does not exist: " << url; + } else if (304 == httpCode) { + LOGP(debug, "Object exists but I am not serving it since it's already in your possession"); + contentRetrieved = true; + } else if (300 <= httpCode && httpCode < 400 && performData->locInd < locations.size()) { + followRedirect(performData, easy_handle, locations, rescheduled, contentRetrieved); + } else if (200 <= httpCode && httpCode < 300) { + contentRetrieved = true; + } else { + LOG(error) << "Error in fetching object " << url << ", curl response code:" << httpCode; + } + + // Check if content was retrieved, or scheduled to be retrieved + if (!rescheduled && !contentRetrieved && performData->locInd == locations.size()) { + // Ran out of locations to redirect, try new host + if (++performData->hostInd < requestData->hosts.size()) { + tryNewHost(performData, easy_handle); + rescheduled = true; + } else { + LOG(error) << "File " << requestData->path << " could not be retrieved. No more hosts to try."; + } + } + + if (!rescheduled) { + // No more transfers will be done for this request, do cleanup specific for ASYNCHRONOUS calls + --(*performData->requestsLeft); + delete requestData; + delete performData->codeDestination; + if (!contentRetrieved) { + LOGP(alarm, "Curl request to {}, response code: {}", url, httpCode); + } + } + } break; + } + if (!rescheduled) { + // No more transfers will be done for this request, do general cleanup + delete performData; } - delete data; checkHandleQueue(); @@ -450,6 +549,34 @@ CURLcode CCDBDownloader::perform(CURL* handle) return batchBlockingPerform(handleVector).back(); } +std::vector CCDBDownloader::getLocations(std::string baseUrl, std::multimap* headerMap) const +{ + std::vector locs; + auto iter = headerMap->find("Location"); + if (iter != headerMap->end()) { + if (iter->second.find("/", 0) != std::string::npos) { + locs.push_back(iter->second); + } else { + locs.push_back(baseUrl + iter->second); + } + } + // add alternative locations (not yet included) + auto iter2 = headerMap->find("Content-Location"); + if (iter2 != headerMap->end()) { + auto range = headerMap->equal_range("Content-Location"); + for (auto it = range.first; it != range.second; ++it) { + if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) { + if (it->second.find("alien", 0) != std::string::npos) { + locs.push_back(it->second); + } else { + locs.push_back(baseUrl + it->second); + } + } + } + } + return locs; +} + std::vector CCDBDownloader::batchBlockingPerform(std::vector const& handleVector) { std::vector codeVector(handleVector.size()); @@ -462,7 +589,6 @@ std::vector CCDBDownloader::batchBlockingPerform(std::vector co data->type = BLOCKING; data->requestsLeft = &requestsLeft; - setHandleOptions(handleVector[i], data); mHandlesToBeAdded.push_back(handleVector[i]); } @@ -470,7 +596,42 @@ std::vector CCDBDownloader::batchBlockingPerform(std::vector co while (requestsLeft > 0) { uv_run(mUVLoop, UV_RUN_ONCE); } + return codeVector; } +void CCDBDownloader::asynchSchedule(CURL* handle, size_t* requestCounter) +{ + (*requestCounter)++; + + CURLcode* codeVector = new CURLcode(); + + // Get data about request + DownloaderRequestData* requestData; + std::multimap* headerMap; + std::vector* hostsPool; + curl_easy_getinfo(handle, CURLINFO_PRIVATE, &requestData); + headerMap = &(requestData->hoPair.header); + hostsPool = &(requestData->hosts); + + // Prepare temporary data about transfer + auto* data = new CCDBDownloader::PerformData(); // Freed in transferFinished + data->codeDestination = codeVector; + *codeVector = CURLE_FAILED_INIT; + + data->type = ASYNCHRONOUS; + data->requestsLeft = requestCounter; + data->hostInd = 0; + data->locInd = 0; + data->requestData = requestData; + + // Prepare handle and schedule download + setHandleOptions(handle, data); + mHandlesToBeAdded.push_back(handle); + + checkHandleQueue(); + + // return codeVector; +} + } // namespace o2 diff --git a/CCDB/src/CcdbApi.cxx b/CCDB/src/CcdbApi.cxx index 0ae9099040c05..03b0c98973ef2 100644 --- a/CCDB/src/CcdbApi.cxx +++ b/CCDB/src/CcdbApi.cxx @@ -20,7 +20,6 @@ #include "CommonUtils/StringUtils.h" #include "CommonUtils/FileSystemUtils.h" #include "CommonUtils/MemFileHelper.h" -#include "MemoryResources/MemoryResources.h" #include "Framework/DefaultsHelpers.h" #include "Framework/DataTakingContext.h" #include @@ -61,24 +60,20 @@ CcdbApi::CcdbApi() setUniqueAgentID(); DeploymentMode deploymentMode = DefaultsHelpers::deploymentMode(); - mIsCCDBDownloaderEnabled = 0; + mIsCCDBDownloaderPreferred = 0; if (deploymentMode == DeploymentMode::OnlineDDS && deploymentMode == DeploymentMode::OnlineECS && deploymentMode == DeploymentMode::OnlineAUX && deploymentMode == DeploymentMode::FST) { - mIsCCDBDownloaderEnabled = 1; + mIsCCDBDownloaderPreferred = 1; } - if (getenv("ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")) { - mIsCCDBDownloaderEnabled = atoi(getenv("ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")); - } - if (mIsCCDBDownloaderEnabled) { - mDownloader = new CCDBDownloader(); + if (getenv("ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")) { // todo rename ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI to ALICEO2_PREFER_MULTIHANDLE_CCDBAPI + mIsCCDBDownloaderPreferred = atoi(getenv("ALICEO2_ENABLE_MULTIHANDLE_CCDBAPI")); } + mDownloader = new CCDBDownloader(); } CcdbApi::~CcdbApi() { curl_global_cleanup(); - if (mDownloader) { - delete mDownloader; - } + delete mDownloader; } void CcdbApi::setUniqueAgentID() @@ -119,14 +114,12 @@ void CcdbApi::curlInit() CcdbApi::mJAlienCredentials->loadCredentials(); CcdbApi::mJAlienCredentials->selectPreferedCredentials(); - // allow to configure the socket timeout of CCDBDownloader, if activated (for some tuning studies) - if (mIsCCDBDownloaderEnabled) { - if (getenv("ALICEO2_CCDB_SOCKET_TIMEOUT")) { - auto timeoutMS = atoi(getenv("ALICEO2_CCDB_SOCKET_TIMEOUT")); - if (timeoutMS >= 0) { - LOG(info) << "Setting socket timeout to " << timeoutMS << " milliseconds"; - mDownloader->setKeepaliveTimeoutTime(timeoutMS); - } + // allow to configure the socket timeout of CCDBDownloader (for some tuning studies) + if (getenv("ALICEO2_CCDB_SOCKET_TIMEOUT")) { + auto timeoutMS = atoi(getenv("ALICEO2_CCDB_SOCKET_TIMEOUT")); + if (timeoutMS >= 0) { + LOG(info) << "Setting socket timeout to " << timeoutMS << " milliseconds"; + mDownloader->setKeepaliveTimeoutTime(timeoutMS); } } } @@ -1481,231 +1474,232 @@ std::string CcdbApi::getHostUrl(int hostIndex) const return hostsPool.at(hostIndex); } -void CcdbApi::loadFileToMemory(o2::pmr::vector& dest, std::string const& path, - std::map const& metadata, long timestamp, - std::map* headers, std::string const& etag, - const std::string& createdNotAfter, const std::string& createdNotBefore, bool considerSnapshot) const +void CcdbApi::scheduleDownload(RequestContext& requestContext, size_t* requestCounter) const { - LOGP(debug, "loadFileToMemory {} ETag=[{}]", path, etag); + auto data = new DownloaderRequestData(); // Deleted in transferFinished of CCDBDownloader.cxx + data->hoPair.object = &requestContext.dest; - // if we are in snapshot mode we can simply open the file, unless the etag is non-empty: - // this would mean that the object was is already fetched and in this mode we don't to validity checks! - bool createSnapshot = considerSnapshot && !mSnapshotCachePath.empty(); // create snaphot if absent - int fromSnapshot = 0; - boost::interprocess::named_semaphore* sem = nullptr; - std::string semhashedstring{}, snapshotpath{}, logfile{}; - std::unique_ptr logStream; - auto sem_release = [&sem, &semhashedstring, path, this]() { - if (sem) { - sem->post(); - if (sem->try_wait()) { // if nobody else is waiting remove the semaphore resource - sem->post(); - boost::interprocess::named_semaphore::remove(semhashedstring.c_str()); - } - } + auto signalError = [&chunk = requestContext.dest, &errorflag = data->errorflag]() { + chunk.clear(); + chunk.reserve(1); + errorflag = true; }; - if (createSnapshot) { // create named semaphore - std::hash hasher; - semhashedstring = "aliceccdb" + std::to_string(hasher(mSnapshotCachePath + path)).substr(0, 16); + std::function localContentCallback = [this, &requestContext](std::string url) { + return this->loadLocalContentToMemory(requestContext.dest, url); + }; + + auto writeCallback = [](void* contents, size_t size, size_t nmemb, void* chunkptr) { + auto& ho = *static_cast(chunkptr); + auto& chunk = *ho.object; + size_t realsize = size * nmemb, sz = 0; + ho.counter++; try { - sem = new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1); + if (chunk.capacity() < chunk.size() + realsize) { + auto cl = ho.header.find("Content-Length"); + if (cl != ho.header.end()) { + sz = std::max(chunk.size() + realsize, (size_t)std::stol(cl->second)); + } else { + sz = chunk.size() + realsize; + // LOGP(debug, "SIZE IS NOT IN HEADER, allocate {}", sz); + } + chunk.reserve(sz); + } + char* contC = (char*)contents; + chunk.insert(chunk.end(), contC, contC + realsize); } catch (std::exception e) { - LOG(warn) << "Exception occurred during CCDB (cache) semaphore setup; Continuing without"; - sem = nullptr; + // LOGP(alarm, "failed to reserve {} bytes in CURL write callback (realsize = {}): {}", sz, realsize, e.what()); + realsize = 0; } - if (sem) { - sem->wait(); // wait until we can enter (no one else there) + return realsize; + }; + + CURL* curl_handle = curl_easy_init(); + curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str()); + string fullUrl = getFullUrlForRetrieval(curl_handle, requestContext.path, requestContext.metadata, requestContext.timestamp); + curl_slist* options_list = nullptr; + initCurlHTTPHeaderOptionsForRetrieve(curl_handle, options_list, requestContext.timestamp, &requestContext.headers, + requestContext.etag, requestContext.createdNotAfter, requestContext.createdNotBefore); + + data->headers = &requestContext.headers; + data->hosts = hostsPool; + data->path = requestContext.path; + data->timestamp = requestContext.timestamp; + data->localContentCallback = localContentCallback; + + curl_easy_setopt(curl_handle, CURLOPT_URL, fullUrl.c_str()); + initCurlOptionsForRetrieve(curl_handle, (void*)(&data->hoPair), writeCallback, false); + curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callbackhoPair.header)>); + curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&(data->hoPair.header)); + curl_easy_setopt(curl_handle, CURLOPT_PRIVATE, (void*)data); + curlSetSSLOptions(curl_handle); + + asynchPerform(curl_handle, requestCounter); +} + +boost::interprocess::named_semaphore* CcdbApi::createNamedSempahore(std::string path) const +{ + std::hash hasher; + std::string semhashedstring = "aliceccdb" + std::to_string(hasher(mSnapshotCachePath + path)).substr(0, 16); + try { + return new boost::interprocess::named_semaphore(boost::interprocess::open_or_create_t{}, semhashedstring.c_str(), 1); + } catch (std::exception e) { + LOG(warn) << "Exception occurred during CCDB (cache) semaphore setup; Continuing without"; + return nullptr; + } +} + +void CcdbApi::releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string path) const +{ + if (sem) { + sem->post(); + if (sem->try_wait()) { // if nobody else is waiting remove the semaphore resource + sem->post(); + std::hash hasher; + std::string semhashedstring = "aliceccdb" + std::to_string(hasher(mSnapshotCachePath + path)).substr(0, 16); + boost::interprocess::named_semaphore::remove(semhashedstring.c_str()); } - logfile = mSnapshotCachePath + "/log"; - logStream = std::make_unique(logfile, ios_base::out | ios_base::app); - if (logStream->is_open()) { - *logStream.get() << "CCDB-access[" << getpid() << "] of " << mUniqueAgentID << " to " << path << " timestamp " << timestamp << " for load to memory\n"; + } +} + +void CcdbApi::getFromSnapshot(bool createSnapshot, std::string const& path, + long timestamp, std::map headers, + std::string& snapshotpath, o2::pmr::vector& dest, int& fromSnapshot, std::string const& etag) const +{ + if (createSnapshot) { // create named semaphore + std::string logfile = mSnapshotCachePath + "/log"; + std::fstream logStream = std::fstream(logfile, ios_base::out | ios_base::app); + if (logStream.is_open()) { + logStream << "CCDB-access[" << getpid() << "] of " << mUniqueAgentID << " to " << path << " timestamp " << timestamp << " for load to memory\n"; } } - if (mInSnapshotMode) { // file must be there, otherwise a fatal will be produced - loadFileToMemory(dest, getSnapshotFile(mSnapshotTopPath, path), headers); + if (mInSnapshotMode) { // file must be there, otherwise a fatal will be produced; + loadFileToMemory(dest, getSnapshotFile(mSnapshotTopPath, path), &headers); fromSnapshot = 1; - } else if (mPreferSnapshotCache && std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, path))) { + } else if (mPreferSnapshotCache && std::filesystem::exists(snapshotpath)) { // if file is available, use it, otherwise cache it below from the server. Do this only when etag is empty since otherwise the object was already fetched and cached if (etag.empty()) { - loadFileToMemory(dest, snapshotpath, headers); + loadFileToMemory(dest, snapshotpath, &headers); } fromSnapshot = 2; - } else { // look on the server - CURL* curl_handle = curl_easy_init(); - curl_easy_setopt(curl_handle, CURLOPT_USERAGENT, mUniqueAgentID.c_str()); - string fullUrl = getFullUrlForRetrieval(curl_handle, path, metadata, timestamp); - - curl_slist* options_list = nullptr; - initCurlHTTPHeaderOptionsForRetrieve(curl_handle, options_list, timestamp, headers, etag, createdNotAfter, createdNotBefore); - navigateURLsAndLoadFileToMemory(dest, curl_handle, fullUrl, headers); - for (size_t hostIndex = 1; hostIndex < hostsPool.size() && isMemoryFileInvalid(dest); hostIndex++) { - fullUrl = getFullUrlForRetrieval(curl_handle, path, metadata, timestamp, hostIndex); - navigateURLsAndLoadFileToMemory(dest, curl_handle, fullUrl, headers); // headers loaded from the file in case of the snapshot reading only - } - curl_slist_free_all(options_list); - curl_easy_cleanup(curl_handle); } +} - if (dest.empty()) { - sem_release(); - return; // nothing was fetched: either cached value is good or error was produced - } - // !considerSnapshot means that the call was made by retrieve for snapshoting reasons - logReading(path, timestamp, headers, fmt::format("{}{}", considerSnapshot ? "load to memory" : "retrieve", fromSnapshot ? " from snapshot" : "")); +void CcdbApi::saveSnapshot(RequestContext& requestContext) const +{ + // Consider saving snapshot + if (!mSnapshotCachePath.empty() && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) { // store in the snapshot only if the object was not read from the snapshot + auto sem = createNamedSempahore(requestContext.path); + if (sem) { + sem->wait(); // wait until we can enter (no one else there) + } - // are we asked to create a snapshot ? - if (createSnapshot && fromSnapshot != 2 && !(mInSnapshotMode && mSnapshotTopPath == mSnapshotCachePath)) { // store in the snapshot only if the object was not read from the snapshot - auto snapshotdir = getSnapshotDir(mSnapshotCachePath, path); - snapshotpath = getSnapshotFile(mSnapshotCachePath, path); + auto snapshotdir = getSnapshotDir(mSnapshotCachePath, requestContext.path); + std::string snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.path); o2::utils::createDirectoriesIfAbsent(snapshotdir); - if (logStream->is_open()) { - *logStream.get() << "CCDB-access[" << getpid() << "] ... " << mUniqueAgentID << " downloading to snapshot " << snapshotpath << " from memory\n"; + std::fstream logStream; + if (logStream.is_open()) { + logStream << "CCDB-access[" << getpid() << "] ... " << mUniqueAgentID << " downloading to snapshot " << snapshotpath << " from memory\n"; } { // dump image to a file - LOGP(debug, "creating snapshot {} -> {}", path, snapshotpath); - CCDBQuery querysummary(path, metadata, timestamp); + LOGP(debug, "creating snapshot {} -> {}", requestContext.path, snapshotpath); + CCDBQuery querysummary(requestContext.path, requestContext.metadata, requestContext.timestamp); { std::ofstream objFile(snapshotpath, std::ios::out | std::ofstream::binary); - std::copy(dest.begin(), dest.end(), std::ostreambuf_iterator(objFile)); + std::copy(requestContext.dest.begin(), requestContext.dest.end(), std::ostreambuf_iterator(objFile)); } // now open the same file as root file and store metadata - updateMetaInformationInLocalFile(snapshotpath, headers, &querysummary); + updateMetaInformationInLocalFile(snapshotpath, &requestContext.headers, &querysummary); } + releaseNamedSemaphore(sem, requestContext.path); } - sem_release(); } -// navigate sequence of URLs until TFile content is found; object is extracted and returned -void CcdbApi::navigateURLsAndLoadFileToMemory(o2::pmr::vector& dest, CURL* curl_handle, std::string const& url, std::map* headers) const +void CcdbApi::loadFileToMemory(o2::pmr::vector& dest, std::string const& path, + std::map const& metadata, long timestamp, + std::map* headers, std::string const& etag, + const std::string& createdNotAfter, const std::string& createdNotBefore, bool considerSnapshot) const { - // let's see first of all if the url is something specific that curl cannot handle - if (url.find("alien:/", 0) != std::string::npos) { - return loadFileToMemory(dest, url, nullptr); // headers loaded from the file in case of the snapshot reading only - } - if ((url.find("file:/", 0) != std::string::npos)) { - std::string path = url.substr(7); - if (std::filesystem::exists(path)) { - return loadFileToMemory(dest, path, nullptr); - } else { - return; + RequestContext requestContext(dest, metadata, *headers); + requestContext.path = path; + // std::map metadataCopy = metadata; // Create a copy because metadata will be passed as a pointer so it cannot be constant. The const in definition is for backwards compatability. + // requestContext.metadata = metadataCopy; + requestContext.timestamp = timestamp; + requestContext.etag = etag; + requestContext.createdNotAfter = createdNotAfter; + requestContext.createdNotBefore = createdNotBefore; + requestContext.considerSnapshot = considerSnapshot; + std::vector contexts = {requestContext}; + vectoredLoadFileToMemory(contexts); +} + +void CcdbApi::navigateSourcesAndLoadFile(RequestContext& requestContext, int& fromSnapshot, size_t* requestCounter) const +{ + LOGP(debug, "loadFileToMemory {} ETag=[{}]", requestContext.path, requestContext.etag); + bool createSnapshot = requestContext.considerSnapshot && !mSnapshotCachePath.empty(); // create snaphot if absent + + std::string snapshotpath; + if (mInSnapshotMode || std::filesystem::exists(snapshotpath = getSnapshotFile(mSnapshotCachePath, requestContext.path))) { + boost::interprocess::named_semaphore* sem = createNamedSempahore(requestContext.path); + if (sem) { + sem->wait(); // wait until we can enter (no one else there) } + // if we are in snapshot mode we can simply open the file, unless the etag is non-empty: + // this would mean that the object was is already fetched and in this mode we don't to validity checks! + getFromSnapshot(createSnapshot, requestContext.path, requestContext.timestamp, requestContext.headers, snapshotpath, requestContext.dest, fromSnapshot, requestContext.etag); + releaseNamedSemaphore(sem, requestContext.path); + } else { // look on the server + scheduleDownload(requestContext, requestCounter); } - // otherwise make an HTTP/CURL request - struct HeaderObjectPair_t { - std::multimap header; - o2::pmr::vector* object = nullptr; - int counter = 0; - } hoPair{{}, &dest, 0}; +} - bool errorflag = false; - auto signalError = [&chunk = dest, &errorflag]() { - chunk.clear(); - chunk.reserve(1); - errorflag = true; - }; - auto writeCallBack = [](void* contents, size_t size, size_t nmemb, void* chunkptr) { - auto& ho = *static_cast(chunkptr); - auto& chunk = *ho.object; - size_t realsize = size * nmemb, sz = 0; - ho.counter++; - try { - if (chunk.capacity() < chunk.size() + realsize) { - auto cl = ho.header.find("Content-Length"); - if (cl != ho.header.end()) { - sz = std::max(chunk.size() + realsize, (size_t)std::stol(cl->second)); - } else { - sz = chunk.size() + realsize; - LOGP(debug, "SIZE IS NOT IN HEADER, allocate {}", sz); - } - chunk.reserve(sz); - } - char* contC = (char*)contents; - chunk.insert(chunk.end(), contC, contC + realsize); - } catch (std::exception e) { - LOGP(alarm, "failed to reserve {} bytes in CURL write callback (realsize = {}): {}", sz, realsize, e.what()); - realsize = 0; - } - return realsize; - }; +void CcdbApi::vectoredLoadFileToMemory(std::vector& requestContexts) const +{ + std::vector fromSnapshots(requestContexts.size()); + size_t requestCounter = 0; - // specify URL to get - curl_easy_setopt(curl_handle, CURLOPT_URL, url.c_str()); - initCurlOptionsForRetrieve(curl_handle, (void*)&hoPair, writeCallBack, false); - curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callback); - hoPair.header.clear(); - curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&hoPair.header); - curlSetSSLOptions(curl_handle); + // Get files from snapshots and schedule downloads + for (int i = 0; i < requestContexts.size(); i++) { + // navigateSourcesAndLoadFile either retrieves file from snapshot immediately, or schedules it to be downloaded when mDownloader->runLoop is ran at a later time + auto& requestContext = requestContexts.at(i); + navigateSourcesAndLoadFile(requestContext, fromSnapshots.at(i), &requestCounter); + logReading(requestContext.path, requestContext.timestamp, &requestContext.headers, + fmt::format("{}{}", requestContext.considerSnapshot ? "load to memory" : "retrieve", fromSnapshots.at(i) ? " from snapshot" : "")); + } - auto res = CURL_perform(curl_handle); - long response_code = -1; - if (res == CURLE_OK && curl_easy_getinfo(curl_handle, CURLINFO_RESPONSE_CODE, &response_code) == CURLE_OK) { - if (headers) { - for (auto& p : hoPair.header) { - (*headers)[p.first] = p.second; - } - } - if (200 <= response_code && response_code < 300) { - // good response and the content is directly provided and should have been dumped into "chunk" - } else if (response_code == 304) { - LOGP(debug, "Object exists but I am not serving it since it's already in your possession"); - } - // this is a more general redirection - else if (300 <= response_code && response_code < 400) { - // we try content locations in order of appearance until one succeeds - // 1st: The "Location" field - // 2nd: Possible "Content-Location" fields - Location field - // some locations are relative to the main server so we need to fix/complement them - auto complement_Location = [this](std::string const& loc) { - if (loc[0] == '/') { - // if it's just a path (noticed by trailing '/' we prepend the server url - return getURL() + loc; - } - return loc; - }; + // Download the rest + while (requestCounter > 0) { + mDownloader->runLoop(0); + } - std::vector locs; - auto iter = hoPair.header.find("Location"); - if (iter != hoPair.header.end()) { - locs.push_back(complement_Location(iter->second)); - } - // add alternative locations (not yet included) - auto iter2 = hoPair.header.find("Content-Location"); - if (iter2 != hoPair.header.end()) { - auto range = hoPair.header.equal_range("Content-Location"); - for (auto it = range.first; it != range.second; ++it) { - if (std::find(locs.begin(), locs.end(), it->second) == locs.end()) { - locs.push_back(complement_Location(it->second)); - } - } - } - for (auto& l : locs) { - if (l.size() > 0) { - LOG(debug) << "Trying content location " << l; - navigateURLsAndLoadFileToMemory(dest, curl_handle, l, headers); - if (dest.size()) { /* or other success marker in future */ - break; - } - } + // Save snapshots + for (int i = 0; i < requestContexts.size(); i++) { + auto& requestContext = requestContexts.at(i); + if (!requestContext.dest.empty()) { + if (requestContext.considerSnapshot && fromSnapshots.at(i) != 2) { + saveSnapshot(requestContext); } - } else if (response_code == 404) { - LOG(error) << "Requested resource does not exist: " << url; - signalError(); } else { - LOG(error) << "Error in fetching object " << url << ", curl response code:" << response_code; - signalError(); + LOG(error) << "Did not receive content for " << requestContext.path << "\n"; } - } else { - LOGP(alarm, "Curl request to {} failed with result {}, response code: {}", url, int(res), response_code); - signalError(); } - // indicate that an error occurred ---> used by caching layers (such as CCDBManager) - if (errorflag && headers) { - (*headers)["Error"] = "An error occurred during retrieval"; +} + +bool CcdbApi::loadLocalContentToMemory(o2::pmr::vector& dest, std::string& url) const +{ + if (url.find("alien:/", 0) != std::string::npos) { + loadFileToMemory(dest, url, nullptr); // headers loaded from the file in case of the snapshot reading only + return true; } - return; + if ((url.find("file:/", 0) != std::string::npos)) { + std::string path = url.substr(7); + if (std::filesystem::exists(path)) { + loadFileToMemory(dest, path, nullptr); + return true; + } + } + return false; } void CcdbApi::loadFileToMemory(o2::pmr::vector& dest, const std::string& path, std::map* localHeaders) const @@ -1812,9 +1806,14 @@ void CcdbApi::logReading(const std::string& path, long ts, const std::mapasynchSchedule(handle, requestCounter); +} + CURLcode CcdbApi::CURL_perform(CURL* handle) const { - if (mIsCCDBDownloaderEnabled) { + if (mIsCCDBDownloaderPreferred) { return mDownloader->perform(handle); } CURLcode result; diff --git a/CCDB/test/testCcdbApi.cxx b/CCDB/test/testCcdbApi.cxx index c2a3bf4e483c0..c834f2f30f64a 100644 --- a/CCDB/test/testCcdbApi.cxx +++ b/CCDB/test/testCcdbApi.cxx @@ -553,3 +553,40 @@ BOOST_AUTO_TEST_CASE(TestUpdateMetadata, *utf::precondition(if_reachable())) BOOST_CHECK(headers.count("custom") > 0); BOOST_CHECK(headers.at("custom") == "second"); } + +BOOST_AUTO_TEST_CASE(multi_host_test) +{ + CcdbApi api; + api.init("http://bogus-host.cern.ch,http://ccdb-test.cern.ch:8080"); + std::map metadata; + std::map headers; + o2::pmr::vector dst; + std::string url = "Analysis/ALICE3/Centrality"; + api.loadFileToMemory(dst, url, metadata, 1645780010602, &headers, "", "", "", true); + BOOST_CHECK(dst.size() != 0); +} + +BOOST_AUTO_TEST_CASE(vectored) +{ + CcdbApi api; + api.init("http://ccdb-test.cern.ch:8080"); + + int TEST_SAMPLE_SIZE = 5; + std::vector> dests(TEST_SAMPLE_SIZE); + std::vector> metadatas(TEST_SAMPLE_SIZE); + std::vector> headers(TEST_SAMPLE_SIZE); + + std::vector contexts; + for (int i = 0; i < TEST_SAMPLE_SIZE; i++) { + contexts.push_back(CcdbApi::RequestContext(dests.at(i), metadatas.at(i), headers.at(i))); + contexts.at(i).path = "Analysis/ALICE3/Centrality"; + contexts.at(i).timestamp = 1645780010602; + contexts.at(i).considerSnapshot = true; + } + + api.vectoredLoadFileToMemory(contexts); + + for (auto context : contexts) { + BOOST_CHECK(context.dest.size() != 0); + } +} \ No newline at end of file diff --git a/CCDB/test/testCcdbApiDownloader.cxx b/CCDB/test/testCcdbApiDownloader.cxx index 11fbbd4494eda..a0e95d9de1d9d 100644 --- a/CCDB/test/testCcdbApiDownloader.cxx +++ b/CCDB/test/testCcdbApiDownloader.cxx @@ -27,6 +27,9 @@ #include #include +#include +#include "MemoryResources/MemoryResources.h" + using namespace std; namespace o2 @@ -71,6 +74,110 @@ CURL* createTestHandle(std::string* dst) return handle; } +namespace +{ +template > +size_t header_map_callback(char* buffer, size_t size, size_t nitems, void* userdata) +{ + auto* headers = static_cast(userdata); + auto header = std::string(buffer, size * nitems); + std::string::size_type index = header.find(':', 0); + if (index != std::string::npos) { + const auto key = boost::algorithm::trim_copy(header.substr(0, index)); + const auto value = boost::algorithm::trim_copy(header.substr(index + 1)); + headers->insert(std::make_pair(key, value)); + } + return size * nitems; +} +} // namespace + +size_t writeCallbackNoLambda(void* contents, size_t size, size_t nmemb, void* chunkptr) +{ + auto& ho = *static_cast(chunkptr); + auto& chunk = *ho.object; + size_t realsize = size * nmemb, sz = 0; + ho.counter++; + try { + if (chunk.capacity() < chunk.size() + realsize) { + auto cl = ho.header.find("Content-Length"); + if (cl != ho.header.end()) { + sz = std::max(chunk.size() + realsize, (size_t)std::stol(cl->second)); + } else { + sz = chunk.size() + realsize; + // LOGP(debug, "SIZE IS NOT IN HEADER, allocate {}", sz); + } + chunk.reserve(sz); + } + char* contC = (char*)contents; + chunk.insert(chunk.end(), contC, contC + realsize); + } catch (std::exception e) { + // LOGP(alarm, "failed to reserve {} bytes in CURL write callback (realsize = {}): {}", sz, realsize, e.what()); + realsize = 0; + } + return realsize; +} + +std::vector prepareAsyncHandles(size_t num, std::vector*>& dests) +{ + std::vector handles; + + for (int i = 0; i < num; i++) { + auto dest = new o2::pmr::vector(); + dests.push_back(dest); + CURL* curl_handle = curl_easy_init(); + handles.push_back(curl_handle); + + auto data = new DownloaderRequestData(); + data->hoPair.object = dest; + data->hosts.push_back("http://ccdb-test.cern.ch:8080"); + data->path = "Analysis/ALICE3/Centrality"; + data->timestamp = 1646729604010; + data->localContentCallback = nullptr; + + curl_easy_setopt(curl_handle, CURLOPT_URL, "http://ccdb-test.cern.ch:8080/Analysis/ALICE3/Centrality/1646729604010"); + curl_easy_setopt(curl_handle, CURLOPT_WRITEFUNCTION, writeCallbackNoLambda); + curl_easy_setopt(curl_handle, CURLOPT_WRITEDATA, (void*)&(data->hoPair)); + + curl_easy_setopt(curl_handle, CURLOPT_HEADERFUNCTION, header_map_callbackhoPair.header)>); + curl_easy_setopt(curl_handle, CURLOPT_HEADERDATA, (void*)&(data->hoPair.header)); + curl_easy_setopt(curl_handle, CURLOPT_PRIVATE, (void*)data); + } + return handles; +} + +BOOST_AUTO_TEST_CASE(asynch_schedule_test) +{ + int TRANSFERS = 5; + + if (curl_global_init(CURL_GLOBAL_ALL)) { + fprintf(stderr, "Could not init curl\n"); + return; + } + + CCDBDownloader downloader; + std::vector*> dests; + auto handles = prepareAsyncHandles(TRANSFERS, dests); + size_t transfersLeft = 0; + + for (auto handle : handles) { + downloader.asynchSchedule(handle, &transfersLeft); + } + + while (transfersLeft > 0) { + downloader.runLoop(0); + } + + for (int i = 0; i < TRANSFERS; i++) { + long httpCode; + curl_easy_getinfo(handles[i], CURLINFO_HTTP_CODE, &httpCode); + BOOST_CHECK(httpCode == 200); + BOOST_CHECK(dests[i]->size() != 0); + curl_easy_cleanup(handles[i]); + delete dests[i]; + } + curl_global_cleanup(); +} + BOOST_AUTO_TEST_CASE(perform_test) { if (curl_global_init(CURL_GLOBAL_ALL)) {