import base64
import json
import os
import re
import sys
from datetime import datetime
import nacl.public
import numpy as np
import requests
from requests.auth import HTTPBasicAuth
#
# Primary function
#
[docs]
def validate_logfile(
filepath: str,
assignment_id: str,
question_max_scores: dict[int, int],
free_response_questions: int = 0,
key_box=None,
) -> None:
username = os.getenv("user_name_student")
password = os.getenv("keys_student")
base_url = os.getenv("DB_URL")
if not username or not password or not base_url:
sys.exit("Necessary environment variables are not set")
login_data = {
"username": username,
"password": password,
}
if key_box is None:
# Generate box from private and public keys
key_box = generate_keys()
decrypted_log, log_reduced = read_logfile(filepath, key_box)
# For debugging; to be commented out
# with open(".output_reduced.log", "w") as f:
# f.writelines(f"{item}\n" for item in log_reduced)
# Initialize question scores based on max scores
question_scores = {key: 0 for key in question_max_scores}
# Iterate over log to find the last entries for student info fields
# This gets the student name etc.
last_entries: dict[str, str | float] = {}
for entry in log_reduced:
# Split on commas and strip whitespace
parts = [part.strip() for part in entry.split(",")]
# This just overwrites, so the last iteration sticks
if parts[0] == "info" and len(parts) == 4:
field_name = parts[1]
field_value = parts[2]
last_entries[field_name] = field_value
# For debugging; to be commented out
# print(f"Keys in last_entries dict: {last_entries.keys()}")
# Check if the assignment id is in the log file
if "assignment" not in last_entries or assignment_id != last_entries["assignment"]:
sys.exit(
"Your log file is not for the correct assignment. Please submit the correct log file."
)
# TODO: Revisit this; we may no longer require as much info
required_student_info = ["drexel_id", "first_name", "last_name", "drexel_email"]
for field in required_student_info:
if last_entries.get(field) is None:
sys.exit("Missing required student information")
# Initialize code and data lists
log_execution: list[str] = []
log_data: list[str] = []
# Splitting the data into code and responses
for entry in decrypted_log:
# Splitting the data into code and responses
if "code run:" in entry:
log_execution.append(entry)
else:
log_data.append(entry)
# Extracting timestamps and converting them to datetime objects
# TODO: Check why we're using log_reduced instead of decrypted_log
timestamps = [
datetime.strptime(row.split(",")[-1].strip(), "%Y-%m-%d %H:%M:%S")
for row in log_reduced
]
# Getting the earliest and latest times
last_entries["start_time"] = min(timestamps).strftime("%Y-%m-%d %H:%M:%S")
last_entries["end_time"] = max(timestamps).strftime("%Y-%m-%d %H:%M:%S")
delta = max(timestamps) - min(timestamps)
minutes_rounded = round(delta.total_seconds() / 60, 2)
last_entries["elapsed_minutes"] = minutes_rounded
# Collect student info dict
student_info = {key.upper(): value for key, value in last_entries.items()}
# Write info dict to info.json
# TODO: Try/except block here?
with open("info.json", "w") as file:
# print("Writing to info.json")
json.dump(student_info, file)
# Modified list comprehension to filter as per the criteria
free_response = [
entry
for entry in log_reduced
if entry.startswith("q")
and entry.split("_")[0][1:].isdigit()
and int(entry.split("_")[0][1:]) > free_response_questions
]
# Initialize a dictionary to hold question entries.
q_entries = []
# Iterate over the number of free response questions.
for i in range(1, free_response_questions + 1):
# Collect entries for each question in a list.
entries = [
entry
for j in range(1, get_entries_len(log_data, i))
if (entry := get_last_entry(log_data, f"q{i}_{j}")) != ""
]
# Store the list of entries in the dictionary, keyed by question number.
q_entries += entries
q_entries += free_response
# Parse the data
parsed_data: list[list[str]] = [
[part.strip() for part in line.split(",")] for line in q_entries
]
unique_question_IDs = set(row[0] for row in parsed_data)
# Initialize a dictionary to hold the maximum score for each unique value
max_scores = {unique_value: 0 for unique_value in unique_question_IDs}
# Loop through each row in the data
for score_entry in parsed_data:
unique_value = score_entry[0]
score = int(score_entry[1])
# possible_score = float(row[3])
# Update the score if it's higher than the current maximum
if score > max_scores[unique_value]:
max_scores[unique_value] = score
# Loop through the max_scores dictionary and sum scores for each question
for unique_value, score in max_scores.items():
# Extract question number (assuming it's the number immediately after 'q')
question_number = int(unique_value.split("_")[0][1:])
question_scores[question_number] += score
# Sorting the dictionary by keys
question_max_scores = {
key: int(np.round(question_max_scores[key]))
for key in sorted(question_max_scores)
}
# Sorting the dictionary by keys
question_scores = {
key: int(np.round(question_scores[key])) for key in sorted(question_scores)
}
# Creating the dictionary structure
result_structure: dict[str, list[dict]] = {
"tests": [],
}
# Adding entries for each question
for question_number in question_scores.keys():
question_entry = {
"name": f"Question {question_number}",
"score": question_scores[question_number],
"max_score": question_max_scores[question_number],
# "visibility": "visible",
# "output": "",
}
result_structure["tests"].append(question_entry)
# Write results dict to results.json
with open("results.json", "w") as file:
print("Writing to results.json")
json.dump(result_structure, file, indent=4)
login_url = f"{base_url}/login"
verify_login(login_data, login_url)
# The file to be uploaded. Ensure the path is correct.
file_path = "results.json"
# Construct data payload as a dict
final_data = {
"assignment": assignment_id,
"student_email": last_entries.get("drexel_email"),
# "original_file_name": file_path,
"start_time": last_entries["start_time"],
"end_time": last_entries["end_time"],
# "flag": last_entries["flag"],
# "submission_mechanism": "jupyter_notebook",
# "log_file": loginfo,
"scores": result_structure["tests"],
}
# Files to be uploaded. The key should match the name expected by the server.
_files = {
"file": (file_path, open(file_path, "rb")),
}
post_url = f"{base_url}/upload-score"
# Make the POST request with data and files
response = requests.post(
url=post_url,
json=final_data,
# files=files,
auth=HTTPBasicAuth(login_data["username"], login_data["password"]),
)
# Print messages for the user
submission_message(response)
[docs]
def read_logfile(filepath: str, key_box=None) -> tuple[list[str], list[str]]:
if key_box is None:
key_box = generate_keys()
with open(filepath, "r") as logfile:
encrypted_lines = logfile.readlines()
decrypted_log: list[str] = []
for line in encrypted_lines:
if "Encrypted Output: " in line:
trimmed = line.split("Encrypted Output: ")[1].strip()
decoded = base64.b64decode(trimmed)
decrypted = key_box.decrypt(decoded).decode()
decrypted_log.append(decrypted)
# Decoding the log file
# data_: list[str] = drexel_jupyter_logger.decode_log_file(self.filepath, key=key)
# _loginfo = str(decrypted_log)
# Where possible, we should work with this reduced list of relevant entries
# Here we take only lines with student info or question scores
log_reduced = [
entry
for entry in decrypted_log
if re.match(r"info,", entry) or re.match(r"q\d+_\d+,", entry)
]
return decrypted_log, log_reduced
#
# Helper functions
#
[docs]
def generate_keys() -> nacl.public.Box:
with open(".server_private_key.bin", "rb") as priv_file:
server_private_key_bytes = priv_file.read()
server_priv_key = nacl.public.PrivateKey(server_private_key_bytes)
with open(".client_public_key.bin", "rb") as pub_file:
client_public_key_bytes = pub_file.read()
client_pub_key = nacl.public.PublicKey(client_public_key_bytes)
box = nacl.public.Box(server_priv_key, client_pub_key)
return box
[docs]
def get_entries_len(data: list[str], question_number: int) -> int:
"""function to get the unique entries by length
Args:
data (list): list of all the data records
question_number (int): question number to evaluate
Returns:
int: length of the unique entries
"""
# Set for unique qN_* values
unique_qN_values = set()
for entry in data:
if entry.startswith(f"q{question_number}_"):
# Split the string by commas and get the value part
parts = [part.strip() for part in entry.split(",")]
# The value is the third element after splitting (?)
value = parts[0].split("_")[1]
unique_qN_values.add(value)
return len(unique_qN_values) + 1
[docs]
def get_last_entry(data: list[str], field_name: str) -> str:
for entry in data[::-1]:
parts = [part.strip() for part in entry.split(",")]
if parts[0] == field_name:
return entry
return ""
[docs]
def submission_message(response: requests.Response) -> None:
if response.status_code == 200:
print("Data successfully uploaded to the server")
print(response.text)
else:
print(f"Failed to upload data. Status code: {response.status_code}")
print(response.text)
print(
"There is something wrong with your log file or your submission. Please contact an instructor for help."
)
if os.path.exists("results.json"):
# os.remove("results.json")
# Let's keep results.json for now, for debugging
pass
else:
print("results.json was not present")
[docs]
def verify_login(login_data: dict[str, str], login_url: str) -> None:
login_response = requests.post(
login_url, auth=HTTPBasicAuth(login_data["username"], login_data["password"])
)
if login_response.status_code == 200:
print("Login successful")
else:
Exception("Login failed")