mirror of
https://github.com/github/codeql.git
synced 2026-04-30 03:05:15 +02:00
Refactor in progress
This commit is contained in:
@@ -11,77 +11,15 @@
|
||||
|
||||
// Determine precision above
|
||||
import python
|
||||
import semmle.python.dataflow.new.RemoteFlowSources
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import semmle.python.dataflow.new.TaintTracking
|
||||
import semmle.python.dataflow.new.internal.TaintTrackingPublic
|
||||
import experimental.semmle.python.security.injection.LDAPInjection
|
||||
import DataFlow::PathGraph
|
||||
|
||||
class InitializeSink extends DataFlow::Node {
|
||||
InitializeSink() {
|
||||
exists(SsaVariable initVar, CallNode searchCall |
|
||||
// get variable whose value equals a call to ldap.initialize
|
||||
initVar.getDefinition().getImmediateDominator() = Value::named("ldap.initialize").getACall() and
|
||||
// get the Call in which the previous variable is used
|
||||
initVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
|
||||
// restrict that call's attribute (something.this) to match %search%
|
||||
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
|
||||
// set the third argument (search_filter) as sink
|
||||
this.asExpr() = searchCall.getArg(2).getNode()
|
||||
// set the first argument (DN) as sink
|
||||
// or this.asExpr() = searchCall.getArg(0) // Should this be set?
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class ConnectionSink extends DataFlow::Node {
|
||||
ConnectionSink() {
|
||||
exists(SsaVariable connVar, CallNode searchCall |
|
||||
// get variable whose value equals a call to ldap.initialize
|
||||
connVar.getDefinition().getImmediateDominator() = Value::named("ldap3.Connection").getACall() and
|
||||
// get the Call in which the previous variable is used
|
||||
connVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
|
||||
// restrict that call's attribute (something.this) to match %search%
|
||||
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
|
||||
// set the second argument (search_filter) as sink
|
||||
this.asExpr() = searchCall.getArg(1).getNode()
|
||||
// set the first argument (DN) as sink
|
||||
// or this.asExpr() = searchCall.getArg(0) // Should this be set?
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class EscapeSanitizer extends DataFlow::Node {
|
||||
EscapeSanitizer() {
|
||||
exists(Call c |
|
||||
(
|
||||
// avoid flow through any %escape% function
|
||||
c.getFunc().(Attribute).getName().matches("%escape%") or // something.%escape%()
|
||||
c.getFunc().(Name).getId().matches("%escape%") // %escape%()
|
||||
) and
|
||||
this.asExpr() = c
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class LDAPInjectionSink extends DataFlow::Node {
|
||||
LDAPInjectionSink() {
|
||||
this instanceof InitializeSink or
|
||||
this instanceof ConnectionSink
|
||||
}
|
||||
}
|
||||
|
||||
class LDAPInjectionFlowConfig extends TaintTracking::Configuration {
|
||||
LDAPInjectionFlowConfig() { this = "LDAPInjectionFlowConfig" }
|
||||
|
||||
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
|
||||
|
||||
override predicate isSink(DataFlow::Node sink) { sink instanceof LDAPInjectionSink }
|
||||
|
||||
override predicate isSanitizer(DataFlow::Node sanitizer) { sanitizer instanceof EscapeSanitizer }
|
||||
}
|
||||
|
||||
from LDAPInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
|
||||
where config.hasFlowPath(source, sink)
|
||||
select sink.getNode(), source, sink, "$@ LDAP query executes $@.", sink.getNode(), "This",
|
||||
source.getNode(), "a user-provided value"
|
||||
from
|
||||
LDAPInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink,
|
||||
LDAPQuery castedSink
|
||||
where
|
||||
config.hasFlowPath(source, sink) and
|
||||
castedSink = sink.getNode()
|
||||
select sink.getNode(), source, sink, "$@ LDAP query executes $@ as a $@ probably leaking $@.",
|
||||
sink.getNode(), "This", source.getNode(), "a user-provided value", castedSink.getLDAPNode(),
|
||||
castedSink.getLDAPPart(), castedSink.getAttrList(), "this attribute(s)"
|
||||
|
||||
@@ -1,57 +0,0 @@
|
||||
from ldap3 import Server, Connection
|
||||
from ldap3.utils.conv import escape_filter_chars
|
||||
from flask import request, Flask
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/tainted_var")
|
||||
def tainted_var():
|
||||
sanitized_dn = "dc=%s" % request.args['dc']
|
||||
sanitized_filter = "(&(objectClass=*)(uid=%s))" % escape_filter_chars(
|
||||
request.args['username'])
|
||||
|
||||
srv = Server('localhost', port=1337)
|
||||
conn = Connection(srv, user=sanitized_dn, auto_bind=True)
|
||||
conn.search(sanitized_dn, sanitized_filter)
|
||||
return conn.response
|
||||
|
||||
|
||||
@app.route("/var_tainted")
|
||||
def var_tainted():
|
||||
sanitized_dn = request.args['dc']
|
||||
sanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % sanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % escape_filter_chars(sanitized_filter)
|
||||
|
||||
srv = Server('localhost', port=1337)
|
||||
conn = Connection(srv, user=dn, auto_bind=True)
|
||||
conn.search(dn, search_filter)
|
||||
return conn.response
|
||||
|
||||
|
||||
@app.route("/direct")
|
||||
def direct():
|
||||
srv = Server('localhost', port=1337)
|
||||
conn = Connection(srv, user="dc=%s" % request.args['dc'], auto_bind=True)
|
||||
conn.search("dc=%s" % request.args['dc'], "(&(objectClass=*)(uid=%s))" %
|
||||
escape_filter_chars(request.args['username']))
|
||||
return conn.response
|
||||
|
||||
|
||||
@app.route("/with_2")
|
||||
def with_2():
|
||||
sanitized_dn = request.args['dc']
|
||||
sanitized_filter = escape_filter_chars(request.args['username'])
|
||||
|
||||
dn = "dc=%s" % sanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
|
||||
|
||||
srv = Server('localhost', port=1337)
|
||||
with Connection(server, auto_bind=True) as conn:
|
||||
conn.search(dn, search_filter)
|
||||
return conn.response
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -1,55 +0,0 @@
|
||||
from ldap3 import Server, Connection
|
||||
from flask import request, Flask
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/tainted_var")
|
||||
def tainted_var():
|
||||
unsanitized_dn = "dc=%s" % request.args['dc']
|
||||
unsanitized_filter = "(&(objectClass=*)(uid=%s))" % request.args['username']
|
||||
|
||||
srv = Server('localhost', port=1337)
|
||||
conn = Connection(srv, user=unsanitized_dn, auto_bind=True)
|
||||
conn.search(unsanitized_dn, unsanitized_filter)
|
||||
return conn.response
|
||||
|
||||
|
||||
@app.route("/var_tainted")
|
||||
def var_tainted():
|
||||
unsanitized_dn = request.args['dc']
|
||||
unsanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % unsanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
|
||||
|
||||
srv = Server('localhost', port=1337)
|
||||
conn = Connection(srv, user=dn, auto_bind=True)
|
||||
conn.search(dn, search_filter)
|
||||
return conn.response
|
||||
|
||||
|
||||
@app.route("/direct")
|
||||
def direct():
|
||||
srv = Server('localhost', port=1337)
|
||||
conn = Connection(srv, user="dc=%s" % request.args['dc'], auto_bind=True)
|
||||
conn.search(
|
||||
"dc=%s" % request.args['dc'], "(&(objectClass=*)(uid=%s))" % request.args['username'])
|
||||
return conn.response
|
||||
|
||||
|
||||
@app.route("/with_2")
|
||||
def with_2():
|
||||
unsanitized_dn = request.args['dc']
|
||||
unsanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % unsanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
|
||||
|
||||
srv = Server('localhost', port=1337)
|
||||
with Connection(server, auto_bind=True) as conn:
|
||||
conn.search(dn, search_filter)
|
||||
return conn.response
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -1,56 +0,0 @@
|
||||
from flask import request, Flask
|
||||
import ldap
|
||||
import ldap.filter
|
||||
import ldap.dn
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/tainted_var")
|
||||
def tainted_var():
|
||||
sanitized_dn = "dc=%s" % ldap.dn.escape_dn_chars(request.args['dc'])
|
||||
sanitized_filter = "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(
|
||||
request.args['username'])
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
sanitized_dn, ldap.SCOPE_SUBTREE, sanitized_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/var_tainted")
|
||||
def var_tainted():
|
||||
sanitized_dn = request.args['dc']
|
||||
sanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % ldap.dn.escape_dn_chars(sanitized_dn)
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(sanitized_filter)
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/direct")
|
||||
def direct():
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s("dc=%s" % ldap.dn.escape_dn_chars(
|
||||
request.args['dc']), ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(request.args['username']))
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/with_")
|
||||
def with_():
|
||||
sanitized_dn = ldap.dn.escape_dn_chars(request.args['dc'])
|
||||
sanitized_filter = ldap.filter.escape_filter_chars(
|
||||
request.args['username'])
|
||||
|
||||
dn = "dc=%s" % sanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
|
||||
|
||||
with ldap.initialize("ldap://127.0.0.1:1337") as ldap_connection:
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -1,57 +0,0 @@
|
||||
from flask import request, Flask
|
||||
from ldap import initialize
|
||||
import ldap.filter
|
||||
import ldap.dn
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/tainted_var_2")
|
||||
def tainted_var_2():
|
||||
sanitized_dn = "dc=%s" % ldap.dn.escape_dn_chars(request.args['dc'])
|
||||
sanitized_filter = "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(
|
||||
request.args['username'])
|
||||
|
||||
ldap_connection = initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
sanitized_dn, ldap.SCOPE_SUBTREE, sanitized_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/var_tainted_2")
|
||||
def var_tainted_2():
|
||||
sanitized_dn = ldap.dn.escape_dn_chars(request.args['dc'])
|
||||
sanitized_filter = ldap.filter.escape_filter_chars(
|
||||
request.args['username'])
|
||||
|
||||
dn = "dc=%s" % sanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
|
||||
|
||||
ldap_connection = initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/direct_2")
|
||||
def direct_2():
|
||||
ldap_connection = initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s("dc=%s" % ldap.dn.escape_dn_chars(
|
||||
request.args['dc']), ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(request.args['username']))
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/with_2")
|
||||
def with_2():
|
||||
sanitized_dn = ldap.dn.escape_dn_chars(request.args['dc'])
|
||||
sanitized_filter = ldap.filter.escape_filter_chars(
|
||||
request.args['username'])
|
||||
|
||||
dn = "dc=%s" % sanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
|
||||
|
||||
with initialize("ldap://127.0.0.1:1337") as ldap_connection:
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -1,52 +0,0 @@
|
||||
from flask import request, Flask
|
||||
import ldap
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/tainted_var")
|
||||
def tainted_var():
|
||||
unsanitized_dn = "dc=%s" % request.args['dc']
|
||||
unsanitized_filter = "(&(objectClass=*)(uid=%s))" % request.args['username']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
unsanitized_dn, ldap.SCOPE_SUBTREE, unsanitized_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/var_tainted")
|
||||
def var_tainted():
|
||||
unsanitized_dn = request.args['dc']
|
||||
unsanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % unsanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/direct")
|
||||
def direct():
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
"dc=%s" % request.args['dc'], ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % request.args['username'])
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/with_")
|
||||
def with_():
|
||||
sanitized_dn = request.args['dc']
|
||||
sanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % sanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
|
||||
|
||||
with ldap.initialize("ldap://127.0.0.1:1337") as ldap_connection:
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -1,52 +0,0 @@
|
||||
from flask import request, Flask
|
||||
from ldap import initialize
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/tainted_var")
|
||||
def tainted_var():
|
||||
unsanitized_dn = "dc=%s" % request.args['dc']
|
||||
unsanitized_filter = "(&(objectClass=*)(uid=%s))" % request.args['username']
|
||||
|
||||
ldap_connection = initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
unsanitized_dn, ldap.SCOPE_SUBTREE, unsanitized_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/var_tainted")
|
||||
def var_tainted():
|
||||
unsanitized_dn = request.args['dc']
|
||||
unsanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % unsanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
|
||||
|
||||
ldap_connection = initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/direct")
|
||||
def direct():
|
||||
ldap_connection = initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
"dc=%s" % request.args['dc'], ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % request.args['username'])
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/with_2")
|
||||
def with_2():
|
||||
sanitized_dn = request.args['dc']
|
||||
sanitized_filter = request.args['username']
|
||||
|
||||
dn = "dc=%s" % sanitized_dn
|
||||
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
|
||||
|
||||
with initialize("ldap://127.0.0.1:1337") as ldap_connection:
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -0,0 +1,46 @@
|
||||
from flask import request, Flask
|
||||
import ldap
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/normal")
|
||||
def normal():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
unsafe_dn, ldap.SCOPE_SUBTREE, unsafe_filter, ["testAttr1", "testAttr2"])
|
||||
|
||||
|
||||
@app.route("/direct")
|
||||
def direct():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
user = ldap.initialize("ldap://127.0.0.1:1337").search_s(
|
||||
unsafe_dn, ldap.SCOPE_SUBTREE, unsafe_filter, ["testAttr1", "testAttr2"])
|
||||
|
||||
|
||||
@app.route("/normal_argbyname")
|
||||
def normal_argbyname():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
unsafe_dn, ldap.SCOPE_SUBTREE, attrlist=["testAttr1", "testAttr2"], filterstr=unsafe_filter)
|
||||
|
||||
|
||||
@app.route("/direct_argbyname")
|
||||
def direct_argbyname():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
user = ldap.initialize("ldap://127.0.0.1:1337").search_s(
|
||||
unsafe_dn, ldap.SCOPE_SUBTREE, attrlist=["testAttr1", "testAttr2"], filterstr=unsafe_filter)
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -0,0 +1,60 @@
|
||||
from flask import request, Flask
|
||||
import ldap
|
||||
import ldap.filter
|
||||
import ldap.dn
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/normal")
|
||||
def normal():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
safe_dn = ldap.dn.escape_dn_chars(unsafe_dn)
|
||||
safe_filter = ldap.filter.escape_filter_chars(unsafe_filter)
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
safe_dn, ldap.SCOPE_SUBTREE, safe_filter, ["testAttr1", "testAttr2"])
|
||||
|
||||
|
||||
@app.route("/direct")
|
||||
def direct():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
safe_dn = ldap.dn.escape_dn_chars(unsafe_dn)
|
||||
safe_filter = ldap.filter.escape_filter_chars(unsafe_filter)
|
||||
|
||||
user = ldap.initialize("ldap://127.0.0.1:1337").search_s(
|
||||
safe_dn, ldap.SCOPE_SUBTREE, safe_filter, ["testAttr1", "testAttr2"])
|
||||
|
||||
|
||||
@app.route("/normal_argbyname")
|
||||
def normal_argbyname():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
safe_dn = ldap.dn.escape_dn_chars(unsafe_dn)
|
||||
safe_filter = ldap.filter.escape_filter_chars(unsafe_filter)
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
user = ldap_connection.search_s(
|
||||
safe_dn, ldap.SCOPE_SUBTREE, attrlist=["testAttr1", "testAttr2"], filterstr=safe_filter)
|
||||
|
||||
|
||||
@app.route("/direct_argbyname")
|
||||
def direct_argbyname():
|
||||
unsafe_dn = "dc=%s" % request.args['dc']
|
||||
unsafe_filter = "(user=%s)" % request.args['username']
|
||||
|
||||
safe_dn = ldap.dn.escape_dn_chars(unsafe_dn)
|
||||
safe_filter = ldap.filter.escape_filter_chars(unsafe_filter)
|
||||
|
||||
user = ldap.initialize("ldap://127.0.0.1:1337").search_s(
|
||||
safe_dn, ldap.SCOPE_SUBTREE, attrlist=["testAttr1", "testAttr2"], filterstr=safe_filter)
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
@@ -13,3 +13,46 @@ private import semmle.python.dataflow.new.DataFlow
|
||||
private import semmle.python.dataflow.new.RemoteFlowSources
|
||||
private import semmle.python.dataflow.new.TaintTracking
|
||||
private import experimental.semmle.python.Frameworks
|
||||
private import semmle.python.ApiGraphs
|
||||
|
||||
/**
|
||||
* To-Do
|
||||
*
|
||||
* LDAPQuery -> collect functions executing a search filter/DN
|
||||
* LDAPEscape -> collect functions escaping a search filter/DN
|
||||
*/
|
||||
module LDAPQuery {
|
||||
abstract class Range extends DataFlow::Node {
|
||||
abstract DataFlow::Node getLDAPNode();
|
||||
|
||||
abstract string getLDAPPart();
|
||||
|
||||
abstract DataFlow::Node getAttrs();
|
||||
}
|
||||
}
|
||||
|
||||
class LDAPQuery extends DataFlow::Node {
|
||||
LDAPQuery::Range range;
|
||||
|
||||
LDAPQuery() { this = range }
|
||||
|
||||
DataFlow::Node getLDAPNode() { result = range.getLDAPNode() }
|
||||
|
||||
string getLDAPPart() { result = range.getLDAPPart() }
|
||||
|
||||
DataFlow::Node getAttrs() { result = range.getAttrs() }
|
||||
}
|
||||
|
||||
module LDAPEscape {
|
||||
abstract class Range extends DataFlow::Node {
|
||||
abstract DataFlow::Node getEscapeNode();
|
||||
}
|
||||
}
|
||||
|
||||
class LDAPEscape extends DataFlow::Node {
|
||||
LDAPEscape::Range range;
|
||||
|
||||
LDAPEscape() { this = range }
|
||||
|
||||
DataFlow::Node getEscapeNode() { result = range.getEscapeNode() }
|
||||
}
|
||||
|
||||
@@ -9,3 +9,59 @@ private import semmle.python.dataflow.new.TaintTracking
|
||||
private import semmle.python.dataflow.new.RemoteFlowSources
|
||||
private import experimental.semmle.python.Concepts
|
||||
private import semmle.python.ApiGraphs
|
||||
|
||||
private module LDAP {
|
||||
private module LDAP2 {
|
||||
private class LDAP2QueryMethods extends string {
|
||||
LDAP2QueryMethods() {
|
||||
this in ["search", "search_s", "search_st", "search_ext", "search_ext_s"]
|
||||
}
|
||||
}
|
||||
|
||||
private class LDAP2Query extends DataFlow::CallCfgNode, LDAPQuery::Range {
|
||||
DataFlow::Node ldapNode;
|
||||
string ldapPart;
|
||||
DataFlow::Node attrs;
|
||||
|
||||
LDAP2Query() {
|
||||
exists(DataFlow::AttrRead searchMethod, DataFlow::CallCfgNode initCall |
|
||||
this.getFunction() = searchMethod and
|
||||
initCall = API::moduleImport("ldap").getMember("initialize").getACall() and
|
||||
initCall = searchMethod.getObject().getALocalSource() and
|
||||
searchMethod.getAttributeName() instanceof LDAP2QueryMethods and
|
||||
(
|
||||
(
|
||||
ldapNode = this.getArg(2) or
|
||||
ldapNode = this.getArgByName("filterstr")
|
||||
) and
|
||||
ldapPart = "search_filter"
|
||||
or
|
||||
ldapNode = this.getArg(0) and
|
||||
ldapPart = "DN"
|
||||
) and
|
||||
( // what if they're not set?
|
||||
attrs = this.getArg(3) or
|
||||
attrs = this.getArgByName("attrlist")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
override DataFlow::Node getLDAPNode() { result = ldapNode }
|
||||
|
||||
override string getLDAPPart() { result = ldapPart }
|
||||
|
||||
override DataFlow::Node getAttrs() { result = attrs }
|
||||
}
|
||||
|
||||
private class LDAP2EscapeDN extends DataFlow::CallCfgNode, LDAPEscape::Range {
|
||||
DataFlow::Node escapeNode;
|
||||
|
||||
LDAP2EscapeDN() {
|
||||
this = API::moduleImport("ldap").getMember("dn").getMember("escape_dn_chars").getACall() and
|
||||
escapeNode = this.getArg(0)
|
||||
}
|
||||
|
||||
override DataFlow::Node getEscapeNode() { result = escapeNode }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
/**
|
||||
* Provides a taint-tracking configuration for detecting LDAP injection vulnerabilities
|
||||
*/
|
||||
|
||||
import python
|
||||
import experimental.semmle.python.Concepts
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import semmle.python.dataflow.new.TaintTracking
|
||||
import semmle.python.dataflow.new.RemoteFlowSources
|
||||
|
||||
/**
|
||||
* A taint-tracking configuration for detecting regular expression injections.
|
||||
*/
|
||||
class LDAPInjectionFlowConfig extends TaintTracking::Configuration {
|
||||
LDAPInjectionFlowConfig() { this = "LDAPInjectionFlowConfig" }
|
||||
|
||||
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
|
||||
|
||||
override predicate isSink(DataFlow::Node sink) { sink = any(LDAPQuery lQ).getLDAPNode() }
|
||||
// override predicate isSanitizer(DataFlow::Node sanitizer) { sanitizer instanceof RemoteFlowSource } // any(LDAPEscape ldapEsc).getEscapeNode() }
|
||||
}
|
||||
Reference in New Issue
Block a user