Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(integration): Get Project Discovery scan results #104

Merged
merged 1 commit into from Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,69 @@
{
"current_page": 123,
"result_count": 123,
"total_results": 123,
"total_pages": 123,
"message": "<string>",
"data": [
{
"matcher_status": true,
"vuln_id": "<string>",
"target": "<string>",
"template_url": "<string>",
"created_at": "<string>",
"updated_at": "<string>",
"scan_id": "<string>",
"event": [
{
"curl-command": "<string>",
"extracted-results": ["<string>"],
"extractor-name": "<string>",
"host": "<string>",
"info": {
"classification": {
"cpe": "<string>",
"cve-id": ["<string>"],
"cvss-metrics": "<string>",
"cvss-score": 123,
"cwe-id": ["<string>"],
"epss-percentile": 123,
"epss-score": 123
},
"metadata": {},
"author": ["<string>"],
"description": "<string>",
"impact": "<string>",
"name": "<string>",
"tags": ["<string>"],
"reference": ["<string>"],
"remediation": "<string>",
"severity": "<string>"
},
"ip": "<string>",
"matched-at": "<string>",
"matched-line": [123],
"matcher-name": "<string>",
"matcher-status": true,
"path": "<string>",
"request": "<string>",
"response": "<string>",
"template-id": "<string>",
"template-path": "<string>",
"timestamp": "<string>",
"type": "<string>"
}
],
"template_id": "<string>",
"template_path": "<string>",
"template_encoded": "<string>",
"result_type": "<string>",
"vuln_status": "<string>",
"vuln_hash": "<string>",
"labels": ["<string>"]
}
],
"stats": {
"total": 123
},
"filters": {}
}
54 changes: 54 additions & 0 deletions tests/integrations/test_project_discovery.py
@@ -0,0 +1,54 @@
import os
import time

import pytest
from httpx import Response

from tracecat.integrations.project_discovery import get_all_scan_results


@pytest.fixture
def project_discovery_secret(create_mock_secret) -> dict[str, str | bytes]:
mock_secret = create_mock_secret(
"project_discovery", {"PD_API_KEY": os.environ["PD_API_KEY"]}
)
mock_secret_obj = mock_secret.model_dump_json()
return mock_secret_obj


@pytest.mark.respx(assert_all_mocked=False)
@pytest.mark.parametrize(
"severity,time_filter,vuln_status",
[
(None, None, None),
("low", None, None),
("medium", "last_week", "fixed"),
("high", "last_month", "open"),
],
)
def test_get_all_scan_results(
severity, time_filter, vuln_status, project_discovery_secret, respx_mock
):
# Mock secrets manager
tracecat_api_url = os.environ["TRACECAT__API_URL"]
route = respx_mock.get(f"{tracecat_api_url}/secrets/emailrep").mock(
topher-lo marked this conversation as resolved.
Show resolved Hide resolved
return_value=Response(status_code=200, content=project_discovery_secret)
)

# Assuming the API key is required for live calls and is set in the environment
result = get_all_scan_results(
offset=10,
limit=100,
severity=severity,
search=None,
time=time_filter,
vuln_status=vuln_status,
)

# Asserts to check if the API call was successful and returns the expected structure
assert route.called
assert isinstance(result, dict)
assert "data" in result

# Throttling API requests to avoid rate limiting
time.sleep(3)
2 changes: 2 additions & 0 deletions tracecat/integrations/__init__.py
Expand Up @@ -5,6 +5,7 @@
aws_cloudtrail,
datadog,
emailrep,
project_discovery,
sublime,
urlscan,
virustotal,
Expand All @@ -19,6 +20,7 @@
"aws_cloudtrail",
"datadog",
"emailrep",
"project_discovery",
"sublime",
"urlscan",
"virustotal",
Expand Down
58 changes: 58 additions & 0 deletions tracecat/integrations/project_discovery.py
@@ -0,0 +1,58 @@
"""Integrations with Project Discovery API.

Supported endpoints:
- Results: see and managed vulnerabilities detected by PD Cloud Platform
- (Coming soon) Scans: manage scans, scan schedules, and create new scans

Required credentials: `project_discovery` secret with `PD_API_KEY` key.

API reference: https://docs.projectdiscovery.io/api-reference/introduction
"""

import os
from typing import Any, Literal

import httpx

from tracecat.integrations._registry import registry

PD_BASE_URL = "https://api.projectdiscovery.io/v1"
# https://docs.projectdiscovery.io/introduction
PD_SEVERITIES = Literal["info", "low", "medium", "high", "critical"]
PD_TIME_FILTERS = Literal["last_day", "last_week", "last_month"]
PD_VULN_STATUSES = Literal["open", "closed" "false_positive", "fixed"]


def create_pd_client() -> httpx.Client:
headers = {"X-Api-Key": os.environ["PD_API_KEY"]}
return httpx.Client(base_url=PD_BASE_URL, headers=headers)


@registry.register(description="Get all scan results", secrets=["project_discovery"])
def get_all_scan_results(
offset: int | None = None,
limit: int | None = None,
severity: PD_SEVERITIES | None = None,
search: str | None = None,
time: PD_TIME_FILTERS | None = None,
vuln_status: PD_VULN_STATUSES | None = None,
) -> dict[str, Any]:
"""Get all scan results.

API reference: https://docs.projectdiscovery.io/api-reference/results/get-all-results
"""

with create_pd_client() as client:
response = client.get(
"/scans/results",
params={
"offset": offset,
"limit": limit,
"severity": severity,
"search": search,
"time": time,
"vuln_status": vuln_status,
},
)
response.raise_for_status()
return response.json()