import os, sys, subprocess, dataObjects, json, signal, shutil

from pycparser import c_parser, c_ast, parse_file


# Path for temporary files.
PATH = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])), "temp")

# Files created in the temp folder are kept if DEBUG is True.
DEBUG = True


class C:
    """ Class for processing, compiling, running and evaluating C code.

    Attributes:
        solution (Solution):
            Solution object storing data from the solution json and the corresponding exercise object
        result (Result):
            Result object storing evaluation data generated by this class.
    """

    def __del__(self):
        """ Destructor, deletes files in the temp folder after execution
        (unless DEBUG is set).
        """
        if os.path.isdir(PATH) and not DEBUG:
            for f in os.scandir(PATH):
                if not os.path.isdir(f):
                    os.remove(f.path)
                else:
                    shutil.rmtree(f.path)

    def __init__(self, solution: dataObjects.Solution, config: dict = None, id: int = None):
        """ Constructor

        Args:
            solution (Solution):
                The solution object storing data from the solution json and exercise object
            config (dict, optional):
                Grader configuration, e.g. a global time limit.
            id (int, optional):
                Id to set on the result object.
        """
        self.result = dataObjects.Result(dataObjects.readJson(solution.createJson()))
        self.solution = solution
        self.cfg = {} if config is None else config
        self._lang = self.solution.exercise.lang
        self._fileext = ".c" if self._lang == "C" else ".cpp"
        if id is not None:
            self.result.setId(id)

    def processData(self):
        """ Processes code, generates files and runs them to get a result.
        """
        # Create the temp dir if it does not exist
        if not os.path.exists(PATH):
            os.makedirs(PATH)

        # Prepare code by replacing placeholder code with the solution's code
        self.replaceCodeWithSolution()

        maxState = self.getMaxState()
        self.getMappedItems()

        # Step 1: Merge source code
        exitcode, self.fileInfo = self.merge()

        # Step 2: Compile files containing source code
        if exitcode == 0 and 1 <= maxState:
            try:
                exitcode = self.compile()
            except Exception as e:
                self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN COMPILING"
                self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
                exitcode = 1

        # Step 3 (C only): Check if the student's solution contains illegal calls
        if exitcode == 0 and 2 <= maxState and self._lang == "C":
            try:
                exitcode = self.check()
            except Exception as e:
                self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN CHECKING"
                self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
                exitcode = 1

        # Step 4: Link compiled files and libraries
        if exitcode == 0 and 3 <= maxState:
            try:
                exitcode = self.link()
            except Exception as e:
                self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN LINKING"
                self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
                exitcode = 1

        # Step 5: Run executable files
        if exitcode == 0 and 4 <= maxState:
            try:
                self.run()
            except Exception as e:
                self.result.computation["userInfo"]["summary"] = "UNEXPECTED ERROR IN RUNNING"
                self.result.computation["userInfo"]["elements"].append(f"{type(e).__name__}: {e}")
                exitcode = 1

        # Store the exit code and calculate the computation time in the result object
        self.result.computation["technicalInfo"]["exitCode"] = exitcode
        self.result.calculateComputationTime()

    def getMaxState(self) -> int:
        """ Retrieves max state of data processing

        Returns:
            An integer representing the max state
        """
        s = self.solution.exercise.config.get("stopAfterPhase")
        return 4 if s is None or s == "running" else \
               3 if s == "linking" else \
               2 if s == "checking" else \
               1 if s == "compiling" else 0

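    # Illustrative mapping of the "stopAfterPhase" exercise config value to the
    # max state returned above (derived from getMaxState; any other string falls
    # through to 0, i.e. only the merge step runs):
    #   missing or "running" -> 4 (all steps)
    #   "linking"            -> 3
    #   "checking"           -> 2
    #   "compiling"          -> 1
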
    def replaceCodeWithSolution(self):
        """ Modifies the exercise code by replacing placeholder code with the student solution.
        """
        for sEl in self.solution.exerciseModifications["elements"]:
            for eEl in self.solution.exercise.elements:
                if eEl["identifier"] == sEl["identifier"] and eEl.get("modifiable") == True:
                    eEl["value"] = sEl["value"]
                    break

    def mergeError(self, message):
        """ Adds a merge error to userInfo and returns return code 1.

        Returns:
            A tuple (1, {}):
                Return code 1 and an empty file information dict.
        """
        self.result.computation["userInfo"]["summary"] = "[ERROR]"
        self.result.computation["userInfo"]["elements"].append({
            "severity": "error",
            "type": "chain",
            "message": f"Merging failed! {message}"
        })
        return 1, {}

    def merge(self):
        """ Merges all code snippets given by the exercise json in config.merging.

        Returns:
            A tuple (exitcode, fileInfo) where fileInfo contains one dict per merged source file.
            - key: filename
            - value: dict
            The structure of each of these dicts describing source files:
            - key: "path" (the file path) or the identifier of a code snippet
            - value: for a snippet identifier, a dict containing the following (keys: values):
                - "visible": Bool indicating if the section is visible for the student
                - "start": Integer indicating the start of the section (line number)
                - "stop": Integer indicating the end of the section (line number)
        """
        merge = self.solution.exercise.config["merging"]

        l = len(merge)

        if l == 0:
            return self.mergeError("Empty merging array")
        if isinstance(merge, list) and isinstance(merge[0], dict) and l != 1:
            return self.mergeMultipleFiles()
        else:
            return self.mergeSingleFile()

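    # Sketch of the accepted shapes of config["merging"] (assumed example
    # identifiers; shapes derived from merge/mergeSingleFile/mergeMultipleFiles):
    #   single file, list form:  "merging": ["includes", "studentCode", "main"]
    #   single file, dict form:  "merging": {"sources": ["includes", "studentCode"]}
    #   multiple files:          "merging": [{"mergeID": "main", "sources": ["main"]},
    #                                        {"mergeID": "lib",  "sources": ["libHeader", "libImpl"]}]
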
    def mergeSingleFile(self):
        """ Merges a single file.

        Returns:
            A tuple (exitcode, fileInfo) with fileInfo structured as specified in "merge".
            The filename is always "temp" plus the file extension.
        """
        fname = f"temp{self._fileext}"
        r = {fname: {"path": os.path.join(PATH, fname)}}
        code = ""
        loc = 0

        if isinstance(self.solution.exercise.config["merging"], list):
            sourceElements = self.solution.exercise.config["merging"]
        else:
            sourceElements = self.solution.exercise.config["merging"]["sources"]
        if len(sourceElements) == 0:
            return self.mergeError("Empty merging array")

        for s in sourceElements:
            for e in self.solution.exercise.elements:
                if s == e["identifier"]:
                    r[fname][s] = {}
                    if e.get("visible") is not None:
                        r[fname][s]["visible"] = e["visible"]
                    r[fname][s]["start"] = (loc + 1)
                    code += e["value"] or "\n"
                    if not code.endswith("\n"):
                        code += "\n"
                    cnt = (e["value"] or "\n").count("\n")
                    loc += cnt
                    r[fname][s]["stop"] = loc if cnt != 0 else (loc + 1)
                    break

        fpath = os.path.join(PATH, fname)

        with open(fpath, "w+") as f:
            f.write(code)
        os.chmod(fpath, 0o666)
        return 0, r

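    # Example of the fileInfo dict returned by the merge step (hypothetical
    # identifiers and line numbers; the shape follows the code above):
    #   (0, {"temp.c": {"path": "<PATH>/temp.c",
    #                   "includes":    {"visible": False, "start": 1, "stop": 3},
    #                   "studentCode": {"visible": True,  "start": 4, "stop": 20}}})
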
    def getMappedItems(self):
        """ Checks elementMap for files not created yet and creates them.
        """
        if self.solution.exercise.elementMap:
            for m in self.solution.exercise.elementMap:
                mergeInfo = self.solution.exercise.elementMap[m].split(os.sep)
                fpath = mergeInfo[3:-1]
                fname = mergeInfo[-1]

                if fpath:
                    path = os.path.join(PATH, *fpath, fname)
                    # make sure the mapped subdirectories exist
                    os.makedirs(os.path.dirname(path), exist_ok=True)
                else:
                    path = os.path.join(PATH, fname)

                if os.path.exists(path):
                    continue
                for element in self.solution.exercise.elements:
                    if element["identifier"] == m:
                        with open(path, "w") as f:
                            f.write(element["value"])

    def getFileName(self, mergeDict, cnt):
        """ Retrieves the filename and path of a file used for compiling,
        checking and running.

        Args:
            mergeDict (dict):
                the dict containing the merge information needed for the current file
            cnt (int):
                an integer counting up with each call.

        Returns:
            Strings representing the filename and the filepath,
            plus the incremented cnt integer.
        """
        mergeID, path = mergeDict.get("mergeID"), None

        if mergeID:
            if self.solution.exercise.elementMap and \
               mergeID in self.solution.exercise.elementMap:
                mergeInfo = self.solution.exercise.elementMap[mergeID].split(os.sep)
                path = mergeInfo[3:-1]
                # ERROR HANDLING
                if path and not path[0]:
                    return (*self.mergeError("Absolute paths are not allowed"), 0)

                fname = mergeInfo[-1]
            else:
                fname = mergeID
        else:
            fname = f"temp{cnt}"

        if self._fileext not in fname and ".h" not in fname:
            return f"{fname}{self._fileext}", path, cnt + 1
        else:
            return fname, path, cnt + 1

    def mergeMultipleFiles(self):
        """ Merges multiple files.

        Returns:
            A tuple (exitcode, fileInfo) with fileInfo structured as specified in "merge".
        """
        i = 1  # used if neither mergeID nor mapping is given
        r = {}
        for m in self.solution.exercise.config["merging"]:
            fname, fpath, i = self.getFileName(m, i)
            if fname == 1:
                return fname, fpath

            r[fname] = {}

            if fpath:
                tmp = os.path.join(PATH, *fpath)
                if not os.path.exists(tmp):
                    os.makedirs(tmp)
                r[fname]["path"] = os.path.join(tmp, fname)
            else:
                r[fname]["path"] = os.path.join(PATH, fname)

            loc = 0
            code = ""
            for s in m["sources"]:
                for e in self.solution.exercise.elements:
                    if s == e["identifier"]:
                        r[fname][s] = {}
                        if e.get("visible") is not None:
                            r[fname][s]["visible"] = e["visible"]
                        r[fname][s]["start"] = (loc + 1)
                        code += e["value"]
                        if not code.endswith("\n"):
                            code += "\n"
                        cnt = e["value"].count("\n")
                        loc += cnt
                        r[fname][s]["stop"] = loc if cnt != 0 else (loc + 1)
                        break
                loc += 1

            with open(r[fname]["path"], "w+") as f:
                f.write(code)
            os.chmod(r[fname]["path"], 0o666)
        return 0, r

    def getSnippetIdentifier(self, file, line):
        """ Retrieves the code snippet identifier of a given line

        Args:
            file (str):
                the file containing the identifier
            line (int):
                the line number

        Returns:
            A string containing the snippet identifier
        """
        for i in self.fileInfo[file]:
            if i == "path":
                continue
            if line in range(self.fileInfo[file][i]["start"], self.fileInfo[file][i]["stop"] + 1):
                return i

    def getLoc(self, file, line, join=False):
        """ Retrieves a line of code in a given file

        Args:
            file (str):
                the filepath to be opened
            line (int):
                the line number of the line we want to return
            join (bool):
                if True, the filepath will be joined with PATH first

        Returns:
            A String containing the specified line of code.
        """
        with open(file if not join else os.path.join(PATH, file), "r") as f:
            i = 0
            while i < line - 1:
                f.readline()
                i += 1
            return f.readline()

    def compile(self):
        """ Compiles all merged source files.

        Returns:
            An Integer representing the return code of the compiler.
        """
        # change the current working directory for easier compiling
        cwd = os.getcwd()
        os.chdir(PATH)

        # compiling command as specified in the exercise
        com = self.solution.exercise.getCompilingCommand().split(" ")
        # paths of all source files
        for f in self.fileInfo:
            if ".h" in f:
                continue
            com.append(self.fileInfo[f]["path"])
        # flag to just compile files without linking
        com.append("-c")
        # flag for easier error handling. Requires GCC 9.4
        com.append("-fdiagnostics-format=json")

        self.result.computation["technicalInfo"]["compileCommand"] = " ".join(com)
        proc = subprocess.run(com, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        try:
            parsed = json.loads(proc.stderr.decode("utf-8")) if proc.stderr else \
                     json.loads(proc.stdout.decode("utf-8")) if proc.stdout else ""
        except json.decoder.JSONDecodeError:
            txt = proc.stderr if proc.stderr else \
                  proc.stdout if proc.stdout else b""

            # strip everything after the JSON payload before retrying to parse
            if txt.startswith(b"["):
                tmp = txt.replace(b"\n", b"")
                sliced = tmp[:tmp.rfind(b"]") + 1]
            elif txt.startswith(b"{"):
                tmp = txt.replace(b"\n", b"")
                sliced = tmp[:tmp.rfind(b"}") + 1]
            else:
                sliced = txt.replace(b"\n", b"")
            txt = txt.decode("utf-8")

            try:
                parsed = json.loads(sliced)
            except json.decoder.JSONDecodeError:
                parsed = txt

        if len(parsed) > 0:
            if isinstance(parsed, list) and isinstance(parsed[0], dict):
                maxState = "info"
                elements = []
                for p in parsed:
                    # update maxState if necessary
                    if p["kind"] == "warning" and maxState == "info":
                        maxState = "warning"
                    elif p["kind"] == "error" and maxState != "error":
                        maxState = "error"

                    # file and line of the diagnostic
                    file = p["locations"][0]["caret"]["file"].split(".")[0]
                    line = p["locations"][0]["caret"]["line"]

                    # determine which code snippet the line belongs to
                    snippet = self.getSnippetIdentifier(file, line)

                    # dict specifying the current error/warning/info and its source
                    e = {
                        "severity": p["kind"],
                        "type": "compiler",
                        "message": p["message"],
                        "source": {
                            "elementID": snippet,
                            "extract": self.getLoc(f"{file}{self._fileext}", line, join=True),
                            "begin": self.fileInfo[file][snippet]["start"],
                            "end": self.fileInfo[file][snippet]["stop"],
                            "line": line - self.fileInfo[file][snippet]["start"],
                            "col": p["locations"][0]["caret"]["column"]
                        }
                    }
                    elements.append(e)

                self.result.computation["userInfo"]["summary"] = f"[{maxState.upper()}]"
                self.result.computation["userInfo"]["elements"] += elements
            elif isinstance(parsed, str):
                maxState = None
                if "error" in parsed:
                    maxState = "ERROR"
                elif "warning" in parsed:
                    maxState = "WARNING"
                elif "info" in parsed:
                    maxState = "INFO"

                if maxState:
                    self.result.computation["userInfo"]["summary"] = f"[{maxState}] - could not parse output"
                    self.result.computation["userInfo"]["elements"].append({
                        "severity": maxState,
                        "type": "compiler",
                        "message": f"Could not parse output:\n{parsed}"
                    })
            else:
                self.result.computation["userInfo"]["elements"] += parsed

        # add the compiling output to "elements" in the result object
        data = {
            "MIMEtype": "text/plain",
            "identifier": f"{self.result.id} Compiling",
            "value": parsed
        }
        self.result.elements.append(data)
        os.chdir(cwd)
        return proc.returncode

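    # Trimmed sketch of a single diagnostic as emitted by gcc with
    # -fdiagnostics-format=json and consumed by the loop in compile() above
    # (only the fields read there are shown; file/line/column values are made up):
    #   {"kind": "error",
    #    "message": "expected ';' before 'return'",
    #    "locations": [{"caret": {"file": "temp.c", "line": 12, "column": 5}}]}
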
    def check(self):
        """ Checks all merged source files.
        Checking happens after compiling to reduce effort; there is no need to check if compiling fails.

        Returns:
            An Integer representing the final state of checking:
            0 - Checking passed
            1 - Checking failed
        """
        checkConfig = self.solution.exercise.config.get("checking")
        if checkConfig is None:
            return 0

        returncode = 0
        forbidden = checkConfig["forbiddenCalls"].split(" ")
        checker = Checker(self.fileInfo)
        for a in checker.asts:
            checker.getFunctions(checker.asts[a])

        elements = []

        for file in checker.visitor.data:
            f = file.split(os.sep)[-1]
            for func in checker.visitor.data[file]:
                for i in checker.visitor.data[file][func]:
                    cur = checker.visitor.data[file][func][i]
                    id = self.getSnippetIdentifier(f, cur["Line"])
                    if id in checkConfig["sources"] and cur["FuncCall"] in forbidden:

                        line = cur["Line"] - self.fileInfo[f][id]["start"]

                        e = {
                            "severity": "error",
                            "type": "callcheck",
                            "message": f"[C function filtering] Function call not allowed: "
                                       f"'{cur['FuncCall']}'; original source: '{id}', line "
                                       f"(corrected): {line}, "
                                       f"col: {cur['Column']}\nForbidden calls: {', '.join(forbidden)}\n",
                            "source": {
                                "elementID": id,
                                "extract": self.getLoc(file, line),
                                "begin": self.fileInfo[f][id]["start"],
                                "end": self.fileInfo[f][id]["stop"],
                                "line": line,
                                "col": cur["Column"]
                            }
                        }
                        elements.append(e)

                        if returncode == 0:
                            returncode = 1

        if len(elements) != 0:
            if "elements" not in self.result.computation["userInfo"]:
                self.result.computation["userInfo"]["elements"] = elements
            else:
                self.result.computation["userInfo"]["elements"] += elements

            if "summary" not in self.result.computation["userInfo"]:
                self.result.computation["userInfo"]["summary"] = "[ERROR]"
            elif "ERROR" not in self.result.computation["userInfo"]["summary"]:
                self.result.computation["userInfo"]["summary"] = "[ERROR]"

        data = {
            "MIMEtype": "text/plain",
            "identifier": f"{self.result.id} Checking",
            "value": elements
        }
        self.result.elements.append(data)
        return returncode

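    # Sketch of the "checking" exercise config consumed above (assumed example
    # values; "forbiddenCalls" is a space-separated string, "sources" lists the
    # snippet identifiers whose calls are checked):
    #   "checking": {"forbiddenCalls": "system exec fork", "sources": ["studentCode"]}
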
    def link(self):
        """ Links compiled files and libraries.

        Returns:
            An Integer representing the return code of the linker.
        """
        com = ["gcc" if self._lang == "C" else "g++", "-o", f"{os.path.join(PATH, 'out')}"]
        for f in self.fileInfo:
            if ".h" in f:
                continue
            com.append(f"{os.path.join(PATH, f)[:-len(self._fileext)]}.o")

        flags = self.solution.exercise.config["linking"].get("flags")
        if flags:
            com.extend(flags.split())

        self.result.computation["technicalInfo"]["linkCommand"] = " ".join(com)
        proc = subprocess.run(com, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        data = {
            "MIMEtype": "text/plain",
            "identifier": f"{self.result.id} Linking",
            "value": proc.stdout.decode("utf-8")
        }
        self.result.elements.append(data)
        return proc.returncode

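    # Example of the "linking" exercise config and the resulting command
    # (hypothetical flag; the object files come from the compile step):
    #   "linking": {"flags": "-lm"}
    #   -> gcc -o <PATH>/out <PATH>/temp.o -lm
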
    def run(self):
        """ Makes the output file executable and runs it.

        Returns:
            An Integer representing the return code of the program.
        """
        os.chmod(os.path.join(PATH, "out"), 0o700)
        com = [f"{os.path.join(PATH, 'out')}"]
        cmdLineArgs = self.solution.exercise.config["running"].get("commandLineArguments")
        if cmdLineArgs is not None:
            com.extend(cmdLineArgs.split())

        # Time limit of the running process: the exercise limit, capped by the grader config limit
        timelimit = self.solution.exercise.config["running"].get("timelimitInSeconds")
        cfglimit = self.cfg.get("timelimitInSeconds")
        if not timelimit:
            timelimit = cfglimit  # is now either None or int
        elif cfglimit:
            timelimit = min(timelimit, cfglimit)

        self.result.computation["technicalInfo"]["runCommand"] = " ".join(com)

        proc = subprocess.Popen(com, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                preexec_fn=os.setsid, shell=False)
        try:
            stdout, stderr = proc.communicate(timeout=timelimit)
            text = ""
        except subprocess.TimeoutExpired as e:
            # kill the whole process group and collect the exit status
            os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
            proc.wait()
            stdout, stderr, text = b"", b"", f"Runtime failed! Timeout after {e.timeout} seconds"
            self.result.computation["userInfo"]["summary"] = "Runtime failed! Exit code: 1"

        data = [{
            "MIMEtype": "text/plain",
            "identifier": f"{self.result.id} Running",
            "value": text
        },
        {
            "MIMEtype": "text/plain",
            "identifier": f"{self.result.id} Running stdout",
            "value": stdout.decode("utf-8", errors="replace")
        },
        {
            "MIMEtype": "text/plain",
            "identifier": f"{self.result.id} Running stderr",
            "value": stderr.decode("utf-8", errors="replace")
        }]
        for d in data:
            self.result.elements.append(d)
        return proc.returncode


class Checker:
    """ Class for generating Abstract Syntax Trees (AST) of source files
    and retrieving information about function calls.

    Attributes:
        asts (dict):
            A dict containing one entry for each merged source file
            - key: filename
            - value: AST of source file
    """

    def __init__(self, files: dict):
        """ Constructor

        Args:
            files (dict):
                A dict generated by the "merge" function in class "C"
        """
        self._files = files
        self.asts = self.getAsts()
        self.visitor = self.Visitor()

    class Visitor(c_ast.NodeVisitor):
        """ Internal class for visiting nodes in an AST.
        """
        def __init__(self):
            self.data = {}

        def visit_FuncDef(self, node):
            """ Records all function calls found at the top level of a function body.
            """
            if node.decl.coord.file not in self.data:
                self.data[node.decl.coord.file] = {}
            self.data[node.decl.coord.file][node.decl.name] = {}
            i = 0
            for n in node.body.block_items:
                if isinstance(n, c_ast.FuncCall):
                    self.data[node.decl.coord.file][node.decl.name][str(i)] = {
                        "FuncCall": n.name.name,
                        "Line": n.coord.line,
                        "Column": n.coord.column
                    }
                    i += 1

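    # Shape of Visitor.data after visiting an AST (hypothetical values;
    # the keys follow visit_FuncDef above):
    #   {"<file>": {"main": {"0": {"FuncCall": "printf", "Line": 7, "Column": 5},
    #                        "1": {"FuncCall": "system", "Line": 9, "Column": 5}}}}
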
    def getAst(self, filename) -> c_ast.FileAST:
        """ Generates an AST from given source file

        Args:
            filename (str):
                The name of the source file to generate an AST for
        """
        fake_libc_include = os.path.join(os.path.dirname(os.path.abspath(sys.argv[0])),
                                         'utils', 'fake_libc_include')
        return parse_file(filename, use_cpp=True, cpp_path="gcc",
                          cpp_args=["-E", f"-I{fake_libc_include}"])

    def getAsts(self) -> dict:
        """ Generates one AST for each merged source file

        Returns:
            A dict containing one (key, value) pair for each source file.
            - key: filename
            - value: AST for the corresponding file
        """
        asts = {}
        for f in self._files:
            asts[f] = self.getAst(self._files[f]["path"])
        return asts

    def getFunctions(self, ast: c_ast.FileAST):
        """ Iterates over the given AST and visits nodes as specified in the Visitor class

        Args:
            ast:
                An AST representing a source file.
        """
        self.visitor.visit(ast)
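

# Minimal usage sketch (comments only, since the construction of a
# dataObjects.Solution happens outside this module and is not shown here):
#
#   solution = ...  # a dataObjects.Solution built from the submitted solution json
#   grader = C(solution, config={"timelimitInSeconds": 10}, id=1)
#   grader.processData()
#   exitcode = grader.result.computation["technicalInfo"]["exitCode"]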