Upload LDAP Injection query, qhelp and tests

This commit is contained in:
jorgectf
2021-03-18 17:31:41 +01:00
parent 7f16c52217
commit 799d509f26
10 changed files with 561 additions and 0 deletions

View File

@@ -0,0 +1,32 @@
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
<qhelp>
<overview>
<p>If an LDAP query is built by a not sanitized user-provided value, a user is likely to be able to run malicious LDAP queries.</p>
</overview>
<recommendation>
<p>In case user input must compose an LDAP query, it should be escaped in order to avoid a malicious user supplying special characters that change the actual purpose of the query. To do so, functions that ldap frameworks provide such as <code>escape_filter_chars</code> should be applied to that user input.
<recommendation>
<references>
<li>
OWASP
<a href="https://owasp.org/www-community/attacks/LDAP_Injection">LDAP Injection</a>
</li>
<li>
SonarSource
<a href="https://rules.sonarsource.com/python/RSPEC-2078">RSPEC-2078</a>
</li>
<li>
Python
<a href="https://www.python-ldap.org/en/python-ldap-3.3.0/reference/ldap.html">LDAP Documentation</a>
</li>
<li>
CWE-
<a href="https://cwe.mitre.org/data/definitions/90.html">090</a>
</li>
</references>
</qhelp>

View File

@@ -0,0 +1,86 @@
/**
* @name Python LDAP Injection
* @description Python LDAP Injection through search filter
* @kind path-problem
* @problem.severity error
* @id python/ldap-injection
* @tags experimental
* security
* external/cwe/cwe-090
*/
import python
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.internal.TaintTrackingPublic
import DataFlow::PathGraph
class InitializeSink extends DataFlow::Node {
InitializeSink() {
exists(SsaVariable initVar, CallNode searchCall |
// get variable whose value equals a call to ldap.initialize
initVar.getDefinition().getImmediateDominator() = Value::named("ldap.initialize").getACall() and
// get the Call in which the previous variable is used
initVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
// restrict that call's attribute (something.this) to match %search%
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
// set the third argument (search_filter) as sink
this.asExpr() = searchCall.getArg(2).getNode()
// set the first argument (DN) as sink
// or this.asExpr() = searchCall.getArg(0) // Should this be set?
)
}
}
class ConnectionSink extends DataFlow::Node {
ConnectionSink() {
exists(SsaVariable connVar, CallNode searchCall |
// get variable whose value equals a call to ldap.initialize
connVar.getDefinition().getImmediateDominator() = Value::named("ldap3.Connection").getACall() and
// get the Call in which the previous variable is used
connVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
// restrict that call's attribute (something.this) to match %search%
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
// set the second argument (search_filter) as sink
this.asExpr() = searchCall.getArg(1).getNode()
// set the first argument (DN) as sink
// or this.asExpr() = searchCall.getArg(0) // Should this be set?
)
}
}
class EscapeSanitizer extends DataFlow::Node {
EscapeSanitizer() {
exists(Call c |
(
// avoid flow through any %escape% function
c.getFunc().(Attribute).getName().matches("%escape%") or // something.%escape%()
c.getFunc().(Name).getId().matches("%escape%") // %escape%()
) and
this.asExpr() = c
)
}
}
class LDAPInjectionSink extends DataFlow::Node {
LDAPInjectionSink() {
this instanceof InitializeSink or
this instanceof ConnectionSink
}
}
class LDAPInjectionFlowConfig extends TaintTracking::Configuration {
LDAPInjectionFlowConfig() { this = "LDAPInjectionFlowConfig" }
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
override predicate isSink(DataFlow::Node sink) { sink instanceof LDAPInjectionSink }
override predicate isSanitizer(DataFlow::Node sanitizer) { sanitizer instanceof EscapeSanitizer }
}
from LDAPInjectionFlowConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
where config.hasFlowPath(source, sink)
select sink.getNode(), source, sink, "$@ LDAP query executes $@.", sink.getNode(), "This",
source.getNode(), "a user-provided value"

