diff --git a/modules/core/src/main/scala/sangria/schema/Context.scala b/modules/core/src/main/scala/sangria/schema/Context.scala index 878429d4..e456a0b0 100644 --- a/modules/core/src/main/scala/sangria/schema/Context.scala +++ b/modules/core/src/main/scala/sangria/schema/Context.scala @@ -6,7 +6,7 @@ import sangria.marshalling._ import sangria.ast.SourceMapper import sangria.{ast, introspection} import sangria.execution.deferred.Deferred -import sangria.streaming.SubscriptionStream +import sangria.streaming.{SubscriptionStream, SubscriptionStreamLike} import sangria.util.Cache import scala.concurrent.{ExecutionContext, Future} @@ -163,7 +163,7 @@ object UpdateCtx { new UpdateCtx(action, newCtx) } -private[sangria] case class SubscriptionValue[Ctx, Val, S[_]]( +case class SubscriptionValue[Ctx, Val, S[_]]( source: Val, stream: SubscriptionStream[S]) extends LeafAction[Ctx, Val] { @@ -172,6 +172,13 @@ private[sangria] case class SubscriptionValue[Ctx, Val, S[_]]( throw new IllegalStateException( "`map` is not supported subscription actions. Action is only intended for internal use.") } +object SubscriptionValue { + def apply[Ctx, Val, StreamSource, Res, Out](source: StreamSource)( + implicit stream: SubscriptionStreamLike[StreamSource, Action, Ctx, Res, Out]): SubscriptionValue[Ctx, StreamSource, stream.StreamSource] = { + val s = stream.subscriptionStream + SubscriptionValue(source, s) + } +} case class ProjectionName(name: String) extends FieldTag case object ProjectionExclude extends FieldTag diff --git a/modules/derivation/src/test/scala/sangria/streaming/StreamSpec.scala b/modules/derivation/src/test/scala/sangria/streaming/StreamSpec.scala index 8241b355..5d83eb07 100644 --- a/modules/derivation/src/test/scala/sangria/streaming/StreamSpec.scala +++ b/modules/derivation/src/test/scala/sangria/streaming/StreamSpec.scala @@ -371,6 +371,50 @@ class StreamSpec extends AnyWordSpec with Matchers with FutureResultSupport { result should (have(size(1)).and( contain("""{"data": {"letters": "a", "numbers": 10}}""".parseJson))) } + + "work with materialized schemas" in { + import _root_.monix.reactive.Observable + import sangria.streaming.monix._ + + val schemaDefinition = gql""" + type Subscription { + letters: String! + numbers: Int! + } + + type Query { + hello: String + } + """ + + val builder = ResolverBasedAstSchemaBuilder[Unit]( + FieldResolver.map( + "Query" -> Map("hello" -> (_ => "world")), + "Subscription" -> Map( + "letters" -> (_ => SubscriptionValue(Observable("a", "b").map(Action(_)))), + "numbers" -> (_ => SubscriptionValue(Observable(1, 2).map(Action(_)))), + ) + ) + ) + + val schema = Schema.buildFromAst(schemaDefinition, builder) + + import sangria.execution.ExecutionScheme.Stream + + val stream: Observable[JsValue] = + Executor.execute( + schema, + graphql"subscription { letters numbers }", + queryValidator = QueryValidator.default.withoutValidation[SingleFieldSubscriptions]) + + val result = stream.toListL.runToFuture.await(timeout) + + result should (have(size(4)) + .and(contain("""{"data": {"letters": "a"}}""".parseJson)) + .and(contain("""{"data": {"letters": "b"}}""".parseJson)) + .and(contain("""{"data": {"numbers": 1}}""".parseJson)) + .and(contain("""{"data": {"numbers": 2}}""".parseJson))) + } } } }