From 11d30252b0d8e58880492e3524b58f30046c820d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20=C3=81lvarez?= Date: Wed, 10 Jan 2024 21:38:17 +0100 Subject: [PATCH] [audio/voice] Add pcm audio websocket with dialog support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Miguel Álvarez --- bundles/org.openhab.core.audio/pom.xml | 6 + .../voice/websocket/PCMWebSocketAdapter.java | 158 +++++++ .../websocket/PCMWebSocketConnection.java | 384 ++++++++++++++++++ .../audio/PCMWebSocketAudioSink.java | 303 ++++++++++++++ .../audio/PCMWebSocketAudioSource.java | 200 +++++++++ 5 files changed, 1051 insertions(+) create mode 100644 bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketAdapter.java create mode 100644 bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketConnection.java create mode 100644 bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSink.java create mode 100644 bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSource.java diff --git a/bundles/org.openhab.core.audio/pom.xml b/bundles/org.openhab.core.audio/pom.xml index 2603aa967e1..fb49b7f3317 100644 --- a/bundles/org.openhab.core.audio/pom.xml +++ b/bundles/org.openhab.core.audio/pom.xml @@ -45,6 +45,12 @@ ${project.version} test + + org.openhab.core.bundles + org.openhab.core.io.websocket + 4.2.0-SNAPSHOT + compile + diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketAdapter.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketAdapter.java new file mode 100644 index 00000000000..ffcab608774 --- /dev/null +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketAdapter.java @@ -0,0 +1,158 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.voice.websocket; + +import static java.nio.ByteBuffer.*; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; +import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; +import org.openhab.core.audio.AudioManager; +import org.openhab.core.common.ThreadPoolManager; +import org.openhab.core.events.EventSubscriber; +import org.openhab.core.io.websocket.WebSocketAdapter; +import org.openhab.core.voice.VoiceManager; +import org.osgi.framework.BundleContext; +import org.osgi.service.component.annotations.Activate; +import org.osgi.service.component.annotations.Component; +import org.osgi.service.component.annotations.Reference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * The {@link PCMWebSocketAdapter} creates instances of {@link PCMWebSocketConnection} to handle pcm audio. + * + * @author Miguel Álvarez Díez - Initial contribution + */ +@NonNullByDefault +@Component(immediate = true, service = { EventSubscriber.class, WebSocketAdapter.class }) +public class PCMWebSocketAdapter implements WebSocketAdapter { + public static final String ADAPTER_ID = "pcm-audio"; + + private final Logger logger = LoggerFactory.getLogger(PCMWebSocketAdapter.class); + private final ScheduledExecutorService executor = ThreadPoolManager.getScheduledPool("pcm-audio-websocket"); + protected final BundleContext bundleContext; + protected final VoiceManager voiceManager; + protected final AudioManager audioManager; + private final ScheduledFuture pingTask; + private final Set speakerConnections = Collections.synchronizedSet(new HashSet<>()); + @Activate + public PCMWebSocketAdapter(BundleContext bundleContext, final @Reference AudioManager audioManager, + final @Reference VoiceManager voiceManager) { + this.bundleContext = bundleContext; + this.audioManager = audioManager; + this.voiceManager = voiceManager; + this.pingTask = executor.scheduleWithFixedDelay(this::pingHandlers, 10, 5, TimeUnit.SECONDS); + } + + private void pingHandlers() { + var handlers = new ArrayList<>(speakerConnections); + for (var handler : handlers) { + if (handler != null) { + boolean pinned = false; + var remote = handler.getRemote(); + if (remote != null) { + try { + remote.sendPing(wrap("oh".getBytes(StandardCharsets.UTF_8))); + pinned = true; + } catch (IOException ignored) { + } + } + if (!pinned) { + logger.warn("ping failed, disconnecting speaker {}", handler.getId()); + var session = handler.getSession(); + if (session != null) { + session.close(); + } + } + } + + } + } + + private void disconnectHandlers() { + logger.debug("Disconnecting {} clients...", speakerConnections.size()); + var handlers = new ArrayList<>(speakerConnections); + for (var handler : handlers) { + onSpeakerDisconnected(handler); + var session = handler.getSession(); + if (session != null) { + try { + session.disconnect(); + } catch (IOException e) { + logger.debug("Disconnect failed: {}", e.getMessage()); + } + } + } + } + + public List getSpeakerConnections() { + synchronized (speakerConnections) { + return new ArrayList<>(speakerConnections); + } + } + + private void onSpeakerConnected(PCMWebSocketConnection speaker) throws IllegalStateException { + synchronized (speakerConnections) { + if (getSpeakerConnection(speaker.getId()) != null) { + throw new IllegalStateException("Another speaker with the same id is already connected"); + } + speakerConnections.add(speaker); + logger.debug("connected speakers {}", speakerConnections.size()); + } + } + + private void onSpeakerDisconnected(PCMWebSocketConnection connection) { + logger.debug("speaker disconnected '{}'", connection.getId()); + synchronized (speakerConnections) { + speakerConnections.remove(connection); + logger.debug("connected speakers {}", speakerConnections.size()); + } + } + + public PCMWebSocketConnection getSpeakerConnection(String id) { + synchronized (speakerConnections) { + return speakerConnections.stream() + .filter(speakerConnection -> speakerConnection.getId().equalsIgnoreCase(id)).findAny().orElse(null); + } + } + + @Override + public String getId() { + return ADAPTER_ID; + } + + @Override + public Object createWebSocket(ServletUpgradeRequest servletUpgradeRequest, + ServletUpgradeResponse servletUpgradeResponse) { + logger.debug("creating connection!"); + return new PCMWebSocketConnection(this, executor); + } + + public synchronized void dispose() { + logger.debug("Unregistering protocols"); + pingTask.cancel(true); + disconnectHandlers(); + } + +} diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketConnection.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketConnection.java new file mode 100644 index 00000000000..01d4227cf46 --- /dev/null +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/PCMWebSocketConnection.java @@ -0,0 +1,384 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.voice.websocket; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketListener; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.openhab.core.audio.AudioFormat; +import org.openhab.core.audio.AudioSink; +import org.openhab.core.audio.AudioSource; +import org.openhab.core.voice.KSEdgeService; +import org.openhab.core.voice.KSException; +import org.openhab.core.voice.KSListener; +import org.openhab.core.voice.KSService; +import org.openhab.core.voice.KSServiceHandle; +import org.openhab.core.voice.KSpottedEvent; +import org.openhab.core.voice.text.HumanLanguageInterpreter; +import org.openhab.core.voice.websocket.audio.PCMWebSocketAudioSink; +import org.openhab.core.voice.websocket.audio.PCMWebSocketAudioSource; +import org.osgi.framework.ServiceRegistration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * The {@link PCMWebSocketConnection} represents a WebSocket connection used to transmit pcm audio. + * + * + * + * @author Miguel Álvarez Díez - Initial contribution + */ +@WebSocket +@NonNullByDefault +@SuppressWarnings("unused") +public class PCMWebSocketConnection implements WebSocketListener { + private final Logger logger = LoggerFactory.getLogger(PCMWebSocketConnection.class); + protected final Map> audioComponentRegistrations = new ConcurrentHashMap<>(); + private volatile @Nullable Session session; + private @Nullable RemoteEndpoint remote; + private final PCMWebSocketAdapter wsAdapter; + private final ScheduledExecutorService executor; + private @Nullable ScheduledFuture scheduledDisconnection; + + private boolean initialized = false; + private @Nullable WebSocketKeywordSpotter ks = null; + + private final ObjectMapper jsonMapper = new ObjectMapper(); + private String id = ""; + private @Nullable PCMWebSocketAudioSource audioSource = null; + + public PCMWebSocketConnection(PCMWebSocketAdapter wsAdapter, ScheduledExecutorService executor) { + this.wsAdapter = wsAdapter; + this.executor = executor; + } + + public void sendAudio(byte[] id, byte[] b) { + try { + var remote = getRemote(); + if (remote != null) { + // concat stream identifier and send + ByteBuffer buff = ByteBuffer.wrap(new byte[id.length + b.length]); + buff.put(id); + buff.put(b); + remote.sendBytesByFuture(ByteBuffer.wrap(buff.array())); + } + } catch (IllegalStateException ignored) { + logger.warn("Unable to send audio buffer"); + } + } + + public void setListening(boolean listening) { + sendClientCommand(new WebSocketCommand(listening ? WebSocketCommand.OutputCommands.START_LISTENING : WebSocketCommand.OutputCommands.STOP_LISTENING)); + } + + public void disconnect() { + var session = getSession(); + if (session != null) { + try { + session.disconnect(); + } catch (IOException ignored) { + } + } + } + + private void spot() { + onRemoteSpot(); + } + + @Override + public void onWebSocketConnect(@Nullable Session sess) { + if (sess == null) { + // never + return; + } + this.session = sess; + this.remote = sess.getRemote(); + logger.info("New client connected."); + scheduledDisconnection = executor.schedule(() -> { + try { + sess.disconnect(); + } catch (IOException ignored) { + } + }, 5, TimeUnit.SECONDS); + } + + private void sendClientCommand(T msg) { + var remote = getRemote(); + if (remote != null) { + try { + remote.sendStringByFuture(new ObjectMapper().writeValueAsString(msg)); + } catch (JsonProcessingException e) { + logger.warn("JsonProcessingException writing JSON message: ", e); + } + } + } + + @Override + public void onWebSocketBinary(byte @Nullable [] payload, int offset, int len) { + logger.trace("Received binary data of length {}", len); + PCMWebSocketAudioSource audioSource = this.audioSource; + if (payload != null && audioSource != null) { + audioSource.writeToStreams(payload); + } + } + + @Override + public void onWebSocketText(@Nullable String message) { + try { + var rootMessageNode = jsonMapper.readTree(message); + if (rootMessageNode.has("cmd")) { + try { + var cmd = rootMessageNode.get("cmd").asText().trim().toUpperCase(); + logger.debug("Handling msg '{}'", cmd); + var messageType = WebSocketCommand.InputCommands.valueOf(cmd); + switch (messageType) { + case INITIALIZE -> { + var initializeArgs = jsonMapper.readValue(rootMessageNode.get("args").asText(), WebSocketCommand.InitializeArgs.class); + var scheduledDisconnection = this.scheduledDisconnection; + if (scheduledDisconnection != null) { + scheduledDisconnection.cancel(true); + } + // update connection settings + id = initializeArgs.id; + registerSpeakerComponents(id, initializeArgs.sinkSampleRate, initializeArgs.sourceSampleRate); + sendClientCommand(new WebSocketCommand(WebSocketCommand.OutputCommands.INITIALIZED)); + } + case ON_SPOT -> onRemoteSpot(); + } + + } catch (IOException | IllegalStateException e) { + logger.warn("Disconnecting client: {}", e.getMessage()); + disconnect(); + } + } + } catch (JsonProcessingException e) { + logger.warn("Exception parsing JSON message: ", e); + } + } + + @Override + public void onWebSocketError(@Nullable Throwable cause) { + logger.warn("WebSocket Error: ", cause); + } + + @Override + public void onWebSocketClose(int statusCode, @Nullable String reason) { + this.session = null; + this.remote = null; + logger.debug("Session closed with code {}: {}", statusCode, reason); + unregisterSpeakerComponents(id); + } + + public void setSinkVolume(int value) { + if (initialized) { + sendClientCommand(new WebSocketCommand(WebSocketCommand.OutputCommands.SINK_VOLUME, Map.of("value", value))); + } + } + + public void setSourceVolume(int value) { + if (initialized) { + sendClientCommand(new WebSocketCommand(WebSocketCommand.OutputCommands.SOURCE_VOLUME, Map.of("value", value))); + } + } + + public @Nullable RemoteEndpoint getRemote() { + return this.remote; + } + + public @Nullable Session getSession() { + return this.session; + } + + public boolean isConnected() { + Session sess = this.session; + return sess != null && sess.isOpen(); + } + + protected synchronized void registerSpeakerComponents(String id, int sourceSampleRate, int sinkSampleRate) throws IOException { + if (id.isBlank()) { + throw new IOException("Unable to register audio components"); + } + String label = "UI (" + id + ")"; + var sinkStereo = false; + @Nullable + String listeningItem = null; + logger.debug("Registering dialog components for '{}' (source sample rate {}, sink sample rate {})", id, sourceSampleRate, sinkSampleRate); + this.initialized = true; + // register source + this.audioSource = new PCMWebSocketAudioSource(getSourceId(id), label, sourceSampleRate, this); + logger.debug("Registering audio source {}", this.audioSource.getId()); + audioComponentRegistrations.put(this.audioSource.getId(), wsAdapter.bundleContext + .registerService(AudioSource.class.getName(), this.audioSource, new Hashtable<>())); + // register sink + var sink = new PCMWebSocketAudioSink(getSinkId(id), label, this, sinkStereo ? 2 : 1, sinkSampleRate); + logger.debug("Registering audio sink {}", sink.getId()); + audioComponentRegistrations.put(sink.getId(), + wsAdapter.bundleContext.registerService(AudioSink.class.getName(), sink, new Hashtable<>())); + // init dialog + this.ks = new WebSocketKeywordSpotter(id); + wsAdapter.voiceManager.startDialog( // + wsAdapter.voiceManager.getDialogContextBuilder() // + .withSource(this.audioSource) // + .withSink(sink) // + .withKS(ks) // + .build() // + ); + } + + protected synchronized void unregisterSpeakerComponents(String id) { + if (initialized) { + var source = wsAdapter.audioManager.getSource(getSourceId(id)); + if (source instanceof PCMWebSocketAudioSource hsAudioSource) { + try { + hsAudioSource.close(); + } catch (Exception ignored) { + } + } + if (source != null) { + try { + wsAdapter.voiceManager.stopDialog(source); + } catch (Exception e) { + logger.debug("Unable to stop dialog for {}", source.getId()); + } + ServiceRegistration sourceReg = audioComponentRegistrations.remove(source.getId()); + if (sourceReg != null) { + logger.debug("Unregistering audio source {}", source.getId()); + sourceReg.unregister(); + } + } + var sink = wsAdapter.audioManager.getSink(getSinkId(id)); + if (sink != null) { + ServiceRegistration sinkReg = audioComponentRegistrations.remove(sink.getId()); + if (sinkReg != null) { + logger.debug("Unregistering audio sink {}", sink.getId()); + sinkReg.unregister(); + } + } + } + } + + protected void onRemoteSpot() { + var ks = this.ks; + if (ks != null) { + ks.emitSpot(); + } + } + + private String getSinkId(String id) { + return "pcm::" + id + "::sink"; + } + + private String getSourceId(String id) { + return "pcm::" + id + "::source"; + } + + public String getId() { + return id; + } + + private class WebSocketCommand { + public final OutputCommands cmd; + public final Map args; + + public WebSocketCommand(OutputCommands cmd) { + this(cmd, new HashMap<>()); + } + public WebSocketCommand(OutputCommands cmd, Map args) { + this.cmd = cmd; + this.args = args; + } + public enum OutputCommands { + INITIALIZED, + START_LISTENING, + STOP_LISTENING, + SINK_VOLUME, + SOURCE_VOLUME, + } + + public enum InputCommands { + INITIALIZE, + ON_SPOT, + } + public class InitializeArgs { + String id = ""; + int sourceSampleRate; + int sinkSampleRate; + public InitializeArgs() { } + } + } + + /** + * Anonymous keyword spotter used to trigger the dialog + */ + private class WebSocketKeywordSpotter implements KSEdgeService { + private @Nullable KSListener ksListener = null; + private final String id; + public WebSocketKeywordSpotter(String id) { + this.id = id; + } + @Override + public KSServiceHandle spot(KSListener ksListener) throws KSException { + this.ksListener = ksListener; + return () -> { + if(ksListener.equals(this.ksListener)) { + this.ksListener = null; + } + }; + } + + public void emitSpot() { + var ksListener = this.ksListener; + if(ksListener != null) { + ksListener.ksEventReceived(new KSpottedEvent()); + } + } + + @Override + public String getId() { + return "audiows::" + id + "::ks"; + } + + @Override + public String getLabel(@Nullable Locale locale) { + // never shown + return "Anonymous"; + } + + @Override + public Set getSupportedLocales() { + return Set.of(); + } + + @Override + public Set getSupportedFormats() { + return Set.of(AudioFormat.WAV, AudioFormat.PCM_SIGNED); + } + } +} diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSink.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSink.java new file mode 100644 index 00000000000..9996f26d2bf --- /dev/null +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSink.java @@ -0,0 +1,303 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.voice.websocket.audio; + +import javax.sound.sampled.AudioInputStream; +import javax.sound.sampled.AudioSystem; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.audio.AudioFormat; +import org.openhab.core.audio.AudioSink; +import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.FixedLengthAudioStream; +import org.openhab.core.audio.PipedAudioStream; +import org.openhab.core.audio.SizeableAudioStream; +import org.openhab.core.audio.UnsupportedAudioFormatException; +import org.openhab.core.audio.UnsupportedAudioStreamException; +import org.openhab.core.audio.utils.AudioWaveUtils; +import org.openhab.core.library.types.PercentType; +import org.openhab.core.voice.websocket.PCMWebSocketConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.security.SecureRandom; +import java.time.Duration; +import java.time.Instant; +import java.util.HashSet; +import java.util.Locale; +import java.util.Objects; +import java.util.Set; + +/** + * This is an AudioSource from an input channel of the host. + * + * @author Miguel Álvarez Díez - Initial contribution + * + */ +@NonNullByDefault +public class PCMWebSocketAudioSink implements AudioSink { + /** + * Byte send to the sink after last chunk to indicate that streaming has ended. + * Should try to be sent event on and error as the client should be aware that data transmission has ended. + */ + private static byte TERMINATION_BYTE = (byte) 0; + private final HashSet supportedFormats = new HashSet<>(); + private static final HashSet> SUPPORTED_STREAMS = new HashSet<>(); + + static { + SUPPORTED_STREAMS.add(FixedLengthAudioStream.class); + SUPPORTED_STREAMS.add(PipedAudioStream.class); + } + private final Logger logger = LoggerFactory.getLogger(PCMWebSocketAudioSink.class); + + private final String sinkId; + private final String sinkLabel; + private final PCMWebSocketConnection speakerIO; + private final int channelNumber; + private final long clientSampleRate; + private PercentType sinkVolume = new PercentType(100); + + public PCMWebSocketAudioSink(String id, String label, PCMWebSocketConnection speakerIO, int channelNumber, + long clientSampleRate) { + this.sinkId = id; + this.sinkLabel = label; + this.speakerIO = speakerIO; + this.channelNumber = channelNumber; + this.clientSampleRate = clientSampleRate; + supportedFormats.add(AudioFormat.PCM_SIGNED); + supportedFormats.add(AudioFormat.WAV); + } + + @Override + public String getId() { + return this.sinkId; + } + + @Override + public @Nullable String getLabel(@Nullable Locale locale) { + return this.sinkLabel; + } + + @Override + public void process(@Nullable AudioStream audioStream) + throws UnsupportedAudioFormatException, UnsupportedAudioStreamException { + if (audioStream == null) { + return; + } + var format = audioStream.getFormat(); + InputStream convertedAudioStream = null; + OutputStream outputStream = null; + try { + long duration = -1; + if (AudioFormat.CONTAINER_WAVE.equals(audioStream.getFormat().getContainer())) { + logger.debug("Removing wav container from data"); + try { + AudioWaveUtils.removeFMT(audioStream); + } catch (IOException e) { + logger.warn("IOException trying to remove wav header: {}", e.getMessage()); + } + } + if (audioStream instanceof SizeableAudioStream sizeableAudioStream) { + long length = sizeableAudioStream.length(); + var audioFormat = audioStream.getFormat(); + long byteRate = (Objects.requireNonNull(audioFormat.getBitDepth()) / 8) + * Objects.requireNonNull(audioFormat.getFrequency()) + * Objects.requireNonNull(audioFormat.getChannels()); + float durationInSeconds = (float) length / byteRate; + duration = Math.round(durationInSeconds * 1000); + logger.debug("Duration of input stream : {}", duration); + } + + if (audioStream instanceof PipedAudioStream && isDirectStreamSupported(format)) { + int channels = format.getChannels() != null ? format.getChannels() : 1; + StreamType type = channels == 1 ? StreamType.PCM16BitMono : StreamType.PCM16BitStereo; + outputStream = new PCMWebSocketOutputStream(speakerIO, type); + convertedAudioStream = getPCMStreamNormalized(audioStream, audioStream.getFormat()); + transferAudio(convertedAudioStream, outputStream, duration); + } else { + var streamType = channelNumber == 1 ? StreamType.PCM16BitMono : StreamType.PCM16BitStereo; + outputStream = new PCMWebSocketOutputStream(speakerIO, streamType); + convertedAudioStream = getPCMStreamNormalized(audioStream, audioStream.getFormat()); + transferAudio(convertedAudioStream, outputStream, duration); + } + } catch (InterruptedIOException ignored) { + } catch (IOException e) { + logger.warn("IOException: {}", e.getMessage()); + } catch (InterruptedException e) { + logger.warn("InterruptedException: {}", e.getMessage()); + } finally { + if (convertedAudioStream != null) { + try { + convertedAudioStream.close(); + } catch (IOException e) { + logger.warn("IOException: {}", e.getMessage(), e); + } + } + try { + audioStream.close(); + } catch (IOException e) { + logger.warn("IOException: {}", e.getMessage(), e); + } + try { + if (outputStream != null) { + outputStream.close(); + } + } catch (IOException e) { + logger.warn("IOException: {}", e.getMessage(), e); + } + } + } + + private boolean isDirectStreamSupported(AudioFormat format) { + var bigEndian = format.isBigEndian(); + var bitDepth = format.getBitDepth(); + return AudioFormat.PCM_SIGNED.isCompatible(format) && // + bitDepth != null && bitDepth == 16 && // + bigEndian != null && !bigEndian; + } + + private InputStream getPCMStreamNormalized(InputStream pcmInputStream, AudioFormat format) { + if (format.getChannels() != channelNumber + || format.getBitDepth() != 16 + || format.getFrequency() != clientSampleRate) { + logger.debug("Sound is not in the target format. Trying to re-encode it"); + javax.sound.sampled.AudioFormat jFormat = new javax.sound.sampled.AudioFormat( // + (float) format.getFrequency(), // + format.getBitDepth(), // + format.getChannels(), // + true, // + false // + ); + javax.sound.sampled.AudioFormat fixedJFormat = new javax.sound.sampled.AudioFormat( // + (float) clientSampleRate, // + 16, // + channelNumber, // + true, // + false // + ); + return AudioSystem.getAudioInputStream(fixedJFormat, new AudioInputStream(pcmInputStream, jFormat, -1)); + } else { + return pcmInputStream; + } + } + + private void transferAudio(InputStream inputStream, OutputStream outputStream, long duration) + throws IOException, InterruptedException { + Instant start = Instant.now(); + try { + inputStream.transferTo(outputStream); + } finally { + try { + // send a byte indicating this stream has ended, so it can be tear down on the client + outputStream.write(new byte[] { TERMINATION_BYTE }, 0, 1); + } catch (IOException e) { + logger.warn("Unable to send termination byte to sink {}", sinkId); + } + } + if (duration != -1) { + Instant end = Instant.now(); + long millisSecondTimedToSendAudioData = Duration.between(start, end).toMillis(); + if (millisSecondTimedToSendAudioData < duration) { + long timeToSleep = duration - millisSecondTimedToSendAudioData; + logger.debug("Sleep time to let the system play sound : {}ms", timeToSleep); + Thread.sleep(timeToSleep); + } + } + } + + @Override + public Set getSupportedFormats() { + return supportedFormats; + } + + @Override + public Set> getSupportedStreams() { + return SUPPORTED_STREAMS; + } + + @Override + public PercentType getVolume() throws IOException { + return this.sinkVolume; + } + + @Override + public void setVolume(PercentType percentType) throws IOException { + this.sinkVolume = percentType; + speakerIO.setSinkVolume(percentType.intValue()); + } + + private static class PCMWebSocketOutputStream extends OutputStream { + private final byte[] id; + private final PCMWebSocketConnection speakerIO; + private boolean closed = false; + + public PCMWebSocketOutputStream(PCMWebSocketConnection speakerIO, StreamType streamFormat) { + this.speakerIO = speakerIO; + this.id = generateId(streamFormat); + } + + @Override + public void write(int b) throws IOException { + write(ByteBuffer.allocate(4).putInt(b).array()); + } + + @Override + public void write(byte @Nullable [] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + if (b != null) { + speakerIO.sendAudio(id, b); + } + } + + @Override + public void close() throws IOException { + closed = true; + super.close(); + } + + private static byte[] generateId(StreamType streamFormat) { + SecureRandom sr = new SecureRandom(); + byte[] rndBytes = new byte[5]; + sr.nextBytes(rndBytes); + rndBytes[4] = streamFormat.get(); + return rndBytes; + } + } + + /** + * Byte sent in the 5th position of each chunk that indicates sample format and channels + */ + private enum StreamType { + // 16bit int 1 channel little-endian + PCM16BitMono((byte) 1), + // 16bit int 2 channel little-endian + PCM16BitStereo((byte) 2); + + private final byte b; + + StreamType(byte b) { + this.b = b; + } + + public byte get() { + return this.b; + } + } +} diff --git a/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSource.java b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSource.java new file mode 100644 index 00000000000..8d2ae0c7ba0 --- /dev/null +++ b/bundles/org.openhab.core.voice/src/main/java/org/openhab/core/voice/websocket/audio/PCMWebSocketAudioSource.java @@ -0,0 +1,200 @@ +/** + * Copyright (c) 2010-2024 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.voice.websocket.audio; + +import javax.sound.sampled.AudioInputStream; +import javax.sound.sampled.AudioSystem; +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.audio.AudioException; +import org.openhab.core.audio.AudioFormat; +import org.openhab.core.audio.AudioSource; +import org.openhab.core.audio.AudioStream; +import org.openhab.core.audio.PipedAudioStream; +import org.openhab.core.voice.websocket.PCMWebSocketConnection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.Locale; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * This is an AudioSource from a websocket connection. + * + * @author Miguel Álvarez Díez - Initial contribution + * + */ +@NonNullByDefault +public class PCMWebSocketAudioSource implements AudioSource { + private final Logger logger = LoggerFactory.getLogger(PCMWebSocketAudioSource.class); + public static int SUPPORTED_BIT_DEPTH = 16; + public static int SUPPORTED_SAMPLE_RATE = 16000; + public static int SUPPORTED_CHANNELS = 1; + public static AudioFormat SUPPORTED_FORMAT = new AudioFormat(AudioFormat.CONTAINER_WAVE, + AudioFormat.CODEC_PCM_SIGNED, false, SUPPORTED_BIT_DEPTH, null, (long) SUPPORTED_SAMPLE_RATE, + SUPPORTED_CHANNELS); + private final String sourceId; + private final String sourceLabel; + private final PCMWebSocketConnection websocket; + private @Nullable PipedOutputStream sourceAudioPipedOutput; + private @Nullable PipedInputStream sourceAudioPipedInput; + private @Nullable InputStream sourceAudioStream; + private final PipedAudioStream.Group streamGroup = PipedAudioStream.newGroup(SUPPORTED_FORMAT); + private final int streamSampleRate; + + public PCMWebSocketAudioSource(String id, String label, int streamSampleRate, + PCMWebSocketConnection websocket) { + this.sourceId = id; + this.sourceLabel = label; + this.websocket = websocket; + this.streamSampleRate = streamSampleRate; + } + + @Override + public String getId() { + return this.sourceId; + } + + @Override + public String getLabel(@Nullable Locale locale) { + return this.sourceLabel; + } + + @Override + public Set getSupportedFormats() { + return Set.of(SUPPORTED_FORMAT); + } + + @Override + public AudioStream getInputStream(AudioFormat audioFormat) throws AudioException { + try { + final PipedAudioStream stream = streamGroup.getAudioStreamInGroup(); + stream.onClose(this::onStreamClose); + onStreamCreated(); + return stream; + } catch (IOException e) { + throw new AudioException(e); + } + } + + public void close() throws Exception { + streamGroup.close(); + } + + public void writeToStreams(byte[] payload) { + if (this.sourceAudioStream == null || this.sourceAudioPipedOutput == null) { + logger.debug("Source already disposed ignoring data"); + return; + } + byte[] convertedPayload; + try { + this.sourceAudioPipedOutput.write(payload); + int resampledLength = (payload.length) / (streamSampleRate / SUPPORTED_SAMPLE_RATE); + logger.trace("resampling payload size {} => {}", payload.length, resampledLength); + convertedPayload = this.sourceAudioStream.readNBytes(resampledLength); + } catch (IOException e) { + logger.error("Error writing source audio", e); + return; + } + streamGroup.write(convertedPayload); + } + + private void onStreamCreated() { + logger.debug("Registering source stream for '{}'", getId()); + synchronized (streamGroup) { + if (this.streamGroup.isEmpty()) { + try { + var pipedOutput = new PipedOutputStream(); + this.sourceAudioPipedOutput = pipedOutput; + var pipedInput = new PipedInputStream(pipedOutput, 4096 * 4); + this.sourceAudioPipedInput = pipedInput; + if (streamSampleRate != PCMWebSocketAudioSource.SUPPORTED_SAMPLE_RATE) { + logger.debug("Enabling audio resampling for the audio source stream: {} => {}", + streamSampleRate, PCMWebSocketAudioSource.SUPPORTED_SAMPLE_RATE); + this.sourceAudioStream = getPCMStreamNormalized(this.sourceAudioPipedInput); + } else { + logger.debug("Audio source stream sample rate {}, no resampling needed", + SUPPORTED_SAMPLE_RATE); + this.sourceAudioStream = pipedInput; + } + } catch (IOException e) { + logger.error("Unable to setup audio source stream", e); + } + logger.debug("Send start listening {}", getId()); + websocket.setListening(true); + } + } + } + + /** + * Ensure right PCM format by converting if needed (sample rate, channel) + * + * @param stream PCM input stream + * @return A PCM normalized stream at the desired format + */ + private AudioInputStream getPCMStreamNormalized(InputStream stream) { + javax.sound.sampled.AudioFormat jFormat = new javax.sound.sampled.AudioFormat( // + (float) streamSampleRate, // + SUPPORTED_BIT_DEPTH, // + SUPPORTED_CHANNELS, // + true, // + false // + ); + javax.sound.sampled.AudioFormat fixedJFormat = new javax.sound.sampled.AudioFormat( // + (float) SUPPORTED_SAMPLE_RATE, // + SUPPORTED_BIT_DEPTH, // + SUPPORTED_CHANNELS, // + true, // + false // + ); + logger.debug("Sound is not in the target format. Trying to re-encode it"); + return AudioSystem.getAudioInputStream(fixedJFormat, new AudioInputStream(stream, jFormat, -1)); + } + + private void onStreamClose() { + logger.debug("Unregister source audio stream for '{}'", getId()); + synchronized (streamGroup) { + if (streamGroup.isEmpty()) { + logger.debug("Send stop listening {}", getId()); + websocket.setListening(false); + logger.debug("Disposing audio source internal resources for '{}'", getId()); + if (this.sourceAudioStream != null) { + try { + this.sourceAudioStream.close(); + } catch (IOException ignored) { + } + this.sourceAudioStream = null; + } + if (this.sourceAudioPipedOutput != null) { + try { + this.sourceAudioPipedOutput.close(); + } catch (IOException ignored) { + } + this.sourceAudioPipedOutput = null; + } + if (this.sourceAudioPipedInput != null) { + try { + this.sourceAudioPipedInput.close(); + } catch (IOException ignored) { + } + this.sourceAudioPipedInput = null; + } + } + } + } +}