import ast
import base64
import json
import re
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
import nbformat
[docs]
@dataclass
class FastAPINotebookBuilder:
# Rewrites the test cells of a notebook so that each one scores itself and
# reports through the pykubegrader telemetry helpers. Creating an instance
# starts the whole build, because __post_init__ calls run().
#
# Path of the notebook (.ipynb) to process.
notebook_path: str
# Only checked against None: when it is not None, run() copies the notebook to
# "<name>_temp.ipynb" and edits that copy; when it is None, the notebook at
# notebook_path is edited directly.
temp_notebook: Optional[str] = None
# Assignment name written into the injected validate_token(...) call.
assignment_tag: str = ""
# When True, the token-validation lines are injected into every test cell.
require_key: bool = False
# When True, add_api_code prints a progress line for each processed cell.
verbose: bool = False
def __post_init__(self) -> None:
    """Derive path metadata, reset the point counters and start the build.

    The notebook's parent directory and file name are stored on the
    instance, the running totals are zeroed, and ``run()`` is invoked
    immediately, so constructing the object performs the whole rewrite.
    """
    location = FastAPINotebookBuilder.get_filename_and_root(self.notebook_path)
    self.root_path, self.filename = location

    # Totals are filled in later by compute_max_points_free_response.
    self.total_points = 0.0
    self.max_question_points: dict[str, float] = {}

    self.run()
[docs]
def run(self) -> None:
    """Select the working notebook, collect its test cells and rewrite them.

    When ``temp_notebook`` is not None the notebook is first copied to a
    ``*_temp.ipynb`` sibling and all edits go to that copy (handy for
    debugging); otherwise the notebook at ``notebook_path`` is edited
    directly.
    """
    if self.temp_notebook is None:
        self.temp_notebook = self.notebook_path
    else:
        working_copy = self.notebook_path.replace(".ipynb", "_temp.ipynb")
        shutil.copy(self.notebook_path, working_copy)
        self.temp_notebook = working_copy

    self.assertion_tests_dict = self.question_dict()

    # Both attributes intentionally refer to the same summary dictionary.
    points_summary = self.get_question_points_by_part(self.assertion_tests_dict)
    self.question_points = points_summary
    self.question_points_by_part = points_summary

    self.add_points_to_notebook()
    self.add_api_code()
[docs]
def add_points_to_notebook(self) -> None:
    """Annotate question headings first, then question-part headings, with points."""
    annotation_steps = (
        self.add_question_points_to_notebook,
        self.add_question_part_points_to_notebook,
    )
    for annotate in annotation_steps:
        annotate()
[docs]
def add_question_points_to_notebook(self) -> None:
    """Prefix every question's ``## `` heading with its number and total points.

    For each entry of ``question_points["question_sums"]`` the nearest
    ``## `` markdown cell between the entry's ``current_key`` and
    ``previous_key`` is looked up; questions without such a cell are
    skipped.
    """
    for summary in self.question_points["question_sums"].values():
        cell_position, _ = self.find_first_markdown_cell_with(
            summary["current_key"], summary["previous_key"], "## "
        )
        if cell_position is None:
            # No heading in the searched range: nothing to annotate.
            continue

        # The heading text is re-read from the notebook before it is edited.
        heading_lines = self.get_cell_source(self.temp_notebook, cell_position)
        label = f"Question {summary['question_number']} (Points: {summary['total_points']:.2f}):"
        annotated = FastAPINotebookBuilder.add_text_after_double_hash(
            heading_lines, label
        )
        self.replace_cell_source(cell_position, annotated)
[docs]
def add_question_part_points_to_notebook(self) -> None:
    """Prefix every question part's ``### `` heading with its numbers and points.

    Walks ``question_points["part_sums"]`` (question -> part -> summary) and,
    for each part, annotates the nearest ``### `` markdown cell between the
    summary's ``current_key`` and ``previous_key``. Parts without such a
    cell are skipped.
    """
    for parts_of_question in self.question_points["part_sums"].values():
        for summary in parts_of_question.values():
            cell_position, _ = self.find_first_markdown_cell_with(
                summary["current_key"], summary["previous_key"], "### "
            )
            if cell_position is None:
                # No part heading in the searched range: nothing to annotate.
                continue

            # The heading text is re-read from the notebook before it is edited.
            heading_lines = self.get_cell_source(self.temp_notebook, cell_position)
            label = f"Question {summary['question_number']}-Part {summary['question_part_number']} (Points: {summary['total_points']:.2f}):"
            annotated = FastAPINotebookBuilder.add_text_after_double_hash(
                heading_lines, label, "### "
            )
            self.replace_cell_source(cell_position, annotated)