View File

@@ -0,0 +1,58 @@
import ldap3
from ldap3.utils.conv import escape_filter_chars
from flask import request, Flask
app = Flask(__name__)
@app.route("/tainted_var")
def tainted_var():
sanitized_dn = "dc=%s" % request.args['dc']
sanitized_filter = "(&(objectClass=*)(uid=%s))" % escape_filter_chars(
request.args['username'])
srv = ldap3.Server('localhost', port=1337)
conn = ldap3.Connection(srv, user=sanitized_dn, auto_bind=True)
conn.search(sanitized_dn, sanitized_filter)
return conn.response
@app.route("/var_tainted")
def var_tainted():
sanitized_dn = request.args['dc']
sanitized_filter = request.args['username']
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % escape_filter_chars(sanitized_filter)
srv = ldap3.Server('localhost', port=1337)
conn = ldap3.Connection(srv, user=dn, auto_bind=True)
conn.search(dn, search_filter)
return conn.response
@app.route("/direct")
def direct():
srv = ldap3.Server('localhost', port=1337)
conn = ldap3.Connection(srv, user="dc=%s" %
request.args['dc'], auto_bind=True)
conn.search("dc=%s" % request.args['dc'], "(&(objectClass=*)(uid=%s))" %
escape_filter_chars(request.args['username']))
return conn.response
@ app.route("/with_")
def with_():
sanitized_dn = request.args['dc']
sanitized_filter = escape_filter_chars(request.args['username'])
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
srv = ldap3.Server('localhost', port=1337)
with ldap3.Connection(server, auto_bind=True) as conn:
conn.search(dn, search_filter)
return conn.response
# if __name__ == "__main__":
# app.run(debug=True)

View File

@@ -0,0 +1,57 @@
from ldap3 import Server, Connection
from ldap3.utils.conv import escape_filter_chars
from flask import request, Flask
app = Flask(__name__)
@app.route("/tainted_var")
def tainted_var():
sanitized_dn = "dc=%s" % request.args['dc']
sanitized_filter = "(&(objectClass=*)(uid=%s))" % escape_filter_chars(
request.args['username'])
srv = Server('localhost', port=1337)
conn = Connection(srv, user=sanitized_dn, auto_bind=True)
conn.search(sanitized_dn, sanitized_filter)
return conn.response
@app.route("/var_tainted")
def var_tainted():
sanitized_dn = request.args['dc']
sanitized_filter = request.args['username']
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % escape_filter_chars(sanitized_filter)
srv = Server('localhost', port=1337)
conn = Connection(srv, user=dn, auto_bind=True)
conn.search(dn, search_filter)
return conn.response
@app.route("/direct")
def direct():
srv = Server('localhost', port=1337)
conn = Connection(srv, user="dc=%s" % request.args['dc'], auto_bind=True)
conn.search("dc=%s" % request.args['dc'], "(&(objectClass=*)(uid=%s))" %
escape_filter_chars(request.args['username']))
return conn.response
@app.route("/with_2")
def with_2():
sanitized_dn = request.args['dc']
sanitized_filter = escape_filter_chars(request.args['username'])
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
srv = Server('localhost', port=1337)
with Connection(server, auto_bind=True) as conn:
conn.search(dn, search_filter)
return conn.response
# if __name__ == "__main__":
# app.run(debug=True)

View File

