Skip to content

Commit

Permalink
[Improve][Flink-Kubnernetes-V2] Improve UsingObserver test (#3654)
Browse files Browse the repository at this point in the history
  • Loading branch information
caicancai authored Apr 6, 2024
1 parent 4bf4ea9 commit fa3de49
Showing 1 changed file with 37 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ import zio.{durationInt, Console, ZIO}

class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {

"Track and get flink job snapshot." in unsafeRun {
"Track and get flink application job snapshot." in unsafeRun {
for {
// track resource
_ <- ZIO.unit
trackId = TrackKey.appJob(233, "fdev", "simple-appjob")
trackId = TrackKey.appJob(114514, "fdev", "simple-appjob")
_ <- FlinkK8sObserver.track(trackId)
// get job snapshot
_ <- ZIO.sleep(3.seconds)
Expand All @@ -42,6 +42,35 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
} yield ()
}

"Track and get flink application endpoint snapshot." in unsafeRun {
for {
// track resource
_ <- ZIO.unit
trackId = TrackKey.appJob(233, "fdev", "simple-appjob")
_ <- FlinkK8sObserver.track(trackId)
// get job rest endpoint
_ <- ZIO.sleep(3.seconds)
jobSnap <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
_ <- Console.printLine(jobSnap.prettyStr)
} yield ()
}

"Track and get flink seesion job snapshot and endpoint." in unsafeRun {
for {
// track resource
_ <- ZIO.unit
trackId = TrackKey.sessionJob(233, "fdev", "simple-sessionjob", "simple-session")
_ <- FlinkK8sObserver.track(trackId)
// get job rest endpoint
_ <- ZIO.sleep(3.seconds)
endpoint <- FlinkK8sObserver.restSvcEndpointSnaps.get(trackId.namespace, trackId.name)
_ <- Console.printLine(endpoint.prettyStr)
// get job snapshot
jobSnap <- FlinkK8sObserver.evaluatedJobSnaps.getValue(trackId.id)
_ <- Console.printLine(jobSnap.prettyStr)
} yield ()
}

"Track and get flink cluster metrics" in unsafeRun {
for {
// track resource
Expand All @@ -66,9 +95,11 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
} yield ()
}

"Only subscribe Flink job state changes." in unsafeRun {
"Only subscribe Flink application job state changes." in unsafeRun {
for {
// track resource
_ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
// subscribe job status changes
_ <- FlinkK8sObserver.evaluatedJobSnaps
.flatSubscribe()
.map { case (appId, status) => (appId, status.evalState) }
Expand All @@ -78,9 +109,11 @@ class UsingObserver extends AnyWordSpecLike with BeforeAndAfterAll {
} yield ()
}

"Only subscribe Flink job enpoint changes." in unsafeRun {
"Only subscribe Flink application job enpoint changes." in unsafeRun {
for {
// track resource
_ <- FlinkK8sObserver.track(TrackKey.appJob(234, "fdev", "simple-appjob"))
// subscribe job status changes
_ <- FlinkK8sObserver.restSvcEndpointSnaps
.flatSubscribe()
.map { case (appId, status) => (appId, status.ipRest) }
Expand Down

0 comments on commit fa3de49

Please sign in to comment.