Skip to content

Commit

Permalink
CcdbApi.loadFileToMemory changed to a vectored operation
Browse files Browse the repository at this point in the history
  • Loading branch information
TrifleMichael authored and ktf committed Oct 30, 2023
1 parent 3671fca commit ec9ee0b
Show file tree
Hide file tree
Showing 6 changed files with 648 additions and 234 deletions.
71 changes: 63 additions & 8 deletions CCDB/include/CCDB/CCDBDownloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
#ifndef O2_CCDBDOWNLOADER_H_
#define O2_CCDBDOWNLOADER_H_

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
#include "MemoryResources/MemoryResources.h"
#endif

#include <cstdio>
#include <cstdlib>
#include <curl/curl.h>
Expand All @@ -21,6 +25,8 @@
#include <mutex>
#include <condition_variable>
#include <unordered_map>
#include <map>
#include <functional>

typedef struct uv_loop_s uv_loop_t;
typedef struct uv_timer_s uv_timer_t;
Expand All @@ -32,6 +38,25 @@ typedef struct uv_handle_s uv_handle_t;
namespace o2::ccdb
{

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
struct HeaderObjectPair_t {
std::multimap<std::string, std::string> header;
o2::pmr::vector<char>* object = nullptr;
int counter = 0;
};

typedef struct DownloaderRequestData {
std::vector<std::string> hosts;
std::string path;
long timestamp;
HeaderObjectPair_t hoPair;
std::map<std::string, std::string>* headers;

std::function<bool(std::string)> localContentCallback;
bool errorflag = false;
} DownloaderRequestData;
#endif

/*
Some functions below aren't member functions of CCDBDownloader because both curl and libuv require callback functions which have to be either static or non-member.
Because non-static functions are used in the functions below, they must be non-member.
Expand Down Expand Up @@ -133,6 +158,14 @@ class CCDBDownloader
*/
std::vector<CURLcode> batchBlockingPerform(std::vector<CURL*> const& handleVector);

/**
* Schedules an asynchronous transfer but doesn't perform it.
*
* @param handle Handle to be performed on.
* @param requestCounter Counter shared by a batch of CURL handles.
*/
void asynchSchedule(CURL* handle, size_t* requestCounter);

/**
* Limits the number of parallel connections. Should be used only if no transfers are happening.
*/
Expand Down Expand Up @@ -176,6 +209,14 @@ class CCDBDownloader
void runLoop(bool noWait);

private:
/**
* Returns a vector of possible content locations based on the redirect headers.
*
* @param baseUrl Content path.
* @param headerMap Map containing response headers.
*/
std::vector<std::string> getLocations(std::string baseUrl, std::multimap<std::string, std::string>* headerMap) const;

std::string mUserAgentId = "CCDBDownloader";
/**
* Sets up internal UV loop.
Expand Down Expand Up @@ -207,8 +248,7 @@ class CCDBDownloader
*/
enum RequestType {
BLOCKING,
ASYNCHRONOUS,
ASYNCHRONOUS_WITH_CALLBACK
ASYNCHRONOUS
};

/**
Expand All @@ -230,20 +270,19 @@ class CCDBDownloader

DataForSocket mSocketData;

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
/**
* Structure which is stored in a easy_handle. It carries information about the request which the easy_handle is part of.
* All easy handles coming from one request have an identical PerformData structure.
*/
typedef struct PerformData {
std::condition_variable* cv;
bool* completionFlag;
CURLcode* codeDestination;
void (*cbFun)(void*);
std::thread* cbThread;
void* cbData;
size_t* requestsLeft;
RequestType type;
int hostInd;
int locInd;
DownloaderRequestData* requestData;
} PerformData;
#endif

/**
* Called by CURL in order to close a socket. It will be called by CURL even if a timeout timer closed the socket beforehand.
Expand All @@ -253,6 +292,20 @@ class CCDBDownloader
*/
static void closesocketCallback(void* clientp, curl_socket_t item);

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
// Reschedules the transfer to be performed with a different host.
void tryNewHost(PerformData* performData, CURL* easy_handle);

// Retrieves content from either alien, cvmfs or local storage using a callback to CCDBApi.
void getLocalContent(PerformData* performData, std::string& newUrl, std::string& newLocation, bool& contentRetrieved, std::vector<std::string>& locations);

// Continues a transfer via a http redirect.
void httpRedirect(PerformData* performData, std::string& newUrl, std::string& newLocation, CURL* easy_handle);

// Continues a transfer via a redirect. The redirect can point to a local file, alien file or a http address.
void followRedirect(PerformData* performData, CURL* easy_handle, std::vector<std::string>& locations, bool& rescheduled, bool& contentRetrieved);
#endif

/**
* Is used to react to polling file descriptors in poll_handle.
*
Expand Down Expand Up @@ -322,10 +375,12 @@ class CCDBDownloader
*/
void checkMultiInfo();

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
/**
* Set openSocketCallback and closeSocketCallback with appropriate arguments. Stores data inside the CURL handle.
*/
void setHandleOptions(CURL* handle, PerformData* data);
#endif

/**
* Create structure holding information about a socket including a poll handle assigned to it
Expand Down
63 changes: 59 additions & 4 deletions CCDB/include/CCDB/CcdbApi.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,20 @@
#include <TObject.h>
#include <TMessage.h>
#include "CCDB/CcdbObjectInfo.h"
#include "CCDB/CCDBDownloader.h"
#include <CommonUtils/ConfigurableParam.h>
#include <type_traits>
#include <vector>

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
#include "MemoryResources/MemoryResources.h"
#include <boost/interprocess/sync/named_semaphore.hpp>
#include <TJAlienCredentials.h>
#else
class TJAlienCredentials;
#endif

#include "CCDB/CCDBDownloader.h"

class TFile;
class TGrid;

Expand Down Expand Up @@ -342,14 +344,43 @@ class CcdbApi //: public DatabaseInterface
TObject* retrieveFromTFile(std::string const& path, std::map<std::string, std::string> const& metadata, long timestamp,
std::map<std::string, std::string>* headers, std::string const& etag,
const std::string& createdNotAfter, const std::string& createdNotBefore) const;

#if !defined(__CINT__) && !defined(__MAKECINT__) && !defined(__ROOTCLING__) && !defined(__CLING__)
typedef struct RequestContext {
o2::pmr::vector<char>& dest;
std::string path;
std::map<std::string, std::string> const& metadata;
long timestamp;
std::map<std::string, std::string>& headers;
std::string etag;
std::string createdNotAfter;
std::string createdNotBefore;
bool considerSnapshot;

RequestContext(o2::pmr::vector<char>& d,
std::map<std::string, std::string> const& m,
std::map<std::string, std::string>& h)
: dest(d), metadata(m), headers(h) {}
} RequestContext;

// Stores file associated with requestContext as a snapshot.
void saveSnapshot(RequestContext& requestContext) const;

// Schedules download via CCDBDownloader, but doesn't perform it until mUVLoop is ran.
void scheduleDownload(RequestContext& requestContext, size_t* requestCounter) const;

void getFromSnapshot(bool createSnapshot, std::string const& path,
long timestamp, std::map<std::string, std::string> headers,
std::string& snapshotpath, o2::pmr::vector<char>& dest, int& fromSnapshot, std::string const& etag) const;
void releaseNamedSemaphore(boost::interprocess::named_semaphore* sem, std::string path) const;
boost::interprocess::named_semaphore* createNamedSempahore(std::string path) const;
void loadFileToMemory(o2::pmr::vector<char>& dest, const std::string& path, std::map<std::string, std::string>* localHeaders = nullptr) const;
void loadFileToMemory(o2::pmr::vector<char>& dest, std::string const& path,
std::map<std::string, std::string> const& metadata, long timestamp,
std::map<std::string, std::string>* headers, std::string const& etag,
const std::string& createdNotAfter, const std::string& createdNotBefore, bool considerSnapshot = true) const;
void navigateURLsAndLoadFileToMemory(o2::pmr::vector<char>& dest, CURL* curl_handle, std::string const& url, std::map<string, string>* headers) const;

// Loads files from alien and cvmfs into given destination.
bool loadLocalContentToMemory(o2::pmr::vector<char>& dest, std::string& url) const;

// the failure to load the file to memory is signaled by 0 size and non-0 capacity
static bool isMemoryFileInvalid(const o2::pmr::vector<char>& v) { return v.size() == 0 && v.capacity() > 0; }
Expand All @@ -364,12 +395,36 @@ class CcdbApi //: public DatabaseInterface
}
return obj;
}

/**
* Retrieves files either as snapshot or schedules them to be downloaded via CCDBDownloader.
*
* @param requestContext Structure giving details about the transfer.
* @param fromSnapshot After navigateSourcesAndLoadFile returns signals whether file was retrieved from snapshot.
* @param requestCounter Pointer to the variable storing the number of requests to be done.
*/
void navigateSourcesAndLoadFile(RequestContext& requestContext, int& fromSnapshot, size_t* requestCounter) const;

/**
* Retrieves files described via RequestContexts into memory. Downloads are performed in parallel via CCDBDownloader.
*
* @param requestContext Structure giving details about the transfer.
*/
void vectoredLoadFileToMemory(std::vector<RequestContext>& requestContext) const;
#endif

private:
// Sets the unique agent ID
void setUniqueAgentID();

/**
* Schedules download of data associated with the curl_handle. Doing that increments the requestCounter by 1. Requests are performed by running the mUVLoop
*
* @param handle CURL handle associated with the request.
* @param requestCounter Pointer to the variable storing the number of requests to be done.
*/
void asynchPerform(CURL* handle, size_t* requestCounter) const;

// internal helper function to update a CCDB file with meta information
static void updateMetaInformationInLocalFile(std::string const& filename, std::map<std::string, std::string> const* headers, CCDBQuery const* querysummary = nullptr);

Expand Down Expand Up @@ -550,7 +605,7 @@ class CcdbApi //: public DatabaseInterface
CURLcode CURL_perform(CURL* handle) const;

mutable CCDBDownloader* mDownloader = nullptr; //! the multi-handle (async) CURL downloader
bool mIsCCDBDownloaderEnabled = false;
bool mIsCCDBDownloaderPreferred = false;
/// Base URL of the CCDB (with port)
std::string mUniqueAgentID{}; // Unique User-Agent ID communicated to server for logging
std::string mUrl{};
Expand Down
Loading

0 comments on commit ec9ee0b

Please sign in to comment.