Skip to content

Commit

Permalink
READ: Added API to tell C4Listener to start a replication
Browse files Browse the repository at this point in the history
CBL will probably never use this, but liteserv needs it.
  • Loading branch information
snej committed Sep 5, 2024
1 parent c1178ab commit 5b5811d
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 31 deletions.
9 changes: 7 additions & 2 deletions C/Cpp_include/c4Listener.hh
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#pragma once
#include "c4Base.hh"
#include "c4ListenerTypes.h"
#include "fleece/FLBase.h"
#include "fleece/InstanceCounted.hh"
#include <vector>

Expand Down Expand Up @@ -42,6 +43,11 @@ struct C4Listener final

bool unshareCollection(slice name, C4Collection* coll);

/// Starts a replication, just like a POST to `/_replicate`.
/// @param parameters The same parameters as the request body to `/_replicate`.
/// @returns The task ID.
unsigned startReplication(FLDict parameters);

[[nodiscard]] uint16_t port() const;

[[nodiscard]] std::pair<unsigned, unsigned> connectionStatus() const;
Expand All @@ -53,8 +59,7 @@ struct C4Listener final
C4Listener(const C4Listener&) = delete;

private:
// For some reason, MSVC on Jenkins will not compile this with noexcept (everything else will)
C4Listener(C4Listener&&); // NOLINT(performance-noexcept-move-constructor)
C4Listener(C4Listener&&) noexcept;

