File: updateCVElist.py

package info (click to toggle)
mysqltuner 2.8.29-1
  • links: PTS
  • area: main
  • in suites: forky, sid
  • size: 3,128 kB
  • sloc: perl: 7,229; sh: 620; python: 135; makefile: 119
file content (182 lines) | stat: -rw-r--r-- 7,353 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
import requests
import json
import csv
import zipfile
import io
import os
from datetime import datetime

# Range of years to analyze
start_year = 2020
current_year = datetime.now().year
years_to_process = list(range(start_year, current_year + 1))

# Filter on MySQL and MariaDB
# Note: The vendor for MySQL is often "oracle" and for MariaDB "mariadb"
target_products = ["mysql", "mariadb"]
output_file = "mysql_mariadb_cve_full.csv"

def get_cvss_score(cve_data_metrics, version):
    if version == 'V3':
        cvss_metrics_v31 = cve_data_metrics.get('cvssMetricV31', [])
        if cvss_metrics_v31:
            cvss_data = cvss_metrics_v31[0].get('cvssData', {})
            return cvss_data.get('baseScore'), cvss_data.get('baseSeverity')
    elif version == 'V2':
        cvss_metrics_v2 = cve_data_metrics.get('cvssMetricV2', [])
        if cvss_metrics_v2:
            cvss_data = cvss_metrics_v2[0].get('cvssData', {})
            return cvss_data.get('baseScore'), cvss_metrics_v2[0].get('baseSeverity') # baseSeverity is directly here
    return None, None

def extract_affected_versions(node):
    """
    Recursively extracts affected products from configuration nodes.
    Returns a list of dicts with vendor, product, version.
    """
    affected = []
    
    # Handle children (nested logic)
    if 'children' in node:
        for child in node['children']:
            affected.extend(extract_affected_versions(child))
            
    # Handle CPE matches
    if 'cpeMatch' in node:
        for match in node['cpeMatch']:
            if match.get('vulnerable'):
                # In JSON 2.0, the URI is often in 'criteria'
                cpe_uri = match.get('criteria')
                
                if cpe_uri:
                    parts = cpe_uri.split(':')
                    if len(parts) >= 6:
                        vendor = parts[3]
                        product = parts[4]
                        version = parts[5]
                        
                        # If the version is generic (* or -), try to enrich with range info
                        version_str = version
                        
                        ranges = []
                        if match.get('versionStartIncluding'):
                            ranges.append(f">= {match['versionStartIncluding']}")
                        if match.get('versionStartExcluding'):
                            ranges.append(f"> {match['versionStartExcluding']}")
                        if match.get('versionEndIncluding'):
                            ranges.append(f"<= {match['versionEndIncluding']}")
                        if match.get('versionEndExcluding'):
                            ranges.append(f"< {match['versionEndExcluding']}")
                        
                        if ranges and (version == '*' or version == '-'):
                            version_str = " ".join(ranges)
                        
                        if any(p_name in product for p_name in target_products):
                            affected.append({
                                'vendor': vendor,
                                'product': product,
                                'version': version_str
                            })
    return affected

print(f"Starting processing for years: {years_to_process}")

# Initialize CSV file with header
with open(output_file, "w", newline="", encoding="utf-8") as csvfile:
    fieldnames = [
        "cve_id", "published_date", "last_modified", "cvss_v3_score", "cvss_v3_severity",
        "cvss_v2_score", "cvss_v2_severity", "summary", "vendor", "product", "version",
        "references"
    ]
    writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
    writer.writeheader()

total_count = 0

for year in years_to_process:
    url = f"https://nvd.nist.gov/feeds/json/cve/2.0/nvdcve-2.0-{year}.json.zip"
    print(f"--- Processing year {year} ---")
    print(f"Downloading from {url}...")
    
    try:
        response = requests.get(url, timeout=60, headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'})
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"Error downloading for {year} : {e}")
        continue

    print("Extracting and parsing JSON...")
    try:
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            json_filename = [name for name in z.namelist() if name.endswith('.json')][0]
            with z.open(json_filename) as f:
                data = json.load(f)
    except Exception as e:
        print(f"Error extracting or parsing JSON for {year} : {e}")
        continue

    cve_items = data.get('vulnerabilities', [])
    print(f"Analyzing {len(cve_items)} CVE entries for {year}...")

    count_year = 0
    with open(output_file, "a", newline="", encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        # No writeheader() here because it's already done
        
        for vuln_entry in cve_items:
            cve = vuln_entry.get('cve', {})
            cve_id = cve.get('id')
            
            published_date = cve.get('published')
            last_modified = cve.get('lastModified')
            
            description_data = cve.get('descriptions', [])
            summary = description_data[0].get('value') if description_data else ""
            
            references_data = cve.get('references', [])
            references = "; ".join([ref.get('url') for ref in references_data])

            v3_score, v3_severity = get_cvss_score(cve.get('metrics', {}), 'V3')
            v2_score, v2_severity = get_cvss_score(cve.get('metrics', {}), 'V2')

            # Analyze configurations to find products
            configurations = cve.get('configurations', {})
            if isinstance(configurations, list) and configurations:
                configurations = configurations[0]
            
            nodes = configurations.get('nodes', [])
            
            affected_products = []
            for node in nodes:
                affected_products.extend(extract_affected_versions(node))
            
            # Deduplication
            seen = set()
            for prod in affected_products:
                key = (prod['vendor'], prod['product'], prod['version'])
                if key in seen:
                    continue
                seen.add(key)
                
                row = {
                    "cve_id": cve_id,
                    "published_date": published_date,
                    "last_modified": last_modified,
                    "cvss_v3_score": v3_score,
                    "cvss_v3_severity": v3_severity,
                    "cvss_v2_score": v2_score,
                    "cvss_v2_severity": v2_severity,
                    "summary": summary,
                    "vendor": prod['vendor'],
                    "product": prod['product'],
                    "version": prod['version'],
                    "references": references
                }
                writer.writerow(row)
                count_year += 1
    
    print(f"Added {count_year} vulnerabilities for {year}.")
    total_count += count_year

print(f"Done. Total: {total_count} vulnerabilities exported to {output_file}")
exit(0)