From b021146a620d6c149c77e129b7049f66ea3a1ea4 Mon Sep 17 00:00:00 2001 From: Mark Herwege Date: Sat, 11 Jan 2025 11:28:56 +0100 Subject: [PATCH 1/3] [network] Make icmp ping and arp ping optional by presence thing (#18083) Signed-off-by: Mark Herwege --- bundles/org.openhab.binding.network/README.md | 13 +++++ .../internal/NetworkHandlerConfiguration.java | 2 + .../network/internal/PresenceDetection.java | 53 ++++++++++++++++--- .../internal/handler/NetworkHandler.java | 5 +- .../network/internal/utils/LatencyParser.java | 26 ++++++--- .../network/internal/utils/NetworkUtils.java | 35 +++++++++++- .../resources/OH-INF/i18n/network.properties | 4 ++ .../resources/OH-INF/thing/thing-types.xml | 17 ++++++ 8 files changed, 136 insertions(+), 19 deletions(-) diff --git a/bundles/org.openhab.binding.network/README.md b/bundles/org.openhab.binding.network/README.md index 7fc819372dfb2..a742e80c95c69 100644 --- a/bundles/org.openhab.binding.network/README.md +++ b/bundles/org.openhab.binding.network/README.md @@ -54,6 +54,12 @@ Use the following options for a **network:pingdevice**: - **timeout:** How long the ping will wait for an answer, in milliseconds. Default: `5000` (5 seconds). - **refreshInterval:** How often the device will be checked, in milliseconds. Default: `60000` (one minute). - **useIOSWakeUp:** When set to true, an additional port knock is performed before a ping. Default: `true`. +- **useArpPing:** When set to true if the presence detection is allowed to use arp ping. + This can speed up presence detection, but may lead to inaccurate ping latency measurements. + Switch off if you want to use this for ping latency monitoring. Default: `true`. +- **useIcmpPing:** When set to true if the presence detection is allowed to use icmp ping. + When also using arp ping, the latency measurements will not be comparable. + Switch off if you rather want to use arp ping latency monitoring. Default: `true`. - **networkInterfaceNames:** The network interface names used for communicating with the device. Limiting the network interfaces reduces the load when arping and Wake-on-LAN are used. Use comma separated values when using textual config. Default: empty (all network interfaces). @@ -190,6 +196,7 @@ demo.things: ```java Thing network:pingdevice:devicename [ hostname="192.168.0.42", macAddress="6f:70:65:6e:48:41", useIOSWakeUp="false" ] +Thing network:pingdevice:router [ hostname="192.168.0.1", useArpPing="false" ] Thing network:speedtest:local "SpeedTest 50Mo" @ "Internet" [url="https://bouygues.testdebit.info/", fileName="50M.iso"] ``` @@ -199,6 +206,8 @@ demo.items: Switch MyDevice { channel="network:pingdevice:devicename:online" } Number:Time MyDeviceResponseTime { channel="network:pingdevice:devicename:latency" } +Number:Time MyRouterResponseTime { channel="network:pingdevice:router:latency" } + String Speedtest_Running "Test running ... [%s]" {channel="network:speedtest:local:isRunning"} Number:Dimensionless Speedtest_Progress "Test progress [%d %unit%]" {channel="network:speedtest:local:progress"} Number:DataTransferRate Speedtest_ResultDown "Downlink [%.2f %unit%]" {channel="network:speedtest:local:rateDown"} @@ -218,6 +227,10 @@ sitemap demo label="Main Menu" Text item=MyDeviceResponseTime label="Device Response Time [%s]" } + Frame { + Text item=MyRouterResponseTime label="Router Response Time [%s]" + } + Frame label="SpeedTest" { Text item=Speedtest_Start Switch item=Speedtest_Running diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerConfiguration.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerConfiguration.java index 6c0445d86963d..e7eee61f3bc8f 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerConfiguration.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerConfiguration.java @@ -32,5 +32,7 @@ public class NetworkHandlerConfiguration { public Integer refreshInterval = 60000; public Integer timeout = 5000; public boolean useIOSWakeUp = true; + public boolean useArpPing = true; + public boolean useIcmpPing = true; public Set networkInterfaceNames = Set.of(); } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java index 04541ebf26eed..75864546989b1 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java @@ -71,8 +71,10 @@ public class PresenceDetection implements IPRequestReceivedCallback { private String ipPingState = "Disabled"; protected String arpPingUtilPath = ""; private ArpPingUtilEnum arpPingMethod = ArpPingUtilEnum.DISABLED; - protected @Nullable IpPingMethodEnum pingMethod = null; + protected @Nullable IpPingMethodEnum pingMethod = IpPingMethodEnum.DISABLED; private boolean iosDevice; + private boolean useArpPing; + private boolean useIcmpPing; private Set tcpPorts = new HashSet<>(); private Duration refreshInterval = Duration.ofMinutes(1); @@ -188,7 +190,7 @@ public void setPreferResponseTimeAsLatency(boolean preferResponseTimeAsLatency) public void setUseIcmpPing(@Nullable Boolean useSystemPing) { if (useSystemPing == null) { ipPingState = "Disabled"; - pingMethod = null; + pingMethod = IpPingMethodEnum.DISABLED; } else if (useSystemPing) { final IpPingMethodEnum pingMethod = networkUtils.determinePingMethod(); this.pingMethod = pingMethod; @@ -220,12 +222,17 @@ private void setUseArpPing(boolean enable, @Nullable InetAddress destinationAddr * Sets the path to ARP ping. * * @param enable enable or disable ARP ping - * @param arpPingUtilPath enableDHCPListen(useDHCPsniffing); + * @param arpPingUtilPath path to Arping tool + * @param arpPingUtilMethod Arping tool method */ public void setUseArpPing(boolean enable, String arpPingUtilPath, ArpPingUtilEnum arpPingUtilMethod) { - setUseArpPing(enable, destination.getValue()); - this.arpPingUtilPath = arpPingUtilPath; - this.arpPingMethod = arpPingUtilMethod; + if (!enable) { + arpPingMethod = ArpPingUtilEnum.DISABLED; + } else { + setUseArpPing(enable, destination.getValue()); + this.arpPingUtilPath = arpPingUtilPath; + this.arpPingMethod = arpPingUtilMethod; + } } public String getArpPingState() { @@ -256,6 +263,36 @@ public void setIOSDevice(boolean value) { iosDevice = value; } + /** + * Return true if the device presence detection is also performed using arp ping. This gives + * less accurate ping latency results when used for an IPv4 destination host. + */ + public boolean isUseArpPing() { + return useArpPing; + } + + /** + * Set to true if the device presence detection should also be performed using arp ping. This gives + * less accurate ping latency results when used for an IPv4 destination host. + */ + public void setUseArpPing(boolean useArpPing) { + this.useArpPing = useArpPing; + } + + /** + * Return true if the device presence detection is also performed using icmp ping. + */ + public boolean isUseIcmpPing() { + return useIcmpPing; + } + + /** + * Set to true if the device presence detection should also be performed using icmp ping. + */ + public void setUseIcmPing(boolean useIcmpPing) { + this.useIcmpPing = useIcmpPing; + } + /** * Return the last seen value as an {@link Instant} or null if not yet seen. */ @@ -329,7 +366,7 @@ public CompletableFuture performPresenceDetection() { Set interfaceNames = null; detectionChecks = tcpPorts.size(); - if (pingMethod != null) { + if (pingMethod != IpPingMethodEnum.DISABLED) { detectionChecks += 1; } if (arpPingMethod.canProceed) { @@ -385,7 +422,7 @@ public CompletableFuture performPresenceDetection() { } // ICMP ping - if (pingMethod != null) { + if (pingMethod != IpPingMethodEnum.DISABLED) { addAsyncDetection(completableFutures, () -> { Thread.currentThread().setName("presenceDetectionICMP_" + hostname); if (pingMethod == IpPingMethodEnum.JAVA_PING) { diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java index 3b8472de72b2e..404d79e8f920d 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java @@ -183,8 +183,9 @@ void initialize(PresenceDetection presenceDetection) { presenceDetection.setIOSDevice(handlerConfiguration.useIOSWakeUp); // Hand over binding configurations to the network service presenceDetection.setUseDhcpSniffing(configuration.allowDHCPlisten); - presenceDetection.setUseIcmpPing(configuration.allowSystemPings); - presenceDetection.setUseArpPing(true, configuration.arpPingToolPath, configuration.arpPingUtilMethod); + presenceDetection.setUseIcmpPing(handlerConfiguration.useIcmpPing ? configuration.allowSystemPings : null); + presenceDetection.setUseArpPing(handlerConfiguration.useArpPing, configuration.arpPingToolPath, + configuration.arpPingUtilMethod); } this.retries = handlerConfiguration.retry.intValue(); diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java index bb3bffec5da63..b257df5d3fb9a 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java @@ -12,7 +12,7 @@ */ package org.openhab.binding.network.internal.utils; -import static org.openhab.binding.network.internal.utils.NetworkUtils.millisToDuration; +import static org.openhab.binding.network.internal.utils.NetworkUtils.*; import java.time.Duration; import java.util.regex.Matcher; @@ -31,7 +31,7 @@ @NonNullByDefault public class LatencyParser { - private static final Pattern LATENCY_PATTERN = Pattern.compile(".*time=(.*) ?ms"); + private static final Pattern LATENCY_PATTERN = Pattern.compile(".*time=(.*) ?(u|m)s.*"); private final Logger logger = LoggerFactory.getLogger(LatencyParser.class); // This is how the input looks like on Mac and Linux: @@ -43,18 +43,30 @@ public class LatencyParser { // 1 packets transmitted, 1 packets received, 0.0% packet loss // round-trip min/avg/max/stddev = 1.225/1.225/1.225/0.000 ms + // This is an example of an arping response on Linux: + // arping -c 1 -i eth0 192.168.0.1 + // ARPING 192.168.0.1 + // 60 bytes from xx:xx:xx:xx:xx:xx (192.168.0.1): index=0 time=792.847 usec + // + // --- 192.168.0.1 statistics --- + // 1 packets transmitted, 1 packets received, 0% unanswered (0 extra) + // rtt min/avg/max/std-dev = 0.793/0.793/0.793/0.000 ms + /** - * Examine a single ping command output line and try to extract the latency value if it is contained. + * Examine a single ping or arping command output line and try to extract the latency value if it is contained. * - * @param inputLine Single output line of the ping command. - * @return Latency value provided by the ping command. null if the provided line did not contain a - * latency value which matches the known patterns. + * @param inputLine Single output line of the ping or arping command. + * @return Latency value provided by the ping or arping command. null if the provided line did not + * contain a latency value which matches the known patterns. */ public @Nullable Duration parseLatency(String inputLine) { logger.debug("Parsing latency from input {}", inputLine); Matcher m = LATENCY_PATTERN.matcher(inputLine); - if (m.find() && m.groupCount() == 1) { + if (m.find() && m.groupCount() >= 2) { + if ("u".equals(m.group(2))) { + return microsToDuration(Double.parseDouble(m.group(1).replace(",", "."))); + } return millisToDuration(Double.parseDouble(m.group(1).replace(",", "."))); } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java index a8bfc3202cee5..19493a6bbdee1 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java @@ -55,9 +55,10 @@ public class NetworkUtils { /** - * Nanos per millisecond. + * Nanos per millisecond and microsecond. */ private static final long NANOS_PER_MILLI = 1000_000L; + private static final long NANOS_PER_MICRO = 1000L; /** * Converts a {@link Duration} to milliseconds. @@ -84,6 +85,17 @@ public static Duration millisToDuration(double millis) { return Duration.ofNanos((long) (millis * NANOS_PER_MILLI)); } + /** + * Converts a double representing microseconds to a {@link Duration} instance. + *

+ * + * @param micros the microseconds to be converted + * @return a {@link Duration} instance representing the given microseconds + */ + public static Duration microsToDuration(double micros) { + return Duration.ofNanos((long) (micros * NANOS_PER_MICRO)); + } + private final Logger logger = LoggerFactory.getLogger(NetworkUtils.class); private LatencyParser latencyParser = new LatencyParser(); @@ -286,6 +298,7 @@ public ArpPingUtilEnum determineNativeArpPingMethod(String arpToolPath) { } public enum IpPingMethodEnum { + DISABLED, JAVA_PING, WINDOWS_PING, IPUTILS_LINUX_PING, @@ -414,7 +427,25 @@ public enum ArpPingUtilEnum { // The return code is 0 for a successful ping. 1 if device didn't respond and 2 if there is another error like // network interface not ready. - return new PingResult(proc.waitFor() == 0, Duration.between(execStartTime, Instant.now())); + int result = proc.waitFor(); + if (result != 0) { + return new PingResult(false, Duration.between(execStartTime, Instant.now())); + } + + PingResult pingResult = new PingResult(true, Duration.between(execStartTime, Instant.now())); + try (BufferedReader r = new BufferedReader(new InputStreamReader(proc.getInputStream()))) { + String line = r.readLine(); + while (line != null) { + Duration responseTime = latencyParser.parseLatency(line); + if (responseTime != null) { + pingResult.setResponseTime(responseTime); + return pingResult; + } + line = r.readLine(); + } + } + + return pingResult; } /** diff --git a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties index 4b03ccc0039ef..787f3f959311f 100644 --- a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties +++ b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties @@ -41,8 +41,12 @@ thing-type.config.network.pingdevice.retry.label = Retry thing-type.config.network.pingdevice.retry.description = How many refresh interval cycles should a presence detection should take place, before the device is stated as offline thing-type.config.network.pingdevice.timeout.label = Timeout thing-type.config.network.pingdevice.timeout.description = States how long to wait for a response (in ms), before if a device is stated as offline +thing-type.config.network.pingdevice.useArpPing.label = Use ARP Ping +thing-type.config.network.pingdevice.useArpPing.description = Set to true if the presence detection is allowed to use arp ping. This can speed up presence detection, but may lead to inaccurate ping latency measurements. Switch off if you want to use this for ping latency monitoring. thing-type.config.network.pingdevice.useIOSWakeUp.label = Use iOS Wake Up thing-type.config.network.pingdevice.useIOSWakeUp.description = Set to true if the device presence detection should be performed for an iOS device like iPhone or iPads. An additional port knock is performed before a ping. +thing-type.config.network.pingdevice.useIcmpPing.label = Use ICMP Ping +thing-type.config.network.pingdevice.useIcmpPing.description = Set to true if the presence detection is allowed to use icmp ping. If you are monitoring network latency using arping, you should switch this off to prevent mixing results with arp ping results. thing-type.config.network.servicedevice.hostname.label = Hostname or IP thing-type.config.network.servicedevice.hostname.description = Hostname or IP of the device thing-type.config.network.servicedevice.macAddress.label = MAC Address diff --git a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/thing/thing-types.xml b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/thing/thing-types.xml index 5ee4b44cc537e..25d3799bd3656 100644 --- a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/thing/thing-types.xml +++ b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/thing/thing-types.xml @@ -70,6 +70,23 @@ true + + + true + Set to true if the presence detection is allowed to use arp ping. This can speed up presence detection, + but may lead to inaccurate ping latency measurements. Switch off if you want to use this for ping latency + monitoring. + true + + + + + true + Set to true if the presence detection is allowed to use icmp ping. If you are monitoring network + latency using arping, you should switch this off to prevent mixing results with arp ping results. + true + + From 6f35f251f7f1eb75c5c193d68597ca9d3c63badf Mon Sep 17 00:00:00 2001 From: lsiepel Date: Sun, 7 Sep 2025 19:51:15 +0200 Subject: [PATCH 2/3] [network] Fix discovery performance causing a slow openHAB start (#17972) * Fix performance * Improvements * Add logging * Improve logging * Update bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java * Improve thread handling * Improve shutdown * thread cleanup * Improve per thread allocation * Stop on finishing all interfaces * Change Arping to make use of completeablefeature * Seperate presence detection from lifecycle * Improve logging and filtering * Create seperate ScheduledExecutorService * Fix review comment * Improve network address checks * Improve interrupthandling * Revert threadlocal * Refactor Presencedetection * Make LatencyParser accept Windows' <1ms syntax for minimal latency * Remove misleading reference to non-existing NetworkHandlerBuilder * Handle thread-safety of NetworkDiscoveryService.executorService * Fix network interface exclusion * Make PresenceDetectionValue thread-safe * Partial refactoring of PresenceDetection Fixes: - Synchronization of lastSeen - Joining of async tasks - Minor logging improvements Addition: - Create setIcmpPingMethod() * Partial refactoring of NetworkDiscoveryService Fixes: - Correct the number of addresses in a /24 subnet. - Restructure task creation so that one less thread is needed per scanned address, and so that startScan() is guaranteed to return quickly. - Create independent threads for posting discovery results, as this can take a long time and might not finish before the executor shuts down. * Make NetworkHandler thread-safe * Make SpeedTestHandler thread-safe * Make sure that process output is consumed * Implement tool-specific latency parsing * Fix NetworkDiscoveryService configuration and make the thread pool size configurable * i18n * Fix test Signed-off-by: Leo Siepel Co-authored-by: Wouter Born Co-authored-by: Ravi Nadahar --- bundles/org.openhab.binding.network/README.md | 2 + .../internal/NetworkBindingConfiguration.java | 11 +- .../internal/NetworkBindingConstants.java | 1 + .../internal/NetworkHandlerFactory.java | 23 +- .../network/internal/PresenceDetection.java | 242 ++++++++---------- .../internal/PresenceDetectionValue.java | 52 ++-- .../internal/WakeOnLanPacketSender.java | 2 +- .../discovery/NetworkDiscoveryService.java | 234 +++++++++++------ .../internal/handler/NetworkHandler.java | 158 ++++++++---- .../internal/handler/SpeedTestHandler.java | 120 +++++---- .../network/internal/utils/LatencyParser.java | 37 ++- .../network/internal/utils/NetworkUtils.java | 129 +++++++--- .../src/main/resources/OH-INF/addon/addon.xml | 7 + .../resources/OH-INF/i18n/network.properties | 2 + .../internal/PresenceDetectionTest.java | 116 ++++----- .../internal/discovery/DiscoveryTest.java | 13 +- .../internal/handler/NetworkHandlerTest.java | 21 +- .../internal/utils/LatencyParserTest.java | 11 +- 18 files changed, 714 insertions(+), 467 deletions(-) diff --git a/bundles/org.openhab.binding.network/README.md b/bundles/org.openhab.binding.network/README.md index a742e80c95c69..0679257d5f9ce 100644 --- a/bundles/org.openhab.binding.network/README.md +++ b/bundles/org.openhab.binding.network/README.md @@ -14,6 +14,7 @@ The binding has the following configuration options: - **arpPingToolPath:** If the ARP ping tool is not called `arping` and cannot be found in the PATH environment variable, the absolute path can be configured here. Default is `arping`. - **cacheDeviceStateTimeInMS:** The result of a device presence detection is cached for a small amount of time. Set this time here in milliseconds. Be aware that no new pings will be issued within this time frame, even if explicitly requested. Default is 2000. - **preferResponseTimeAsLatency:** If enabled, an attempt will be made to extract the latency from the output of the ping command. If no such latency value is found in the ping command output, the time to execute the ping command is used as fallback latency. If disabled, the time to execute the ping command is always used as latency value. This is disabled by default to be backwards-compatible and to not break statistics and monitoring which existed before this feature. +- **numberOfDiscoveryThreads:** Specifies the number of threads to be used during the discovery process. Increasing this value may speed up the discovery of devices on large networks but could also increase the load on the system. Default is `100`. Create a `/services/network.cfg` file and use the above options like this: @@ -22,6 +23,7 @@ binding.network:allowSystemPings=true binding.network:allowDHCPlisten=false binding.network:arpPingToolPath=arping binding.network:cacheDeviceStateTimeInMS=2000 +binding.network:numberOfDiscoveryThreads=100 ``` ## Supported Things diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConfiguration.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConfiguration.java index c63eae9ff9106..34eabe00212bb 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConfiguration.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConfiguration.java @@ -29,13 +29,17 @@ @NonNullByDefault public class NetworkBindingConfiguration { + public final static int DEFAULT_DISCOVERY_THREADS = 100; + public final static String DEFAULT_ARPING_TOOL_PATH = "arping"; + public final static ArpPingUtilEnum DEFAULT_ARPING_METHOD = ArpPingUtilEnum.DISABLED; public boolean allowSystemPings = true; public boolean allowDHCPlisten = true; public BigDecimal cacheDeviceStateTimeInMS = BigDecimal.valueOf(2000); - public String arpPingToolPath = "arping"; - public ArpPingUtilEnum arpPingUtilMethod = ArpPingUtilEnum.DISABLED; + public String arpPingToolPath = DEFAULT_ARPING_TOOL_PATH; + public ArpPingUtilEnum arpPingUtilMethod = DEFAULT_ARPING_METHOD; // For backwards compatibility reasons, the default is to use the ping method execution time as latency value public boolean preferResponseTimeAsLatency = false; + public int numberOfDiscoveryThreads = DEFAULT_DISCOVERY_THREADS; private List listeners = new ArrayList<>(); @@ -45,6 +49,7 @@ public void update(NetworkBindingConfiguration newConfiguration) { this.cacheDeviceStateTimeInMS = newConfiguration.cacheDeviceStateTimeInMS; this.arpPingToolPath = newConfiguration.arpPingToolPath; this.preferResponseTimeAsLatency = newConfiguration.preferResponseTimeAsLatency; + this.numberOfDiscoveryThreads = newConfiguration.numberOfDiscoveryThreads; NetworkUtils networkUtils = new NetworkUtils(); this.arpPingUtilMethod = networkUtils.determineNativeArpPingMethod(arpPingToolPath); @@ -65,6 +70,6 @@ public String toString() { return "NetworkBindingConfiguration{" + "allowSystemPings=" + allowSystemPings + ", allowDHCPlisten=" + allowDHCPlisten + ", cacheDeviceStateTimeInMS=" + cacheDeviceStateTimeInMS + ", arpPingToolPath='" + arpPingToolPath + '\'' + ", arpPingUtilMethod=" + arpPingUtilMethod + ", preferResponseTimeAsLatency=" - + preferResponseTimeAsLatency + '}'; + + preferResponseTimeAsLatency + ", numberOfDiscoveryThreads=" + numberOfDiscoveryThreads + '}'; } } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConstants.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConstants.java index 071aafba8def1..09697556a389e 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConstants.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkBindingConstants.java @@ -28,6 +28,7 @@ public class NetworkBindingConstants { public static final String BINDING_ID = "network"; + public static final String BINDING_CONFIGURATION_PID = "binding.network"; // List of all Thing Type UIDs public static final ThingTypeUID BACKWARDS_COMPATIBLE_DEVICE = new ThingTypeUID(BINDING_ID, "device"); diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java index 6fec7eaaaa5d3..018df90507c35 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java @@ -12,12 +12,16 @@ */ package org.openhab.binding.network.internal; +import static org.openhab.binding.network.internal.NetworkBindingConstants.*; + import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.network.internal.handler.NetworkHandler; import org.openhab.binding.network.internal.handler.SpeedTestHandler; +import org.openhab.core.common.ThreadPoolManager; import org.openhab.core.config.core.Configuration; import org.openhab.core.thing.Thing; import org.openhab.core.thing.ThingTypeUID; @@ -39,15 +43,17 @@ * @author David Graeff - Initial contribution */ @NonNullByDefault -@Component(service = ThingHandlerFactory.class, configurationPid = "binding.network") +@Component(service = ThingHandlerFactory.class, configurationPid = BINDING_CONFIGURATION_PID) public class NetworkHandlerFactory extends BaseThingHandlerFactory { final NetworkBindingConfiguration configuration = new NetworkBindingConfiguration(); - + private static final String NETWORK_HANDLER_THREADPOOL_NAME = "networkBinding"; private final Logger logger = LoggerFactory.getLogger(NetworkHandlerFactory.class); + private final ScheduledExecutorService executor = ThreadPoolManager + .getScheduledPool(NETWORK_HANDLER_THREADPOOL_NAME); @Override public boolean supportsThingType(ThingTypeUID thingTypeUID) { - return NetworkBindingConstants.SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID); + return SUPPORTED_THING_TYPES_UIDS.contains(thingTypeUID); } // The activate component call is used to access the bindings configuration @@ -76,12 +82,11 @@ protected void modified(Map config) { protected @Nullable ThingHandler createHandler(Thing thing) { ThingTypeUID thingTypeUID = thing.getThingTypeUID(); - if (thingTypeUID.equals(NetworkBindingConstants.PING_DEVICE) - || thingTypeUID.equals(NetworkBindingConstants.BACKWARDS_COMPATIBLE_DEVICE)) { - return new NetworkHandler(thing, false, configuration); - } else if (thingTypeUID.equals(NetworkBindingConstants.SERVICE_DEVICE)) { - return new NetworkHandler(thing, true, configuration); - } else if (thingTypeUID.equals(NetworkBindingConstants.SPEEDTEST_DEVICE)) { + if (thingTypeUID.equals(PING_DEVICE) || thingTypeUID.equals(BACKWARDS_COMPATIBLE_DEVICE)) { + return new NetworkHandler(thing, executor, false, configuration); + } else if (thingTypeUID.equals(SERVICE_DEVICE)) { + return new NetworkHandler(thing, executor, true, configuration); + } else if (thingTypeUID.equals(SPEEDTEST_DEVICE)) { return new SpeedTestHandler(thing); } return null; diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java index 75864546989b1..2933d3a6fab54 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetection.java @@ -29,11 +29,8 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -51,7 +48,7 @@ import org.slf4j.LoggerFactory; /** - * The {@link PresenceDetection} handles the connection to the Device. + * The {@link PresenceDetection} handles the connection to the Device. This class is not thread-safe. * * @author Marc Mettke - Initial contribution * @author David Gräff, 2017 - Rewritten @@ -71,14 +68,15 @@ public class PresenceDetection implements IPRequestReceivedCallback { private String ipPingState = "Disabled"; protected String arpPingUtilPath = ""; private ArpPingUtilEnum arpPingMethod = ArpPingUtilEnum.DISABLED; - protected @Nullable IpPingMethodEnum pingMethod = IpPingMethodEnum.DISABLED; + protected IpPingMethodEnum pingMethod = IpPingMethodEnum.DISABLED; private boolean iosDevice; private boolean useArpPing; private boolean useIcmpPing; private Set tcpPorts = new HashSet<>(); - private Duration refreshInterval = Duration.ofMinutes(1); private Duration timeout = Duration.ofSeconds(5); + + /** All access must be guarded by "this" */ private @Nullable Instant lastSeen; private @NonNullByDefault({}) String hostname; @@ -91,21 +89,18 @@ public class PresenceDetection implements IPRequestReceivedCallback { ExpiringCacheAsync cache; private final PresenceDetectionListener updateListener; - private ScheduledExecutorService scheduledExecutorService; private Set networkInterfaceNames = Set.of(); - private @Nullable ScheduledFuture refreshJob; - protected @Nullable ExecutorService detectionExecutorService; - protected @Nullable ExecutorService waitForResultExecutorService; private String dhcpState = "off"; int detectionChecks; private String lastReachableNetworkInterfaceName = ""; - public PresenceDetection(final PresenceDetectionListener updateListener, - ScheduledExecutorService scheduledExecutorService, Duration cacheDeviceStateTime) - throws IllegalArgumentException { + private final Executor executor; + + public PresenceDetection(final PresenceDetectionListener updateListener, Duration cacheDeviceStateTime, + Executor executor) { this.updateListener = updateListener; - this.scheduledExecutorService = scheduledExecutorService; + this.executor = executor; cache = new ExpiringCacheAsync<>(cacheDeviceStateTime); } @@ -117,10 +112,6 @@ public Set getServicePorts() { return tcpPorts; } - public Duration getRefreshInterval() { - return refreshInterval; - } - public Duration getTimeout() { return timeout; } @@ -168,10 +159,6 @@ public void setUseDhcpSniffing(boolean enable) { this.useDHCPsniffing = enable; } - public void setRefreshInterval(Duration refreshInterval) { - this.refreshInterval = refreshInterval; - } - public void setTimeout(Duration timeout) { this.timeout = timeout; } @@ -202,6 +189,27 @@ public void setUseIcmpPing(@Nullable Boolean useSystemPing) { } } + /** + * Sets the ping method directly. The exact ping method must be known, or things will fail. + * Use {@link NetworkUtils#determinePingMethod()} to find a supported method. + * + * @param pingMethod the {@link IpPingMethodEnum}. + */ + public void setIcmpPingMethod(IpPingMethodEnum pingMethod) { + this.pingMethod = pingMethod; + switch (pingMethod) { + case DISABLED: + ipPingState = "Disabled"; + break; + case JAVA_PING: + ipPingState = "Java ping"; + break; + default: + ipPingState = pingMethod.name(); + break; + } + } + /** * Enables or disables ARP pings. Will be automatically disabled if the destination * is not an IPv4 address. If the feature test for the native arping utility fails, @@ -296,7 +304,7 @@ public void setUseIcmPing(boolean useIcmpPing) { /** * Return the last seen value as an {@link Instant} or null if not yet seen. */ - public @Nullable Instant getLastSeen() { + public @Nullable synchronized Instant getLastSeen() { return lastSeen; } @@ -322,10 +330,6 @@ public void getValue(Consumer callback) { cache.getValue(this::performPresenceDetection).thenAccept(callback); } - public ExecutorService getThreadsFor(int threadCount) { - return Executors.newFixedThreadPool(threadCount); - } - private void withDestinationAddress(Consumer consumer) { InetAddress destinationAddress = destination.getValue(); if (destinationAddress == null) { @@ -335,24 +339,8 @@ private void withDestinationAddress(Consumer consumer) { } } - private void stopDetection() { - ExecutorService detectionExecutorService = this.detectionExecutorService; - if (detectionExecutorService != null) { - logger.debug("Shutting down detectionExecutorService"); - detectionExecutorService.shutdownNow(); - this.detectionExecutorService = null; - } - ExecutorService waitForResultExecutorService = this.waitForResultExecutorService; - if (waitForResultExecutorService != null) { - logger.debug("Shutting down waitForResultExecutorService"); - waitForResultExecutorService.shutdownNow(); - this.waitForResultExecutorService = null; - } - } - /** * Perform a presence detection with ICMP-, ARP ping and TCP connection attempts simultaneously. - * A fixed thread pool will be created with as many threads as necessary to perform all tests at once. * * Please be aware of the following restrictions: *

    @@ -388,83 +376,69 @@ public CompletableFuture performPresenceDetection() { return CompletableFuture.completedFuture(pdv); } - stopDetection(); - - ExecutorService detectionExecutorService = getThreadsFor(detectionChecks); - this.detectionExecutorService = detectionExecutorService; - ExecutorService waitForResultExecutorService = getThreadsFor(1); - this.waitForResultExecutorService = waitForResultExecutorService; - List> completableFutures = new ArrayList<>(); for (Integer tcpPort : tcpPorts) { addAsyncDetection(completableFutures, () -> { - Thread.currentThread().setName("presenceDetectionTCP_" + hostname + " " + tcpPort); performServicePing(pdv, tcpPort); - }, detectionExecutorService); + }); } // ARP ping for IPv4 addresses. Use single executor for Windows tool and // each own executor for each network interface for other tools if (arpPingMethod == ArpPingUtilEnum.ELI_FULKERSON_ARP_PING_FOR_WINDOWS) { addAsyncDetection(completableFutures, () -> { - Thread.currentThread().setName("presenceDetectionARP_" + hostname + " "); - // arp-ping.exe tool capable of handling multiple interfaces by itself performArpPing(pdv, ""); - }, detectionExecutorService); + }); } else if (interfaceNames != null) { for (final String interfaceName : interfaceNames) { addAsyncDetection(completableFutures, () -> { - Thread.currentThread().setName("presenceDetectionARP_" + hostname + " " + interfaceName); performArpPing(pdv, interfaceName); - }, detectionExecutorService); + }); } } // ICMP ping if (pingMethod != IpPingMethodEnum.DISABLED) { addAsyncDetection(completableFutures, () -> { - Thread.currentThread().setName("presenceDetectionICMP_" + hostname); if (pingMethod == IpPingMethodEnum.JAVA_PING) { performJavaPing(pdv); } else { performSystemPing(pdv); } - }, detectionExecutorService); + }); } return CompletableFuture.supplyAsync(() -> { - Thread.currentThread().setName("presenceDetectionResult_" + hostname); logger.debug("Waiting for {} detection futures for {} to complete", completableFutures.size(), hostname); - completableFutures.forEach(completableFuture -> { - try { - completableFuture.join(); - } catch (CancellationException | CompletionException e) { - logger.debug("Detection future failed to complete", e); + try { + CompletableFuture.allOf(completableFutures.toArray(CompletableFuture[]::new)).join(); + logger.debug("All {} detection futures for {} have completed", completableFutures.size(), hostname); + } catch (CancellationException e) { + logger.debug("Detection future for {} was cancelled", hostname); + } catch (CompletionException e) { + if (e.getCause() instanceof TimeoutException) { + logger.debug("Detection future for {} timed out", hostname); + } else { + logger.debug("Detection future failed to complete {}", e.getMessage()); + logger.trace("", e); } - }); - logger.debug("All {} detection futures for {} have completed", completableFutures.size(), hostname); + } if (!pdv.isReachable()) { - logger.debug("{} is unreachable, invalidating destination value", hostname); + logger.trace("{} is unreachable, invalidating cached destination value", hostname); destination.invalidateValue(); } logger.debug("Sending listener final result: {}", pdv); updateListener.finalDetectionResult(pdv); - detectionExecutorService.shutdownNow(); - this.detectionExecutorService = null; - detectionChecks = 0; - return pdv; - }, waitForResultExecutorService); + }, executor); } - private void addAsyncDetection(List> completableFutures, Runnable detectionRunnable, - ExecutorService executorService) { - completableFutures.add(CompletableFuture.runAsync(detectionRunnable, executorService) - .orTimeout(timeout.plusSeconds(3).toMillis(), TimeUnit.MILLISECONDS)); + private void addAsyncDetection(List> completableFutures, Runnable detectionRunnable) { + completableFutures.add(CompletableFuture.runAsync(detectionRunnable, executor)); } /** @@ -476,7 +450,7 @@ private void addAsyncDetection(List> completableFutures, * @param type the detection type * @param latency the latency */ - synchronized PresenceDetectionValue updateReachable(PresenceDetectionType type, Duration latency) { + PresenceDetectionValue updateReachable(PresenceDetectionType type, Duration latency) { PresenceDetectionValue pdv = new PresenceDetectionValue(hostname, latency); updateReachable(pdv, type, latency); return pdv; @@ -492,20 +466,22 @@ synchronized PresenceDetectionValue updateReachable(PresenceDetectionType type, * @param type the detection type * @param latency the latency */ - synchronized void updateReachable(PresenceDetectionValue pdv, PresenceDetectionType type, Duration latency) { + void updateReachable(PresenceDetectionValue pdv, PresenceDetectionType type, Duration latency) { updateReachable(pdv, type, latency, -1); } - synchronized void updateReachable(PresenceDetectionValue pdv, PresenceDetectionType type, Duration latency, - int tcpPort) { - lastSeen = Instant.now(); + void updateReachable(PresenceDetectionValue pdv, PresenceDetectionType type, Duration latency, int tcpPort) { + Instant now = Instant.now(); pdv.addReachableDetectionType(type); pdv.updateLatency(latency); if (0 <= tcpPort) { pdv.addReachableTcpPort(tcpPort); } logger.debug("Sending listener partial result: {}", pdv); - updateListener.partialDetectionResult(pdv); + synchronized (this) { + lastSeen = now; + updateListener.partialDetectionResult(pdv); + } } protected void performServicePing(PresenceDetectionValue pdv, int tcpPort) { @@ -537,27 +513,41 @@ protected void performArpPing(PresenceDetectionValue pdv, String interfaceName) logger.trace("Perform ARP ping presence detection for {} on interface: {}", hostname, interfaceName); withDestinationAddress(destinationAddress -> { - try { - if (iosDevice) { - networkUtils.wakeUpIOS(destinationAddress); - Thread.sleep(50); - } - - PingResult pingResult = networkUtils.nativeArpPing(arpPingMethod, arpPingUtilPath, interfaceName, - destinationAddress.getHostAddress(), timeout); - if (pingResult != null) { - if (pingResult.isSuccess()) { - updateReachable(pdv, ARP_PING, getLatency(pingResult)); - lastReachableNetworkInterfaceName = interfaceName; - } else if (lastReachableNetworkInterfaceName.equals(interfaceName)) { - logger.trace("{} is no longer reachable on network interface: {}", hostname, interfaceName); - lastReachableNetworkInterfaceName = ""; + Runnable arpPingTask = () -> { + try { + PingResult pingResult = networkUtils.nativeArpPing(arpPingMethod, arpPingUtilPath, interfaceName, + destinationAddress.getHostAddress(), timeout); + if (pingResult != null) { + if (pingResult.isSuccess()) { + updateReachable(pdv, ARP_PING, getLatency(pingResult)); + lastReachableNetworkInterfaceName = interfaceName; + } else if (lastReachableNetworkInterfaceName.equals(interfaceName)) { + logger.trace("{} is no longer reachable on network interface: {}", hostname, interfaceName); + lastReachableNetworkInterfaceName = ""; + } } + } catch (IOException e) { + logger.trace("Failed to execute an ARP ping for {}", hostname, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } - } catch (IOException e) { - logger.trace("Failed to execute an ARP ping for {}", hostname, e); - } catch (InterruptedException ignored) { - // This can be ignored, the thread will end anyway + }; + + if (iosDevice) { + CompletableFuture.runAsync(() -> { + try { + networkUtils.wakeUpIOS(destinationAddress); + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } catch (IOException e) { + logger.trace("Failed to wake up iOS device for {}", hostname, e); + } + }, executor).thenRunAsync(arpPingTask, executor); + } else { + arpPingTask.run(); } }); } @@ -593,7 +583,8 @@ protected void performSystemPing(PresenceDetectionValue pdv) { } catch (IOException e) { logger.trace("Failed to execute a native ping for {}", hostname, e); } catch (InterruptedException e) { - // This can be ignored, the thread will end anyway + Thread.currentThread().interrupt(); + return; } }); } @@ -611,40 +602,19 @@ public void dhcpRequestReceived(String ipAddress) { updateReachable(DHCP_REQUEST, Duration.ZERO); } - /** - * Start/Restart a fixed scheduled runner to update the devices reach-ability state. - */ - public void startAutomaticRefresh() { - ScheduledFuture future = refreshJob; - if (future != null && !future.isDone()) { - future.cancel(true); + public void refresh() { + try { + logger.debug("Refreshing {} reachability state", hostname); + getValue(); + } catch (ExecutionException e) { + logger.debug("Failed to refresh {} presence detection", hostname, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; } - refreshJob = scheduledExecutorService.scheduleWithFixedDelay(() -> { - try { - logger.debug("Refreshing {} reachability state", hostname); - getValue(); - } catch (InterruptedException | ExecutionException e) { - logger.debug("Failed to refresh {} presence detection", hostname, e); - } - }, 0, refreshInterval.toMillis(), TimeUnit.MILLISECONDS); - } - - /** - * Return true if automatic refreshing is enabled. - */ - public boolean isAutomaticRefreshing() { - return refreshJob != null; } - /** - * Stop automatic refreshing. - */ - public void stopAutomaticRefresh() { - ScheduledFuture future = refreshJob; - if (future != null && !future.isDone()) { - future.cancel(true); - refreshJob = null; - } + public void dispose() { InetAddress cached = cachedDestination; if (cached != null) { disableDHCPListen(cached); diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetectionValue.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetectionValue.java index bfec6937e4087..10a36743d14fc 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetectionValue.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/PresenceDetectionValue.java @@ -24,9 +24,10 @@ import org.eclipse.jdt.annotation.NonNullByDefault; /** - * Contains the result or partial result of a presence detection. + * Contains the result or partial result of a presence detection. This class is thread-safe. * * @author David Graeff - Initial contribution + * @author Ravi Nadahar - Made class thread-safe */ @NonNullByDefault public class PresenceDetectionValue { @@ -34,9 +35,14 @@ public class PresenceDetectionValue { public static final Duration UNREACHABLE = Duration.ofMillis(-1); private final String hostAddress; + + /* All access must be guarded by "this" */ private Duration latency; + /* All access must be guarded by "this" */ private final Set reachableDetectionTypes = new TreeSet<>(); + + /* All access must be guarded by "this" */ private final List reachableTcpPorts = new ArrayList<>(); /** @@ -61,14 +67,14 @@ public String getHostAddress() { * Return the ping latency, {@value #UNREACHABLE} if not reachable. Can be 0 if * no specific latency is known but the device is still reachable. */ - public Duration getLowestLatency() { + public synchronized Duration getLowestLatency() { return latency; } /** * Returns true if the target is reachable by any means. */ - public boolean isReachable() { + public synchronized boolean isReachable() { return !UNREACHABLE.equals(latency); } @@ -86,9 +92,13 @@ boolean updateLatency(Duration newLatency) { "Latency must be >=0. Create a new PresenceDetectionValue for a not reachable device!"); } else if (newLatency.isZero()) { return false; - } else if (!isReachable() || latency.isZero() || newLatency.compareTo(latency) < 0) { - latency = newLatency; - return true; + } else { + synchronized (this) { + if (!isReachable() || latency.isZero() || newLatency.compareTo(latency) < 0) { + latency = newLatency; + return true; + } + } } return false; } @@ -98,14 +108,14 @@ boolean updateLatency(Duration newLatency) { * * @param type a {@link PresenceDetectionType}. */ - void addReachableDetectionType(PresenceDetectionType type) { + synchronized void addReachableDetectionType(PresenceDetectionType type) { reachableDetectionTypes.add(type); } /** * Return true if the target can be reached by ICMP or ARP pings. */ - public boolean isPingReachable() { + public synchronized boolean isPingReachable() { return reachableDetectionTypes.contains(PresenceDetectionType.ARP_PING) || reachableDetectionTypes.contains(PresenceDetectionType.ICMP_PING); } @@ -113,41 +123,37 @@ public boolean isPingReachable() { /** * Return true if the target provides open TCP ports. */ - public boolean isTcpServiceReachable() { + public synchronized boolean isTcpServiceReachable() { return reachableDetectionTypes.contains(PresenceDetectionType.TCP_CONNECTION); } /** * Return a string of comma-separated successful presence detection types. */ - public String getSuccessfulDetectionTypes() { + public synchronized String getSuccessfulDetectionTypes() { return reachableDetectionTypes.stream().map(PresenceDetectionType::name).collect(Collectors.joining(", ")); } /** * Return the reachable TCP ports of the presence detection value. - * Thread safe. */ - public List getReachableTcpPorts() { - synchronized (reachableTcpPorts) { - return new ArrayList<>(reachableTcpPorts); - } + public synchronized List getReachableTcpPorts() { + return new ArrayList<>(reachableTcpPorts); } /** * Add a reachable TCP port to this presence detection result value object. - * Thread safe. */ - void addReachableTcpPort(int tcpPort) { - synchronized (reachableTcpPorts) { - reachableTcpPorts.add(tcpPort); - } + synchronized void addReachableTcpPort(int tcpPort) { + reachableTcpPorts.add(tcpPort); } @Override public String toString() { - return "PresenceDetectionValue [hostAddress=" + hostAddress + ", latency=" + durationToMillis(latency) - + "ms, reachableDetectionTypes=" + reachableDetectionTypes + ", reachableTcpPorts=" + reachableTcpPorts - + "]"; + synchronized (this) { + return "PresenceDetectionValue [hostAddress=" + hostAddress + ", latency=" + durationToMillis(latency) + + "ms, reachableDetectionTypes=" + reachableDetectionTypes + ", reachableTcpPorts=" + + reachableTcpPorts + "]"; + } } } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/WakeOnLanPacketSender.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/WakeOnLanPacketSender.java index a9775c42731a4..e2cd7dca0c77f 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/WakeOnLanPacketSender.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/WakeOnLanPacketSender.java @@ -67,7 +67,7 @@ public class WakeOnLanPacketSender { public WakeOnLanPacketSender(String macAddress, @Nullable String hostname, @Nullable Integer port, Set networkInterfaceNames) { - logger.debug("initialized WOL Packet Sender (mac: {}, hostname: {}, port: {}, networkInterfaceNames: {})", + logger.trace("initialized WOL Packet Sender (mac: {}, hostname: {}, port: {}, networkInterfaceNames: {})", macAddress, hostname, port, networkInterfaceNames); this.macAddress = macAddress; this.hostname = hostname; diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java index d2cc7de23bd5e..471d522f1d10e 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java @@ -12,19 +12,28 @@ */ package org.openhab.binding.network.internal.discovery; -import static org.openhab.binding.network.internal.NetworkBindingConstants.*; +import static org.openhab.binding.network.internal.NetworkBindingConstants.BINDING_CONFIGURATION_PID; +import static org.openhab.binding.network.internal.NetworkBindingConstants.PARAMETER_HOSTNAME; +import static org.openhab.binding.network.internal.NetworkBindingConstants.PARAMETER_PORT; +import static org.openhab.binding.network.internal.NetworkBindingConstants.PING_DEVICE; +import static org.openhab.binding.network.internal.NetworkBindingConstants.SERVICE_DEVICE; +import static org.openhab.binding.network.internal.NetworkBindingConstants.SUPPORTED_THING_TYPES_UIDS; import static org.openhab.binding.network.internal.utils.NetworkUtils.durationToMillis; +import java.io.IOException; import java.time.Duration; -import java.util.HashMap; +import java.util.Collections; +import java.util.Dictionary; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -33,15 +42,18 @@ import org.openhab.binding.network.internal.PresenceDetectionListener; import org.openhab.binding.network.internal.PresenceDetectionValue; import org.openhab.binding.network.internal.utils.NetworkUtils; -import org.openhab.core.config.core.Configuration; +import org.openhab.binding.network.internal.utils.NetworkUtils.IpPingMethodEnum; import org.openhab.core.config.discovery.AbstractDiscoveryService; import org.openhab.core.config.discovery.DiscoveryResultBuilder; import org.openhab.core.config.discovery.DiscoveryService; +import org.openhab.core.net.CidrAddress; import org.openhab.core.thing.ThingUID; +import org.osgi.service.cm.Configuration; +import org.osgi.service.cm.ConfigurationAdmin; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; import org.osgi.service.component.annotations.Deactivate; -import org.osgi.service.component.annotations.Modified; +import org.osgi.service.component.annotations.Reference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +69,7 @@ @Component(service = DiscoveryService.class, configurationPid = "discovery.network") public class NetworkDiscoveryService extends AbstractDiscoveryService implements PresenceDetectionListener { static final Duration PING_TIMEOUT = Duration.ofMillis(500); - static final int MAXIMUM_IPS_PER_INTERFACE = 255; + static final int MAXIMUM_IPS_PER_INTERFACE = 254; private static final long DISCOVERY_RESULT_TTL = TimeUnit.MINUTES.toSeconds(10); private final Logger logger = LoggerFactory.getLogger(NetworkDiscoveryService.class); @@ -65,40 +77,29 @@ public class NetworkDiscoveryService extends AbstractDiscoveryService implements // TCP port 554 (Windows share / Linux samba) // TCP port 1025 (Xbox / MS-RPC) private Set tcpServicePorts = Set.of(80, 548, 554, 1025); - private AtomicInteger scannedIPcount = new AtomicInteger(0); - private @Nullable ExecutorService executorService = null; - private final NetworkBindingConfiguration configuration = new NetworkBindingConfiguration(); + + /* All access must be guarded by "this" */ + private @Nullable ExecutorService executorService; private final NetworkUtils networkUtils = new NetworkUtils(); + private final ConfigurationAdmin admin; - public NetworkDiscoveryService() { + @Activate + public NetworkDiscoveryService(@Reference ConfigurationAdmin admin) { super(SUPPORTED_THING_TYPES_UIDS, (int) Math.round(new NetworkUtils().getNetworkIPs(MAXIMUM_IPS_PER_INTERFACE).size() * (durationToMillis(PING_TIMEOUT) / 1000.0)), false); - } - - @Override - @Activate - public void activate(@Nullable Map config) { - super.activate(config); - modified(config); - } - - @Override - @Modified - protected void modified(@Nullable Map config) { - super.modified(config); - // We update instead of replace the configuration object, so that if the user updates the - // configuration, the values are automatically available in all handlers. Because they all - // share the same instance. - configuration.update(new Configuration(config).as(NetworkBindingConfiguration.class)); + this.admin = admin; } @Override @Deactivate protected void deactivate() { - if (executorService != null) { - executorService.shutdown(); + synchronized (this) { + if (executorService != null) { + executorService.shutdownNow(); + executorService = null; + } } super.deactivate(); } @@ -120,67 +121,117 @@ public void partialDetectionResult(PresenceDetectionValue value) { public void finalDetectionResult(PresenceDetectionValue value) { } + private ExecutorService createDiscoveryExecutor(@Nullable NetworkBindingConfiguration configuration) { + AtomicInteger count = new AtomicInteger(1); + int numThreads = configuration == null ? NetworkBindingConfiguration.DEFAULT_DISCOVERY_THREADS + : configuration.numberOfDiscoveryThreads; + if (numThreads > 0) { + return Executors.newFixedThreadPool(numThreads, r -> { + Thread t = new Thread(r, "OH-binding-network-discoveryWorker-" + count.getAndIncrement()); + t.setDaemon(true); + return t; + }); + } else { + return Executors.newCachedThreadPool(r -> { + Thread t = new Thread(r, "OH-binding-network-discoveryWorker-" + count.getAndIncrement()); + t.setDaemon(true); + return t; + }); + } + } + /** * Starts the DiscoveryThread for each IP on each interface on the network */ @Override protected void startScan() { - if (executorService == null) { - executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); + NetworkBindingConfiguration configuration = getConfig(); + final ExecutorService service; + synchronized (this) { + if (executorService == null) { + executorService = createDiscoveryExecutor(configuration); + } + service = executorService; } - final ExecutorService service = executorService; if (service == null) { return; } + removeOlderResults(getTimestampOfLastScan(), null); - logger.trace("Starting Network Device Discovery"); - - final Set networkIPs = networkUtils.getNetworkIPs(MAXIMUM_IPS_PER_INTERFACE); - scannedIPcount.set(0); - - for (String ip : networkIPs) { - final PresenceDetection pd = new PresenceDetection(this, scheduler, Duration.ofSeconds(2)); - pd.setHostname(ip); - pd.setIOSDevice(true); - pd.setUseDhcpSniffing(false); - pd.setTimeout(PING_TIMEOUT); - // Ping devices - pd.setUseIcmpPing(true); - pd.setUseArpPing(true, configuration.arpPingToolPath, configuration.arpPingUtilMethod); - // TCP devices - pd.setServicePorts(tcpServicePorts); - - service.execute(() -> { - Thread.currentThread().setName("Discovery thread " + ip); - try { - pd.getValue(); - } catch (ExecutionException | InterruptedException e) { - stopScan(); - } - int count = scannedIPcount.incrementAndGet(); - if (count == networkIPs.size()) { - logger.trace("Scan of {} IPs successful", scannedIPcount); - stopScan(); + logger.debug("Starting Network Device Discovery"); + + Map> discoveryList = networkUtils.getNetworkIPsPerInterface(); + + // Track completion for all interfaces + final int totalInterfaces = discoveryList.size(); + final AtomicInteger completedInterfaces = new AtomicInteger(0); + + service.execute(() -> { + Thread.currentThread().setName("OH-binding-network-discoveryCoordinator"); + IpPingMethodEnum pingMethod = networkUtils.determinePingMethod(); + for (Entry> discovery : discoveryList.entrySet()) { + final String networkInterface = discovery.getKey(); + final Set networkIPs = networkUtils.getNetworkIPs(discovery.getValue(), + MAXIMUM_IPS_PER_INTERFACE); + logger.debug("Scanning {} IPs on interface {} ", networkIPs.size(), networkInterface); + final AtomicInteger scannedIPcount = new AtomicInteger(0); + final int targetCount = networkIPs.size(); + + for (String ip : networkIPs) { + final PresenceDetection pd = new PresenceDetection(this, Duration.ofSeconds(2), service); + pd.setHostname(ip); + pd.setIOSDevice(true); + pd.setUseDhcpSniffing(false); + pd.setTimeout(PING_TIMEOUT); + // Ping devices + pd.setIcmpPingMethod(pingMethod); + if (configuration == null) { + pd.setUseArpPing(true, NetworkBindingConfiguration.DEFAULT_ARPING_TOOL_PATH, + NetworkBindingConfiguration.DEFAULT_ARPING_METHOD); + } else { + pd.setUseArpPing(true, configuration.arpPingToolPath, configuration.arpPingUtilMethod); + } + // TCP devices + pd.setServicePorts(tcpServicePorts); + pd.getValue((v) -> { + int count = scannedIPcount.incrementAndGet(); + if (count >= targetCount) { + logger.debug("Scan of {} IPs on interface {} completed", scannedIPcount.get(), + networkInterface); + // Only call stopScan after all interfaces are done + if (completedInterfaces.incrementAndGet() >= totalInterfaces) { + logger.debug("All network interface scans completed. Stopping scan."); + stopScan(); + logger.debug("Finished Network Device Discovery"); + } + } + }); } - }); - } + } + }); } @Override - protected synchronized void stopScan() { - super.stopScan(); - final ExecutorService service = executorService; - if (service == null) { - return; + protected void stopScan() { + final ExecutorService service; + synchronized (this) { + super.stopScan(); + service = executorService; + if (service == null) { + return; + } + executorService = null; } + logger.debug("Stopping Network Device Discovery"); + service.shutdownNow(); // Initiate shutdown try { - service.awaitTermination(PING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + if (!service.awaitTermination(PING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { + logger.warn("Network discovery scan failed to stop within the timeout of {}", PING_TIMEOUT); + } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); // Reset interrupt flag + Thread.currentThread().interrupt(); } - service.shutdown(); - executorService = null; } public static ThingUID createServiceUID(String ip, int tcpPort) { @@ -208,12 +259,15 @@ public void newServiceDevice(String ip, int tcpPort) { default -> "Network Device"; }; label += " (" + ip + ":" + tcpPort + ")"; + final String fLabel = label; - Map properties = new HashMap<>(); - properties.put(PARAMETER_HOSTNAME, ip); - properties.put(PARAMETER_PORT, tcpPort); - thingDiscovered(DiscoveryResultBuilder.create(createServiceUID(ip, tcpPort)).withTTL(DISCOVERY_RESULT_TTL) - .withProperties(properties).withLabel(label).build()); + // A thread that isn't part of the executor is needed, because registering new discoveries is slow, + // and the executor is shut down when the scan is finished or aborted. + new Thread(() -> { + thingDiscovered(DiscoveryResultBuilder.create(createServiceUID(ip, tcpPort)).withTTL(DISCOVERY_RESULT_TTL) + .withProperty(PARAMETER_HOSTNAME, ip).withProperty(PARAMETER_PORT, tcpPort).withLabel(fLabel) + .build()); + }, "OH-binding-network-discoveryResultCourier").start(); } public static ThingUID createPingUID(String ip) { @@ -229,8 +283,30 @@ public static ThingUID createPingUID(String ip) { public void newPingDevice(String ip) { logger.trace("Found pingable network device with IP address {}", ip); - Map properties = Map.of(PARAMETER_HOSTNAME, ip); - thingDiscovered(DiscoveryResultBuilder.create(createPingUID(ip)).withTTL(DISCOVERY_RESULT_TTL) - .withProperties(properties).withLabel("Network Device (" + ip + ")").build()); + // A thread that isn't part of the executor is needed, because registering new discoveries is slow, + // and the executor is shut down when the scan is finished or aborted. + new Thread(() -> { + thingDiscovered(DiscoveryResultBuilder.create(createPingUID(ip)).withTTL(DISCOVERY_RESULT_TTL) + .withProperty(PARAMETER_HOSTNAME, ip).withLabel("Network Device (" + ip + ")").build()); + }, "OH-binding-network-discoveryPingCourier").start(); + } + + private @Nullable NetworkBindingConfiguration getConfig() { + ConfigurationAdmin admin = this.admin; + try { + Configuration configOnline = admin.getConfiguration(BINDING_CONFIGURATION_PID, null); + if (configOnline != null) { + Dictionary props = configOnline.getProperties(); + if (props != null) { + Map propMap = Collections.list(props.keys()).stream() + .collect(Collectors.toMap(Function.identity(), props::get)); + return new org.openhab.core.config.core.Configuration(propMap) + .as(NetworkBindingConfiguration.class); + } + } + } catch (IOException e) { + logger.warn("Unable to read configuration: {}", e.getMessage()); + } + return null; } } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java index 404d79e8f920d..7e7af367f1711 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java @@ -21,8 +21,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.network.internal.NetworkBindingConfiguration; import org.openhab.binding.network.internal.NetworkBindingConfigurationListener; import org.openhab.binding.network.internal.NetworkBindingConstants; @@ -56,45 +60,61 @@ * @author Marc Mettke - Initial contribution * @author David Graeff - Rewritten * @author Wouter Born - Add Wake-on-LAN thing action support + * @author Ravi Nadahar - Made class thread-safe */ @NonNullByDefault public class NetworkHandler extends BaseThingHandler implements PresenceDetectionListener, NetworkBindingConfigurationListener { private final Logger logger = LoggerFactory.getLogger(NetworkHandler.class); - private @NonNullByDefault({}) PresenceDetection presenceDetection; - private @NonNullByDefault({}) WakeOnLanPacketSender wakeOnLanPacketSender; - private boolean isTCPServiceDevice; - private NetworkBindingConfiguration configuration; + /* All access must be guarded by "this" */ + private @Nullable PresenceDetection presenceDetection; + + /* All access must be guarded by "this" */ + private @Nullable ScheduledFuture refreshJob; + + /* All access must be guarded by "this" */ + private @Nullable WakeOnLanPacketSender wakeOnLanPacketSender; + + private final boolean isTCPServiceDevice; + private final NetworkBindingConfiguration configuration; // How many retries before a device is deemed offline - int retries; + volatile int retries; // Retry counter. Will be reset as soon as a device presence detection succeed. - private int retryCounter = 0; - private NetworkHandlerConfiguration handlerConfiguration = new NetworkHandlerConfiguration(); + private volatile int retryCounter = 0; + private final ScheduledExecutorService executor; /** - * Do not call this directly, but use the {@see NetworkHandlerBuilder} instead. + * Creates a new instance using the specified parameters. */ - public NetworkHandler(Thing thing, boolean isTCPServiceDevice, NetworkBindingConfiguration configuration) { + public NetworkHandler(Thing thing, ScheduledExecutorService executor, boolean isTCPServiceDevice, + NetworkBindingConfiguration configuration) { super(thing); + this.executor = executor; this.isTCPServiceDevice = isTCPServiceDevice; this.configuration = configuration; this.configuration.addNetworkBindingConfigurationListener(this); } private void refreshValue(ChannelUID channelUID) { + PresenceDetection pd; + ScheduledFuture rj; + synchronized (this) { + pd = presenceDetection; + rj = refreshJob; + } // We are not yet even initialized, don't do anything - if (presenceDetection == null || !presenceDetection.isAutomaticRefreshing()) { + if (pd == null || rj == null) { return; } switch (channelUID.getId()) { case CHANNEL_ONLINE: - presenceDetection.getValue(value -> updateState(CHANNEL_ONLINE, OnOffType.from(value.isReachable()))); + pd.getValue(value -> updateState(CHANNEL_ONLINE, OnOffType.from(value.isReachable()))); break; case CHANNEL_LATENCY: - presenceDetection.getValue(value -> { + pd.getValue(value -> { double latencyMs = durationToMillis(value.getLowestLatency()); updateState(CHANNEL_LATENCY, new QuantityType<>(latencyMs, MetricPrefix.MILLI(Units.SECOND))); }); @@ -102,7 +122,7 @@ private void refreshValue(ChannelUID channelUID) { case CHANNEL_LASTSEEN: // We should not set the last seen state to UNDEF, it prevents restoreOnStartup from working // For reference: https://github.com/openhab/openhab-addons/issues/17404 - Instant lastSeen = presenceDetection.getLastSeen(); + Instant lastSeen = pd.getLastSeen(); if (lastSeen != null) { updateState(CHANNEL_LASTSEEN, new DateTimeType(lastSeen)); } @@ -133,7 +153,11 @@ public void partialDetectionResult(PresenceDetectionValue value) { public void finalDetectionResult(PresenceDetectionValue value) { // We do not notify the framework immediately if a device presence detection failed and // the user configured retries to be > 1. - retryCounter = value.isReachable() ? 0 : retryCounter + 1; + if (value.isReachable()) { + retryCounter = 0; + } else { + retryCounter++; + } if (retryCounter >= retries) { updateState(CHANNEL_ONLINE, OnOffType.OFF); @@ -141,7 +165,11 @@ public void finalDetectionResult(PresenceDetectionValue value) { retryCounter = 0; } - Instant lastSeen = presenceDetection.getLastSeen(); + PresenceDetection pd; + synchronized (this) { + pd = presenceDetection; + } + Instant lastSeen = pd == null ? null : pd.getLastSeen(); if (value.isReachable() && lastSeen != null) { updateState(CHANNEL_LASTSEEN, new DateTimeType(lastSeen)); } @@ -153,11 +181,14 @@ public void finalDetectionResult(PresenceDetectionValue value) { @Override public void dispose() { - PresenceDetection detection = presenceDetection; - if (detection != null) { - detection.stopAutomaticRefresh(); + synchronized (this) { + ScheduledFuture refreshJob = this.refreshJob; + if (refreshJob != null) { + refreshJob.cancel(true); + this.refreshJob = null; + } + presenceDetection = null; } - presenceDetection = null; } /** @@ -165,57 +196,68 @@ public void dispose() { * Used by testing for injecting. */ void initialize(PresenceDetection presenceDetection) { - handlerConfiguration = getConfigAs(NetworkHandlerConfiguration.class); + NetworkHandlerConfiguration config = getConfigAs(NetworkHandlerConfiguration.class); - this.presenceDetection = presenceDetection; - presenceDetection.setHostname(handlerConfiguration.hostname); - presenceDetection.setNetworkInterfaceNames(handlerConfiguration.networkInterfaceNames); + presenceDetection.setHostname(config.hostname); + presenceDetection.setNetworkInterfaceNames(config.networkInterfaceNames); presenceDetection.setPreferResponseTimeAsLatency(configuration.preferResponseTimeAsLatency); if (isTCPServiceDevice) { - Integer port = handlerConfiguration.port; + Integer port = config.port; if (port == null) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, "No port configured!"); return; } presenceDetection.setServicePorts(Set.of(port)); } else { - presenceDetection.setIOSDevice(handlerConfiguration.useIOSWakeUp); + presenceDetection.setIOSDevice(config.useIOSWakeUp); // Hand over binding configurations to the network service presenceDetection.setUseDhcpSniffing(configuration.allowDHCPlisten); - presenceDetection.setUseIcmpPing(handlerConfiguration.useIcmpPing ? configuration.allowSystemPings : null); - presenceDetection.setUseArpPing(handlerConfiguration.useArpPing, configuration.arpPingToolPath, + presenceDetection.setUseIcmpPing(config.useIcmpPing ? configuration.allowSystemPings : null); + presenceDetection.setUseArpPing(config.useArpPing, configuration.arpPingToolPath, configuration.arpPingUtilMethod); } - this.retries = handlerConfiguration.retry.intValue(); - presenceDetection.setRefreshInterval(Duration.ofMillis(handlerConfiguration.refreshInterval)); - presenceDetection.setTimeout(Duration.ofMillis(handlerConfiguration.timeout)); - - wakeOnLanPacketSender = new WakeOnLanPacketSender(handlerConfiguration.macAddress, - handlerConfiguration.hostname, handlerConfiguration.port, handlerConfiguration.networkInterfaceNames); + this.retries = config.retry.intValue(); + presenceDetection.setTimeout(Duration.ofMillis(config.timeout)); + synchronized (this) { + this.presenceDetection = presenceDetection; + wakeOnLanPacketSender = new WakeOnLanPacketSender(config.macAddress, config.hostname, config.port, + config.networkInterfaceNames); + if (config.refreshInterval > 0) { + refreshJob = executor.scheduleWithFixedDelay(presenceDetection::refresh, 0, config.refreshInterval, + TimeUnit.MILLISECONDS); + } + } updateStatus(ThingStatus.ONLINE); - presenceDetection.startAutomaticRefresh(); - - updateNetworkProperties(); } private void updateNetworkProperties() { // Update properties (after startAutomaticRefresh, to get the correct dhcp state) Map properties = editProperties(); - properties.put(NetworkBindingConstants.PROPERTY_ARP_STATE, presenceDetection.getArpPingState()); - properties.put(NetworkBindingConstants.PROPERTY_ICMP_STATE, presenceDetection.getIPPingState()); - properties.put(NetworkBindingConstants.PROPERTY_PRESENCE_DETECTION_TYPE, ""); - properties.put(NetworkBindingConstants.PROPERTY_DHCP_STATE, presenceDetection.getDhcpState()); + synchronized (this) { + PresenceDetection pd = presenceDetection; + if (pd == null) { + logger.warn("Can't update network properties because presenceDetection is null"); + return; + } + properties.put(NetworkBindingConstants.PROPERTY_ARP_STATE, pd.getArpPingState()); + properties.put(NetworkBindingConstants.PROPERTY_ICMP_STATE, pd.getIPPingState()); + properties.put(NetworkBindingConstants.PROPERTY_PRESENCE_DETECTION_TYPE, ""); + properties.put(NetworkBindingConstants.PROPERTY_DHCP_STATE, pd.getDhcpState()); + } updateProperties(properties); } // Create a new network service and apply all configurations. @Override public void initialize() { - initialize(new PresenceDetection(this, scheduler, - Duration.ofMillis(configuration.cacheDeviceStateTimeInMS.intValue()))); + updateStatus(ThingStatus.UNKNOWN); + executor.submit(() -> { + initialize(new PresenceDetection(this, Duration.ofMillis(configuration.cacheDeviceStateTimeInMS.intValue()), + executor)); + }); } /** @@ -228,7 +270,12 @@ public boolean isTCPServiceDevice() { @Override public void bindingConfigurationChanged() { // Make sure that changed binding configuration is reflected - presenceDetection.setPreferResponseTimeAsLatency(configuration.preferResponseTimeAsLatency); + synchronized (this) { + PresenceDetection pd = presenceDetection; + if (pd != null) { + pd.setPreferResponseTimeAsLatency(configuration.preferResponseTimeAsLatency); + } + } } @Override @@ -237,15 +284,32 @@ public Collection> getServices() { } public void sendWakeOnLanPacketViaIp() { - // Hostname can't be null - wakeOnLanPacketSender.sendWakeOnLanPacketViaIp(); + WakeOnLanPacketSender sender; + synchronized (this) { + sender = wakeOnLanPacketSender; + } + if (sender != null) { + // Hostname can't be null + sender.sendWakeOnLanPacketViaIp(); + } else { + logger.warn("Failed to send WoL packet via IP because sender is null"); + } } public void sendWakeOnLanPacketViaMac() { - if (handlerConfiguration.macAddress.isEmpty()) { + NetworkHandlerConfiguration config = getConfigAs(NetworkHandlerConfiguration.class); + if (config.macAddress.isEmpty()) { throw new IllegalStateException( "Cannot send WoL packet because the 'macAddress' is not configured for " + thing.getUID()); } - wakeOnLanPacketSender.sendWakeOnLanPacketViaMac(); + WakeOnLanPacketSender sender; + synchronized (this) { + sender = wakeOnLanPacketSender; + } + if (sender != null) { + sender.sendWakeOnLanPacketViaMac(); + } else { + logger.warn("Failed to send WoL packet via MAC because sender is null"); + } } } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/SpeedTestHandler.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/SpeedTestHandler.java index f3bfb8b75cf9a..c4deec147be79 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/SpeedTestHandler.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/SpeedTestHandler.java @@ -18,6 +18,7 @@ import java.math.BigDecimal; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -48,15 +49,22 @@ * measurements at a given interval and for given file / size * * @author Gaël L'hopital - Initial contribution + * @author Ravi Nadahar - Made class thread-safe */ @NonNullByDefault public class SpeedTestHandler extends BaseThingHandler implements ISpeedTestListener { private final Logger logger = LoggerFactory.getLogger(SpeedTestHandler.class); + + /* All access must be guarded by "this" */ private @Nullable SpeedTestSocket speedTestSocket; - private @NonNullByDefault({}) ScheduledFuture refreshTask; - private @NonNullByDefault({}) SpeedTestConfiguration configuration; + + /* All access must be guarded by "this" */ + private @Nullable ScheduledFuture refreshTask; + + /* All access must be guarded by "this" */ private State bufferedProgress = UnDefType.UNDEF; - private int timeouts; + + private final AtomicInteger timeouts = new AtomicInteger(); public SpeedTestHandler(Thing thing) { super(thing); @@ -64,44 +72,52 @@ public SpeedTestHandler(Thing thing) { @Override public void initialize() { - configuration = getConfigAs(SpeedTestConfiguration.class); startRefreshTask(); } - private synchronized void startSpeedTest() { - String url = configuration.getDownloadURL(); - if (speedTestSocket == null && url != null) { - logger.debug("Network speedtest started"); - final SpeedTestSocket socket = new SpeedTestSocket(1500); - speedTestSocket = socket; - socket.addSpeedTestListener(this); - updateState(CHANNEL_TEST_ISRUNNING, OnOffType.ON); - updateState(CHANNEL_TEST_START, new DateTimeType()); - updateState(CHANNEL_TEST_END, UnDefType.NULL); - updateProgress(new QuantityType<>(0, Units.PERCENT)); - socket.startDownload(url); - } else { - logger.info("A speedtest is already in progress, will retry on next refresh"); + private void startSpeedTest() { + SpeedTestConfiguration config = getConfigAs(SpeedTestConfiguration.class); + String url = config.getDownloadURL(); + if (url == null || url.isBlank()) { + logger.warn("Failed to start speedtest because the URL is blank"); + return; + } + synchronized (this) { + if (speedTestSocket == null) { + logger.debug("Network speedtest started"); + final SpeedTestSocket socket = new SpeedTestSocket(1500); + speedTestSocket = socket; + socket.addSpeedTestListener(this); + updateState(CHANNEL_TEST_ISRUNNING, OnOffType.ON); + updateState(CHANNEL_TEST_START, new DateTimeType()); + updateState(CHANNEL_TEST_END, UnDefType.NULL); + updateProgress(new QuantityType<>(0, Units.PERCENT)); + socket.startDownload(url); + } else { + logger.info("A speedtest is already in progress, will retry on next refresh"); + } } } - private synchronized void stopSpeedTest() { + private void stopSpeedTest() { updateState(CHANNEL_TEST_ISRUNNING, OnOffType.OFF); updateProgress(UnDefType.NULL); updateState(CHANNEL_TEST_END, new DateTimeType()); - if (speedTestSocket != null) { - SpeedTestSocket socket = speedTestSocket; - socket.closeSocket(); - socket.removeSpeedTestListener(this); - socket = null; - speedTestSocket = null; - logger.debug("Network speedtest finished"); + synchronized (this) { + if (speedTestSocket != null) { + SpeedTestSocket socket = speedTestSocket; + socket.closeSocket(); + socket.removeSpeedTestListener(this); + speedTestSocket = null; + logger.debug("Network speedtest finished"); + } } } @Override public void onCompletion(final @Nullable SpeedTestReport testReport) { - timeouts = configuration.maxTimeout; + SpeedTestConfiguration config = getConfigAs(SpeedTestConfiguration.class); + timeouts.set(config.maxTimeout); if (testReport != null) { BigDecimal rate = testReport.getTransferRateBit(); QuantityType quantity = new QuantityType<>(rate, BIT_PER_SECOND) @@ -110,9 +126,11 @@ public void onCompletion(final @Nullable SpeedTestReport testReport) { switch (testReport.getSpeedTestMode()) { case DOWNLOAD: updateState(CHANNEL_RATE_DOWN, quantity); - String url = configuration.getUploadURL(); - if (speedTestSocket != null && url != null) { - speedTestSocket.startUpload(configuration.getUploadURL(), configuration.uploadSize); + String url = config.getUploadURL(); + synchronized (this) { + if (speedTestSocket != null && url != null) { + speedTestSocket.startUpload(config.getUploadURL(), config.uploadSize); + } } break; case UPLOAD: @@ -132,12 +150,12 @@ public void onError(final @Nullable SpeedTestError testError, final @Nullable St updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, errorMessage); freeRefreshTask(); } else if (SpeedTestError.SOCKET_TIMEOUT.equals(testError)) { - timeouts--; - if (timeouts <= 0) { + int count = timeouts.decrementAndGet(); + if (count <= 0) { updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.COMMUNICATION_ERROR, "Max timeout count reached"); freeRefreshTask(); } else { - logger.warn("Speedtest timed out, {} attempts left. Message '{}'", timeouts, errorMessage); + logger.warn("Speedtest timed out, {} attempts left. Message '{}'", count, errorMessage); stopSpeedTest(); } } else if (SpeedTestError.SOCKET_ERROR.equals(testError) @@ -156,9 +174,15 @@ public void onProgress(float percent, @Nullable SpeedTestReport testReport) { } private void updateProgress(State state) { - if (!state.toString().equals(bufferedProgress.toString())) { - bufferedProgress = state; - updateState(CHANNEL_TEST_PROGRESS, bufferedProgress); + boolean isNew = false; + synchronized (this) { + if (!state.toString().equals(bufferedProgress.toString())) { + bufferedProgress = state; + isNew = true; + } + } + if (isNew) { + updateState(CHANNEL_TEST_PROGRESS, state); } } @@ -187,19 +211,25 @@ public void dispose() { } private void freeRefreshTask() { - stopSpeedTest(); - if (refreshTask != null) { - refreshTask.cancel(true); - refreshTask = null; + synchronized (this) { + ScheduledFuture task = refreshTask; + if (task != null) { + task.cancel(true); + refreshTask = null; + } } + stopSpeedTest(); } private void startRefreshTask() { - logger.info("Speedtests starts in {} minutes, then refreshes every {} minutes", configuration.initialDelay, - configuration.refreshInterval); - refreshTask = scheduler.scheduleWithFixedDelay(this::startSpeedTest, configuration.initialDelay, - configuration.refreshInterval, TimeUnit.MINUTES); - timeouts = configuration.maxTimeout; + SpeedTestConfiguration config = getConfigAs(SpeedTestConfiguration.class); + logger.info("Speedtests starts in {} minutes, then refreshes every {} minutes", config.initialDelay, + config.refreshInterval); + synchronized (this) { + refreshTask = scheduler.scheduleWithFixedDelay(this::startSpeedTest, config.initialDelay, + config.refreshInterval, TimeUnit.MINUTES); + } + timeouts.set(config.maxTimeout); updateStatus(ThingStatus.ONLINE); } } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java index b257df5d3fb9a..07fdca2cf3d18 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/LatencyParser.java @@ -31,7 +31,13 @@ @NonNullByDefault public class LatencyParser { - private static final Pattern LATENCY_PATTERN = Pattern.compile(".*time=(.*) ?(u|m)s.*"); + private static final Pattern LATENCY_PATTERN = Pattern.compile(".*time(?:=|<)(.*) ?(u|m)s.*"); + private static final Pattern THOMAS_HABERT_ARPING_PATTERN = Pattern + .compile("^[\\w ]+from[\\w:()\\.= ]+?time=([0-9,\\.]+)\\s?(m|u)sec$"); + private static final Pattern IPUTILS_ARPING_PATTERN = Pattern + .compile("^Unicast[\\w ]+from[\\w:()\\.= \\[\\]]+?\\s*([0-9,\\.]+)\\s?(m|u)s$"); + private static final Pattern ELI_FULKERSON_ARP_PING_PATTERN = Pattern + .compile("^Reply that[\\w:\\. ]+?\\sin\\s([0-9,\\.]+)\\s?(m|u)s$"); private final Logger logger = LoggerFactory.getLogger(LatencyParser.class); // This is how the input looks like on Mac and Linux: @@ -56,21 +62,40 @@ public class LatencyParser { * Examine a single ping or arping command output line and try to extract the latency value if it is contained. * * @param inputLine Single output line of the ping or arping command. + * @param type the syntax to expect. Use {@code null} for generic ping syntax. * @return Latency value provided by the ping or arping command. null if the provided line did not * contain a latency value which matches the known patterns. */ - public @Nullable Duration parseLatency(String inputLine) { - logger.debug("Parsing latency from input {}", inputLine); + public @Nullable Duration parseLatency(String inputLine, @Nullable ArpPingUtilEnum type) { + logger.trace("Parsing latency from input \"{}\"", inputLine); - Matcher m = LATENCY_PATTERN.matcher(inputLine); + Pattern pattern; + if (type == null) { + pattern = LATENCY_PATTERN; + } else { + switch (type) { + case ELI_FULKERSON_ARP_PING_FOR_WINDOWS: + pattern = ELI_FULKERSON_ARP_PING_PATTERN; + break; + case IPUTILS_ARPING: + pattern = IPUTILS_ARPING_PATTERN; + break; + case THOMAS_HABERT_ARPING: + case THOMAS_HABERT_ARPING_WITHOUT_TIMEOUT: + pattern = THOMAS_HABERT_ARPING_PATTERN; + break; + default: + pattern = LATENCY_PATTERN; + break; + } + } + Matcher m = pattern.matcher(inputLine); if (m.find() && m.groupCount() >= 2) { if ("u".equals(m.group(2))) { return microsToDuration(Double.parseDouble(m.group(1).replace(",", "."))); } return millisToDuration(Double.parseDouble(m.group(1).replace(",", "."))); } - - logger.debug("Did not find a latency value"); return null; } } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java index 19493a6bbdee1..db2ac29804935 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java @@ -28,13 +28,16 @@ import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Enumeration; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -151,7 +154,7 @@ public Set getInterfaceNames() { try { for (Enumeration en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) { NetworkInterface networkInterface = en.nextElement(); - if (!networkInterface.isLoopback()) { + if (networkInterface.isUp() && !networkInterface.isLoopback()) { result.add(networkInterface.getName()); } } @@ -172,6 +175,41 @@ public Set getNetworkIPs(int maximumPerInterface) { return getNetworkIPs(getInterfaceIPs(), maximumPerInterface); } + /** + * Retrieves a map of network interface names to their associated IP addresses. + * + * @return A map where the key is the name of the network interface and the value is a set of CidrAddress objects + * representing the IP addresses and network prefix lengths for that interface. + */ + public Map> getNetworkIPsPerInterface() { + Map> outputMap = new HashMap<>(); + try { + for (Enumeration en = NetworkInterface.getNetworkInterfaces(); en.hasMoreElements();) { + NetworkInterface networkInterface = en.nextElement(); + if (!networkInterface.isUp() || networkInterface.isLoopback()) { + logger.trace("Network interface: {} is excluded in the search", networkInterface.getName()); + continue; + } + + Set addresses = networkInterface.getInterfaceAddresses().stream() + .map(m -> new CidrAddress(m.getAddress(), m.getNetworkPrefixLength())) + .filter(cidr -> !cidr.getAddress().isLoopbackAddress()) // (127.x.x.x, ::1) + .filter(cidr -> !cidr.getAddress().isLinkLocalAddress())// (169.254.x.x or fe80::/10) + .collect(Collectors.toSet()); + + if (!addresses.isEmpty()) { + logger.trace("Network interface: {} is included in the search", networkInterface.getName()); + outputMap.put(networkInterface.getName(), addresses); + } else { + logger.trace("Network interface: {} has no usable addresses", networkInterface.getName()); + } + } + } catch (SocketException e) { + logger.trace("Could not get network interfaces", e); + } + return outputMap; + } + /** * Takes the interfaceIPs and fetches every IP which can be assigned on their network * @@ -179,7 +217,7 @@ public Set getNetworkIPs(int maximumPerInterface) { * @param maximumPerInterface The maximum of IP addresses per interface or 0 to get all. * @return Every single IP which can be assigned on the Networks the computer is connected to */ - private Set getNetworkIPs(Set interfaceIPs, int maximumPerInterface) { + public Set getNetworkIPs(Set interfaceIPs, int maximumPerInterface) { Set networkIPs = new LinkedHashSet<>(); short minCidrPrefixLength = 8; // historic Class A network, addresses = 16777214 @@ -232,7 +270,7 @@ public PingResult servicePing(String host, int port, Duration timeout) throws IO socket.connect(new InetSocketAddress(host, port), (int) timeout.toMillis()); success = true; } catch (ConnectException | SocketTimeoutException | NoRouteToHostException e) { - logger.trace("Could not connect to {}:{}", host, port, e); + logger.trace("Could not connect to {}:{} {}", host, port, e.getMessage()); } return new PingResult(success, Duration.between(execStartTime, Instant.now())); } @@ -268,7 +306,7 @@ public IpPingMethodEnum determinePingMethod() { } catch (IOException e) { logger.trace("Native ping to 127.0.0.1 failed", e); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); // Reset interrupt flag + Thread.currentThread().interrupt(); } return IpPingMethodEnum.JAVA_PING; } @@ -325,15 +363,15 @@ public enum IpPingMethodEnum { switch (method) { case IPUTILS_LINUX_PING: proc = new ProcessBuilder("ping", "-w", String.valueOf(timeout.toSeconds()), "-c", "1", hostname) - .start(); + .redirectErrorStream(true).start(); break; case MAC_OS_PING: proc = new ProcessBuilder("ping", "-t", String.valueOf(timeout.toSeconds()), "-c", "1", hostname) - .start(); + .redirectErrorStream(true).start(); break; case WINDOWS_PING: proc = new ProcessBuilder("ping", "-w", String.valueOf(timeout.toMillis()), "-n", "1", hostname) - .start(); + .redirectErrorStream(true).start(); break; case JAVA_PING: default: @@ -341,6 +379,17 @@ public enum IpPingMethodEnum { return null; } + // Consume the output while the process runs + List output = new ArrayList<>(); + try (InputStreamReader isr = new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8); + BufferedReader br = new BufferedReader(isr)) { + String line; + while ((line = br.readLine()) != null) { + output.add(line); + logger.trace("Network [ping output]: '{}'", line); + } + } + // The return code is 0 for a successful ping, 1 if device didn't // respond, and 2 if there is another error like network interface // not ready. @@ -348,28 +397,25 @@ public enum IpPingMethodEnum { // see https://superuser.com/questions/403905/ping-from-windows-7-get-no-reply-but-sets-errorlevel-to-0 int result = proc.waitFor(); + Instant execStopTime = Instant.now(); if (result != 0) { - return new PingResult(false, Duration.between(execStartTime, Instant.now())); + return new PingResult(false, Duration.between(execStartTime, execStopTime)); } - try (BufferedReader r = new BufferedReader(new InputStreamReader(proc.getInputStream()))) { - String line = r.readLine(); - if (line == null) { - throw new IOException("Received no output from ping process."); - } - do { - // Because of the Windows issue, we need to check this. We assume that the ping was successful whenever - // this specific string is contained in the output - if (line.contains("TTL=") || line.contains("ttl=")) { - PingResult pingResult = new PingResult(true, Duration.between(execStartTime, Instant.now())); - pingResult.setResponseTime(latencyParser.parseLatency(line)); - return pingResult; - } - line = r.readLine(); - } while (line != null); + if (output.isEmpty()) { + throw new IOException("Received no output from ping process."); + } - return new PingResult(false, Duration.between(execStartTime, Instant.now())); + for (String line : output) { + // Because of the Windows issue, we need to check this. We assume that the ping was successful whenever + // this specific string is contained in the output + if (line.contains("TTL=") || line.contains("ttl=")) { + PingResult pingResult = new PingResult(true, Duration.between(execStartTime, execStopTime)); + pingResult.setResponseTime(latencyParser.parseLatency(line, null)); + return pingResult; + } } + return new PingResult(false, Duration.between(execStartTime, execStopTime)); } public enum ArpPingUtilEnum { @@ -414,34 +460,47 @@ public enum ArpPingUtilEnum { Instant execStartTime = Instant.now(); Process proc; if (arpingTool == ArpPingUtilEnum.THOMAS_HABERT_ARPING_WITHOUT_TIMEOUT) { - proc = new ProcessBuilder(arpUtilPath, "-c", "1", "-i", interfaceName, ipV4address).start(); + proc = new ProcessBuilder(arpUtilPath, "-c", "1", "-i", interfaceName, ipV4address) + .redirectErrorStream(true).start(); } else if (arpingTool == ArpPingUtilEnum.THOMAS_HABERT_ARPING) { proc = new ProcessBuilder(arpUtilPath, "-w", String.valueOf(timeout.toSeconds()), "-C", "1", "-i", - interfaceName, ipV4address).start(); + interfaceName, ipV4address).redirectErrorStream(true).start(); } else if (arpingTool == ArpPingUtilEnum.ELI_FULKERSON_ARP_PING_FOR_WINDOWS) { - proc = new ProcessBuilder(arpUtilPath, "-w", String.valueOf(timeout.toMillis()), "-x", ipV4address).start(); + proc = new ProcessBuilder(arpUtilPath, "-w", String.valueOf(timeout.toMillis()), "-x", ipV4address) + .redirectErrorStream(true).start(); } else { proc = new ProcessBuilder(arpUtilPath, "-w", String.valueOf(timeout.toSeconds()), "-c", "1", "-I", - interfaceName, ipV4address).start(); + interfaceName, ipV4address).redirectErrorStream(true).start(); + } + + // Consume the output while the process runs + List output = new ArrayList<>(); + try (InputStreamReader isr = new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8); + BufferedReader br = new BufferedReader(isr)) { + String line; + while ((line = br.readLine()) != null) { + output.add(line); + logger.trace("Network [arping output]: '{}'", line); + } } // The return code is 0 for a successful ping. 1 if device didn't respond and 2 if there is another error like // network interface not ready. int result = proc.waitFor(); + Instant execStopTime = Instant.now(); if (result != 0) { - return new PingResult(false, Duration.between(execStartTime, Instant.now())); + return new PingResult(false, Duration.between(execStartTime, execStopTime)); } - PingResult pingResult = new PingResult(true, Duration.between(execStartTime, Instant.now())); - try (BufferedReader r = new BufferedReader(new InputStreamReader(proc.getInputStream()))) { - String line = r.readLine(); - while (line != null) { - Duration responseTime = latencyParser.parseLatency(line); + PingResult pingResult = new PingResult(true, Duration.between(execStartTime, execStopTime)); + Duration responseTime; + for (String line : output) { + if (!line.isBlank()) { + responseTime = latencyParser.parseLatency(line, arpingTool); if (responseTime != null) { pingResult.setResponseTime(responseTime); return pingResult; } - line = r.readLine(); } } diff --git a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/addon/addon.xml b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/addon/addon.xml index 6e3f9bd271b15..840dd0f87de23 100644 --- a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/addon/addon.xml +++ b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/addon/addon.xml @@ -43,5 +43,12 @@ such latency value is found in the ping command output, the time to execute the ping command is used as fallback latency. If disabled, the time to execute the ping command is always used as latency value. + + 100 + + The number of threads to use when scanning for network devices. Fewer threads, results in lower memory + consumption but a slower operation. Use 0 for unlimited. + true + diff --git a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties index 787f3f959311f..e8e55efece46a 100644 --- a/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties +++ b/bundles/org.openhab.binding.network/src/main/resources/OH-INF/i18n/network.properties @@ -13,6 +13,8 @@ addon.config.network.arpPingToolPath.label = ARP Ping Tool Path addon.config.network.arpPingToolPath.description = If your arp ping tool is not called arping and cannot be found in the PATH environment, you can configure the absolute path / tool name here. addon.config.network.cacheDeviceStateTimeInMS.label = Cache Time addon.config.network.cacheDeviceStateTimeInMS.description = The result of a device presence detection is cached for a small amount of time. Be aware that no new pings will be issued within this time frame, even if explicitly requested. +addon.config.network.numberOfDiscoveryThreads.label = Number of Discovery Threads +addon.config.network.numberOfDiscoveryThreads.description = The number of threads to use when scanning for network devices. Fewer threads, results in lower memory consumption but a slower operation. Use 0 for unlimited. addon.config.network.preferResponseTimeAsLatency.label = Use Response Time as Latency addon.config.network.preferResponseTimeAsLatency.description = If enabled, an attempt will be made to extract the latency from the output of the ping command. If no such latency value is found in the ping command output, the time to execute the ping command is used as fallback latency. If disabled, the time to execute the ping command is always used as latency value. diff --git a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java index 0f770e9157a21..7987c430b4bb3 100644 --- a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java +++ b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/PresenceDetectionTest.java @@ -22,10 +22,12 @@ import java.time.Duration; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -50,6 +52,7 @@ public class PresenceDetectionTest { private @NonNullByDefault({}) PresenceDetection subject; + private @NonNullByDefault({}) PresenceDetection asyncSubject; private @Mock @NonNullByDefault({}) Consumer callback; private @Mock @NonNullByDefault({}) ExecutorService detectionExecutorService; @@ -65,7 +68,8 @@ public void setUp() { doReturn(ArpPingUtilEnum.IPUTILS_ARPING).when(networkUtils).determineNativeArpPingMethod(anyString()); doReturn(IpPingMethodEnum.WINDOWS_PING).when(networkUtils).determinePingMethod(); - subject = spy(new PresenceDetection(listener, scheduledExecutorService, Duration.ofSeconds(2))); + // Inject a direct executor so async tasks run synchronously in tests + subject = spy(new PresenceDetection(listener, Duration.ofSeconds(2), Runnable::run)); subject.networkUtils = networkUtils; // Set a useful configuration. The default presenceDetection is a no-op. @@ -77,36 +81,59 @@ public void setUp() { subject.setUseArpPing(true, "arping", ArpPingUtilEnum.IPUTILS_ARPING); subject.setUseIcmpPing(true); + asyncSubject = spy(new PresenceDetection(listener, Duration.ofSeconds(2), Executors.newSingleThreadExecutor())); + + asyncSubject.networkUtils = networkUtils; + asyncSubject.setHostname("127.0.0.1"); + asyncSubject.setTimeout(Duration.ofMillis(300)); + asyncSubject.setUseDhcpSniffing(false); + asyncSubject.setIOSDevice(true); + asyncSubject.setServicePorts(Set.of(1010)); + asyncSubject.setUseArpPing(true, "arping", ArpPingUtilEnum.IPUTILS_ARPING); + asyncSubject.setUseIcmpPing(true); + assertThat(subject.pingMethod, is(IpPingMethodEnum.WINDOWS_PING)); } - // Depending on the amount of test methods an according amount of threads is spawned. - // We will check if they spawn and return in time. + // Depending on the amount of test methods an according amount of threads is used. @Test - public void threadCountTest() { - assertNull(subject.detectionExecutorService); + public void usedThreadCountTest() { + // Custom executor to count submitted tasks + class CountingExecutor implements java.util.concurrent.Executor { + int count = 0; + + @Override + public void execute(@Nullable Runnable command) { + count++; + if (command != null) { + command.run(); + } + } + } + CountingExecutor countingExecutor = new CountingExecutor(); + + // Create a new subject with the counting executor + subject = spy(new PresenceDetection(listener, Duration.ofSeconds(2), countingExecutor)); + subject.networkUtils = networkUtils; + subject.setHostname("127.0.0.1"); + subject.setTimeout(Duration.ofMillis(300)); + subject.setUseDhcpSniffing(false); + subject.setIOSDevice(true); + subject.setServicePorts(Set.of(1010)); + subject.setUseArpPing(true, "arping", ArpPingUtilEnum.IPUTILS_ARPING); + subject.setUseIcmpPing(true); doNothing().when(subject).performArpPing(any(), any()); doNothing().when(subject).performJavaPing(any()); doNothing().when(subject).performSystemPing(any()); doNothing().when(subject).performServicePing(any(), anyInt()); - doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); - subject.getValue(callback -> { + // No-op callback }); - // Thread count: ARP + ICMP + 1*TCP - assertThat(subject.detectionChecks, is(3)); - assertNotNull(subject.detectionExecutorService); - - // "Wait" for the presence detection to finish - ArgumentCaptor runnableCapture = ArgumentCaptor.forClass(Runnable.class); - verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture()); - runnableCapture.getValue().run(); - - assertThat(subject.detectionChecks, is(0)); - assertNull(subject.detectionExecutorService); + // Thread count: ARP + ICMP + 1*TCP + task completion watcher = 4 + assertThat(countingExecutor.count, is(4)); } @Test @@ -117,27 +144,11 @@ public void partialAndFinalCallbackTests() throws InterruptedException, IOExcept anyString(), any(), any()); doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any()); - doReturn(detectionExecutorService).when(subject).getThreadsFor(3); - doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); - subject.performPresenceDetection(); assertThat(subject.detectionChecks, is(3)); - // Perform the different presence detection threads now - ArgumentCaptor capture = ArgumentCaptor.forClass(Runnable.class); - verify(detectionExecutorService, times(3)).execute(capture.capture()); - for (Runnable r : capture.getAllValues()) { - r.run(); - } - - // "Wait" for the presence detection to finish - ArgumentCaptor runnableCapture = ArgumentCaptor.forClass(Runnable.class); - verify(waitForResultExecutorService, times(1)).execute(runnableCapture.capture()); - runnableCapture.getValue().run(); - - assertThat(subject.detectionChecks, is(0)); - + // All detection methods should be called (direct executor runs synchronously) verify(subject, times(0)).performJavaPing(any()); verify(subject).performSystemPing(any()); verify(subject).performArpPing(any(), any()); @@ -158,41 +169,22 @@ public void cacheTest() throws InterruptedException, IOException { anyString(), any(), any()); doReturn(pingResult).when(networkUtils).servicePing(anyString(), anyInt(), any()); - doReturn(detectionExecutorService).when(subject).getThreadsFor(3); - doReturn(waitForResultExecutorService).when(subject).getThreadsFor(1); - // We expect no valid value - assertTrue(subject.cache.isExpired()); + assertTrue(asyncSubject.cache.isExpired()); // Get value will issue a PresenceDetection internally. - subject.getValue(callback); - verify(subject).performPresenceDetection(); - assertNotNull(subject.detectionExecutorService); - // There should be no straight callback yet - verify(callback, times(0)).accept(any()); - - // Perform the different presence detection threads now - ArgumentCaptor capture = ArgumentCaptor.forClass(Runnable.class); - verify(detectionExecutorService, times(3)).execute(capture.capture()); - for (Runnable r : capture.getAllValues()) { - r.run(); - } - - // "Wait" for the presence detection to finish - capture = ArgumentCaptor.forClass(Runnable.class); - verify(waitForResultExecutorService, times(1)).execute(capture.capture()); - capture.getValue().run(); - - // Although there are multiple partial results and a final result, - // the getValue() consumers get the fastest response possible, and only once. + asyncSubject.getValue(callback); + verify(asyncSubject).performPresenceDetection(); + Thread.sleep(200); // give it some time to execute + // Callback should be called once with the result (since we use direct executor) verify(callback, times(1)).accept(any()); // As long as the cache is valid, we can get the result back again - subject.getValue(callback); + asyncSubject.getValue(callback); verify(callback, times(2)).accept(any()); // Invalidate value, we should not get a new callback immediately again - subject.cache.invalidateValue(); - subject.getValue(callback); + asyncSubject.cache.invalidateValue(); + asyncSubject.getValue(callback); verify(callback, times(2)).accept(any()); } } diff --git a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/discovery/DiscoveryTest.java b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/discovery/DiscoveryTest.java index d0fb5e772cbf2..b014f0217291c 100644 --- a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/discovery/DiscoveryTest.java +++ b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/discovery/DiscoveryTest.java @@ -33,6 +33,7 @@ import org.openhab.binding.network.internal.PresenceDetectionValue; import org.openhab.core.config.discovery.DiscoveryListener; import org.openhab.core.config.discovery.DiscoveryResult; +import org.osgi.service.cm.ConfigurationAdmin; /** * Tests cases for {@see PresenceDetectionValue} @@ -57,8 +58,9 @@ public void setUp() { } @Test - public void pingDeviceDetected() { - NetworkDiscoveryService d = new NetworkDiscoveryService(); + public void pingDeviceDetected() throws InterruptedException { + ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class); + NetworkDiscoveryService d = new NetworkDiscoveryService(configAdmin); d.addDiscoveryListener(listener); ArgumentCaptor result = ArgumentCaptor.forClass(DiscoveryResult.class); @@ -67,6 +69,7 @@ public void pingDeviceDetected() { when(value.isPingReachable()).thenReturn(true); when(value.isTcpServiceReachable()).thenReturn(false); d.partialDetectionResult(value); + Thread.sleep(200L); verify(listener).thingDiscovered(any(), result.capture()); DiscoveryResult dresult = result.getValue(); assertThat(dresult.getThingUID(), is(NetworkDiscoveryService.createPingUID(ip))); @@ -74,8 +77,9 @@ public void pingDeviceDetected() { } @Test - public void tcpDeviceDetected() { - NetworkDiscoveryService d = new NetworkDiscoveryService(); + public void tcpDeviceDetected() throws InterruptedException { + ConfigurationAdmin configAdmin = mock(ConfigurationAdmin.class); + NetworkDiscoveryService d = new NetworkDiscoveryService(configAdmin); d.addDiscoveryListener(listener); ArgumentCaptor result = ArgumentCaptor.forClass(DiscoveryResult.class); @@ -85,6 +89,7 @@ public void tcpDeviceDetected() { when(value.isTcpServiceReachable()).thenReturn(true); when(value.getReachableTcpPorts()).thenReturn(List.of(1010)); d.partialDetectionResult(value); + Thread.sleep(200L); verify(listener).thingDiscovered(any(), result.capture()); DiscoveryResult dresult = result.getValue(); assertThat(dresult.getThingUID(), is(NetworkDiscoveryService.createServiceUID(ip, 1010))); diff --git a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java index 1b572d71d9377..c5dc713798d1b 100644 --- a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java +++ b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java @@ -71,7 +71,7 @@ public void setUp() { @Test public void checkAllConfigurations() { NetworkBindingConfiguration config = new NetworkBindingConfiguration(); - NetworkHandler handler = spy(new NetworkHandler(thing, true, config)); + NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, true, config)); handler.setCallback(callback); // Provide all possible configuration when(thing.getConfiguration()).thenAnswer(a -> { @@ -79,28 +79,23 @@ public void checkAllConfigurations() { conf.put(NetworkBindingConstants.PARAMETER_RETRY, 10); conf.put(NetworkBindingConstants.PARAMETER_HOSTNAME, "127.0.0.1"); conf.put(NetworkBindingConstants.PARAMETER_PORT, 8080); - conf.put(NetworkBindingConstants.PARAMETER_REFRESH_INTERVAL, 101010); conf.put(NetworkBindingConstants.PARAMETER_TIMEOUT, 1234); return conf; }); PresenceDetection presenceDetection = spy( - new PresenceDetection(handler, scheduledExecutorService, Duration.ofSeconds(2))); - // Mock start/stop automatic refresh - doNothing().when(presenceDetection).startAutomaticRefresh(); - doNothing().when(presenceDetection).stopAutomaticRefresh(); + new PresenceDetection(handler, Duration.ofSeconds(2), scheduledExecutorService)); handler.initialize(presenceDetection); assertThat(handler.retries, is(10)); assertThat(presenceDetection.getHostname(), is("127.0.0.1")); assertThat(presenceDetection.getServicePorts().iterator().next(), is(8080)); - assertThat(presenceDetection.getRefreshInterval(), is(Duration.ofMillis(101010))); assertThat(presenceDetection.getTimeout(), is(Duration.ofMillis(1234))); } @Test public void tcpDeviceInitTests() { NetworkBindingConfiguration config = new NetworkBindingConfiguration(); - NetworkHandler handler = spy(new NetworkHandler(thing, true, config)); + NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, true, config)); assertThat(handler.isTCPServiceDevice(), is(true)); handler.setCallback(callback); // Port is missing, should make the device OFFLINE @@ -109,7 +104,7 @@ public void tcpDeviceInitTests() { conf.put(NetworkBindingConstants.PARAMETER_HOSTNAME, "127.0.0.1"); return conf; }); - handler.initialize(new PresenceDetection(handler, scheduledExecutorService, Duration.ofSeconds(2))); + handler.initialize(new PresenceDetection(handler, Duration.ofSeconds(2), scheduledExecutorService)); // Check that we are offline ArgumentCaptor statusInfoCaptor = ArgumentCaptor.forClass(ThingStatusInfo.class); verify(callback).statusUpdated(eq(thing), statusInfoCaptor.capture()); @@ -120,19 +115,17 @@ public void tcpDeviceInitTests() { @Test public void pingDeviceInitTests() { NetworkBindingConfiguration config = new NetworkBindingConfiguration(); - NetworkHandler handler = spy(new NetworkHandler(thing, false, config)); + NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, false, config)); handler.setCallback(callback); // Provide minimal configuration when(thing.getConfiguration()).thenAnswer(a -> { Configuration conf = new Configuration(); conf.put(NetworkBindingConstants.PARAMETER_HOSTNAME, "127.0.0.1"); + conf.put(NetworkBindingConstants.PARAMETER_REFRESH_INTERVAL, 0); // disable auto refresh return conf; }); PresenceDetection presenceDetection = spy( - new PresenceDetection(handler, scheduledExecutorService, Duration.ofSeconds(2))); - // Mock start/stop automatic refresh - doNothing().when(presenceDetection).startAutomaticRefresh(); - doNothing().when(presenceDetection).stopAutomaticRefresh(); + new PresenceDetection(handler, Duration.ofSeconds(2), scheduledExecutorService)); doReturn(Instant.now()).when(presenceDetection).getLastSeen(); handler.initialize(presenceDetection); diff --git a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/utils/LatencyParserTest.java b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/utils/LatencyParserTest.java index cbc1bd7aa82fa..239e726d51095 100644 --- a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/utils/LatencyParserTest.java +++ b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/utils/LatencyParserTest.java @@ -35,7 +35,7 @@ public void parseLinuxAndMacResultFoundTest() { String input = "64 bytes from 192.168.1.1: icmp_seq=0 ttl=64 time=1.225 ms"; // Act - Duration resultLatency = latencyParser.parseLatency(input); + Duration resultLatency = latencyParser.parseLatency(input, null); // Assert assertNotNull(resultLatency); @@ -55,7 +55,7 @@ public void parseLinuxAndMacResultNotFoundTest() { for (String inputLine : inputLines) { // Act - Duration resultLatency = latencyParser.parseLatency(inputLine); + Duration resultLatency = latencyParser.parseLatency(inputLine, null); // Assert assertNull(resultLatency); @@ -69,10 +69,15 @@ public void parseWindows10ResultFoundTest() { String input = "Reply from 192.168.178.207: bytes=32 time=2ms TTL=64"; // Act - Duration resultLatency = latencyParser.parseLatency(input); + Duration resultLatency = latencyParser.parseLatency(input, null); // Assert assertNotNull(resultLatency); assertEquals(2, durationToMillis(resultLatency), 0); + + input = "Reply from 10.80.5.2: bytes=32 time<1ms TTL=64"; + resultLatency = latencyParser.parseLatency(input, null); + assertNotNull(resultLatency); + assertEquals(1, durationToMillis(resultLatency), 0.0); } } From 9b602f052d085aeaa030fef14932923b80005650 Mon Sep 17 00:00:00 2001 From: Nadahar Date: Fri, 3 Oct 2025 17:17:13 +0200 Subject: [PATCH 3/3] [network] Use a threaded consumer for Process output consumption (#19398) * Use a threaded consumer for Process output consumption Signed-off-by: Ravi Nadahar --- .../internal/NetworkHandlerFactory.java | 30 ++++- .../discovery/NetworkDiscoveryService.java | 49 ++++++-- .../internal/handler/NetworkHandler.java | 13 ++- .../network/internal/utils/NetworkUtils.java | 81 ++++++++++--- .../internal/utils/OutputConsumptionUtil.java | 110 ++++++++++++++++++ .../internal/handler/NetworkHandlerTest.java | 16 +-- 6 files changed, 254 insertions(+), 45 deletions(-) create mode 100644 bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/OutputConsumptionUtil.java diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java index 018df90507c35..8f11b6f5f07fb 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/NetworkHandlerFactory.java @@ -15,12 +15,17 @@ import static org.openhab.binding.network.internal.NetworkBindingConstants.*; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; import org.openhab.binding.network.internal.handler.NetworkHandler; import org.openhab.binding.network.internal.handler.SpeedTestHandler; +import org.openhab.core.common.NamedThreadFactory; import org.openhab.core.common.ThreadPoolManager; import org.openhab.core.config.core.Configuration; import org.openhab.core.thing.Thing; @@ -47,9 +52,11 @@ public class NetworkHandlerFactory extends BaseThingHandlerFactory { final NetworkBindingConfiguration configuration = new NetworkBindingConfiguration(); private static final String NETWORK_HANDLER_THREADPOOL_NAME = "networkBinding"; + private static final String NETWORK_RESOLVER_THREADPOOL_NAME = "binding-network-resolver"; private final Logger logger = LoggerFactory.getLogger(NetworkHandlerFactory.class); private final ScheduledExecutorService executor = ThreadPoolManager .getScheduledPool(NETWORK_HANDLER_THREADPOOL_NAME); + private volatile @Nullable ExecutorService resolver; @Override public boolean supportsThingType(ThingTypeUID thingTypeUID) { @@ -61,12 +68,24 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) { protected void activate(ComponentContext componentContext, Map config) { super.activate(componentContext); modified(config); + ExecutorService resolver = this.resolver; + if (resolver != null) { + // This should not happen + resolver.shutdownNow(); + } + this.resolver = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 20L, TimeUnit.SECONDS, + new SynchronousQueue(), new NamedThreadFactory(NETWORK_RESOLVER_THREADPOOL_NAME)); } @Override @Deactivate protected void deactivate(ComponentContext componentContext) { super.deactivate(componentContext); + ExecutorService resolver = this.resolver; + if (resolver != null) { + resolver.shutdownNow(); + this.resolver = null; + } } @Modified @@ -80,12 +99,19 @@ protected void modified(Map config) { @Override protected @Nullable ThingHandler createHandler(Thing thing) { + ExecutorService resolver = this.resolver; + if (resolver == null) { + // This should be impossible + logger.error("Failed to create handler for Thing \"{}\" - handler factory hasn't been activated", + thing.getUID()); + return null; + } ThingTypeUID thingTypeUID = thing.getThingTypeUID(); if (thingTypeUID.equals(PING_DEVICE) || thingTypeUID.equals(BACKWARDS_COMPATIBLE_DEVICE)) { - return new NetworkHandler(thing, executor, false, configuration); + return new NetworkHandler(thing, executor, resolver, false, configuration); } else if (thingTypeUID.equals(SERVICE_DEVICE)) { - return new NetworkHandler(thing, executor, true, configuration); + return new NetworkHandler(thing, executor, resolver, true, configuration); } else if (thingTypeUID.equals(SPEEDTEST_DEVICE)) { return new SpeedTestHandler(thing); } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java index 471d522f1d10e..4a699b5815d06 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/discovery/NetworkDiscoveryService.java @@ -80,6 +80,9 @@ public class NetworkDiscoveryService extends AbstractDiscoveryService implements /* All access must be guarded by "this" */ private @Nullable ExecutorService executorService; + + /* All access must be guarded by "this" */ + private @Nullable ExecutorService resolver; private final NetworkUtils networkUtils = new NetworkUtils(); private final ConfigurationAdmin admin; @@ -100,6 +103,10 @@ protected void deactivate() { executorService.shutdownNow(); executorService = null; } + if (resolver != null) { + resolver.shutdownNow(); + resolver = null; + } } super.deactivate(); } @@ -140,6 +147,13 @@ private ExecutorService createDiscoveryExecutor(@Nullable NetworkBindingConfigur } } + private ExecutorService createDiscoveryResolver() { + AtomicInteger count = new AtomicInteger(1); + return Executors.newCachedThreadPool(r -> { + return new Thread(r, "OH-binding-network-discoveryResolver-" + count.getAndIncrement()); + }); + } + /** * Starts the DiscoveryThread for each IP on each interface on the network */ @@ -147,13 +161,18 @@ private ExecutorService createDiscoveryExecutor(@Nullable NetworkBindingConfigur protected void startScan() { NetworkBindingConfiguration configuration = getConfig(); final ExecutorService service; + final ExecutorService resolver; synchronized (this) { if (executorService == null) { executorService = createDiscoveryExecutor(configuration); } service = executorService; + if (this.resolver == null) { + this.resolver = createDiscoveryResolver(); + } + resolver = this.resolver; } - if (service == null) { + if (service == null || resolver == null) { return; } @@ -178,7 +197,7 @@ protected void startScan() { final int targetCount = networkIPs.size(); for (String ip : networkIPs) { - final PresenceDetection pd = new PresenceDetection(this, Duration.ofSeconds(2), service); + final PresenceDetection pd = new PresenceDetection(this, Duration.ofSeconds(2), resolver); pd.setHostname(ip); pd.setIOSDevice(true); pd.setUseDhcpSniffing(false); @@ -211,26 +230,34 @@ protected void startScan() { }); } + @SuppressWarnings("sync-override") @Override protected void stopScan() { final ExecutorService service; + final ExecutorService resolver; synchronized (this) { super.stopScan(); service = executorService; - if (service == null) { - return; - } executorService = null; + resolver = this.resolver; + this.resolver = null; } logger.debug("Stopping Network Device Discovery"); - service.shutdownNow(); // Initiate shutdown - try { - if (!service.awaitTermination(PING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { - logger.warn("Network discovery scan failed to stop within the timeout of {}", PING_TIMEOUT); + if (service != null) { + service.shutdownNow(); // Initiate shutdown + } + if (resolver != null) { + resolver.shutdown(); // Initiate shutdown, but let it complete queued tasks + } + if (service != null) { + try { + if (!service.awaitTermination(PING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) { + logger.warn("Network discovery scan failed to stop within the timeout of {}", PING_TIMEOUT); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); } } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java index 7e7af367f1711..836bb9ff61796 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/handler/NetworkHandler.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -84,14 +85,16 @@ public class NetworkHandler extends BaseThingHandler // Retry counter. Will be reset as soon as a device presence detection succeed. private volatile int retryCounter = 0; private final ScheduledExecutorService executor; + private final ExecutorService resolver; /** * Creates a new instance using the specified parameters. */ - public NetworkHandler(Thing thing, ScheduledExecutorService executor, boolean isTCPServiceDevice, - NetworkBindingConfiguration configuration) { + public NetworkHandler(Thing thing, ScheduledExecutorService executor, ExecutorService resolver, + boolean isTCPServiceDevice, NetworkBindingConfiguration configuration) { super(thing); this.executor = executor; + this.resolver = resolver; this.isTCPServiceDevice = isTCPServiceDevice; this.configuration = configuration; this.configuration.addNetworkBindingConfigurationListener(this); @@ -225,8 +228,8 @@ void initialize(PresenceDetection presenceDetection) { wakeOnLanPacketSender = new WakeOnLanPacketSender(config.macAddress, config.hostname, config.port, config.networkInterfaceNames); if (config.refreshInterval > 0) { - refreshJob = executor.scheduleWithFixedDelay(presenceDetection::refresh, 0, config.refreshInterval, - TimeUnit.MILLISECONDS); + refreshJob = executor.scheduleWithFixedDelay(presenceDetection::refresh, (long) (Math.random() * 5000), + config.refreshInterval, TimeUnit.MILLISECONDS); } } @@ -256,7 +259,7 @@ public void initialize() { updateStatus(ThingStatus.UNKNOWN); executor.submit(() -> { initialize(new PresenceDetection(this, Duration.ofMillis(configuration.cacheDeviceStateTimeInMS.intValue()), - executor)); + resolver)); }); } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java index db2ac29804935..f96c0d3133e9a 100644 --- a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/NetworkUtils.java @@ -12,9 +12,7 @@ */ package org.openhab.binding.network.internal.utils; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; import java.net.ConnectException; import java.net.DatagramPacket; import java.net.DatagramSocket; @@ -39,6 +37,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -380,12 +382,35 @@ public enum IpPingMethodEnum { } // Consume the output while the process runs - List output = new ArrayList<>(); - try (InputStreamReader isr = new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8); - BufferedReader br = new BufferedReader(isr)) { - String line; - while ((line = br.readLine()) != null) { - output.add(line); + FutureTask> consumer = OutputConsumptionUtil.consumeText(proc.getInputStream(), + StandardCharsets.UTF_8); + + if (!proc.waitFor(timeout.toMillis() * 10L, TimeUnit.MILLISECONDS)) { + logger.warn("Timed out while waiting for the ping process to execute"); + proc.destroy(); + return new PingResult(false, Duration.between(execStartTime, Instant.now())); + } + int result = proc.exitValue(); + Instant execStopTime = Instant.now(); + List output; + try { + if (Thread.currentThread().isInterrupted()) { + consumer.cancel(true); + output = List.of(); + } + output = consumer.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + output = List.of(); + logger.warn("An exception occurred while consuming ping process output: {}", e.getMessage()); + logger.trace("", e); + } catch (TimeoutException e) { + // This should never happen, since the output should be available as soon as the process has finished + output = List.of(); + logger.warn("Timed out while retrieving ping process output"); + logger.trace("", e); + } + if (logger.isTraceEnabled()) { + for (String line : output) { logger.trace("Network [ping output]: '{}'", line); } } @@ -395,9 +420,6 @@ public enum IpPingMethodEnum { // not ready. // Exception: return code is also 0 in Windows for all requests on the local subnet. // see https://superuser.com/questions/403905/ping-from-windows-7-get-no-reply-but-sets-errorlevel-to-0 - - int result = proc.waitFor(); - Instant execStopTime = Instant.now(); if (result != 0) { return new PingResult(false, Duration.between(execStartTime, execStopTime)); } @@ -474,20 +496,41 @@ public enum ArpPingUtilEnum { } // Consume the output while the process runs - List output = new ArrayList<>(); - try (InputStreamReader isr = new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8); - BufferedReader br = new BufferedReader(isr)) { - String line; - while ((line = br.readLine()) != null) { - output.add(line); + FutureTask> consumer = OutputConsumptionUtil.consumeText(proc.getInputStream(), + StandardCharsets.UTF_8); + + if (!proc.waitFor(timeout.toMillis() * 10L, TimeUnit.MILLISECONDS)) { + logger.warn("Timed out while waiting for the arping process to execute"); + proc.destroy(); + return new PingResult(false, Duration.between(execStartTime, Instant.now())); + } + int result = proc.exitValue(); + Instant execStopTime = Instant.now(); + List output; + try { + if (Thread.currentThread().isInterrupted()) { + consumer.cancel(true); + output = List.of(); + } + output = consumer.get(5, TimeUnit.SECONDS); + } catch (ExecutionException e) { + output = List.of(); + logger.warn("An exception occurred while consuming arping process output: {}", e.getMessage()); + logger.trace("", e); + } catch (TimeoutException e) { + // This should never happen, since the output should be available as soon as the process has finished + output = List.of(); + logger.warn("Timed out while retrieving arping process output"); + logger.trace("", e); + } + if (logger.isTraceEnabled()) { + for (String line : output) { logger.trace("Network [arping output]: '{}'", line); } } // The return code is 0 for a successful ping. 1 if device didn't respond and 2 if there is another error like // network interface not ready. - int result = proc.waitFor(); - Instant execStopTime = Instant.now(); if (result != 0) { return new PingResult(false, Duration.between(execStartTime, execStopTime)); } diff --git a/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/OutputConsumptionUtil.java b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/OutputConsumptionUtil.java new file mode 100644 index 0000000000000..1598e60569e46 --- /dev/null +++ b/bundles/org.openhab.binding.network/src/main/java/org/openhab/binding/network/internal/utils/OutputConsumptionUtil.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2010-2025 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.binding.network.internal.utils; + +import java.io.BufferedReader; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.FutureTask; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; + +/** + * A utility class for spawning threads that consume all bytes from the specified {@link InputStream} and + * return the results as {@link FutureTask}s. + * + * @author Ravi Nadahar - Initial contribution + */ +@NonNullByDefault +public class OutputConsumptionUtil { + + private OutputConsumptionUtil() { + // Not to be instantiated + } + + /** + * Consumes the specified {@link InputStream} by spawning a new thread and converts in into text using UTF-8. The + * result is returned as a {@link FutureTask}. + * + * @param inputStream the {@link InputStream} to consume. + * @return The {@link FutureTask} where the task can be cancelled and the results retrieved. + */ + public static FutureTask> consumeText(InputStream inputStream) { + return consumeText(inputStream, null, null); + } + + /** + * Consumes the specified {@link InputStream} by spawning a new thread and converts in into text using the + * specified {@link Charset}. The result is returned as a {@link FutureTask}. + * + * @param inputStream the {@link InputStream} to consume. + * @param charset the {@link Charset} to use for byte to character conversion. + * will be generated. + * @return The {@link FutureTask} where the task can be cancelled and the results retrieved. + */ + public static FutureTask> consumeText(InputStream inputStream, @Nullable Charset charset) { + return consumeText(inputStream, charset, null); + } + + /** + * Consumes the specified {@link InputStream} by spawning a new thread and converts in into text using UTF-8. The + * result is returned as a {@link FutureTask}. + * + * @param inputStream the {@link InputStream} to consume. + * @param threadName the name of the worker thread that will do the consumption. If {@code null}, a thread name + * will be generated. + * @return The {@link FutureTask} where the task can be cancelled and the results retrieved. + */ + public static FutureTask> consumeText(InputStream inputStream, @Nullable String threadName) { + return consumeText(inputStream, null, threadName); + } + + /** + * Consumes the specified {@link InputStream} by spawning a new thread and converts in into text using the + * specified {@link Charset}. The result is returned as a {@link FutureTask}. + * + * @param inputStream the {@link InputStream} to consume. + * @param charset the {@link Charset} to use for byte to character conversion. + * @param threadName the name of the worker thread that will do the consumption. If {@code null}, a thread name + * will be generated. + * @return The {@link FutureTask} where the task can be cancelled and the results retrieved. + */ + public static FutureTask> consumeText(final InputStream inputStream, @Nullable Charset charset, + @Nullable String threadName) { + FutureTask> result = new FutureTask>(() -> { + List output = new ArrayList<>(); + try (InputStreamReader isr = new InputStreamReader(inputStream, + charset == null ? StandardCharsets.UTF_8 : charset); BufferedReader br = new BufferedReader(isr)) { + String line; + while ((line = br.readLine()) != null) { + output.add(line); + } + } + return output; + }); + + Thread runner; + if (threadName == null || threadName.isBlank()) { + runner = new Thread(result, Thread.currentThread().getName() + "-consumer"); + } else { + runner = new Thread(result, threadName); + } + runner.start(); + return result; + } +} diff --git a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java index c5dc713798d1b..5f9df3a5ca4c6 100644 --- a/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java +++ b/bundles/org.openhab.binding.network/src/test/java/org/openhab/binding/network/internal/handler/NetworkHandlerTest.java @@ -21,6 +21,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -61,6 +62,7 @@ public class NetworkHandlerTest extends JavaTest { private @Mock @NonNullByDefault({}) ThingHandlerCallback callback; private @Mock @NonNullByDefault({}) ScheduledExecutorService scheduledExecutorService; + private @Mock @NonNullByDefault({}) ExecutorService resolver; private @Mock @NonNullByDefault({}) Thing thing; @BeforeEach @@ -71,7 +73,7 @@ public void setUp() { @Test public void checkAllConfigurations() { NetworkBindingConfiguration config = new NetworkBindingConfiguration(); - NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, true, config)); + NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, resolver, true, config)); handler.setCallback(callback); // Provide all possible configuration when(thing.getConfiguration()).thenAnswer(a -> { @@ -82,8 +84,7 @@ public void checkAllConfigurations() { conf.put(NetworkBindingConstants.PARAMETER_TIMEOUT, 1234); return conf; }); - PresenceDetection presenceDetection = spy( - new PresenceDetection(handler, Duration.ofSeconds(2), scheduledExecutorService)); + PresenceDetection presenceDetection = spy(new PresenceDetection(handler, Duration.ofSeconds(2), resolver)); handler.initialize(presenceDetection); assertThat(handler.retries, is(10)); @@ -95,7 +96,7 @@ public void checkAllConfigurations() { @Test public void tcpDeviceInitTests() { NetworkBindingConfiguration config = new NetworkBindingConfiguration(); - NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, true, config)); + NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, resolver, true, config)); assertThat(handler.isTCPServiceDevice(), is(true)); handler.setCallback(callback); // Port is missing, should make the device OFFLINE @@ -104,7 +105,7 @@ public void tcpDeviceInitTests() { conf.put(NetworkBindingConstants.PARAMETER_HOSTNAME, "127.0.0.1"); return conf; }); - handler.initialize(new PresenceDetection(handler, Duration.ofSeconds(2), scheduledExecutorService)); + handler.initialize(new PresenceDetection(handler, Duration.ofSeconds(2), resolver)); // Check that we are offline ArgumentCaptor statusInfoCaptor = ArgumentCaptor.forClass(ThingStatusInfo.class); verify(callback).statusUpdated(eq(thing), statusInfoCaptor.capture()); @@ -115,7 +116,7 @@ public void tcpDeviceInitTests() { @Test public void pingDeviceInitTests() { NetworkBindingConfiguration config = new NetworkBindingConfiguration(); - NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, false, config)); + NetworkHandler handler = spy(new NetworkHandler(thing, scheduledExecutorService, resolver, false, config)); handler.setCallback(callback); // Provide minimal configuration when(thing.getConfiguration()).thenAnswer(a -> { @@ -124,8 +125,7 @@ public void pingDeviceInitTests() { conf.put(NetworkBindingConstants.PARAMETER_REFRESH_INTERVAL, 0); // disable auto refresh return conf; }); - PresenceDetection presenceDetection = spy( - new PresenceDetection(handler, Duration.ofSeconds(2), scheduledExecutorService)); + PresenceDetection presenceDetection = spy(new PresenceDetection(handler, Duration.ofSeconds(2), resolver)); doReturn(Instant.now()).when(presenceDetection).getLastSeen(); handler.initialize(presenceDetection);