[docs]
def find_first_markdown_cell_with(
    self, start_index: int, end_index: int = 0, code_to_find: str = "## "
):
    """
    Find the nearest markdown cell at or before ``start_index`` whose first
    source line starts with ``code_to_find``.

    The notebook at ``self.temp_notebook`` is read as JSON and its cells are
    scanned backwards from ``start_index`` down to ``end_index`` (both
    inclusive).

    Args:
    - start_index (int): The cell index to start searching from.
    - end_index (int): The lowest cell index that is still inspected.
    - code_to_find (str): Prefix the first source line must start with
      (``"## "`` for questions, ``"### "`` for question parts).

    Returns:
    - tuple: ``(index, source)`` of the first matching markdown cell, or
      ``(None, None)`` when no cell in the range matches.

    Raises:
    - IndexError: If ``start_index`` lies beyond the last cell.
    """
    with open(self.temp_notebook, "r", encoding="utf-8") as f:
        nb_data = json.load(f)

    cells = nb_data.get("cells", [])
    for idx in range(start_index, end_index - 1, -1):
        cell = cells[idx]
        if cell["cell_type"] != "markdown":
            continue
        source = cell.get("source", [])
        # An empty markdown cell has an empty source; indexing it with [0]
        # used to abort the whole search with an IndexError.
        if source and source[0].startswith(code_to_find):
            return idx, source
    return None, None  # No matching markdown cell in the requested range
