Merge pull request #9157 from alexrford/crypto-op-block-mode

Ruby/Python: Add a `BlockMode` concept for `CryptographicOperations`
This commit is contained in:
Alex Ford
2022-06-13 21:32:36 +02:00
committed by GitHub
34 changed files with 478 additions and 113 deletions

View File

@@ -1211,38 +1211,5 @@ module Cryptography {
}
}
import semmle.python.concepts.CryptoAlgorithms
/**
* A data-flow node that is an application of a cryptographic algorithm. For example,
* encryption, decryption, signature-validation.
*
* Extend this class to refine existing API models. If you want to model new APIs,
* extend `CryptographicOperation::Range` instead.
*/
class CryptographicOperation extends DataFlow::Node instanceof CryptographicOperation::Range {
/** Gets the algorithm used, if it matches a known `CryptographicAlgorithm`. */
CryptographicAlgorithm getAlgorithm() { result = super.getAlgorithm() }
/** Gets an input the algorithm is used on, for example the plain text input to be encrypted. */
DataFlow::Node getAnInput() { result = super.getAnInput() }
}
/** Provides classes for modeling new applications of a cryptographic algorithms. */
module CryptographicOperation {
/**
* A data-flow node that is an application of a cryptographic algorithm. For example,
* encryption, decryption, signature-validation.
*
* Extend this class to model new APIs. If you want to refine existing API models,
* extend `CryptographicOperation` instead.
*/
abstract class Range extends DataFlow::Node {
/** Gets the algorithm used, if it matches a known `CryptographicAlgorithm`. */
abstract CryptographicAlgorithm getAlgorithm();
/** Gets an input the algorithm is used on, for example the plain text input to be encrypted. */
abstract DataFlow::Node getAnInput();
}
}
import semmle.python.internal.ConceptsShared::Cryptography
}

View File

@@ -81,6 +81,9 @@ class EncryptionAlgorithm extends MkEncryptionAlgorithm, CryptographicAlgorithm
override string getName() { result = name }
override predicate isWeak() { isWeak = true }
/** Holds if this algorithm is a stream cipher. */
predicate isStreamCipher() { isStreamCipher(name) }
}
/**

View File

@@ -67,6 +67,6 @@ predicate isStrongPasswordHashingAlgorithm(string name) {
predicate isWeakPasswordHashingAlgorithm(string name) { name = "EVPKDF" }
/**
* Holds if `name` corresponds to a weak block cipher mode of operation.
* Holds if `name` corresponds to a stream cipher.
*/
predicate isWeakBlockMode(string name) { name = "ECB" }
predicate isStreamCipher(string name) { name = ["CHACHA", "RC4", "ARC4", "ARCFOUR", "RABBIT"] }

View File

@@ -108,20 +108,20 @@ private module CryptodomeModel {
DataFlow::CallCfgNode {
string methodName;
string cipherName;
API::CallNode newCall;
CryptodomeGenericCipherOperation() {
methodName in [
"encrypt", "decrypt", "verify", "update", "hexverify", "encrypt_and_digest",
"decrypt_and_verify"
] and
this =
newCall =
API::moduleImport(["Crypto", "Cryptodome"])
.getMember("Cipher")
.getMember(cipherName)
.getMember("new")
.getReturn()
.getMember(methodName)
.getACall()
.getACall() and
this = newCall.getReturn().getMember(methodName).getACall()
}
override Cryptography::CryptographicAlgorithm getAlgorithm() { result.matchesName(cipherName) }
@@ -155,6 +155,20 @@ private module CryptodomeModel {
this.getArgByName("mac_tag")
]
}
override Cryptography::BlockMode getBlockMode() {
// `modeName` is of the form "MODE_<BlockMode>"
exists(string modeName |
newCall.getArg(1) =
API::moduleImport(["Crypto", "Cryptodome"])
.getMember("Cipher")
.getMember(cipherName)
.getMember(modeName)
.getAUse()
|
result = modeName.splitAt("_", 1)
)
}
}
/**
@@ -192,6 +206,8 @@ private module CryptodomeModel {
result in [this.getArg(1), this.getArgByName("signature")]
)
}
override Cryptography::BlockMode getBlockMode() { none() }
}
/**
@@ -215,5 +231,7 @@ private module CryptodomeModel {
override Cryptography::CryptographicAlgorithm getAlgorithm() { result.matchesName(hashName) }
override DataFlow::Node getAnInput() { result in [this.getArg(0), this.getArgByName("data")] }
override Cryptography::BlockMode getBlockMode() { none() }
}
}

View File

@@ -170,8 +170,19 @@ private module CryptographyModel {
.getMember(algorithmName)
}
/** Gets a reference to a `cryptography.hazmat.primitives.ciphers.modes` Class */
API::Node modeClassRef(string modeName) {
result =
API::moduleImport("cryptography")
.getMember("hazmat")
.getMember("primitives")
.getMember("ciphers")
.getMember("modes")
.getMember(modeName)
}
/** Gets a reference to a Cipher instance using algorithm with `algorithmName`. */
API::Node cipherInstance(string algorithmName) {
API::Node cipherInstance(string algorithmName, string modeName) {
exists(API::CallNode call | result = call.getReturn() |
call =
API::moduleImport("cryptography")
@@ -182,7 +193,12 @@ private module CryptographyModel {
.getACall() and
algorithmClassRef(algorithmName).getReturn().getAUse() in [
call.getArg(0), call.getArgByName("algorithm")
]
] and
exists(DataFlow::Node modeArg | modeArg in [call.getArg(1), call.getArgByName("mode")] |
if modeArg = modeClassRef(_).getReturn().getAUse()
then modeArg = modeClassRef(modeName).getReturn().getAUse()
else modeName = "<None or unknown>"
)
)
}
@@ -192,10 +208,11 @@ private module CryptographyModel {
class CryptographyGenericCipherOperation extends Cryptography::CryptographicOperation::Range,
DataFlow::MethodCallNode {
string algorithmName;
string modeName;
CryptographyGenericCipherOperation() {
this =
cipherInstance(algorithmName)
cipherInstance(algorithmName, modeName)
.getMember(["decryptor", "encryptor"])
.getReturn()
.getMember(["update", "update_into"])
@@ -207,6 +224,8 @@ private module CryptographyModel {
}
override DataFlow::Node getAnInput() { result in [this.getArg(0), this.getArgByName("data")] }
override Cryptography::BlockMode getBlockMode() { result = modeName }
}
}
@@ -257,6 +276,8 @@ private module CryptographyModel {
}
override DataFlow::Node getAnInput() { result in [this.getArg(0), this.getArgByName("data")] }
override Cryptography::BlockMode getBlockMode() { none() }
}
}
}

