Merge pull request #4797 from RasmusWL/stdlib-http-source-modeling

Python: Model sources from stdlib HTTP servers
This commit is contained in:
yoff
2020-12-15 14:49:32 +01:00
committed by GitHub
5 changed files with 701 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
lgtm,codescanning
* Added modeling of HTTP servers created with `BaseHTTPRequestHandler` from standard library as a source of remote user input (`RemoteFlowSource`).
* Added modeling of HTML form submission with `cgi.FieldStorage` from standard library as a source of remote user input (`RemoteFlowSource`).

View File

@@ -1082,6 +1082,542 @@ private module Stdlib {
override string getFormat() { result = "JSON" }
}
// ---------------------------------------------------------------------------
// cgi
// ---------------------------------------------------------------------------
/** Gets a reference to the `cgi` module. */
private DataFlow::Node cgi(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("cgi")
or
exists(DataFlow::TypeTracker t2 | result = cgi(t2).track(t2, t))
}
/** Gets a reference to the `cgi` module. */
DataFlow::Node cgi() { result = cgi(DataFlow::TypeTracker::end()) }
/** Provides models for the `cgi` module. */
module cgi {
/**
* Provides models for the `cgi.FieldStorage` class
*
* See https://docs.python.org/3/library/cgi.html.
*/
module FieldStorage {
/** Gets a reference to the `cgi.FieldStorage` class. */
private DataFlow::Node classRef(DataFlow::TypeTracker t) {
t.startInAttr("FieldStorage") and
result = cgi()
or
exists(DataFlow::TypeTracker t2 | result = classRef(t2).track(t2, t))
}
/** Gets a reference to the `cgi.FieldStorage` class. */
DataFlow::Node classRef() { result = classRef(DataFlow::TypeTracker::end()) }
/**
* A source of an instance of `cgi.FieldStorage`.
*
* This can include instantiation of the class, return value from function
* calls, or a special parameter that will be set when functions are call by external
* library.
*
* Use `FieldStorage::instance()` predicate to get references to instances of `cgi.FieldStorage`.
*/
abstract class InstanceSource extends DataFlow::Node { }
/**
* A direct instantiation of `cgi.FieldStorage`.
*
* We currently consider ALL instantiations to be `RemoteFlowSource`. This seems
* reasonable since it's used to parse form data for incoming POST requests, but
* if it turns out to be a problem, we'll have to refine.
*/
private class ClassInstantiation extends InstanceSource, RemoteFlowSource::Range,
DataFlow::CfgNode {
override CallNode node;
ClassInstantiation() { node.getFunction() = classRef().asCfgNode() }
override string getSourceType() { result = "cgi.FieldStorage" }
}
/** Gets a reference to an instance of `cgi.FieldStorage`. */
private DataFlow::Node instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of `cgi.FieldStorage`. */
DataFlow::Node instance() { result = instance(DataFlow::TypeTracker::end()) }
/** Gets a reference to the `getvalue` method on a `cgi.FieldStorage` instance. */
private DataFlow::Node getvalueRef(DataFlow::TypeTracker t) {
t.startInAttr("getvalue") and
result = instance()
or
exists(DataFlow::TypeTracker t2 | result = getvalueRef(t2).track(t2, t))
}
/** Gets a reference to the `getvalue` method on a `cgi.FieldStorage` instance. */
DataFlow::Node getvalueRef() { result = getvalueRef(DataFlow::TypeTracker::end()) }
/** Gets a reference to the result of calling the `getvalue` method on a `cgi.FieldStorage` instance. */
private DataFlow::Node getvalueResult(DataFlow::TypeTracker t) {
t.start() and
result.asCfgNode().(CallNode).getFunction() = getvalueRef().asCfgNode()
or
exists(DataFlow::TypeTracker t2 | result = getvalueResult(t2).track(t2, t))
}
/** Gets a reference to the result of calling the `getvalue` method on a `cgi.FieldStorage` instance. */
DataFlow::Node getvalueResult() { result = getvalueResult(DataFlow::TypeTracker::end()) }
/** Gets a reference to the `getfirst` method on a `cgi.FieldStorage` instance. */
private DataFlow::Node getfirstRef(DataFlow::TypeTracker t) {
t.startInAttr("getfirst") and
result = instance()
or
exists(DataFlow::TypeTracker t2 | result = getfirstRef(t2).track(t2, t))
}
/** Gets a reference to the `getfirst` method on a `cgi.FieldStorage` instance. */
DataFlow::Node getfirstRef() { result = getfirstRef(DataFlow::TypeTracker::end()) }
/** Gets a reference to the result of calling the `getfirst` method on a `cgi.FieldStorage` instance. */
private DataFlow::Node getfirstResult(DataFlow::TypeTracker t) {
t.start() and
result.asCfgNode().(CallNode).getFunction() = getfirstRef().asCfgNode()
or
exists(DataFlow::TypeTracker t2 | result = getfirstResult(t2).track(t2, t))
}
/** Gets a reference to the result of calling the `getfirst` method on a `cgi.FieldStorage` instance. */
DataFlow::Node getfirstResult() { result = getfirstResult(DataFlow::TypeTracker::end()) }
/** Gets a reference to the `getlist` method on a `cgi.FieldStorage` instance. */
private DataFlow::Node getlistRef(DataFlow::TypeTracker t) {
t.startInAttr("getlist") and
result = instance()
or
exists(DataFlow::TypeTracker t2 | result = getlistRef(t2).track(t2, t))
}
/** Gets a reference to the `getlist` method on a `cgi.FieldStorage` instance. */
DataFlow::Node getlistRef() { result = getlistRef(DataFlow::TypeTracker::end()) }
/** Gets a reference to the result of calling the `getlist` method on a `cgi.FieldStorage` instance. */
private DataFlow::Node getlistResult(DataFlow::TypeTracker t) {
t.start() and
result.asCfgNode().(CallNode).getFunction() = getlistRef().asCfgNode()
or
exists(DataFlow::TypeTracker t2 | result = getlistResult(t2).track(t2, t))
}
/** Gets a reference to the result of calling the `getlist` method on a `cgi.FieldStorage` instance. */
DataFlow::Node getlistResult() { result = getlistResult(DataFlow::TypeTracker::end()) }
/** Gets a reference to a list of fields. */
private DataFlow::Node fieldList(DataFlow::TypeTracker t) {
t.start() and
(
result = getlistResult()
or
result = getvalueResult()
or
// TODO: Should have better handling of subscripting
result.asCfgNode().(SubscriptNode).getObject() = instance().asCfgNode()
)
or
exists(DataFlow::TypeTracker t2 | result = fieldList(t2).track(t2, t))
}
/** Gets a reference to a list of fields. */
DataFlow::Node fieldList() { result = fieldList(DataFlow::TypeTracker::end()) }
/** Gets a reference to a field. */
private DataFlow::Node field(DataFlow::TypeTracker t) {
t.start() and
(
result = getfirstResult()
or
result = getvalueResult()
or
// TODO: Should have better handling of subscripting
result.asCfgNode().(SubscriptNode).getObject() = [instance(), fieldList()].asCfgNode()
)
or
exists(DataFlow::TypeTracker t2 | result = field(t2).track(t2, t))
}
/** Gets a reference to a field. */
DataFlow::Node field() { result = field(DataFlow::TypeTracker::end()) }
private class AdditionalTaintStep extends TaintTracking::AdditionalTaintStep {
override predicate step(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
// Methods
nodeFrom = nodeTo.(DataFlow::AttrRead).getObject() and
nodeFrom = instance() and
nodeTo in [getvalueRef(), getfirstRef(), getlistRef()]
or
nodeFrom.asCfgNode() = nodeTo.asCfgNode().(CallNode).getFunction() and
(
nodeFrom = getvalueRef() and nodeTo = getvalueResult()
or
nodeFrom = getfirstRef() and nodeTo = getfirstResult()
or
nodeFrom = getlistRef() and nodeTo = getlistResult()
)
or
// Indexing
nodeFrom in [instance(), fieldList()] and
nodeTo.asCfgNode().(SubscriptNode).getObject() = nodeFrom.asCfgNode()
or
// Attributes on Field
nodeFrom = field() and
exists(DataFlow::AttrRead read | nodeTo = read and read.getObject() = nodeFrom |
read.getAttributeName() in ["value", "file", "filename"]
)
}
}
}
}
// ---------------------------------------------------------------------------
// BaseHTTPServer (Python 2 only)
// ---------------------------------------------------------------------------
/** Gets a reference to the `BaseHTTPServer` module. */
private DataFlow::Node baseHTTPServer(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("BaseHTTPServer")
or
exists(DataFlow::TypeTracker t2 | result = baseHTTPServer(t2).track(t2, t))
}
/** Gets a reference to the `BaseHTTPServer` module. */
DataFlow::Node baseHTTPServer() { result = baseHTTPServer(DataFlow::TypeTracker::end()) }
/** Provides models for the `BaseHTTPServer` module. */
module BaseHTTPServer {
/**
* Provides models for the `BaseHTTPServer.BaseHTTPRequestHandler` class (Python 2 only).
*/
module BaseHTTPRequestHandler {
/** Gets a reference to the `BaseHTTPServer.BaseHTTPRequestHandler` class. */
private DataFlow::Node classRef(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("BaseHTTPServer" + "." + "BaseHTTPRequestHandler")
or
t.startInAttr("BaseHTTPRequestHandler") and
result = baseHTTPServer()
or
exists(DataFlow::TypeTracker t2 | result = classRef(t2).track(t2, t))
}
/** Gets a reference to the `BaseHTTPServer.BaseHTTPRequestHandler` class. */
DataFlow::Node classRef() { result = classRef(DataFlow::TypeTracker::end()) }
}
}
// ---------------------------------------------------------------------------
// SimpleHTTPServer (Python 2 only)
// ---------------------------------------------------------------------------
/** Gets a reference to the `SimpleHTTPServer` module. */
private DataFlow::Node simpleHTTPServer(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("SimpleHTTPServer")
or
exists(DataFlow::TypeTracker t2 | result = simpleHTTPServer(t2).track(t2, t))
}
/** Gets a reference to the `SimpleHTTPServer` module. */
DataFlow::Node simpleHTTPServer() { result = simpleHTTPServer(DataFlow::TypeTracker::end()) }
/** Provides models for the `SimpleHTTPServer` module. */
module SimpleHTTPServer {
/**
* Provides models for the `SimpleHTTPServer.SimpleHTTPRequestHandler` class (Python 2 only).
*/
module SimpleHTTPRequestHandler {
/** Gets a reference to the `SimpleHTTPServer.SimpleHTTPRequestHandler` class. */
private DataFlow::Node classRef(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("SimpleHTTPServer" + "." + "SimpleHTTPRequestHandler")
or
t.startInAttr("SimpleHTTPRequestHandler") and
result = simpleHTTPServer()
or
exists(DataFlow::TypeTracker t2 | result = classRef(t2).track(t2, t))
}
/** Gets a reference to the `SimpleHTTPServer.SimpleHTTPRequestHandler` class. */
DataFlow::Node classRef() { result = classRef(DataFlow::TypeTracker::end()) }
}
}
// ---------------------------------------------------------------------------
// CGIHTTPServer (Python 2 only)
// ---------------------------------------------------------------------------
/** Gets a reference to the `CGIHTTPServer` module. */
private DataFlow::Node cgiHTTPServer(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("CGIHTTPServer")
or
exists(DataFlow::TypeTracker t2 | result = cgiHTTPServer(t2).track(t2, t))
}
/** Gets a reference to the `CGIHTTPServer` module. */
DataFlow::Node cgiHTTPServer() { result = cgiHTTPServer(DataFlow::TypeTracker::end()) }
/** Provides models for the `CGIHTTPServer` module. */
module CGIHTTPServer {
/**
* Provides models for the `CGIHTTPServer.CGIHTTPRequestHandler` class (Python 2 only).
*/
module CGIHTTPRequestHandler {
/** Gets a reference to the `CGIHTTPServer.CGIHTTPRequestHandler` class. */
private DataFlow::Node classRef(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("CGIHTTPServer" + "." + "CGIHTTPRequestHandler")
or
t.startInAttr("CGIHTTPRequestHandler") and
result = cgiHTTPServer()
or
exists(DataFlow::TypeTracker t2 | result = classRef(t2).track(t2, t))
}
/** Gets a reference to the `CGIHTTPServer.CGIHTTPRequestHandler` class. */
DataFlow::Node classRef() { result = classRef(DataFlow::TypeTracker::end()) }
}
}
// ---------------------------------------------------------------------------
// http (Python 3 only)
// ---------------------------------------------------------------------------
/** Gets a reference to the `http` module. */
private DataFlow::Node http(DataFlow::TypeTracker t) {
t.start() and
result = DataFlow::importNode("http")
or
exists(DataFlow::TypeTracker t2 | result = http(t2).track(t2, t))
}
/** Gets a reference to the `http` module. */
DataFlow::Node http() { result = http(DataFlow::TypeTracker::end()) }
/**
* Gets a reference to the attribute `attr_name` of the `http` module.
* WARNING: Only holds for a few predefined attributes.
*/
private DataFlow::Node http_attr(DataFlow::TypeTracker t, string attr_name) {
attr_name in ["server"] and
(
t.start() and
result = DataFlow::importNode("http" + "." + attr_name)
or
t.startInAttr(attr_name) and
result = http()
)
or
// Due to bad performance when using normal setup with `http_attr(t2, attr_name).track(t2, t)`
// we have inlined that code and forced a join
exists(DataFlow::TypeTracker t2 |
exists(DataFlow::StepSummary summary |
http_attr_first_join(t2, attr_name, result, summary) and
t = t2.append(summary)
)
)
}
pragma[nomagic]
private predicate http_attr_first_join(
DataFlow::TypeTracker t2, string attr_name, DataFlow::Node res, DataFlow::StepSummary summary
) {
DataFlow::StepSummary::step(http_attr(t2, attr_name), res, summary)
}
/**
* Gets a reference to the attribute `attr_name` of the `http` module.
* WARNING: Only holds for a few predefined attributes.
*/
private DataFlow::Node http_attr(string attr_name) {
result = http_attr(DataFlow::TypeTracker::end(), attr_name)
}
/** Provides models for the `http` module. */
module http {
// -------------------------------------------------------------------------
// http.server
// -------------------------------------------------------------------------
/** Gets a reference to the `http.server` module. */
DataFlow::Node server() { result = http_attr("server") }
/** Provides models for the `http.server` module */
module server {
/**
* Gets a reference to the attribute `attr_name` of the `http.server` module.
* WARNING: Only holds for a few predefined attributes.
*/
private DataFlow::Node server_attr(DataFlow::TypeTracker t, string attr_name) {
attr_name in ["BaseHTTPRequestHandler", "SimpleHTTPRequestHandler", "CGIHTTPRequestHandler"] and
(
t.start() and
result = DataFlow::importNode("http.server" + "." + attr_name)
or
t.startInAttr(attr_name) and
result = server()
)
or
// Due to bad performance when using normal setup with `server_attr(t2, attr_name).track(t2, t)`
// we have inlined that code and forced a join
exists(DataFlow::TypeTracker t2 |
exists(DataFlow::StepSummary summary |
server_attr_first_join(t2, attr_name, result, summary) and
t = t2.append(summary)
)
)
}
pragma[nomagic]
private predicate server_attr_first_join(
DataFlow::TypeTracker t2, string attr_name, DataFlow::Node res,
DataFlow::StepSummary summary
) {
DataFlow::StepSummary::step(server_attr(t2, attr_name), res, summary)
}
/**
* Gets a reference to the attribute `attr_name` of the `http.server` module.
* WARNING: Only holds for a few predefined attributes.
*/
private DataFlow::Node server_attr(string attr_name) {
result = server_attr(DataFlow::TypeTracker::end(), attr_name)
}
/**
* Provides models for the `http.server.BaseHTTPRequestHandler` class (Python 3 only).
*
* See https://docs.python.org/3.9/library/http.server.html#http.server.BaseHTTPRequestHandler.
*/
module BaseHTTPRequestHandler {
/** Gets a reference to the `http.server.BaseHTTPRequestHandler` class. */
DataFlow::Node classRef() { result = server_attr("BaseHTTPRequestHandler") }
}
/**
* Provides models for the `http.server.SimpleHTTPRequestHandler` class (Python 3 only).
*
* See https://docs.python.org/3.9/library/http.server.html#http.server.SimpleHTTPRequestHandler.
*/
module SimpleHTTPRequestHandler {
/** Gets a reference to the `http.server.SimpleHTTPRequestHandler` class. */
DataFlow::Node classRef() { result = server_attr("SimpleHTTPRequestHandler") }
}
/**
* Provides models for the `http.server.CGIHTTPRequestHandler` class (Python 3 only).
*
* See https://docs.python.org/3.9/library/http.server.html#http.server.CGIHTTPRequestHandler.
*/
module CGIHTTPRequestHandler {
/** Gets a reference to the `http.server.CGIHTTPRequestHandler` class. */
DataFlow::Node classRef() { result = server_attr("CGIHTTPRequestHandler") }
}
}
}
/**
* Provides models for the `BaseHTTPRequestHandler` class and subclasses.
*
* See
* - https://docs.python.org/3.9/library/http.server.html#http.server.BaseHTTPRequestHandler
* - https://docs.python.org/2.7/library/basehttpserver.html#BaseHTTPServer.BaseHTTPRequestHandler
*/
private module HTTPRequestHandler {
/** Gets a reference to the `BaseHTTPRequestHandler` class or any subclass. */
private DataFlow::Node subclassRef(DataFlow::TypeTracker t) {
// Python 2
t.start() and
result in [
BaseHTTPServer::BaseHTTPRequestHandler::classRef(),
SimpleHTTPServer::SimpleHTTPRequestHandler::classRef(),
CGIHTTPServer::CGIHTTPRequestHandler::classRef()
]
or
// Python 3
t.start() and
result in [
http::server::BaseHTTPRequestHandler::classRef(),
http::server::SimpleHTTPRequestHandler::classRef(),
http::server::CGIHTTPRequestHandler::classRef()
]
or
// subclasses in project code
result.asExpr().(ClassExpr).getABase() = subclassRef(t.continue()).asExpr()
or
exists(DataFlow::TypeTracker t2 | result = subclassRef(t2).track(t2, t))
}
/** Gets a reference to the `BaseHTTPRequestHandler` class or any subclass. */
DataFlow::Node subclassRef() { result = subclassRef(DataFlow::TypeTracker::end()) }
/** A HTTPRequestHandler class definition (most likely in project code). */
class HTTPRequestHandlerClassDef extends Class {
HTTPRequestHandlerClassDef() { this.getParent() = subclassRef().asExpr() }
}
/**
* A source of an instance of the `BaseHTTPRequestHandler` class or any subclass.
*
* This can include instantiation of the class, return value from function
* calls, or a special parameter that will be set when functions are call by external
* library.
*
* Use `classname::instance()` predicate to get references to instances of the `BaseHTTPRequestHandler` class or any subclass.
*/
abstract class InstanceSource extends DataFlow::Node { }
/** The `self` parameter in a method on the `BaseHTTPRequestHandler` class or any subclass. */
private class SelfParam extends InstanceSource, RemoteFlowSource::Range, DataFlow::ParameterNode {
SelfParam() {
exists(HTTPRequestHandlerClassDef cls | cls.getAMethod().getArg(0) = this.getParameter())
}
override string getSourceType() { result = "stdlib HTTPRequestHandler" }
}
/** Gets a reference to an instance of the `BaseHTTPRequestHandler` class or any subclass. */
private DataFlow::Node instance(DataFlow::TypeTracker t) {
t.start() and
result instanceof InstanceSource
or
exists(DataFlow::TypeTracker t2 | result = instance(t2).track(t2, t))
}
/** Gets a reference to an instance of the `BaseHTTPRequestHandler` class or any subclass. */
DataFlow::Node instance() { result = instance(DataFlow::TypeTracker::end()) }
private class AdditionalTaintStep extends TaintTracking::AdditionalTaintStep {
override predicate step(DataFlow::Node nodeFrom, DataFlow::Node nodeTo) {
nodeFrom = instance() and
exists(DataFlow::AttrRead read | nodeTo = read and read.getObject() = nodeFrom |
read.getAttributeName() in [
// str
"requestline", "path",
// by default dict-like http.client.HTTPMessage, which is a subclass of email.message.Message
// see https://docs.python.org/3.9/library/email.compat32-message.html#email.message.Message
// TODO: Implement custom methods (at least `get_all`, `as_bytes`, `as_string`)
"headers",
// file-like
"rfile"
]
)
}
}
}
// ---------------------------------------------------------------------------
// sqlite3
// ---------------------------------------------------------------------------

