Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ dependencies = [
"pymysql>=1.1.2",
"mysql-connector-python>=9.5.0",
"oracledb>=3.4.1",
"androguard>=4.1.2",
]

[dependency-groups]
Expand Down
119 changes: 92 additions & 27 deletions server/mobile.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import asyncio
import base64
import hashlib
import os
import shutil
import subprocess
import base64
from typing import Literal
import asyncio

import requests
from fastapi import APIRouter
from androguard.core.apk import APK
from fastapi import APIRouter, UploadFile, File
from pydantic import BaseModel

from Framework.Utilities import ConfigModule, CommonUtil
from Framework.Utilities import CommonUtil, ConfigModule
from settings import ZEUZ_NODE_DOWNLOADS_DIR

ADB_PATH = "adb" # Ensure ADB is in PATH
UI_XML_PATH = "ui.xml"
Expand All @@ -29,12 +32,14 @@

class DeviceInfo(BaseModel):
"""Model for device information."""

serial: str
status: str
name: str | None = None
# model: str | None = None
# product: str | None = None


@router.get("/devices", response_model=list[DeviceInfo])
def get_devices():
"""Get list of connected Android devices."""
Expand Down Expand Up @@ -72,29 +77,28 @@
capture_screenshot(device_serial=device_serial)

# Read XML file
with open(UI_XML_PATH, 'r') as xml_file:
with open(UI_XML_PATH, "r") as xml_file:
xml_content = xml_file.read()

# Read and encode screenshot
with open(SCREENSHOT_PATH, 'rb') as img_file:
with open(SCREENSHOT_PATH, "rb") as img_file:
screenshot_bytes = img_file.read()
screenshot_base64 = base64.b64encode(screenshot_bytes).decode('utf-8')
screenshot_base64 = base64.b64encode(screenshot_bytes).decode("utf-8")

return InspectorResponse(
status="ok",
ui_xml=xml_content,
screenshot=screenshot_base64
status="ok", ui_xml=xml_content, screenshot=screenshot_base64
)
except Exception as e:
return InspectorResponse(
status="error",
error=str(e)
)
return InspectorResponse(status="error", error=str(e))


@router.get("/dump/driver")
def dump_driver():
"""Dump the current driver."""
from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import appium_driver
from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import (
appium_driver,
)