[docs]
@staticmethod
def get_question_points_by_part(question_dict: dict) -> dict:
"""
Get the points for each part of a question.
"""
# Compute sum for each question and store the previous cell number (key)
question_sums = {}
prev_question = 0 # Initialize previous cell number as 0
for key, entry in question_dict.items():
question_number = entry["question_number"]
if question_number not in question_sums:
question_sums[question_number] = {
"total_points": 0,
"previous_key": prev_question,
"current_key": None,
"question_number": question_number,
}
prev_question = key # Update previous key for the next new question
question_sums[question_number]["total_points"] += entry["points"]
question_sums[question_number]["current_key"] = (
key # Update current key to the end of the current question
)
# Compute sum for each question part and store the previous cell number (key)
part_sums = {}
prev_part = 0 # Initialize previous cell number as 0
for key, entry in question_dict.items():
question = entry["question"]
part = entry["question_part"]
if part is None:
continue
if question not in part_sums:
part_sums[question] = {}
if part not in part_sums[question]:
part_sums[question][part] = {
"total_points": 0,
"previous_key": prev_part,
"current_key": key,
"question_number": entry["question_number"],
"question_part_number": entry["question_part"],
}
prev_part = key # Update previous key for the next new question part
part_sums[question][part]["total_points"] += entry["points"]
# Combine results into a dictionary
result = {"question_sums": question_sums, "part_sums": part_sums}
# Return result dictionary
return result
[docs]
@staticmethod
def conceal_tests(cell_source):
"""
Takes a list of code lines, detects blocks between `# BEGIN HIDE` and `# END HIDE`,
encodes them in Base64, and replaces them with an `exec()` statement.
Returns a new list of lines with the concealed blocks.
"""
concealed_lines = []
hide_mode = False
hidden_code = []
for line in cell_source:
if "# BEGIN HIDE" in line:
hide_mode = True
hidden_code = [] # Start a new hidden block
concealed_lines.append(line) # Keep the marker for clarity
continue
elif "# END HIDE" in line:
hide_mode = False
# Encode the entire block
encoded_block = base64.b64encode(
"\n".join(hidden_code).encode()
).decode()
concealed_lines.append(
f'exec(base64.b64decode("{encoded_block}").decode()) # Obfuscated\n'
)
concealed_lines.append(line) # Keep the marker for clarity
continue
if hide_mode:
hidden_code.append(line.strip()) # Collect hidden code
else:
concealed_lines.append(line)
return concealed_lines
[docs]
def add_api_code(self) -> None:
# Rewrites every collected test cell so that it computes a score, logs it and
# records the responses, then writes the new source back into the notebook.
#
# Fill self.max_question_points and self.total_points before any cell changes.
self.compute_max_points_free_response()
# assertion_tests_dict maps a notebook cell index to the parsed test info
# produced by question_dict().
for i, (cell_index, cell_dict) in enumerate(self.assertion_tests_dict.items()):
if self.verbose:
# NOTE(review): i is zero-based, so the first message reads "0 of N".
print(
f"Processing cell {cell_index + 1}, {i} of {len(self.assertion_tests_dict)}"
)
cell = self.get_cell(cell_index)
# Inject the import lines after the END TEST CONFIG line of the cell.
cell_source = FastAPINotebookBuilder.add_import_statements_to_tests(
cell["source"],
require_key=self.require_key,
assignment_tag=self.assignment_tag,
)
# Replace BEGIN HIDE / END HIDE blocks with Base64-encoded exec() calls.
cell_source = FastAPINotebookBuilder.conceal_tests(cell_source)
last_import_line_ind = FastAPINotebookBuilder.find_last_import_line(
cell_source
)
updated_cell_source = []
# Everything up to and including the last import line stays at the top.
updated_cell_source.extend(cell_source[: last_import_line_ind + 1])
# The first test cell of a question also receives the question header.
if cell_dict["is_first"]:
updated_cell_source.extend(
self.construct_first_cell_question_header(cell_dict)
)
updated_cell_source.extend(["\n"])
# question_id / max_score / "score = 0" preamble for this test.
updated_cell_source.extend(
FastAPINotebookBuilder.construct_question_info(cell_dict)
)
# The remainder of the original cell follows the preamble.
updated_cell_source.extend(cell_source[last_import_line_ind + 1 :])
updated_cell_source.extend(["\n"])
# "if <all assertions>: score = <points>" block.
updated_cell_source.extend(
FastAPINotebookBuilder.construct_graders(cell_dict)
)
updated_cell_source.extend(["\n"])
# The generated lines keep a running total in the EARNED_POINTS
# environment variable: read it, add this cell's score, write it back.
updated_cell_source.extend(
["earned_points = float(os.environ.get('EARNED_POINTS', 0))\n"]
)
updated_cell_source.extend(["earned_points += score\n"])
# Text before the first "." of the notebook file name, without "_temp".
short_filename = self.filename.split(".")[0].replace("_temp", "")
updated_cell_source.extend(
[
f'log_variable("{short_filename}",f"{{score}}, {{max_score}}", question_id)\n'
]
)
updated_cell_source.extend(
["os.environ['EARNED_POINTS'] = str(earned_points)\n"]
)
# One update_responses(...) line per logging variable of the test.
updated_cell_source.extend(
FastAPINotebookBuilder.construct_update_responses(cell_dict)
)
# # code to reset matplotlib
# updated_cell_source.extend(
# ["_ = matplotlib.pyplot.close('all')\n"]
# )
# Persist the rewritten source for this cell.
self.replace_cell_source(cell_index, updated_cell_source)
[docs]
def find_question_description(self, search_string):
    """
    Locate the markdown cell that describes a given question.

    The notebook at ``self.temp_notebook`` is scanned from the top. Once a
    raw cell containing both ``# BEGIN QUESTION`` and ``search_string`` has
    been seen, the next markdown cell is the result.

    Args:
        search_string (str): Text identifying the question's raw cell.

    Returns:
        tuple: ``(index, source)`` of that markdown cell, or ``(None, None)``
        when there is none.
    """
    with open(self.temp_notebook, "r", encoding="utf-8") as handle:
        notebook = json.load(handle)

    question_open = False
    for position, cell in enumerate(notebook.get("cells", [])):
        kind = cell["cell_type"]
        if kind == "raw":
            lines = cell.get("source", [])
            has_marker = any("# BEGIN QUESTION" in text for text in lines)
            if has_marker and any(search_string in text for text in lines):
                question_open = True
                continue
        if question_open and kind == "markdown":
            return position, cell.get("source", [])
    return None, None