View File

@@ -2,3 +2,36 @@
| CodeExecution.py:36 | ok | test_additional_taint | cmd1 |
| CodeExecution.py:37 | ok | test_additional_taint | cmd2 |
| CodeExecution.py:38 | ok | test_additional_taint | cmd3 |
| http_server.py:22 | ok | test_cgi_FieldStorage_taint | form |
| http_server.py:24 | ok | test_cgi_FieldStorage_taint | form['key'] |
| http_server.py:25 | ok | test_cgi_FieldStorage_taint | form['key'].value |
| http_server.py:26 | ok | test_cgi_FieldStorage_taint | form['key'].file |
| http_server.py:27 | ok | test_cgi_FieldStorage_taint | form['key'].filename |
| http_server.py:28 | ok | test_cgi_FieldStorage_taint | form['key'][0] |
| http_server.py:29 | ok | test_cgi_FieldStorage_taint | form['key'][0].value |
| http_server.py:30 | ok | test_cgi_FieldStorage_taint | form['key'][0].file |
| http_server.py:31 | ok | test_cgi_FieldStorage_taint | form['key'][0].filename |
| http_server.py:32 | fail | test_cgi_FieldStorage_taint | ListComp |
| http_server.py:34 | ok | test_cgi_FieldStorage_taint | form.getvalue(..) |
| http_server.py:35 | ok | test_cgi_FieldStorage_taint | form.getvalue(..)[0] |
| http_server.py:37 | ok | test_cgi_FieldStorage_taint | form.getfirst(..) |
| http_server.py:39 | ok | test_cgi_FieldStorage_taint | form.getlist(..) |
| http_server.py:40 | ok | test_cgi_FieldStorage_taint | form.getlist(..)[0] |
| http_server.py:41 | fail | test_cgi_FieldStorage_taint | ListComp |
| http_server.py:50 | ok | taint_sources | self |
| http_server.py:52 | ok | taint_sources | self.requestline |
| http_server.py:54 | ok | taint_sources | self.path |
| http_server.py:56 | ok | taint_sources | self.headers |
| http_server.py:57 | ok | taint_sources | self.headers['Foo'] |
| http_server.py:58 | ok | taint_sources | self.headers.get(..) |
| http_server.py:59 | fail | taint_sources | self.headers.get_all(..) |
| http_server.py:60 | fail | taint_sources | self.headers.keys() |
| http_server.py:61 | ok | taint_sources | self.headers.values() |
| http_server.py:62 | ok | taint_sources | self.headers.items() |
| http_server.py:63 | fail | taint_sources | self.headers.as_bytes() |
| http_server.py:64 | fail | taint_sources | self.headers.as_string() |
| http_server.py:65 | ok | taint_sources | str(..) |
| http_server.py:66 | ok | taint_sources | bytes(..) |
| http_server.py:68 | ok | taint_sources | self.rfile |
| http_server.py:69 | fail | taint_sources | self.rfile.read() |
| http_server.py:78 | ok | taint_sources | form |

