Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(integration): Get Project Discovery scan results (#104)
- Loading branch information
Showing
4 changed files
with
183 additions
and
0 deletions.
There are no files selected for viewing
69 changes: 69 additions & 0 deletions
69
tests/data/log_samples/project_discovery/results/get_all_results.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
{ | ||
"current_page": 123, | ||
"result_count": 123, | ||
"total_results": 123, | ||
"total_pages": 123, | ||
"message": "<string>", | ||
"data": [ | ||
{ | ||
"matcher_status": true, | ||
"vuln_id": "<string>", | ||
"target": "<string>", | ||
"template_url": "<string>", | ||
"created_at": "<string>", | ||
"updated_at": "<string>", | ||
"scan_id": "<string>", | ||
"event": [ | ||
{ | ||
"curl-command": "<string>", | ||
"extracted-results": ["<string>"], | ||
"extractor-name": "<string>", | ||
"host": "<string>", | ||
"info": { | ||
"classification": { | ||
"cpe": "<string>", | ||
"cve-id": ["<string>"], | ||
"cvss-metrics": "<string>", | ||
"cvss-score": 123, | ||
"cwe-id": ["<string>"], | ||
"epss-percentile": 123, | ||
"epss-score": 123 | ||
}, | ||
"metadata": {}, | ||
"author": ["<string>"], | ||
"description": "<string>", | ||
"impact": "<string>", | ||
"name": "<string>", | ||
"tags": ["<string>"], | ||
"reference": ["<string>"], | ||
"remediation": "<string>", | ||
"severity": "<string>" | ||
}, | ||
"ip": "<string>", | ||
"matched-at": "<string>", | ||
"matched-line": [123], | ||
"matcher-name": "<string>", | ||
"matcher-status": true, | ||
"path": "<string>", | ||
"request": "<string>", | ||
"response": "<string>", | ||
"template-id": "<string>", | ||
"template-path": "<string>", | ||
"timestamp": "<string>", | ||
"type": "<string>" | ||
} | ||
], | ||
"template_id": "<string>", | ||
"template_path": "<string>", | ||
"template_encoded": "<string>", | ||
"result_type": "<string>", | ||
"vuln_status": "<string>", | ||
"vuln_hash": "<string>", | ||
"labels": ["<string>"] | ||
} | ||
], | ||
"stats": { | ||
"total": 123 | ||
}, | ||
"filters": {} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import os | ||
import time | ||
|
||
import pytest | ||
from httpx import Response | ||
|
||
from tracecat.integrations.project_discovery import get_all_scan_results | ||
|
||
|
||
@pytest.fixture | ||
def project_discovery_secret(create_mock_secret) -> dict[str, str | bytes]: | ||
mock_secret = create_mock_secret( | ||
"project_discovery", {"PD_API_KEY": os.environ["PD_API_KEY"]} | ||
) | ||
mock_secret_obj = mock_secret.model_dump_json() | ||
return mock_secret_obj | ||
|
||
|
||
@pytest.mark.respx(assert_all_mocked=False) | ||
@pytest.mark.parametrize( | ||
"severity,time_filter,vuln_status", | ||
[ | ||
(None, None, None), | ||
("low", None, None), | ||
("medium", "last_week", "fixed"), | ||
("high", "last_month", "open"), | ||
], | ||
) | ||
def test_get_all_scan_results( | ||
severity, time_filter, vuln_status, project_discovery_secret, respx_mock | ||
): | ||
# Mock secrets manager | ||
tracecat_api_url = os.environ["TRACECAT__API_URL"] | ||
route = respx_mock.get(f"{tracecat_api_url}/secrets/emailrep").mock( | ||
return_value=Response(status_code=200, content=project_discovery_secret) | ||
) | ||
|
||
# Assuming the API key is required for live calls and is set in the environment | ||
result = get_all_scan_results( | ||
offset=10, | ||
limit=100, | ||
severity=severity, | ||
search=None, | ||
time=time_filter, | ||
vuln_status=vuln_status, | ||
) | ||
|
||
# Asserts to check if the API call was successful and returns the expected structure | ||
assert route.called | ||
assert isinstance(result, dict) | ||
assert "data" in result | ||
|
||
# Throttling API requests to avoid rate limiting | ||
time.sleep(3) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,58 @@ | ||
"""Integrations with Project Discovery API. | ||
Supported endpoints: | ||
- Results: see and managed vulnerabilities detected by PD Cloud Platform | ||
- (Coming soon) Scans: manage scans, scan schedules, and create new scans | ||
Required credentials: `project_discovery` secret with `PD_API_KEY` key. | ||
API reference: https://docs.projectdiscovery.io/api-reference/introduction | ||
""" | ||
|
||
import os | ||
from typing import Any, Literal | ||
|
||
import httpx | ||
|
||
from tracecat.integrations._registry import registry | ||
|
||
PD_BASE_URL = "https://api.projectdiscovery.io/v1" | ||
# https://docs.projectdiscovery.io/introduction | ||
PD_SEVERITIES = Literal["info", "low", "medium", "high", "critical"] | ||
PD_TIME_FILTERS = Literal["last_day", "last_week", "last_month"] | ||
PD_VULN_STATUSES = Literal["open", "closed" "false_positive", "fixed"] | ||
|
||
|
||
def create_pd_client() -> httpx.Client: | ||
headers = {"X-Api-Key": os.environ["PD_API_KEY"]} | ||
return httpx.Client(base_url=PD_BASE_URL, headers=headers) | ||
|
||
|
||
@registry.register(description="Get all scan results", secrets=["project_discovery"]) | ||
def get_all_scan_results( | ||
offset: int | None = None, | ||
limit: int | None = None, | ||
severity: PD_SEVERITIES | None = None, | ||
search: str | None = None, | ||
time: PD_TIME_FILTERS | None = None, | ||
vuln_status: PD_VULN_STATUSES | None = None, | ||
) -> dict[str, Any]: | ||
"""Get all scan results. | ||
API reference: https://docs.projectdiscovery.io/api-reference/results/get-all-results | ||
""" | ||
|
||
with create_pd_client() as client: | ||
response = client.get( | ||
"/scans/results", | ||
params={ | ||
"offset": offset, | ||
"limit": limit, | ||
"severity": severity, | ||
"search": search, | ||
"time": time, | ||
"vuln_status": vuln_status, | ||
}, | ||
) | ||
response.raise_for_status() | ||
return response.json() |