import base64
import json
import os
import re
import sys
from datetime import datetime
import nacl.public
import numpy as np
import requests
from requests.auth import HTTPBasicAuth
[docs]
def validate_logfile(
filepath: str,
assignment_id: str,
question_max_scores: dict[int, int],
free_response_questions=0,
username="student",
password="capture",
post_url="http://localhost:8000/upload-score",
login_url="http://localhost:8000/login",
) -> None:
login_data = {
"username": username,
"password": password,
}
with open("server_private_key.bin", "rb") as priv_file:
server_private_key_bytes = priv_file.read()
server_priv_key = nacl.public.PrivateKey(server_private_key_bytes)
with open("client_public_key.bin", "rb") as pub_file:
client_public_key_bytes = pub_file.read()
client_pub_key = nacl.public.PublicKey(client_public_key_bytes)
box = nacl.public.Box(server_priv_key, client_pub_key)
with open(filepath, "r") as logfile:
encrypted_lines = logfile.readlines()
data_: list[str] = []
for line in encrypted_lines:
if "Encrypted Output: " in line:
trimmed = line.split("Encrypted Output: ")[1].strip()
decoded = base64.b64decode(trimmed)
decrypted = box.decrypt(decoded).decode()
data_.append(decrypted)
# Decoding the log file
# data_: list[str] = drexel_jupyter_logger.decode_log_file(self.filepath, key=key)
_loginfo = str(data_)
# Where possible, we should work with this reduced list of relevant entries
data_reduced = [
entry
for entry in data_
if re.match(r"info,", entry) or re.match(r"q\d+_\d+,", entry)
]
# For debugging; to be commented out
with open(".output_reduced.log", "w") as f:
f.writelines(f"{item}\n" for item in data_reduced)
# Initialize the question scores and max scores
question_max_scores = question_max_scores
question_scores = {key: 0 for key in question_max_scores}
# Parsing the data to find the last entries for required fields
# This gets the student name etc.
last_entries: dict[str, str | float] = {}
for entry in data_reduced:
parts = [part.strip() for part in entry.split(",")]
if parts[0] == "info" and len(parts) == 4:
field_name = parts[1]
field_value = parts[2]
last_entries[field_name] = field_value
# For debugging; to be commented out
# print(f"Keys in last_entries dict: {last_entries.keys()}")
# Check if the assignment id is in the log file
if "assignment" not in last_entries or assignment_id != last_entries["assignment"]:
sys.exit(
"Your log file is not for the correct assignment. Please submit the correct log file."
)
required_student_info = ["drexel_id", "first_name", "last_name", "drexel_email"]
for field in required_student_info:
if last_entries.get(field) is None:
sys.exit(
"You must submit your student information before you start the exam. Please submit your information and try again."
)
# Initialize code and data lists
code: list[str] = []
data: list[str] = []
# Splitting the data into code and responses
for entry in data_:
# Splitting the data into code and responses
if "code run:" in entry:
code.append(entry)
else:
data.append(entry)
# Checks to see if the drexel_jupyter_logger is in the code
# If it is, the student might have tried to look at the solutions
# Commenting this out, since we're switching to asymmetric encryption
# flag = any("drexel_jupyter_logger" in item for item in code)
# Extracting timestamps and converting them to datetime objects
timestamps = [
datetime.strptime(row.split(",")[-1].strip(), "%Y-%m-%d %H:%M:%S")
for row in data_reduced
]
# Getting the earliest and latest times
last_entries["start_time"] = min(timestamps).strftime("%Y-%m-%d %H:%M:%S")
last_entries["end_time"] = max(timestamps).strftime("%Y-%m-%d %H:%M:%S")
delta = max(timestamps) - min(timestamps)
minutes_rounded = round(delta.total_seconds() / 60, 2)
last_entries["elapsed_minutes"] = minutes_rounded
# last_entries["flag"] = flag
# Collect student info dict
student_information = {key.upper(): value for key, value in last_entries.items()}
# Write info dict to info.json
with open("info.json", "w") as file:
print("Writing to info.json")
json.dump(student_information, file)
def get_last_entry(data: list[str], field_name: str) -> str:
for entry in data[::-1]:
parts = [part.strip() for part in entry.split(",")]
if parts[0] == field_name:
return entry
return ""
def get_entries_len(data: list[str], question_number: int) -> int:
"""function to get the unique entries by length
Args:
data (list): list of all the data records
question_number (int): question number to evaluate
Returns:
int: length of the unique entries
"""
# Set for unique qN_* values
unique_qN_values = set()
for entry in data:
if entry.startswith(f"q{question_number}_"):
# Split the string by commas and get the value part
parts = [part.strip() for part in entry.split(",")]
# The value is the third element after splitting (?)
value = parts[0].split("_")[1]
unique_qN_values.add(value)
return len(unique_qN_values) + 1
# Modified list comprehension to filter as per the criteria
free_response = [
entry
for entry in data_
if entry.startswith("q")
and entry.split("_")[0][1:].isdigit()
and int(entry.split("_")[0][1:]) > free_response_questions
]
# Initialize a dictionary to hold question entries.
q_entries = []
# Iterate over the number of free response questions.
for i in range(1, free_response_questions + 1):
# Collect entries for each question in a list.
entries = [
entry
for j in range(1, get_entries_len(data, i))
if (entry := get_last_entry(data, f"q{i}_{j}")) != ""
]
# Store the list of entries in the dictionary, keyed by question number.
q_entries += entries
q_entries += free_response
# Parse the data
parsed_data: list[list[str]] = [
[part.strip() for part in line.split(",")] for line in q_entries
]
unique_question_IDs = set(row[0] for row in parsed_data)
# Initialize a dictionary to hold the maximum score for each unique value
max_scores = {unique_value: 0 for unique_value in unique_question_IDs}
# Loop through each row in the data
for score_entry in parsed_data:
unique_value = score_entry[0]
score = int(score_entry[1])
# possible_score = float(row[3])
# Update the score if it's higher than the current maximum
if score > max_scores[unique_value]:
max_scores[unique_value] = score
# Loop through the max_scores dictionary and sum scores for each question
for unique_value, score in max_scores.items():
# Extract question number (assuming it's the number immediately after 'q')
question_number = int(unique_value.split("_")[0][1:])
question_scores[question_number] += score
# Sorting the dictionary by keys
question_max_scores = {
key: int(np.round(question_max_scores[key]))
for key in sorted(question_max_scores)
}
# Sorting the dictionary by keys
question_scores = {
key: int(np.round(question_scores[key])) for key in sorted(question_scores)
}
# Creating the dictionary structure
result_structure: dict[str, list[dict]] = {
"tests": [],
}
# Adding entries for each question
for question_number in question_scores.keys():
question_entry = {
"name": f"Question {question_number}",
"score": question_scores[question_number],
"max_score": question_max_scores[question_number],
# "visibility": "visible",
# "output": "",
}
result_structure["tests"].append(question_entry)
# Write results dict to results.json
with open("results.json", "w") as file:
print("Writing to results.json")
json.dump(result_structure, file, indent=4)
verify_login(login_data, login_url)
# The file to be uploaded. Ensure the path is correct.
file_path = "results.json"
# Construct data payload as a dict
final_data = {
"assignment": assignment_id,
"student_email": last_entries.get("drexel_email"),
# "original_file_name": file_path,
"start_time": last_entries["start_time"],
"end_time": last_entries["end_time"],
# "flag": last_entries["flag"],
# "submission_mechanism": "jupyter_notebook",
# "log_file": loginfo,
"scores": result_structure["tests"],
}
# Files to be uploaded. The key should match the name expected by the server.
_files = {
"file": (file_path, open(file_path, "rb")),
}
# Make the POST request with data and files
response = requests.post(
url=post_url,
json=final_data,
# files=files,
auth=HTTPBasicAuth(login_data["username"], login_data["password"]),
)
# Print messages for the user
submission_message(response)
#
# Helper functions
#
[docs]
def submission_message(response) -> None:
if response.status_code == 200:
print("Data successfully uploaded to the server")
print(response.text)
else:
print(f"Failed to upload data. Status code: {response.status_code}")
print(response.text)
print(
"There is something wrong with your log file or your submission. Please contact an instructor for help."
)
if os.path.exists("results.json"):
# os.remove("results.json")
# Let's keep results.json for now, for debugging
pass
else:
print("results.json was not present")
[docs]
def verify_login(login_data, login_url):
login_response = requests.post(
login_url, auth=HTTPBasicAuth(login_data["username"], login_data["password"])
)
if login_response.status_code == 200:
print("Login successful")
else:
Exception("Login failed")