Retained<litecore::REST::RESTListener> _impl;
C4ListenerHTTPAuthCallback C4NULLABLE _httpAuthCallback;
Expand Down
8 changes: 8 additions & 0 deletions C/include/c4Listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ NODISCARD CBL_CORE_API bool c4listener_shareCollection(C4Listener* listener, C4S
NODISCARD CBL_CORE_API bool c4listener_unshareCollection(C4Listener* listener, C4String name, C4Collection* collection,
C4Error* C4NULLABLE outError) C4API;

/** Starts a replication task, just like a POST to `/_replicate`.
@param listener The running C4Listener instance.
@param parameters The same parameters as the request body to `/_replicate`.
@param outError On failure, the error info is stored here if non-NULL.
@returns The "session_id" identifying the replication task, or 0 on failure. */
NODISCARD CBL_CORE_API unsigned c4listener_startReplication(C4Listener* listener, FLDict parameters,
C4Error* C4NULLABLE outError) C4API;

/** Returns the URL(s) of a database being shared, or of the root, separated by "\n" bytes.
The URLs will differ only in their hostname -- there will be one for each IP address or known
hostname of the computer, or of the network interface.
Expand Down
61 changes: 38 additions & 23 deletions REST/RESTListener+Replicate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include "fleece/RefCounted.hh"
#include "ReplicatorOptions.hh"
#include "StringUtil.hh"
#include "fleece/Expert.hh" // for AllocedDict
#include "fleece/Mutable.hh"
#include <condition_variable>
#include <functional>
#include <mutex>
Expand All @@ -29,7 +29,7 @@ using namespace fleece;
namespace litecore::REST {
using namespace net;

class ReplicationTask : public RESTListener::Task {
class RESTListener::ReplicationTask : public RESTListener::Task {
public:
ReplicationTask(RESTListener* listener, slice source, slice target)
: Task(listener), _source(source), _target(target) {}
Expand All @@ -56,6 +56,7 @@ namespace litecore::REST {
SPLAT(remoteAddress.hostname), remoteAddress.port, SPLAT(remoteDbName), _bidi, _continuous);

try {
C4Database::WithClientMutex with(localDB);
_repl = localDB->newReplicator(remoteAddress, remoteDbName, params);
_repl->start();
if ( _repl ) { // it is possible that the replicator already stopped and I cleared the ref
Expand Down Expand Up @@ -168,27 +169,45 @@ namespace litecore::REST {
#pragma mark - HTTP HANDLER:

void RESTListener::handleReplicate(litecore::REST::RequestResponse& rq) {
// Parse the JSON body:
auto params = rq.bodyAsJSON().asDict();
if ( !params )
if ( !params ) {
return rq.respondWithStatus(HTTPStatus::BadRequest,
"Invalid JSON in request body (or body is not an object)");

if ( Value cancelVal = params["cancel"] ) {
} else if ( Value cancelVal = params["cancel"] ) {
// Hang on, stop the presses -- we're canceling, not starting
cancelReplication(rq, cancelVal);
return;
} else {
auto [status, message, task] = _handleReplicate(params);
if ( task ) {
if ( task->status().error == kC4NoError )
rq.jsonEncoder().writeFormatted("{ok: true, session_id: %u}", task->taskID());
else
task->writeErrorInfo(rq.jsonEncoder());
rq.setStatus(status, message.c_str());
} else {
rq.respondWithStatus(status, message.c_str());
}
}
}

unsigned RESTListener::startReplication(Dict params) {
auto [status, message, task] = _handleReplicate(params);
if ( int(status) < 300 ) {
return task->taskID();
} else {
error(error::WebSocket, int(status), message)._throw();
}
}

tuple<HTTPStatus, string, RESTListener::ReplicationTask*> RESTListener::_handleReplicate(Dict params) {
bool bidi = params["bidi"].asBool();
bool continuous = params["continuous"].asBool();
C4ReplicatorMode activeMode = continuous ? kC4Continuous : kC4OneShot;

// Get the local DB and remote URL:
slice source = params["source"].asString();
slice target = params["target"].asString();
if ( !source || !target )
return rq.respondWithStatus(HTTPStatus::BadRequest, "Missing source or target parameters");
if ( !source || !target ) return {HTTPStatus::BadRequest, "Missing source or target parameters", nullptr};
slice localName;
slice remoteURL;
C4ReplicatorMode pushMode, pullMode;
Expand All @@ -202,14 +221,14 @@ namespace litecore::REST {
pullMode = activeMode;
remoteURL = source;
} else {
return rq.respondWithStatus(HTTPStatus::BadRequest, "Neither source nor target is a local database name");
return {HTTPStatus::BadRequest, "Neither source nor target is a local database name", nullptr};
}
Retained<C4Database> localDB = databaseNamed(localName.asString());
if ( !localDB ) return rq.respondWithStatus(HTTPStatus::NotFound);
if ( !localDB ) return {HTTPStatus::NotFound, "", nullptr};
C4Address remoteAddress;
slice remoteDbName;
if ( !C4Address::fromURL(remoteURL, &remoteAddress, &remoteDbName) )
return rq.respondWithStatus(HTTPStatus::BadRequest, "Invalid database URL");
return {HTTPStatus::BadRequest, "Invalid database URL", nullptr};

// Subroutine that adds a C4ReplicationCollection:
vector<alloc_slice> collOptions; // backing store for each optionsDictFleece
Expand Down Expand Up @@ -237,7 +256,7 @@ namespace litecore::REST {
// `collections` is an array of collection names:
for ( Array::iterator iter(collectionNames); iter; iter.next() ) {
slice collPath = iter.value().asString();
if ( !collPath ) return rq.respondWithStatus(HTTPStatus::BadRequest, "Invalid collections item");
if ( !collPath ) return {HTTPStatus::BadRequest, "Invalid collections item", nullptr};
addCollection(collPath, params);
}
} else if ( Dict collections = collectionsVal.asDict() ) {
Expand All @@ -246,18 +265,18 @@ namespace litecore::REST {
addCollection(iter.keyString(), iter.value().asDict());
}
} else {
return rq.respondWithStatus(HTTPStatus::BadRequest, "'collections' must be an array or object");
return {HTTPStatus::BadRequest, "'collections' must be an array or object", nullptr};
}
} else {
// 'collections' is missing; just use the default collection:
addCollection(kC4DefaultCollectionName, params);
}
if ( replCollections.empty() )
return rq.respondWithStatus(HTTPStatus::BadRequest, "At least one collection must be replicated");
return {HTTPStatus::BadRequest, "At least one collection must be replicated", nullptr};
for ( size_t i = 0; i < replCollections.size(); i++ ) {
for ( size_t j = 0; j < i; j++ ) {
if ( replCollections[j].collection == replCollections[i].collection )
return rq.respondWithStatus(HTTPStatus::BadRequest, "Duplicate collection");
return {HTTPStatus::BadRequest, "Duplicate collection", nullptr};
}
}

Expand All @@ -284,16 +303,12 @@ namespace litecore::REST {
.collections = replCollections.data(),
};
bool ok = task->start(localDB, localName, remoteAddress, remoteDbName, c4Params);

auto& json = rq.jsonEncoder();
if ( ok ) {
json.writeFormatted("{ok: true, session_id: %u}", task->taskID());
rq.setStatus(HTTPStatus::Accepted, "Replication has started");
return {HTTPStatus::Accepted, "Replication has started", task};
} else {
task->writeErrorInfo(json);
string message = task->message().asString();
HTTPStatus statusCode = rq.errorToStatus(task->status().error);
rq.setStatus(statusCode, message.c_str());
HTTPStatus statusCode = RequestResponse::errorToStatus(task->status().error);
return {statusCode, message, task};
}
}

Expand Down
11 changes: 10 additions & 1 deletion REST/RESTListener.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ namespace litecore::REST {
/** Given a database name (from a URI path) returns the filesystem path to the database. */
bool pathFromDatabaseName(const std::string& name, FilePath& outPath);

/// Starts a replication, just like a POST to `/_replicate`.
/// On failure, throws a litecore::error with domain WebSocketDomain.
/// @param params The same parameters as the request body to `/_replicate`.
/// @returns The task ID.
unsigned startReplication(Dict params);

/** An asynchronous task (like a replication). */
class Task
: public RefCounted
Expand Down Expand Up @@ -89,7 +95,7 @@ namespace litecore::REST {
time_t _timeUpdated{0};
};

/** The currently-running tasks. */
/// The currently-running tasks.
std::vector<Retained<Task>> tasks();

protected:
Expand Down Expand Up @@ -124,6 +130,8 @@ namespace litecore::REST {
static std::string kServerName;

private:
class ReplicationTask;

static std::pair<std::string, C4CollectionSpec> parseKeySpace(slice keySpace);
static bool collectionGiven(RequestResponse&);

Expand All @@ -147,6 +155,7 @@ namespace litecore::REST {

bool modifyDoc(fleece::Dict body, std::string docID, const std::string& revIDQuery, bool deleting,
bool newEdits, C4Collection* coll, fleece::JSONEncoder& json, C4Error* outError) noexcept;
std::tuple<net::HTTPStatus, std::string, ReplicationTask*> _handleReplicate(Dict params);

std::unique_ptr<FilePath> _directory;
const bool _allowCreateDB, _allowDeleteDB, _allowCreateCollection, _allowDeleteCollection, _allowQueries;
Expand Down
8 changes: 8 additions & 0 deletions REST/REST_CAPI.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ CBL_CORE_API bool c4listener_unshareCollection(C4Listener* listener, C4String na
return false;
}

CBL_CORE_API unsigned c4listener_startReplication(C4Listener* listener, FLDict parameters, C4Error* outError) noexcept {
try {
return listener->startReplication(parameters);
}
catchError(outError);
return 0;
}

CBL_CORE_API uint16_t c4listener_getPort(const C4Listener* listener) noexcept {
try {
return listener->port();
Expand Down
10 changes: 5 additions & 5 deletions REST/c4Listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ namespace {
if ( config.apis == kC4RESTAPI ) {
ss << "allowCreateDBs: " << config.allowCreateDBs << ", allowDeleteDBs: " << config.allowDeleteDBs
<< ", allowCreateCollections: " << config.allowCreateCollections
<< ", allowDeleteCollections: " << config.allowDeleteCollections;
<< ", allowDeleteCollections: " << config.allowDeleteCollections
<< ", allowQueries: " << config.allowQueries;
} else {
ss << "allowPush: " << config.allowPush << ", "
<< "allowPull: " << config.allowPull << ", "
Expand Down Expand Up @@ -101,7 +102,7 @@ C4Listener::C4Listener(C4ListenerConfig config)
}
}

C4Listener::C4Listener(C4Listener&&) = default;
C4Listener::C4Listener(C4Listener&&) noexcept = default;

C4Listener::~C4Listener() {
if ( _impl ) _impl->stop();
Expand All @@ -119,16 +120,15 @@ bool C4Listener::shareCollection(slice name, C4Collection* coll) {
if ( _usuallyFalse(!coll || !coll->isValid()) ) {
C4Error::raise(LiteCoreDomain, kC4ErrorNotOpen, "Invalid collection: either deleted, or db closed");
}

optional<string> nameStr;
if ( name.buf ) nameStr = name;
return _impl->registerCollection((string)name, coll->getSpec());
}

bool C4Listener::unshareCollection(slice name, C4Collection* coll) {
return _impl->unregisterCollection((string)name, coll->getSpec());
}

unsigned C4Listener::startReplication(FLDict parameters) { return _impl->startReplication(parameters); }

std::vector<std::string> C4Listener::URLs(C4Database* C4NULLABLE db, C4ListenerAPIs api) const {
AssertParam(api == kC4RESTAPI || api == kC4SyncAPI, "The provided API must be one of the following: REST, Sync.");
vector<string> urls;
Expand Down

0 comments on commit 5b5811d

Please sign in to comment.