From 5bbfdb3ecaa49223b9aefdf287ddf690c9bd6dcf Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Sat, 29 Oct 2016 17:09:32 -0400 Subject: [PATCH 1/2] ipykernel.py - assign cell_uuid to execution_count assign cell_uuid to execution_count cell_uuid is passed in user_expressions Assign the same cell_uuid (which is now execution_count) to reply_content['execution_count'] --- ipykernel/ipkernel.py | 774 +++++++++++++++++++++--------------------- 1 file changed, 394 insertions(+), 380 deletions(-) diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index d28302ca2..f93b59c88 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -1,380 +1,394 @@ -"""The IPython kernel implementation""" - -import getpass -import sys -import traceback - -from IPython.core import release -from ipython_genutils.py3compat import builtin_mod, PY3, unicode_type, safe_unicode -from IPython.utils.tokenutil import token_at_cursor, line_at_cursor -from traitlets import Instance, Type, Any, List - -from .comm import CommManager -from .kernelbase import Kernel as KernelBase -from .zmqshell import ZMQInteractiveShell - - -class IPythonKernel(KernelBase): - shell = Instance('IPython.core.interactiveshell.InteractiveShellABC', - allow_none=True) - shell_class = Type(ZMQInteractiveShell) - - user_module = Any() - def _user_module_changed(self, name, old, new): - if self.shell is not None: - self.shell.user_module = new - - user_ns = Instance(dict, args=None, allow_none=True) - def _user_ns_changed(self, name, old, new): - if self.shell is not None: - self.shell.user_ns = new - self.shell.init_user_ns() - - # A reference to the Python builtin 'raw_input' function. - # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3) - _sys_raw_input = Any() - _sys_eval_input = Any() - - def __init__(self, **kwargs): - super(IPythonKernel, self).__init__(**kwargs) - - # Initialize the InteractiveShell subclass - self.shell = self.shell_class.instance(parent=self, - profile_dir = self.profile_dir, - user_module = self.user_module, - user_ns = self.user_ns, - kernel = self, - ) - self.shell.displayhook.session = self.session - self.shell.displayhook.pub_socket = self.iopub_socket - self.shell.displayhook.topic = self._topic('execute_result') - self.shell.display_pub.session = self.session - self.shell.display_pub.pub_socket = self.iopub_socket - - self.comm_manager = CommManager(parent=self, kernel=self) - - self.shell.configurables.append(self.comm_manager) - comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] - for msg_type in comm_msg_types: - self.shell_handlers[msg_type] = getattr(self.comm_manager, msg_type) - - help_links = List([ - { - 'text': "Python", - 'url': "http://docs.python.org/%i.%i" % sys.version_info[:2], - }, - { - 'text': "IPython", - 'url': "http://ipython.org/documentation.html", - }, - { - 'text': "NumPy", - 'url': "http://docs.scipy.org/doc/numpy/reference/", - }, - { - 'text': "SciPy", - 'url': "http://docs.scipy.org/doc/scipy/reference/", - }, - { - 'text': "Matplotlib", - 'url': "http://matplotlib.org/contents.html", - }, - { - 'text': "SymPy", - 'url': "http://docs.sympy.org/latest/index.html", - }, - { - 'text': "pandas", - 'url': "http://pandas.pydata.org/pandas-docs/stable/", - }, - ]).tag(config=True) - - # Kernel info fields - implementation = 'ipython' - implementation_version = release.version - language_info = { - 'name': 'python', - 'version': sys.version.split()[0], - 'mimetype': 'text/x-python', - 'codemirror_mode': { - 'name': 'ipython', - 'version': sys.version_info[0] - }, - 'pygments_lexer': 'ipython%d' % (3 if PY3 else 2), - 'nbconvert_exporter': 'python', - 'file_extension': '.py' - } - - @property - def banner(self): - return self.shell.banner - - def start(self): - self.shell.exit_now = False - super(IPythonKernel, self).start() - - def set_parent(self, ident, parent): - """Overridden from parent to tell the display hook and output streams - about the parent message. - """ - super(IPythonKernel, self).set_parent(ident, parent) - self.shell.set_parent(parent) - - def init_metadata(self, parent): - """Initialize metadata. - - Run at the beginning of each execution request. - """ - md = super(IPythonKernel, self).init_metadata(parent) - # FIXME: remove deprecated ipyparallel-specific code - # This is required for ipyparallel < 5.0 - md.update({ - 'dependencies_met' : True, - 'engine' : self.ident, - }) - return md - - def finish_metadata(self, parent, metadata, reply_content): - """Finish populating metadata. - - Run after completing an execution request. - """ - # FIXME: remove deprecated ipyparallel-specific code - # This is required by ipyparallel < 5.0 - metadata['status'] = reply_content['status'] - if reply_content['status'] == 'error' and reply_content['ename'] == 'UnmetDependency': - metadata['dependencies_met'] = False - - return metadata - - def _forward_input(self, allow_stdin=False): - """Forward raw_input and getpass to the current frontend. - - via input_request - """ - self._allow_stdin = allow_stdin - - if PY3: - self._sys_raw_input = builtin_mod.input - builtin_mod.input = self.raw_input - else: - self._sys_raw_input = builtin_mod.raw_input - self._sys_eval_input = builtin_mod.input - builtin_mod.raw_input = self.raw_input - builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt)) - self._save_getpass = getpass.getpass - getpass.getpass = self.getpass - - def _restore_input(self): - """Restore raw_input, getpass""" - if PY3: - builtin_mod.input = self._sys_raw_input - else: - builtin_mod.raw_input = self._sys_raw_input - builtin_mod.input = self._sys_eval_input - - getpass.getpass = self._save_getpass - - @property - def execution_count(self): - return self.shell.execution_count - - @execution_count.setter - def execution_count(self, value): - # Ignore the incrememnting done by KernelBase, in favour of our shell's - # execution counter. - pass - - def do_execute(self, code, silent, store_history=True, - user_expressions=None, allow_stdin=False): - shell = self.shell # we'll need this a lot here - - self._forward_input(allow_stdin) - - reply_content = {} - try: - res = shell.run_cell(code, store_history=store_history, silent=silent) - finally: - self._restore_input() - - if res.error_before_exec is not None: - err = res.error_before_exec - else: - err = res.error_in_exec - - if res.success: - reply_content[u'status'] = u'ok' - else: - reply_content[u'status'] = u'error' - - reply_content.update({ - u'traceback': shell._last_traceback or [], - u'ename': unicode_type(type(err).__name__), - u'evalue': safe_unicode(err), - }) - - # FIXME: deprecate piece for ipyparallel: - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, - method='execute') - reply_content['engine_info'] = e_info - - - # Return the execution counter so clients can display prompts - reply_content['execution_count'] = shell.execution_count - 1 - - if 'traceback' in reply_content: - self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback'])) - - - # At this point, we can tell whether the main code execution succeeded - # or not. If it did, we proceed to evaluate user_expressions - if reply_content['status'] == 'ok': - reply_content[u'user_expressions'] = \ - shell.user_expressions(user_expressions or {}) - else: - # If there was an error, don't even try to compute expressions - reply_content[u'user_expressions'] = {} - - # Payloads should be retrieved regardless of outcome, so we can both - # recover partial output (that could have been generated early in a - # block, before an error) and always clear the payload system. - reply_content[u'payload'] = shell.payload_manager.read_payload() - # Be aggressive about clearing the payload because we don't want - # it to sit in memory until the next execute_request comes in. - shell.payload_manager.clear_payload() - - return reply_content - - def do_complete(self, code, cursor_pos): - # FIXME: IPython completers currently assume single line, - # but completion messages give multi-line context - # For now, extract line from cell, based on cursor_pos: - if cursor_pos is None: - cursor_pos = len(code) - line, offset = line_at_cursor(code, cursor_pos) - line_cursor = cursor_pos - offset - - txt, matches = self.shell.complete('', line, line_cursor) - return {'matches' : matches, - 'cursor_end' : cursor_pos, - 'cursor_start' : cursor_pos - len(txt), - 'metadata' : {}, - 'status' : 'ok'} - - def do_inspect(self, code, cursor_pos, detail_level=0): - name = token_at_cursor(code, cursor_pos) - info = self.shell.object_inspect(name) - - reply_content = {'status' : 'ok'} - reply_content['data'] = data = {} - reply_content['metadata'] = {} - reply_content['found'] = info['found'] - if info['found']: - info_text = self.shell.object_inspect_text( - name, - detail_level=detail_level, - ) - data['text/plain'] = info_text - - return reply_content - - def do_history(self, hist_access_type, output, raw, session=0, start=0, - stop=None, n=None, pattern=None, unique=False): - if hist_access_type == 'tail': - hist = self.shell.history_manager.get_tail(n, raw=raw, output=output, - include_latest=True) - - elif hist_access_type == 'range': - hist = self.shell.history_manager.get_range(session, start, stop, - raw=raw, output=output) - - elif hist_access_type == 'search': - hist = self.shell.history_manager.search( - pattern, raw=raw, output=output, n=n, unique=unique) - else: - hist = [] - - return { - 'status': 'ok', - 'history' : list(hist), - } - - def do_shutdown(self, restart): - self.shell.exit_now = True - return dict(status='ok', restart=restart) - - def do_is_complete(self, code): - status, indent_spaces = self.shell.input_transformer_manager.check_complete(code) - r = {'status': status} - if status == 'incomplete': - r['indent'] = ' ' * indent_spaces - return r - - def do_apply(self, content, bufs, msg_id, reply_metadata): - from .serialize import serialize_object, unpack_apply_message - shell = self.shell - try: - working = shell.user_ns - - prefix = "_"+str(msg_id).replace("-","")+"_" - - f,args,kwargs = unpack_apply_message(bufs, working, copy=False) - - fname = getattr(f, '__name__', 'f') - - fname = prefix+"f" - argname = prefix+"args" - kwargname = prefix+"kwargs" - resultname = prefix+"result" - - ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } - # print ns - working.update(ns) - code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname) - try: - exec(code, shell.user_global_ns, shell.user_ns) - result = working.get(resultname) - finally: - for key in ns: - working.pop(key) - - result_buf = serialize_object(result, - buffer_threshold=self.session.buffer_threshold, - item_threshold=self.session.item_threshold, - ) - - except BaseException as e: - # invoke IPython traceback formatting - shell.showtraceback() - reply_content = { - u'traceback': shell._last_traceback or [], - u'ename': unicode_type(type(e).__name__), - u'evalue': safe_unicode(e), - } - # FIXME: deprecate piece for ipyparallel: - e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply') - reply_content['engine_info'] = e_info - - self.send_response(self.iopub_socket, u'error', reply_content, - ident=self._topic('error')) - self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])) - result_buf = [] - reply_content['status'] = 'error' - else: - reply_content = {'status' : 'ok'} - - return reply_content, result_buf - - def do_clear(self): - self.shell.reset(False) - return dict(status='ok') - - -# This exists only for backwards compatibility - use IPythonKernel instead - -class Kernel(IPythonKernel): - def __init__(self, *args, **kwargs): - import warnings - warnings.warn('Kernel is a deprecated alias of ipykernel.ipkernel.IPythonKernel', - DeprecationWarning) - super(Kernel, self).__init__(*args, **kwargs) +"""The IPython kernel implementation""" + +import getpass +import sys +import traceback +import uuid + +from IPython.core import release +from ipython_genutils.py3compat import builtin_mod, PY3, unicode_type, safe_unicode +from IPython.utils.tokenutil import token_at_cursor, line_at_cursor +from traitlets import Instance, Type, Any, List + +from .comm import CommManager +from .kernelbase import Kernel as KernelBase +from .zmqshell import ZMQInteractiveShell + + +class IPythonKernel(KernelBase): + shell = Instance('IPython.core.interactiveshell.InteractiveShellABC', + allow_none=True) + shell_class = Type(ZMQInteractiveShell) + + user_module = Any() + def _user_module_changed(self, name, old, new): + if self.shell is not None: + self.shell.user_module = new + + user_ns = Instance(dict, args=None, allow_none=True) + def _user_ns_changed(self, name, old, new): + if self.shell is not None: + self.shell.user_ns = new + self.shell.init_user_ns() + + # A reference to the Python builtin 'raw_input' function. + # (i.e., __builtin__.raw_input for Python 2.7, builtins.input for Python 3) + _sys_raw_input = Any() + _sys_eval_input = Any() + + def __init__(self, **kwargs): + super(IPythonKernel, self).__init__(**kwargs) + + # Initialize the InteractiveShell subclass + self.shell = self.shell_class.instance(parent=self, + profile_dir = self.profile_dir, + user_module = self.user_module, + user_ns = self.user_ns, + kernel = self, + ) + self.shell.displayhook.session = self.session + self.shell.displayhook.pub_socket = self.iopub_socket + self.shell.displayhook.topic = self._topic('execute_result') + self.shell.display_pub.session = self.session + self.shell.display_pub.pub_socket = self.iopub_socket + + self.comm_manager = CommManager(parent=self, kernel=self) + + self.shell.configurables.append(self.comm_manager) + comm_msg_types = [ 'comm_open', 'comm_msg', 'comm_close' ] + for msg_type in comm_msg_types: + self.shell_handlers[msg_type] = getattr(self.comm_manager, msg_type) + + help_links = List([ + { + 'text': "Python", + 'url': "http://docs.python.org/%i.%i" % sys.version_info[:2], + }, + { + 'text': "IPython", + 'url': "http://ipython.org/documentation.html", + }, + { + 'text': "NumPy", + 'url': "http://docs.scipy.org/doc/numpy/reference/", + }, + { + 'text': "SciPy", + 'url': "http://docs.scipy.org/doc/scipy/reference/", + }, + { + 'text': "Matplotlib", + 'url': "http://matplotlib.org/contents.html", + }, + { + 'text': "SymPy", + 'url': "http://docs.sympy.org/latest/index.html", + }, + { + 'text': "pandas", + 'url': "http://pandas.pydata.org/pandas-docs/stable/", + }, + ]).tag(config=True) + + # Kernel info fields + implementation = 'ipython' + implementation_version = release.version + language_info = { + 'name': 'python', + 'version': sys.version.split()[0], + 'mimetype': 'text/x-python', + 'codemirror_mode': { + 'name': 'ipython', + 'version': sys.version_info[0] + }, + 'pygments_lexer': 'ipython%d' % (3 if PY3 else 2), + 'nbconvert_exporter': 'python', + 'file_extension': '.py' + } + + @property + def banner(self): + return self.shell.banner + + def start(self): + self.shell.exit_now = False + super(IPythonKernel, self).start() + + def set_parent(self, ident, parent): + """Overridden from parent to tell the display hook and output streams + about the parent message. + """ + super(IPythonKernel, self).set_parent(ident, parent) + self.shell.set_parent(parent) + + def init_metadata(self, parent): + """Initialize metadata. + + Run at the beginning of each execution request. + """ + md = super(IPythonKernel, self).init_metadata(parent) + # FIXME: remove deprecated ipyparallel-specific code + # This is required for ipyparallel < 5.0 + md.update({ + 'dependencies_met' : True, + 'engine' : self.ident, + }) + return md + + def finish_metadata(self, parent, metadata, reply_content): + """Finish populating metadata. + + Run after completing an execution request. + """ + # FIXME: remove deprecated ipyparallel-specific code + # This is required by ipyparallel < 5.0 + metadata['status'] = reply_content['status'] + if reply_content['status'] == 'error' and reply_content['ename'] == 'UnmetDependency': + metadata['dependencies_met'] = False + + return metadata + + def _forward_input(self, allow_stdin=False): + """Forward raw_input and getpass to the current frontend. + + via input_request + """ + self._allow_stdin = allow_stdin + + if PY3: + self._sys_raw_input = builtin_mod.input + builtin_mod.input = self.raw_input + else: + self._sys_raw_input = builtin_mod.raw_input + self._sys_eval_input = builtin_mod.input + builtin_mod.raw_input = self.raw_input + builtin_mod.input = lambda prompt='': eval(self.raw_input(prompt)) + self._save_getpass = getpass.getpass + getpass.getpass = self.getpass + + def _restore_input(self): + """Restore raw_input, getpass""" + if PY3: + builtin_mod.input = self._sys_raw_input + else: + builtin_mod.raw_input = self._sys_raw_input + builtin_mod.input = self._sys_eval_input + + getpass.getpass = self._save_getpass + + @property + def execution_count(self): + return self.shell.execution_count + + @execution_count.setter + def execution_count(self, value): + # Ignore the incrememnting done by KernelBase, in favour of our shell's + # execution counter. + pass + + def do_execute(self, code, silent, store_history=True, + user_expressions=None, allow_stdin=False): + shell = self.shell # we'll need this a lot here + + self._forward_input(allow_stdin) + + reply_content = {} +#debug print + #print("Before output") + try: + #Edit - Jay Patel - assign cell_uuid to execution_count. cell_uuid is passed in user_expressions +#debug print + #print("Before run cell") + shell.execution_count = user_expressions.get('cell_uuid') + res = shell.run_cell(code, store_history=store_history, silent=silent) + #print("After run cell") + finally: + self._restore_input() + + if res.error_before_exec is not None: + err = res.error_before_exec + else: + err = res.error_in_exec + + if res.success: + reply_content[u'status'] = u'ok' + elif isinstance(err, KeyboardInterrupt): + reply_content[u'status'] = u'abort' + else: + reply_content[u'status'] = u'error' + + reply_content.update({ + u'traceback': shell._last_traceback or [], + u'ename': unicode_type(type(err).__name__), + u'evalue': safe_unicode(err), + }) + + # FIXME: deprecate piece for ipyparallel: + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, + method='execute') + reply_content['engine_info'] = e_info + +#debug print + # Return the execution counter so clients can display prompts + #sys.stdout.write("Execution count ipkernel:") + #print(shell.execution_count) + #Edit - Jay Patel - Assign the same cell_uuid (which is now execution_count) to reply_content['execution_count'] + #modify execution count value - test + #shell.execution_count = uuid.uuid4().hex + reply_content['execution_count'] = shell.execution_count + #reply_content['execution_count'] = shell.execution_count - 1 + #reply_content['execution_count'] = chr(ord(self.execution_count) - 1) + + if 'traceback' in reply_content: + self.log.info("Exception in execute request:\n%s", '\n'.join(reply_content['traceback'])) + + + # At this point, we can tell whether the main code execution succeeded + # or not. If it did, we proceed to evaluate user_expressions + if reply_content['status'] == 'ok': + reply_content[u'user_expressions'] = \ + shell.user_expressions(user_expressions or {}) + else: + # If there was an error, don't even try to compute expressions + reply_content[u'user_expressions'] = {} + + # Payloads should be retrieved regardless of outcome, so we can both + # recover partial output (that could have been generated early in a + # block, before an error) and always clear the payload system. + reply_content[u'payload'] = shell.payload_manager.read_payload() + # Be aggressive about clearing the payload because we don't want + # it to sit in memory until the next execute_request comes in. + shell.payload_manager.clear_payload() + + return reply_content + + def do_complete(self, code, cursor_pos): + # FIXME: IPython completers currently assume single line, + # but completion messages give multi-line context + # For now, extract line from cell, based on cursor_pos: + if cursor_pos is None: + cursor_pos = len(code) + line, offset = line_at_cursor(code, cursor_pos) + line_cursor = cursor_pos - offset + + txt, matches = self.shell.complete('', line, line_cursor) + return {'matches' : matches, + 'cursor_end' : cursor_pos, + 'cursor_start' : cursor_pos - len(txt), + 'metadata' : {}, + 'status' : 'ok'} + + def do_inspect(self, code, cursor_pos, detail_level=0): + name = token_at_cursor(code, cursor_pos) + info = self.shell.object_inspect(name) + + reply_content = {'status' : 'ok'} + reply_content['data'] = data = {} + reply_content['metadata'] = {} + reply_content['found'] = info['found'] + if info['found']: + info_text = self.shell.object_inspect_text( + name, + detail_level=detail_level, + ) + data['text/plain'] = info_text + + return reply_content + + def do_history(self, hist_access_type, output, raw, session=None, start=None, + stop=None, n=None, pattern=None, unique=False): + if hist_access_type == 'tail': + hist = self.shell.history_manager.get_tail(n, raw=raw, output=output, + include_latest=True) + + elif hist_access_type == 'range': + hist = self.shell.history_manager.get_range(session, start, stop, + raw=raw, output=output) + + elif hist_access_type == 'search': + hist = self.shell.history_manager.search( + pattern, raw=raw, output=output, n=n, unique=unique) + else: + hist = [] + + return {'history' : list(hist)} + + def do_shutdown(self, restart): + self.shell.exit_now = True + return dict(status='ok', restart=restart) + + def do_is_complete(self, code): + status, indent_spaces = self.shell.input_transformer_manager.check_complete(code) + r = {'status': status} + if status == 'incomplete': + r['indent'] = ' ' * indent_spaces + return r + + def do_apply(self, content, bufs, msg_id, reply_metadata): + from .serialize import serialize_object, unpack_apply_message + shell = self.shell + try: + working = shell.user_ns + + prefix = "_"+str(msg_id).replace("-","")+"_" + + f,args,kwargs = unpack_apply_message(bufs, working, copy=False) + + fname = getattr(f, '__name__', 'f') + + fname = prefix+"f" + argname = prefix+"args" + kwargname = prefix+"kwargs" + resultname = prefix+"result" + + ns = { fname : f, argname : args, kwargname : kwargs , resultname : None } + # print ns + working.update(ns) + code = "%s = %s(*%s,**%s)" % (resultname, fname, argname, kwargname) + try: + exec(code, shell.user_global_ns, shell.user_ns) + result = working.get(resultname) + finally: + for key in ns: + working.pop(key) + + result_buf = serialize_object(result, + buffer_threshold=self.session.buffer_threshold, + item_threshold=self.session.item_threshold, + ) + + except BaseException as e: + # invoke IPython traceback formatting + shell.showtraceback() + reply_content = { + u'traceback': shell._last_traceback or [], + u'ename': unicode_type(type(e).__name__), + u'evalue': safe_unicode(e), + } + # FIXME: deprecate piece for ipyparallel: + e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method='apply') + reply_content['engine_info'] = e_info + + self.send_response(self.iopub_socket, u'error', reply_content, + ident=self._topic('error')) + self.log.info("Exception in apply request:\n%s", '\n'.join(reply_content['traceback'])) + result_buf = [] + reply_content['status'] = 'error' + else: + reply_content = {'status' : 'ok'} + + return reply_content, result_buf + + def do_clear(self): + self.shell.reset(False) + return dict(status='ok') + + +# This exists only for backwards compatibility - use IPythonKernel instead + +class Kernel(IPythonKernel): + def __init__(self, *args, **kwargs): + import warnings + warnings.warn('Kernel is a deprecated alias of ipykernel.ipkernel.IPythonKernel', + DeprecationWarning) + super(Kernel, self).__init__(*args, **kwargs) From 8df7bed870b7957ac56e827cb906eb6d2b22d77c Mon Sep 17 00:00:00 2001 From: Jay Patel Date: Sat, 29 Oct 2016 17:19:58 -0400 Subject: [PATCH 2/2] Do not initialize execution_count to 0 kernelbase.py: Do not initialize execution_count to 0. Use the same execution_count instead. --- ipykernel/kernelbase.py | 1484 ++++++++++++++++++++------------------- 1 file changed, 744 insertions(+), 740 deletions(-) diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index eb24b7448..9d8c836c5 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -1,740 +1,744 @@ -"""Base class for a kernel that talks to frontends over 0MQ.""" - -# Copyright (c) IPython Development Team. -# Distributed under the terms of the Modified BSD License. - -from __future__ import print_function - -import sys -import time -import logging -import uuid - -from datetime import datetime -from signal import signal, default_int_handler, SIGINT - -import zmq -from tornado import ioloop -from zmq.eventloop.zmqstream import ZMQStream - -from traitlets.config.configurable import SingletonConfigurable -from IPython.core.error import StdinNotImplementedError -from ipython_genutils import py3compat -from ipython_genutils.py3compat import unicode_type, string_types -from ipykernel.jsonutil import json_clean -from traitlets import ( - Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, observe, default -) - -from jupyter_client.session import Session - -from ._version import kernel_protocol_version - -class Kernel(SingletonConfigurable): - - #--------------------------------------------------------------------------- - # Kernel interface - #--------------------------------------------------------------------------- - - # attribute to override with a GUI - eventloop = Any(None) - - @observe('eventloop') - def _update_eventloop(self, change): - """schedule call to eventloop from IOLoop""" - loop = ioloop.IOLoop.instance() - loop.add_callback(self.enter_eventloop) - - session = Instance(Session, allow_none=True) - profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True) - shell_streams = List() - control_stream = Instance(ZMQStream, allow_none=True) - iopub_socket = Any() - iopub_thread = Any() - stdin_socket = Any() - log = Instance(logging.Logger, allow_none=True) - - # identities: - int_id = Integer(-1) - ident = Unicode() - - @default('ident') - def _default_ident(self): - return unicode_type(uuid.uuid4()) - - # This should be overridden by wrapper kernels that implement any real - # language. - language_info = {} - - # any links that should go in the help menu - help_links = List() - - # Private interface - - _darwin_app_nap = Bool(True, - help="""Whether to use appnope for compatiblity with OS X App Nap. - - Only affects OS X >= 10.9. - """ - ).tag(config=True) - - # track associations with current request - _allow_stdin = Bool(False) - _parent_header = Dict() - _parent_ident = Any(b'') - # Time to sleep after flushing the stdout/err buffers in each execute - # cycle. While this introduces a hard limit on the minimal latency of the - # execute cycle, it helps prevent output synchronization problems for - # clients. - # Units are in seconds. The minimum zmq latency on local host is probably - # ~150 microseconds, set this to 500us for now. We may need to increase it - # a little if it's not enough after more interactive testing. - _execute_sleep = Float(0.0005).tag(config=True) - - # Frequency of the kernel's event loop. - # Units are in seconds, kernel subclasses for GUI toolkits may need to - # adapt to milliseconds. - _poll_interval = Float(0.05).tag(config=True) - - # If the shutdown was requested over the network, we leave here the - # necessary reply message so it can be sent by our registered atexit - # handler. This ensures that the reply is only sent to clients truly at - # the end of our shutdown process (which happens after the underlying - # IPython shell's own shutdown). - _shutdown_message = None - - # This is a dict of port number that the kernel is listening on. It is set - # by record_ports and used by connect_request. - _recorded_ports = Dict() - - # set of aborted msg_ids - aborted = Set() - - # Track execution count here. For IPython, we override this to use the - # execution count we store in the shell. - execution_count = 0 - - msg_types = [ - 'execute_request', 'complete_request', - 'inspect_request', 'history_request', - 'comm_info_request', 'kernel_info_request', - 'connect_request', 'shutdown_request', - 'is_complete_request', - # deprecated: - 'apply_request', - ] - # add deprecated ipyparallel control messages - control_msg_types = msg_types + ['clear_request', 'abort_request'] - - def __init__(self, **kwargs): - super(Kernel, self).__init__(**kwargs) - - # Build dict of handlers for message types - self.shell_handlers = {} - for msg_type in self.msg_types: - self.shell_handlers[msg_type] = getattr(self, msg_type) - - self.control_handlers = {} - for msg_type in self.control_msg_types: - self.control_handlers[msg_type] = getattr(self, msg_type) - - def dispatch_control(self, msg): - """dispatch control requests""" - idents,msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Control Message", exc_info=True) - return - - self.log.debug("Control received: %s", msg) - - # Set the parent message for side effects. - self.set_parent(idents, msg) - self._publish_status(u'busy') - - header = msg['header'] - msg_type = header['msg_type'] - - handler = self.control_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) - else: - try: - handler(self.control_stream, idents, msg) - except Exception: - self.log.error("Exception in control handler:", exc_info=True) - - sys.stdout.flush() - sys.stderr.flush() - self._publish_status(u'idle') - - def should_handle(self, stream, msg, idents): - """Check whether a shell-channel message should be handled - - Allows subclasses to prevent handling of certain messages (e.g. aborted requests). - """ - msg_id = msg['header']['msg_id'] - if msg_id in self.aborted: - msg_type = msg['header']['msg_type'] - # is it safe to assume a msg_id will not be resubmitted? - self.aborted.remove(msg_id) - reply_type = msg_type.split('_')[0] + '_reply' - status = {'status' : 'aborted'} - md = {'engine' : self.ident} - md.update(status) - self.session.send(stream, reply_type, metadata=md, - content=status, parent=msg, ident=idents) - return False - return True - - def dispatch_shell(self, stream, msg): - """dispatch shell requests""" - # flush control requests first - if self.control_stream: - self.control_stream.flush() - - idents,msg = self.session.feed_identities(msg, copy=False) - try: - msg = self.session.deserialize(msg, content=True, copy=False) - except: - self.log.error("Invalid Message", exc_info=True) - return - - # Set the parent message for side effects. - self.set_parent(idents, msg) - self._publish_status(u'busy') - - header = msg['header'] - msg_id = header['msg_id'] - msg_type = msg['header']['msg_type'] - - # Print some info about this message and leave a '--->' marker, so it's - # easier to trace visually the message chain when debugging. Each - # handler prints its message at the end. - self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) - self.log.debug(' Content: %s\n --->\n ', msg['content']) - - if not self.should_handle(stream, msg, idents): - return - - handler = self.shell_handlers.get(msg_type, None) - if handler is None: - self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type) - else: - self.log.debug("%s: %s", msg_type, msg) - self.pre_handler_hook() - try: - handler(stream, idents, msg) - except Exception: - self.log.error("Exception in message handler:", exc_info=True) - finally: - self.post_handler_hook() - - sys.stdout.flush() - sys.stderr.flush() - self._publish_status(u'idle') - - def pre_handler_hook(self): - """Hook to execute before calling message handler""" - # ensure default_int_handler during handler call - self.saved_sigint_handler = signal(SIGINT, default_int_handler) - - def post_handler_hook(self): - """Hook to execute after calling message handler""" - signal(SIGINT, self.saved_sigint_handler) - - def enter_eventloop(self): - """enter eventloop""" - self.log.info("entering eventloop %s", self.eventloop) - for stream in self.shell_streams: - # flush any pending replies, - # which may be skipped by entering the eventloop - stream.flush(zmq.POLLOUT) - # restore default_int_handler - signal(SIGINT, default_int_handler) - while self.eventloop is not None: - try: - self.eventloop(self) - except KeyboardInterrupt: - # Ctrl-C shouldn't crash the kernel - self.log.error("KeyboardInterrupt caught in kernel") - continue - else: - # eventloop exited cleanly, this means we should stop (right?) - self.eventloop = None - break - self.log.info("exiting eventloop") - - def start(self): - """register dispatchers for streams""" - if self.control_stream: - self.control_stream.on_recv(self.dispatch_control, copy=False) - - def make_dispatcher(stream): - def dispatcher(msg): - return self.dispatch_shell(stream, msg) - return dispatcher - - for s in self.shell_streams: - s.on_recv(make_dispatcher(s), copy=False) - - # publish idle status - self._publish_status('starting') - - def do_one_iteration(self): - """step eventloop just once""" - if self.control_stream: - self.control_stream.flush() - for stream in self.shell_streams: - # handle at most one request per iteration - stream.flush(zmq.POLLIN, 1) - stream.flush(zmq.POLLOUT) - - def record_ports(self, ports): - """Record the ports that this kernel is using. - - The creator of the Kernel instance must call this methods if they - want the :meth:`connect_request` method to return the port numbers. - """ - self._recorded_ports = ports - - #--------------------------------------------------------------------------- - # Kernel request handlers - #--------------------------------------------------------------------------- - - def _publish_execute_input(self, code, parent, execution_count): - """Publish the code request on the iopub stream.""" - - self.session.send(self.iopub_socket, u'execute_input', - {u'code':code, u'execution_count': execution_count}, - parent=parent, ident=self._topic('execute_input') - ) - - def _publish_status(self, status, parent=None): - """send status (busy/idle) on IOPub""" - self.session.send(self.iopub_socket, - u'status', - {u'execution_state': status}, - parent=parent or self._parent_header, - ident=self._topic('status'), - ) - - def set_parent(self, ident, parent): - """Set the current parent_header - - Side effects (IOPub messages) and replies are associated with - the request that caused them via the parent_header. - - The parent identity is used to route input_request messages - on the stdin channel. - """ - self._parent_ident = ident - self._parent_header = parent - - def send_response(self, stream, msg_or_type, content=None, ident=None, - buffers=None, track=False, header=None, metadata=None): - """Send a response to the message we're currently processing. - - This accepts all the parameters of :meth:`jupyter_client.session.Session.send` - except ``parent``. - - This relies on :meth:`set_parent` having been called for the current - message. - """ - return self.session.send(stream, msg_or_type, content, self._parent_header, - ident, buffers, track, header, metadata) - - def init_metadata(self, parent): - """Initialize metadata. - - Run at the beginning of execution requests. - """ - return { - 'started': datetime.now(), - } - - def finish_metadata(self, parent, metadata, reply_content): - """Finish populating metadata. - - Run after completing an execution request. - """ - return metadata - - def execute_request(self, stream, ident, parent): - """handle an execute_request""" - - try: - content = parent[u'content'] - code = py3compat.cast_unicode_py2(content[u'code']) - silent = content[u'silent'] - store_history = content.get(u'store_history', not silent) - user_expressions = content.get('user_expressions', {}) - allow_stdin = content.get('allow_stdin', False) - except: - self.log.error("Got bad msg: ") - self.log.error("%s", parent) - return - - stop_on_error = content.get('stop_on_error', True) - - metadata = self.init_metadata(parent) - - # Re-broadcast our input for the benefit of listening clients, and - # start computing output - if not silent: - self.execution_count += 1 - self._publish_execute_input(code, parent, self.execution_count) - - reply_content = self.do_execute(code, silent, store_history, - user_expressions, allow_stdin) - - # Flush output before sending the reply. - sys.stdout.flush() - sys.stderr.flush() - # FIXME: on rare occasions, the flush doesn't seem to make it to the - # clients... This seems to mitigate the problem, but we definitely need - # to better understand what's going on. - if self._execute_sleep: - time.sleep(self._execute_sleep) - - # Send the reply. - reply_content = json_clean(reply_content) - metadata = self.finish_metadata(parent, metadata, reply_content) - - reply_msg = self.session.send(stream, u'execute_reply', - reply_content, parent, metadata=metadata, - ident=ident) - - self.log.debug("%s", reply_msg) - - if not silent and reply_msg['content']['status'] == u'error' and stop_on_error: - self._abort_queues() - - def do_execute(self, code, silent, store_history=True, - user_expressions=None, allow_stdin=False): - """Execute user code. Must be overridden by subclasses. - """ - raise NotImplementedError - - def complete_request(self, stream, ident, parent): - content = parent['content'] - code = content['code'] - cursor_pos = content['cursor_pos'] - - matches = self.do_complete(code, cursor_pos) - matches = json_clean(matches) - completion_msg = self.session.send(stream, 'complete_reply', - matches, parent, ident) - self.log.debug("%s", completion_msg) - - def do_complete(self, code, cursor_pos): - """Override in subclasses to find completions. - """ - return {'matches' : [], - 'cursor_end' : cursor_pos, - 'cursor_start' : cursor_pos, - 'metadata' : {}, - 'status' : 'ok'} - - def inspect_request(self, stream, ident, parent): - content = parent['content'] - - reply_content = self.do_inspect(content['code'], content['cursor_pos'], - content.get('detail_level', 0)) - # Before we send this object over, we scrub it for JSON usage - reply_content = json_clean(reply_content) - msg = self.session.send(stream, 'inspect_reply', - reply_content, parent, ident) - self.log.debug("%s", msg) - - def do_inspect(self, code, cursor_pos, detail_level=0): - """Override in subclasses to allow introspection. - """ - return {'status': 'ok', 'data': {}, 'metadata': {}, 'found': False} - - def history_request(self, stream, ident, parent): - content = parent['content'] - - reply_content = self.do_history(**content) - - reply_content = json_clean(reply_content) - msg = self.session.send(stream, 'history_reply', - reply_content, parent, ident) - self.log.debug("%s", msg) - - def do_history(self, hist_access_type, output, raw, session=None, start=None, - stop=None, n=None, pattern=None, unique=False): - """Override in subclasses to access history. - """ - return {'status': 'ok', 'history': []} - - def connect_request(self, stream, ident, parent): - if self._recorded_ports is not None: - content = self._recorded_ports.copy() - else: - content = {} - content['status'] = 'ok' - msg = self.session.send(stream, 'connect_reply', - content, parent, ident) - self.log.debug("%s", msg) - - @property - def kernel_info(self): - return { - 'protocol_version': kernel_protocol_version, - 'implementation': self.implementation, - 'implementation_version': self.implementation_version, - 'language_info': self.language_info, - 'banner': self.banner, - 'help_links': self.help_links, - } - - def kernel_info_request(self, stream, ident, parent): - content = {'status': 'ok'} - content.update(self.kernel_info) - msg = self.session.send(stream, 'kernel_info_reply', - content, parent, ident) - self.log.debug("%s", msg) - - def comm_info_request(self, stream, ident, parent): - content = parent['content'] - target_name = content.get('target_name', None) - - # Should this be moved to ipkernel? - if hasattr(self, 'comm_manager'): - comms = { - k: dict(target_name=v.target_name) - for (k, v) in self.comm_manager.comms.items() - if v.target_name == target_name or target_name is None - } - else: - comms = {} - reply_content = dict(comms=comms, status='ok') - msg = self.session.send(stream, 'comm_info_reply', - reply_content, parent, ident) - self.log.debug("%s", msg) - - def shutdown_request(self, stream, ident, parent): - content = self.do_shutdown(parent['content']['restart']) - self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) - # same content, but different msg_id for broadcasting on IOPub - self._shutdown_message = self.session.msg(u'shutdown_reply', - content, parent - ) - - self._at_shutdown() - # call sys.exit after a short delay - loop = ioloop.IOLoop.instance() - loop.add_timeout(time.time()+0.1, loop.stop) - - def do_shutdown(self, restart): - """Override in subclasses to do things when the frontend shuts down the - kernel. - """ - return {'status': 'ok', 'restart': restart} - - def is_complete_request(self, stream, ident, parent): - content = parent['content'] - code = content['code'] - - reply_content = self.do_is_complete(code) - reply_content = json_clean(reply_content) - reply_msg = self.session.send(stream, 'is_complete_reply', - reply_content, parent, ident) - self.log.debug("%s", reply_msg) - - def do_is_complete(self, code): - """Override in subclasses to find completions. - """ - return {'status' : 'unknown', - } - - #--------------------------------------------------------------------------- - # Engine methods (DEPRECATED) - #--------------------------------------------------------------------------- - - def apply_request(self, stream, ident, parent): - self.log.warn("""apply_request is deprecated in kernel_base, moving to ipyparallel.""") - try: - content = parent[u'content'] - bufs = parent[u'buffers'] - msg_id = parent['header']['msg_id'] - except: - self.log.error("Got bad msg: %s", parent, exc_info=True) - return - - md = self.init_metadata(parent) - - reply_content, result_buf = self.do_apply(content, bufs, msg_id, md) - - # flush i/o - sys.stdout.flush() - sys.stderr.flush() - - md = self.finish_metadata(parent, md, reply_content) - - self.session.send(stream, u'apply_reply', reply_content, - parent=parent, ident=ident,buffers=result_buf, metadata=md) - - def do_apply(self, content, bufs, msg_id, reply_metadata): - """DEPRECATED""" - raise NotImplementedError - - #--------------------------------------------------------------------------- - # Control messages (DEPRECATED) - #--------------------------------------------------------------------------- - - def abort_request(self, stream, ident, parent): - """abort a specific msg by id""" - self.log.warn("abort_request is deprecated in kernel_base. It os only part of IPython parallel") - msg_ids = parent['content'].get('msg_ids', None) - if isinstance(msg_ids, string_types): - msg_ids = [msg_ids] - if not msg_ids: - self._abort_queues() - for mid in msg_ids: - self.aborted.add(str(mid)) - - content = dict(status='ok') - reply_msg = self.session.send(stream, 'abort_reply', content=content, - parent=parent, ident=ident) - self.log.debug("%s", reply_msg) - - def clear_request(self, stream, idents, parent): - """Clear our namespace.""" - self.log.warn("clear_request is deprecated in kernel_base. It os only part of IPython parallel") - content = self.do_clear() - self.session.send(stream, 'clear_reply', ident=idents, parent=parent, - content = content) - - def do_clear(self): - """DEPRECATED""" - raise NotImplementedError - - #--------------------------------------------------------------------------- - # Protected interface - #--------------------------------------------------------------------------- - - def _topic(self, topic): - """prefixed topic for IOPub messages""" - base = "kernel.%s" % self.ident - - return py3compat.cast_bytes("%s.%s" % (base, topic)) - - def _abort_queues(self): - for stream in self.shell_streams: - if stream: - self._abort_queue(stream) - - def _abort_queue(self, stream): - poller = zmq.Poller() - poller.register(stream.socket, zmq.POLLIN) - while True: - idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) - if msg is None: - return - - self.log.info("Aborting:") - self.log.info("%s", msg) - msg_type = msg['header']['msg_type'] - reply_type = msg_type.split('_')[0] + '_reply' - - status = {'status' : 'aborted'} - md = {'engine' : self.ident} - md.update(status) - reply_msg = self.session.send(stream, reply_type, metadata=md, - content=status, parent=msg, ident=idents) - self.log.debug("%s", reply_msg) - # We need to wait a bit for requests to come in. This can probably - # be set shorter for true asynchronous clients. - poller.poll(50) - - def _no_raw_input(self): - """Raise StdinNotImplentedError if active frontend doesn't support - stdin.""" - raise StdinNotImplementedError("raw_input was called, but this " - "frontend does not support stdin.") - - def getpass(self, prompt=''): - """Forward getpass to frontends - - Raises - ------ - StdinNotImplentedError if active frontend doesn't support stdin. - """ - if not self._allow_stdin: - raise StdinNotImplementedError( - "getpass was called, but this frontend does not support input requests." - ) - return self._input_request(prompt, - self._parent_ident, - self._parent_header, - password=True, - ) - - def raw_input(self, prompt=''): - """Forward raw_input to frontends - - Raises - ------ - StdinNotImplentedError if active frontend doesn't support stdin. - """ - if not self._allow_stdin: - raise StdinNotImplementedError( - "raw_input was called, but this frontend does not support input requests." - ) - return self._input_request(str(prompt), - self._parent_ident, - self._parent_header, - password=False, - ) - - def _input_request(self, prompt, ident, parent, password=False): - # Flush output before making the request. - sys.stderr.flush() - sys.stdout.flush() - # flush the stdin socket, to purge stale replies - while True: - try: - self.stdin_socket.recv_multipart(zmq.NOBLOCK) - except zmq.ZMQError as e: - if e.errno == zmq.EAGAIN: - break - else: - raise - - # Send the input request. - content = json_clean(dict(prompt=prompt, password=password)) - self.session.send(self.stdin_socket, u'input_request', content, parent, - ident=ident) - - # Await a response. - while True: - try: - ident, reply = self.session.recv(self.stdin_socket, 0) - except Exception: - self.log.warn("Invalid Message:", exc_info=True) - except KeyboardInterrupt: - # re-raise KeyboardInterrupt, to truncate traceback - raise KeyboardInterrupt - else: - break - try: - value = py3compat.unicode_to_str(reply['content']['value']) - except: - self.log.error("Bad input_reply: %s", parent) - value = '' - if value == '\x04': - # EOF - raise EOFError - return value - - def _at_shutdown(self): - """Actions taken at shutdown by the kernel, called by python's atexit. - """ - # io.rprint("Kernel at_shutdown") # dbg - if self._shutdown_message is not None: - self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) - self.log.debug("%s", self._shutdown_message) - [ s.flush(zmq.POLLOUT) for s in self.shell_streams ] +"""Base class for a kernel that talks to frontends over 0MQ.""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import print_function + +import sys +import time +import logging +import uuid + +from datetime import datetime +from signal import signal, default_int_handler, SIGINT + +import zmq +from tornado import ioloop +from zmq.eventloop.zmqstream import ZMQStream + +from traitlets.config.configurable import SingletonConfigurable +from IPython.core.error import StdinNotImplementedError +from ipython_genutils import py3compat +from ipython_genutils.py3compat import unicode_type, string_types +from ipykernel.jsonutil import json_clean +from traitlets import ( + Any, Instance, Float, Dict, List, Set, Integer, Unicode, Bool, observe, default +) + +from jupyter_client.session import Session + +from ._version import kernel_protocol_version + +class Kernel(SingletonConfigurable): + + #--------------------------------------------------------------------------- + # Kernel interface + #--------------------------------------------------------------------------- + + # attribute to override with a GUI + eventloop = Any(None) + + @observe('eventloop') + def _update_eventloop(self, change): + """schedule call to eventloop from IOLoop""" + loop = ioloop.IOLoop.instance() + loop.add_callback(self.enter_eventloop) + + session = Instance(Session, allow_none=True) + profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True) + shell_streams = List() + control_stream = Instance(ZMQStream, allow_none=True) + iopub_socket = Any() + iopub_thread = Any() + stdin_socket = Any() + log = Instance(logging.Logger, allow_none=True) + + # identities: + int_id = Integer(-1) + ident = Unicode() + + @default('ident') + def _default_ident(self): + return unicode_type(uuid.uuid4()) + + # This should be overridden by wrapper kernels that implement any real + # language. + language_info = {} + + # any links that should go in the help menu + help_links = List() + + # Private interface + + _darwin_app_nap = Bool(True, + help="""Whether to use appnope for compatiblity with OS X App Nap. + + Only affects OS X >= 10.9. + """ + ).tag(config=True) + + # track associations with current request + _allow_stdin = Bool(False) + _parent_header = Dict() + _parent_ident = Any(b'') + # Time to sleep after flushing the stdout/err buffers in each execute + # cycle. While this introduces a hard limit on the minimal latency of the + # execute cycle, it helps prevent output synchronization problems for + # clients. + # Units are in seconds. The minimum zmq latency on local host is probably + # ~150 microseconds, set this to 500us for now. We may need to increase it + # a little if it's not enough after more interactive testing. + _execute_sleep = Float(0.0005).tag(config=True) + + # Frequency of the kernel's event loop. + # Units are in seconds, kernel subclasses for GUI toolkits may need to + # adapt to milliseconds. + _poll_interval = Float(0.05).tag(config=True) + + # If the shutdown was requested over the network, we leave here the + # necessary reply message so it can be sent by our registered atexit + # handler. This ensures that the reply is only sent to clients truly at + # the end of our shutdown process (which happens after the underlying + # IPython shell's own shutdown). + _shutdown_message = None + + # This is a dict of port number that the kernel is listening on. It is set + # by record_ports and used by connect_request. + _recorded_ports = Dict() + + # set of aborted msg_ids + aborted = Set() + + # Track execution count here. For IPython, we override this to use the + # execution count we store in the shell. +#Edit - Jay Patel - Do not initialize execution_count to 0. Use the same execution_count instead. + #execution_count = 0 + #execution_count = 'abcd' + #execution_count = chr(96) + + msg_types = [ + 'execute_request', 'complete_request', + 'inspect_request', 'history_request', + 'comm_info_request', 'kernel_info_request', + 'connect_request', 'shutdown_request', + 'is_complete_request', + # deprecated: + 'apply_request', + ] + # add deprecated ipyparallel control messages + control_msg_types = msg_types + ['clear_request', 'abort_request'] + + def __init__(self, **kwargs): + super(Kernel, self).__init__(**kwargs) + + # Build dict of handlers for message types + self.shell_handlers = {} + for msg_type in self.msg_types: + self.shell_handlers[msg_type] = getattr(self, msg_type) + + self.control_handlers = {} + for msg_type in self.control_msg_types: + self.control_handlers[msg_type] = getattr(self, msg_type) + + def dispatch_control(self, msg): + """dispatch control requests""" + idents,msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid Control Message", exc_info=True) + return + + self.log.debug("Control received: %s", msg) + + # Set the parent message for side effects. + self.set_parent(idents, msg) + self._publish_status(u'busy') + + header = msg['header'] + msg_type = header['msg_type'] + + handler = self.control_handlers.get(msg_type, None) + if handler is None: + self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r", msg_type) + else: + try: + handler(self.control_stream, idents, msg) + except Exception: + self.log.error("Exception in control handler:", exc_info=True) + + sys.stdout.flush() + sys.stderr.flush() + self._publish_status(u'idle') + + def should_handle(self, stream, msg, idents): + """Check whether a shell-channel message should be handled + + Allows subclasses to prevent handling of certain messages (e.g. aborted requests). + """ + msg_id = msg['header']['msg_id'] + if msg_id in self.aborted: + msg_type = msg['header']['msg_type'] + # is it safe to assume a msg_id will not be resubmitted? + self.aborted.remove(msg_id) + reply_type = msg_type.split('_')[0] + '_reply' + status = {'status' : 'aborted'} + md = {'engine' : self.ident} + md.update(status) + self.session.send(stream, reply_type, metadata=md, + content=status, parent=msg, ident=idents) + return False + return True + + def dispatch_shell(self, stream, msg): + """dispatch shell requests""" + # flush control requests first + if self.control_stream: + self.control_stream.flush() + + idents,msg = self.session.feed_identities(msg, copy=False) + try: + msg = self.session.deserialize(msg, content=True, copy=False) + except: + self.log.error("Invalid Message", exc_info=True) + return + + # Set the parent message for side effects. + self.set_parent(idents, msg) + self._publish_status(u'busy') + + header = msg['header'] + msg_id = header['msg_id'] + msg_type = msg['header']['msg_type'] + + # Print some info about this message and leave a '--->' marker, so it's + # easier to trace visually the message chain when debugging. Each + # handler prints its message at the end. + self.log.debug('\n*** MESSAGE TYPE:%s***', msg_type) + self.log.debug(' Content: %s\n --->\n ', msg['content']) + + if not self.should_handle(stream, msg, idents): + return + + handler = self.shell_handlers.get(msg_type, None) + if handler is None: + self.log.error("UNKNOWN MESSAGE TYPE: %r", msg_type) + else: + self.log.debug("%s: %s", msg_type, msg) + self.pre_handler_hook() + try: + handler(stream, idents, msg) + except Exception: + self.log.error("Exception in message handler:", exc_info=True) + finally: + self.post_handler_hook() + + sys.stdout.flush() + sys.stderr.flush() + self._publish_status(u'idle') + + def pre_handler_hook(self): + """Hook to execute before calling message handler""" + # ensure default_int_handler during handler call + self.saved_sigint_handler = signal(SIGINT, default_int_handler) + + def post_handler_hook(self): + """Hook to execute after calling message handler""" + signal(SIGINT, self.saved_sigint_handler) + + def enter_eventloop(self): + """enter eventloop""" + self.log.info("entering eventloop %s", self.eventloop) + for stream in self.shell_streams: + # flush any pending replies, + # which may be skipped by entering the eventloop + stream.flush(zmq.POLLOUT) + # restore default_int_handler + signal(SIGINT, default_int_handler) + while self.eventloop is not None: + try: + self.eventloop(self) + except KeyboardInterrupt: + # Ctrl-C shouldn't crash the kernel + self.log.error("KeyboardInterrupt caught in kernel") + continue + else: + # eventloop exited cleanly, this means we should stop (right?) + self.eventloop = None + break + self.log.info("exiting eventloop") + + def start(self): + """register dispatchers for streams""" + if self.control_stream: + self.control_stream.on_recv(self.dispatch_control, copy=False) + + def make_dispatcher(stream): + def dispatcher(msg): + return self.dispatch_shell(stream, msg) + return dispatcher + + for s in self.shell_streams: + s.on_recv(make_dispatcher(s), copy=False) + + # publish idle status + self._publish_status('starting') + + def do_one_iteration(self): + """step eventloop just once""" + if self.control_stream: + self.control_stream.flush() + for stream in self.shell_streams: + # handle at most one request per iteration + stream.flush(zmq.POLLIN, 1) + stream.flush(zmq.POLLOUT) + + def record_ports(self, ports): + """Record the ports that this kernel is using. + + The creator of the Kernel instance must call this methods if they + want the :meth:`connect_request` method to return the port numbers. + """ + self._recorded_ports = ports + + #--------------------------------------------------------------------------- + # Kernel request handlers + #--------------------------------------------------------------------------- + + def _publish_execute_input(self, code, parent, execution_count): + """Publish the code request on the iopub stream.""" + + self.session.send(self.iopub_socket, u'execute_input', + {u'code':code, u'execution_count': execution_count}, + parent=parent, ident=self._topic('execute_input') + ) + + def _publish_status(self, status, parent=None): + """send status (busy/idle) on IOPub""" + self.session.send(self.iopub_socket, + u'status', + {u'execution_state': status}, + parent=parent or self._parent_header, + ident=self._topic('status'), + ) + + def set_parent(self, ident, parent): + """Set the current parent_header + + Side effects (IOPub messages) and replies are associated with + the request that caused them via the parent_header. + + The parent identity is used to route input_request messages + on the stdin channel. + """ + self._parent_ident = ident + self._parent_header = parent + + def send_response(self, stream, msg_or_type, content=None, ident=None, + buffers=None, track=False, header=None, metadata=None): + """Send a response to the message we're currently processing. + + This accepts all the parameters of :meth:`jupyter_client.session.Session.send` + except ``parent``. + + This relies on :meth:`set_parent` having been called for the current + message. + """ + return self.session.send(stream, msg_or_type, content, self._parent_header, + ident, buffers, track, header, metadata) + + def init_metadata(self, parent): + """Initialize metadata. + + Run at the beginning of execution requests. + """ + return { + 'started': datetime.now(), + } + + def finish_metadata(self, parent, metadata, reply_content): + """Finish populating metadata. + + Run after completing an execution request. + """ + return metadata + + def execute_request(self, stream, ident, parent): + """handle an execute_request""" + + try: + content = parent[u'content'] + code = py3compat.cast_unicode_py2(content[u'code']) + silent = content[u'silent'] + store_history = content.get(u'store_history', not silent) + user_expressions = content.get('user_expressions', {}) + allow_stdin = content.get('allow_stdin', False) + except: + self.log.error("Got bad msg: ") + self.log.error("%s", parent) + return + + stop_on_error = content.get('stop_on_error', True) + + metadata = self.init_metadata(parent) + + # Re-broadcast our input for the benefit of listening clients, and + # start computing output + if not silent: +#test + #self.execution_count = uuid.uuid4().hex + #self.execution_count = new_uuid + #self.execution_count += 1 + #self.execution_count = chr(ord(self.execution_count) + 1) + self._publish_execute_input(code, parent, self.execution_count) + + reply_content = self.do_execute(code, silent, store_history, + user_expressions, allow_stdin) + + # Flush output before sending the reply. + sys.stdout.flush() + sys.stderr.flush() + # FIXME: on rare occasions, the flush doesn't seem to make it to the + # clients... This seems to mitigate the problem, but we definitely need + # to better understand what's going on. + if self._execute_sleep: + time.sleep(self._execute_sleep) + + # Send the reply. + reply_content = json_clean(reply_content) + metadata = self.finish_metadata(parent, metadata, reply_content) + + reply_msg = self.session.send(stream, u'execute_reply', + reply_content, parent, metadata=metadata, + ident=ident) + + self.log.debug("%s", reply_msg) + + if not silent and reply_msg['content']['status'] == u'error' and stop_on_error: + self._abort_queues() + + def do_execute(self, code, silent, store_history=True, + user_expressions=None, allow_stdin=False): + """Execute user code. Must be overridden by subclasses. + """ + raise NotImplementedError + + def complete_request(self, stream, ident, parent): + content = parent['content'] + code = content['code'] + cursor_pos = content['cursor_pos'] + + matches = self.do_complete(code, cursor_pos) + matches = json_clean(matches) + completion_msg = self.session.send(stream, 'complete_reply', + matches, parent, ident) + self.log.debug("%s", completion_msg) + + def do_complete(self, code, cursor_pos): + """Override in subclasses to find completions. + """ + return {'matches' : [], + 'cursor_end' : cursor_pos, + 'cursor_start' : cursor_pos, + 'metadata' : {}, + 'status' : 'ok'} + + def inspect_request(self, stream, ident, parent): + content = parent['content'] + + reply_content = self.do_inspect(content['code'], content['cursor_pos'], + content.get('detail_level', 0)) + # Before we send this object over, we scrub it for JSON usage + reply_content = json_clean(reply_content) + msg = self.session.send(stream, 'inspect_reply', + reply_content, parent, ident) + self.log.debug("%s", msg) + + def do_inspect(self, code, cursor_pos, detail_level=0): + """Override in subclasses to allow introspection. + """ + return {'status': 'ok', 'data': {}, 'metadata': {}, 'found': False} + + def history_request(self, stream, ident, parent): + content = parent['content'] + + reply_content = self.do_history(**content) + + reply_content = json_clean(reply_content) + msg = self.session.send(stream, 'history_reply', + reply_content, parent, ident) + self.log.debug("%s", msg) + + def do_history(self, hist_access_type, output, raw, session=None, start=None, + stop=None, n=None, pattern=None, unique=False): + """Override in subclasses to access history. + """ + return {'history': []} + + def connect_request(self, stream, ident, parent): + if self._recorded_ports is not None: + content = self._recorded_ports.copy() + else: + content = {} + msg = self.session.send(stream, 'connect_reply', + content, parent, ident) + self.log.debug("%s", msg) + + @property + def kernel_info(self): + return { + 'protocol_version': kernel_protocol_version, + 'implementation': self.implementation, + 'implementation_version': self.implementation_version, + 'language_info': self.language_info, + 'banner': self.banner, + 'help_links': self.help_links, + } + + def kernel_info_request(self, stream, ident, parent): + msg = self.session.send(stream, 'kernel_info_reply', + self.kernel_info, parent, ident) + self.log.debug("%s", msg) + + def comm_info_request(self, stream, ident, parent): + content = parent['content'] + target_name = content.get('target_name', None) + + # Should this be moved to ipkernel? + if hasattr(self, 'comm_manager'): + comms = { + k: dict(target_name=v.target_name) + for (k, v) in self.comm_manager.comms.items() + if v.target_name == target_name or target_name is None + } + else: + comms = {} + reply_content = dict(comms=comms) + msg = self.session.send(stream, 'comm_info_reply', + reply_content, parent, ident) + self.log.debug("%s", msg) + + def shutdown_request(self, stream, ident, parent): + content = self.do_shutdown(parent['content']['restart']) + self.session.send(stream, u'shutdown_reply', content, parent, ident=ident) + # same content, but different msg_id for broadcasting on IOPub + self._shutdown_message = self.session.msg(u'shutdown_reply', + content, parent + ) + + self._at_shutdown() + # call sys.exit after a short delay + loop = ioloop.IOLoop.instance() + loop.add_timeout(time.time()+0.1, loop.stop) + + def do_shutdown(self, restart): + """Override in subclasses to do things when the frontend shuts down the + kernel. + """ + return {'status': 'ok', 'restart': restart} + + def is_complete_request(self, stream, ident, parent): + content = parent['content'] + code = content['code'] + + reply_content = self.do_is_complete(code) + reply_content = json_clean(reply_content) + reply_msg = self.session.send(stream, 'is_complete_reply', + reply_content, parent, ident) + self.log.debug("%s", reply_msg) + + def do_is_complete(self, code): + """Override in subclasses to find completions. + """ + return {'status' : 'unknown', + } + + #--------------------------------------------------------------------------- + # Engine methods (DEPRECATED) + #--------------------------------------------------------------------------- + + def apply_request(self, stream, ident, parent): + self.log.warn("""apply_request is deprecated in kernel_base, moving to ipyparallel.""") + try: + content = parent[u'content'] + bufs = parent[u'buffers'] + msg_id = parent['header']['msg_id'] + except: + self.log.error("Got bad msg: %s", parent, exc_info=True) + return + + md = self.init_metadata(parent) + + reply_content, result_buf = self.do_apply(content, bufs, msg_id, md) + + # flush i/o + sys.stdout.flush() + sys.stderr.flush() + + md = self.finish_metadata(parent, md, reply_content) + + self.session.send(stream, u'apply_reply', reply_content, + parent=parent, ident=ident,buffers=result_buf, metadata=md) + + def do_apply(self, content, bufs, msg_id, reply_metadata): + """DEPRECATED""" + raise NotImplementedError + + #--------------------------------------------------------------------------- + # Control messages (DEPRECATED) + #--------------------------------------------------------------------------- + + def abort_request(self, stream, ident, parent): + """abort a specific msg by id""" + self.log.warn("abort_request is deprecated in kernel_base. It os only part of IPython parallel") + msg_ids = parent['content'].get('msg_ids', None) + if isinstance(msg_ids, string_types): + msg_ids = [msg_ids] + if not msg_ids: + self._abort_queues() + for mid in msg_ids: + self.aborted.add(str(mid)) + + content = dict(status='ok') + reply_msg = self.session.send(stream, 'abort_reply', content=content, + parent=parent, ident=ident) + self.log.debug("%s", reply_msg) + + def clear_request(self, stream, idents, parent): + """Clear our namespace.""" + self.log.warn("clear_request is deprecated in kernel_base. It os only part of IPython parallel") + content = self.do_clear() + self.session.send(stream, 'clear_reply', ident=idents, parent=parent, + content = content) + + def do_clear(self): + """DEPRECATED""" + raise NotImplementedError + + #--------------------------------------------------------------------------- + # Protected interface + #--------------------------------------------------------------------------- + + def _topic(self, topic): + """prefixed topic for IOPub messages""" + base = "kernel.%s" % self.ident + + return py3compat.cast_bytes("%s.%s" % (base, topic)) + + def _abort_queues(self): + for stream in self.shell_streams: + if stream: + self._abort_queue(stream) + + def _abort_queue(self, stream): + poller = zmq.Poller() + poller.register(stream.socket, zmq.POLLIN) + while True: + idents,msg = self.session.recv(stream, zmq.NOBLOCK, content=True) + if msg is None: + return + + self.log.info("Aborting:") + self.log.info("%s", msg) + msg_type = msg['header']['msg_type'] + reply_type = msg_type.split('_')[0] + '_reply' + + status = {'status' : 'aborted'} + md = {'engine' : self.ident} + md.update(status) + reply_msg = self.session.send(stream, reply_type, metadata=md, + content=status, parent=msg, ident=idents) + self.log.debug("%s", reply_msg) + # We need to wait a bit for requests to come in. This can probably + # be set shorter for true asynchronous clients. + poller.poll(50) + + def _no_raw_input(self): + """Raise StdinNotImplentedError if active frontend doesn't support + stdin.""" + raise StdinNotImplementedError("raw_input was called, but this " + "frontend does not support stdin.") + + def getpass(self, prompt=''): + """Forward getpass to frontends + + Raises + ------ + StdinNotImplentedError if active frontend doesn't support stdin. + """ + if not self._allow_stdin: + raise StdinNotImplementedError( + "getpass was called, but this frontend does not support input requests." + ) + return self._input_request(prompt, + self._parent_ident, + self._parent_header, + password=True, + ) + + def raw_input(self, prompt=''): + """Forward raw_input to frontends + + Raises + ------ + StdinNotImplentedError if active frontend doesn't support stdin. + """ + if not self._allow_stdin: + raise StdinNotImplementedError( + "raw_input was called, but this frontend does not support input requests." + ) + return self._input_request(prompt, + self._parent_ident, + self._parent_header, + password=False, + ) + + def _input_request(self, prompt, ident, parent, password=False): + # Flush output before making the request. + sys.stderr.flush() + sys.stdout.flush() + # flush the stdin socket, to purge stale replies + while True: + try: + self.stdin_socket.recv_multipart(zmq.NOBLOCK) + except zmq.ZMQError as e: + if e.errno == zmq.EAGAIN: + break + else: + raise + + # Send the input request. + content = json_clean(dict(prompt=prompt, password=password)) + self.session.send(self.stdin_socket, u'input_request', content, parent, + ident=ident) + + # Await a response. + while True: + try: + ident, reply = self.session.recv(self.stdin_socket, 0) + except Exception: + self.log.warn("Invalid Message:", exc_info=True) + except KeyboardInterrupt: + # re-raise KeyboardInterrupt, to truncate traceback + raise KeyboardInterrupt + else: + break + try: + value = py3compat.unicode_to_str(reply['content']['value']) + except: + self.log.error("Bad input_reply: %s", parent) + value = '' + if value == '\x04': + # EOF + raise EOFError + return value + + def _at_shutdown(self): + """Actions taken at shutdown by the kernel, called by python's atexit. + """ + # io.rprint("Kernel at_shutdown") # dbg + if self._shutdown_message is not None: + self.session.send(self.iopub_socket, self._shutdown_message, ident=self._topic('shutdown')) + self.log.debug("%s", self._shutdown_message) + [ s.flush(zmq.POLLOUT) for s in self.shell_streams ]