finalize Secondary server command injection queries and tests.

This commit is contained in:
amammad
2024-02-25 14:11:19 +04:00
parent 95c9a3fc9a
commit 3e6b4a161b
16 changed files with 215 additions and 63 deletions

View File

@@ -15,6 +15,13 @@ private import semmle.python.dataflow.new.TaintTracking
private import experimental.semmle.python.Frameworks
private import semmle.python.Concepts
/**
* A data-flow node that responsible for a command that can be executed on a secondary remote system,
*
* Extend this class to model new APIs.
*/
abstract class SecondaryCommandInjection extends DataFlow::Node { }
/** Provides classes for modeling copying file related APIs. */
module CopyFile {
/**

View File

@@ -2,11 +2,15 @@
* Helper file that imports all framework modeling.
*/
private import experimental.semmle.python.frameworks.AsyncSsh
private import experimental.semmle.python.frameworks.Stdlib
private import experimental.semmle.python.frameworks.Flask
private import experimental.semmle.python.frameworks.Django
private import experimental.semmle.python.frameworks.Werkzeug
private import experimental.semmle.python.frameworks.LDAP
private import experimental.semmle.python.frameworks.Netmiko
private import experimental.semmle.python.frameworks.Paramiko
private import experimental.semmle.python.frameworks.Scrapli
private import experimental.semmle.python.frameworks.JWT
private import experimental.semmle.python.frameworks.Csv
private import experimental.semmle.python.libraries.PyJWT
@@ -15,5 +19,6 @@ private import experimental.semmle.python.libraries.Authlib
private import experimental.semmle.python.libraries.PythonJose
private import experimental.semmle.python.frameworks.CopyFile
private import experimental.semmle.python.frameworks.Sendgrid
private import experimental.semmle.python.frameworks.Ssh2
private import experimental.semmle.python.libraries.FlaskMail
private import experimental.semmle.python.libraries.SmtpLib

View File

@@ -8,7 +8,7 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
import experimental.semmle.python.security.SecondaryServerCmdInjectionCustomizations
import experimental.semmle.python.Concepts
/**
* Provides models for the `asyncssh` PyPI package.
@@ -23,7 +23,7 @@ private module Asyncssh {
/**
* A `run` method responsible for executing commands on remote secondary servers.
*/
class AsyncsshRun extends SecondaryCommandInjection::Sink {
class AsyncsshRun extends SecondaryCommandInjection {
AsyncsshRun() {
this =
asyncssh()

View File

@@ -8,7 +8,7 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
import experimental.semmle.python.security.SecondaryServerCmdInjectionCustomizations
import experimental.semmle.python.Concepts
/**
* Provides models for the `netmiko` PyPI package.
@@ -30,7 +30,7 @@ private module Netmiko {
/**
* The `send_*` methods responsible for executing commands on remote secondary servers.
*/
class NetmikoSendCommand extends SecondaryCommandInjection::Sink {
class NetmikoSendCommand extends SecondaryCommandInjection {
NetmikoSendCommand() {
this =
netmikoConnectHandler()

View File

@@ -8,7 +8,7 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
import experimental.semmle.python.security.SecondaryServerCmdInjectionCustomizations
import experimental.semmle.python.Concepts
/**
* Provides models for the `paramiko` PyPI package.
@@ -28,7 +28,7 @@ private module Paramiko {
/**
* The `exec_command` of `paramiko.SSHClient` class execute command on ssh target server
*/
class ParamikoExecCommand extends SecondaryCommandInjection::Sink {
class ParamikoExecCommand extends SecondaryCommandInjection {
ParamikoExecCommand() {
this =
paramikoClient().getMember("exec_command").getACall().getParameter(0, "command").asSink()

View File

@@ -8,7 +8,7 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
import experimental.semmle.python.security.SecondaryServerCmdInjectionCustomizations
import experimental.semmle.python.Concepts
/**
* Provides models for the `scrapli` PyPI package.
@@ -33,7 +33,7 @@ private module Scrapli {
/**
* A `send_command` method responsible for executing commands on remote secondary servers.
*/
class ScrapliSendCommand extends SecondaryCommandInjection::Sink {
class ScrapliSendCommand extends SecondaryCommandInjection {
ScrapliSendCommand() {
this =
scrapliCore()

View File

@@ -8,7 +8,7 @@ private import semmle.python.dataflow.new.DataFlow
private import semmle.python.dataflow.new.RemoteFlowSources
private import semmle.python.Concepts
private import semmle.python.ApiGraphs
import experimental.semmle.python.security.SecondaryServerCmdInjectionCustomizations
import experimental.semmle.python.Concepts
/**
* Provides models for the `ssh2-python` PyPI package.
@@ -20,20 +20,17 @@ private module Ssh2 {
*/
private API::Node ssh2() { result = API::moduleImport("ssh2") }
/**
* Gets `ssh2.session` package.
*/
private API::Node ssh2Session() { result = API::moduleImport("ssh2").getMember("session") }
/**
* Gets `ssh2.session.Session` return value.
*/
private API::Node ssh2Session() { result = ssh2Session().getMember("Session").getReturn() }
private API::Node ssh2Session() {
result = ssh2().getMember("session").getMember("Session").getReturn()
}
/**
* A `execute` method responsible for executing commands on remote secondary servers.
*/
class Ssh2Execute extends SecondaryCommandInjection::Sink {
class Ssh2Execute extends SecondaryCommandInjection {
Ssh2Execute() {
this =
ssh2Session()

View File

@@ -4,12 +4,12 @@ import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.internal.DataFlowPublic
import codeql.util.Unit
import SecondaryServerCmdInjectionCustomizations
import experimental.semmle.python.Concepts
module SecondaryCommandInjectionConfig implements DataFlow::ConfigSig {
predicate isSource(DataFlow::Node source) { source instanceof SecondaryCommandInjection::Source }
predicate isSource(DataFlow::Node source) { source instanceof RemoteFlowSource }
predicate isSink(DataFlow::Node sink) { sink instanceof SecondaryCommandInjection::Sink }
predicate isSink(DataFlow::Node sink) { sink instanceof SecondaryCommandInjection }
}
/** Global taint-tracking for detecting "paramiko command injection" vulnerabilities. */

View File

@@ -1,35 +0,0 @@
import python
import semmle.python.dataflow.new.TaintTracking
import semmle.python.dataflow.new.RemoteFlowSources
import semmle.python.ApiGraphs
import semmle.python.dataflow.new.internal.DataFlowPublic
import codeql.util.Unit
/**
* Provides sinks and additional taint steps for the secondary command injection configuration
*/
module SecondaryCommandInjection {
/**
* The additional taint steps that need for creating taint tracking or dataflow.
*/
class AdditionalTaintStep extends Unit {
/**
* Holds if there is a additional taint step between pred and succ.
*/
abstract predicate isAdditionalTaintStep(DataFlow::Node pred, DataFlow::Node succ);
}
/**
* A abstract class responsible for extending secondary command injection dataflow sinks.
*/
abstract class Sink extends DataFlow::Node { }
/**
* A data flow source for secondary command injection data flow queries.
*/
abstract class Source extends DataFlow::Node { }
class RemoteSources extends Source {
RemoteSources() { this instanceof RemoteFlowSource }
}
}

View File

@@ -0,0 +1,19 @@
#!/usr/bin/env python
from fastapi import FastAPI
import asyncssh
app = FastAPI()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("host", "port"))
session = Session()
session.handshake(sock)
session.userauth_password("user", "password")
@app.get("/bad1")
async def bad1(cmd: str):
async with asyncssh.connect('localhost') as conn:
result = await conn.run(cmd, check=True) # $ result=BAD getSecondaryCommand=cmd
print(result.stdout, end='')
return {"success": "Dangerous"}

View File

@@ -0,0 +1,23 @@
import python
import semmle.python.dataflow.new.DataFlow
import semmle.python.dataflow.new.internal.DataFlowDispatch as DataFlowDispatch
import TestUtilities.InlineExpectationsTest
private import semmle.python.dataflow.new.internal.PrintNode
import experimental.semmle.python.Concepts
module SystemCommandExecutionTest implements TestSig {
string getARelevantTag() { result = "getSecondaryCommand" }
predicate hasActualResult(Location location, string element, string tag, string value) {
exists(location.getFile().getRelativePath()) and
exists(SecondaryCommandInjection sci, DataFlow::Node command |
command = sci and
location = command.getLocation() and
element = command.toString() and
value = prettyNodeForInlineTest(command) and
tag = "getSecondaryCommand"
)
}
}
import MakeTest<SystemCommandExecutionTest>

View File

@@ -0,0 +1,25 @@
#!/usr/bin/env python
from fastapi import FastAPI
from netmiko import ConnectHandler
app = FastAPI()
cisco_881 = {
'device_type': 'cisco_ios',
'host': '10.10.10.10',
'username': 'test',
'password': 'password',
'port': 8022, # optional, defaults to 22
'secret': 'secret', # optional, defaults to ''
}
@app.get("/bad1")
async def bad1(cmd: str):
net_connect = ConnectHandler(**cisco_881)
net_connect.send_command(command_string=cmd) # $ result=BAD getSecondaryCommand=cmd
net_connect.send_command_expect(command_string=cmd) # $ result=BAD getSecondaryCommand=cmd
net_connect.send_command_timing(command_string=cmd) # $ result=BAD getSecondaryCommand=cmd
net_connect.send_multiline(commands=[[cmd, "expect"]]) # $ result=BAD getSecondaryCommand=List
net_connect.send_multiline_timing(commands=cmd) # $ result=BAD getSecondaryCommand=cmd
return {"success": "Dangerous"}

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python
from fastapi import FastAPI
from scrapli import Scrapli
from scrapli.driver.core import AsyncNXOSDriver, AsyncJunosDriver, AsyncEOSDriver, AsyncIOSXEDriver, AsyncIOSXRDriver
from scrapli.driver.core import NXOSDriver, JunosDriver, EOSDriver, IOSXEDriver, IOSXRDriver
from scrapli.driver import GenericDriver
app = FastAPI()
cisco_881 = {
'device_type': 'cisco_ios',
'host': '10.10.10.10',
'username': 'test',
'password': 'password',
'port': 8022, # optional, defaults to 22
'secret': 'secret', # optional, defaults to ''
}
@app.get("/bad1")
async def bad1(cmd: str):
dev_connect = {
"host": host,
"auth_username": user,
"auth_password": password,
"port": port,
"auth_strict_key": False,
"transport": "asyncssh",
}
driver = AsyncIOSXEDriver
async with driver(**dev_connect) as conn:
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = AsyncIOSXRDriver
async with driver(**dev_connect) as conn:
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = AsyncNXOSDriver
async with driver(**dev_connect) as conn:
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = AsyncEOSDriver
async with driver(**dev_connect) as conn:
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = AsyncJunosDriver
async with driver(**dev_connect) as conn:
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
return {"success": "Dangerous"}
@app.get("/bad1")
def bad2(cmd: str):
dev_connect = {
"host": host,
"auth_username": user,
"auth_password": password,
"port": port,
"auth_strict_key": False,
"transport": "ssh2",
}
driver = NXOSDriver
with driver(**dev_connect) as conn:
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = IOSXRDriver
with driver(**dev_connect) as conn:
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = IOSXEDriver
with driver(**dev_connect) as conn:
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = EOSDriver
with driver(**dev_connect) as conn:
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
driver = JunosDriver
with driver(**dev_connect) as conn:
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
dev_connect = {
"host": "65.65.65.65",
"auth_username": "root",
"auth_private_key": "keyPath",
"auth_strict_key": False,
"transport": "ssh2",
"platform": "cisco_iosxe",
}
with Scrapli(**dev_connect) as conn:
result = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
dev_connect = {
"host": "65.65.65.65",
"auth_username": "root",
"auth_password": "password",
"auth_strict_key": False,
"transport": "ssh2",
}
with GenericDriver(**dev_connect) as conn:
result = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
return {"success": "Dangerous"}

View File

@@ -12,16 +12,11 @@ app = FastAPI()
@app.get("/bad1")
async def read_item(cmd: str):
stdin, stdout, stderr = paramiko_ssh_client.exec_command(cmd) # $ result=BAD
async def bad1(cmd: str):
stdin, stdout, stderr = paramiko_ssh_client.exec_command(cmd) # $ result=BAD getSecondaryCommand=cmd
return {"success": stdout}
@app.get("/bad2")
async def read_item(cmd: str):
stdin, stdout, stderr = paramiko_ssh_client.exec_command(command=cmd) # $ result=BAD
return {"success": "OK"}
@app.get("/bad3")
async def read_item(cmd: str):
paramiko_ssh_client.connect('hostname', username='user',password='yourpassword',sock=paramiko.ProxyCommand(cmd)) # $ result=BAD
async def bad2(cmd: str):
stdin, stdout, stderr = paramiko_ssh_client.exec_command(command=cmd) # $ result=BAD getSecondaryCommand=cmd
return {"success": "OK"}

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python
from fastapi import FastAPI
from socket import socket
from ssh2.session import Session
app = FastAPI()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(("host", "port"))
session = Session()
session.handshake(sock)
session.userauth_password("user", "password")
@app.get("/bad1")
async def bad1(cmd: str):
channel = session.open_session()
channel.execute(cmd) # $ result=BAD getSecondaryCommand=cmd
channel.wait_eof()
channel.close()
channel.wait_closed()
return {"success": "Dangerous"}