Skip to content

Commit

Permalink
Rework Studio/ZAP integration: use notification, not polling
Browse files Browse the repository at this point in the history
In the past, ZAP periodically ping Studio server for UC component states.
Instead, ZAP will only issue 1 GET for global UC states / naming.
For UC component state changes, ZAP subscribe to Studio server via WebSockets.

BUG: ZAPP-1251
  • Loading branch information
Jing T authored Sep 1, 2023
1 parent d11b495 commit d80c44c
Show file tree
Hide file tree
Showing 9 changed files with 371 additions and 82 deletions.
307 changes: 249 additions & 58 deletions src-electron/ide-integration/studio-rest-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,41 @@
*/

// dirty flag reporting interval
const UC_COMPONENT_STATE_REPORTING_INTERVAL_ID = 6000
import axios, { AxiosPromise, AxiosResponse } from 'axios'
import axios, { AxiosResponse } from 'axios'
import * as env from '../util/env'
import * as dbTypes from '../../src-shared/types/db-types'
import * as querySession from '../db/query-session.js'
const queryNotification = require('../db/query-session-notification.js')
const wsServer = require('../server/ws-server.js')
const dbEnum = require('../../src-shared/db-enum.js')
import * as dbEnum from '../../src-shared/db-enum.js'
import * as ucTypes from '../../src-shared/types/uc-component-types'
import * as dbMappingTypes from '../types/db-mapping-types'
import * as http from 'http-status-codes'
import { StatusCodes } from 'http-status-codes'
import zcl from './zcl.js'
import WebSocket from 'ws'
import {
StudioRestAPI,
StudioWsConnection,
StudioProjectPath,
StudioQueryParams,
StudioWsAPI,
StudioWsMessage,
} from './studio-types'
import { projectName } from '../../src-electron/util/studio-util'

const localhost = 'http://127.0.0.1:'
const resGetProjectInfo = '/rest/clic/components/all/project/'
const resAddComponent = '/rest/clic/component/add/project/'
const resRemoveComponent = '/rest/clic/component/remove/project/'
const wsLocalhost = 'ws://127.0.0.1:'

let ucComponentStateReportId: NodeJS.Timeout
// a periodic heartbeat for checking in on Studio server to maintain WS connections
let heartbeatId: NodeJS.Timeout
const heartbeatDelay = 6000
let studioHttpPort: number
let studioWsConnections: StudioWsConnection = {}

