Skip to content

Commit

Permalink
Merge branch 'mqtt-homeassistant-duplicate-channels-fix' into pr-1351…
Browse files Browse the repository at this point in the history
…8-and-13621
  • Loading branch information
ssalonen committed Nov 5, 2022
2 parents eb0c5c5 + f40d3e8 commit db22a22
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
*/
package org.openhab.binding.mqtt.homeassistant.internal.handler;

import java.util.Collection;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.openhab.core.thing.ThingStatusDetail;
import org.openhab.core.thing.ThingTypeUID;
import org.openhab.core.thing.ThingUID;
import org.openhab.core.thing.binding.builder.ThingBuilder;
import org.openhab.core.thing.type.ChannelDefinition;
import org.openhab.core.thing.type.ChannelGroupDefinition;
import org.openhab.core.thing.type.ChannelGroupType;
Expand Down Expand Up @@ -82,6 +84,8 @@
public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
implements ComponentDiscovered, Consumer<List<AbstractComponent<?>>> {
public static final String AVAILABILITY_CHANNEL = "availability";
private static final Comparator<Channel> CHANNEL_COMPARATOR_BY_UID = Comparator
.comparing(channel -> channel.getUID().toString());;

private final Logger logger = LoggerFactory.getLogger(HomeAssistantThingHandler.class);

Expand Down Expand Up @@ -260,7 +264,7 @@ public void componentDiscovered(HaID homeAssistantTopicID, AbstractComponent<?>
* Callback of {@link DelayedBatchProcessing}.
* Add all newly discovered components to the Thing and start the components.
*/
@SuppressWarnings("null")
@SuppressWarnings({ "null" })
@Override
public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
MqttBrokerConnection connection = this.connection;
Expand Down Expand Up @@ -293,13 +297,44 @@ public void accept(List<AbstractComponent<?>> discoveredComponentsList) {
return null;
});

Collection<Channel> channels = discovered.getChannelMap().values().stream()
List<Channel> discoveredChannels = discovered.getChannelMap().values().stream()
.map(ComponentChannel::getChannel).collect(Collectors.toList());
ThingHelper.addChannelsToThing(thing, channels);
if (known != null) {
// We had previously known component with different config hash
// We remove all conflicting old channels, they will be re-added below based on the new discovery
logger.debug(
"Received component {} with slightly different config. Making sure we re-create conflicting channels...",
discovered.getGroupUID());
removeJustRediscoveredChannels(discoveredChannels);
}

// Add newly discovered channels. We sort the channels
// for (mostly) consistent jsondb serialization
discoveredChannels.sort(CHANNEL_COMPARATOR_BY_UID);
ThingHelper.addChannelsToThing(thing, discoveredChannels);
}
updateThingType();
}
}

private void removeJustRediscoveredChannels(List<Channel> discoveredChannels) {
ArrayList<Channel> mutableChannels = new ArrayList<>(getThing().getChannels());
Set<ChannelUID> newChannelUIDs = discoveredChannels.stream().map(Channel::getUID).collect(Collectors.toSet());
// Take current channels but remove those channels that were just re-discovered
List<Channel> existingChannelsWithNewlyDiscoveredChannelsRemoved = mutableChannels.stream()
.filter(existingChannel -> !newChannelUIDs.contains(existingChannel.getUID()))
.collect(Collectors.toList());
if (existingChannelsWithNewlyDiscoveredChannelsRemoved.size() < mutableChannels.size()) {
// We sort the channels for (mostly) consistent jsondb serialization
existingChannelsWithNewlyDiscoveredChannelsRemoved.sort(CHANNEL_COMPARATOR_BY_UID);
updateThingChannels(existingChannelsWithNewlyDiscoveredChannelsRemoved);
}
}

