|
5 | 5 | from contextlib import closing |
6 | 6 | from functools import partial |
7 | 7 | from queue import Queue |
8 | | -import http |
| 8 | +import http.client |
9 | 9 | import json |
10 | 10 | import multiprocessing.connection |
11 | 11 | import os |
@@ -49,70 +49,97 @@ def on_stderr_message(self, message: str) -> None: |
49 | 49 |
|
50 | 50 | class AbstractProcessor(Generic[T]): |
51 | 51 |
|
52 | | - def write_data(self, writer: IO[bytes], data: T, is_node_ipc: bool) -> None: |
| 52 | + def write_data(self, data: T) -> None: |
53 | 53 | raise NotImplementedError() |
54 | 54 |
|
55 | | - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[T]: |
| 55 | + def read_data(self) -> Optional[T]: |
56 | 56 | raise NotImplementedError() |
57 | 57 |
|
58 | 58 |
|
59 | | -class JsonRpcProcessor(AbstractProcessor[Dict[str, Any]]): |
| 59 | +def encode_payload(data: Dict[str, Any]) -> bytes: |
| 60 | + return json.dumps( |
| 61 | + data, |
| 62 | + ensure_ascii=False, |
| 63 | + check_circular=False, |
| 64 | + separators=(',', ':') |
| 65 | + ).encode('utf-8') |
60 | 66 |
|
61 | | - def write_data(self, writer: IO[bytes], data: Dict[str, Any], is_node_ipc: bool) -> None: |
62 | | - body = self._encode(data) |
63 | | - if not is_node_ipc: |
64 | | - writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) |
65 | | - else: |
66 | | - writer.write(body + b"\n") |
67 | 67 |
|
68 | | - def read_data(self, reader: IO[bytes], is_node_ipc: bool) -> Optional[Dict[str, Any]]: |
69 | | - if not is_node_ipc: |
70 | | - headers = http.client.parse_headers(reader) # type: ignore |
71 | | - try: |
72 | | - body = reader.read(int(headers.get("Content-Length"))) |
73 | | - except TypeError: |
74 | | - # Expected error on process stopping. Stop the read loop. |
75 | | - raise StopLoopError() |
76 | | - else: |
77 | | - body = reader.readline() |
| 68 | +def decode_payload(message: bytes) -> Optional[Dict[str, Any]]: |
| 69 | + try: |
| 70 | + return json.loads(message.decode('utf-8')) |
| 71 | + except Exception as ex: |
| 72 | + exception_log("JSON decode error", ex) |
| 73 | + return None |
| 74 | + |
| 75 | + |
| 76 | +class StandardProcessor(AbstractProcessor[Dict[str, Any]]): |
78 | 77 |
|
| 78 | + def __init__(self, reader: IO[bytes], writer: IO[bytes]): |
| 79 | + self._reader = reader |
| 80 | + self._writer = writer |
| 81 | + |
| 82 | + def write_data(self, data: Dict[str, Any]) -> None: |
| 83 | + body = encode_payload(data) |
| 84 | + self._writer.writelines(("Content-Length: {}\r\n\r\n".format(len(body)).encode('ascii'), body)) |
| 85 | + self._writer.flush() |
| 86 | + |
| 87 | + def read_data(self) -> Optional[Dict[str, Any]]: |
| 88 | + headers = http.client.parse_headers(self._reader) # type: ignore |
79 | 89 | try: |
80 | | - return self._decode(body) |
81 | | - except Exception as ex: |
82 | | - exception_log("JSON decode error", ex) |
83 | | - return None |
84 | | - |
85 | | - @staticmethod |
86 | | - def _encode(data: Dict[str, Any]) -> bytes: |
87 | | - return json.dumps( |
88 | | - data, |
89 | | - ensure_ascii=False, |
90 | | - check_circular=False, |
91 | | - separators=(',', ':') |
92 | | - ).encode('utf-8') |
93 | | - |
94 | | - @staticmethod |
95 | | - def _decode(message: bytes) -> Dict[str, Any]: |
96 | | - return json.loads(message.decode('utf-8')) |
| 90 | + body = self._reader.read(int(headers.get("Content-Length"))) |
| 91 | + except TypeError: |
| 92 | + # Expected error on process stopping. Stop the read loop. |
| 93 | + raise StopLoopError() |
| 94 | + return decode_payload(body) |
| 95 | + |
| 96 | + |
| 97 | +class NodeIpcProcessor(AbstractProcessor[Dict[str, Any]]): |
| 98 | + _buf = bytearray() |
| 99 | + _lines = 0 |
| 100 | + |
| 101 | + def __init__(self, conn: multiprocessing.connection._ConnectionBase): |
| 102 | + self._conn = conn |
| 103 | + |
| 104 | + def write_data(self, data: Dict[str, Any]) -> None: |
| 105 | + body = encode_payload(data) + b"\n" |
| 106 | + while len(body): |
| 107 | + n = self._conn._write(self._conn.fileno(), body) # type: ignore |
| 108 | + body = body[n:] |
| 109 | + |
| 110 | + def read_data(self) -> Optional[Dict[str, Any]]: |
| 111 | + while self._lines == 0: |
| 112 | + chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore |
| 113 | + if len(chunk) == 0: |
| 114 | + # EOF reached: https://docs.python.org/3/library/os.html#os.read |
| 115 | + raise StopLoopError() |
| 116 | + |
| 117 | + self._buf += chunk |
| 118 | + self._lines += chunk.count(b'\n') |
| 119 | + |
| 120 | + self._lines -= 1 |
| 121 | + message, _, self._buf = self._buf.partition(b'\n') |
| 122 | + return decode_payload(message) |
97 | 123 |
|
98 | 124 |
|
99 | 125 | class ProcessTransport(Transport[T]): |
100 | 126 |
|
101 | | - def __init__(self, name: str, process: subprocess.Popen, socket: Optional[socket.socket], reader: IO[bytes], |
102 | | - writer: IO[bytes], stderr: Optional[IO[bytes]], processor: AbstractProcessor[T], |
103 | | - callback_object: TransportCallbacks[T], is_node_ipc: bool) -> None: |
| 127 | + def __init__(self, |
| 128 | + name: str, |
| 129 | + process: subprocess.Popen, |
| 130 | + socket: Optional[socket.socket], |
| 131 | + stderr: Optional[IO[bytes]], |
| 132 | + processor: AbstractProcessor[T], |
| 133 | + callback_object: TransportCallbacks[T]) -> None: |
104 | 134 | self._closed = False |
105 | 135 | self._process = process |
106 | 136 | self._socket = socket |
107 | | - self._reader = reader |
108 | | - self._writer = writer |
109 | 137 | self._stderr = stderr |
110 | 138 | self._processor = processor |
111 | 139 | self._reader_thread = threading.Thread(target=self._read_loop, name='{}-reader'.format(name)) |
112 | 140 | self._writer_thread = threading.Thread(target=self._write_loop, name='{}-writer'.format(name)) |
113 | 141 | self._stderr_thread = threading.Thread(target=self._stderr_loop, name='{}-stderr'.format(name)) |
114 | 142 | self._callback_object = weakref.ref(callback_object) |
115 | | - self._is_node_ipc = is_node_ipc |
116 | 143 | self._send_queue = Queue(0) # type: Queue[Union[T, None]] |
117 | 144 | self._reader_thread.start() |
118 | 145 | self._writer_thread.start() |
@@ -144,8 +171,8 @@ def __del__(self) -> None: |
144 | 171 |
|
145 | 172 | def _read_loop(self) -> None: |
146 | 173 | try: |
147 | | - while self._reader: |
148 | | - payload = self._processor.read_data(self._reader, self._is_node_ipc) |
| 174 | + while True: |
| 175 | + payload = self._processor.read_data() |
149 | 176 | if payload is None: |
150 | 177 | continue |
151 | 178 |
|
@@ -194,13 +221,11 @@ def invoke() -> None: |
194 | 221 | def _write_loop(self) -> None: |
195 | 222 | exception = None # type: Optional[Exception] |
196 | 223 | try: |
197 | | - while self._writer: |
| 224 | + while True: |
198 | 225 | d = self._send_queue.get() |
199 | 226 | if d is None: |
200 | 227 | break |
201 | | - self._processor.write_data(self._writer, d, self._is_node_ipc) |
202 | | - if not self._is_node_ipc: |
203 | | - self._writer.flush() |
| 228 | + self._processor.write_data(d) |
204 | 229 | except (BrokenPipeError, AttributeError): |
205 | 230 | pass |
206 | 231 | except Exception as ex: |
@@ -228,35 +253,6 @@ def _stderr_loop(self) -> None: |
228 | 253 | self._send_queue.put_nowait(None) |
229 | 254 |
|
230 | 255 |
|
231 | | -# Can be a singleton since it doesn't hold any state. |
232 | | -json_rpc_processor = JsonRpcProcessor() |
233 | | - |
234 | | - |
235 | | -class NodeIpcIO(): |
236 | | - _buf = bytearray() |
237 | | - _lines = 0 |
238 | | - |
239 | | - def __init__(self, conn: multiprocessing.connection._ConnectionBase): |
240 | | - self._conn = conn |
241 | | - |
242 | | - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L378-L392 |
243 | | - def readline(self) -> bytearray: |
244 | | - while self._lines == 0: |
245 | | - chunk = self._conn._read(self._conn.fileno(), 65536) # type: ignore |
246 | | - self._buf += chunk |
247 | | - self._lines += chunk.count(b'\n') |
248 | | - |
249 | | - self._lines -= 1 |
250 | | - line, _, self._buf = self._buf.partition(b'\n') |
251 | | - return line |
252 | | - |
253 | | - # https://github.com/python/cpython/blob/330f1d58282517bdf1f19577ab9317fa9810bf95/Lib/multiprocessing/connection.py#L369-L376 |
254 | | - def write(self, data: bytes) -> None: |
255 | | - while len(data): |
256 | | - n = self._conn._write(self._conn.fileno(), data) # type: ignore |
257 | | - data = data[n:] |
258 | | - |
259 | | - |
260 | 256 | def create_transport(config: TransportConfig, cwd: Optional[str], |
261 | 257 | callback_object: TransportCallbacks) -> Transport[Dict[str, Any]]: |
262 | 258 | stderr = subprocess.PIPE |
@@ -292,24 +288,27 @@ def start_subprocess() -> subprocess.Popen: |
292 | 288 | config.listener_socket, |
293 | 289 | start_subprocess |
294 | 290 | ) |
| 291 | + processor = StandardProcessor(reader, writer) # type: AbstractProcessor |
295 | 292 | else: |
296 | 293 | process = start_subprocess() |
297 | 294 | if config.tcp_port: |
298 | 295 | sock = _connect_tcp(config.tcp_port) |
299 | 296 | if sock is None: |
300 | 297 | raise RuntimeError("Failed to connect on port {}".format(config.tcp_port)) |
301 | | - reader = sock.makefile('rwb') # type: ignore |
302 | | - writer = reader |
| 298 | + reader = writer = sock.makefile('rwb') |
| 299 | + processor = StandardProcessor(reader, writer) |
303 | 300 | elif not config.node_ipc: |
304 | | - reader = process.stdout # type: ignore |
305 | | - writer = process.stdin # type: ignore |
| 301 | + if not process.stdout or not process.stdin: |
| 302 | + raise RuntimeError( |
| 303 | + 'Failed initializing transport: reader: {}, writer: {}' |
| 304 | + .format(process.stdout, process.stdin) |
| 305 | + ) |
| 306 | + processor = StandardProcessor(process.stdout, process.stdin) |
306 | 307 | else: |
307 | | - reader = writer = NodeIpcIO(config.node_ipc.parent_conn) # type: ignore |
308 | | - if not reader or not writer: |
309 | | - raise RuntimeError('Failed initializing transport: reader: {}, writer: {}'.format(reader, writer)) |
| 308 | + processor = NodeIpcProcessor(config.node_ipc.parent_conn) |
| 309 | + |
310 | 310 | stderr_reader = process.stdout if config.node_ipc else process.stderr |
311 | | - return ProcessTransport(config.name, process, sock, reader, writer, stderr_reader, json_rpc_processor, |
312 | | - callback_object, bool(config.node_ipc)) |
| 311 | + return ProcessTransport(config.name, process, sock, stderr_reader, processor, callback_object) |
313 | 312 |
|
314 | 313 |
|
315 | 314 | _subprocesses = weakref.WeakSet() # type: weakref.WeakSet[subprocess.Popen] |
@@ -403,8 +402,7 @@ def start_in_background(d: _SubprocessData) -> None: |
403 | 402 | # Await one client connection (blocking!) |
404 | 403 | sock, _ = listener_socket.accept() |
405 | 404 | thread.join() |
406 | | - reader = sock.makefile('rwb') # type: IO[bytes] |
407 | | - writer = reader |
| 405 | + reader = writer = sock.makefile('rwb') |
408 | 406 | assert data.process |
409 | 407 | return data.process, sock, reader, writer |
410 | 408 |
|
|
0 commit comments