Python: CG trace: Make code modular

This commit is contained in:
Rasmus Wriedt Larsen
2020-07-17 14:30:12 +02:00
parent 94a03d73a3
commit 0a0c24f3c5
11 changed files with 228 additions and 152 deletions

View File

@@ -0,0 +1,7 @@
# Example DB
cg-trace-example-db/
# Artifact from building `pip install -e .`
cg_trace.egg-info/
venv/

View File

@@ -0,0 +1,6 @@
[settings]
multi_line_output = 3
include_trailing_comma = True
force_grid_wrap = 0
use_parentheses = True
line_length = 88

View File

@@ -14,10 +14,40 @@ The next hurdle is being able to handle multiple calls on the same line, such as
## How do I give it a spin?
Run the `recreate-db.sh` script to create the database `cg-trace-example-db`. Then run the queries inside the `ql/` directory.
After following setup instructions below, run the `recreate-db.sh` script to create the database `cg-trace-example-db`. Then run the queries inside the `ql/` directory.
## Limitations
## Setup
1. Ensure you have at least Python 3.6
2. Create a virtual environment with `python3 -m venv venv` and activate it
3. Install dependencies `pip install --upgrade -r requirements.txt`
4. Install this codebase as an editable package `pip install -e .`
5. Set up your editor. If you're using VS Code, create a new project for this folder, and
use these settings for correct autoformatting of code on save:
```
{
"python.pythonPath": "venv/bin/python",
"python.linting.enabled": true,
"python.linting.flake8Enabled": true,
"python.formatting.provider": "black",
"editor.formatOnSave": true,
"[python]": {
"editor.codeActionsOnSave": {
"source.organizeImports": true
}
}
}
```
6. Enjoy writing code, and being able to run `cg-trace` on your command line :tada:
## Tracing Limitations
### Code that uses `sys.setprofile`

View File

@@ -0,0 +1,15 @@
import sys
__version__ = "0.0.1"

# The virtual machine opcodes changed in Python 3.6, so nothing older than that
# is supported.
MIN_PYTHON_VERSION = (3, 6)
MIN_PYTHON_VERSION_FORMATTED = ".".join(map(str, MIN_PYTHON_VERSION))

# Refuse to be imported on an interpreter that is too old. `str.format` is used
# (rather than an f-string) so this module still parses on the old interpreters
# the message is meant for.
if sys.version_info[:2] < MIN_PYTHON_VERSION:
    _too_old_message = "You need at least Python {} to use 'cg_trace'".format(
        MIN_PYTHON_VERSION_FORMATTED
    )
    sys.exit(_too_old_message)

View File

@@ -0,0 +1,5 @@
# Run the tracer's `main` function and exit the process with the value it
# returns.
import sys
from cg_trace.main import main
sys.exit(main())

View File

@@ -0,0 +1,15 @@
import argparse
def parse(args):
    """Parse the tracer's command-line arguments.

    Args:
        args: list of argument strings (without the program name).

    Returns:
        The ``argparse.Namespace`` with attributes ``csv``, ``xml``,
        ``progname`` and ``arguments``.
    """
    arg_parser = argparse.ArgumentParser()

    # Optional output destinations; both default to None when not given.
    for flag in ("--csv", "--xml"):
        arg_parser.add_argument(flag)

    arg_parser.add_argument("progname", help="file to run as main program")
    # Everything after the program name is collected verbatim for that program.
    arg_parser.add_argument(
        "arguments", nargs=argparse.REMAINDER, help="arguments to the program"
    )

    namespace = arg_parser.parse_args(args)
    return namespace

View File

@@ -0,0 +1,57 @@
import csv
import dataclasses
from lxml import etree
class Exporter:
    """Base class for exporters of recorded calls."""

    @staticmethod
    def export(recorded_calls, outfile_path):
        """Write ``recorded_calls`` to ``outfile_path``; subclasses override this."""
        raise NotImplementedError()

    @staticmethod
    def dataclass_to_dict(obj):
        """Flatten a dataclass instance into a dict with class-prefixed keys.

        Every field name is prefixed with the lower-cased class name of
        ``obj`` followed by an underscore.
        """
        prefix = obj.__class__.__name__.lower()
        prefixed = {}
        for key, val in dataclasses.asdict(obj).items():
            prefixed[f"{prefix}_{key}"] = val
        return prefixed
