diff --git a/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/UnsafeUsageOfClientSideEncryptionVersion.expected b/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/UnsafeUsageOfClientSideEncryptionVersion.expected
new file mode 100644
index 00000000000..5fdc975907a
--- /dev/null
+++ b/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/UnsafeUsageOfClientSideEncryptionVersion.expected
@@ -0,0 +1 @@
+| test.py:8:5:8:34 | ControlFlowNode for Attribute | Unsafe usage of v1 version of Azure Storage client-side encryption. |
diff --git a/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/UnsafeUsageOfClientSideEncryptionVersion.qlref b/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/UnsafeUsageOfClientSideEncryptionVersion.qlref
new file mode 100644
index 00000000000..b737b32c815
--- /dev/null
+++ b/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/UnsafeUsageOfClientSideEncryptionVersion.qlref
@@ -0,0 +1 @@
+experimental/Security/CWE-327/Azure/UnsafeUsageOfClientSideEncryptionVersion.ql
diff --git a/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/test.py b/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/test.py
new file mode 100644
index 00000000000..44d72f0bf1f
--- /dev/null
+++ b/python/ql/test/experimental/query-tests/Security/CWE-327-UnsafeUsageOfClientSideEncryptionVersion/test.py
@@ -0,0 +1,59 @@
+from azure.storage.blob import BlobServiceClient
+
+
+def unsafe():
+    # does not set encryption_version to 2.0, default is unsafe
+    blob_client = BlobServiceClient.get_blob_client(...)
+    blob_client.require_encryption = True
+    blob_client.key_encryption_key = ...
+    with open("decryptedcontentfile.txt", "rb") as stream:
+        blob_client.upload_blob(stream) # BAD
+
+
+def potentially_unsafe(use_new_version=False):
+    blob_client = BlobServiceClient.get_blob_client(...)
+    blob_client.require_encryption = True
+    blob_client.key_encryption_key = ...
+
+    if use_new_version:
+        blob_client.encryption_version = '2.0'
+
+    with open("decryptedcontentfile.txt", "rb") as stream:
+        blob_client.upload_blob(stream) # BAD
+
+
+def safe():
+    blob_client = BlobServiceClient.get_blob_client(...)
+    blob_client.require_encryption = True
+    blob_client.key_encryption_key = ...
+    # GOOD: Must use `encryption_version` set to `2.0`
+    blob_client.encryption_version = '2.0'
+    with open("decryptedcontentfile.txt", "rb") as stream:
+        blob_client.upload_blob(stream) # OK
+
+
+def get_unsafe_blob_client():
+    blob_client = BlobServiceClient.get_blob_client(...)
+    blob_client.require_encryption = True
+    blob_client.key_encryption_key = ...
+    return blob_client
+
+
+def unsafe_with_calls():
+    bc = get_unsafe_blob_client()
+    with open("decryptedcontentfile.txt", "rb") as stream:
+        bc.upload_blob(stream) # BAD
+
+
+def get_safe_blob_client():
+    blob_client = BlobServiceClient.get_blob_client(...)
+    blob_client.require_encryption = True
+    blob_client.key_encryption_key = ...
+    blob_client.encryption_version = '2.0'
+    return blob_client
+
+
+def safe_with_calls():
+    bc = get_safe_blob_client()
+    with open("decryptedcontentfile.txt", "rb") as stream:
+        bc.upload_blob(stream) # OK
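Review bot: The test marks three upload sites as `# BAD` (in `unsafe`, `potentially_unsafe` and `unsafe_with_calls`), but the accompanying .expected file records a single alert, at `test.py:8:5:8:34`, which is the `key_encryption_key` assignment inside `unsafe`. The conditional `encryption_version = '2.0'` in `potentially_unsafe` and the client built in `get_unsafe_blob_client` and uploaded in `unsafe_with_calls` therefore appear to go undetected, and if that is accepted for now the annotations should say so, e.g. `bc.upload_blob(stream) # BAD [NOT DETECTED]`, so the false negatives are not later mistaken for covered cases. It would also help to note that the alert is reported on the `key_encryption_key` attribute of the client rather than on the `upload_blob` call, which is where the `# BAD` markers sit. Finally, the comment "default is unsafe" at the top of `unsafe` would be clearer if it named the default, which I believe the SDK sets to `'1.0'`: `# encryption_version left at its default ('1.0', the deprecated v1 scheme) - confirm against the SDK version in use`.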