This repository has been archived on 2025-01-29. You can view files and clone it, but cannot push or open issues or pull requests.
2020-02-13 18:18:41 +01:00

692 lines
26 KiB
Python

import os, sys, subprocess, dataObjects, json, signal
from pycparser import c_parser, c_ast, parse_file
# Path for temp files.
PATH = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "temp")
# Keeps files created in temp folder if DEBUG = True
DEBUG = True
class C:
""" Class for processing, compiling, running and evaluating C code.
Attributes:
solution (Solution):
Solution object storing data from solution json and the corresponding exercise object
result (Result):
Result object storing evaluation data generated by this class.
"""
def __del__(self):
""" Destructor deletes files in temp folder after execution
"""
if os.path.isdir(PATH) and not DEBUG:
for f in os.scandir(PATH):
if not os.path.isdir(f):
os.remove(f.path)
else:
import shutil
shutil.rmtree(f.path)
def __init__(self, solution : dataObjects.Solution, config : dict = None, id : int = None):
""" Constructor
Args:
solution (Solution):
The solution object storing data from solution json and exercise object
"""
self.result = dataObjects.Result(dataObjects.readJson(solution.createJson()))
self.solution = solution
self.cfg = {} if config is None else config
self._lang = self.solution.exercise.lang
self._fileext = ".c" if self._lang == "C" else ".cpp"
if id is not None:
self.result.setId(id)
def processData(self):
""" Processes code, generates files and runs them to get a result.
"""
# Creates temp dir if it does not exist
if not os.path.exists(PATH):
os.makedirs(PATH)
# Prepare code by replacing placeholder code with solutions code
self.replaceCodeWithSolution()
maxState = self.getMaxState()
self.getMappedItems()
# Step 1: Merge source code
exitcode, self.fileInfo = self.merge()
# Step 2: Compile files containing source code
if exitcode == 0 and 1 <= maxState:
try:
exitcode = self.compile()
except Exception as e:
self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN COMPILING"
self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
exitcode = 1
# Step 3 (Only C): Check if student's solution contains illegal calls
if exitcode == 0 and 2 <= maxState and self._lang == "C":
try:
exitcode = self.check()
except Exception as e:
self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN CHECKING"
self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
exitcode = 1
# Step 4: Link compiled files and libraries
if exitcode == 0 and 3 <= maxState:
try:
exitcode = self.link()
except Exception as e:
self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN LINKING"
self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
exitcode = 1
# Step 5: Run exectutable files
if exitcode == 0 and 4 <= maxState:
try:
self.run()
except Exception as e:
self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN RUNNING"
self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
exitcode = 1
# Calculating computation time in result object
self.result.computation["technicalInfo"]["exitCode"] = exitcode
self.result.calculateComputationTime()
def getMaxState(self) -> int:
""" Retrieves max state of data processing
Returns:
An integer representing the max state
"""
s = self.solution.exercise.config.get("stopAfterPhase")
return 4 if s is None or s == "running" else \
3 if s == "linking" else \
2 if s == "checking" else \
1 if s == "compiling" else 0
def replaceCodeWithSolution(self):
""" Modifying exercise code by replacing placeholder code with student solution
"""
for sEl in self.solution.exerciseModifications["elements"]:
for eEl in self.solution.exercise.elements:
if eEl["identifier"] == sEl["identifier"] and eEl.get("modifiable") == True:
eEl["value"] = sEl["value"]
break
def mergeError(self, message):
""" Adds merge error to userInfo and returns returncode 1
Returns:
1 (Integer):
Returncode 1
An empty dict:
Empty file informations
"""
self.result.computation["userInfo"]["summary"] = "[ERROR]"
self.result.computation["userInfo"]["elements"].append({
"severity": "error",
"type": "chain",
"message": f"Merging failed! {message}"
})
return 1, {}
def merge(self):
""" Merges all code snippets given by exercise json in config.merging
Returns:
A dict containing one dict per merged source file.
- key: filename (without extension)
- value: dict
The structure of each of these dicts describing source files:
- key: identifier of code snippet
- value: dict containing following (keys: values):
- "visible": Bool indicating if section is visible for student
- "start": Integer indicating Start of Section (line number)
- "stop": Integer indicating End of Section (line number)
"""
merge = self.solution.exercise.config["merging"]
l = len(merge)
if l == 0:
return self.mergeError("Empty merging array")
if isinstance(merge, list) and isinstance(merge[0], dict) and l != 1:
return self.mergeMultipleFiles()
else:
return self.mergeSingleFile()
def mergeSingleFile(self) -> dict:
""" Merges a single file.
Returns:
A dict as specified as in "merge".
The filename is always "temp"
"""
fname = f"temp{self._fileext}"
r = {fname : {"path": os.path.join(PATH, fname)}}
code = ""
loc = 0
if isinstance(self.solution.exercise.config["merging"], list):
sourceElements = self.solution.exercise.config["merging"]
else:
sourceElements = self.solution.exercise.config["merging"]["sources"]
if len(sourceElements) == 0:
return self.mergeError("Empty merging array")
for s in sourceElements:
for e in self.solution.exercise.elements:
if s == e["identifier"]:
r[fname][s] = {}
if e.get("visible") is not None:
r[fname][s]["visible"] = e["visible"]
r[fname][s]["start"] = (loc + 1)
code += e["value"] or "\n"
if not code.endswith("\n"):
code += "\n"
cnt = (e["value"] or "\n").count("\n")
loc += cnt
r[fname][s]["stop"] = loc if cnt != 0 else (loc + 1)
break
fpath = os.path.join(PATH, f"{fname}")
with open(fpath, "w+") as f:
f.write(code)
os.chmod(fpath, 0o666)
return 0, r
def getMappedItems(self):
""" Checks elementMap for files not created yet and creates them.
"""
if self.solution.exercise.elementMap:
for m in self.solution.exercise.elementMap:
mergeInfo = self.solution.exercise.elementMap[m].split(os.sep)
fpath = mergeInfo[3:-1]
fname = mergeInfo[-1]
if fpath:
path = os.path.join(PATH, *path, fname)
if os.path.exists(path):
continue
else:
for element in self.solution.exercise.elements:
if element["identifier"] == m:
with open(path, "w") as f:
f.write(element["value"])
else:
path = os.path.join(PATH, fname)
if os.path.exists(path):
continue
else:
for element in self.solution.exercise.elements:
if element["identifier"] == m:
with open(path, "w") as f:
f.write(element["value"])
def getFileName(self, mergeDict, cnt):
""" Retrieves the filenname and path of a file used for compiling,
checking and running
Args:
mergeDict (dict):
the dict containing merge informations needed for the current file
cnt (int):
an integer counting up each call.
Returns:
Strings representing the filename and the filepath.
Incremented cnt Integer
"""
mergeID, path = mergeDict.get("mergeID"), None
if mergeID:
if self.solution.exercise.elementMap and \
mergeID in self.solution.exercise.elementMap:
mergeInfo = self.solution.exercise.elementMap[mergeID].split(os.sep)
path = mergeInfo[3:-1]
# ERROR HANDLING
if path and not path[0]:
return self.mergeError("Absolute Paths are not allowed"), 0
fname = mergeInfo[-1]
else:
fname = mergeID
else:
fname = f"temp{cnt}"
if self._fileext not in fname and ".h" not in fname:
return f"{fname}{self._fileext}", path, cnt + 1
else:
return fname, path, cnt + 1
def mergeMultipleFiles(self) -> dict:
""" Merges multiple files.
Returns:
A dict as specified as in "merge".
"""
i = 1 # used if neither mergeID nor mapping is given
r = {}
for m in self.solution.exercise.config["merging"]:
fname, fpath, i = self.getFileName(m, i)
if fname == 1:
return fname, fpath
r[fname] = {}
if fpath:
tmp = os.path.join(PATH, *fpath)
if not os.path.exists(tmp):
os.makedirs(tmp)
r[fname]["path"] = os.path.join(tmp, fname)
else:
r[fname]["path"] = os.path.join(PATH, fname)
loc = 0
code = ""
for s in m["sources"]:
for e in self.solution.exercise.elements:
if s == e["identifier"]:
r[fname][s] = {}
if e.get("visible") is not None:
r[fname][s]["visible"] = e["visible"]
r[fname][s]["start"] = (loc + 1)
code += e["value"]
if not code.endswith("\n"):
code += "\n"
cnt = e["value"].count("\n")
loc += cnt
r[fname][s]["stop"] = loc if cnt != 0 else (loc + 1)
break
loc += 1
with open(r[fname]["path"], "w+") as f:
f.write(code)
os.chmod(r[fname]["path"], 0o666)
return 0, r
def getSnippetIdentifier(self, file, line):
""" Retrieves the code snipped identifier of a given line
Args:
file (str):
the file containing the identifier
line (int):
the line number
Returns:
A string containing the snippet identifier
"""
for i in self.fileInfo[file]:
if i == "path":
continue
if line in range(self.fileInfo[file][i]["start"], self.fileInfo[file][i]["stop"] + 1):
return i
def getLoc(self, file, line, join=False):
""" Retrieves a line of code in a given file
Args:
file (str):
the filepath to be opened
line (int):
the line number of the line we want to return
join (bool):
if True, the filepath will be joined with PATH first
Returns:
A String containing the specified line of code.
"""
with open(file if not join else os.path.join(PATH, file), "r") as f:
i = 0
while i < line - 1:
f.readline()
i += 1
return f.readline()
def compile(self):
""" Compiles all merged source files.
Returns:
An Integer representing the return code of the compiler.
"""
# changes current working directory for easier compiling
cwd = os.getcwd()
os.chdir(PATH)
# compiling command as specified as in exercise
com = self.solution.exercise.getCompilingCommand().split(" ")
# path for all source files
for f in self.fileInfo:
if ".h" in f:
continue
com.append(self.fileInfo[f]["path"])
# flag to just compile files without linking
com.append("-c")
# flag for easier error handling. Requires GCC 9.4
com.append("-fdiagnostics-format=json")
self.result.computation["technicalInfo"]["compileCommand"] = " ".join(com)
proc = subprocess.run(com, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
try:
parsed = json.loads(proc.stderr.decode("utf-8")) if proc.stderr else \
json.loads(proc.stdout.decode("utf-8")) if proc.stdout else ""
except json.decoder.JSONDecodeError:
txt = proc.stderr if proc.stderr else \
proc.stdout if proc.stdout else ""
if txt[0] == b"[":
tmp = txt.replace(b"\n", b"")
sliced = tmp[:tmp.rfind(b"]") + 1]
elif txt[0] == b"{":
tmp = txt.replace(b"\n", b"")
sliced = tmp[:tmp.rfind(b"}") + 1]
else:
sliced = txt.replace(b"\n", b"")
txt = txt.decode("utf-8")
try:
parsed = json.loads(sliced)
except json.decoder.JSONDecodeError:
parsed = txt
if len(parsed) > 0:
if isinstance(parsed, dict):
maxState = "info"
elements = []
for p in parsed:
# updating maxState if neccessary
if p["kind"] == "warning" and maxState == "info":
maxState = "warning"
elif p["kind"] == "error" and maxState != "error":
maxState = "error"
# file and line of error
file = p["locations"][0]["caret"]["file"].split(".")[0]
line = p["locations"][0]["caret"]["line"]
# calculating the line
snippet = self.getSnippetIdentifier(file, line)
# dict specifying the current error/warning/info and source
e = {
"severity" : p["kind"],
"type" : "compiler",
"message" : p["message"],
"source" : {
"elementID" : snippet,
"extract" : self.getLoc(f"{file}{self._fileext}", line, join=True),
"begin" : self.fileInfo[file][snippet]["start"],
"end" : self.fileInfo[file][snippet]["stop"],
"line" : line - self.fileInfo[file][snippet]["start"],
"col" : p["locations"][0]["caret"]["column"]
}
}
elements.append(e)
self.result.computation["userInfo"]["summary"] = f"[{maxState.upper()}]"
self.result.computation["userInfo"]["elements"] += elements
elif isinstance(parsed, str):
maxState = None
if "error" in parsed:
maxState = "ERROR"
elif "warning" in parsed:
maxState = "WARNING"
elif "info" in parsed:
maxState = "INFO"
if maxState:
self.result.computation["userInfo"]["summary"] = f"[{maxState}] - could not parse output"
self.result.computation["userInfo"]["elements"].append({
"severity": maxState,
"type": "compiler",
"message": f"Could not parse output:\n{parsed}"
})
else: # list
self.result.computation["userInfo"]["elements"] += parsed
# adds compiling output to "elements" in result object
data = {
"MIMEtype":"text/plain",
"identifier":f"{self.result.id} Compiling",
"value" : parsed
}
self.result.elements.append(data)
os.chdir(cwd)
return proc.returncode
def check(self):
""" Checks all merged source files.
Checking after compiling to reduce effort. It's unnecessary to check if compiling fails.
Returns:
An Integer representing the final state of checking:
0 - Checking passed
1 - Checking failed
"""
checkConfig = self.solution.exercise.config.get("checking")
if checkConfig is None:
return 0
returncode = 0
forbidden = checkConfig["forbiddenCalls"].split(" ")
checker = Checker(self.fileInfo)
for a in checker.asts:
checker.getFunctions(checker.asts[a])
elements = []
for file in checker.visitor.data:
f = file.split(os.sep)[-1]
for func in checker.visitor.data[file]:
for i in checker.visitor.data[file][func]:
cur = checker.visitor.data[file][func][i]
id = self.getSnippetIdentifier(f, cur["Line"])
if id in checkConfig["sources"] and cur["FuncCall"] in forbidden:
line = cur["Line"] - self.fileInfo[f][id]["start"]
e = {
"severity": "error",
"type": "callcheck",
"message": f"[C function filtering] Function call not allowed:\n\'"
f"{cur['FuncCall']}\';original source: f'{id}', line "
f"(corrected): {line}, " \
f"col: {cur['Column']}\nForbidden calls:\nsystem.\n",
"source": {
"elementID": id,
"extract": self.getLoc(file, line),
"begin": self.fileInfo[f][id]["start"],
"end": self.fileInfo[f][id]["stop"],
"line": line,
"col": cur["Column"]
}
}
elements.append(e)
if returncode == 0:
returncode = 1
if len(elements) != 0:
if "elements" not in self.result.computation["userInfo"]:
self.result.computation["userInfo"]["elements"] = elements
else:
self.result.computation["userInfo"]["elements"] += elements
if "summary" not in self.result.computation["userInfo"]:
self.result.computation["userInfo"]["summary"] = "[ERROR]"
elif "ERROR" not in self.result.computation["userInfo"]["summary"]:
self.result.computation["userInfo"]["summary"] = "[ERROR]"
data = {
"MIMEtype":"text/plain",
"identifier":f"{self.result.id} Checking",
"value" : elements
}
self.result.elements.append(data)
return returncode
def link(self):
""" Links compiled files and libraries.
Returns:
An Integer representing the return code of the compiler.
"""
com = ["gcc" if self._lang == "C" else "g++", "-o", f"{os.path.join(PATH, 'out')}"]
for f in self.fileInfo:
if ".h" in f:
continue
com.append(f"{os.path.join(PATH, f)[:-len(self._fileext)]}.o")
flags = self.solution.exercise.config["linking"].get("flags")
if flags:
com.append(flags)
self.result.computation["technicalInfo"]["linkCommand"] = " ".join(com)
proc = subprocess.run(com, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
data = {
"MIMEtype":"text/plain",
"identifier":f"{self.result.id} Linking",
"value" : proc.stdout.decode("utf-8")
}
self.result.elements.append(data)
return proc.returncode
def run(self):
""" Makes file executable and runs it.
Returns:
An Integer representing the return code of the program.
"""
os.chmod(os.path.join(PATH, "out"), 0o700)
com = [f"{os.path.join(PATH, 'out')}"]
cmdLineArgs = self.solution.exercise.config["running"].get("commandLineArguments")
if cmdLineArgs is not None:
com.extend(cmdLineArgs.split())
# Time Limit of running process
timelimit = self.solution.exercise.config["running"].get("timelimitInSeconds")
cfglimit = self.cfg.get("timelimitInSeconds")
if not timelimit:
timelimit = cfglimit # is now either None or int
elif cfglimit:
timelimit = min(timelimit, cfglimit)
self.result.computation["technicalInfo"]["runCommand"] = "".join(com)
proc = subprocess.Popen(com, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
preexec_fn=os.setsid, shell=False)
try:
stdout, stderr = proc.communicate(timeout=timelimit)
text = ""
except subprocess.TimeoutExpired as e:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
stdout, stderr, text = "", "", f"Runtime failed! Timeout after {e.timeout} seconds"
self.result.computation["userInfo"]["summary"] = "Runtime failed! Exit code: 1"
data = [{
"MIMEtype":"text/plain",
"identifier":f"{self.result.id} Running",
"value" : text
},
{
"MIMEtype":"text/plain",
"identifier":f"{self.result.id} Running stdout",
"value" : stdout#.decode("utf-8")
},
{
"MIMEtype":"text/plain",
"identifier":f"{self.result.id} Running stderr",
"value" : stderr#.decode("utf-8")
}]
for d in data:
self.result.elements.append(d)
return proc.returncode
class Checker:
""" Class for generating Abstract Syntax Trees (AST) of source files
and retrieving informations about function calls.
Attributes:
asts (dict):
A dict containing one entry for each merged source file
- key: filename (without extension)
- value: AST of source file
"""
def __init__(self, files: dict):
""" Constructor
Args:
files (dict):
A dict generated by the "merge" function in class "C"
"""
self._files = files
self.asts = self.getAsts()
self.visitor = self.Visitor()
class Visitor(c_ast.NodeVisitor):
""" Internal Class for visiting nodes in an AST.
"""
def __init__(self):
self.data = {}
def visit_FuncDef(self, node):
""" Finds and prints all found function calls in a function
"""
if node.decl.coord.file not in self.data:
self.data[node.decl.coord.file] = {}
self.data[node.decl.coord.file][node.decl.name] = {}
i = 0
for n in node.body.block_items:
if isinstance(n, c_ast.FuncCall):
self.data[node.decl.coord.file][node.decl.name][str(i)] = {
"FuncCall" : n.name.name,
"Line" : n.coord.line,
"Column" : n.coord.column
}
i += 1
def getAst(self, filename) -> c_ast.FileAST:
""" Generates an AST from given source file
Args:
filename (str):
The name of the source file to generate an AST for
"""
fake_libc_include = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])),
'utils', 'fake_libc_include')
return parse_file(filename, use_cpp=True, cpp_path="gcc",
cpp_args=["-E", f"-I{fake_libc_include}"])
def getAsts(self) -> dict:
""" Generates one AST for each merged source file
Returns:
A dict containing one (key, value) pair for each source file.
- key: filename (without extension)
- value: AST for the corresponding file
"""
asts = {}
for f in self._files:
asts[f] = self.getAst(self._files[f]["path"])
return asts
def getFunctions(self, ast: c_ast.FileAST):
""" Iterates over the given AST and visit nodes as specified as in Visitor class
Args:
ast:
An AST representing a source file.
"""
self.visitor.visit(ast)