View File

@@ -41,6 +41,8 @@ private module Rsa {
override DataFlow::Node getAnInput() {
result in [this.getArg(0), this.getArgByName("message")]
}
override Cryptography::BlockMode getBlockMode() { none() }
}
/**
@@ -54,6 +56,8 @@ private module Rsa {
override Cryptography::CryptographicAlgorithm getAlgorithm() { result.getName() = "RSA" }
override DataFlow::Node getAnInput() { result in [this.getArg(0), this.getArgByName("crypto")] }
override Cryptography::BlockMode getBlockMode() { none() }
}
/**
@@ -79,6 +83,8 @@ private module Rsa {
override DataFlow::Node getAnInput() {
result in [this.getArg(0), this.getArgByName("message")]
}
override Cryptography::BlockMode getBlockMode() { none() }
}
/**
@@ -100,6 +106,8 @@ private module Rsa {
or
result in [this.getArg(1), this.getArgByName("signature")]
}
override Cryptography::BlockMode getBlockMode() { none() }
}
/**
@@ -122,6 +130,8 @@ private module Rsa {
override DataFlow::Node getAnInput() {
result in [this.getArg(0), this.getArgByName("message")]
}
override Cryptography::BlockMode getBlockMode() { none() }
}
/**
@@ -137,5 +147,7 @@ private module Rsa {
override DataFlow::Node getAnInput() {
result in [this.getArg(0), this.getArgByName("hash_value")]
}
override Cryptography::BlockMode getBlockMode() { none() }
}
}

View File

@@ -2671,6 +2671,8 @@ private module StdlibPrivate {
override Cryptography::CryptographicAlgorithm getAlgorithm() { result.matchesName(hashName) }
override DataFlow::Node getAnInput() { result = this.getParameter(1, "data").getARhs() }
override Cryptography::BlockMode getBlockMode() { none() }
}
/**
@@ -2686,6 +2688,8 @@ private module StdlibPrivate {
override Cryptography::CryptographicAlgorithm getAlgorithm() { result.matchesName(hashName) }
override DataFlow::Node getAnInput() { result = this.getArg(0) }
override Cryptography::BlockMode getBlockMode() { none() }
}
/** Helper predicate for the `HashLibGenericHashOperation` charpred, to prevent a bad join order. */
@@ -2709,6 +2713,8 @@ private module StdlibPrivate {
HashlibGenericHashOperation() { hashClass = hashlibMember(hashName) }
override Cryptography::CryptographicAlgorithm getAlgorithm() { result.matchesName(hashName) }
override Cryptography::BlockMode getBlockMode() { none() }
}
/**

View File

@@ -4,3 +4,4 @@
*/
import semmle.python.dataflow.new.DataFlow
import semmle.python.concepts.CryptoAlgorithms as CryptoAlgorithms

View File

