Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
import static org.openhab.binding.network.internal.NetworkBindingConstants.*;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.openhab.binding.network.internal.handler.NetworkHandler;
import org.openhab.binding.network.internal.handler.SpeedTestHandler;
import org.openhab.core.common.NamedThreadFactory;
import org.openhab.core.common.ThreadPoolManager;
import org.openhab.core.config.core.Configuration;
import org.openhab.core.thing.Thing;
Expand All @@ -47,9 +52,11 @@
public class NetworkHandlerFactory extends BaseThingHandlerFactory {
final NetworkBindingConfiguration configuration = new NetworkBindingConfiguration();
private static final String NETWORK_HANDLER_THREADPOOL_NAME = "networkBinding";
private static final String NETWORK_RESOLVER_THREADPOOL_NAME = "binding-network-resolver";
private final Logger logger = LoggerFactory.getLogger(NetworkHandlerFactory.class);
private final ScheduledExecutorService executor = ThreadPoolManager
.getScheduledPool(NETWORK_HANDLER_THREADPOOL_NAME);
private volatile @Nullable ExecutorService resolver;

@Override
public boolean supportsThingType(ThingTypeUID thingTypeUID) {
Expand All @@ -61,12 +68,24 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) {
protected void activate(ComponentContext componentContext, Map<String, Object> config) {
super.activate(componentContext);
modified(config);
ExecutorService resolver = this.resolver;
if (resolver != null) {
// This should not happen
resolver.shutdownNow();
}
this.resolver = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 20L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new NamedThreadFactory(NETWORK_RESOLVER_THREADPOOL_NAME));
}

@Override
@Deactivate
protected void deactivate(ComponentContext componentContext) {
super.deactivate(componentContext);
ExecutorService resolver = this.resolver;
if (resolver != null) {
resolver.shutdownNow();
this.resolver = null;
}
}

@Modified
Expand All @@ -80,12 +99,19 @@ protected void modified(Map<String, Object> config) {

@Override
protected @Nullable ThingHandler createHandler(Thing thing) {
ExecutorService resolver = this.resolver;
if (resolver == null) {
// This should be impossible
logger.error("Failed to create handler for Thing \"{}\" - handler factory hasn't been activated",
thing.getUID());
return null;
}
ThingTypeUID thingTypeUID = thing.getThingTypeUID();

if (thingTypeUID.equals(PING_DEVICE) || thingTypeUID.equals(BACKWARDS_COMPATIBLE_DEVICE)) {
return new NetworkHandler(thing, executor, false, configuration);
return new NetworkHandler(thing, executor, resolver, false, configuration);
} else if (thingTypeUID.equals(SERVICE_DEVICE)) {
return new NetworkHandler(thing, executor, true, configuration);
return new NetworkHandler(thing, executor, resolver, true, configuration);
} else if (thingTypeUID.equals(SPEEDTEST_DEVICE)) {
return new SpeedTestHandler(thing);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public class NetworkDiscoveryService extends AbstractDiscoveryService implements

/* All access must be guarded by "this" */
private @Nullable ExecutorService executorService;

/* All access must be guarded by "this" */
private @Nullable ExecutorService resolver;
private final NetworkUtils networkUtils = new NetworkUtils();
private final ConfigurationAdmin admin;

Expand All @@ -100,6 +103,10 @@ protected void deactivate() {
executorService.shutdownNow();
executorService = null;
}
if (resolver != null) {
resolver.shutdownNow();
resolver = null;
}
}
super.deactivate();
}
Expand Down Expand Up @@ -140,20 +147,33 @@ private ExecutorService createDiscoveryExecutor(@Nullable NetworkBindingConfigur
}
}

private ExecutorService createDiscoveryResolver() {
AtomicInteger count = new AtomicInteger(1);
return Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "OH-binding-network-discoveryResolver-" + count.getAndIncrement());
return t;
});
}

