Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions ipykernel/displayhook.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(self, session, pub_socket):

self._parent_header: ContextVar[dict[str, Any]] = ContextVar("parent_header")
self._parent_header.set({})
self._parent_header_global = {}

def get_execution_count(self):
"""This method is replaced in kernelapp"""
Expand Down Expand Up @@ -57,11 +58,16 @@ def __call__(self, obj):

@property
def parent_header(self):
return self._parent_header.get()
try:
return self._parent_header.get()
except LookupError:
return self._parent_header_global

def set_parent(self, parent):
"""Set the parent header."""
self._parent_header.set(extract_header(parent))
parent_header = extract_header(parent)
self._parent_header.set(parent_header)
self._parent_header_global = parent_header


class ZMQShellDisplayHook(DisplayHook):
Expand All @@ -83,11 +89,16 @@ def __init__(self, *args, **kwargs):

@property
def parent_header(self):
return self._parent_header.get()
try:
return self._parent_header.get()
except LookupError:
return self._parent_header_global

def set_parent(self, parent):
"""Set the parent for outbound messages."""
self._parent_header.set(extract_header(parent))
"""Set the parent header."""
parent_header = extract_header(parent)
self._parent_header.set(parent_header)
self._parent_header_global = parent_header

def start_displayhook(self):
"""Start the display hook."""
Expand Down
18 changes: 3 additions & 15 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,6 @@ def __init__(
"parent_header"
)
self._parent_header.set({})
self._thread_to_parent = {}
self._thread_to_parent_header = {}
self._parent_header_global = {}
self._master_pid = os.getpid()
self._flush_pending = False
Expand Down Expand Up @@ -512,21 +510,11 @@ def __init__(
@property
def parent_header(self):
try:
# asyncio-specific
# asyncio or thread-specific
return self._parent_header.get()
except LookupError:
try:
# thread-specific
identity = threading.current_thread().ident
# retrieve the outermost (oldest ancestor,
# discounting the kernel thread) thread identity
while identity in self._thread_to_parent:
identity = self._thread_to_parent[identity]
# use the header of the oldest ancestor
return self._thread_to_parent_header[identity]
except KeyError:
# global (fallback)
return self._parent_header_global
# global (fallback)
return self._parent_header_global

@parent_header.setter
def parent_header(self, value):
Expand Down
90 changes: 0 additions & 90 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import asyncio
import builtins
import gc
import getpass
import os
import signal
Expand All @@ -17,15 +16,13 @@
import comm
from IPython.core import release
from IPython.utils.tokenutil import line_at_cursor, token_at_cursor
from jupyter_client.session import extract_header
from traitlets import Any, Bool, HasTraits, Instance, List, Type, default, observe, observe_compat
from zmq.eventloop.zmqstream import ZMQStream

from .comm.comm import BaseComm
from .comm.manager import CommManager
from .compiler import XCachingCompiler
from .eventloops import _use_appnope
from .iostream import OutStream
from .kernelbase import Kernel as KernelBase
from .kernelbase import _accepts_parameters
from .zmqshell import ZMQInteractiveShell
Expand Down Expand Up @@ -167,14 +164,6 @@ def __init__(self, **kwargs):

appnope.nope()

self._new_threads_parent_header = {}
self._initialize_thread_hooks()

if hasattr(gc, "callbacks"):
# while `gc.callbacks` exists since Python 3.3, pypy does not
# implement it even as of 3.9.
gc.callbacks.append(self._clean_thread_parent_frames)

help_links = List(
[
{
Expand Down Expand Up @@ -374,8 +363,6 @@ def _dummy_context_manager(self, *args):

async def execute_request(self, stream, ident, parent):
"""Override for cell output - cell reconciliation."""
parent_header = extract_header(parent)
self._associate_new_top_level_threads_with(parent_header)
await super().execute_request(stream, ident, parent)

async def do_execute(
Expand Down Expand Up @@ -750,83 +737,6 @@ def do_clear(self):
self.shell.reset(False)
return dict(status="ok")

def _associate_new_top_level_threads_with(self, parent_header):
"""Store the parent header to associate it with new top-level threads"""
self._new_threads_parent_header = parent_header

def _initialize_thread_hooks(self):
"""Store thread hierarchy and thread-parent_header associations."""
stdout = self._stdout
stderr = self._stderr
kernel_thread_ident = threading.get_ident()
kernel = self
_threading_Thread_run = threading.Thread.run
_threading_Thread__init__ = threading.Thread.__init__

def run_closure(self: threading.Thread):
"""Wrap the `threading.Thread.start` to intercept thread identity.

This is needed because there is no "start" hook yet, but there
might be one in the future: https://bugs.python.org/issue14073

This is a no-op if the `self._stdout` and `self._stderr` are not
sub-classes of `OutStream`.
"""

try:
parent = self._ipykernel_parent_thread_ident # type:ignore[attr-defined]
except AttributeError:
return
for stream in [stdout, stderr]:
if isinstance(stream, OutStream):
if parent == kernel_thread_ident:
stream._thread_to_parent_header[self.ident] = (
kernel._new_threads_parent_header
)
else:
stream._thread_to_parent[self.ident] = parent
_threading_Thread_run(self)

def init_closure(self: threading.Thread, *args, **kwargs):
_threading_Thread__init__(self, *args, **kwargs)
self._ipykernel_parent_thread_ident = threading.get_ident() # type:ignore[attr-defined]

threading.Thread.__init__ = init_closure # type:ignore[method-assign]
threading.Thread.run = run_closure # type:ignore[method-assign]

def _clean_thread_parent_frames(
self, phase: t.Literal["start", "stop"], info: dict[str, t.Any]
):
"""Clean parent frames of threads which are no longer running.
This is meant to be invoked by garbage collector callback hook.

The implementation enumerates the threads because there is no "exit" hook yet,
but there might be one in the future: https://bugs.python.org/issue14073

This is a no-op if the `self._stdout` and `self._stderr` are not
sub-classes of `OutStream`.
"""
# Only run before the garbage collector starts
if phase != "start":
return
active_threads = {thread.ident for thread in threading.enumerate()}
for stream in [self._stdout, self._stderr]:
if isinstance(stream, OutStream):
thread_to_parent_header = stream._thread_to_parent_header
for identity in list(thread_to_parent_header.keys()):
if identity not in active_threads:
try:
del thread_to_parent_header[identity]
except KeyError:
pass
thread_to_parent = stream._thread_to_parent
for identity in list(thread_to_parent.keys()):
if identity not in active_threads:
try:
del thread_to_parent[identity]
except KeyError:
pass


# This exists only for backwards compatibility - use IPythonKernel instead

Expand Down
29 changes: 24 additions & 5 deletions ipykernel/kernelbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import uuid
import warnings
from collections.abc import Mapping
from contextvars import ContextVar
from contextvars import Context, ContextVar, copy_context
from datetime import datetime
from functools import partial
from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal
Expand Down Expand Up @@ -72,6 +72,8 @@
" ipykernel 6.0 (2021). {target} does not seem to return an awaitable"
)

T = t.TypeVar("T")


def _accepts_parameters(meth, param_names):
parameters = inspect.signature(meth).parameters
Expand Down Expand Up @@ -201,6 +203,7 @@ def _default_ident(self):
_control_parent_ident: bytes = b""
_shell_parent: ContextVar[dict[str, Any]]
_shell_parent_ident: ContextVar[bytes]
_shell_context: Context
# Kept for backward-compatibility, accesses _control_parent_ident and _shell_parent_ident,
# see https://github.com/jupyterlab/jupyterlab/issues/17785
_parent_ident: Mapping[str, bytes]
Expand Down Expand Up @@ -320,13 +323,14 @@ def __init__(self, **kwargs):
self._shell_parent.set({})
self._shell_parent_ident = ContextVar("shell_parent_ident")
self._shell_parent_ident.set(b"")
self._shell_context = copy_context()

# For backward compatibility so that _parent_ident["shell"] and _parent_ident["control"]
# work as they used to for ipykernel >= 7
self._parent_ident = LazyDict(
{
"control": lambda: self._control_parent_ident,
"shell": lambda: self._shell_parent_ident.get(),
"shell": lambda: self._get_shell_context_var(self._shell_parent_ident),
}
)

Expand Down Expand Up @@ -768,6 +772,8 @@ def set_parent(self, ident, parent, channel="shell"):
else:
self._shell_parent_ident.set(ident)
self._shell_parent.set(parent)
# preserve the last call to set_parent
self._shell_context = copy_context()

def get_parent(self, channel=None):
"""Get the parent request associated with a channel.
Expand All @@ -794,7 +800,20 @@ def get_parent(self, channel=None):

if channel == "control":
return self._control_parent
return self._shell_parent.get()

return self._get_shell_context_var(self._shell_parent)

def _get_shell_context_var(self, var: ContextVar[T]) -> T:
"""Lookup a ContextVar, falling back on the shell context

Allows for user-launched Threads to still resolve to the shell's main context

necessary for e.g. display from threads.
"""
try:
return var.get()
except LookupError:
return self._shell_context[var]

def send_response(
self,
Expand Down Expand Up @@ -1455,7 +1474,7 @@ def getpass(self, prompt="", stream=None):
)
return self._input_request(
prompt,
self._shell_parent_ident.get(),
self._get_shell_context_var(self._shell_parent_ident),
self.get_parent("shell"),
password=True,
)
Expand All @@ -1472,7 +1491,7 @@ def raw_input(self, prompt=""):
raise StdinNotImplementedError(msg)
return self._input_request(
str(prompt),
self._shell_parent_ident.get(),
self._get_shell_context_var(self._shell_parent_ident),
self.get_parent("shell"),
password=False,
)
Expand Down
35 changes: 29 additions & 6 deletions ipykernel/zmqshell.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,20 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._parent_header = contextvars.ContextVar("parent_header")
self._parent_header.set({})
self._parent_header_global = {}

@property
def parent_header(self):
return self._parent_header.get()
try:
return self._parent_header.get()
except LookupError:
return self._parent_header_global

def set_parent(self, parent):
"""Set the parent for outbound messages."""
self._parent_header.set(extract_header(parent))
parent_header = extract_header(parent)
self._parent_header.set(parent_header)
self._parent_header_global = parent_header

def _flush_streams(self):
"""flush IO Streams prior to display"""
Expand Down Expand Up @@ -698,11 +704,23 @@ def set_next_input(self, text, replace=False):

@property
def parent_header(self):
return self._parent_header.get()
try:
return self._parent_header.get()
except LookupError:
return self._parent_header_global

@parent_header.setter
def parent_header(self, value):
self._parent_header_global = value
self._parent_header.set(value)

def set_parent(self, parent):
"""Set the parent header for associating output with its triggering input"""
self._parent_header.set(parent)
"""Set the parent header for associating output with its triggering input

When called from a thread, sets the thread-local value, which persists
until the next call from this thread.
"""
self.parent_header = parent
self.displayhook.set_parent(parent) # type:ignore[attr-defined]
self.display_pub.set_parent(parent) # type:ignore[attr-defined]
if hasattr(self, "_data_pub"):
Expand All @@ -713,7 +731,12 @@ def set_parent(self, parent):
sys.stderr.set_parent(parent)

def get_parent(self):
"""Get the parent header."""
"""Get the parent header.

If set_parent has never been called from the current thread,
the value from the last call to set_parent from _any_ thread will be used
(typically the currently running cell).
"""
return self.parent_header

def init_magics(self):
Expand Down
Loading
Loading