updateThingType();
private void updateThingChannels(List<Channel> channelList) {
ThingBuilder thingBuilder = editThing();
thingBuilder.withChannels(channelList);
updateThing(thingBuilder.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand All @@ -32,7 +33,9 @@
import org.openhab.binding.mqtt.homeassistant.internal.HaID;
import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
import org.openhab.binding.mqtt.homeassistant.internal.component.Climate;
import org.openhab.binding.mqtt.homeassistant.internal.component.Sensor;
import org.openhab.binding.mqtt.homeassistant.internal.component.Switch;
import org.openhab.core.thing.Channel;
import org.openhab.core.thing.binding.ThingHandlerCallback;

/**
Expand Down Expand Up @@ -60,6 +63,7 @@ public class HomeAssistantThingHandlerTests extends AbstractHomeAssistantTests {

private @Mock @NonNullByDefault({}) ThingHandlerCallback callbackMock;
private @NonNullByDefault({}) HomeAssistantThingHandler thingHandler;
private @NonNullByDefault({}) HomeAssistantThingHandler nonSpyThingHandler;

@BeforeEach
public void setup() {
Expand All @@ -74,6 +78,7 @@ public void setup() {
SUBSCRIBE_TIMEOUT, ATTRIBUTE_RECEIVE_TIMEOUT);
thingHandler.setConnection(bridgeConnection);
thingHandler.setCallback(callbackMock);
nonSpyThingHandler = thingHandler;
thingHandler = spy(thingHandler);
}

Expand Down Expand Up @@ -117,6 +122,108 @@ public void testInitialize() {
verify(channelTypeProvider, times(2)).setChannelGroupType(any(), any());
}

/**
* Test where the same component is published twice to MQTT. The binding should handle this.
*
* @throws InterruptedException
*/
@Test
public void testDuplicateComponentPublish() throws InterruptedException {
thingHandler.initialize();

verify(callbackMock).statusUpdated(eq(haThing), any());
// Expect a call to the bridge status changed, the start, the propertiesChanged method
verify(thingHandler).bridgeStatusChanged(any());
verify(thingHandler, timeout(SUBSCRIBE_TIMEOUT)).start(any());

// Expect subscription on each topic from config
MQTT_TOPICS.forEach(t -> {
verify(bridgeConnection, timeout(SUBSCRIBE_TIMEOUT)).subscribe(eq(t), any());
});

verify(thingHandler, never()).componentDiscovered(any(), any());
assertThat(haThing.getChannels().size(), CoreMatchers.is(0));

//
//
// Publish sensor components with identical payload except for
// change in "name" field. The binding should respect the latest discovery result.
//
// This simulates how multiple OpenMQTTGateway devices would publish
// the same discovery topics for a particular Bluetooth sensor, and thus "competing" with similar but slightly
// different discovery topics.
//
// In fact, only difference is actually "via_device" additional metadata field telling which OpenMQTTGateway
// published the discovery topic.
//
//

//
// 1. publish corridor temperature sensor
//
var configTopicTempCorridor = "homeassistant/sensor/tempCorridor/config";
thingHandler.discoverComponents.processMessage(configTopicTempCorridor, new String("{"//
+ "\"temperature_state_topic\": \"+/+/BTtoMQTT/mysensor\","//
+ "\"temperature_state_template\": \"{{ value_json.temperature }}\", "//
+ "\"name\": \"CorridorTemp\", "//
+ "\"unit_of_measurement\": \"°C\" "//
+ "}").getBytes(StandardCharsets.UTF_8));
verify(thingHandler, times(1)).componentDiscovered(eq(new HaID(configTopicTempCorridor)), any(Sensor.class));
thingHandler.delayedProcessing.forceProcessNow();
waitForAssert(() -> {
assertThat("1 channel created", thingHandler.getThing().getChannels().size() == 1);
});

//
// 2. publish outside temperature sensor
//
var configTopicTempOutside = "homeassistant/sensor/tempOutside/config";
thingHandler.discoverComponents.processMessage(configTopicTempOutside, new String("{"//
+ "\"temperature_state_topic\": \"+/+/BTtoMQTT/mysensor\","//
+ "\"temperature_state_template\": \"{{ value_json.temperature }}\", " //
+ "\"name\": \"OutsideTemp\", "//
+ "\"source\": \"gateway2\" "//
+ "}").getBytes(StandardCharsets.UTF_8));
thingHandler.delayedProcessing.forceProcessNow();
verify(thingHandler, times(1)).componentDiscovered(eq(new HaID(configTopicTempOutside)), any(Sensor.class));
waitForAssert(() -> {
assertThat("2 channel created", thingHandler.getThing().getChannels().size() == 2);
});

//
// 3. publish corridor temperature sensor, this time with different name (openHAB channel label)
//
thingHandler.discoverComponents.processMessage(configTopicTempCorridor, new String("{"//
+ "\"temperature_state_topic\": \"+/+/BTtoMQTT/mysensor\","//
+ "\"temperature_state_template\": \"{{ value_json.temperature }}\", "//
+ "\"name\": \"CorridorTemp NEW\", "//
+ "\"unit_of_measurement\": \"°C\" "//
+ "}").getBytes(StandardCharsets.UTF_8));
thingHandler.delayedProcessing.forceProcessNow();

waitForAssert(() -> {
assertThat("2 channel created", thingHandler.getThing().getChannels().size() == 2);
});

//
// verify that both channels are there and the label corresponds to newer discovery topic payload
//
Channel corridorTempChannel = nonSpyThingHandler.getThing().getChannel("tempCorridor_5Fsensor#sensor");
assertThat("Corridor temperature channel is created", corridorTempChannel, CoreMatchers.notNullValue());
Objects.requireNonNull(corridorTempChannel); // for compiler
assertThat("Corridor temperature channel is having the updated label from 2nd discovery topic publish",
corridorTempChannel.getLabel(), CoreMatchers.is("CorridorTemp NEW"));

Channel outsideTempChannel = nonSpyThingHandler.getThing().getChannel("tempOutside_5Fsensor#sensor");
assertThat("Outside temperature channel is created", outsideTempChannel, CoreMatchers.notNullValue());

verify(thingHandler, times(2)).componentDiscovered(eq(new HaID(configTopicTempCorridor)), any(Sensor.class));

waitForAssert(() -> {
assertThat("2 channel created", thingHandler.getThing().getChannels().size() == 2);
});
}

@Test
public void testDispose() {
thingHandler.initialize();
Expand Down

0 comments on commit db22a22

Please sign in to comment.