"""
This is for interpreting and executing an executable document in JSON format. It can be called from the command line:
python3 interpreter.py <infile> <outfile> [document parameters]
See README.md for more information.
Warning: `eval` and `exec` are used to run code in the document. Don't execute documents that you haven't verified
yourself.
"""
import ast
import base64
import datetime
import logging
import sys
import typing
from contextlib import redirect_stdout
from io import BytesIO, TextIOWrapper
import astor
from stencila.schema.types import (
ArrayValidator,
Article,
BooleanValidator,
CodeChunk,
CodeExpression,
Datatable,
DatatableColumn,
Entity,
Function,
ImageObject,
IntegerValidator,
Node,
NumberValidator,
Parameter,
StringValidator,
)
from .errors import CapabilityError
from .parser import (
CodeChunkExecution,
CodeChunkParser,
CodeChunkParseResult,
set_code_error,
simple_code_chunk_parse,
)
if sys.version_info > (3, 8):
AstModule = ast.Module
else:
AstModule = lambda nodelist, type_ignores: ast.Module(nodelist)
try:
import matplotlib.artist
import matplotlib.figure
from matplotlib.cbook import silent_list
# pylint: disable=C0103
MPLFigure = matplotlib.figure.Figure
# pylint: disable=C0103
MPLArtist = matplotlib.artist.Artist
MPL_AVAILABLE = True
except ImportError:
# pylint: disable=R0903
# pylint: disable=R0903
[docs] class MLPArtist: # type: ignore
"""A fake MLPArtist to prevent ReferenceErrors later."""
# pylint: disable=R0903,C0103
[docs] class silent_list: # type: ignore
"""A fake silent_list to prevent ReferenceErrors later."""
MPL_AVAILABLE = False
try:
import numpy
from pandas import DataFrame
PANDAS_AVAILABLE = True
except ImportError:
# pylint: disable=R0903
[docs] class DataFrame: # type: ignore
"""A fake DataFrame to prevent ReferenceErrors later."""
PANDAS_AVAILABLE = False
LOGGER = logging.getLogger(__name__)
LOGGER.addHandler(logging.NullHandler())
CHUNK_PREVIEW_LENGTH = 20
ExecutableCode = typing.Union[CodeChunkExecution, CodeExpression]
StatementRuntime = typing.Tuple[bool, typing.Union[str], typing.Callable]
# Used to indicate that a particular output should not be added to outputs (c.f. a valid `None` value)
SKIP_OUTPUT_SEMAPHORE = object()
[docs]class CodeTimer:
"""
Context handler for timing code, use inside a `with` statement.
"""
_start_time: datetime.datetime
duration: typing.Optional[datetime.timedelta] = None
def __enter__(self):
self.duration = None
self._start_time = datetime.datetime.now()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.duration = datetime.datetime.now() - self._start_time
@property
def duration_seconds(self) -> float:
"""Calculate the duration (time between `__enter__` and `__exit__` in microseconds."""
if not self.duration:
raise RuntimeError("CodeTimer has not yet been run")
return self.duration.total_seconds()
[docs]class StdoutBuffer(TextIOWrapper):
"""
Used for capturing output to stdout.
"""
[docs] def write(self, string: typing.Union[bytes, str]) -> int:
if isinstance(string, str):
return super().write(string)
return super().buffer.write(string)
[docs]class DocumentCompilationResult(typing.NamedTuple):
"""
Stores references to Parameters and Code that is parsed from a Document.
"""
parameters: typing.List[Parameter] = []
code: typing.List[ExecutableCode] = []
[docs]class DocumentCompiler:
"""
Parse an executable document (`Article`) and cache references to its parameters and code nodes.
"""
TARGET_LANGUAGE = "python"
function_depth: int = 0
[docs] def compile(self, source: Article) -> DocumentCompilationResult:
"""
Compile an `Article` by walking it and finding `Parameter`s and `CodeChunk` or `CodeExpression` nodes.
These are set on a `DocumentCompilationResult` which can be passed to the `Interpreter`.
"""
self.function_depth = 0
dcr = DocumentCompilationResult()
self.handle_item(source, dcr)
return dcr
[docs] def handle_item(
self, item: typing.Any, compilation_result: DocumentCompilationResult
) -> None:
"""
Parse any kind of dict, list of Entity.
Returns nothing but updates the passed in `DocumentCompilationResult`.
"""
if isinstance(item, dict):
self.traverse_dict(item, compilation_result)
elif isinstance(item, list):
self.traverse_list(item, compilation_result)
elif isinstance(item, Entity):
if isinstance(item, (CodeChunk, CodeExpression)):
if (
item.programmingLanguage == self.TARGET_LANGUAGE
): # Only add Python code
self.handle_code(item, compilation_result)
elif isinstance(item, Parameter) and self.function_depth == 0:
compilation_result.parameters.append(item)
LOGGER.debug("Adding %s", type(item))
if isinstance(item, Function):
# This prevents treating a `Parameter` found inside `Function` as a document parameter
self.function_depth += 1
self.traverse_dict(item.__dict__, compilation_result)
if isinstance(item, Function):
self.function_depth -= 1
[docs] @staticmethod
def handle_code(
item: typing.Union[CodeChunk, CodeExpression],
compilation_result: DocumentCompilationResult,
) -> None:
"""
Parse a CodeChunk or CodeExpression and add it to `compilation_result.code` list.
"""
if isinstance(item, CodeChunk):
cc_result, item = Interpreter.compile_code_chunk(item)
code_to_add = CodeChunkExecution(item, cc_result)
else:
try:
ast.parse(item.text)
except SyntaxError as exc: # Probably will only be this
set_code_error(item, exc)
code_to_add = item
compilation_result.code.append(code_to_add)
LOGGER.debug("Adding %s", type(item))
[docs] def traverse_dict(
self, _dict: dict, compilation_result: DocumentCompilationResult
) -> None:
"""
Traverse into each item of a dict.
"""
for child in _dict.values():
self.handle_item(child, compilation_result)
[docs] def traverse_list(
self, _list: typing.List, compilation_result: DocumentCompilationResult
) -> None:
"""
Traverse into each element in a list.
"""
for child in _list:
self.handle_item(child, compilation_result)
[docs]class Interpreter:
"""Execute a list of code blocks, maintaining its own `globals` scope for this execution run."""
"""
List of values for the `programmingLanguage` property that are handled
by this interpreter.
"""
PROGRAMMING_LANGUAGES = ["py", "python"]
"""
JSON Schema specification of the types of nodes that this intertpreter's
`compile` and `execute` methods are capable of handling
"""
CODE_CAPABILITIES = {
"type": "object",
"required": ["node"],
"properties": {
"node": {
"type": "object",
"required": ["type", "programmingLanguage"],
"properties": {
"type": {"enum": ["CodeChunk", "CodeExpression"]},
"programmingLanguage": {"enum": PROGRAMMING_LANGUAGES},
},
}
},
}
"""
The manifest of this interpreter's capabilities and addresses.
Conforms to Executa's
[Manifest](https://github.com/stencila/executa/blob/v1.4.0/src/base/Executor.ts#L63)
interface.
"""
MANIFEST = {
"version": 1,
"capabilities": {"compile": CODE_CAPABILITIES, "execute": CODE_CAPABILITIES},
"addresses": {
"stdio": {
"type": "stdio",
"command": sys.executable,
"args": ["-m", "stencila.pyla", "serve"],
}
},
}
globals: typing.Dict[str, typing.Any]
locals: typing.Dict[str, typing.Any]
def __init__(self) -> None:
self.globals = {}
self.locals = {}
[docs] @staticmethod
def compile_code_chunk(
chunk: CodeChunk,
) -> typing.Tuple[CodeChunkParseResult, CodeChunk]:
"""
Compile a `CodeChunk`.
Returns a `CodeChunkParseResult` which is primarily needed for the AST, and the `CodeChunk` itself, which has
its code metadata properties set.
"""
parser = CodeChunkParser()
cc_result = parser.parse(chunk)
chunk.imports = cc_result.combined_code_imports(chunk.imports)
chunk.declares = cc_result.declares
chunk.assigns = cc_result.assigns
chunk.alters = cc_result.alters
chunk.uses = cc_result.uses
chunk.reads = cc_result.reads
if cc_result.error:
set_code_error(chunk, cc_result.error)
return cc_result, chunk
[docs] @staticmethod
def compile(node: Node) -> Node:
"""
Compile a `CodeChunk`.
"""
if isinstance(node, CodeChunk) and Interpreter.is_python_code(node):
return Interpreter.compile_code_chunk(node)[1]
raise CapabilityError("compile", node=node)
[docs] def execute(
self, node: Node, parameter_values: typing.Dict[str, typing.Any] = None
) -> Node:
"""
Execute a `CodeChunk` or `CodeExpression`.
"""
_locals = self.locals
if parameter_values is not None:
_locals.update(parameter_values)
if isinstance(node, CodeExpression):
return self.execute_code_expression(node, _locals)
if isinstance(node, CodeChunk):
cce = simple_code_chunk_parse(node)
return self.execute_code_chunk(cce, _locals)
if isinstance(node, CodeChunkExecution):
return self.execute_code_chunk(node, _locals)
raise CapabilityError("execute", node=node)
[docs] @staticmethod
def is_python_code(code: typing.Union[CodeChunk, CodeExpression]) -> bool:
"""
Is a `CodeChunk` or `CodeExpression` Python code?
"""
return code.programmingLanguage.lower() in Interpreter.PROGRAMMING_LANGUAGES
[docs] def execute_code_expression(
self, expression: CodeExpression, _locals: typing.Dict[str, typing.Any]
) -> CodeExpression:
"""
Evaluate `CodeExpression.text`, and get the result. Catch any exception the occurs.
"""
try:
# pylint: disable=W0123 # Disable warning that eval is being used.
expression.output = self.decode_output(
eval(expression.text, self.globals, _locals)
)
# pylint: disable=W0703 # we really don't know what Exception some eval'd code might raise.
except Exception as exc:
set_code_error(expression, exc)
return expression
[docs] def execute_code_chunk(
self, chunk_execution: CodeChunkExecution, _locals: typing.Dict[str, typing.Any]
) -> CodeChunk:
"""
Execute a `CodeChunk` that has been parsed and stored in a `CodeChunkExecution`.
"""
chunk, parse_result = chunk_execution
if parse_result.chunk_ast is None:
LOGGER.info(
"Not executing CodeChunk without AST: %s",
chunk.text[:CHUNK_PREVIEW_LENGTH],
)
return chunk
cc_outputs: typing.List[typing.Any] = []
duration = 0.0
for statement in parse_result.chunk_ast.body:
duration, error_occurred = self.execute_statement(
statement, chunk, _locals, cc_outputs, duration
)
if error_occurred:
break # stop executing the rest of the statements in the chunk after capturing the outputs
chunk.duration = duration
if MPL_AVAILABLE:
# Because of the way matplotlib might progressively build an image, only keep the last MPL that was
# generated for a specific code chunk
mpl_output_indexes = [
i for i, output in enumerate(cc_outputs) if self.value_is_mpl(output)
]
if mpl_output_indexes:
for i in reversed(mpl_output_indexes[:-1]):
# remove all but the last mpl
cc_outputs.pop(i)
new_last_index = mpl_output_indexes[-1] - (
len(mpl_output_indexes) - 1
) # output will have shifted back
cc_outputs[new_last_index] = self.decode_mpl()
chunk.outputs = cc_outputs
return chunk
[docs] def execute_statement(
self,
statement: ast.stmt,
chunk: CodeChunk,
_locals: typing.Dict[str, typing.Any],
cc_outputs: typing.List[str],
duration: float,
) -> typing.Tuple[float, bool]:
"""
Execute a single AST statement.
The statement will be executed with `eval` or `exec` depending on its type (see `parse_statement_runtime`).
"""
error_occurred = False
capture_result, code_to_run, run_function = self.parse_statement_runtime(
statement
)
stdout = StdoutBuffer(BytesIO(), sys.stdout.encoding)
result = None
with redirect_stdout(stdout):
try:
with CodeTimer() as code_timer:
result = run_function(code_to_run, self.globals, _locals)
duration += code_timer.duration_seconds
# pylint: disable=W0703 # we really don't know what Exception some exec'd code might raise.
except Exception as exc:
error_occurred = True
set_code_error(chunk, exc)
if capture_result and result is not None:
self.add_output(cc_outputs, result)
stdout.seek(0)
std_out_output = stdout.buffer.read()
if std_out_output:
cc_outputs.append(std_out_output.decode("utf8"))
# could save a variable by returning `duration` as `None` if an error occurs but I think that breaks readability
return duration, error_occurred
[docs] def add_output(
self, cc_outputs: typing.List[typing.Any], result: typing.Any
) -> None:
"""
Add an output to cc_outputs.
Should be inside `execute_statement` as it's only used there, but pylint complains about too many local
variables.
"""
decoded = self.decode_output(result)
if decoded != SKIP_OUTPUT_SEMAPHORE:
cc_outputs.append(decoded)
[docs] @staticmethod
def parse_statement_runtime(statement: ast.stmt) -> StatementRuntime:
"""
Determine the information to execute a statement.
Returns the function with which to execute a statement (`eval` or `exec`), whether its output should be captured
or not, and the compiled code to execute.
See the other comments below on how/why these functions are chosen.
"""
if isinstance(statement, ast.Expr):
# An expression is something that we want to capture the result of - this could be something like
# `a + 3` or a function call (not an assignment). It must be executed with `eval`. Since `eval` can't
# execute compiled code, `astor` is used to convert it back to source code.
capture_result = True
run_function = eval
code_to_run = astor.to_source(statement)
else:
# We don't care about the result of this call (it could just be an assignment, update or even function
# definition) so it can be executed with `exec`.
capture_result = False
run_function = exec
mod = AstModule([statement], [])
code_to_run = compile(mod, "<ast>", "exec")
return capture_result, code_to_run, run_function
[docs] @staticmethod
def value_is_mpl(value: typing.Any) -> bool:
"""
Basic type checking to determine if a variable is a matplotlib figure.
"""
if not MPL_AVAILABLE:
return False
return isinstance(value, (MPLFigure, MPLArtist)) or (
isinstance(value, list)
and len(value) == 1
and isinstance(value[0], MPLArtist)
)
[docs] @staticmethod
def decode_mpl() -> ImageObject:
"""
Decode a matplotlib `MPLFigure` or `MPLArtist` into an `ImageObject`.
The `matplotlib.pyplot.savefig` function just saves the current MPL figure that's in the context.
"""
image = BytesIO()
matplotlib.pyplot.savefig(image, format="png")
src = "data:image/png;base64," + base64.encodebytes(image.getvalue()).decode()
return ImageObject(src)
[docs] @staticmethod
def decode_dataframe(data_frame: DataFrame) -> Datatable:
"""
Decode a pandas `DataFrame` into a `Datatable`.
"""
columns = []
for column_name in data_frame.columns:
column = data_frame[column_name]
values = column.tolist()
if column.dtype in (numpy.bool_, numpy.bool8):
validator = BooleanValidator()
values = [bool(row) for row in values]
elif column.dtype in (numpy.int8, numpy.int16, numpy.int32, numpy.int64):
validator = IntegerValidator()
values = [int(row) for row in values]
elif column.dtype in (numpy.float16, numpy.float32, numpy.float64):
validator = NumberValidator()
values = [float(row) for row in values]
elif column.dtype in (numpy.str_, numpy.unicode_,):
validator = StringValidator()
else:
validator = None
columns.append(
DatatableColumn(
column_name,
values,
validator=ArrayValidator(itemsValidator=validator),
)
)
return Datatable(columns)
[docs] def decode_output(self, output: typing.Any) -> typing.Any:
"""
Check if the output is convertible from a special data type to a Stencila type.
If not, just return the original object."""
if MPL_AVAILABLE:
if isinstance(output, silent_list):
return SKIP_OUTPUT_SEMAPHORE
if isinstance(output, list):
return [self.decode_output(item) for item in output]
if isinstance(output, tuple):
return tuple(self.decode_output(item) for item in output)
if isinstance(output, DataFrame):
return self.decode_dataframe(output)
if PANDAS_AVAILABLE:
if isinstance(output, numpy.ndarray):
return output.tolist()
return output