class CSVExporter(Exporter):
    """Exporter that writes one CSV row per recorded (call, callee) pair."""

    @staticmethod
    def export(recorded_calls, outfile_path):
        """Write the sorted ``recorded_calls`` as CSV to ``outfile_path``.

        The header row is derived from the keys of the first record; nothing
        (not even a header) is written when there are no records.
        """
        with open(outfile_path, "w", newline="") as csv_file:
            writer = None
            for call, callee in sorted(recorded_calls):
                row = dict(Exporter.dataclass_to_dict(call))
                row.update(Exporter.dataclass_to_dict(callee))

                # The writer is created lazily because the column names are
                # only known once the first row has been built.
                # NOTE(review): presumably every row has the same keys as the
                # first one -- confirm for mixed callee types.
                if writer is None:
                    writer = csv.DictWriter(csv_file, fieldnames=row.keys())
                    writer.writeheader()

                writer.writerow(row)

        print(f"output written to {outfile_path}")
class XMLExporter(Exporter):
    """Exporter that writes recorded (call, callee) pairs as an XML document."""

    @staticmethod
    def export(recorded_calls, outfile_path):
        """Write the sorted ``recorded_calls`` as XML to ``outfile_path``.

        The document has a ``root`` element containing one ``recorded_call``
        element per pair, with the flattened call/callee fields as attributes.
        """
        root = etree.Element("root")

        for call, callee in sorted(recorded_calls):
            attributes = dict(Exporter.dataclass_to_dict(call))
            attributes.update(Exporter.dataclass_to_dict(callee))

            element = etree.SubElement(root, "recorded_call")
            for name, value in attributes.items():
                # xml library only supports serializing attributes that have
                # string values
                element.set(name, str(value))

        etree.ElementTree(root).write(
            outfile_path, encoding="utf-8", pretty_print=True
        )

View File

@@ -0,0 +1,64 @@
import os
import sys
from io import StringIO
from cg_trace import cmdline, tracer
from cg_trace.exporter import CSVExporter, XMLExporter
def record_calls(code, globals):
    """Run ``code`` under the call graph tracer while capturing its output.

    ``sys.stdout`` and ``sys.stderr`` are replaced with in-memory buffers for
    the duration of the run and are always restored afterwards, also when the
    traced program raises, so that the resulting traceback is not swallowed by
    the capture buffer.

    Args:
        code: code object to execute.
        globals: namespace dict used as both globals and locals for the run.

    Returns:
        A tuple ``(recorded_calls, captured_stdout, captured_stderr)`` where
        ``recorded_calls`` is the sorted list of recorded calls and the other
        two are the ``StringIO`` buffers holding the program's output.
    """
    real_stdout = sys.stdout
    real_stderr = sys.stderr

    captured_stdout = StringIO()
    captured_stderr = StringIO()

    sys.stdout = captured_stdout
    sys.stderr = captured_stderr
    try:
        cgt = tracer.CallGraphTracer()
        cgt.run(code, globals, globals)
    finally:
        # Restore the real streams even if the traced program failed.
        sys.stdout = real_stdout
        sys.stderr = real_stderr

    return sorted(cgt.recorded_calls), captured_stdout, captured_stderr