[docs]
def get_max_question_points(self, cell_dict) -> float:
    """Return the summed points of all test cells that share ``cell_dict``'s question."""
    total = 0
    for info in self.assertion_tests_dict.values():
        if info["question"] == cell_dict["question"]:
            total += info["points"]
    return total
[docs]
@staticmethod
def add_text_after_double_hash(markdown_source, insert_text, hash_prefix="## "):
"""
Adds insert_text immediately after the first '##' in the first line that starts with '##'.
Args:
- markdown_source (list of str): The list of lines in the markdown cell.
- insert_text (str): The text to be inserted.
Returns:
- list of str: The modified markdown cell content.
"""
modified_source = []
inserted = False
for line in markdown_source:
if not inserted and line.startswith(hash_prefix):
modified_source.append(
f"{hash_prefix}{insert_text} {line[len(hash_prefix) :]}"
) # Insert text after hash_prefix
inserted = True # Ensure it only happens once
else:
modified_source.append(line)
return modified_source
[docs]
def compute_max_points_free_response(self) -> None:
    """Record each question's maximum points and add them to the grand total.

    Only the first test cell of a question is considered, so every question
    contributes exactly once to ``max_question_points`` and ``total_points``.
    """
    for info in self.assertion_tests_dict.values():
        if not info["is_first"]:
            # Later cells of the same question would count it twice.
            continue
        question_total = self.get_max_question_points(info)
        self.max_question_points[f"{info['question']}"] = question_total
        self.total_points += question_total
[docs]
@staticmethod
def construct_update_responses(cell_dict: dict) -> list[str]:
update_responses = []
logging_variables = cell_dict["logging_variables"]
for logging_variable in logging_variables:
update_responses.append(
f"responses = update_responses(question_id, str({logging_variable}))\n"
)
return update_responses
[docs]
@staticmethod
def split_list_at_marker(
input_list: list[str], marker: str = """# END TEST CONFIG"""
) -> tuple[list[str], list[str]]:
"""
Splits a list into two parts at the specified marker string.
Args:
input_list (list): The list to split.
marker (str): The string at which to split the list.
Returns:
tuple: A tuple containing two lists. The first list contains the elements
before the marker, and the second list contains the elements after
the marker (excluding the marker itself).
"""
if marker in input_list:
index = input_list.index(marker)
return input_list[: index + 1], input_list[index + 2 :]
else:
return (
input_list,
[],
) # If the marker is not in the list, return the original list and an empty list
[docs]
@staticmethod
def construct_graders(cell_dict: dict) -> list[str]:
# Generate Python code
added_code = [
"if "
+ " and ".join(f"({test})" for test in cell_dict["assertions"])
+ ":\n"
]
added_code.append(f" score = {cell_dict['points']}\n")
return added_code
[docs]
@staticmethod
def construct_question_info(cell_dict: dict) -> list[str]:
question_info = []
question_id = cell_dict["question"] + "-" + str(cell_dict["test_number"])
question_info.append(f'question_id = "{question_id}"' + "\n")
question_info.append(f"max_score = {cell_dict['points']}\n")
question_info.append("score = 0\n")
return question_info
[docs]
@staticmethod
def insert_list_at_index(
original_list: list[str],
insert_list: list[str],
index: int,
line_break: bool = True,
inplace_line_break: bool = True,
) -> list[str]:
"""
Inserts a list into another list at a specific index.
Args:
original_list (list): The original list.
insert_list (list): The list to insert.
index (int): The position at which to insert the new list.
Returns:
list: A single combined list with the second list inserted at the specified index.
"""
if inplace_line_break:
insert_list = [s + "\n" for s in insert_list]
if line_break:
if inplace_line_break:
insert_list = ["\n"] + insert_list
else:
insert_list = ["\n"] + insert_list + ["\n"]
return original_list[:index] + insert_list + original_list[index:]
[docs]
@staticmethod
def add_import_statements_to_tests(
cell_source: list[str], require_key: bool = False, assignment_tag=None
) -> list[str]:
"""
Adds the necessary import statements to the first cell of the notebook.
"""
end_test_config_line = "# END TEST CONFIG"
# Imports to add
imports = [
"from pykubegrader.telemetry import (\n",
" ensure_responses,\n",
" log_variable,\n",
" score_question,\n",
" submit_question,\n",
" telemetry,\n",
" update_responses,\n",
")\n",
"import os\n",
"import base64\n",
"import matplotlib\n",
]
if require_key:
imports.append(
f"from pykubegrader.tokens.validate_token import validate_token\nvalidate_token(assignment='{assignment_tag}')\n"
)
imports.append("import matplotlib\n")
imports.append("matplotlib.use('Agg')\n")
for i, line in enumerate(cell_source):
if end_test_config_line in line:
# Insert the imports immediately after the current line
cell_source[i + 1 : i + 1] = [
"\n"
] + imports # Add a blank line for readability
return cell_source # Exit the loop once the imports are inserted
raise ValueError("End of test configuration not found")
# TODO: `Any` return not good; would be better to specify return type(s)
[docs]
@staticmethod
def get_filename_and_root(path: str) -> tuple[Path, str]:
path_obj = Path(path).resolve() # Resolve the path to get an absolute path
root_path = path_obj.parent # Get the parent directory
filename = path_obj.name # Get the filename
return root_path, filename
# TODO: `Any` return not good; would be better to specify return type(s)
[docs]
def get_cell(self, cell_index: int) -> Any:
    """Return the raw JSON cell at ``cell_index`` of the working notebook.

    Returns None when the notebook has no ``cells`` entry or the index is
    not below the number of cells.

    Raises:
        ValueError: If ``temp_notebook`` is not set.
    """
    if not self.temp_notebook:
        raise ValueError("No temporary notebook file path provided")

    with open(self.temp_notebook, "r", encoding="utf-8") as handle:
        notebook = json.load(handle)

    if "cells" not in notebook:
        return None
    cells = notebook["cells"]
    return cells[cell_index] if cell_index < len(cells) else None