View File

@@ -1,2 +1,9 @@
import experimental.dataflow.tainttracking.TestTaintLib
import semmle.python.dataflow.new.RemoteFlowSources
class WithRemoteFlowSources extends TestTaintTrackingConfiguration {
override predicate isSource(DataFlow::Node source) {
super.isSource(source) or
source instanceof RemoteFlowSource
}
}

View File

@@ -0,0 +1,122 @@
import sys
import os
import cgi
if sys.version_info[0] == 2:
from BaseHTTPServer import BaseHTTPRequestHandler
from BaseHTTPServer import HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from CGIHTTPServer import CGIHTTPRequestHandler
if sys.version_info[0] == 3:
from http.server import HTTPServer, BaseHTTPRequestHandler, SimpleHTTPRequestHandler, CGIHTTPRequestHandler
def test_cgi_FieldStorage_taint():
# When a python script is invoked through CGI, the default values used by
# `cgi.FieldStorage` constructor makes it handle data from incoming request.
# You _can_ also manually set the input-data, as is shown below in `MyHandler`.
form = cgi.FieldStorage()
ensure_tainted(
form,
form['key'], # will be a list, if multiple fields named "key" are provided
form['key'].value,
form['key'].file,
form['key'].filename,
form['key'][0],
form['key'][0].value,
form['key'][0].file,
form['key'][0].filename,
[field.value for field in form['key']],
form.getvalue('key'), # will be a list, if multiple fields named "key" are provided
form.getvalue('key')[0],
form.getfirst('key'),
form.getlist('key'),
form.getlist('key')[0],
[field.value for field in form.getlist('key')],
)
class MyHandler(BaseHTTPRequestHandler):
def taint_sources(self):
ensure_tainted(
self,
self.requestline,
self.path,
self.headers,
self.headers['Foo'],
self.headers.get('Foo'),
self.headers.get_all('Foo'),
self.headers.keys(),
self.headers.values(),
self.headers.items(),
self.headers.as_bytes(),
self.headers.as_string(),
str(self.headers),
bytes(self.headers),
self.rfile,
self.rfile.read(),
)
form = cgi.FieldStorage(
self.rfile,
self.headers,
environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers.get('content-type')},
)
ensure_tainted(form)
def do_GET(self):
# send_response will log a line to stderr
self.send_response(200)
self.send_header("Content-type", "text/plain; charset=utf-8")
self.end_headers()
self.wfile.write(b"Hello BaseHTTPRequestHandler\n")
self.wfile.writelines([b"1\n", b"2\n", b"3\n"])
print(self.headers)
def do_POST(self):
form = cgi.FieldStorage(
self.rfile,
self.headers,
environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers.get('content-type')},
)
if 'myfile' not in form:
self.send_response(422)
self.end_headers()
return
field = form['myfile']
field.file.seek(0, os.SEEK_END)
filesize = field.file.tell()
print("Uploaded {!r} with {} bytes".format(field.filename, filesize))
self.send_response(200)
self.end_headers()
if __name__ == "__main__":
server = HTTPServer(("127.0.0.1", 8080), MyHandler)
server.serve_forever()
# Headers works case insensitvely, so self.headers['foo'] == self.headers['FOO']
# curl localhost:8080 --header "Foo: 1" --header "foo: 2"
# To test file submission through forms, use
# curl -F myfile=@<yourfile> localhost:8080