Source code for pykubegrader.log_parser.parse

from dataclasses import dataclass, field
from typing import Any, Optional, TypedDict


class LogParserResults(TypedDict):
    """
    Shape of the dictionary returned by ``LogParser.get_results``.

    All keys are required (the TypedDict uses the default ``total=True``).
    """

    # Details of the student the log belongs to.
    student_information: dict[str, str]
    # The week tag the parser was configured with, or None.
    week: Optional[str]
    # Integer week number derived from the week tag, or None.
    week_num: Optional[int]
    # Assignment type derived from the week tag, or None.
    assignment_type: Optional[str]
    # Per-assignment summary information, keyed by assignment name.
    assignment_information: dict[str, Any]
    # Per-assignment question scores, keyed by assignment name.
    assignment_scores: dict[str, Any]
@dataclass
class LogParser:
    """
    A class for parsing chronological logs and extracting information.
    Handles both assignment info and question-level details.

    Typical use is ``parse_logs()``, then ``calculate_total_scores()``,
    then ``get_results()``.

    Attributes:
        log_lines: Raw log lines, in the order they were written.
        week_tag: Tag identifying the week/assignment type to extract.
            ``get_results`` parses it as ``"week<N>-<type>"``; when it is
            None or empty no assignment lines are matched.
        student_info: Filled by ``parse_logs`` with ``student_id``,
            ``username`` and ``timestamp``.
        assignments: Filled by ``parse_logs``; maps an assignment name to a
            dict holding at least ``questions``, ``total_score`` and
            ``latest_timestamp``.
        all_questions: Tags collected by ``_find_all_questions`` that are
            used to recognise assignment lines. Not a constructor argument.
    """

    log_lines: list[str]
    week_tag: Optional[str] = None
    student_info: dict[str, str] = field(default_factory=dict)
    assignments: dict[str, dict] = field(default_factory=dict)
    # Declared (rather than created on first parse) so the attribute always
    # exists; excluded from __init__, repr and equality so the dataclass
    # behaves as before for callers.
    all_questions: list[str] = field(
        default_factory=list, init=False, repr=False, compare=False
    )

    def parse_logs(self) -> None:
        """
        Main method to parse logs and populate student_info and assignments.

        Raises:
            ValueError: If the log contains more than one distinct student
                name.
        """
        unique_students: set[str] = set()
        self._find_all_questions()

        # First pass, newest line first: student info and "total-points"
        # header lines.
        for line in reversed(self.log_lines):
            if self._is_student_info(line):
                self._process_student_info(line, unique_students)
            elif "total-points" in line and self._mentions_known_tag(line):
                self._process_assignment_header(line)

        # Second pass: process assignment entries after all headers have
        # been processed, so header data is in place first.
        for line in reversed(self.log_lines):
            if "total-points" not in line and self._mentions_known_tag(line):
                self._process_assignment_entry(line)

    def _mentions_known_tag(self, line: str) -> bool:
        """
        Checks whether any collected tag occurs as a substring of the line.
        """
        return any(tag in line for tag in self.all_questions)

    def _find_all_questions(self) -> None:
        """
        Collects the tags used to recognise assignment lines.

        For every line containing ``week_tag``, the fourth comma-separated
        field is recorded (first-seen order, no duplicates) in
        ``self.all_questions``. Lines with fewer than four fields and blank
        tags are skipped: a short line would otherwise raise IndexError, and
        an empty tag would match every line in the substring test.
        """
        # dict keys give ordered de-duplication without O(n) list lookups.
        tags: dict[str, None] = {}
        if self.week_tag:
            for line in self.log_lines:
                if self.week_tag not in line:
                    continue
                parts = line.split(",")
                if len(parts) < 4:
                    continue  # malformed line: no tag field to read
                tag = parts[3].strip()
                if tag:
                    tags.setdefault(tag, None)
        self.all_questions = list(tags)

    def _is_student_info(self, line: str) -> bool:
        """
        Checks if the line contains student information.
        """
        return line.startswith("Student Info")

    def _process_student_info(self, line: str, unique_students: set) -> None:
        """
        Processes a line containing student information.
        Raises an error if multiple unique students are found.

        Args:
            line: A line such as
                ``"Student Info, 790, jovyan, 2024-12-27 19:40:10"``.
            unique_students: Accumulator of student names seen so far; it is
                mutated in place.

        Raises:
            ValueError: If more than one distinct student name has been seen.
        """
        parts = line.split(", ")
        student_name = parts[2].strip()
        unique_students.add(student_name)

        if len(unique_students) > 1:
            raise ValueError(
                f"Error: Multiple unique student names found: {unique_students}"
            )

        # Only set student_info once. parse_logs feeds lines newest-first,
        # so the entry kept is the last student-info line in the log.
        if not self.student_info:
            self.student_info = {
                "student_id": parts[1].strip(),
                "username": student_name,
                "timestamp": parts[3].strip(),
            }

    def _process_assignment_header(self, line: str) -> None:
        """
        Processes a ``total-points`` line describing an assignment.

        Field 1 is read as the maximum points, field 3 as the notebook name
        and the last field as the timestamp. A new assignment record is
        created for an unseen notebook; for a known one, ``max_points`` and
        ``latest_timestamp`` are replaced only when this line's timestamp is
        greater. Lines whose first field does not start with
        ``total-points`` are ignored.
        """
        parts = line.split(",")
        assignment_tag = parts[0].strip()
        if not assignment_tag.startswith("total-points"):
            return

        total_points_value = self._extract_total_points(parts)
        timestamp = parts[-1].strip()
        notebook_name = parts[3].strip()

        if notebook_name not in self.assignments:
            self.assignments[notebook_name] = {
                "max_points": total_points_value,
                "notebook": notebook_name,
                "assignment": self.week_tag,
                "total_score": 0.0,
                "latest_timestamp": timestamp,
                "questions": {},  # Ensure 'questions' key is initialized
            }
        # Timestamps are compared as plain strings.
        # NOTE(review): this presumably relies on a sortable format such as
        # "YYYY-MM-DD HH:MM:SS" - confirm.
        elif self.assignments[notebook_name]["latest_timestamp"] < timestamp:
            self.assignments[notebook_name]["max_points"] = total_points_value
            self.assignments[notebook_name]["latest_timestamp"] = timestamp

    def _process_assignment_entry(self, line: str) -> None:
        """
        Processes a line containing an assignment entry.
        Adds it to the assignments dictionary.

        Field 0 is the assignment name, field 1 the question tag, field 2
        the score earned, field 3 the score possible and the last field the
        timestamp. Missing score fields default to 0.0. For each question
        only the entry with the greatest timestamp is kept.
        """
        parts = line.split(",")
        assignment_tag = parts[0].strip()
        question_tag = parts[1].strip()
        score_earned = float(parts[2].strip()) if len(parts) > 2 else 0.0
        score_possible = float(parts[3].strip()) if len(parts) > 3 else 0.0
        timestamp = parts[-1].strip()

        # Ensure assignment entry exists
        if assignment_tag not in self.assignments:
            self.assignments[assignment_tag] = {
                "questions": {},
                "total_score": 0.0,
                "latest_timestamp": timestamp,
            }

        # Add or update the question with the most recent timestamp
        questions = self.assignments[assignment_tag]["questions"]
        if (
            question_tag not in questions
            or timestamp > questions[question_tag]["timestamp"]
        ):
            questions[question_tag] = {
                "score_earned": score_earned,
                "score_possible": score_possible,
                "timestamp": timestamp,
            }

        # Update the latest timestamp if this one is more recent
        if timestamp > self.assignments[assignment_tag]["latest_timestamp"]:
            self.assignments[assignment_tag]["latest_timestamp"] = timestamp

    def _extract_total_points(self, parts: list[str]) -> Optional[float]:
        """
        Extracts the total-points value from the parts array of a total-points line.

        Returns:
            The second field as a float, or None when it is missing or not
            a number.
        """
        try:
            return float(parts[1].strip())
        except (ValueError, IndexError):
            return None

    def calculate_total_scores(self) -> None:
        """
        Calculates total scores for each assignment by summing the
        'score_earned' of its questions and storing the sum under
        'total_score'. No other key is modified.
        """
        for data in self.assignments.values():
            # Sum of all question score_earned
            data["total_score"] = sum(
                q["score_earned"] for q in data["questions"].values()
            )

    def _parse_week_tag(self) -> tuple[Optional[int], Optional[str]]:
        """
        Splits ``week_tag`` into ``(week_num, assignment_type)``.

        The text before the first ``-`` has ``"week"`` removed and is read
        as an integer; the text between the first and second ``-`` is the
        assignment type. Either part is None when the tag is unset or does
        not have that part, rather than raising.
        """
        if not self.week_tag:
            return None, None

        pieces = self.week_tag.split("-")
        try:
            week_num: Optional[int] = int(pieces[0].strip().replace("week", ""))
        except ValueError:
            week_num = None
        assignment_type = pieces[1].strip() if len(pieces) > 1 else None
        return week_num, assignment_type

    def get_results(self) -> "LogParserResults":
        """
        Returns the parsed results as a hierarchical dictionary.

        The dictionary contains the student information, the week tag with
        the week number and assignment type derived from it (None when the
        tag is unset or cannot be parsed), a per-assignment summary
        (``latest_timestamp``, ``total_score``, ``max_points``) and the
        per-assignment question scores.
        """
        week_num, assignment_type = self._parse_week_tag()
        return {
            "student_information": self.student_info,
            "week": self.week_tag,
            "week_num": week_num,
            "assignment_type": assignment_type,
            "assignment_information": {
                assignment: {
                    "latest_timestamp": data["latest_timestamp"],
                    "total_score": data["total_score"],
                    # Assignments first seen via an entry line have no
                    # 'max_points' key.
                    "max_points": data.get("max_points", 0.0),
                }
                for assignment, data in self.assignments.items()
            },
            "assignment_scores": {
                assignment: {
                    "questions": data["questions"],
                    "total_score": data["total_score"],
                }
                for assignment, data in self.assignments.items()
            },
        }