@@ -0,0 +1,56 @@
import ldap3
from flask import request, Flask
app = Flask(__name__)
@app.route("/tainted_var")
def tainted_var():
unsanitized_dn = "dc=%s" % request.args['dc']
unsanitized_filter = "(&(objectClass=*)(uid=%s))" % request.args['username']
srv = ldap3.Server('localhost', port=1337)
conn = ldap3.Connection(srv, user=unsanitized_dn, auto_bind=True)
conn.search(unsanitized_dn, unsanitized_filter)
return conn.response
@app.route("/var_tainted")
def var_tainted():
unsanitized_dn = request.args['dc']
unsanitized_filter = request.args['username']
dn = "dc=%s" % unsanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
srv = ldap3.Server('localhost', port=1337)
conn = ldap3.Connection(srv, user=dn, auto_bind=True)
conn.search(dn, search_filter)
return conn.response
@app.route("/direct")
def direct():
srv = ldap3.Server('localhost', port=1337)
conn = ldap3.Connection(srv, user="dc=%s" %
request.args['dc'], auto_bind=True)
conn.search("dc=%s" % unsanitized_dn,
"(&(objectClass=*)(uid=%s))" % request.args['username'])
return conn.response
@app.route("/with_")
def with_():
unsanitized_dn = request.args['dc']
unsanitized_filter = request.args['username']
dn = "dc=%s" % unsanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
srv = ldap3.Server('localhost', port=1337)
with ldap3.Connection(server, auto_bind=True) as conn:
conn.search(dn, search_filter)
return conn.response
# if __name__ == "__main__":
# app.run(debug=True)

View File

@@ -0,0 +1,55 @@
from ldap3 import Server, Connection
from flask import request, Flask
app = Flask(__name__)
@app.route("/tainted_var")
def tainted_var():
unsanitized_dn = "dc=%s" % request.args['dc']
unsanitized_filter = "(&(objectClass=*)(uid=%s))" % request.args['username']
srv = Server('localhost', port=1337)
conn = Connection(srv, user=unsanitized_dn, auto_bind=True)
conn.search(unsanitized_dn, unsanitized_filter)
return conn.response
@app.route("/var_tainted")
def var_tainted():
unsanitized_dn = request.args['dc']
unsanitized_filter = request.args['username']
dn = "dc=%s" % unsanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
srv = Server('localhost', port=1337)
conn = Connection(srv, user=dn, auto_bind=True)
conn.search(dn, search_filter)
return conn.response
@app.route("/direct")
def direct():
srv = Server('localhost', port=1337)
conn = Connection(srv, user="dc=%s" % request.args['dc'], auto_bind=True)
conn.search(
"dc=%s" % request.args['dc'], "(&(objectClass=*)(uid=%s))" % request.args['username'])
return conn.response
@app.route("/with_2")
def with_2():
unsanitized_dn = request.args['dc']
unsanitized_filter = request.args['username']
dn = "dc=%s" % unsanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
srv = Server('localhost', port=1337)
with Connection(server, auto_bind=True) as conn:
conn.search(dn, search_filter)
return conn.response
# if __name__ == "__main__":
# app.run(debug=True)

View File

@@ -0,0 +1,56 @@
from flask import request, Flask
import ldap
import ldap.filter
import ldap.dn
app = Flask(__name__)
@app.route("/tainted_var")
def tainted_var():
sanitized_dn = "dc=%s" % ldap.dn.escape_dn_chars(request.args['dc'])
sanitized_filter = "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(
request.args['username'])
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(
sanitized_dn, ldap.SCOPE_SUBTREE, sanitized_filter)
return user[0]
@app.route("/var_tainted")
def var_tainted():
sanitized_dn = request.args['dc']
sanitized_filter = request.args['username']
dn = "dc=%s" % ldap.dn.escape_dn_chars(sanitized_dn)
search_filter = "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(sanitized_filter)
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
@app.route("/direct")
def direct():
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s("dc=%s" % ldap.dn.escape_dn_chars(
request.args['dc']), ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(request.args['username']))
return user[0]
@app.route("/with_")
def with_():
sanitized_dn = ldap.dn.escape_dn_chars(request.args['dc'])
sanitized_filter = ldap.filter.escape_filter_chars(
request.args['username'])
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
with ldap.initialize("ldap://127.0.0.1:1337") as ldap_connection:
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
# if __name__ == "__main__":
# app.run(debug=True)

