diff --git a/C/Cpp_include/c4Listener.hh b/C/Cpp_include/c4Listener.hh index 0e35a19a0..127ed4842 100644 --- a/C/Cpp_include/c4Listener.hh +++ b/C/Cpp_include/c4Listener.hh @@ -13,6 +13,7 @@ #pragma once #include "c4Base.hh" #include "c4ListenerTypes.h" +#include "fleece/FLBase.h" #include "fleece/InstanceCounted.hh" #include @@ -42,6 +43,11 @@ struct C4Listener final bool unshareCollection(slice name, C4Collection* coll); + /// Starts a replication, just like a POST to `/_replicate`. + /// @param parameters The same parameters as the request body to `/_replicate`. + /// @returns The task ID. + unsigned startReplication(FLDict parameters); + [[nodiscard]] uint16_t port() const; [[nodiscard]] std::pair connectionStatus() const; @@ -53,8 +59,7 @@ struct C4Listener final C4Listener(const C4Listener&) = delete; private: - // For some reason, MSVC on Jenkins will not compile this with noexcept (everything else will) - C4Listener(C4Listener&&); // NOLINT(performance-noexcept-move-constructor) + C4Listener(C4Listener&&) noexcept; Retained _impl; C4ListenerHTTPAuthCallback C4NULLABLE _httpAuthCallback; diff --git a/C/include/c4Listener.h b/C/include/c4Listener.h index 4333e6448..519d9b554 100644 --- a/C/include/c4Listener.h +++ b/C/include/c4Listener.h @@ -60,6 +60,14 @@ NODISCARD CBL_CORE_API bool c4listener_shareCollection(C4Listener* listener, C4S NODISCARD CBL_CORE_API bool c4listener_unshareCollection(C4Listener* listener, C4String name, C4Collection* collection, C4Error* C4NULLABLE outError) C4API; +/** Starts a replication task, just like a POST to `/_replicate`. + @param listener The running C4Listener instance. + @param parameters The same parameters as the request body to `/_replicate`. + @param outError On failure, the error info is stored here if non-NULL. + @returns The "session_id" identifying the replication task, or 0 on failure. */ +NODISCARD CBL_CORE_API unsigned c4listener_startReplication(C4Listener* listener, FLDict parameters, + C4Error* C4NULLABLE outError) C4API; + /** Returns the URL(s) of a database being shared, or of the root, separated by "\n" bytes. The URLs will differ only in their hostname -- there will be one for each IP address or known hostname of the computer, or of the network interface. diff --git a/REST/RESTListener+Replicate.cc b/REST/RESTListener+Replicate.cc index 39934ecc0..f1d05e101 100644 --- a/REST/RESTListener+Replicate.cc +++ b/REST/RESTListener+Replicate.cc @@ -17,7 +17,7 @@ #include "fleece/RefCounted.hh" #include "ReplicatorOptions.hh" #include "StringUtil.hh" -#include "fleece/Expert.hh" // for AllocedDict +#include "fleece/Mutable.hh" #include #include #include @@ -29,7 +29,7 @@ using namespace fleece; namespace litecore::REST { using namespace net; - class ReplicationTask : public RESTListener::Task { + class RESTListener::ReplicationTask : public RESTListener::Task { public: ReplicationTask(RESTListener* listener, slice source, slice target) : Task(listener), _source(source), _target(target) {} @@ -56,6 +56,7 @@ namespace litecore::REST { SPLAT(remoteAddress.hostname), remoteAddress.port, SPLAT(remoteDbName), _bidi, _continuous); try { + C4Database::WithClientMutex with(localDB); _repl = localDB->newReplicator(remoteAddress, remoteDbName, params); _repl->start(); if ( _repl ) { // it is possible that the replicator already stopped and I cleared the ref @@ -168,18 +169,37 @@ namespace litecore::REST { #pragma mark - HTTP HANDLER: void RESTListener::handleReplicate(litecore::REST::RequestResponse& rq) { - // Parse the JSON body: auto params = rq.bodyAsJSON().asDict(); - if ( !params ) + if ( !params ) { return rq.respondWithStatus(HTTPStatus::BadRequest, "Invalid JSON in request body (or body is not an object)"); - - if ( Value cancelVal = params["cancel"] ) { + } else if ( Value cancelVal = params["cancel"] ) { // Hang on, stop the presses -- we're canceling, not starting cancelReplication(rq, cancelVal); - return; + } else { + auto [status, message, task] = _handleReplicate(params); + if ( task ) { + if ( task->status().error == kC4NoError ) + rq.jsonEncoder().writeFormatted("{ok: true, session_id: %u}", task->taskID()); + else + task->writeErrorInfo(rq.jsonEncoder()); + rq.setStatus(status, message.c_str()); + } else { + rq.respondWithStatus(status, message.c_str()); + } } + } + unsigned RESTListener::startReplication(Dict params) { + auto [status, message, task] = _handleReplicate(params); + if ( int(status) < 300 ) { + return task->taskID(); + } else { + error(error::WebSocket, int(status), message)._throw(); + } + } + + tuple RESTListener::_handleReplicate(Dict params) { bool bidi = params["bidi"].asBool(); bool continuous = params["continuous"].asBool(); C4ReplicatorMode activeMode = continuous ? kC4Continuous : kC4OneShot; @@ -187,8 +207,7 @@ namespace litecore::REST { // Get the local DB and remote URL: slice source = params["source"].asString(); slice target = params["target"].asString(); - if ( !source || !target ) - return rq.respondWithStatus(HTTPStatus::BadRequest, "Missing source or target parameters"); + if ( !source || !target ) return {HTTPStatus::BadRequest, "Missing source or target parameters", nullptr}; slice localName; slice remoteURL; C4ReplicatorMode pushMode, pullMode; @@ -202,14 +221,14 @@ namespace litecore::REST { pullMode = activeMode; remoteURL = source; } else { - return rq.respondWithStatus(HTTPStatus::BadRequest, "Neither source nor target is a local database name"); + return {HTTPStatus::BadRequest, "Neither source nor target is a local database name", nullptr}; } Retained localDB = databaseNamed(localName.asString()); - if ( !localDB ) return rq.respondWithStatus(HTTPStatus::NotFound); + if ( !localDB ) return {HTTPStatus::NotFound, "", nullptr}; C4Address remoteAddress; slice remoteDbName; if ( !C4Address::fromURL(remoteURL, &remoteAddress, &remoteDbName) ) - return rq.respondWithStatus(HTTPStatus::BadRequest, "Invalid database URL"); + return {HTTPStatus::BadRequest, "Invalid database URL", nullptr}; // Subroutine that adds a C4ReplicationCollection: vector collOptions; // backing store for each optionsDictFleece @@ -237,7 +256,7 @@ namespace litecore::REST { // `collections` is an array of collection names: for ( Array::iterator iter(collectionNames); iter; iter.next() ) { slice collPath = iter.value().asString(); - if ( !collPath ) return rq.respondWithStatus(HTTPStatus::BadRequest, "Invalid collections item"); + if ( !collPath ) return {HTTPStatus::BadRequest, "Invalid collections item", nullptr}; addCollection(collPath, params); } } else if ( Dict collections = collectionsVal.asDict() ) { @@ -246,18 +265,18 @@ namespace litecore::REST { addCollection(iter.keyString(), iter.value().asDict()); } } else { - return rq.respondWithStatus(HTTPStatus::BadRequest, "'collections' must be an array or object"); + return {HTTPStatus::BadRequest, "'collections' must be an array or object", nullptr}; } } else { // 'collections' is missing; just use the default collection: addCollection(kC4DefaultCollectionName, params); } if ( replCollections.empty() ) - return rq.respondWithStatus(HTTPStatus::BadRequest, "At least one collection must be replicated"); + return {HTTPStatus::BadRequest, "At least one collection must be replicated", nullptr}; for ( size_t i = 0; i < replCollections.size(); i++ ) { for ( size_t j = 0; j < i; j++ ) { if ( replCollections[j].collection == replCollections[i].collection ) - return rq.respondWithStatus(HTTPStatus::BadRequest, "Duplicate collection"); + return {HTTPStatus::BadRequest, "Duplicate collection", nullptr}; } } @@ -284,16 +303,12 @@ namespace litecore::REST { .collections = replCollections.data(), }; bool ok = task->start(localDB, localName, remoteAddress, remoteDbName, c4Params); - - auto& json = rq.jsonEncoder(); if ( ok ) { - json.writeFormatted("{ok: true, session_id: %u}", task->taskID()); - rq.setStatus(HTTPStatus::Accepted, "Replication has started"); + return {HTTPStatus::Accepted, "Replication has started", task}; } else { - task->writeErrorInfo(json); string message = task->message().asString(); - HTTPStatus statusCode = rq.errorToStatus(task->status().error); - rq.setStatus(statusCode, message.c_str()); + HTTPStatus statusCode = RequestResponse::errorToStatus(task->status().error); + return {statusCode, message, task}; } } diff --git a/REST/RESTListener.hh b/REST/RESTListener.hh index 8d7d79698..95ef1177f 100644 --- a/REST/RESTListener.hh +++ b/REST/RESTListener.hh @@ -49,6 +49,12 @@ namespace litecore::REST { /** Given a database name (from a URI path) returns the filesystem path to the database. */ bool pathFromDatabaseName(const std::string& name, FilePath& outPath); + /// Starts a replication, just like a POST to `/_replicate`. + /// On failure, throws a litecore::error with domain WebSocketDomain. + /// @param params The same parameters as the request body to `/_replicate`. + /// @returns The task ID. + unsigned startReplication(Dict params); + /** An asynchronous task (like a replication). */ class Task : public RefCounted @@ -89,7 +95,7 @@ namespace litecore::REST { time_t _timeUpdated{0}; }; - /** The currently-running tasks. */ + /// The currently-running tasks. std::vector> tasks(); protected: @@ -124,6 +130,8 @@ namespace litecore::REST { static std::string kServerName; private: + class ReplicationTask; + static std::pair parseKeySpace(slice keySpace); static bool collectionGiven(RequestResponse&); @@ -147,6 +155,7 @@ namespace litecore::REST { bool modifyDoc(fleece::Dict body, std::string docID, const std::string& revIDQuery, bool deleting, bool newEdits, C4Collection* coll, fleece::JSONEncoder& json, C4Error* outError) noexcept; + std::tuple _handleReplicate(Dict params); std::unique_ptr _directory; const bool _allowCreateDB, _allowDeleteDB, _allowCreateCollection, _allowDeleteCollection, _allowQueries; diff --git a/REST/REST_CAPI.cc b/REST/REST_CAPI.cc index 23b748ad2..66e18e407 100644 --- a/REST/REST_CAPI.cc +++ b/REST/REST_CAPI.cc @@ -75,6 +75,14 @@ CBL_CORE_API bool c4listener_unshareCollection(C4Listener* listener, C4String na return false; } +CBL_CORE_API unsigned c4listener_startReplication(C4Listener* listener, FLDict parameters, C4Error* outError) noexcept { + try { + return listener->startReplication(parameters); + } + catchError(outError); + return 0; +} + CBL_CORE_API uint16_t c4listener_getPort(const C4Listener* listener) noexcept { try { return listener->port(); diff --git a/REST/c4Listener.cc b/REST/c4Listener.cc index 516f78c96..d8aa2e763 100644 --- a/REST/c4Listener.cc +++ b/REST/c4Listener.cc @@ -68,7 +68,8 @@ namespace { if ( config.apis == kC4RESTAPI ) { ss << "allowCreateDBs: " << config.allowCreateDBs << ", allowDeleteDBs: " << config.allowDeleteDBs << ", allowCreateCollections: " << config.allowCreateCollections - << ", allowDeleteCollections: " << config.allowDeleteCollections; + << ", allowDeleteCollections: " << config.allowDeleteCollections + << ", allowQueries: " << config.allowQueries; } else { ss << "allowPush: " << config.allowPush << ", " << "allowPull: " << config.allowPull << ", " @@ -101,7 +102,7 @@ C4Listener::C4Listener(C4ListenerConfig config) } } -C4Listener::C4Listener(C4Listener&&) = default; +C4Listener::C4Listener(C4Listener&&) noexcept = default; C4Listener::~C4Listener() { if ( _impl ) _impl->stop(); @@ -119,9 +120,6 @@ bool C4Listener::shareCollection(slice name, C4Collection* coll) { if ( _usuallyFalse(!coll || !coll->isValid()) ) { C4Error::raise(LiteCoreDomain, kC4ErrorNotOpen, "Invalid collection: either deleted, or db closed"); } - - optional nameStr; - if ( name.buf ) nameStr = name; return _impl->registerCollection((string)name, coll->getSpec()); } @@ -129,6 +127,8 @@ bool C4Listener::unshareCollection(slice name, C4Collection* coll) { return _impl->unregisterCollection((string)name, coll->getSpec()); } +unsigned C4Listener::startReplication(FLDict parameters) { return _impl->startReplication(parameters); } + std::vector C4Listener::URLs(C4Database* C4NULLABLE db, C4ListenerAPIs api) const { AssertParam(api == kC4RESTAPI || api == kC4SyncAPI, "The provided API must be one of the following: REST, Sync."); vector urls;