Skip to content

Commit

Permalink
Add support for clearing previously sent status messages (#783)
Browse files Browse the repository at this point in the history
### Changelog
Add support for clearing previously sent status messages

### Docs
None

### Description
Adds the `removeStatus` operation which can be used to clear a
previously sent status message. To identify which status message shall
be cleared, a `id` field has been added to the status message.

Adapted implementations:
- [x] typescript
- [x] c++
- [x] python
  • Loading branch information
achim-k authored Jul 25, 2024
1 parent 63c38fa commit 5def76d
Show file tree
Hide file tree
Showing 10 changed files with 190 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,18 @@ struct ServerHandlers {
std::function<void(const std::string&, uint32_t, ConnectionHandle)> fetchAssetHandler;
};

enum class StatusLevel : uint8_t {
Info = 0,
Warning = 1,
Error = 2,
};

struct Status {
StatusLevel level;
std::string message;
std::optional<std::string> id = std::nullopt;
};

template <typename ConnectionHandle>
class ServerInterface {
public:
Expand Down Expand Up @@ -108,6 +120,8 @@ class ServerInterface {
const MapOfSets& advertisedServices) = 0;
virtual void sendFetchAssetResponse(ConnectionHandle clientHandle,
const FetchAssetResponse& response) = 0;
virtual void sendStatus(const Status& status) = 0;
virtual void removeStatus(const std::vector<std::string>& statusIds) = 0;

virtual uint16_t getPort() = 0;
virtual std::string remoteEndpointString(ConnectionHandle clientHandle) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ const std::unordered_map<ClientBinaryOpcode, std::string> CAPABILITY_BY_CLIENT_B
{ClientBinaryOpcode::SERVICE_CALL_REQUEST, CAPABILITY_SERVICES},
};

enum class StatusLevel : uint8_t {
Info = 0,
Warning = 1,
Error = 2,
};

constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level) {
switch (level) {
case StatusLevel::Info:
Expand Down Expand Up @@ -146,22 +140,22 @@ class Server final : public ServerInterface<ConnHandle> {
void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
const uint8_t* payload, size_t payloadSize) override;
void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level,
const std::string& message);
const std::string& message,
const std::optional<std::string>& id = std::nullopt);
void broadcastTime(uint64_t timestamp) override;
void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse& response) override;
void sendServiceFailure(ConnHandle clientHandle, ServiceId serviceId, uint32_t callId,
const std::string& message) override;
void updateConnectionGraph(const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics,
const MapOfSets& advertisedServices) override;
void sendFetchAssetResponse(ConnHandle clientHandle, const FetchAssetResponse& response) override;
void sendStatus(ConnHandle clientHandle, const Status& status);
void sendStatus(const Status& status) override;
void removeStatus(const std::vector<std::string>& statusIds) override;

uint16_t getPort() override;
std::string remoteEndpointString(ConnHandle clientHandle) override;

typename ServerType::endpoint_type& getEndpoint() & {
return _server;
}