[docs]
def replace_cell_source(self, cell_index: int, new_source: str | list[str]) -> None:
"""
Replace the source code of a specific Jupyter notebook cell.
Args:
cell_index (int): Index of the cell to be modified (0-based).
new_source (str): New source code to replace the cell's content.
"""
# Load the notebook
if not self.temp_notebook:
raise ValueError("No temporary notebook file path provided")
with open(self.temp_notebook, "r", encoding="utf-8") as f:
notebook = nbformat.read(f, as_version=4)
# Check if the cell index is valid
if cell_index >= len(notebook.cells) or cell_index < 0:
raise IndexError(
f"Cell index {cell_index} is out of range for this notebook."
)
# Replace the source code of the specified cell
notebook.cells[cell_index]["source"] = new_source
# Save the notebook
with open(self.temp_notebook, "w", encoding="utf-8") as f:
nbformat.write(notebook, f)
print(f"Updated notebook saved to {self.temp_notebook}")
[docs]
@staticmethod
def find_last_import_line(cell_source: list[str]) -> int:
"""
Finds the index of the last line with an import statement in a list of code lines,
including multiline import statements.
Args:
cell_source (list): List of strings representing the code lines.
Returns:
int: The index of the last line with an import statement, or -1 if no import is found.
"""
last_import_index = -1
is_multiline_import = False # Flag to track if we're inside a multiline import
for i, line in enumerate(cell_source):
stripped_line = line.strip()
if is_multiline_import:
# Continue tracking multiline import
if stripped_line.endswith("\\") or (
stripped_line and not stripped_line.endswith(")")
):
last_import_index = i # Update to current line
continue
else:
is_multiline_import = False # End of multiline import
last_import_index = i # Update to current line
# Check for single-line or start of multiline imports
if stripped_line.startswith("import") or stripped_line.startswith("from"):
last_import_index = i
# Check if it's a multiline import
if stripped_line.endswith("\\") or "(" in stripped_line:
is_multiline_import = True
return last_import_index
[docs]
@staticmethod
def tag_questions(cells_dict: dict) -> dict:
"""
Adds 'is_first' and 'is_last' boolean flags to the cells based on their position
within the group of the same question. All cells will have both flags.
Args:
cells_dict (dict): A dictionary where keys are cell IDs and values are cell details.
Returns:
dict: The modified dictionary with 'is_first' and 'is_last' flags added.
"""
if not isinstance(cells_dict, dict):
raise ValueError("Input must be a dictionary.")
# Ensure all cells have the expected structure
for key, cell in cells_dict.items():
if not isinstance(cell, dict):
raise ValueError(f"Cell {key} is not a dictionary.")
if "question" not in cell:
raise KeyError(f"Cell {key} is missing the 'question' key.")
# Group the keys by question name
question_groups: dict = {}
for key, cell in cells_dict.items():
question = cell.get(
"question"
) # Use .get() to avoid errors if key is missing
if question not in question_groups:
question_groups[question] = []
question_groups[question].append(key)
# Add 'is_first' and 'is_last' flags to all cells
for keys in question_groups.values():
test_number = 1
for i, key in enumerate(keys):
cells_dict[key]["is_first"] = i == 0
cells_dict[key]["is_last"] = i == len(keys) - 1
cells_dict[key]["test_number"] = test_number
test_number += 1
return cells_dict
[docs]
def question_dict(self) -> dict:
    """
    Builds a dictionary of question information from the notebook.

    Raw cells starting with ``# BEGIN QUESTION`` set the current question;
    every code cell starting with ``\"\"\" # BEGIN TEST CONFIG`` is recorded
    under its cell index together with that question's metadata.

    Returns:
        dict: Cell index -> test information, tagged by ``tag_questions``.

    Raises:
        ValueError: If no notebook path is set, or a test cell appears
            before any ``# BEGIN QUESTION`` cell.
        FileNotFoundError: If the notebook file does not exist.
    """
    if not self.temp_notebook:
        raise ValueError("No temporary notebook file path provided")

    notebook_path = Path(self.temp_notebook)
    if not notebook_path.exists():
        raise FileNotFoundError(f"The file {notebook_path} does not exist.")

    notebook = self.read_notebook(notebook_path)

    results_dict = {}
    # Metadata of the question currently being scanned. All three names are
    # defined up front; previously only question_name was, so a test cell
    # ahead of the first question failed with an UnboundLocalError.
    question_name = question_number = question_part = None
    question_seen = False

    for cell_index, cell in enumerate(notebook.get("cells", [])):
        cell_type = cell.get("cell_type")
        if cell_type not in ("raw", "code"):
            continue
        source = "".join(cell.get("source", ""))

        if cell_type == "raw":
            if source.strip().startswith("# BEGIN QUESTION"):
                question_name, question_number, question_part = (
                    FastAPINotebookBuilder.extract_question_information(source)
                )
                question_seen = True
        elif source.strip().startswith('""" # BEGIN TEST CONFIG'):
            if not question_seen:
                raise ValueError(
                    f"Test cell {cell_index} appears before any '# BEGIN QUESTION' cell."
                )
            # Extract the assertion test source
            logging_variables, assertions, comments, points_value = (
                self.extract_assertion_test_source(cell, source)
            )
            results_dict[cell_index] = {
                "assertions": assertions,
                "comments": comments,
                "question": question_name,
                "question_number": question_number,
                "question_part": question_part,
                "points": points_value,
                "logging_variables": logging_variables,
            }

    return FastAPINotebookBuilder.tag_questions(results_dict)
[docs]
def read_notebook(self, notebook_path):
    """Load the notebook file at ``notebook_path`` and return its parsed JSON content."""
    with open(notebook_path, "r", encoding="utf-8") as handle:
        return json.load(handle)
[docs]
def get_cell_source(self, notebook_path, cell_index):
    """Return the ``source`` field of the cell at ``cell_index`` in the given notebook."""
    cells = self.read_notebook(notebook_path)["cells"]
    return cells[cell_index]["source"]