if appium_driver is None:
return
return appium_driver.page_source
Expand All @@ -103,7 +107,14 @@
def run_adb_command(command):
"""Run an ADB command and return the output."""
try:
result = subprocess.run(command, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
result = subprocess.run(
command,

Check failure

Code scanning / CodeQL

Uncontrolled command line Critical

This command line depends on a
user-provided value
.

Copilot Autofix

AI 14 days ago

In general, to fix uncontrolled command-line issues you should avoid shell=True with untrusted input and instead pass arguments as a list to subprocess.run so they are not interpreted by a shell. If shell usage is unavoidable, you must strictly validate or allowlist user input before including it in the command.

For this file, the safest change without altering functionality is:

  1. Refactor run_adb_command to accept the command as a list of arguments and invoke subprocess.run with shell=False.
  2. Update all callers (shown snippet: get_devices and capture_ui_dump) to pass argument lists instead of format-assembled strings.
  3. In particular, build the optional -s <serial> portion as separate list elements, not interpolated into a shell string.

Concretely:

  • In run_adb_command (around lines 107–121):

    • Change the command parameter to be a sequence of arguments (we can keep the name but adjust usage).
    • Change the subprocess.run call to subprocess.run(command, shell=False, ...).
  • In get_devices (around line 48):

    • Replace devices_output = run_adb_command(f"{ADB_PATH} devices -l") by devices_output = run_adb_command([ADB_PATH, "devices", "-l"]).
  • In capture_ui_dump (around lines 123–128):

    • Replace construction of device_flag as a string and the .strip()-based assembly with list concatenation, e.g.:
    cmd = [ADB_PATH]
    if device_serial:
        cmd.extend(["-s", device_serial])
    cmd.extend(["shell", "uiautomator", "dump", "/sdcard/ui.xml"])
    out = run_adb_command(cmd)

No new imports are required; we only adjust how subprocess.run is invoked and how commands are constructed.

Suggested changeset 1
server/mobile.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/server/mobile.py b/server/mobile.py
--- a/server/mobile.py
+++ b/server/mobile.py
@@ -45,7 +45,7 @@
     """Get list of connected Android devices."""
     try:
         # Get list of devices
-        devices_output = run_adb_command(f"{ADB_PATH} devices -l")
+        devices_output = run_adb_command([ADB_PATH, "devices", "-l"])
         devices = []
 
         # Parse adb devices output
@@ -58,8 +58,8 @@
                     status = parts[1]
                     name = f"device {index}"
                     index += 1
-                    # model = run_adb_command(f"{ADB_PATH} -s {serial} shell getprop ro.product.model")
-                    # product = run_adb_command(f"{ADB_PATH} -s {serial} shell getprop ro.product.name")
+                    # model = run_adb_command([ADB_PATH, "-s", serial, "shell", "getprop", "ro.product.model"])
+                    # product = run_adb_command([ADB_PATH, "-s", serial, "shell", "getprop", "ro.product.name"])
 
                     devices.append(DeviceInfo(serial=serial, status=status, name=name))
 
@@ -105,11 +105,14 @@
 
 
 def run_adb_command(command):
-    """Run an ADB command and return the output."""
+    """Run an ADB command and return the output.
+
+    The command must be provided as a list of arguments to avoid shell injection.
+    """
     try:
         result = subprocess.run(
             command,
-            shell=True,
+            shell=False,
             check=True,
             stdout=subprocess.PIPE,
             stderr=subprocess.PIPE,
@@ -122,10 +122,11 @@
 
 def capture_ui_dump(device_serial: str | None = None):
     """Capture the current UI hierarchy from the device"""
-    device_flag = f"-s {device_serial}" if device_serial else ""
-    out = run_adb_command(
-        f"{ADB_PATH} {device_flag} shell uiautomator dump /sdcard/ui.xml".strip()
-    )
+    cmd = [ADB_PATH]
+    if device_serial:
+        cmd.extend(["-s", device_serial])
+    cmd.extend(["shell", "uiautomator", "dump", "/sdcard/ui.xml"])
+    out = run_adb_command(cmd)
     if out.startswith("Error:"):
         from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import (
             appium_driver,
EOF
@@ -45,7 +45,7 @@
"""Get list of connected Android devices."""
try:
# Get list of devices
devices_output = run_adb_command(f"{ADB_PATH} devices -l")
devices_output = run_adb_command([ADB_PATH, "devices", "-l"])
devices = []

# Parse adb devices output
@@ -58,8 +58,8 @@
status = parts[1]
name = f"device {index}"
index += 1
# model = run_adb_command(f"{ADB_PATH} -s {serial} shell getprop ro.product.model")
# product = run_adb_command(f"{ADB_PATH} -s {serial} shell getprop ro.product.name")
# model = run_adb_command([ADB_PATH, "-s", serial, "shell", "getprop", "ro.product.model"])
# product = run_adb_command([ADB_PATH, "-s", serial, "shell", "getprop", "ro.product.name"])

devices.append(DeviceInfo(serial=serial, status=status, name=name))

@@ -105,11 +105,14 @@


def run_adb_command(command):
"""Run an ADB command and return the output."""
"""Run an ADB command and return the output.

The command must be provided as a list of arguments to avoid shell injection.
"""
try:
result = subprocess.run(
command,
shell=True,
shell=False,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
@@ -122,10 +122,11 @@

def capture_ui_dump(device_serial: str | None = None):
"""Capture the current UI hierarchy from the device"""
device_flag = f"-s {device_serial}" if device_serial else ""
out = run_adb_command(
f"{ADB_PATH} {device_flag} shell uiautomator dump /sdcard/ui.xml".strip()
)
cmd = [ADB_PATH]
if device_serial:
cmd.extend(["-s", device_serial])
cmd.extend(["shell", "uiautomator", "dump", "/sdcard/ui.xml"])
out = run_adb_command(cmd)
if out.startswith("Error:"):
from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import (
appium_driver,
Copilot is powered by AI and may make mistakes. Always verify output.
shell=True,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
return f"Error: {e.stderr.strip()}"
Expand All @@ -116,11 +127,14 @@
f"{ADB_PATH} {device_flag} shell uiautomator dump /sdcard/ui.xml".strip()
)
if out.startswith("Error:"):
from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import appium_driver
from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import (
appium_driver,
)

if appium_driver is None:
return
page_src = appium_driver.page_source
with open(UI_XML_PATH, 'w') as xml_file:
with open(UI_XML_PATH, "w") as xml_file:
xml_file.write(page_src)
else:
out = run_adb_command(
Expand All @@ -137,7 +151,10 @@
f"{ADB_PATH} {device_flag} shell screencap -p /sdcard/screen.png".strip()
)
if out.startswith("Error:"):
from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import appium_driver
from Framework.Built_In_Automation.Mobile.CrossPlatform.Appium.BuiltInFunctions import (
appium_driver,
)

if appium_driver is None:
return
full_screenshot_path = os.path.join(os.getcwd(), SCREENSHOT_PATH)
Expand All @@ -156,10 +173,16 @@
try:
capture_ui_dump()
try:
with open(UI_XML_PATH, 'r') as xml_file:
with open(UI_XML_PATH, "r") as xml_file:
xml_content = xml_file.read()
xml_content = xml_content.replace("<?xml version='1.0' encoding='UTF-8' standalone='yes' ?>", "", 1)
new_xml_hash = hashlib.sha256(xml_content.encode('utf-8')).hexdigest()
xml_content = xml_content.replace(
"<?xml version='1.0' encoding='UTF-8' standalone='yes' ?>",
"",
1,
)
new_xml_hash = hashlib.sha256(
xml_content.encode("utf-8")
).hexdigest()
# Don't upload if the content hasn't changed
if prev_xml_hash == new_xml_hash:
await asyncio.sleep(5)
Expand All @@ -169,17 +192,59 @@
except FileNotFoundError:
await asyncio.sleep(5)
continue
url = ConfigModule.get_config_value("Authentication", "server_address").strip() + "/node_ai_contents/"
url = (
ConfigModule.get_config_value(
"Authentication", "server_address"
).strip()
+ "/node_ai_contents/"
)
apiKey = ConfigModule.get_config_value("Authentication", "api-key").strip()
res = requests.post(
url,
headers={"X-Api-Key": apiKey},
json={
"dom_mob": {"dom": xml_content},
"node_id": CommonUtil.MachineInfo().getLocalUser().lower()
})
"node_id": CommonUtil.MachineInfo().getLocalUser().lower(),
},
)
if res.ok:
CommonUtil.ExecLog("", "UI dump uploaded successfully", iLogLevel=1)
except Exception as e:
CommonUtil.ExecLog("", f"Error uploading UI dump: {str(e)}", iLogLevel=3)
await asyncio.sleep(5)


@router.post("/apk-upload")
def handle_apk_upload(file: UploadFile = File(...)):
dir_path = f"{ZEUZ_NODE_DOWNLOADS_DIR}/apk"
if not os.path.exists(dir_path):
os.makedirs(dir_path)
filename = file.filename or "uploaded.apk"
file_path = os.path.join(dir_path, filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
return {"message": "APK uploaded successfully", "filename": filename}


def get_package_name(file_path: str) -> str | None:
"""Extract package name from APK using androguard."""
try:
apk = APK(file_path)
return apk.get_package()
except Exception:
return None


@router.post("/apk-install")
def handle_apk_install(filename: str, serial: str):
dir_path = f"{ZEUZ_NODE_DOWNLOADS_DIR}/apk"
file_path = os.path.join(dir_path, filename)
if not os.path.exists(file_path):

Check failure

Code scanning / CodeQL

Uncontrolled data used in path expression High

This path depends on a
user-provided value
.

Copilot Autofix

AI 14 days ago

In general, to fix uncontrolled path usage, normalize the path, enforce that it stays within an expected base directory, and/or restrict names to a safe subset. Here, the safest and simplest fix without changing functionality is: (1) derive an absolute, normalized path from ZEUZ_NODE_DOWNLOADS_DIR/apk and the provided filename; (2) reject any request where the resolved path is not within that directory; and (3) optionally disallow path separators in filename so only simple filenames are accepted. This preserves the existing behavior for valid names in the apk directory while preventing traversal or absolute paths.

Concretely in server/mobile.py, update handle_apk_install:

  • Compute base_dir = os.path.abspath(os.path.join(ZEUZ_NODE_DOWNLOADS_DIR, "apk")).
  • Normalize the user-supplied filename and construct file_path = os.path.abspath(os.path.join(base_dir, filename)).
  • Check file_path.startswith(base_dir + os.sep) (or equality if ever equal to base), and if not, return an error like {"message": "Invalid filename"}.
  • Optionally, before path computations, enforce that filename does not contain path separators ('/' or os.sep) and is non-empty.
  • Keep the rest of the logic (existence check, get_package_name, subprocess.run) unchanged.

No new external libraries are needed; os.path utilities already imported via import os are sufficient.

Suggested changeset 1
server/mobile.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/server/mobile.py b/server/mobile.py
--- a/server/mobile.py
+++ b/server/mobile.py
@@ -237,8 +237,13 @@
 
 @router.post("/apk-install")
 def handle_apk_install(filename: str, serial: str):
-    dir_path = f"{ZEUZ_NODE_DOWNLOADS_DIR}/apk"
-    file_path = os.path.join(dir_path, filename)
+    base_dir = os.path.abspath(os.path.join(ZEUZ_NODE_DOWNLOADS_DIR, "apk"))
+    # Reject filenames that attempt to traverse directories or use absolute/invalid paths
+    if not filename or os.path.isabs(filename) or any(sep in filename for sep in (os.sep, os.altsep) if sep):
+        return {"message": "Invalid filename", "filename": filename}
+    file_path = os.path.abspath(os.path.join(base_dir, filename))
+    if not (file_path == base_dir or file_path.startswith(base_dir + os.sep)):
+        return {"message": "Invalid filename", "filename": filename}
     if not os.path.exists(file_path):
         return {"message": "APK not found", "filename": filename}
     package_name = get_package_name(file_path)
EOF
@@ -237,8 +237,13 @@

@router.post("/apk-install")
def handle_apk_install(filename: str, serial: str):
dir_path = f"{ZEUZ_NODE_DOWNLOADS_DIR}/apk"
file_path = os.path.join(dir_path, filename)
base_dir = os.path.abspath(os.path.join(ZEUZ_NODE_DOWNLOADS_DIR, "apk"))
# Reject filenames that attempt to traverse directories or use absolute/invalid paths
if not filename or os.path.isabs(filename) or any(sep in filename for sep in (os.sep, os.altsep) if sep):
return {"message": "Invalid filename", "filename": filename}
file_path = os.path.abspath(os.path.join(base_dir, filename))
if not (file_path == base_dir or file_path.startswith(base_dir + os.sep)):
return {"message": "Invalid filename", "filename": filename}
if not os.path.exists(file_path):
return {"message": "APK not found", "filename": filename}
package_name = get_package_name(file_path)
Copilot is powered by AI and may make mistakes. Always verify output.
return {"message": "APK not found", "filename": filename}
package_name = get_package_name(file_path)
try:
subprocess.run([ADB_PATH, "-s", serial, "install", file_path], check=True)
return {"message": "APK installed successfully", "filename": filename, "package_name": package_name}
except Exception as e:
return {"message": f"Error installing APK: {str(e)}", "filename": filename, "package_name": package_name}

Check warning

Code scanning / CodeQL

Information exposure through an exception Medium

Stack trace information
flows to this location and may be exposed to an external user.

Copilot Autofix

AI 14 days ago

In general, to fix this kind of issue you should avoid including raw exception text or stack traces in responses sent back to clients. Instead, log the full error details on the server (to logs or monitoring systems) and return a generic, user-friendly error message that does not reveal internal implementation details.

For this specific case in server/mobile.py, the best fix is:

  • In handle_apk_install, modify the except Exception as e: block so that:
    • It logs the error details using CommonUtil.ExecLog (already used elsewhere in this file) including str(e) and any helpful context such as filename and serial.
    • It returns a response with a generic error message, e.g. "Failed to install APK.", without embedding str(e).

This change only affects the error path and does not alter the success path or the API shape (the JSON keys can remain "message", "filename", and "package_name"). No new imports are required; we reuse CommonUtil.ExecLog, which is already imported and used in upload_android_ui_dump.

Concretely:

  • Edit the except block around lines 248–249 to:
    • Call CommonUtil.ExecLog("", f"Error installing APK '{filename}' on device '{serial}': {str(e)}", iLogLevel=3).
    • Return {"message": "Failed to install APK.", "filename": filename, "package_name": package_name}.
Suggested changeset 1
server/mobile.py

Autofix patch

Autofix patch
Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/server/mobile.py b/server/mobile.py
--- a/server/mobile.py
+++ b/server/mobile.py
@@ -246,5 +246,14 @@
         subprocess.run([ADB_PATH, "-s", serial, "install", file_path], check=True)
         return {"message": "APK installed successfully", "filename": filename, "package_name": package_name}
     except Exception as e:
-        return {"message": f"Error installing APK: {str(e)}", "filename": filename, "package_name": package_name}
+        CommonUtil.ExecLog(
+            "",
+            f"Error installing APK '{filename}' on device '{serial}': {str(e)}",
+            iLogLevel=3,
+        )
+        return {
+            "message": "Failed to install APK.",
+            "filename": filename,
+            "package_name": package_name,
+        }
 
EOF
@@ -246,5 +246,14 @@
subprocess.run([ADB_PATH, "-s", serial, "install", file_path], check=True)
return {"message": "APK installed successfully", "filename": filename, "package_name": package_name}
except Exception as e:
return {"message": f"Error installing APK: {str(e)}", "filename": filename, "package_name": package_name}
CommonUtil.ExecLog(
"",
f"Error installing APK '{filename}' on device '{serial}': {str(e)}",
iLogLevel=3,
)
return {
"message": "Failed to install APK.",
"filename": filename,
"package_name": package_name,
}

Copilot is powered by AI and may make mistakes. Always verify output.

Loading