import json
import os
import shutil
import signal
import subprocess
import sys

from pycparser import c_parser, c_ast, parse_file

import dataObjects

# Path for temp files.
PATH = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "temp")
# Keeps files created in temp folder if DEBUG = True
DEBUG = True


class C:
    """
    Class for processing, compiling, running and evaluating C code.

    Attributes:
        solution (Solution): Solution object storing data from solution json
            and the corresponding exercise object
        result (Result): Result object storing evaluation data generated by
            this class.
        cfg (dict): Optional backend configuration (e.g. "timelimitInSeconds").
        fileInfo (dict): Set by processData(); one entry per merged source
            file, keyed by file name INCLUDING extension (see merge()).
    """

    def __del__(self):
        """
        Destructor deletes files in temp folder after execution
        (only when DEBUG is False).
        """
        if os.path.isdir(PATH) and not DEBUG:
            for f in os.scandir(PATH):
                if not os.path.isdir(f):
                    os.remove(f.path)
                else:
                    shutil.rmtree(f.path)

    def __init__(self, solution : dataObjects.Solution, config : dict = None, id : int = None):
        """
        Constructor

        Args:
            solution (Solution): The solution object storing data from
                solution json and exercise object
            config (dict): Optional configuration; an empty dict is used
                when None.
            id (int): Optional id forwarded to result.setId().
        """
        self.result = dataObjects.Result(dataObjects.readJson(solution.createJson()))
        self.solution = solution
        self.cfg = {} if config is None else config
        self._lang = self.solution.exercise.lang
        self._fileext = ".c" if self._lang == "C" else ".cpp"
        if id is not None:
            self.result.setId(id)

    def _unexpectedError(self, phase, error):
        """
        Records an unexpected exception of a processing phase in userInfo.

        Args:
            phase (str): Upper-case phase name used in the summary.
            error (Exception): The exception that was caught.

        Returns:
            1 (Integer): the exit code to use for the failed phase.
        """
        userInfo = self.result.computation["userInfo"]
        userInfo["summary"] = f"UNEXPECTED ERROR IN {phase}"
        userInfo["elements"].append(f"{type(error).__name__}: {error}")
        return 1

    def processData(self):
        """
        Processes code, generates files and runs them to get a result.

        Phases (each only runs if all previous ones returned 0 and the
        exercise's "stopAfterPhase" allows it): merge, compile, check
        (C only), link, run. The final exit code and computation time are
        stored in the result object.
        """
        # Creates temp dir if it does not exist
        os.makedirs(PATH, exist_ok=True)
        # Prepare code by replacing placeholder code with solutions code
        self.replaceCodeWithSolution()
        maxState = self.getMaxState()
        self.getMappedItems()

        # Step 1: Merge source code
        exitcode, self.fileInfo = self.merge()

        # Step 2: Compile files containing source code
        if exitcode == 0 and 1 <= maxState:
            try:
                exitcode = self.compile()
            except Exception as e:
                exitcode = self._unexpectedError("COMPILING", e)

        # Step 3 (Only C): Check if student's solution contains illegal calls
        if exitcode == 0 and 2 <= maxState and self._lang == "C":
            try:
                exitcode = self.check()
            except Exception as e:
                exitcode = self._unexpectedError("CHECKING", e)

        # Step 4: Link compiled files and libraries
        if exitcode == 0 and 3 <= maxState:
            try:
                exitcode = self.link()
            except Exception as e:
                exitcode = self._unexpectedError("LINKING", e)

        # Step 5: Run executable files
        if exitcode == 0 and 4 <= maxState:
            try:
                # NOTE(review): the program's return code is discarded here,
                # unlike the other phases - confirm this is intentional.
                self.run()
            except Exception as e:
                exitcode = self._unexpectedError("RUNNING", e)

        # Calculating computation time in result object
        self.result.computation["technicalInfo"]["exitCode"] = exitcode
        self.result.calculateComputationTime()

    def getMaxState(self) -> int:
        """
        Retrieves max state of data processing from the exercise config key
        "stopAfterPhase".

        Returns:
            An integer representing the max state: 1 compiling, 2 checking,
            3 linking, 4 running (also when the key is missing), 0 for any
            other value.
        """
        phases = ("compiling", "checking", "linking", "running")
        s = self.solution.exercise.config.get("stopAfterPhase")
        if s is None:
            return len(phases)
        return phases.index(s) + 1 if s in phases else 0

    def replaceCodeWithSolution(self):
        """
        Modifying exercise code by replacing placeholder code with student
        solution. Only elements flagged "modifiable" are overwritten.
        """
        for sEl in self.solution.exerciseModifications["elements"]:
            for eEl in self.solution.exercise.elements:
                if eEl["identifier"] == sEl["identifier"] and eEl.get("modifiable") == True:
                    eEl["value"] = sEl["value"]
                    break

    def mergeError(self, message):
        """
        Adds merge error to userInfo and returns returncode 1

        Returns:
            1 (Integer): Returncode 1
            An empty dict: Empty file informations
        """
        self.result.computation["userInfo"]["summary"] = "[ERROR]"
        self.result.computation["userInfo"]["elements"].append({
            "severity": "error",
            "type": "chain",
            "message": f"Merging failed! {message}"
        })
        return 1, {}

    def merge(self):
        """
        Merges all code snippets given by exercise json in config.merging

        Returns:
            A tuple (returncode, fileInfo). returncode is 0 on success and 1
            on a merge error. fileInfo contains one dict per merged source
            file.
                - key: filename (including extension)
                - value: dict
            The structure of each of these dicts describing source files:
                - "path": path of the written file
                - for every merged snippet, key: identifier of code snippet,
                  value: dict containing following (keys: values):
                    - "visible": Bool indicating if section is visible for
                      student (only if given by the element)
                    - "start": Integer indicating Start of Section (line number)
                    - "stop": Integer indicating End of Section (line number)
        """
        merge = self.solution.exercise.config["merging"]
        l = len(merge)
        if l == 0:
            return self.mergeError("Empty merging array")
        if isinstance(merge, list) and isinstance(merge[0], dict) and l != 1:
            return self.mergeMultipleFiles()
        return self.mergeSingleFile()

    def _mergeSources(self, sources, fileEntry):
        """
        Concatenates the values of the given snippets and records their
        position in the merged file.

        Every snippet is terminated with a newline, so the number of lines it
        occupies equals the number of newlines in its (terminated) value.
        This keeps "start"/"stop" exact whether or not the value ends with a
        newline. An empty value counts as one blank line.

        Args:
            sources (list): snippet identifiers in merge order
            fileEntry (dict): the fileInfo entry of the file; gets one
                section dict per snippet found in the exercise elements

        Returns:
            A string containing the merged code.
        """
        parts = []
        loc = 0
        for s in sources:
            for e in self.solution.exercise.elements:
                if s != e["identifier"]:
                    continue
                value = e["value"] or "\n"
                if not value.endswith("\n"):
                    value += "\n"
                section = {}
                if e.get("visible") is not None:
                    section["visible"] = e["visible"]
                section["start"] = loc + 1
                loc += value.count("\n")
                section["stop"] = loc
                fileEntry[s] = section
                parts.append(value)
                break
        return "".join(parts)

    def mergeSingleFile(self) -> tuple:
        """
        Merges a single file.

        Accepts "merging" as a list of identifiers, a dict with "sources",
        or a list holding a single such dict.

        Returns:
            A tuple (returncode, fileInfo) as specified as in "merge".
            The filename is always "temp" plus the language extension.
        """
        fname = f"temp{self._fileext}"
        fpath = os.path.join(PATH, fname)
        r = {fname : {"path": fpath}}
        merging = self.solution.exercise.config["merging"]
        if isinstance(merging, list):
            if merging and isinstance(merging[0], dict):
                # merge() routes a one-element list of dicts here; its
                # identifiers live under "sources", not in the list itself.
                sourceElements = merging[0]["sources"]
            else:
                sourceElements = merging
        else:
            sourceElements = merging["sources"]
        if len(sourceElements) == 0:
            return self.mergeError("Empty merging array")
        code = self._mergeSources(sourceElements, r[fname])
        with open(fpath, "w+") as f:
            f.write(code)
        os.chmod(fpath, 0o666)
        return 0, r

    def getMappedItems(self):
        """
        Checks elementMap for files not created yet and creates them.

        The target below PATH is built from the mapped value split at os.sep:
        components [3:-1] form the sub folder, the last one the file name.
        """
        elementMap = self.solution.exercise.elementMap
        if not elementMap:
            return
        for m in elementMap:
            mergeInfo = elementMap[m].split(os.sep)
            fpath = mergeInfo[3:-1]
            fname = mergeInfo[-1]
            target = os.path.join(PATH, *fpath, fname)
            if os.path.exists(target):
                continue
            for element in self.solution.exercise.elements:
                if element["identifier"] == m:
                    # sub folders may not exist yet
                    os.makedirs(os.path.dirname(target), exist_ok=True)
                    with open(target, "w") as f:
                        f.write(element["value"])

    def getFileName(self, mergeDict, cnt):
        """
        Retrieves the filenname and path of a file used for compiling,
        checking and running

        Args:
            mergeDict (dict): the dict containing merge informations needed
                for the current file
            cnt (int): an integer counting up each call.

        Returns:
            A tuple (filename, path components or None, incremented cnt).
            On error the tuple is (1, {}, 0), i.e. the result of mergeError
            followed by 0, so callers can detect it via filename == 1.
        """
        mergeID, path = mergeDict.get("mergeID"), None
        if mergeID:
            if self.solution.exercise.elementMap and \
                    mergeID in self.solution.exercise.elementMap:
                mergeInfo = self.solution.exercise.elementMap[mergeID].split(os.sep)
                path = mergeInfo[3:-1]
                # ERROR HANDLING
                if path and not path[0]:
                    return (*self.mergeError("Absolute Paths are not allowed"), 0)
                fname = mergeInfo[-1]
            else:
                fname = mergeID
        else:
            fname = f"temp{cnt}"
        if self._fileext not in fname and ".h" not in fname:
            fname = f"{fname}{self._fileext}"
        return fname, path, cnt + 1

    def mergeMultipleFiles(self) -> tuple:
        """
        Merges multiple files.

        Returns:
            A tuple (returncode, fileInfo) as specified as in "merge".
        """
        i = 1  # used if neither mergeID nor mapping is given
        r = {}
        for m in self.solution.exercise.config["merging"]:
            fname, fpath, i = self.getFileName(m, i)
            if fname == 1:
                # getFileName reported an error: (1, {}) is passed on
                return fname, fpath
            r[fname] = {}
            if fpath:
                tmp = os.path.join(PATH, *fpath)
                os.makedirs(tmp, exist_ok=True)
                r[fname]["path"] = os.path.join(tmp, fname)
            else:
                r[fname]["path"] = os.path.join(PATH, fname)
            code = self._mergeSources(m["sources"], r[fname])
            with open(r[fname]["path"], "w+") as f:
                f.write(code)
            os.chmod(r[fname]["path"], 0o666)
        return 0, r

    def getSnippetIdentifier(self, file, line):
        """
        Retrieves the code snipped identifier of a given line

        Args:
            file (str): the fileInfo key of the file containing the line
            line (int): the line number

        Returns:
            A string containing the snippet identifier, or None if no
            snippet covers the line.
        """
        for ident, section in self.fileInfo[file].items():
            if ident == "path":
                continue
            if section["start"] <= line <= section["stop"]:
                return ident
        return None

    def getLoc(self, file, line, join=False):
        """
        Retrieves a line of code in a given file

        Args:
            file (str): the filepath to be opened
            line (int): the line number of the line we want to return
            join (bool): if True, the filepath will be joined with PATH first

        Returns:
            A String containing the specified line of code (empty if the
            file has fewer lines).
        """
        target = os.path.join(PATH, file) if join else file
        with open(target, "r") as f:
            for _ in range(line - 1):
                f.readline()
            return f.readline()

    @staticmethod
    def _parseCompilerOutput(raw):
        """
        Parses the combined compiler output.

        Args:
            raw (bytes): captured output of the compiler

        Returns:
            The decoded JSON (list or dict) if the output - or its leading
            JSON array/object after dropping newlines and trailing text -
            can be parsed; otherwise the output as plain text. "" for empty
            output.
        """
        if not raw:
            return ""
        text = raw.decode("utf-8", errors="replace")
        flat = text.replace("\n", "")
        # Trim trailing non-JSON text after the closing bracket/brace.
        if flat.startswith("["):
            flat = flat[:flat.rfind("]") + 1]
        elif flat.startswith("{"):
            flat = flat[:flat.rfind("}") + 1]
        for candidate in (text, flat):
            try:
                parsed = json.loads(candidate)
            except json.decoder.JSONDecodeError:
                continue
            if isinstance(parsed, (list, dict)):
                return parsed
        return text

    def _diagnosticToElement(self, diag):
        """
        Converts one compiler diagnostic (GCC JSON format) to a userInfo
        element.

        "source" is only added if the diagnostic points into a merged file
        and a snippet covering the line is known.

        Args:
            diag (dict): a single diagnostic object

        Returns:
            A dict with "severity", "type", "message" and optionally "source".
        """
        element = {
            "severity" : diag.get("kind"),
            "type" : "compiler",
            "message" : diag.get("message")
        }
        locations = diag.get("locations") or []
        if not locations:
            return element
        caret = locations[0].get("caret") or {}
        # fileInfo is keyed by file name including extension
        fname = os.path.basename(caret.get("file") or "")
        line = caret.get("line")
        if line is None or fname not in self.fileInfo:
            return element
        snippet = self.getSnippetIdentifier(fname, line)
        if snippet is None:
            return element
        section = self.fileInfo[fname][snippet]
        element["source"] = {
            "elementID" : snippet,
            "extract" : self.getLoc(self.fileInfo[fname]["path"], line),
            "begin" : section["start"],
            "end" : section["stop"],
            "line" : line - section["start"],
            "col" : caret.get("column")
        }
        return element

    def _reportCompilerOutput(self, parsed):
        """
        Stores non-empty compiler output in the result object.

        Args:
            parsed: result of _parseCompilerOutput (str, list or dict)
        """
        userInfo = self.result.computation["userInfo"]
        if isinstance(parsed, str):
            maxState = next(
                (k.upper() for k in ("error", "warning", "info") if k in parsed),
                None)
            # NOTE(review): source indentation was lost; both statements are
            # assumed to belong to this condition - confirm.
            if maxState:
                userInfo["summary"] = f"[{maxState}] - could not parse output"
                userInfo["elements"].append({
                    "severity": maxState,
                    "type": "compiler",
                    "message": f"Could not parse output:\n{parsed}"
                })
        else:
            # GCC emits a JSON array of diagnostics; a single object is
            # treated as a one-element array.
            diagnostics = [parsed] if isinstance(parsed, dict) else parsed
            maxState = "info"
            elements = []
            for p in diagnostics:
                kind = p.get("kind")
                # updating maxState if neccessary
                if kind == "warning" and maxState == "info":
                    maxState = "warning"
                elif kind == "error" and maxState != "error":
                    maxState = "error"
                elements.append(self._diagnosticToElement(p))
            userInfo["summary"] = f"[{maxState.upper()}]"
            userInfo["elements"] += elements
        # adds compiling output to "elements" in result object
        # NOTE(review): assumed to happen only for non-empty output - confirm.
        self.result.elements.append({
            "MIMEtype":"text/plain",
            "identifier":f"{self.result.id} Compiling",
            "value" : parsed
        })

    def compile(self):
        """
        Compiles all merged source files.

        Returns:
            An Integer representing the return code of the compiler.
        """
        # changes current working directory for easier compiling; always
        # restored, even if compiling raises
        cwd = os.getcwd()
        os.chdir(PATH)
        try:
            # compiling command as specified as in exercise
            com = self.solution.exercise.getCompilingCommand().split(" ")
            # path for all source files
            for f in self.fileInfo:
                if ".h" in f:
                    continue
                com.append(self.fileInfo[f]["path"])
            # flag to just compile files without linking
            com.append("-c")
            # flag for easier error handling. Requires GCC 9.4
            com.append("-fdiagnostics-format=json")
            self.result.computation["technicalInfo"]["compileCommand"] = " ".join(com)
            # stderr is merged into stdout, so stdout holds all output
            proc = subprocess.run(com, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            parsed = self._parseCompilerOutput(proc.stdout)
            if len(parsed) > 0:
                self._reportCompilerOutput(parsed)
            return proc.returncode
        finally:
            os.chdir(cwd)

    def check(self):
        """
        Checks all merged source files.
        Checking after compiling to reduce effort. It's unnecessary to check
        if compiling fails.

        Returns:
            An Integer representing the final state of checking:
                0 - Checking passed
                1 - Checking failed
        """
        checkConfig = self.solution.exercise.config.get("checking")
        if checkConfig is None:
            return 0
        forbidden = checkConfig["forbiddenCalls"].split(" ")
        checker = Checker(self.fileInfo)
        for a in checker.asts:
            checker.getFunctions(checker.asts[a])

        elements = []
        for file in checker.visitor.data:
            f = file.split(os.sep)[-1]
            if f not in self.fileInfo:
                # function defined in a file that was not merged by us
                continue
            for func in checker.visitor.data[file]:
                for i in checker.visitor.data[file][func]:
                    cur = checker.visitor.data[file][func][i]
                    snippetId = self.getSnippetIdentifier(f, cur["Line"])
                    if snippetId not in checkConfig["sources"] or \
                            cur["FuncCall"] not in forbidden:
                        continue
                    section = self.fileInfo[f][snippetId]
                    line = cur["Line"] - section["start"]
                    elements.append({
                        "severity": "error",
                        "type": "callcheck",
                        "message": f"[C function filtering] Function call not allowed:\n\'"
                                   f"{cur['FuncCall']}\';original source: '{snippetId}', line "
                                   f"(corrected): {line}, "
                                   f"col: {cur['Column']}\nForbidden calls:\n"
                                   f"{', '.join(forbidden)}.\n",
                        "source": {
                            "elementID": snippetId,
                            # the extract is read from the merged file, so
                            # the merged-file line number is needed here
                            "extract": self.getLoc(file, cur["Line"]),
                            "begin": section["start"],
                            "end": section["stop"],
                            "line": line,
                            "col": cur["Column"]
                        }
                    })

        if len(elements) == 0:
            return 0
        userInfo = self.result.computation["userInfo"]
        userInfo.setdefault("elements", [])
        userInfo["elements"] += elements
        if "summary" not in userInfo or "ERROR" not in userInfo["summary"]:
            userInfo["summary"] = "[ERROR]"
        # NOTE(review): assumed to be recorded only if violations were
        # found (source indentation was lost) - confirm.
        self.result.elements.append({
            "MIMEtype":"text/plain",
            "identifier":f"{self.result.id} Checking",
            "value" : elements
        })
        return 1

    def link(self):
        """
        Links compiled files and libraries.

        Returns:
            An Integer representing the return code of the compiler.
        """
        com = ["gcc" if self._lang == "C" else "g++", "-o", f"{os.path.join(PATH, 'out')}"]
        for f in self.fileInfo:
            if ".h" in f:
                continue
            com.append(f"{os.path.join(PATH, f)[:-len(self._fileext)]}.o")
        flags = self.solution.exercise.config["linking"].get("flags")
        if flags:
            # several flags in one string must become separate arguments
            com.extend(flags.split() if isinstance(flags, str) else flags)
        self.result.computation["technicalInfo"]["linkCommand"] = " ".join(com)
        proc = subprocess.run(com, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        data = {
            "MIMEtype":"text/plain",
            "identifier":f"{self.result.id} Linking",
            "value" : proc.stdout.decode("utf-8")
        }
        self.result.elements.append(data)
        return proc.returncode

    def run(self):
        """
        Makes file executable and runs it.

        The time limit is the smaller of the exercise's and the backend's
        "timelimitInSeconds" (no limit if neither is set). On timeout the
        whole process group of the program is killed.

        Returns:
            An Integer representing the return code of the program.
        """
        executable = os.path.join(PATH, "out")
        os.chmod(executable, 0o700)
        com = [f"{executable}"]
        cmdLineArgs = self.solution.exercise.config["running"].get("commandLineArguments")
        if cmdLineArgs is not None:
            com.extend(cmdLineArgs.split())
        # Time Limit of running process
        timelimit = self.solution.exercise.config["running"].get("timelimitInSeconds")
        cfglimit = self.cfg.get("timelimitInSeconds")
        if not timelimit:
            timelimit = cfglimit  # is now either None or int
        elif cfglimit:
            timelimit = min(timelimit, cfglimit)

        self.result.computation["technicalInfo"]["runCommand"] = " ".join(com)
        # own session => own process group, so children can be killed too
        proc = subprocess.Popen(com, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                start_new_session=True, shell=False)
        try:
            stdout, stderr = proc.communicate(timeout=timelimit)
            text = ""
        except subprocess.TimeoutExpired as e:
            try:
                os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            except ProcessLookupError:
                pass  # program finished right after the timeout fired
            # reap the killed process and close its pipes
            proc.communicate()
            stdout, stderr, text = "", "", f"Runtime failed! Timeout after {e.timeout} seconds"
            self.result.computation["userInfo"]["summary"] = "Runtime failed! Exit code: 1"

        # NOTE(review): stdout/stderr are bytes on success but str on
        # timeout - verify that the result serialization handles bytes.
        data = [{
            "MIMEtype":"text/plain",
            "identifier":f"{self.result.id} Running",
            "value" : text
        }, {
            "MIMEtype":"text/plain",
            "identifier":f"{self.result.id} Running stdout",
            "value" : stdout
        }, {
            "MIMEtype":"text/plain",
            "identifier":f"{self.result.id} Running stderr",
            "value" : stderr
        }]
        for d in data:
            self.result.elements.append(d)
        return proc.returncode


class Checker:
    """
    Class for generating Abstract Syntax Trees (AST) of source files and
    retrieving informations about function calls.

    Attributes:
        asts (dict): A dict containing one entry for each merged source file
            - key: same key as in the given files dict
            - value: AST of source file
        visitor (Visitor): collects the function calls per file and function
    """

    def __init__(self, files: dict):
        """
        Constructor

        Args:
            files (dict): A dict generated by the "merge" function in class "C"
        """
        self._files = files
        self.asts = self.getAsts()
        self.visitor = self.Visitor()

    class Visitor(c_ast.NodeVisitor):
        """
        Internal Class for visiting nodes in an AST.

        Attributes:
            data (dict): file -> function name -> running index (str) ->
                dict with "FuncCall", "Line" and "Column" of a call
        """

        def __init__(self):
            self.data = {}
            # call dict of the function definition currently being visited
            self._current = None

        def visit_FuncDef(self, node):
            """
            Collects all function calls found anywhere in a function body.
            """
            fileData = self.data.setdefault(node.decl.coord.file, {})
            self._current = fileData[node.decl.name] = {}
            if node.body is not None:
                # walks the whole body so calls nested in expressions,
                # branches and loops are found as well
                self.visit(node.body)
            self._current = None

        def visit_FuncCall(self, node):
            """
            Records a call and continues with nested calls (e.g. arguments).
            """
            if self._current is not None:
                self._current[str(len(self._current))] = {
                    # callee may be an expression instead of an identifier
                    "FuncCall" : getattr(node.name, "name", None),
                    "Line" : node.coord.line,
                    "Column" : node.coord.column
                }
            self.generic_visit(node)

    def getAst(self, filename) -> c_ast.FileAST:
        """
        Generates an AST from given source file

        Args:
            filename (str): The path of the source file to generate an AST for
        """
        fake_libc_include = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])),
                                         'utils', 'fake_libc_include')
        return parse_file(filename, use_cpp=True, cpp_path="gcc",
                          cpp_args=["-E", f"-I{fake_libc_include}"])

    def getAsts(self) -> dict:
        """
        Generates one AST for each merged source file

        Returns:
            A dict containing one (key, value) pair for each source file.
                - key: same key as in the files dict
                - value: AST for the corresponding file
        """
        return {f: self.getAst(self._files[f]["path"]) for f in self._files}

    def getFunctions(self, ast: c_ast.FileAST):
        """
        Iterates over the given AST and visit nodes as specified as in
        Visitor class

        Args:
            ast: An AST representing a source file.
        """
        self.visitor.visit(ast)