private:
struct ClientInfo {
std::string name;
Expand Down Expand Up @@ -591,21 +585,33 @@ inline void Server<ServerConfiguration>::sendBinary(ConnHandle hdl, const uint8_
}
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendStatus(ConnHandle clientHandle, const Status& status) {
json statusPayload = {
{"op", "status"},
{"level", static_cast<uint8_t>(status.level)},
{"message", status.message},
};

if (status.id) {
statusPayload["id"] = status.id.value();
}

sendJson(clientHandle, std::move(statusPayload));
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendStatusAndLogMsg(ConnHandle clientHandle,
const StatusLevel level,
const std::string& message) {
const std::string& message,
const std::optional<std::string>& id) {
const std::string endpoint = remoteEndpointString(clientHandle);
const std::string logMessage = endpoint + ": " + message;
const auto logLevel = StatusLevelToLogLevel(level);
auto logger = level == StatusLevel::Info ? _server.get_alog() : _server.get_elog();
logger.write(logLevel, logMessage);

sendJson(clientHandle, json{
{"op", "status"},
{"level", static_cast<uint8_t>(level)},
{"message", message},
});
sendStatus(clientHandle, {level, message, id});
}

template <typename ServerConfiguration>
Expand Down Expand Up @@ -1066,7 +1072,7 @@ inline void Server<ServerConfiguration>::sendServiceFailure(ConnHandle clientHan
{"serviceId", serviceId},
{"callId", callId},
{"message", message}});
};
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::updateConnectionGraph(
Expand Down Expand Up @@ -1538,4 +1544,25 @@ inline void Server<ServerConfiguration>::sendFetchAssetResponse(
con->send(message);
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::sendStatus(const Status& status) {
std::shared_lock<std::shared_mutex> lock(_clientsMutex);
for (const auto& [hdl, clientInfo] : _clients) {
(void)clientInfo;
sendStatus(hdl, status);
}
}

template <typename ServerConfiguration>
inline void Server<ServerConfiguration>::removeStatus(const std::vector<std::string>& statusIds) {
std::shared_lock<std::shared_mutex> lock(_clientsMutex);
for (const auto& [hdl, clientInfo] : _clients) {
(void)clientInfo;
sendJson(hdl, json{
{"op", "removeStatus"},
{"statusIds", statusIds},
});
}
}

} // namespace foxglove
23 changes: 22 additions & 1 deletion docs/spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

- [Server Info](#server-info) (json)
- [Status](#status) (json)
- [Remove Status](#remove-status) (json)
- [Advertise](#advertise) (json)
- [Unadvertise](#unadvertise) (json)
- [Message Data](#message-data) (binary)
Expand Down Expand Up @@ -101,14 +102,34 @@ Each JSON message must be an object containing a field called `op` which identif
- `op`: string `"status"`
- `level`: 0 (info), 1 (warning), 2 (error)
- `message`: string
- `id`: string | undefined. Optional identifier for the status message. Newer status messages with the same identifier should replace previous messages. [removeStatus](#remove-status) can reference the identifier to indicate a status message is no longer valid.

#### Example

```json
{
"op": "status",
"level": 0,
"message": "Some info"
"message": "Some info",
"id": "status-123"
}
```

### Remove Status

- Informs the client that previously sent status message(s) are no longer valid.

#### Fields

- `op`: string `"removeStatus"`
- `statusIds`: array of string, ids of the status messages to be removed. The array must not be empty.

#### Example

```json
{
"op": "removeStatus",
"statusIds": ["status-123"]
}
```

Expand Down
32 changes: 31 additions & 1 deletion python/src/foxglove_websocket/server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,20 @@ async def broadcast_time(self, timestamp: int):
except ConnectionClosed:
pass

async def send_status(self, level: StatusLevel, msg: str, id: Optional[str] = None):
for client in self._clients:
try:
await self._send_status(client.connection, level, msg, id)
except ConnectionClosed:
pass

async def remove_status(self, statusIds: List[str]):
for client in self._clients:
try:
await self._remove_status(client.connection, statusIds)
except ConnectionClosed:
pass

async def _send_json(
self, connection: WebSocketServerProtocol, msg: ServerJsonMessage
):
Expand Down Expand Up @@ -441,14 +455,30 @@ async def _handle_connection(
await result

async def _send_status(
self, connection: WebSocketServerProtocol, level: StatusLevel, msg: str
self,
connection: WebSocketServerProtocol,
level: StatusLevel,
msg: str,
id: Optional[str] = None,
) -> None:
await self._send_json(
connection,
{
"op": "status",
"level": level,
"message": msg,
"id": id,
},
)

async def _remove_status(
self, connection: WebSocketServerProtocol, statusIds: List[str]
) -> None:
await self._send_json(
connection,
{
"op": "removeStatus",
"statusIds": statusIds,
},
)

Expand Down
7 changes: 7 additions & 0 deletions python/src/foxglove_websocket/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ class StatusMessage(TypedDict):
op: Literal["status"]
level: StatusLevel
message: str
id: Optional[str]


class RemoveStatusMessages(TypedDict):
op: Literal["removeStatus"]
statusIds: List[str]


class ChannelWithoutId(TypedDict):
Expand Down Expand Up @@ -195,6 +201,7 @@ class ParameterValues(TypedDict):
ServerJsonMessage = Union[
ServerInfo,
StatusMessage,
RemoveStatusMessages,
Advertise,
Unadvertise,
AdvertiseServices,
Expand Down
1 change: 1 addition & 0 deletions python/tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async def test_warn_invalid_channel():
"op": "status",
"level": 1,
"message": "Channel 999 is not available; ignoring subscription",
"id": None,
}


Expand Down
20 changes: 19 additions & 1 deletion typescript/ws-protocol-examples/src/examples/service-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
ServerCapability,
Service,
ServiceCallPayload,
StatusLevel,
} from "@foxglove/ws-protocol";
import { Command } from "commander";
import Debug from "debug";
Expand All @@ -29,6 +30,8 @@ async function main(): Promise<void> {
});
setupSigintHandler(log, ws);

let callCount = 0;

const serviceDefRos: Omit<Service, "id"> = {
name: "/set_bool_ros",
type: "std_srvs/SetBool",
Expand Down Expand Up @@ -125,7 +128,11 @@ async function main(): Promise<void> {
throw err;
}

log("Received service call request with %d bytes", request.data.byteLength);
log(
"Received service call request with %d bytes, call count %d",
request.data.byteLength,
callCount,
);

const responseMsg = {
success: true,
Expand All @@ -147,6 +154,17 @@ async function main(): Promise<void> {
data: new DataView(responseData.buffer),
};
server.sendServiceCallResponse(response, clientConnection);

if (callCount % 2 === 0) {
server.sendStatus({
level: StatusLevel.INFO,
message: "Service was called :)",
id: "statusFoo",
});
} else {
server.removeStatus(["statusFoo"]);
}
callCount++;
});
server.on("error", (err) => {
log("server error: %o", err);
Expand Down
6 changes: 6 additions & 0 deletions typescript/ws-protocol/src/FoxgloveClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { parseServerMessage } from "./parse";
import {
BinaryOpcode,
Channel,
RemoveStatusMessages,
ClientBinaryOpcode,
ClientChannel,
ClientChannelId,
Expand Down Expand Up @@ -32,6 +33,7 @@ type EventTypes = {

serverInfo: (event: ServerInfo) => void;
status: (event: StatusMessage) => void;
removeStatus: (event: RemoveStatusMessages) => void;
message: (event: MessageData) => void;
time: (event: Time) => void;
advertise: (newChannels: Channel[]) => void;
Expand Down Expand Up @@ -110,6 +112,10 @@ export default class FoxgloveClient {
this.#emitter.emit("status", message);
return;

case "removeStatus":
this.#emitter.emit("removeStatus", message);
return;

case "advertise":
this.#emitter.emit("advertise", message.channels);
return;
Expand Down
39 changes: 39 additions & 0 deletions typescript/ws-protocol/src/FoxgloveServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
ServiceCallPayload,
ServiceCallRequest,
ServiceId,
StatusMessage,
SubscriptionId,
} from "./types";

Expand Down Expand Up @@ -659,4 +660,42 @@ export default class FoxgloveServer {

connection.send(msg);
}

/**
* Send a status message to one or all clients.
*
* @param status Status message
* @param connection Optional connection. If undefined, the status message will be sent to all clients.
*/
sendStatus(status: Omit<StatusMessage, "op">, connection?: IWebSocket): void {
if (connection) {
// Send the status to a single client.
this.#send(connection, { op: "status", ...status });
return;
}

// Send status message to all clients.
for (const client of this.#clients.values()) {
this.sendStatus(status, client.connection);
}
}

/**
* Remove status message(s) for one or for all clients.
* @param statusIds Status ids to be removed.
* @param connection Optional connection. If undefined, the status will be removed for all clients.
*/
removeStatus(statusIds: string[], connection?: IWebSocket): void {
if (connection) {
// Remove status for a single client.
this.#send(connection, { op: "removeStatus", statusIds });
return;
}

// Remove status for all clients.
for (const client of this.#clients.values()) {
this.#send(client.connection, { op: "removeStatus", statusIds });
}
}
}
Loading

0 comments on commit 5def76d

Please sign in to comment.