/**
* Starts the DiscoveryThread for each IP on each interface on the network
*/
@Override
protected void startScan() {
NetworkBindingConfiguration configuration = getConfig();
final ExecutorService service;
final ExecutorService resolver;
synchronized (this) {
if (executorService == null) {
executorService = createDiscoveryExecutor(configuration);
}
service = executorService;
if (this.resolver == null) {
this.resolver = createDiscoveryResolver();
}
resolver = this.resolver;
}
if (service == null) {
if (service == null || resolver == null) {
return;
}

Expand All @@ -178,7 +198,7 @@ protected void startScan() {
final int targetCount = networkIPs.size();

for (String ip : networkIPs) {
final PresenceDetection pd = new PresenceDetection(this, Duration.ofSeconds(2), service);
final PresenceDetection pd = new PresenceDetection(this, Duration.ofSeconds(2), resolver);
pd.setHostname(ip);
pd.setIOSDevice(true);
pd.setUseDhcpSniffing(false);
Expand Down Expand Up @@ -211,26 +231,34 @@ protected void startScan() {
});
}

@SuppressWarnings("sync-override")
@Override
protected void stopScan() {
final ExecutorService service;
final ExecutorService resolver;
synchronized (this) {
super.stopScan();
service = executorService;
if (service == null) {
return;
}
executorService = null;
resolver = this.resolver;
this.resolver = null;
}
logger.debug("Stopping Network Device Discovery");

service.shutdownNow(); // Initiate shutdown
try {
if (!service.awaitTermination(PING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
logger.warn("Network discovery scan failed to stop within the timeout of {}", PING_TIMEOUT);
if (service != null) {
service.shutdownNow(); // Initiate shutdown
}
if (resolver != null) {
resolver.shutdown(); // Initiate shutdown, but let it complete queued tasks
}
if (service != null) {
try {
if (!service.awaitTermination(PING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS)) {
logger.warn("Network discovery scan failed to stop within the timeout of {}", PING_TIMEOUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -84,14 +85,16 @@ public class NetworkHandler extends BaseThingHandler
// Retry counter. Will be reset as soon as a device presence detection succeed.
private volatile int retryCounter = 0;
private final ScheduledExecutorService executor;
private final ExecutorService resolver;

/**
* Creates a new instance using the specified parameters.
*/
public NetworkHandler(Thing thing, ScheduledExecutorService executor, boolean isTCPServiceDevice,
NetworkBindingConfiguration configuration) {
public NetworkHandler(Thing thing, ScheduledExecutorService executor, ExecutorService resolver,
boolean isTCPServiceDevice, NetworkBindingConfiguration configuration) {
super(thing);
this.executor = executor;
this.resolver = resolver;
this.isTCPServiceDevice = isTCPServiceDevice;
this.configuration = configuration;
this.configuration.addNetworkBindingConfigurationListener(this);
Expand Down Expand Up @@ -225,8 +228,8 @@ void initialize(PresenceDetection presenceDetection) {
wakeOnLanPacketSender = new WakeOnLanPacketSender(config.macAddress, config.hostname, config.port,
config.networkInterfaceNames);
if (config.refreshInterval > 0) {
refreshJob = executor.scheduleWithFixedDelay(presenceDetection::refresh, 0, config.refreshInterval,
TimeUnit.MILLISECONDS);
refreshJob = executor.scheduleWithFixedDelay(presenceDetection::refresh, (long) (Math.random() * 5000),
config.refreshInterval, TimeUnit.MILLISECONDS);
}
}

Expand Down Expand Up @@ -256,7 +259,7 @@ public void initialize() {
updateStatus(ThingStatus.UNKNOWN);
executor.submit(() -> {
initialize(new PresenceDetection(this, Duration.ofMillis(configuration.cacheDeviceStateTimeInMS.intValue()),
executor));
resolver));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@
*/
package org.openhab.binding.network.internal.utils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
Expand All @@ -39,6 +37,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand Down Expand Up @@ -380,12 +382,35 @@ public enum IpPingMethodEnum {
}

// Consume the output while the process runs
List<String> output = new ArrayList<>();
try (InputStreamReader isr = new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr)) {
String line;
while ((line = br.readLine()) != null) {
output.add(line);
FutureTask<List<String>> consumer = OutputConsumptionUtil.consumeText(proc.getInputStream(),
StandardCharsets.UTF_8);

if (!proc.waitFor(timeout.toMillis() * 10L, TimeUnit.MILLISECONDS)) {
logger.warn("Timed out while waiting for the ping process to execute");
proc.destroy();
return new PingResult(false, Duration.between(execStartTime, Instant.now()));
}
int result = proc.exitValue();
Instant execStopTime = Instant.now();
List<String> output;
try {
if (Thread.currentThread().isInterrupted()) {
consumer.cancel(true);
output = List.of();
}
output = consumer.get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
output = List.of();
logger.warn("An exception occurred while consuming ping process output: {}", e.getMessage());
logger.trace("", e);
} catch (TimeoutException e) {
// This should never happen, since the output should be available as soon as the process has finished
output = List.of();
logger.warn("Timed out while retrieving ping process output");
logger.trace("", e);
}
if (logger.isTraceEnabled()) {
for (String line : output) {
logger.trace("Network [ping output]: '{}'", line);
}
}
Expand All @@ -395,9 +420,6 @@ public enum IpPingMethodEnum {
// not ready.
// Exception: return code is also 0 in Windows for all requests on the local subnet.
// see https://superuser.com/questions/403905/ping-from-windows-7-get-no-reply-but-sets-errorlevel-to-0

int result = proc.waitFor();
Instant execStopTime = Instant.now();
if (result != 0) {
return new PingResult(false, Duration.between(execStartTime, execStopTime));
}
Expand Down Expand Up @@ -474,20 +496,41 @@ public enum ArpPingUtilEnum {
}

// Consume the output while the process runs
List<String> output = new ArrayList<>();
try (InputStreamReader isr = new InputStreamReader(proc.getInputStream(), StandardCharsets.UTF_8);
BufferedReader br = new BufferedReader(isr)) {
String line;
while ((line = br.readLine()) != null) {
output.add(line);
FutureTask<List<String>> consumer = OutputConsumptionUtil.consumeText(proc.getInputStream(),
StandardCharsets.UTF_8);

if (!proc.waitFor(timeout.toMillis() * 10L, TimeUnit.MILLISECONDS)) {
logger.warn("Timed out while waiting for the arping process to execute");
proc.destroy();
return new PingResult(false, Duration.between(execStartTime, Instant.now()));
}
int result = proc.exitValue();
Instant execStopTime = Instant.now();
List<String> output;
try {
if (Thread.currentThread().isInterrupted()) {
consumer.cancel(true);
output = List.of();
}
output = consumer.get(5, TimeUnit.SECONDS);
} catch (ExecutionException e) {
output = List.of();
logger.warn("An exception occurred while consuming arping process output: {}", e.getMessage());
logger.trace("", e);
} catch (TimeoutException e) {
// This should never happen, since the output should be available as soon as the process has finished
output = List.of();
logger.warn("Timed out while retrieving arping process output");
logger.trace("", e);
}
if (logger.isTraceEnabled()) {
for (String line : output) {
logger.trace("Network [arping output]: '{}'", line);
}
}

// The return code is 0 for a successful ping. 1 if device didn't respond and 2 if there is another error like
// network interface not ready.
int result = proc.waitFor();
Instant execStopTime = Instant.now();
if (result != 0) {
return new PingResult(false, Duration.between(execStartTime, execStopTime));
}
Expand Down
Loading