Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down Expand Up @@ -58,8 +58,7 @@ public void setThingHandler(@Nullable ThingHandler handler) {
public void resetCounter(
@ActionInput(name = "counter", label = "Counter", required = true, description = "Id of the counter", type = "java.lang.Integer") Integer counter) {
logger.debug("IPX800 action 'resetCounter' called");
Ipx800v3Handler theHandler = this.handler;
if (theHandler != null) {
if (handler instanceof Ipx800v3Handler theHandler) {
theHandler.resetCounter(counter);
} else {
logger.warn("Method call resetCounter failed because IPX800 action service ThingHandler is null!");
Expand All @@ -70,8 +69,7 @@ public void resetCounter(
public void reset(
@ActionInput(name = "placeholder", label = "Placeholder", required = false, description = "This parameter is not used", type = "java.lang.Integer") @Nullable Integer placeholder) {
logger.debug("IPX800 action 'reset' called");
Ipx800v3Handler theHandler = this.handler;
if (theHandler != null) {
if (handler instanceof Ipx800v3Handler theHandler) {
theHandler.reset();
} else {
logger.warn("Method call reset failed because IPX800 action service ThingHandler is null!");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand All @@ -18,13 +18,18 @@
import java.io.PrintWriter;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Optional;
import java.net.UnknownHostException;
import java.util.Random;

import org.eclipse.jdt.annotation.NonNullByDefault;
import org.openhab.binding.gce.internal.model.M2MMessageParser;
import org.openhab.binding.gce.internal.model.PortDefinition;
import org.openhab.binding.gce.internal.model.StatusFile;
import org.openhab.binding.gce.internal.model.StatusFileAccessor;
import org.openhab.core.thing.ThingUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;

/**
* The {@link Ipx800DeviceConnector} is responsible for connecting,
Expand All @@ -35,156 +40,148 @@
*/
@NonNullByDefault
public class Ipx800DeviceConnector extends Thread {
private static final int DEFAULT_SOCKET_TIMEOUT_MS = 5000;
private static final int DEFAULT_RECONNECT_TIMEOUT_MS = 5000;
private static final int DEFAULT_SOCKET_TIMEOUT_MS = 10000;
private static final int MAX_KEEPALIVE_FAILURE = 3;
private static final String ENDL = "\r\n";

private final Logger logger = LoggerFactory.getLogger(Ipx800DeviceConnector.class);
private final Random randomizer = new Random();

private final String hostname;
private final int portNumber;

private Optional<M2MMessageParser> messageParser = Optional.empty();
private Optional<Socket> socket = Optional.empty();
private Optional<BufferedReader> input = Optional.empty();
private Optional<PrintWriter> output = Optional.empty();
private final M2MMessageParser parser;
private final StatusFileAccessor statusAccessor;
private final Ipx800EventListener listener;
private final Socket socket;
private final BufferedReader input;
private final PrintWriter output;

private int failedKeepalive = 0;
private boolean waitingKeepaliveResponse = false;
private boolean interrupted = false;

public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid) {
public Ipx800DeviceConnector(String hostname, int portNumber, ThingUID uid, Ipx800EventListener listener)
throws UnknownHostException, IOException {
super("OH-binding-" + uid);
this.hostname = hostname;
this.portNumber = portNumber;
this.listener = listener;

logger.debug("Connecting to {}:{}...", hostname, portNumber);
Socket socket = new Socket(hostname, portNumber);
socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MS);
this.socket = socket;

output = new PrintWriter(socket.getOutputStream(), true);
input = new BufferedReader(new InputStreamReader(socket.getInputStream()));
parser = new M2MMessageParser(listener);
statusAccessor = new StatusFileAccessor(hostname);
setDaemon(true);
}

/**
*
* Stop the device thread
*/

public void dispose() {
interrupted = true;
}

public synchronized void send(String message) {
output.ifPresentOrElse(out -> {
logger.debug("Sending '{}' to Ipx800", message);
out.write(message + ENDL);
out.flush();
}, () -> logger.warn("Trying to send '{}' while the output stream is closed.", message));
logger.debug("Sending '{}' to Ipx800", message);
output.println(message);
}

/**
* Connect to the ipx800
*
* @throws IOException
* Send a random keepalive command which cause the IPX to send an update.
* If we don't receive the update maxKeepAliveFailure time, the connection
* is closed
*/
private void connect() throws IOException {
disconnect();

logger.debug("Connecting to {}:{}...", hostname, portNumber);
Socket socket = new Socket(hostname, portNumber);
socket.setSoTimeout(DEFAULT_SOCKET_TIMEOUT_MS);
socket.getInputStream().skip(socket.getInputStream().available());
this.socket = Optional.of(socket);
private void sendKeepalive() {
PortDefinition pd = PortDefinition.values()[randomizer.nextInt(PortDefinition.AS_SET.size())];
String command = "%s%d".formatted(pd.m2mCommand, randomizer.nextInt(pd.quantity) + 1);

if (waitingKeepaliveResponse) {
failedKeepalive++;
logger.debug("Sending keepalive {}, attempt {}", command, failedKeepalive);
} else {
failedKeepalive = 0;
logger.debug("Sending keepalive {}", command);
}

input = Optional.of(new BufferedReader(new InputStreamReader(socket.getInputStream())));
output = Optional.of(new PrintWriter(socket.getOutputStream(), true));
output.println(command);
parser.setExpectedResponse(command);

waitingKeepaliveResponse = true;
}

/**
* Disconnect the device
*/
private void disconnect() {
logger.debug("Disconnecting");
@Override
public void run() {
while (!interrupted) {
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
interrupted = true;
listener.errorOccurred(new IOException("Max keep alive attempts has been reached"));
}
try {
String command = input.readLine();
waitingKeepaliveResponse = false;
parser.unsolicitedUpdate(command);
} catch (SocketTimeoutException e) {
sendKeepalive();
} catch (IOException e) {
interrupted = true;
listener.errorOccurred(e);
}
}
if (output instanceof PrintWriter out) {
out.close();
}

input.ifPresent(in -> {
if (input instanceof BufferedReader in) {
try {
in.close();
} catch (IOException ignore) {
} catch (IOException e) {
logger.warn("Exception input stream: {}", e.getMessage());
}
input = Optional.empty();
});

output.ifPresent(PrintWriter::close);
output = Optional.empty();
}

socket.ifPresent(client -> {
if (socket instanceof Socket client) {
try {
logger.debug("Closing socket");
client.close();
} catch (IOException ignore) {
} catch (IOException e) {
logger.warn("Exception closing socket: {}", e.getMessage());
}
socket = Optional.empty();
});
}
}

logger.debug("Disconnected");
public StatusFile readStatusFile() throws SAXException, IOException {
return statusAccessor.read();
}

/**
* Stop the device thread
*
* Set output of the device sending the corresponding command
*
* @param targetPort
* @param targetValue
*/
public void dispose() {
interrupt();
disconnect();
public void setOutput(String targetPort, int targetValue, boolean pulse) {
logger.debug("Sending {} to {}", targetValue, targetPort);
String command = "Set%02d%s%s".formatted(Integer.parseInt(targetPort), targetValue, pulse ? "p" : "");
send(command);
}

/**
* Send an arbitrary keepalive command which cause the IPX to send an update.
* If we don't receive the update maxKeepAliveFailure time, the connection is closed and reopened
*
* Resets the counter value to 0
*
* @param targetCounter
*/
private void sendKeepalive() {
output.ifPresent(out -> {
if (waitingKeepaliveResponse) {
failedKeepalive++;
logger.debug("Sending keepalive, attempt {}", failedKeepalive);
} else {
failedKeepalive = 0;
logger.debug("Sending keepalive");
}
out.println("GetIn01");
out.flush();
waitingKeepaliveResponse = true;
});
}

@Override
public void run() {
try {
waitingKeepaliveResponse = false;
failedKeepalive = 0;
connect();
while (!interrupted()) {
if (failedKeepalive > MAX_KEEPALIVE_FAILURE) {
throw new IOException("Max keep alive attempts has been reached");
}
input.ifPresent(in -> {
try {
String command = in.readLine();
waitingKeepaliveResponse = false;
messageParser.ifPresent(parser -> parser.unsolicitedUpdate(command));
} catch (IOException e) {
handleException(e);
}
});
}
disconnect();
} catch (IOException e) {
handleException(e);
}
try {
Thread.sleep(DEFAULT_RECONNECT_TIMEOUT_MS);
} catch (InterruptedException e) {
dispose();
}
}

private void handleException(Exception e) {
if (!interrupted()) {
if (e instanceof SocketTimeoutException) {
sendKeepalive();
return;
} else if (e instanceof IOException) {
logger.warn("Communication error: '{}'. Will retry in {} ms", e, DEFAULT_RECONNECT_TIMEOUT_MS);
}
messageParser.ifPresent(parser -> parser.errorOccurred(e));
}
public void resetCounter(int targetCounter) {
logger.debug("Resetting counter {} to 0", targetCounter);
send("ResetCount%d".formatted(targetCounter));
}

public void setParser(M2MMessageParser parser) {
this.messageParser = Optional.of(parser);
public void resetPLC() {
send("Reset");
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Copyright (c) 2010-2025 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
Expand Down
Loading