def main(args=None) -> int:
    """Command-line entry point: trace a Python program and report its calls.

    Overwrites ``sys.argv`` and ``sys.path[0]`` so the traced program sees the
    environment it would have if it were run directly.

    Args:
        args: list of command-line arguments (without the program name).
            Defaults to ``sys.argv[1:]``.

    Returns:
        The process exit status, always ``0``.
    """
    if args is None:
        # first element in argv is program name
        args = sys.argv[1:]

    opts = cmdline.parse(args)

    # These details of setting up the program to be run are very much inspired by
    # `trace` from the standard library
    sys.argv = [opts.progname, *opts.arguments]
    sys.path[0] = os.path.dirname(opts.progname)

    # Read the source as bytes so that `compile` applies Python's own source
    # encoding rules (encoding declaration, UTF-8 default) instead of decoding
    # with the locale's preferred encoding.
    with open(opts.progname, "rb") as fp:
        code = compile(fp.read(), opts.progname, "exec")

    # try to emulate __main__ namespace as much as possible
    globs = {
        "__file__": opts.progname,
        "__name__": "__main__",
        "__package__": None,
        "__cached__": None,
    }

    recorded_calls, captured_stdout, captured_stderr = record_calls(code, globs)

    if opts.csv:
        CSVExporter.export(recorded_calls, opts.csv)
    elif opts.xml:
        XMLExporter.export(recorded_calls, opts.xml)
    else:
        for (call, callee) in recorded_calls:
            print(f"{call} --> {callee}")

    # Replay what the traced program wrote while its output was captured.
    print("--- captured stdout ---")
    print(captured_stdout.getvalue(), end="")
    print("--- captured stderr ---")
    print(captured_stderr.getvalue(), end="")

    return 0

View File

