Python: refactor into modules

and turn on the pyOpenSSL module
This commit is contained in:
Rasmus Lerchedahl Petersen
2021-02-17 14:30:28 +01:00
parent 72b37a5b1b
commit 7ed018aff6

View File

@@ -39,96 +39,18 @@ abstract class ContextCreation extends DataFlow::CfgNode {
abstract DataFlow::CfgNode getProtocol();
}
class SSLContextCreation extends ContextCreation {
override CallNode node;
SSLContextCreation() { this = API::moduleImport("ssl").getMember("SSLContext").getACall() }
override DataFlow::CfgNode getProtocol() {
result.getNode() in [node.getArg(0), node.getArgByName("protocol")]
}
}
class PyOpenSSLContextCreation extends ContextCreation {
override CallNode node;
PyOpenSSLContextCreation() {
this = API::moduleImport("pyOpenSSL").getMember("SSL").getMember("Context").getACall()
}
override DataFlow::CfgNode getProtocol() {
result.getNode() in [node.getArg(0), node.getArgByName("method")]
}
}
abstract class ConnectionCreation extends DataFlow::CfgNode {
abstract DataFlow::CfgNode getContext();
}
class WrapSocketCall extends ConnectionCreation {
override CallNode node;
WrapSocketCall() { node.getFunction().(AttrNode).getName() = "wrap_socket" }
override DataFlow::CfgNode getContext() {
result.getNode() = node.getFunction().(AttrNode).getObject()
}
}
class ConnectionCall extends ConnectionCreation {
override CallNode node;
ConnectionCall() {
this = API::moduleImport("pyOpenSSL").getMember("SSL").getMember("Connection").getACall()
}
override DataFlow::CfgNode getContext() {
result.getNode() in [node.getArg(0), node.getArgByName("context")]
}
}
class ProtocolRestriction extends DataFlow::CfgNode {
abstract DataFlow::CfgNode getContext();
abstract string getRestriction();
}
class OptionsAugOr extends ProtocolRestriction {
string restriction;
OptionsAugOr() {
exists(AugAssign aa, AttrNode attr |
aa.getOperation().getOp() instanceof BitOr and
aa.getTarget() = attr.getNode() and
attr.getName() = "options" and
attr.getObject() = node and
aa.getValue() = API::moduleImport("ssl").getMember(restriction).getAUse().asExpr()
)
}
override DataFlow::CfgNode getContext() { result = this }
override string getRestriction() { result = restriction }
}
class SetOptionsCall extends ProtocolRestriction {
override CallNode node;
SetOptionsCall() { node.getFunction().(AttrNode).getName() = "set_options" }
override DataFlow::CfgNode getContext() {
result.getNode() = node.getFunction().(AttrNode).getObject()
}
override string getRestriction() {
API::moduleImport("PyOpenSSL").getMember("SSL").getMember(result).getAUse().asCfgNode() in [
node.getArg(0), node.getArgByName("options")
]
}
}
abstract class TlsLibrary extends string {
TlsLibrary() { this in ["ssl"] }
TlsLibrary() { this in ["ssl", "pyOpenSSL"] }
abstract string specific_insecure_version_name();
@@ -160,85 +82,149 @@ abstract class TlsLibrary extends string {
result.(ContextCreation).getProtocol() = unspecific_version()
}
/** A connection is created in an outright insecure manner. */
abstract DataFlow::CfgNode insecure_connection_creation();
/** A connection is created from a context. */
abstract ConnectionCreation connection_creation();
abstract ProtocolRestriction protocol_restriction();
}
class Ssl extends TlsLibrary {
Ssl() { this = "ssl" }
override string specific_insecure_version_name() {
result in [
"PROTOCOL_SSLv2", "PROTOCOL_SSLv3", "PROTOCOL_SSLv23", "PROTOCOL_TLSv1", "PROTOCOL_TLSv1_1"
]
}
override string unspecific_version_name() { result = "PROTOCOL_TLS" }
override API::Node version_constants() { result = API::moduleImport("ssl") }
override DataFlow::CfgNode default_context_creation() {
result = API::moduleImport("ssl").getMember("create_default_context").getACall()
}
override ContextCreation specific_context_creation() { result instanceof SSLContextCreation }
override ConnectionCreation connection_creation() { result instanceof WrapSocketCall }
override ProtocolRestriction protocol_restriction() { result instanceof OptionsAugOr }
}
class PyOpenSSL extends TlsLibrary {
PyOpenSSL() { this = "pyOpenSSL" }
override string specific_insecure_version_name() {
result in ["SSLv2_METHOD", "SSLv23_METHOD", "SSLv3_METHOD", "TLSv1_METHOD", "TLSv1_1_METHOD"]
}
override string unspecific_version_name() { result = "TLS_METHOD" }
override API::Node version_constants() {
result = API::moduleImport("pyOpenSSL").getMember("SSL")
}
override DataFlow::CfgNode default_context_creation() { none() }
override ContextCreation specific_context_creation() {
result instanceof PyOpenSSLContextCreation
}
override ConnectionCreation connection_creation() { result instanceof ConnectionCall }
override ProtocolRestriction protocol_restriction() { result instanceof OptionsAugOr }
}
module ssl {
string insecure_version_name() {
result = "PROTOCOL_SSLv2" or
result = "PROTOCOL_SSLv3" or
result = "PROTOCOL_SSLv23" or
result = "PROTOCOL_TLSv1" or
result = "PROTOCOL_TLSv1_1"
class SSLContextCreation extends ContextCreation {
override CallNode node;
SSLContextCreation() { this = API::moduleImport("ssl").getMember("SSLContext").getACall() }
override DataFlow::CfgNode getProtocol() {
result.getNode() in [node.getArg(0), node.getArgByName("protocol")]
}
}
DataFlow::Node insecure_version() {
result = API::moduleImport("ssl").getMember(insecure_version_name()).getAUse()
class WrapSocketCall extends ConnectionCreation {
override CallNode node;
WrapSocketCall() { node.getFunction().(AttrNode).getName() = "wrap_socket" }
override DataFlow::CfgNode getContext() {
result.getNode() = node.getFunction().(AttrNode).getObject()
}
}
class OptionsAugOr extends ProtocolRestriction {
string restriction;
OptionsAugOr() {
exists(AugAssign aa, AttrNode attr |
aa.getOperation().getOp() instanceof BitOr and
aa.getTarget() = attr.getNode() and
attr.getName() = "options" and
attr.getObject() = node and
aa.getValue() = API::moduleImport("ssl").getMember(restriction).getAUse().asExpr()
)
}
override DataFlow::CfgNode getContext() { result = this }
override string getRestriction() { result = restriction }
}
class Ssl extends TlsLibrary {
Ssl() { this = "ssl" }
override string specific_insecure_version_name() {
result in [
"PROTOCOL_SSLv2", "PROTOCOL_SSLv3", "PROTOCOL_SSLv23", "PROTOCOL_TLSv1",
"PROTOCOL_TLSv1_1"
]
}
override string unspecific_version_name() { result = "PROTOCOL_TLS" }
override API::Node version_constants() { result = API::moduleImport("ssl") }
override DataFlow::CfgNode default_context_creation() {
result = API::moduleImport("ssl").getMember("create_default_context").getACall()
}
override ContextCreation specific_context_creation() { result instanceof SSLContextCreation }
override DataFlow::CfgNode insecure_connection_creation() {
result = API::moduleImport("ssl").getMember("wrap_socket").getACall()
}
override ConnectionCreation connection_creation() { result instanceof WrapSocketCall }
override ProtocolRestriction protocol_restriction() { result instanceof OptionsAugOr }
}
}
module pyOpenSSL {
string insecure_version_name() {
result = "SSLv2_METHOD" or
result = "SSLv23_METHOD" or
result = "SSLv3_METHOD" or
result = "TLSv1_METHOD" or
result = "TLSv1_1_METHOD"
class PyOpenSSLContextCreation extends ContextCreation {
override CallNode node;
PyOpenSSLContextCreation() {
this = API::moduleImport("pyOpenSSL").getMember("SSL").getMember("Context").getACall()
}
override DataFlow::CfgNode getProtocol() {
result.getNode() in [node.getArg(0), node.getArgByName("method")]
}
}
DataFlow::Node insecure_version() {
result =
API::moduleImport("pyOpenSSL").getMember("SSL").getMember(insecure_version_name()).getAUse()
class ConnectionCall extends ConnectionCreation {
override CallNode node;
ConnectionCall() {
this = API::moduleImport("pyOpenSSL").getMember("SSL").getMember("Connection").getACall()
}
override DataFlow::CfgNode getContext() {
result.getNode() in [node.getArg(0), node.getArgByName("context")]
}
}
class SetOptionsCall extends ProtocolRestriction {
override CallNode node;
SetOptionsCall() { node.getFunction().(AttrNode).getName() = "set_options" }
override DataFlow::CfgNode getContext() {
result.getNode() = node.getFunction().(AttrNode).getObject()
}
override string getRestriction() {
API::moduleImport("PyOpenSSL").getMember("SSL").getMember(result).getAUse().asCfgNode() in [
node.getArg(0), node.getArgByName("options")
]
}
}
class PyOpenSSL extends TlsLibrary {
PyOpenSSL() { this = "pyOpenSSL" }
override string specific_insecure_version_name() {
result in ["SSLv2_METHOD", "SSLv23_METHOD", "SSLv3_METHOD", "TLSv1_METHOD", "TLSv1_1_METHOD"]
}
override string unspecific_version_name() { result = "TLS_METHOD" }
override API::Node version_constants() {
result = API::moduleImport("pyOpenSSL").getMember("SSL")
}
override DataFlow::CfgNode default_context_creation() { none() }
override ContextCreation specific_context_creation() {
result instanceof PyOpenSSLContextCreation
}
override DataFlow::CfgNode insecure_connection_creation() { none() }
override ConnectionCreation connection_creation() { result instanceof ConnectionCall }
override ProtocolRestriction protocol_restriction() { result instanceof SetOptionsCall }
}
}
@@ -278,129 +264,34 @@ class AllowsTLSv1_1 extends InsecureContextConfiguration {
override string flag() { result = "OP_NO_TLSv1_1" }
}
predicate unsafe_connection_creation(DataFlow::Node node) {
exists(AllowsTLSv1 c | c.hasFlowTo(node)) or
exists(AllowsTLSv1_1 c | c.hasFlowTo(node)) //or
// node = API::moduleImport("ssl").getMember("wrap_socket").getACall()
}
predicate unsafe_context_creation(DataFlow::Node node) {
exists(TlsLibrary l | l.insecure_context_creation() = node)
}
// class InsecureTLSContextConfiguration extends DataFlow::Configuration {
// InsecureTLSContextConfiguration() { this in ["AllowsTLSv1", "AllowsTLSv1_1"] }
// override predicate isSource(DataFlow::Node source) {
// source instanceof InsecureSSLContextCreation
// }
// override predicate isSink(DataFlow::Node sink) { sink = any(WrapSocketCall c).getContext() }
// abstract string flag();
// override predicate isBarrierOut(DataFlow::Node node) {
// exists(AugAssign aa, AttrNode attr |
// aa.getOperation().getOp() instanceof BitOr and
// aa.getTarget() = attr.getNode() and
// attr.getName() = "options" and
// attr.getObject() = node.asCfgNode() and
// aa.getValue() = API::moduleImport("ssl").getMember(flag()).getAUse().asExpr()
// )
// }
// }
// class AllowsTLSv1 extends InsecureTLSContextConfiguration {
// AllowsTLSv1() { this = "AllowsTLSv1" }
// override string flag() { result = "OP_NO_TLSv1" }
// }
// class AllowsTLSv1_1 extends InsecureTLSContextConfiguration {
// AllowsTLSv1_1() { this = "AllowsTLSv1_1" }
// override string flag() { result = "OP_NO_TLSv1_1" }
// }
// predicate unsafe_wrap_socket_call(DataFlow::Node node) {
// exists(AllowsTLSv1 c | c.hasFlowTo(node)) or
// exists(AllowsTLSv1_1 c | c.hasFlowTo(node)) or
// node = API::moduleImport("ssl").getMember("wrap_socket").getACall()
// }
private ModuleValue the_ssl_module() { result = Module::named("ssl") }
FunctionValue ssl_wrap_socket() { result = the_ssl_module().attr("wrap_socket") }
ClassValue ssl_Context_class() { result = the_ssl_module().attr("SSLContext") }
private ModuleValue the_pyOpenSSL_module() { result = Value::named("pyOpenSSL.SSL") }
ClassValue the_pyOpenSSL_Context_class() { result = Value::named("pyOpenSSL.SSL.Context") }
// Since version 3.6, it is fine to call `ssl.SSLContext(protocol=PROTOCOL_TLS)`
// if one also specifies either OP_NO_TLSv1 (introduced in 3.2)
// or SSLContext.minimum_version other than TLSVersion.TLSv1 (introduced in 3.7)
// See https://docs.python.org/3/library/ssl.html?highlight=ssl#ssl.SSLContext
// and https://docs.python.org/3/library/ssl.html?highlight=ssl#protocol-versions
// FP reported here: https://github.com/github/codeql/issues/2554
// string insecure_version_name() {
// // For `pyOpenSSL.SSL`
// result = "SSLv2_METHOD" or
// result = "SSLv23_METHOD" or
// result = "SSLv3_METHOD" or
// result = "TLSv1_METHOD" or
// // For the `ssl` module
// result = "PROTOCOL_SSLv2" or
// result = "PROTOCOL_SSLv3" or
// result = "PROTOCOL_SSLv23" or
// result = "PROTOCOL_TLSv1"
// }
/*
* A syntactic check for cases where points-to analysis cannot infer the presence of
* a protocol constant, e.g. if it has been removed in later versions of the `ssl`
* library.
*/
bindingset[named_argument]
predicate probable_insecure_ssl_constant(
CallNode call, string insecure_version, string named_argument
) {
exists(ControlFlowNode arg |
arg = call.getArgByName(named_argument) or
arg = call.getArg(0)
|
arg.(AttrNode).getObject(insecure_version).pointsTo(the_ssl_module())
or
arg.(NameNode).getId() = insecure_version and
exists(Import imp |
imp.getAnImportedModuleName() = "ssl" and
imp.getAName().getAsname().(Name).getId() = insecure_version
)
)
}
predicate unsafe_ssl_wrap_socket_call(
CallNode call, string method_name, string insecure_version, string named_argument
) {
(
call = ssl_wrap_socket().getACall() and
method_name = "deprecated method ssl.wrap_socket" and
named_argument = "ssl_version"
or
call = ssl_Context_class().getACall() and
named_argument = "protocol" and
method_name = "ssl.SSLContext"
) and
insecure_version = ssl::insecure_version_name() and
(
call.getArgByName(named_argument).pointsTo(the_ssl_module().attr(insecure_version))
or
probable_insecure_ssl_constant(call, insecure_version, named_argument)
)
}
predicate unsafe_pyOpenSSL_Context_call(CallNode call, string insecure_version) {
call = the_pyOpenSSL_Context_class().getACall() and
insecure_version = pyOpenSSL::insecure_version_name() and
call.getArg(0).pointsTo(the_pyOpenSSL_module().attr(insecure_version))
}
from CallNode call, string method_name, string insecure_version
where
unsafe_ssl_wrap_socket_call(call, method_name, insecure_version, _)
predicate unsafe_connection_creation(DataFlow::Node node, string insecure_version) {
exists(AllowsTLSv1 c | c.hasFlowTo(node)) and
insecure_version = "TLSv1"
or
unsafe_pyOpenSSL_Context_call(call, insecure_version) and method_name = "pyOpenSSL.SSL.Context"
select call,
"Insecure SSL/TLS protocol version " + insecure_version + " specified in call to " + method_name +
"."
exists(AllowsTLSv1_1 c | c.hasFlowTo(node)) and
insecure_version = "TLSv1"
or
exists(TlsLibrary l | l.insecure_connection_creation() = node) and
insecure_version = "[multiple]"
}
predicate unsafe_context_creation(DataFlow::Node node, string insecure_version) {
exists(TlsLibrary l, ContextCreation cc | cc = l.insecure_context_creation() |
cc = node and insecure_version = cc.getProtocol().toString()
)
}
from DataFlow::Node node, string insecure_version
where
unsafe_connection_creation(node, insecure_version)
or
unsafe_context_creation(node, insecure_version)
select node, "Insecure SSL/TLS protocol version " + insecure_version //+ " specified in call to " + method_name + "."
// from CallNode call, string method_name, string insecure_version
// where
// unsafe_ssl_wrap_socket_call(call, method_name, insecure_version, _)
// or
// unsafe_pyOpenSSL_Context_call(call, insecure_version) and method_name = "pyOpenSSL.SSL.Context"
// select call,
// "Insecure SSL/TLS protocol version " + insecure_version + " specified in call to " + method_name +
// "."