View File

@@ -0,0 +1,57 @@
from flask import request, Flask
from ldap import initialize
import ldap.filter
import ldap.dn
app = Flask(__name__)
@app.route("/tainted_var_2")
def tainted_var_2():
sanitized_dn = "dc=%s" % ldap.dn.escape_dn_chars(request.args['dc'])
sanitized_filter = "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(
request.args['username'])
ldap_connection = initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(
sanitized_dn, ldap.SCOPE_SUBTREE, sanitized_filter)
return user[0]
@app.route("/var_tainted_2")
def var_tainted_2():
sanitized_dn = ldap.dn.escape_dn_chars(request.args['dc'])
sanitized_filter = ldap.filter.escape_filter_chars(
request.args['username'])
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
ldap_connection = initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
@app.route("/direct_2")
def direct_2():
ldap_connection = initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s("dc=%s" % ldap.dn.escape_dn_chars(
request.args['dc']), ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % ldap.filter.escape_filter_chars(request.args['username']))
return user[0]
@app.route("/with_2")
def with_2():
sanitized_dn = ldap.dn.escape_dn_chars(request.args['dc'])
sanitized_filter = ldap.filter.escape_filter_chars(
request.args['username'])
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
with initialize("ldap://127.0.0.1:1337") as ldap_connection:
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
# if __name__ == "__main__":
# app.run(debug=True)

View File

@@ -0,0 +1,52 @@
from flask import request, Flask
import ldap
app = Flask(__name__)
@app.route("/tainted_var")
def tainted_var():
unsanitized_dn = "dc=%s" % request.args['dc']
unsanitized_filter = "(&(objectClass=*)(uid=%s))" % request.args['username']
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(
unsanitized_dn, ldap.SCOPE_SUBTREE, unsanitized_filter)
return user[0]
@app.route("/var_tainted")
def var_tainted():
unsanitized_dn = request.args['dc']
unsanitized_filter = request.args['username']
dn = "dc=%s" % unsanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
@app.route("/direct")
def direct():
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(
"dc=%s" % request.args['dc'], ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % request.args['username'])
return user[0]
@app.route("/with_")
def with_():
sanitized_dn = request.args['dc']
sanitized_filter = request.args['username']
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
with ldap.initialize("ldap://127.0.0.1:1337") as ldap_connection:
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
# if __name__ == "__main__":
# app.run(debug=True)

View File

@@ -0,0 +1,52 @@
from flask import request, Flask
from ldap import initialize
app = Flask(__name__)
@app.route("/tainted_var")
def tainted_var():
unsanitized_dn = "dc=%s" % request.args['dc']
unsanitized_filter = "(&(objectClass=*)(uid=%s))" % request.args['username']
ldap_connection = initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(
unsanitized_dn, ldap.SCOPE_SUBTREE, unsanitized_filter)
return user[0]
@app.route("/var_tainted")
def var_tainted():
unsanitized_dn = request.args['dc']
unsanitized_filter = request.args['username']
dn = "dc=%s" % unsanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % unsanitized_filter
ldap_connection = initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
@app.route("/direct")
def direct():
ldap_connection = initialize("ldap://127.0.0.1:1337")
user = ldap_connection.search_s(
"dc=%s" % request.args['dc'], ldap.SCOPE_SUBTREE, "(&(objectClass=*)(uid=%s))" % request.args['username'])
return user[0]
@app.route("/with_2")
def with_2():
sanitized_dn = request.args['dc']
sanitized_filter = request.args['username']
dn = "dc=%s" % sanitized_dn
search_filter = "(&(objectClass=*)(uid=%s))" % sanitized_filter
with initialize("ldap://127.0.0.1:1337") as ldap_connection:
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
return user[0]
# if __name__ == "__main__":
# app.run(debug=True)