diff --git a/Sources/AutomergeRepo/Repo.swift b/Sources/AutomergeRepo/Repo.swift index b308fe2..8468588 100644 --- a/Sources/AutomergeRepo/Repo.swift +++ b/Sources/AutomergeRepo/Repo.swift @@ -292,24 +292,33 @@ public final class Repo { } func beginSync(docId: DocumentId, to peer: PEER_ID) async { - do { - let handle = try await resolveDocHandle(id: docId) - let syncState = syncState(id: docId, peer: peer) - if let syncData = handle.doc.generateSyncMessage(state: syncState) { - let syncMsg: SyncV1Msg = .sync(.init( - documentId: docId.description, - senderId: peerId, - targetId: peer, - sync_message: syncData - )) - syncRequestPublisher.send(SyncRequest(id: docId, peer: peer)) - await network.send(message: syncMsg, to: peer) + if await sharePolicy.share(peer: peer, docId: docId) { + if logLevel(.repo).canTrace() { + Logger.repo.trace("REPO: \(self.peerId) starting a sync for document \(docId) to \(peer)") + } + do { + let handle = try await resolveDocHandle(id: docId) + let syncState = syncState(id: docId, peer: peer) + if let syncData = handle.doc.generateSyncMessage(state: syncState) { + let syncMsg: SyncV1Msg = .sync(.init( + documentId: docId.description, + senderId: peerId, + targetId: peer, + sync_message: syncData + )) + syncRequestPublisher.send(SyncRequest(id: docId, peer: peer)) + await network.send(message: syncMsg, to: peer) + } + } catch { + Logger.repo + .error( + "REPO: \(self.peerId) Failed to generate sync on peer connection: \(error.localizedDescription, privacy: .public)" + ) + } + } else { + if logLevel(.repo).canTrace() { + Logger.repo.warning("REPO: \(self.peerId) SharePolicy DENIED sharing document \(docId) to \(peer)") } - } catch { - Logger.repo - .error( - "REPO: \(self.peerId) Failed to generate sync on peer connection: \(error.localizedDescription, privacy: .public)" - ) } } @@ -320,16 +329,7 @@ public final class Repo { Logger.repo.trace("REPO: \(self.peerId) adding peer \(peer)") } for docId in documentIds() { - if await sharePolicy.share(peer: peer, docId: docId) { - if logLevel(.repo).canTrace() { - Logger.repo.trace("REPO: \(self.peerId) starting a sync for document \(docId) to \(peer)") - } - await beginSync(docId: docId, to: peer) - } else { - if logLevel(.repo).canTrace() { - Logger.repo.trace("REPO: \(self.peerId) SharePolicy DENIED sharing document \(docId) to \(peer)") - } - } + await beginSync(docId: docId, to: peer) } } @@ -366,6 +366,11 @@ public final class Repo { } } + /// Request a sync to all connected peers. + /// - Parameter id: The id of the document to sync. + /// + /// The repo only initiates a sync when the repository's ``SharePolicy/share(peer:docId:)`` + /// method allows the document to be replicated to other peers. func syncToAllPeers(id: DocumentId) { for peer in self.peerMetadataByPeerId.keys { Task { @@ -511,6 +516,9 @@ public final class Repo { /// Creates a new Automerge document, storing it and sharing the creation with connected peers. /// - Returns: The Automerge document. + /// + /// The repo only initiates a sync to connected peers when the repository's + /// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer. public func create() async throws -> DocHandle { let handle = InternalDocHandle(id: DocumentId(), isNew: true, initialValue: Document()) handles[handle.id] = handle @@ -523,6 +531,9 @@ public final class Repo { /// Creates a new Automerge document, storing it and sharing the creation with connected peers. /// - Returns: The Automerge document. /// - Parameter id: The Id of the Automerge document. + /// + /// The repo only initiates a sync to connected peers when the repository's + /// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer. public func create(id: DocumentId) async throws -> DocHandle { if let _ = handles[id] { throw Errors.DuplicateID(id: id) @@ -535,7 +546,7 @@ public final class Repo { return resolved } - /// Imports an Automerge document with an option Id. + /// Imports an Automerge document with an option Id, and potentially share it with connected peers. /// - Parameter handle: The handle to the Automerge document to import. /// - Returns: A handle to the Automerge document. /// @@ -553,6 +564,9 @@ public final class Repo { /// ``` /// /// Use the handle you create with `import(handle:)` to load it into the repository. + /// + /// The repo only initiates a sync to connected peers when the repository's + /// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer. @discardableResult public func `import`(handle: DocHandle) async throws -> DocHandle { if let existingHandle = handles[handle.id] { @@ -608,6 +622,10 @@ public final class Repo { /// Clones a document the repo already knows to create a new, shared document. /// - Parameter id: The id of the document to clone. /// - Returns: A handle to the Automerge document. + /// + /// The Repo treats the cloned document as a newly imported or created document and attempts to + /// automatically sync to connected peers when the repository's + /// ``SharePolicy/share(peer:docId:)`` method allows the document to be replicated the peer. public func clone(id: DocumentId) async throws -> DocHandle { let handle = try await resolveDocHandle(id: id) let fork = handle.doc.fork() diff --git a/Tests/AutomergeRepoTests/TwoReposWithInMemoryNetworkTests.swift b/Tests/AutomergeRepoTests/TwoReposWithInMemoryNetworkTests.swift index 579b38e..53863f1 100644 --- a/Tests/AutomergeRepoTests/TwoReposWithInMemoryNetworkTests.swift +++ b/Tests/AutomergeRepoTests/TwoReposWithInMemoryNetworkTests.swift @@ -8,8 +8,8 @@ import XCTest final class TwoReposWithInMemoryNetworkTests: XCTestCase { let network = InMemoryNetwork.shared - var repoOne: Repo! - var repoTwo: Repo! + var repo_nonsharing: Repo! + var repo_sharing: Repo! var adapterOne: InMemoryNetworkEndpoint! var adapterTwo: InMemoryNetworkEndpoint! @@ -27,9 +27,9 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase { let endpoints = await self.network.endpoints XCTAssertEqual(endpoints.count, 0) - repoOne = Repo(sharePolicy: SharePolicy.readonly) + repo_nonsharing = Repo(sharePolicy: SharePolicy.readonly) // Repo setup WITHOUT any storage subsystem - let storageId = await repoOne.storageId() + let storageId = await repo_nonsharing.storageId() XCTAssertNil(storageId) adapterOne = await network.createNetworkEndpoint( @@ -38,21 +38,21 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase { name: "One" ) ) - await repoOne.addNetworkAdapter(adapter: adapterOne) + await repo_nonsharing.addNetworkAdapter(adapter: adapterOne) - let peersOne = await repoOne.peers() + let peersOne = await repo_nonsharing.peers() XCTAssertEqual(peersOne, []) - repoTwo = Repo(sharePolicy: SharePolicy.agreeable) + repo_sharing = Repo(sharePolicy: SharePolicy.agreeable) adapterTwo = await network.createNetworkEndpoint( config: .init( listeningNetwork: true, name: "Two" ) ) - await repoTwo.addNetworkAdapter(adapter: adapterTwo) + await repo_sharing.addNetworkAdapter(adapter: adapterTwo) - let peersTwo = await repoTwo.peers() + let peersTwo = await repo_sharing.peers() XCTAssertEqual(peersTwo, []) let connections = await network.connections() @@ -76,15 +76,15 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase { func testMostBasicRepoStartingPoints() async throws { // Repo // property: peers [PeerId] - all (currently) connected peers - let peersOne = await repoOne.peers() - let peersTwo = await repoTwo.peers() + let peersOne = await repo_nonsharing.peers() + let peersTwo = await repo_sharing.peers() XCTAssertEqual(peersOne, []) XCTAssertEqual(peersOne, peersTwo) - let knownIdsOne = await repoOne.documentIds() + let knownIdsOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownIdsOne, []) - let knownIdsTwo = await repoOne.documentIds() + let knownIdsTwo = await repo_nonsharing.documentIds() XCTAssertEqual(knownIdsTwo, knownIdsOne) } @@ -131,31 +131,31 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase { try await withSpan("testCreate") { _ in // initial conditions - var knownOnTwo = await repoTwo.documentIds() - var knownOnOne = await repoOne.documentIds() + var knownOnTwo = await repo_sharing.documentIds() + var knownOnOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownOnOne.count, 0) XCTAssertEqual(knownOnTwo.count, 0) // Create and add some doc content to the "server" repo - RepoTwo let newDocId = DocumentId() let newDoc = try await withSpan("repoTwo.create") { _ in - try await repoTwo.create(id: newDocId) + try await repo_sharing.create(id: newDocId) } // add some content to the new document try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE")) XCTAssertNotNil(newDoc) - knownOnTwo = await repoTwo.documentIds() + knownOnTwo = await repo_sharing.documentIds() XCTAssertEqual(knownOnTwo.count, 1) XCTAssertEqual(knownOnTwo[0], newDocId) - knownOnOne = await repoOne.documentIds() + knownOnOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownOnOne.count, 0) let twoSyncExpectation = expectation(description: "Repo Two should attempt to sync when repo one connects") var expectationMet = false - let two_sink = repoTwo.syncRequestPublisher.sink { syncRequest in - if syncRequest.id == newDocId, syncRequest.peer == self.repoOne.peerId { + let two_sink = repo_sharing.syncRequestPublisher.sink { syncRequest in + if syncRequest.id == newDocId, syncRequest.peer == self.repo_nonsharing.peerId { if !expectationMet { expectationMet = true twoSyncExpectation.fulfill() @@ -175,39 +175,39 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase { two_sink.cancel() // verify that after sync, both repos have a copy of the document - knownOnOne = await repoOne.documentIds() + knownOnOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownOnOne.count, 1) XCTAssertEqual(knownOnOne[0], newDocId) } } - func testFind() async throws { + func testFind_sharing() async throws { // initial conditions - var knownOnTwo = await repoTwo.documentIds() - var knownOnOne = await repoOne.documentIds() + let knownOnTwo = await repo_sharing.documentIds() + let knownOnOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownOnOne.count, 0) XCTAssertEqual(knownOnTwo.count, 0) // "GO ONLINE" // await network.traceConnections(true) // await adapterTwo.logReceivedMessages(true) - try await withSpan("adapterOne.connect") { _ in + try await withSpan("repo_nonsharing.connect") { _ in try await adapterOne.connect(to: "Two") } // Create and add some doc content to the "server" repo - RepoTwo let newDocId = DocumentId() - let newDoc = try await withSpan("repoTwo.create") { _ in - try await repoTwo.create(id: newDocId) + let newDoc = try await withSpan("repo_sharing.create") { _ in + try await repo_sharing.create(id: newDocId) } XCTAssertNotNil(newDoc.doc) // add some content to the new document try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE")) - await repoOne.setLogLevel(.resolver, to: .tracing) - await repoOne.setLogLevel(.network, to: .tracing) + await repo_nonsharing.setLogLevel(.resolver, to: .tracing) + await repo_nonsharing.setLogLevel(.network, to: .tracing) // We can _request_ the document, and should find it - but it won't YET be updated... - let foundDoc = try await repoOne.find(id: newDocId) + let foundDoc = try await repo_nonsharing.find(id: newDocId) // set up expectation to await for trigger from the objectWillChange publisher on the "found" doc let documentsEquivalent = expectation( @@ -225,26 +225,62 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase { await fulfillment(of: [documentsEquivalent], timeout: 10, enforceOrder: false) } + func testFind_nonsharing() async throws { + // initial conditions + let knownOnTwo = await repo_sharing.documentIds() + let knownOnOne = await repo_nonsharing.documentIds() + XCTAssertEqual(knownOnOne.count, 0) + XCTAssertEqual(knownOnTwo.count, 0) + + // "GO ONLINE" + // await network.traceConnections(true) + // await adapterTwo.logReceivedMessages(true) + try await withSpan("repo_nonsharing.connect") { _ in + try await adapterOne.connect(to: "Two") + } + + // Create and add some doc content to the "server" repo - RepoTwo + let newDocId = DocumentId() + let newDoc = try await withSpan("repo_nonsharing.create") { _ in + try await repo_nonsharing.create(id: newDocId) + } + XCTAssertNotNil(newDoc.doc) + // add some content to the new document + try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE")) + + await repo_nonsharing.setLogLevel(.resolver, to: .tracing) + await repo_nonsharing.setLogLevel(.network, to: .tracing) + // We can _request_ the document, but a non-sharing repo won't provide it + do { + let _ = try await repo_sharing.find(id: newDocId) + XCTFail("Expected unavailable response") + } catch let error as Errors.Unavailable { + XCTAssertEqual(error.id, newDocId) + } catch { + XCTFail("Unexpected error") + } + } + func testFindFail() async throws { // initial conditions - var knownOnTwo = await repoTwo.documentIds() - var knownOnOne = await repoOne.documentIds() + var knownOnTwo = await repo_sharing.documentIds() + var knownOnOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownOnOne.count, 0) XCTAssertEqual(knownOnTwo.count, 0) // Create and add some doc content to the "client" repo - RepoOne let newDocId = DocumentId() let newDoc = try await withSpan("repoTwo.create") { _ in - try await repoOne.create(id: newDocId) + try await repo_nonsharing.create(id: newDocId) } XCTAssertNotNil(newDoc.doc) // add some content to the new document try newDoc.doc.put(obj: .ROOT, key: "title", value: .String("INITIAL VALUE")) - knownOnTwo = await repoTwo.documentIds() + knownOnTwo = await repo_sharing.documentIds() XCTAssertEqual(knownOnTwo.count, 0) - knownOnOne = await repoOne.documentIds() + knownOnOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownOnOne.count, 1) XCTAssertEqual(knownOnOne[0], newDocId) // "GO ONLINE" @@ -257,15 +293,15 @@ final class TwoReposWithInMemoryNetworkTests: XCTestCase { // Two doesn't automatically get the document because RepoOne // isn't configured to "share" automatically on connect // (it's not "agreeable") - knownOnTwo = await repoTwo.documentIds() + knownOnTwo = await repo_sharing.documentIds() XCTAssertEqual(knownOnTwo.count, 0) - knownOnOne = await repoOne.documentIds() + knownOnOne = await repo_nonsharing.documentIds() XCTAssertEqual(knownOnOne.count, 1) // We can _request_ the document, but should be denied do { - let _ = try await repoTwo.find(id: newDocId) + let _ = try await repo_sharing.find(id: newDocId) XCTFail("RepoOne is private and should NOT share the document") } catch { let errMsg = error.localizedDescription