Skip to content

Commit

Permalink
updating sync-on-create to obey repository preferences from sharePoli…
Browse files Browse the repository at this point in the history
…cy (#114)

* updating sync-on-create to obey repository preferences from sharePolicy
* add test to verify non-sharing on find from readonly repo
* fix test comment
  • Loading branch information
heckj authored Jul 4, 2024
1 parent c3211ed commit 875bdb0
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 66 deletions.
74 changes: 46 additions & 28 deletions Sources/AutomergeRepo/Repo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -292,24 +292,33 @@ public final class Repo {
}

func beginSync(docId: DocumentId, to peer: PEER_ID) async {
do {
let handle = try await resolveDocHandle(id: docId)
let syncState = syncState(id: docId, peer: peer)
if let syncData = handle.doc.generateSyncMessage(state: syncState) {
let syncMsg: SyncV1Msg = .sync(.init(
documentId: docId.description,
senderId: peerId,
targetId: peer,
sync_message: syncData
))
syncRequestPublisher.send(SyncRequest(id: docId, peer: peer))
await network.send(message: syncMsg, to: peer)
if await sharePolicy.share(peer: peer, docId: docId) {
if logLevel(.repo).canTrace() {
Logger.repo.trace("REPO: \(self.peerId) starting a sync for document \(docId) to \(peer)")
}
do {
let handle = try await resolveDocHandle(id: docId)
let syncState = syncState(id: docId, peer: peer)
if let syncData = handle.doc.generateSyncMessage(state: syncState) {
let syncMsg: SyncV1Msg = .sync(.init(
documentId: docId.description,
senderId: peerId,
targetId: peer,
sync_message: syncData
))
syncRequestPublisher.send(SyncRequest(id: docId, peer: peer))
await network.send(message: syncMsg, to: peer)
}
} catch {
Logger.repo
.error(
"REPO: \(self.peerId) Failed to generate sync on peer connection: \(error.localizedDescription, privacy: .public)"
)
}
} else {
if logLevel(.repo).canTrace() {
Logger.repo.warning("REPO: \(self.peerId) SharePolicy DENIED sharing document \(docId) to \(peer)")
}
} catch {
Logger.repo
.error(
"REPO: \(self.peerId) Failed to generate sync on peer connection: \(error.localizedDescription, privacy: .public)"
)
}
}

Expand All @@ -320,16 +329,7 @@ public final class Repo {
Logger.repo.trace("REPO: \(self.peerId) adding peer \(peer)")
}
for docId in documentIds() {
if await sharePolicy.share(peer: peer, docId: docId) {
if logLevel(.repo).canTrace() {
Logger.repo.trace("REPO: \(self.peerId) starting a sync for document \(docId) to \(peer)")
}
await beginSync(docId: docId, to: peer)
} else {
if logLevel(.repo).canTrace() {
Logger.repo.trace("REPO: \(self.peerId) SharePolicy DENIED sharing document \(docId) to \(peer)")
}
}
await beginSync(docId: docId, to: peer)
}
}

Expand Down Expand Up @@ -366,6 +366,11 @@ public final class Repo {
}
}

/// Request a sync to all connected peers.
/// - Parameter id: The id of the document to sync.
///
/// The repo only initiates a sync when the repository's ``SharePolicy/share(peer:docId:)``
/// method allows the document to be replicated to other peers.
func syncToAllPeers(id: DocumentId) {
for peer in self.peerMetadataByPeerId.keys {
Task {
Expand Down Expand Up @@ -511,6 +516,9 @@ public final class Repo {

/// Creates a new Automerge document, storing it and sharing the creation with connected peers.
/// - Returns: The Automerge document.
///
/// The repo only initiates a sync to connected peers when the repository's
/// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer.
public func create() async throws -> DocHandle {
let handle = InternalDocHandle(id: DocumentId(), isNew: true, initialValue: Document())
handles[handle.id] = handle
Expand All @@ -523,6 +531,9 @@ public final class Repo {
/// Creates a new Automerge document, storing it and sharing the creation with connected peers.
/// - Returns: The Automerge document.
/// - Parameter id: The Id of the Automerge document.
///
/// The repo only initiates a sync to connected peers when the repository's
/// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer.
public func create(id: DocumentId) async throws -> DocHandle {
if let _ = handles[id] {
throw Errors.DuplicateID(id: id)
Expand All @@ -535,7 +546,7 @@ public final class Repo {
return resolved
}

/// Imports an Automerge document with an option Id.
/// Imports an Automerge document with an option Id, and potentially share it with connected peers.
/// - Parameter handle: The handle to the Automerge document to import.
/// - Returns: A handle to the Automerge document.
///
Expand All @@ -553,6 +564,9 @@ public final class Repo {
/// ```
///
/// Use the handle you create with `import(handle:)` to load it into the repository.
///
/// The repo only initiates a sync to connected peers when the repository's
/// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer.
@discardableResult
public func `import`(handle: DocHandle) async throws -> DocHandle {
if let existingHandle = handles[handle.id] {
Expand Down Expand Up @@ -608,6 +622,10 @@ public final class Repo {
/// Clones a document the repo already knows to create a new, shared document.
/// - Parameter id: The id of the document to clone.
/// - Returns: A handle to the Automerge document.
///
/// The Repo treats the cloned document as a newly imported or created document and attempts to
/// automatically sync to connected peers when the repository's
/// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer.
public func clone(id: DocumentId) async throws -> DocHandle {
let handle = try await resolveDocHandle(id: id)
let fork = handle.doc.fork()
Expand Down
112 changes: 74 additions & 38 deletions Tests/AutomergeRepoTests/TwoReposWithInMemoryNetworkTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import XCTest

final class TwoReposWithInMemoryNetworkTests: XCTestCase {
let network = InMemoryNetwork.shared
var repoOne: Repo!
var repoTwo: Repo!
var repo_nonsharing: Repo!
var repo_sharing: Repo!

var adapterOne: InMemoryNetworkEndpoint!
var adapterTwo: InMemoryNetworkEndpoint!
Expand All @@ -27,9 +27,9 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase {
let endpoints = await self.network.endpoints
XCTAssertEqual(endpoints.count, 0)

repoOne = Repo(sharePolicy: SharePolicy.readonly)
repo_nonsharing = Repo(sharePolicy: SharePolicy.readonly)
// Repo setup WITHOUT any storage subsystem
let storageId = await repoOne.storageId()
let storageId = await repo_nonsharing.storageId()
XCTAssertNil(storageId)

adapterOne = await network.createNetworkEndpoint(
Expand All @@ -38,21 +38,21 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase {
name: "One"
)
)
await repoOne.addNetworkAdapter(adapter: adapterOne)
await repo_nonsharing.addNetworkAdapter(adapter: adapterOne)

let peersOne = await repoOne.peers()
let peersOne = await repo_nonsharing.peers()
XCTAssertEqual(peersOne, [])

repoTwo = Repo(sharePolicy: SharePolicy.agreeable)
repo_sharing = Repo(sharePolicy: SharePolicy.agreeable)
adapterTwo = await network.createNetworkEndpoint(
config: .init(
listeningNetwork: true,
name: "Two"
)
)
await repoTwo.addNetworkAdapter(adapter: adapterTwo)
await repo_sharing.addNetworkAdapter(adapter: adapterTwo)

let peersTwo = await repoTwo.peers()
let peersTwo = await repo_sharing.peers()
XCTAssertEqual(peersTwo, [])

let connections = await network.connections()
Expand All @@ -76,15 +76,15 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase {
func testMostBasicRepoStartingPoints() async throws {
// Repo
// property: peers [PeerId] - all (currently) connected peers
let peersOne = await repoOne.peers()
let peersTwo = await repoTwo.peers()
let peersOne = await repo_nonsharing.peers()
let peersTwo = await repo_sharing.peers()
XCTAssertEqual(peersOne, [])
XCTAssertEqual(peersOne, peersTwo)

let knownIdsOne = await repoOne.documentIds()
let knownIdsOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownIdsOne, [])

let knownIdsTwo = await repoOne.documentIds()
let knownIdsTwo = await repo_nonsharing.documentIds()
XCTAssertEqual(knownIdsTwo, knownIdsOne)
}

Expand Down Expand Up @@ -131,31 +131,31 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase {
try await withSpan("testCreate") { _ in

// initial conditions
var knownOnTwo = await repoTwo.documentIds()
var knownOnOne = await repoOne.documentIds()
var knownOnTwo = await repo_sharing.documentIds()
var knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 0)
XCTAssertEqual(knownOnTwo.count, 0)

// Create and add some doc content to the "server" repo - RepoTwo
let newDocId = DocumentId()
let newDoc = try await withSpan("repoTwo.create") { _ in
try await repoTwo.create(id: newDocId)
try await repo_sharing.create(id: newDocId)
}
// add some content to the new document
try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE"))

XCTAssertNotNil(newDoc)
knownOnTwo = await repoTwo.documentIds()
knownOnTwo = await repo_sharing.documentIds()
XCTAssertEqual(knownOnTwo.count, 1)
XCTAssertEqual(knownOnTwo[0], newDocId)

knownOnOne = await repoOne.documentIds()
knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 0)

let twoSyncExpectation = expectation(description: "Repo Two should attempt to sync when repo one connects")
var expectationMet = false
let two_sink = repoTwo.syncRequestPublisher.sink { syncRequest in
if syncRequest.id == newDocId, syncRequest.peer == self.repoOne.peerId {
let two_sink = repo_sharing.syncRequestPublisher.sink { syncRequest in
if syncRequest.id == newDocId, syncRequest.peer == self.repo_nonsharing.peerId {
if !expectationMet {
expectationMet = true
twoSyncExpectation.fulfill()
Expand All @@ -175,39 +175,39 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase {
two_sink.cancel()

// verify that after sync, both repos have a copy of the document
knownOnOne = await repoOne.documentIds()
knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 1)
XCTAssertEqual(knownOnOne[0], newDocId)
}
}

func testFind() async throws {
func testFind_sharing() async throws {
// initial conditions
var knownOnTwo = await repoTwo.documentIds()
var knownOnOne = await repoOne.documentIds()
let knownOnTwo = await repo_sharing.documentIds()
let knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 0)
XCTAssertEqual(knownOnTwo.count, 0)

// "GO ONLINE"
// await network.traceConnections(true)
// await adapterTwo.logReceivedMessages(true)
try await withSpan("adapterOne.connect") { _ in
try await withSpan("repo_nonsharing.connect") { _ in
try await adapterOne.connect(to: "Two")
}

// Create and add some doc content to the "server" repo - RepoTwo
let newDocId = DocumentId()
let newDoc = try await withSpan("repoTwo.create") { _ in
try await repoTwo.create(id: newDocId)
let newDoc = try await withSpan("repo_sharing.create") { _ in
try await repo_sharing.create(id: newDocId)
}
XCTAssertNotNil(newDoc.doc)
// add some content to the new document
try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE"))

await repoOne.setLogLevel(.resolver, to: .tracing)
await repoOne.setLogLevel(.network, to: .tracing)
await repo_nonsharing.setLogLevel(.resolver, to: .tracing)
await repo_nonsharing.setLogLevel(.network, to: .tracing)
// We can _request_ the document, and should find it - but it won't YET be updated...
let foundDoc = try await repoOne.find(id: newDocId)
let foundDoc = try await repo_nonsharing.find(id: newDocId)

// set up expectation to await for trigger from the objectWillChange publisher on the "found" doc
let documentsEquivalent = expectation(
Expand All @@ -225,26 +225,62 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase {
await fulfillment(of: [documentsEquivalent], timeout: 10, enforceOrder: false)
}

func testFind_nonsharing() async throws {
// initial conditions
let knownOnTwo = await repo_sharing.documentIds()
let knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 0)
XCTAssertEqual(knownOnTwo.count, 0)

// "GO ONLINE"
// await network.traceConnections(true)
// await adapterTwo.logReceivedMessages(true)
try await withSpan("repo_nonsharing.connect") { _ in
try await adapterOne.connect(to: "Two")
}

// Create and add some doc content to the "server" repo - RepoTwo
let newDocId = DocumentId()
let newDoc = try await withSpan("repo_nonsharing.create") { _ in
try await repo_nonsharing.create(id: newDocId)
}
XCTAssertNotNil(newDoc.doc)
// add some content to the new document
try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE"))

await repo_nonsharing.setLogLevel(.resolver, to: .tracing)
await repo_nonsharing.setLogLevel(.network, to: .tracing)
// We can _request_ the document, but a non-sharing repo won't provide it
do {
let _ = try await repo_sharing.find(id: newDocId)
XCTFail("Expected unavailable response")
} catch let error as Errors.Unavailable {
XCTAssertEqual(error.id, newDocId)
} catch {
XCTFail("Unexpected error")
}
}

func testFindFail() async throws {
// initial conditions
var knownOnTwo = await repoTwo.documentIds()
var knownOnOne = await repoOne.documentIds()
var knownOnTwo = await repo_sharing.documentIds()
var knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 0)
XCTAssertEqual(knownOnTwo.count, 0)

// Create and add some doc content to the "client" repo - RepoOne
let newDocId = DocumentId()
let newDoc = try await withSpan("repoTwo.create") { _ in
try await repoOne.create(id: newDocId)
try await repo_nonsharing.create(id: newDocId)
}
XCTAssertNotNil(newDoc.doc)
// add some content to the new document
try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE"))

knownOnTwo = await repoTwo.documentIds()
knownOnTwo = await repo_sharing.documentIds()
XCTAssertEqual(knownOnTwo.count, 0)

knownOnOne = await repoOne.documentIds()
knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 1)
XCTAssertEqual(knownOnOne[0], newDocId)
// "GO ONLINE"
Expand All @@ -257,15 +293,15 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase {
// Two doesn't automatically get the document because RepoOne
// isn't configured to "share" automatically on connect
// (it's not "agreeable")
knownOnTwo = await repoTwo.documentIds()
knownOnTwo = await repo_sharing.documentIds()
XCTAssertEqual(knownOnTwo.count, 0)

knownOnOne = await repoOne.documentIds()
knownOnOne = await repo_nonsharing.documentIds()
XCTAssertEqual(knownOnOne.count, 1)

// We can _request_ the document, but should be denied
do {
let _ = try await repoTwo.find(id: newDocId)
let _ = try await repo_sharing.find(id: newDocId)
XCTFail("RepoOne is private and should NOT share the document")
} catch {
let errMsg = error.localizedDescription
Expand Down

0 comments on commit 875bdb0

Please sign in to comment.