-
Notifications
You must be signed in to change notification settings - Fork 730
Winsock Fixes #3433
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Winsock Fixes #3433
Conversation
This results in a partially working winsock
BUG: A connection currently only becomes communicative after a new connection comes in. But Windows TCP is working after that.
| var realHandle: HANDLE? = nil | ||
| let success = DuplicateHandle( | ||
| GetCurrentProcess(), // Source process | ||
| GetCurrentThread(), // Source handle (pseudo-handle) | ||
| GetCurrentProcess(), // Target process | ||
| &realHandle, // Target handle (real handle) | ||
| 0, // Desired access (0 = same as source) | ||
| false, // Inherit handle | ||
| DWORD(DUPLICATE_SAME_ACCESS) // Options | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Windows gives pseudo handles by default, so they're always correct for the current thread. However, when spawning work on another thread, this handle results in getting the incorrect ThreadID, causing QueueUserAPC to notify the wrong (or non-existing) thread. This in turn prevents SleepEx from waking up the eventloop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't use QueueUserAPC anymore. However the general statement remains true. Maybe we can rework the comments, to be less about QueueUserAPC. We maybe also want to document somewhere why QueueUserAPC is not an option.
|
|
||
| static var currentThread: ThreadOpsSystem.ThreadHandle { | ||
| GetCurrentThread() | ||
| var realHandle: HANDLE? = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likewise here
| registrationID: SelectorRegistrationID | ||
| ) throws { | ||
| fatalError("TODO: Unimplemented") | ||
| if let index = self.pollFDs.firstIndex(where: { $0.fd == UInt64(fileDescriptor) }) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deregistering is just removing the FD. However, we can't just remove it here as we're iterating over the same pollFDs at the same time. deregister0 is called down the stack of try body((SelectorEvent(io: selectorEvent, registration: registration))). So the available indices of pollFDs changes causing a crash
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we add this comment to the local array that we use for this? so that we document what this array is used for?
| @@ -131,6 +133,19 @@ extension Selector: _SelectorBackendProtocol { | |||
|
|
|||
| try body((SelectorEvent(io: selectorEvent, registration: registration))) | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line often calls deregister0 indirectly, so we effectively can't mutate pollFDs in deregister0
| // now clean up any deregistered fds | ||
| // In reverse order so we don't have to copy elements out of the array | ||
| // If we do in in normal order, we'll have to shift all elements after the removed one | ||
| for i in self.deregisteredFDs.indices.reversed() { | ||
| if self.deregisteredFDs[i] { | ||
| // remove this one | ||
| let fd = self.pollFDs[i].fd | ||
| self.pollFDs.remove(at: i) | ||
| self.deregisteredFDs.remove(at: i) | ||
| self.registrations.removeValue(forKey: Int(fd)) | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do the deregister0 work of cleaning up after the polling is done
| func initialiseState0() throws { | ||
| self.pollFDs.reserveCapacity(16) | ||
| self.deregisteredFDs.reserveCapacity(16) | ||
| self.lifecycleState = .open |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lifecycle never became open yet
Sources/NIOPosix/SocketChannel.swift
Outdated
| #if os(Windows) | ||
| case .winsock(WSAEWOULDBLOCK): | ||
| return false | ||
| #endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
accept returns WSAEWOULDBLOCK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why doesn't the current logic for syscall(blocking: true) cover this?
|
CC @fabianfett and @zamderax |
This way new TCP connections will be functional
| #if os(Windows) | ||
| if | ||
| let err = err as? IOError, | ||
| case .winsock(WSAEWOULDBLOCK) = err.error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this case reached?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What I've found is that when WSAPoll gives the server socket a .read event when a client is inbound:
- Server accepts the client through
readable()->readable0()->ServerSocketChannel.readFromSocket() - The
socket.accept(..)finds a socket SelectableEventLooprepeats this again again to get another read (see maxMessagesPerRead)- This time,
socket.accept(..)runs intoWinSDK.INVALID_SOCKET - Gets the error using
WSAGetLastError() - The error ends up reaching
.winsock(WSAEWOULDBLOCK)
In hindsight, I just noticed that accept() returns an optional so I'll leverage that instead.
Sources/NIOPosix/SocketChannel.swift
Outdated
| #if os(Windows) | ||
| case .winsock(WSAEWOULDBLOCK): | ||
| return false | ||
| #endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why doesn't the current logic for syscall(blocking: true) cover this?
It seems there's no socketpair API on Windows
It's used pretty heavily, and writev is implemented on Windows.
|
Hey guys, is it something that can be accepted? What is the status of this PR? Can someone please share plans for the Windows support roadmap? |
|
@3a4oT this needs a proper review first. And there's one gaping hole in the implementation's polling still that needs fixing. Hopefully I'll figure that out soon. |
| private static func createWakeupSocketPair() throws -> (NIOBSDSocket.Handle, NIOBSDSocket.Handle) { | ||
| // Create a UDP socket to receive wakeup signals | ||
| let readSocket = try NIOBSDSocket.socket(domain: .inet, type: .datagram, protocolSubtype: .default) | ||
|
|
||
| do { | ||
| // Bind to loopback on an ephemeral port | ||
| var addr = sockaddr_in() | ||
| addr.sin_family = ADDRESS_FAMILY(AF_INET) | ||
| addr.sin_addr.S_un.S_addr = WinSDK.htonl(UInt32(bitPattern: INADDR_LOOPBACK)) | ||
| addr.sin_port = 0 // Let the system assign a port | ||
|
|
||
| try withUnsafeBytes(of: &addr) { addrPtr in | ||
| let socketPointer = addrPtr.baseAddress!.assumingMemoryBound(to: sockaddr.self) | ||
| try NIOBSDSocket.bind(socket: readSocket, address: socketPointer, address_len: socklen_t(MemoryLayout<sockaddr_in>.size)) | ||
| } | ||
|
|
||
| // Get the assigned port | ||
| var boundAddr = sockaddr() | ||
| var size = Int32(MemoryLayout<sockaddr>.size) | ||
| try NIOBSDSocket.getsockname(socket: readSocket, address: &boundAddr, address_len: &size) | ||
|
|
||
| // Create the write socket | ||
| let writeSocket = try NIOBSDSocket.socket(domain: .inet, type: .datagram, protocolSubtype: .default) | ||
|
|
||
| do { | ||
| // Connect the write socket to the read socket's address | ||
| guard try NIOBSDSocket.connect(socket: writeSocket, address: &boundAddr, address_len: size) else { | ||
| throw IOError(winsock: WSAGetLastError(), reason: "connect") | ||
| } | ||
| return (readSocket, writeSocket) | ||
| } catch { | ||
| _ = try? NIOBSDSocket.close(socket: writeSocket) | ||
| throw error | ||
| } | ||
| } catch { | ||
| _ = try? NIOBSDSocket.close(socket: readSocket) | ||
| throw error | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the code that needs to be fixed.
Sources/NIOPosix/System.swift
Outdated
|
|
||
| import CNIOWindows | ||
|
|
||
| let IPPROTO_UDP = Int32(17) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IPPROTO_UDP is of IPPROTO type on Windows. Windows has some weird mappings of POSIX constants
| static var UIO_MAXIOV: Int { | ||
| fatalError("unsupported OS") | ||
| // TODO: This is a placeholder value. Find the correct one for Windows. | ||
| return 64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This variable doesn't formally exist on Windows, but vector writing does work on Windows. We should really make it unavailable for Windows, and update all dependencies that indirectly use this constant. I had a few places that were affected
| @usableFromInline | ||
| var deregisteredFDs = [Bool]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to just store indexes here? This would prevent us from linear searching the pollFDs. Please also add a comment what we use this array for.
|
|
||
| try withUnsafeBytes(of: &addr) { addrPtr in | ||
| let socketPointer = addrPtr.baseAddress!.assumingMemoryBound(to: sockaddr.self) | ||
| try NIOBSDSocket.bind(socket: readSocket, address: socketPointer, address_len: socklen_t(MemoryLayout<sockaddr_in>.size)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using sockaddr_in is weird, as we have previously cast the pointer to sockaddr.
8c67972 to
9332b56
Compare
|
we will also need to make the CI green |
This PR gets TCP (Servers) on Windows mostly working. I'll annotate my PR for clarity.
The one bug:
If you open a TCP client connection to a Windows TCP server, it doesn't read/write yet until a second connection comes in. This happens because
reregister0is called afterWSAPollis called - but WSAPoll isn't woken up to know this so it doesn't know the new socket requests read/writes/...Also the EventLoop cannot be woken up during
el.execute { .. }becauseSleepExisn't running. WSAPoll doesn't respond to this signal it seems.This makes the TCP server functionality not yet usable, but I thought with this draft we could figure out what I'm missing.
Alternative considered: We can make
WSAPolllimited to wake up every 1 ms for example, that way we can still get to these events at some point.