@@ -11,3 +11,79 @@
*/
private import ConceptsImports
/**
* Provides models for cryptographic concepts.
*
* Note: The `CryptographicAlgorithm` class currently doesn't take weak keys into
* consideration for the `isWeak` member predicate. So RSA is always considered
* secure, although using a low number of bits will actually make it insecure. We plan
* to improve our libraries in the future to more precisely capture this aspect.
*/
module Cryptography {
class CryptographicAlgorithm = CryptoAlgorithms::CryptographicAlgorithm;
class EncryptionAlgorithm = CryptoAlgorithms::EncryptionAlgorithm;
class HashingAlgorithm = CryptoAlgorithms::HashingAlgorithm;
class PasswordHashingAlgorithm = CryptoAlgorithms::PasswordHashingAlgorithm;
/**
* A data-flow node that is an application of a cryptographic algorithm. For example,
* encryption, decryption, signature-validation.
*
* Extend this class to refine existing API models. If you want to model new APIs,
* extend `CryptographicOperation::Range` instead.
*/
class CryptographicOperation extends DataFlow::Node instanceof CryptographicOperation::Range {
/** Gets the algorithm used, if it matches a known `CryptographicAlgorithm`. */
CryptographicAlgorithm getAlgorithm() { result = super.getAlgorithm() }
/** Gets an input the algorithm is used on, for example the plain text input to be encrypted. */
DataFlow::Node getAnInput() { result = super.getAnInput() }
/**
* Gets the block mode used to perform this cryptographic operation.
* This may have no result - for example if the `CryptographicAlgorithm` used
* is a stream cipher rather than a block cipher.
*/
BlockMode getBlockMode() { result = super.getBlockMode() }
}
/** Provides classes for modeling new applications of a cryptographic algorithms. */
module CryptographicOperation {
/**
* A data-flow node that is an application of a cryptographic algorithm. For example,
* encryption, decryption, signature-validation.
*
* Extend this class to model new APIs. If you want to refine existing API models,
* extend `CryptographicOperation` instead.
*/
abstract class Range extends DataFlow::Node {
/** Gets the algorithm used, if it matches a known `CryptographicAlgorithm`. */
abstract CryptographicAlgorithm getAlgorithm();
/** Gets an input the algorithm is used on, for example the plain text input to be encrypted. */
abstract DataFlow::Node getAnInput();
/**
* Gets the block mode used to perform this cryptographic operation.
* This may have no result - for example if the `CryptographicAlgorithm` used
* is a stream cipher rather than a block cipher.
*/
abstract BlockMode getBlockMode();
}
}
/**
* A cryptographic block cipher mode of operation. This can be used to encrypt
* data of arbitrary length using a block encryption algorithm.
*/
class BlockMode extends string {
BlockMode() { this = ["ECB", "CBC", "GCM", "CCM", "CFB", "OFB", "CTR", "OPENPGP"] }
/** Holds if this block mode is considered to be insecure. */
predicate isWeak() { this = "ECB" }
}
}

View File

@@ -13,13 +13,18 @@
import python
import semmle.python.Concepts
from Cryptography::CryptographicOperation operation, Cryptography::CryptographicAlgorithm algorithm
from
Cryptography::CryptographicOperation operation, Cryptography::CryptographicAlgorithm algorithm,
string msgPrefix
where
algorithm = operation.getAlgorithm() and
algorithm.isWeak() and
// `Cryptography::HashingAlgorithm` and `Cryptography::PasswordHashingAlgorithm` are
// handled by `py/weak-sensitive-data-hashing`
algorithm instanceof Cryptography::EncryptionAlgorithm
select operation,
"The cryptographic algorithm " + algorithm.getName() +
" is broken or weak, and should not be used."
algorithm instanceof Cryptography::EncryptionAlgorithm and
(
algorithm.isWeak() and
msgPrefix = "The cryptographic algorithm " + operation.getAlgorithm().getName()
)
or
operation.getBlockMode().isWeak() and msgPrefix = "The block mode " + operation.getBlockMode()
select operation, msgPrefix + " is broken or weak, and should not be used."

View File

@@ -0,0 +1,4 @@
---
category: minorAnalysis
---
* The query "Use of a broken or weak cryptographic algorithm" (`py/weak-cryptographic-algorithm`) now report if a cryptographic operation is potentially insecure due to use of a weak block mode.

View File

@@ -487,7 +487,8 @@ class CryptographicOperationTest extends InlineExpectationsTest {
override string getARelevantTag() {
result in [
"CryptographicOperation", "CryptographicOperationInput", "CryptographicOperationAlgorithm"
"CryptographicOperation", "CryptographicOperationInput", "CryptographicOperationAlgorithm",
"CryptographicOperationBlockMode"
]
}
@@ -507,6 +508,10 @@ class CryptographicOperationTest extends InlineExpectationsTest {
element = cryptoOperation.toString() and
value = cryptoOperation.getAlgorithm().getName() and
tag = "CryptographicOperationAlgorithm"
or
element = cryptoOperation.toString() and
value = cryptoOperation.getBlockMode() and
tag = "CryptographicOperationBlockMode"
)
)
}

