mirror of
https://github.com/github/codeql.git
synced 2025-12-20 18:56:32 +01:00
Upload LDAP Improper authentication query, qhelp and tests
This commit is contained in:
31
python/ql/src/Security/CWE-287/LDAPImproperAuth.qhelp
Normal file
31
python/ql/src/Security/CWE-287/LDAPImproperAuth.qhelp
Normal file
@@ -0,0 +1,31 @@
|
||||
<!DOCTYPE qhelp PUBLIC "-//Semmle//qhelp//EN" "qhelp.dtd">
|
||||
|
||||
<qhelp>
|
||||
|
||||
<overview>
|
||||
<p>If an LDAP connection doesn't carry any kind of authentication, gets produced an access to information
|
||||
in the LDAP directory.</p>
|
||||
|
||||
<p>Simple authentication in LDAP can be used with three different mechanisms:<p>
|
||||
|
||||
<li>Anonymous Authentication Mechanism by performing a bind request with a username and password value of zero length.</li>
|
||||
<li>Unauthenticated Authentication Mechanism by performing a bind request with a password value of zero length.</li>
|
||||
<li>Name/Password Authentication Mechanism by performing a bind request with a password value of non-zero length.</li>
|
||||
|
||||
</overview>
|
||||
|
||||
<recommendation>
|
||||
<p>Every LDAP authentication should be done by using a password taken from a safe place.
|
||||
<recommendation>
|
||||
|
||||
<references>
|
||||
<li>
|
||||
SonarSource
|
||||
<a href="https://rules.sonarsource.com/python/type/Vulnerability/RSPEC-4433">RSPEC-4433</a>
|
||||
</li>
|
||||
<li>
|
||||
CWE-
|
||||
<a href="https://cwe.mitre.org/data/definitions/287.html">287</a>
|
||||
</references>
|
||||
|
||||
</qhelp>
|
||||
85
python/ql/src/Security/CWE-287/LDAPImproperAuth.ql
Normal file
85
python/ql/src/Security/CWE-287/LDAPImproperAuth.ql
Normal file
@@ -0,0 +1,85 @@
|
||||
/**
|
||||
* @name Python LDAP Improper Authentication
|
||||
* @description Check if a user-controlled query carry no authentication
|
||||
* @kind path-problem
|
||||
* @problem.severity warning
|
||||
* @id python/ldap-improper-auth
|
||||
* @tags experimental
|
||||
* security
|
||||
* external/cwe/cwe-287
|
||||
*/
|
||||
|
||||
import python
|
||||
import semmle.python.dataflow.new.RemoteFlowSources
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import semmle.python.dataflow.new.TaintTracking
|
||||
import semmle.python.dataflow.new.internal.TaintTrackingPublic
|
||||
import DataFlow::PathGraph
|
||||
|
||||
// LDAP2
|
||||
class BindSink extends DataFlow::Node {
|
||||
BindSink() {
|
||||
exists(SsaVariable bindVar, CallNode bindCall, CallNode searchCall |
|
||||
// get variable initializing the connection
|
||||
bindVar.getDefinition().getImmediateDominator() = Value::named("ldap.initialize").getACall() and
|
||||
// get a call using that variable
|
||||
bindVar.getAUse().getImmediateDominator() = bindCall and
|
||||
// restrict call to any bind method
|
||||
bindCall.getNode().getFunc().(Attribute).getName().matches("%bind%") and
|
||||
(
|
||||
// check second argument (password)
|
||||
bindCall.getArg(1).getNode() instanceof None or
|
||||
count(bindCall.getAnArg()) = 1
|
||||
) and
|
||||
// get another call using that variable
|
||||
bindVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
|
||||
// restrict call to any search method
|
||||
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
|
||||
// set the third argument as sink
|
||||
this.asExpr() = searchCall.getArg(2).getNode()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// LDAP3
|
||||
class ConnectionSink extends DataFlow::Node {
|
||||
ConnectionSink() {
|
||||
exists(SsaVariable connectionVar, CallNode connectionCall, CallNode searchCall |
|
||||
// get call initializing the connection
|
||||
connectionCall = Value::named("ldap3.Connection").getACall() and
|
||||
(
|
||||
// check password argument
|
||||
not exists(connectionCall.getArgByName("password")) or
|
||||
connectionCall.getArgByName("password").pointsTo(Value::named("None"))
|
||||
) and
|
||||
// get the variable initializing the connection
|
||||
connectionVar.getDefinition().getImmediateDominator() = connectionCall and
|
||||
// get a call using that variable
|
||||
connectionVar.getAUse().getNode() = searchCall.getNode().getFunc().(Attribute).getObject() and
|
||||
// restrict call to any search method
|
||||
searchCall.getNode().getFunc().(Attribute).getName().matches("%search%") and
|
||||
// set the second argument as sink
|
||||
this.asExpr() = searchCall.getArg(1).getNode()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
class LDAPImproperAuthSink extends DataFlow::Node {
|
||||
LDAPImproperAuthSink() {
|
||||
this instanceof BindSink or
|
||||
this instanceof ConnectionSink
|
||||
}
|
||||
}
|
||||
|
||||
class LDAPImproperAuthConfig extends TaintTracking::Configuration {
|
||||
LDAPImproperAuthConfig() { this = "LDAPImproperAuthConfig" }
|
||||
|
||||
override predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
|
||||
|
||||
override predicate isSink(DataFlow::Node sink) { sink instanceof LDAPImproperAuthSink }
|
||||
}
|
||||
|
||||
from LDAPImproperAuthConfig config, DataFlow::PathNode source, DataFlow::PathNode sink
|
||||
where config.hasFlowPath(source, sink)
|
||||
select sink.getNode(), source, sink, "$@ LDAP query executes $@.", sink.getNode(), "This",
|
||||
source.getNode(), "a user-provided value without authentication."
|
||||
30
python/ql/src/Security/CWE-287/tests/3_auth_bad.py
Normal file
30
python/ql/src/Security/CWE-287/tests/3_auth_bad.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from ldap3 import Server, Connection, ALL
|
||||
from flask import request, Flask
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/passwordNone")
|
||||
def passwordNone():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
srv = Server('servername', get_info=ALL)
|
||||
conn = Connection(srv, user='user_dn', password=None)
|
||||
status, result, response, _ = conn.search(dn, search_filter)
|
||||
return result
|
||||
|
||||
|
||||
@app.route("/notPassword")
|
||||
def notPassword():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
srv = Server('servername', get_info=ALL)
|
||||
conn = Connection(srv, user='user_dn')
|
||||
status, result, response, _ = conn.search(dn, search_filter)
|
||||
return result
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
20
python/ql/src/Security/CWE-287/tests/3_auth_good.py
Normal file
20
python/ql/src/Security/CWE-287/tests/3_auth_good.py
Normal file
@@ -0,0 +1,20 @@
|
||||
from ldap3 import Server, Connection, ALL
|
||||
from flask import request, Flask
|
||||
import os
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/passwordFromEnv")
|
||||
def passwordFromEnv():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
srv = Server('servername', get_info=ALL)
|
||||
conn = Connection(srv, user='user_dn',
|
||||
password=os.environ.get('LDAP_PASSWORD'))
|
||||
status, result, response, _ = conn.search(dn, search_filter)
|
||||
return result
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
52
python/ql/src/Security/CWE-287/tests/auth_bad.py
Normal file
52
python/ql/src/Security/CWE-287/tests/auth_bad.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from flask import request, Flask
|
||||
import ldap
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/simple_bind")
|
||||
def simple_bind():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.simple_bind('cn=root')
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/simple_bind_s")
|
||||
def simple_bind_s():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.simple_bind_s('cn=root')
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/bind_s")
|
||||
def bind_s():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.bind_s('cn=root', None)
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/bind")
|
||||
def bind():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.bind('cn=root', None)
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
52
python/ql/src/Security/CWE-287/tests/auth_good.py
Normal file
52
python/ql/src/Security/CWE-287/tests/auth_good.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from flask import request, Flask
|
||||
import ldap
|
||||
import os
|
||||
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
@app.route("/simple_bind")
|
||||
def simple_bind():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.simple_bind('cn=root', os.environ.get('LDAP_PASSWORD'))
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/simple_bind_s")
|
||||
def simple_bind_s():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.simple_bind_s('cn=root', os.environ.get('LDAP_PASSWORD'))
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/bind_s")
|
||||
def bind_s():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.bind_s('cn=root', os.environ.get('LDAP_PASSWORD'))
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
|
||||
@app.route("/bind")
|
||||
def bind():
|
||||
dn = request.args['dc']
|
||||
search_filter = request.args['search']
|
||||
|
||||
ldap_connection = ldap.initialize("ldap://127.0.0.1:1337")
|
||||
ldap_connection.bind('cn=root', os.environ.get('LDAP_PASSWORD'))
|
||||
user = ldap_connection.search_s(dn, ldap.SCOPE_SUBTREE, search_filter)
|
||||
return user[0]
|
||||
|
||||
# if __name__ == "__main__":
|
||||
# app.run(debug=True)
|
||||
Reference in New Issue
Block a user