mirror of
https://github.com/github/codeql.git
synced 2026-04-26 17:25:19 +02:00
finalize Secondary server command injection queries and tests.
This commit is contained in:
@@ -0,0 +1,19 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from fastapi import FastAPI
|
||||
import asyncssh
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("host", "port"))
|
||||
session = Session()
|
||||
session.handshake(sock)
|
||||
session.userauth_password("user", "password")
|
||||
|
||||
@app.get("/bad1")
|
||||
async def bad1(cmd: str):
|
||||
async with asyncssh.connect('localhost') as conn:
|
||||
result = await conn.run(cmd, check=True) # $ result=BAD getSecondaryCommand=cmd
|
||||
print(result.stdout, end='')
|
||||
return {"success": "Dangerous"}
|
||||
@@ -0,0 +1,2 @@
|
||||
testFailures
|
||||
failures
|
||||
@@ -0,0 +1,23 @@
|
||||
import python
|
||||
import semmle.python.dataflow.new.DataFlow
|
||||
import semmle.python.dataflow.new.internal.DataFlowDispatch as DataFlowDispatch
|
||||
import TestUtilities.InlineExpectationsTest
|
||||
private import semmle.python.dataflow.new.internal.PrintNode
|
||||
import experimental.semmle.python.Concepts
|
||||
|
||||
module SystemCommandExecutionTest implements TestSig {
|
||||
string getARelevantTag() { result = "getSecondaryCommand" }
|
||||
|
||||
predicate hasActualResult(Location location, string element, string tag, string value) {
|
||||
exists(location.getFile().getRelativePath()) and
|
||||
exists(SecondaryCommandInjection sci, DataFlow::Node command |
|
||||
command = sci and
|
||||
location = command.getLocation() and
|
||||
element = command.toString() and
|
||||
value = prettyNodeForInlineTest(command) and
|
||||
tag = "getSecondaryCommand"
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
import MakeTest<SystemCommandExecutionTest>
|
||||
@@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from fastapi import FastAPI
|
||||
from netmiko import ConnectHandler
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
cisco_881 = {
|
||||
'device_type': 'cisco_ios',
|
||||
'host': '10.10.10.10',
|
||||
'username': 'test',
|
||||
'password': 'password',
|
||||
'port': 8022, # optional, defaults to 22
|
||||
'secret': 'secret', # optional, defaults to ''
|
||||
}
|
||||
|
||||
@app.get("/bad1")
|
||||
async def bad1(cmd: str):
|
||||
net_connect = ConnectHandler(**cisco_881)
|
||||
net_connect.send_command(command_string=cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
net_connect.send_command_expect(command_string=cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
net_connect.send_command_timing(command_string=cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
net_connect.send_multiline(commands=[[cmd, "expect"]]) # $ result=BAD getSecondaryCommand=List
|
||||
net_connect.send_multiline_timing(commands=cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
return {"success": "Dangerous"}
|
||||
@@ -0,0 +1,93 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from fastapi import FastAPI
|
||||
from scrapli import Scrapli
|
||||
from scrapli.driver.core import AsyncNXOSDriver, AsyncJunosDriver, AsyncEOSDriver, AsyncIOSXEDriver, AsyncIOSXRDriver
|
||||
from scrapli.driver.core import NXOSDriver, JunosDriver, EOSDriver, IOSXEDriver, IOSXRDriver
|
||||
from scrapli.driver import GenericDriver
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
cisco_881 = {
|
||||
'device_type': 'cisco_ios',
|
||||
'host': '10.10.10.10',
|
||||
'username': 'test',
|
||||
'password': 'password',
|
||||
'port': 8022, # optional, defaults to 22
|
||||
'secret': 'secret', # optional, defaults to ''
|
||||
}
|
||||
|
||||
@app.get("/bad1")
|
||||
async def bad1(cmd: str):
|
||||
dev_connect = {
|
||||
"host": host,
|
||||
"auth_username": user,
|
||||
"auth_password": password,
|
||||
"port": port,
|
||||
"auth_strict_key": False,
|
||||
"transport": "asyncssh",
|
||||
}
|
||||
driver = AsyncIOSXEDriver
|
||||
async with driver(**dev_connect) as conn:
|
||||
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = AsyncIOSXRDriver
|
||||
async with driver(**dev_connect) as conn:
|
||||
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = AsyncNXOSDriver
|
||||
async with driver(**dev_connect) as conn:
|
||||
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = AsyncEOSDriver
|
||||
async with driver(**dev_connect) as conn:
|
||||
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = AsyncJunosDriver
|
||||
async with driver(**dev_connect) as conn:
|
||||
output = await conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
return {"success": "Dangerous"}
|
||||
|
||||
@app.get("/bad1")
|
||||
def bad2(cmd: str):
|
||||
dev_connect = {
|
||||
"host": host,
|
||||
"auth_username": user,
|
||||
"auth_password": password,
|
||||
"port": port,
|
||||
"auth_strict_key": False,
|
||||
"transport": "ssh2",
|
||||
}
|
||||
driver = NXOSDriver
|
||||
with driver(**dev_connect) as conn:
|
||||
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = IOSXRDriver
|
||||
with driver(**dev_connect) as conn:
|
||||
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = IOSXEDriver
|
||||
with driver(**dev_connect) as conn:
|
||||
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = EOSDriver
|
||||
with driver(**dev_connect) as conn:
|
||||
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
driver = JunosDriver
|
||||
with driver(**dev_connect) as conn:
|
||||
output = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
|
||||
dev_connect = {
|
||||
"host": "65.65.65.65",
|
||||
"auth_username": "root",
|
||||
"auth_private_key": "keyPath",
|
||||
"auth_strict_key": False,
|
||||
"transport": "ssh2",
|
||||
"platform": "cisco_iosxe",
|
||||
}
|
||||
with Scrapli(**dev_connect) as conn:
|
||||
result = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
|
||||
dev_connect = {
|
||||
"host": "65.65.65.65",
|
||||
"auth_username": "root",
|
||||
"auth_password": "password",
|
||||
"auth_strict_key": False,
|
||||
"transport": "ssh2",
|
||||
}
|
||||
with GenericDriver(**dev_connect) as conn:
|
||||
result = conn.send_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
return {"success": "Dangerous"}
|
||||
@@ -12,16 +12,11 @@ app = FastAPI()
|
||||
|
||||
|
||||
@app.get("/bad1")
|
||||
async def read_item(cmd: str):
|
||||
stdin, stdout, stderr = paramiko_ssh_client.exec_command(cmd) # $ result=BAD
|
||||
async def bad1(cmd: str):
|
||||
stdin, stdout, stderr = paramiko_ssh_client.exec_command(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
return {"success": stdout}
|
||||
|
||||
@app.get("/bad2")
|
||||
async def read_item(cmd: str):
|
||||
stdin, stdout, stderr = paramiko_ssh_client.exec_command(command=cmd) # $ result=BAD
|
||||
return {"success": "OK"}
|
||||
|
||||
@app.get("/bad3")
|
||||
async def read_item(cmd: str):
|
||||
paramiko_ssh_client.connect('hostname', username='user',password='yourpassword',sock=paramiko.ProxyCommand(cmd)) # $ result=BAD
|
||||
async def bad2(cmd: str):
|
||||
stdin, stdout, stderr = paramiko_ssh_client.exec_command(command=cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
return {"success": "OK"}
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
from fastapi import FastAPI
|
||||
from socket import socket
|
||||
from ssh2.session import Session
|
||||
|
||||
app = FastAPI()
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("host", "port"))
|
||||
session = Session()
|
||||
session.handshake(sock)
|
||||
session.userauth_password("user", "password")
|
||||
|
||||
@app.get("/bad1")
|
||||
async def bad1(cmd: str):
|
||||
channel = session.open_session()
|
||||
channel.execute(cmd) # $ result=BAD getSecondaryCommand=cmd
|
||||
channel.wait_eof()
|
||||
channel.close()
|
||||
channel.wait_closed()
|
||||
return {"success": "Dangerous"}
|
||||
Reference in New Issue
Block a user