@@ -1,33 +1,7 @@
#!/usr/bin/env python3
"""Call Graph tracing.
Execute a python program and for each call being made, record the call and callee. This
allows us to compare call graph resolution from static analysis with actual data -- that
is, can we statically determine the target of each actual call correctly.
If there is 100% code coverage from the Python execution, it would also be possible to
look at the precision of the call graph resolutions -- that is, do we expect a function to
be able to be called in a place where it is not? Currently not something we're looking at.
"""
# read: https://eli.thegreenplace.net/2012/03/23/python-internals-how-callables-work/
# TODO: Know that a call to a C-function was made. See
# https://docs.python.org/3/library/bdb.html#bdb.Bdb.trace_dispatch. Maybe use `lxml` as
# test
# For inspiration, look at these projects:
# - https://github.com/joerick/pyinstrument (capture call-stack every <n> ms for profiling)
# - https://github.com/gak/pycallgraph (display call-graph with graphviz after python execution)
import argparse
from io import StringIO
import sys
import os
import dis
import dataclasses
import csv
from lxml import etree
import dis
import os
import sys
from typing import Optional
# copy-paste For interactive ipython sessions
@@ -126,9 +100,6 @@ class ExternalCallee(Callee):
@classmethod
def from_arg(cls, func):
# if func.__name__ == "append":
# import IPython; sys.stdout = sys.__stdout__; IPython.embed(); sys.exit()
return cls(
module=func.__module__,
qualname=func.__qualname__,
@@ -192,7 +163,7 @@ class CallGraphTracer:
self.exec_call_seen = False
self.ignore_rest = False
try:
sys.setprofile(cgt.profilefunc)
sys.setprofile(self.profilefunc)
exec(code, globals, locals)
# TODO: exception handling?
finally:
@@ -233,118 +204,3 @@ class CallGraphTracer:
debug_print(f"{call} --> {callee}")
debug_print("\n" * 5)
self.recorded_calls.add((call, callee))
################################################################################
# Export
################################################################################
# Base class for exporters of recorded calls.
class Exporter:
@staticmethod
def export(recorded_calls, outfile_path):
# Subclasses provide the actual export.
raise NotImplementedError()
@staticmethod
def dataclass_to_dict(obj):
# Flatten a dataclass instance into a dict whose keys are prefixed with the
# lower-cased class name of `obj` and an underscore.
d = dataclasses.asdict(obj)
prefix = obj.__class__.__name__.lower()
return {f"{prefix}_{key}": val for (key, val) in d.items()}
# Exporter writing one CSV row per recorded (call, callee) pair, in sorted order.
class CSVExporter(Exporter):
@staticmethod
def export(recorded_calls, outfile_path):
with open(outfile_path, "w", newline="") as csv_file:
# Created lazily: the column names are taken from the first row.
writer = None
for (call, callee) in sorted(recorded_calls):
data = {
**Exporter.dataclass_to_dict(call),
**Exporter.dataclass_to_dict(callee),
}
if writer is None:
writer = csv.DictWriter(csv_file, fieldnames=data.keys())
writer.writeheader()
writer.writerow(data)
print(f"output written to {outfile_path}")
# embed(); sys.exit()
# Exporter writing a `root` element with one `recorded_call` child per recorded
# (call, callee) pair, in sorted order; the flattened fields become attributes.
class XMLExporter(Exporter):
@staticmethod
def export(recorded_calls, outfile_path):
root = etree.Element("root")
for (call, callee) in sorted(recorded_calls):
data = {
**Exporter.dataclass_to_dict(call),
**Exporter.dataclass_to_dict(callee),
}
rc = etree.SubElement(root, "recorded_call")
for k, v in data.items():
# xml library only supports serializing attributes that have string values
rc.set(k, str(v))
tree = etree.ElementTree(root)
tree.write(outfile_path, encoding="utf-8", pretty_print=True)
################################################################################
# __main__
################################################################################
# Script entry point: parse the command line, run the given program under the
# call graph tracer with stdout captured, then export or print the recorded calls.
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--csv")
parser.add_argument("--xml")
parser.add_argument("progname", help="file to run as main program")
parser.add_argument(
"arguments", nargs=argparse.REMAINDER, help="arguments to the program"
)
opts = parser.parse_args()
# These details of setting up the program to be run are very much inspired by
# `trace` from the standard library
sys.argv = [opts.progname, *opts.arguments]
sys.path[0] = os.path.dirname(opts.progname)
with open(opts.progname) as fp:
code = compile(fp.read(), opts.progname, "exec")
# try to emulate __main__ namespace as much as possible
globs = {
"__file__": opts.progname,
"__name__": "__main__",
"__package__": None,
"__cached__": None,
}
real_stdout = sys.stdout
# NOTE(review): real_stderr is assigned but not used in this block; only
# stdout is redirected and restored.
real_stderr = sys.stderr
captured_stdout = StringIO()
sys.stdout = captured_stdout
cgt = CallGraphTracer()
cgt.run(code, globs, globs)
sys.stdout = real_stdout
# Export to the requested file format, or print the calls when none was given.
if opts.csv:
CSVExporter.export(cgt.recorded_calls, opts.csv)
elif opts.xml:
XMLExporter.export(cgt.recorded_calls, opts.xml)
else:
for (call, callee) in sorted(cgt.recorded_calls):
print(f"{call} --> {callee}")
# Replay what the traced program wrote to stdout while it was captured.
print("--- captured stdout ---")
print(captured_stdout.getvalue(), end="")

View File

@@ -3,14 +3,19 @@
set -e
set -x
if ! pip show cg_trace; then
echo "You need to follow setup instructions in README"
exit 1
fi
DB="cg-trace-example-db"
SRC="example/"
XMLDIR="example-traces/"
PYTHON_EXTRACTOR=$(codeql resolve extractor --language=python)
./cg_trace.py --xml "$XMLDIR"/simple.xml example/simple.py
./cg_trace.py --xml "$XMLDIR"/builtins.xml example/builtins.py
cg-trace --xml "$XMLDIR"/simple.xml example/simple.py
cg-trace --xml "$XMLDIR"/builtins.xml example/builtins.py
rm -rf "$DB"

View File

@@ -0,0 +1,16 @@
from setuptools import find_packages, setup
from cg_trace import MIN_PYTHON_VERSION_FORMATTED, __version__
# TODO: There was some benefit of structuring your code as `src/yourpackage/code.py`
# instead of `yourpackage/code.py` concerning imports, but I don't recall the details
# Package metadata. The version and the minimum supported Python version are
# taken from the `cg_trace` package itself so they are defined in one place only.
setup(
name="cg_trace",
version=__version__,
description="Call graph tracing",
packages=find_packages(),
install_requires=["lxml"],
# Installs a `cg-trace` command that calls `cg_trace.main:main`.
entry_points={"console_scripts": ["cg-trace = cg_trace.main:main"]},
python_requires=">={}".format(MIN_PYTHON_VERSION_FORMATTED),
)