function projectPath(db: dbTypes.DbType, sessionId: number) {
async function projectPath(
db: dbTypes.DbType,
sessionId: number
): Promise<StudioProjectPath> {
return querySession.getSessionKeyValue(
db,
sessionId,
Expand All @@ -66,22 +79,42 @@ async function integrationEnabled(db: dbTypes.DbType, sessionId: number) {
}

/**
* Extract project name from the Studio project path
* @param {} db
* @param {*} sessionId
* @returns '' if retrival failed
* Studio REST API path helper/generator
* @param api
* @param path
* @param queryParams
* @returns
*/
function projectName(studioProjectPath: string) {
const prefix = '_2F'
if (studioProjectPath && studioProjectPath.includes(prefix)) {
return studioProjectPath.substr(
studioProjectPath.lastIndexOf(prefix) + prefix.length
)
function restApiUrl(
api: StudioRestAPI,
path: StudioProjectPath,
queryParams: StudioQueryParams = {}
) {
let base = localhost + studioHttpPort + api + path
let params = Object.entries(queryParams)
if (params.length) {
let queries = new URLSearchParams()
params.forEach(([key, value]) => {
queries.set(key, value)
})

return `${base}?${queries.toString()}`
} else {
return ''
return base
}
}

/**
* Studio WebSocket API path helper/generator
* @param api
* @param path
* @param queryParams
* @returns
*/
function wsApiUrl(api: StudioWsAPI, path: StudioProjectPath) {
return wsLocalhost + studioHttpPort + api + path
}

/**
* Send HTTP GET request to Studio Jetty server for project information.
* @param {} db
Expand All @@ -93,24 +126,32 @@ async function getProjectInfo(
sessionId: number
): Promise<{
data: string[]
status?: http.StatusCodes
status?: StatusCodes
}> {
let project = await projectPath(db, sessionId)
let studioIntegration = await integrationEnabled(db, sessionId)

if (project) {
let name = projectName(project)
let path = localhost + studioHttpPort + resGetProjectInfo + project
env.logDebug(`StudioUC(${name}): GET: ${path}`)
return axios
.get(path)
.then((resp) => {
env.logDebug(`StudioUC(${name}): RESP: ${resp.status}`)
return resp
})
.catch((err) => {
return { data: [] }
})
if (studioIntegration) {
let path = restApiUrl(StudioRestAPI.GetProjectInfo, project)
env.logInfo(`StudioUC(${name}): GET: ${path}`)
return axios
.get(path)
.then((resp) => {
env.logInfo(`StudioUC(${name}): RESP: ${resp.status}`)
return resp
})
.catch((err) => {
env.logInfo(`StudioUC(${name}): ERR: ${err.message}`)
return { data: [] }
})
} else {
env.logInfo(`StudioUC(${name}): Studio integration is not enabled!`)
return { data: [] }
}
} else {
env.logDebug(
env.logInfo(
`StudioUC(): Invalid Studio project path specified via project info API!`
)
return { data: [] }
Expand Down Expand Up @@ -199,7 +240,7 @@ async function updateComponentByComponentIds(
AxiosResponse | ucTypes.UcComponentUpdateResponseWrapper
>[] = []
let project = await projectPath(db, sessionId)
let name = await projectName(project)
let name = projectName(project)

if (Object.keys(componentIds).length) {
promises = componentIds.map((componentId) =>
Expand All @@ -224,10 +265,15 @@ function httpPostComponentUpdate(
componentId: string,
add: boolean
) {
let operation = add ? resAddComponent : resRemoveComponent
let operation = add
? StudioRestAPI.AddComponent
: StudioRestAPI.RemoveComponent
let operationText = add ? 'add' : 'remove'
let name = projectName(project)
let path = restApiUrl(operation, project)
env.logInfo(`StudioUC(${name}): POST: ${path}, ${componentId}`)
return axios
.post(localhost + studioHttpPort + operation + project, {
.post(path, {
componentId: componentId,
})
.then((res) => {
Expand All @@ -251,16 +297,44 @@ function httpPostComponentUpdate(
} else {
// Actual fail.
return {
status: http.StatusCodes.NOT_FOUND,
status: StatusCodes.NOT_FOUND,
id: componentId,
data: `StudioUC(${projectName(
project
)}): Failed to ${operationText} component(${componentId})`,
data: `StudioUC(${name}): Failed to ${operationText} component(${componentId})`,
}
}
})
}

/**
* Handles WebSocket messages from Studio server
* @param db
* @param session
* @param message
*/
async function wsMessageHandler(
db: dbTypes.DbType,
session: any,
message: string
) {
let { sessionId } = session
let name = projectName(await projectPath(db, sessionId))
try {
let resp = JSON.parse(message)
if (resp.msgType == 'updateComponents') {
env.logInfo(
`StudioUC(${name}): Received WebSocket message: ${JSON.stringify(
resp.delta
)}`
)
sendSelectedUcComponents(db, session, JSON.parse(resp.tree))
}
} catch (error) {
env.logError(
`StudioUC(${name}): Failed to process WebSocket notification message.`
)
}
}

/**
* Start the dirty flag reporting interval.
*
Expand All @@ -269,33 +343,150 @@ function initIdeIntegration(db: dbTypes.DbType, studioPort: number) {
studioHttpPort = studioPort

if (studioPort) {
ucComponentStateReportId = setInterval(() => {
sendUcComponentStateReport(db)
}, UC_COMPONENT_STATE_REPORTING_INTERVAL_ID)
heartbeatId = setInterval(async () => {
let sessions = await querySession.getAllSessions(db)
for (const session of sessions) {
await verifyWsConnection(db, session, wsMessageHandler)
}
}, heartbeatDelay)
}
}

/**
* Check WebSocket connections between backend and Studio jetty server.
* If project is opened, verify connection is open.
* If project is closed, close ws connection as well.
*
* @param db
* @param sessionId
*/
async function verifyWsConnection(
db: dbTypes.DbType,
session: any,
messageHandler: StudioWsMessage
) {
try {
let { sessionId } = session
let path = await projectPath(db, sessionId)
if (path) {
if (await isProjectActive(path)) {
await wsConnect(db, session, path, messageHandler)
} else {
wsDisconnect(db, session, path)
}
}
} catch (error: any) {
env.logInfo(error.toString())
}
}

/**
* Utility function for making websocket connection to Studio server
* @param sessionId
* @param path
* @returns
*/
async function wsConnect(
db: dbTypes.DbType,
session: any,
path: StudioProjectPath,
handler: StudioWsMessage
) {
let { sessionId } = session
let ws = studioWsConnections[sessionId]
if (ws && ws.readyState == WebSocket.OPEN) {
return ws
} else {
ws?.terminate()

let wsPath = wsApiUrl(StudioWsAPI.WsServerNotification, path)
let name = projectName(path)
ws = new WebSocket(wsPath)
env.logInfo(`StudioUC(${name}): WS connecting to ${wsPath}`)

ws.on('error', function () {
studioWsConnections[sessionId] = null
return null
})

ws.on('open', function () {
studioWsConnections[sessionId] = ws
env.logInfo(`StudioUC(${name}): WS connected.`)
return ws
})

ws.on('message', function (data) {
handler(db, session, data.toString())
})
}
}

async function wsDisconnect(
db: dbTypes.DbType,
session: any,
path: StudioProjectPath
) {
let { sessionId } = session
if (studioWsConnections[sessionId]) {
env.logInfo(`StudioUC(${projectName(path)}): WS disconnected.`)
studioWsConnections[sessionId]?.close()
studioWsConnections[sessionId] = null
}
}

/**
* Check if a specific Studio project (.slcp) file has been opened or not.
*
* Context: To get proper WebSocket notification for change in project states,
* that specific project needs to be opened already. Otherwise, no notification
* will happen.
*
* DependsComponent API used as a quick way to check if the project is opened or not
* If project is open/valid, the API will respond with "Component not found in project"
* Otherwise, "Project does not exists"
*
* @param path
*/
async function isProjectActive(path: StudioProjectPath): Promise<boolean> {
if (!path) {
return false
}

let url = restApiUrl(StudioRestAPI.DependsComponent, path)
return axios
.get(url)
.then((resp) => {
return true
})
.catch((err) => {
let { response } = err
if (response.status == StatusCodes.BAD_REQUEST && response.data) {
return !response.data.includes('Project does not exists')
}

return false
})
}

/**
* Clears up the reporting interval.
*/
function deinitIdeIntegration() {
if (ucComponentStateReportId) clearInterval(ucComponentStateReportId)
if (heartbeatId) clearInterval(heartbeatId)
}

async function sendUcComponentStateReport(db: dbTypes.DbType) {
let sessions = await querySession.getAllSessions(db)
for (const session of sessions) {
let socket = wsServer.clientSocket(session.sessionKey)
let studioIntegration = await integrationEnabled(db, session.sessionId)
if (socket && studioIntegration) {
getProjectInfo(db, session.sessionId).then((resp) => {
if (resp.status == http.StatusCodes.OK)
wsServer.sendWebSocketMessage(socket, {
category: dbEnum.wsCategory.ucComponentStateReport,
payload: resp.data,
})
})
}
async function sendSelectedUcComponents(
db: dbTypes.DbType,
session: any,
ucComponentStates: string
) {
let socket = wsServer.clientSocket(session.sessionKey)
let studioIntegration = await integrationEnabled(db, session.sessionId)
if (socket && studioIntegration) {
wsServer.sendWebSocketMessage(socket, {
category: dbEnum.wsCategory.updateSelectedUcComponents,
payload: ucComponentStates,
})
}
}

Expand Down
Loading

0 comments on commit d80c44c

Please sign in to comment.