Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions jupyter_server/services/kernels/connection/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from textwrap import dedent

from jupyter_client import protocol_version as client_protocol_version # type:ignore[attr-defined]
from tornado import gen, web
from tornado import web
from tornado.ioloop import IOLoop
from tornado.websocket import WebSocketClosedError
from traitlets import Any, Bool, Dict, Float, Instance, Int, List, Unicode, default
Expand Down Expand Up @@ -171,9 +171,9 @@ def nudge(self):
# establishing its zmq subscriptions before processing the next request.
if getattr(self.kernel_manager, "execution_state", None) == "busy":
self.log.debug("Nudge: not nudging busy kernel %s", self.kernel_id)
f: Future[t.Any] = Future()
f: asyncio.Future[t.Any] = asyncio.Future()
f.set_result(None)
return _ensure_future(f)
return f
# Use a transient shell channel to prevent leaking
# shell responses to the front-end.
shell_channel = self.kernel_manager.connect_shell()
Expand All @@ -183,17 +183,26 @@ def nudge(self):
# The IOPub used by the client, whose subscriptions we are verifying.
iopub_channel = self.channels["iopub"]

info_future: Future[t.Any] = Future()
iopub_future: Future[t.Any] = Future()
both_done = gen.multi([info_future, iopub_future])
async def wait_for_activity():
execution_state = getattr(self.kernel_manager, "execution_state", None)
while execution_state == "starting":
await asyncio.sleep(0.05)
execution_state = getattr(self.kernel_manager, "execution_state", None)
self.log.debug("Nudge: %s execution_state=%s", self.kernel_id, execution_state)

info_future: asyncio.Future[t.Any] = asyncio.Future()
iopub_future: asyncio.Future[t.Any] = asyncio.Future()
futures = [info_future, iopub_future]
futures.append(asyncio.ensure_future(wait_for_activity()))
all_done = asyncio.ensure_future(asyncio.gather(*futures))

def finish(_=None):
"""Ensure all futures are resolved
which in turn triggers cleanup
"""
for f in (info_future, iopub_future):
for f in futures:
if not f.done():
f.set_result(None)
f.cancel()

def cleanup(_=None):
"""Common cleanup"""
Expand All @@ -205,7 +214,7 @@ def cleanup(_=None):
control_channel.close()

# trigger cleanup when both message futures are resolved
both_done.add_done_callback(cleanup)
all_done.add_done_callback(cleanup)

def on_shell_reply(msg):
"""Handle nudge shell replies."""
Expand Down Expand Up @@ -256,7 +265,7 @@ def nudge(count):
finish()
return

if not both_done.done():
if not all_done.done():
log = self.log.warning if count % 10 == 0 else self.log.debug
log(f"Nudge: attempt {count} on kernel {self.kernel_id}")
self.session.send(shell_channel, "kernel_info_request")
Expand All @@ -267,10 +276,10 @@ def nudge(count):
nudge_handle = loop.call_later(0, nudge, count=0)

# resolve with a timeout if we get no response
future = gen.with_timeout(loop.time() + self.kernel_info_timeout, both_done)
future = asyncio.ensure_future(asyncio.wait_for(all_done, timeout=self.kernel_info_timeout))
# ensure we have no dangling resources or unresolved Futures in case of timeout
future.add_done_callback(finish)
return _ensure_future(future)
return future

async def _register_session(self):
"""Ensure we aren't creating a duplicate session.
Expand Down
6 changes: 5 additions & 1 deletion jupyter_server/services/kernels/kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import pathlib # noqa: TC003
import sys
import time
import typing as t
import warnings
from collections import defaultdict
Expand Down Expand Up @@ -278,6 +279,7 @@ async def _async_start_kernel( # type:ignore[override]
async def _finish_kernel_start(self, kernel_id):
"""Handle a kernel that finishes starting."""
km = self.get_kernel(kernel_id)
self.log.debug("Waiting for kernel %s", kernel_id)
if hasattr(km, "ready"):
ready = km.ready
if not isinstance(ready, asyncio.Future):
Expand All @@ -287,6 +289,7 @@ async def _finish_kernel_start(self, kernel_id):
except Exception:
self.log.exception("Error waiting for kernel manager ready")
return
self.log.debug("Kernel %s ready", kernel_id)

self._kernel_ports[kernel_id] = km.ports
self.start_watching_activity(kernel_id)
Expand Down Expand Up @@ -490,6 +493,7 @@ def on_restart_failed():
# Re-establish activity watching if ports have changed...
if self._get_changed_ports(kernel_id) is not None:
self.stop_watching_activity(kernel_id)
self.execution_state = "starting"
self.start_watching_activity(kernel_id)
return future

Expand Down Expand Up @@ -585,9 +589,9 @@ def start_watching_activity(self, kernel_id):
- update last_activity on every message
- record execution_state from status messages
"""
self.log.debug("Watching kernel activity: %s", kernel_id)
kernel = self._kernels[kernel_id]
# add busy/activity markers:
kernel.execution_state = "starting"
kernel.reason = ""
kernel.last_activity = utcnow()
kernel._activity_stream = kernel.connect_iopub()
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ unfixable = [
[tool.pytest.ini_options]
minversion = "6.0"
xfail_strict = true
log_cli_level = "info"
log_cli_level = "debug"
addopts = [
"-ra", "--durations=10", "--color=yes", "--doctest-modules",
"--showlocals", "--strict-markers", "--strict-config"
Expand Down
6 changes: 3 additions & 3 deletions tests/services/kernels/test_execution_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def test_execution_state(jp_fetch, jp_ws_fetch):
"parent_header": {},
"metadata": {},
"content": {
"code": "while True:\n\tpass",
"code": "import time\nwhile True:\n\ttime.sleep(1)",
"silent": False,
"allow_stdin": False,
"stop_on_error": True,
Expand All @@ -56,10 +56,10 @@ async def test_execution_state(jp_fetch, jp_ws_fetch):

# kernels start slowly on Windows
max_startup_time = 60
started = time.time()
started = time.perf_counter()
while es == "starting":
await asyncio.sleep(1)
elapsed = time.time() - started
elapsed = time.perf_counter() - started
if elapsed > max_startup_time:
raise ValueError(f"Kernel did not start up in {max_startup_time} seconds")
es = await get_execution_state(kid, jp_fetch)
Expand Down