View File

@@ -21,14 +21,14 @@ padding = b"\0"*padding_len
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
# using separate .encrypt calls on individual lines does not work
whole_plantext = secret_message + padding
encrypted = cipher.encrypt(whole_plantext) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=whole_plantext
encrypted = cipher.encrypt(whole_plantext) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=whole_plantext CryptographicOperationBlockMode=CBC
print("encrypted={}".format(encrypted))
print()
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
decrypted = cipher.decrypt(encrypted) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=encrypted
decrypted = cipher.decrypt(encrypted) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=encrypted CryptographicOperationBlockMode=CBC
decrypted = decrypted[:-padding_len]

View File

@@ -21,14 +21,14 @@ padding = b"\0"*padding_len
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
# using separate .encrypt calls on individual lines does not work
whole_plantext = secret_message + padding
encrypted = cipher.encrypt(whole_plantext) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=whole_plantext
encrypted = cipher.encrypt(whole_plantext) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=whole_plantext CryptographicOperationBlockMode=CBC
print("encrypted={}".format(encrypted))
print()
cipher = AES.new(key, AES.MODE_CBC, iv=iv)
decrypted = cipher.decrypt(encrypted) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=encrypted
decrypted = cipher.decrypt(encrypted) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=encrypted CryptographicOperationBlockMode=CBC
decrypted = decrypted[:-padding_len]

View File

@@ -22,8 +22,8 @@ padding = b"\0"*padding_len
encryptor = cipher.encryptor()
print(padding_len)
encrypted = encryptor.update(secret_message) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=secret_message
encrypted += encryptor.update(padding) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=padding
encrypted = encryptor.update(secret_message) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=secret_message CryptographicOperationBlockMode=CBC
encrypted += encryptor.update(padding) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=padding CryptographicOperationBlockMode=CBC
encrypted += encryptor.finalize()
print("encrypted={}".format(encrypted))
@@ -31,7 +31,7 @@ print("encrypted={}".format(encrypted))
print()
decryptor = cipher.decryptor()
decrypted = decryptor.update(encrypted) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=encrypted
decrypted = decryptor.update(encrypted) # $ CryptographicOperation CryptographicOperationAlgorithm=AES CryptographicOperationInput=encrypted CryptographicOperationBlockMode=CBC
decrypted += decryptor.finalize()
decrypted = decrypted[:-padding_len]

View File

@@ -1,2 +1,4 @@
| test_cryptodome.py:11:13:11:42 | ControlFlowNode for Attribute() | The cryptographic algorithm ARC4 is broken or weak, and should not be used. |
| test_cryptodome.py:16:13:16:42 | ControlFlowNode for Attribute() | The block mode ECB is broken or weak, and should not be used. |
| test_cryptography.py:13:13:13:44 | ControlFlowNode for Attribute() | The cryptographic algorithm ARC4 is broken or weak, and should not be used. |
| test_cryptography.py:22:13:22:58 | ControlFlowNode for Attribute() | The block mode ECB is broken or weak, and should not be used. |

View File

@@ -1,5 +1,5 @@
# snippet from python/ql/test/experimental/library-tests/frameworks/cryptodome/test_rc4.py
from Cryptodome.Cipher import ARC4
from Cryptodome.Cipher import ARC4, AES
import os
@@ -11,3 +11,8 @@ cipher = ARC4.new(key)
encrypted = cipher.encrypt(secret_message) # NOT OK
print(secret_message, encrypted)
cipher = AES.new(key, AES.MODE_ECB)
encrypted = cipher.encrypt(secret_message) # NOT OK
print(secret_message, encrypted)

View File

@@ -1,5 +1,5 @@
# snippet from python/ql/test/experimental/library-tests/frameworks/cryptography/test_rc4.py
from cryptography.hazmat.primitives.ciphers import algorithms, Cipher
from cryptography.hazmat.primitives.ciphers import algorithms, modes, Cipher
import os
key = os.urandom(256//8)
@@ -14,3 +14,12 @@ encrypted = encryptor.update(secret_message) # NOT OK
encrypted += encryptor.finalize()
print(secret_message, encrypted)
algorithm = algorithms.AES(key)
cipher = Cipher(algorithm, mode=modes.ECB())
encryptor = cipher.encryptor()
encrypted = encryptor.update(secret_message + b'\x80\x00') # NOT OK
encrypted += encryptor.finalize()
print(secret_message, encrypted)