Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIVY-253. Simplify interactive session state management #236

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public class RSCClient implements LivyClient {

private ContextInfo contextInfo;
private volatile boolean isAlive;
private volatile String replState;

private SessionStateListener stateListener;

RSCClient(RSCConf conf, Promise<ContextInfo> ctx) throws IOException {
this.conf = conf;
Expand Down Expand Up @@ -94,6 +95,10 @@ public void onFailure(Throwable error) {
isAlive = true;
}

/**
 * Registers the listener to be notified of session state updates.
 *
 * The listener is stored in a field, replacing any listener set earlier. The ReplState
 * handler in ClientProtocol calls its onStateUpdated(String) with each received state.
 *
 * NOTE(review): the backing field is not declared volatile (unlike isAlive); if the
 * handler runs on a different thread than the caller, visibility should be confirmed.
 *
 * @param stateListener listener to notify; the handler skips notification while this is null.
 */
public void registerStateListener(SessionStateListener stateListener) {
this.stateListener = stateListener;
}

private synchronized void connectToContext(final ContextInfo info) throws Exception {
this.contextInfo = info;

Expand Down Expand Up @@ -291,13 +296,6 @@ public Future<ReplJobResults> getReplJobResults() throws Exception {
return deferredCall(new BaseProtocol.GetReplJobResults(), ReplJobResults.class);
}

/**
 * Returns the REPL state most recently stored in this client.
 *
 * @return the REPL state, or null if this client is not connected to a REPL session.
 */
public String getReplState() {
return replState;
}

private class ClientProtocol extends BaseProtocol {

<T> JobHandleImpl<T> submit(Job<T> job) {
Expand Down Expand Up @@ -393,7 +391,10 @@ private void handle(ChannelHandlerContext ctx, JobStarted msg) {

/**
 * Handles a ReplState message: traces the reported state and notifies the registered
 * state listener, if one has been set.
 */
private void handle(ChannelHandlerContext ctx, ReplState msg) {
LOG.trace("Received repl state for {}", msg.state);
replState = msg.state;

// A listener may not have been registered yet; in that case nothing is notified.
if (stateListener != null) {
stateListener.onStateUpdated(msg.state);
}
}
}
}
26 changes: 26 additions & 0 deletions rsc/src/main/java/com/cloudera/livy/rsc/SessionStateListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.cloudera.livy.rsc;

/**
 * Callback interface for receiving session state updates. A listener is attached to an
 * RSCClient through RSCClient.registerStateListener(SessionStateListener).
 */
public interface SessionStateListener {

/**
 * Invoked when the state is updated. RSCClient calls this with the state carried by
 * each ReplState message it receives.
 *
 * @param state the updated state, as a string.
 */
void onStateUpdated(String state);
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.launcher.SparkLauncher

import com.cloudera.livy._
import com.cloudera.livy.client.common.HttpMessages._
import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf}
import com.cloudera.livy.rsc.{PingJob, RSCClient, RSCConf, SessionStateListener}
import com.cloudera.livy.rsc.driver.Statement
import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions._
Expand Down Expand Up @@ -354,11 +354,12 @@ class InteractiveSession(
mockApp: Option[SparkApp]) // For unit test.
extends Session(id, owner, livyConf)
with SessionHeartbeat
with SparkAppListener {
with SparkAppListener
with SessionStateListener {

import InteractiveSession._

private var serverSideState: SessionState = initialState
@volatile private var _state: SessionState = initialState

override protected val heartbeatTimeout: FiniteDuration = {
val heartbeatTimeoutInSecond = heartbeatTimeoutS
Expand All @@ -373,6 +374,8 @@ class InteractiveSession(
_appId = appIdHint
sessionStore.save(RECOVERY_SESSION_TYPE, recoveryMetadata)
heartbeat()
// Register this session with the RSCClient as its session state listener.
client.foreach(_.registerStateListener(this))

private val app = mockApp.orElse {
if (livyConf.isRunningOnYarn()) {
Expand Down Expand Up @@ -413,14 +416,14 @@ class InteractiveSession(
override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()

override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
transition(SessionState.Running())
transition(SessionState.Idle())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The repl might be in busy state if it's a recovered session. We shouldn't assume it's idle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally it was the Idle state; your change in LIVY-244 switched it to the Running state. I didn't change the semantics for session recovery, unless the original behavior was wrong.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LIVY-244 changes the server-side state to Running so that the server queries the repl for the actual state. The actual state could be idle or busy. Your code does change the semantics.

}

private def errorOut(): Unit = {
// Other code might call stop() to close the RPC channel. While the RPC channel is closing,
// this callback might be triggered. If the session is already shutting down, skip the
// stop() call below to avoid a nested call.
if (serverSideState != SessionState.ShuttingDown()) {
if (_state != SessionState.ShuttingDown()) {
transition(SessionState.Error())
stop()
app.foreach { a =>
Expand All @@ -438,17 +441,7 @@ class InteractiveSession(
InteractiveRecoveryMetadata(
id, appId, appTag, kind, heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)

override def state: SessionState = {
if (serverSideState.isInstanceOf[SessionState.Running]) {
// If session is in running state, return the repl state from RSCClient.
client
.flatMap(s => Option(s.getReplState))
.map(SessionState(_))
.getOrElse(SessionState.Busy()) // If repl state is unknown, assume repl is busy.
} else {
serverSideState
}
}
override def state: SessionState = _state

override def stopSession(): Unit = {
try {
Expand Down Expand Up @@ -548,24 +541,24 @@ class InteractiveSession(
// If the session crashed because of the error, the session should instead go to dead state.
// Since these 2 transitions are triggered by different threads, there's a race condition.
// Make sure we never transition from the dead state to the error state.
val areSameStates = serverSideState.getClass() == newState.getClass()
val transitFromInactiveToActive = !serverSideState.isActive && newState.isActive
val areSameStates = _state.getClass() == newState.getClass()
val transitFromInactiveToActive = !_state.isActive && newState.isActive
if (!areSameStates && !transitFromInactiveToActive) {
debug(s"$this session state change from ${serverSideState} to $newState")
serverSideState = newState
debug(s"$this session state change from ${_state} to $newState")
_state = newState
}
}

private def ensureActive(): Unit = synchronized {
require(serverSideState.isActive, "Session isn't active.")
require(_state.isActive, "Session isn't active.")
require(client.isDefined, "Session is active but client hasn't been created.")
}

private def ensureRunning(): Unit = synchronized {
serverSideState match {
case SessionState.Running() =>
_state match {
case SessionState.Idle() | SessionState.Busy() => Unit
case _ =>
throw new IllegalStateException("Session is in state %s" format serverSideState)
throw new IllegalStateException(s"Session is in state ${_state}")
}
}

Expand Down Expand Up @@ -597,4 +590,6 @@ class InteractiveSession(
}

/** Stores the given application info on this session, replacing the previous value. */
override def infoChanged(appInfo: AppInfo): Unit = { this.appInfo = appInfo }

/**
 * SessionStateListener callback: converts the state string with SessionState(...) and
 * passes the result to transition().
 *
 * @param state the updated state, as a string.
 */
override def onStateUpdated(state: String): Unit = { transition(SessionState(state)) }
}