lib/contexts/JupyterContext.js
const fs = require('fs')
const jmp = require('jmp')
const kernelspecs = require('kernelspecs')
const spawnteract = require('spawnteract')
const uuid = require('uuid')
const Context = require('./Context')
/**
* An execution context using Jupyter kernels
*
* This class of execution context acts as a bridge between Stencila and
* Jupyter kernels. It exposes methods of the Stencila `Context` API e.g. `executeEval`
* which delegate execution to a Jupyter kernel. This is done via the
* [Jupyter Messageing Protocol (JMP)](http://jupyter-client.readthedocs.io/en/stable/messaging.html)
* over [ZeroMQ](http://zeromq.org/) sockets.
*
* The `discover` static method should be called initially to find all Jupyter kernels
* currently installed on the machine and update `JupyterContext.spec.kernels`:
*
* JupyterContext.discover()
*
* New Jupyter execution contexts can be constructed using the `language` option which will
* search for a kernel with a matching lowercased `language` property:
*
* new JupyterContext({language:'r'})
*
* Alternively, you can specify a kernel directly:
*
* new JupyterContext({kernel:'ir'})
*
* See https://github.com/jupyter/jupyter/wiki/Jupyter-kernels for a list of available
* Jupyter kernels.
*
* Many thanks to the nteract community for [`kernelspecs`](https://github.com/nteract/kernelspecs) and
* [`spawnteract`](https://github.com/nteract/spawnteract), and to Nicolas Riesco for (`jmp`)[https://github.com/n-riesco/jmp],
* all of which made this implementation far easier!
*/
class JupyterContext extends Context {
/**
* Discover Jupyter kernels on the current machine
*
* Looks for Jupyter kernels that have been installed on the system
* and puts that list in `JupyterContext.spec.kernels` so that
* peers know the capabilities of this "meta-context".
*
* @return {Promise} A promise
*/
static discover () {
// Create a list of kernel names and aliases
return kernelspecs.findAll().then(kernelspecs => {
JupyterContext.spec.kernels = kernelspecs
})
}
/**
* Construct a Jupyter execution context
*
* @param {Object} options Options for specifying which kernel to use
*/
constructor (host, name, options = {}) {
super(host, name)
let kernel = options.kernel
let kernelName = options.name
const kernels = JupyterContext.spec.kernels
const kernelNames = Object.keys(kernels)
if (!kernelNames.length) {
throw new Error('No Jupyter kernels available on this machine')
}
if (kernel && !kernels[kernel]) {
throw new Error(`Jupyter kernel "${kernel}" not available on this machine`)
}
if (kernelName) {
for (let spec of kernels) {
if (spec.name.toLowerCase() === kernelName) {
kernel = spec.name
break
}
}
if (!kernel) {
throw new Error(`No Jupyter kernel on this machine with name "${kernelName}"`)
}
}
if (!kernel) {
if (kernelNames.indexOf('python3') >= 0) kernel = 'python3'
else kernel = kernelNames[0]
}
this.kernel = kernel
this.debug = options.debug || false
this.timeout = options.timeout || -1
}
/**
* Initialize the context
*
* @return {Promise} A promise
*/
initialize () {
if (this._process) return Promise.resolve()
else {
// Options to [child_process.spawn]{@link https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options}
let options = {}
// Pass `kernels` to `launch()` as an optimization to prevent another kernelspecs search of filesystem
return spawnteract.launch(this.kernel, options, JupyterContext.spec.kernels).then(kernel => {
this._process = kernel.spawn // The running process, from child_process.spawn(...)
this._connectionFile = kernel.connectionFile // Connection file path
this._config = kernel.config // Connection information from the file
this._spec = kernel.kernelSpec
// Unique session id for requests
this._sessionId = uuid()
// Map of requests for handling response messages
this._requests = {}
const origin = this._config.transport + '://' + this._config.ip
// Shell socket for execute, and other, request
this._shellSocket = new jmp.Socket('dealer', 'sha256', this._config.key)
this._shellSocket.connect(origin + ':' + this._config.shell_port)
this._shellSocket.on('message', this._response.bind(this))
// IOPub socket for receiving updates
this._ioSocket = new jmp.Socket('sub', 'sha256', this._config.key)
this._ioSocket.connect(origin + ':' + this._config.iopub_port)
this._ioSocket.on('message', this._response.bind(this))
this._ioSocket.subscribe('') // Subscribe to all topics
// Get kernel info mainly to confirm communication with kernel is
// working
return this._request('kernel_info_request', {}, ['kernel_info_reply']).then(({request, response}) => {
this._kernelInfo = response.content
// This wait seems to be necessary in order for messages to be received on
// `this._ioSocket`.
return new Promise((resolve, reject) => {
setTimeout(resolve, 1000)
})
})
})
}
}
/**
* Finalize the context
*
* @return {Promise} A resolved promise
*/
finalize () {
if (this._shellSocket) {
this._shellSocket.removeAllListeners('message')
this._shellSocket.close()
this._shellSocket = null
}
if (this._ioSocket) {
this._ioSocket.removeAllListeners('message')
this._ioSocket.close()
this._ioSocket = null
}
if (this._process) {
this._process.kill()
this._process = null
}
if (this._connectionFile) {
fs.unlinkSync(this._connectionFile)
this._connectionFile = null
}
this._config = null
this._spec = null
return Promise.resolve()
}
/**
* Execute a cell
*
* For cells with `expr: true` utilises `user_expressions` property of an `execute_request` to
* evaluate expression side-effect free.
*
* @override
*/
async execute (cell) {
// Compile the cell so it has correct structure
cell = await this.compile(cell)
// For expression cells, use `user_expressions`, not `code`
// to ensure there are no side effects (?)
let code
let expressions
if (cell.expr) {
code = ''
expressions = {
'value': cell.source.data
}
} else {
code = cell.source.data
expressions = {}
}
let content = {
// Source code to be executed by the kernel, one or more lines.
'code': code,
// A boolean flag which, if True, signals the kernel to execute
// this code as quietly as possible.
// silent=True forces store_history to be False,
// and will *not*:
// - broadcast output on the IOPUB channel
// - have an execute_result
// The default is False.
'silent': false,
// A boolean flag which, if True, signals the kernel to populate history
// The default is True if silent is False. If silent is True, store_history
// is forced to be False.
'store_history': true,
// A dict mapping names to expressions to be evaluated in the
// user's dict. The rich display-data representation of each will be evaluated after execution.
// See the display_data content for the structure of the representation data.
'user_expressions': expressions,
// Some frontends do not support stdin requests.
// If this is true, code running in the kernel can prompt the user for input
// with an input_request message (see below). If it is false, the kernel
// should not send these messages.
'allow_stdin': false,
// A boolean flag, which, if True, does not abort the execution queue, if an exception is encountered.
// This allows the queued execution of multiple execute_requests, even if they generate exceptions.
'stop_on_error': false
}
return this._request('execute_request', content).then(({request, response}) => {
const msgType = response.header.msg_type
switch (msgType) {
case 'execute_result':
case 'display_data':
// Success! Unbundle the execution result, insert it into cell
// outputs and then return the cell
return this._unbundle(response.content.data).then(value => {
cell.outputs.push({value})
return cell
})
case 'execute_reply':
// We get `execute_reply` messages when there is no
// execution result (e.g. an assignment), or when evaluating
// a user expression
const result = response.content.user_expressions.value
if (result) {
if (result.status === 'ok') {
return this._unbundle(result.data).then(value => {
cell.outputs.push({value})
return cell
})
} else if (result && result.status === 'error') {
cell.messages.push({
type: 'error',
message: result.ename + ': ' + result.evalue
})
return cell
}
} else {
return cell
}
break
case 'error':
// Errrror :( Add an error message to the cell
const error = response.content
cell.messages.push({
type: 'error',
message: error.ename + ': ' + error.evalue
})
return cell
default:
if (this.debug) console.log(`Unhandled message type: ${msgType}`)
}
}).catch(error => {
// Some other error happened...
cell.messages.push({
type: 'error',
message: error.message
})
return cell
})
}
/**
* Send a request message to the kernal
*
* @private
* @param {String} requestType Type of request e.g. 'execute'
* @param {Object} content Content of message
* @param {String} responseTypes Types of response message to resolve
* @returns {Promise} Promise resolving to the {request, response} messages
*/
_request (requestType, content, responseTypes = ['execute_result', 'display_data', 'execute_reply', 'error']) {
return new Promise((resolve, reject) => {
var request = new jmp.Message()
request.idents = []
request.header = {
'msg_id': uuid(),
'username': 'user',
'session': this._sessionId,
'msg_type': requestType,
'version': '5.2'
}
request.parent_header = {}
request.metadata = {}
request.content = content
this._requests[request.header.msg_id] = {
request,
responseTypes,
handler: (response) => resolve({request, response})
}
this._shellSocket.send(request)
// If this request has not been handled before `timeout`
// throw an error
if (this.timeout >= 0) {
setTimeout(() => {
if (this._requests[request.header.msg_id]) {
reject(new Error('Request timed out'))
}
}, this.timeout * 1000)
}
})
}
/**
* Receive a response message from the kernel
*
* @private
* @param {Message} response Response message
*/
_response (response) {
const requestId = response.parent_header.msg_id
const responseType = response.header.msg_type
const request = this._requests[requestId]
if (this.debug) {
console.log('Response: ', requestId, responseType, response.content)
}
// First response matching the request, including response type
// calls handler
if (request && request.responseTypes.indexOf(responseType) > -1) {
request.handler(response)
delete this._requests[requestId]
}
}
/**
* Convert a "MIME bundle" within a JMP message (e.g. a `execute_result` or
* `display data` message) into a data node
* e.g. `{'text/plain': 'Hello'}` to `{type: 'string', data: 'Hello'}`
*
* @private
* @param {Object} bundle A JMP MIME bundle
* @return {Promise} Promise resolving to a data node
*/
_unbundle (bundle) {
return Promise.resolve().then(() => {
const image = bundle['image/png']
if (image) {
return {
type: 'image',
src: 'data:image/png;base64,' + image
}
}
const text = bundle['text/plain']
if (text) {
// Attempt to parse to JSON
try {
return JSON.parse(text)
} catch (error) {
return text
}
}
}).then(value => {
return this.pack(value)
})
}
}
JupyterContext.spec = {
name: 'JupyterContext',
client: 'ContextHttpClient',
kernels: {} // Populated by JupyterContext.setup
}
module.exports = JupyterContext