File: test_survival.py

package info (click to toggle)
xgboost 3.0.0-1
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 13,796 kB
  • sloc: cpp: 67,502; python: 35,503; java: 4,676; ansic: 1,426; sh: 1,320; xml: 1,197; makefile: 204; javascript: 19
file content (168 lines) | stat: -rw-r--r-- 5,895 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import json
import os
from typing import List, Optional, Tuple, cast

import numpy as np
import pytest

import xgboost as xgb
from xgboost import testing as tm

# Data directory obtained from xgboost.testing for this test file; joined with
# "veterans_lung_cancer.csv" in test_aft_survival_demo_data.
# NOTE(review): presumably resolved relative to __file__ -- confirm in tm.data_dir.
dpath = tm.data_dir(__file__)


@pytest.fixture(scope="module")
def toy_data() -> Tuple[xgb.DMatrix, np.ndarray, np.ndarray]:
    """Build a five-row, single-feature DMatrix with ranged survival labels.

    Each row carries a (lower, upper) label range: rows 0, 1 and 4 have an
    infinite upper bound, row 2 has an infinite lower bound, and row 3 is
    bounded on both sides.

    Returns
    -------
    The DMatrix (with ``label_lower_bound`` / ``label_upper_bound`` set) together
    with the raw lower- and upper-bound arrays.
    """
    inf = np.inf
    lower = np.array([10, 15, -inf, 30, 100])
    upper = np.array([inf, inf, 20, 50, inf])
    features = np.array([1, 2, 3, 4, 5]).reshape((-1, 1))

    matrix = xgb.DMatrix(features)
    for field, bound in (("label_lower_bound", lower), ("label_upper_bound", upper)):
        matrix.set_float_info(field, bound)
    return matrix, lower, upper


def test_default_metric(toy_data: Tuple[xgb.DMatrix, np.ndarray, np.ndarray]) -> None:
    """Check the metric recorded in the booster config for ``survival:aft``.

    Exactly one metric must be configured, and its ``aft_loss_distribution``
    must be the one passed in the parameters, or "normal" when none is passed.
    The check is repeated with and without an evaluation set.
    """
    train_dmat, _, _ = toy_data

    def check_metric(watchlist: Optional[list]) -> None:
        # (training parameters, distribution the configured metric must report)
        cases = [
            (
                {"objective": "survival:aft", "aft_loss_distribution": "extreme"},
                "extreme",
            ),
            ({"objective": "survival:aft"}, "normal"),
        ]
        for train_params, expected_dist in cases:
            model = xgb.train(
                train_params,
                train_dmat,
                num_boost_round=1,
                evals=watchlist,
            )
            learner_metrics = json.loads(model.save_config())["learner"]["metrics"]
            assert len(learner_metrics) == 1
            aft_param = learner_metrics[0]["aft_loss_param"]
            assert aft_param["aft_loss_distribution"] == expected_dist

    # First with an actual evaluation set, then without one.
    for watchlist in ([(train_dmat, "Train")], None):
        check_metric(watchlist)


def test_aft_survival_toy_data(
    toy_data: Tuple[xgb.DMatrix, np.ndarray, np.ndarray]
) -> None:
    """Train an AFT model on the toy data and check that training makes progress.

    Verifies that over 15 boosting rounds:

    * the ``aft-nloglik`` training metric never increases,
    * the fraction of predictions inside their label range never decreases and
      ends at exactly 1.0,
    * every split threshold in the dumped trees is one of 2.5, 3.5 or 4.5.
    """
    # See demo/aft_survival/aft_survival_viz_demo.py
    dmat, y_lower, y_upper = toy_data

    # "Accuracy" = the fraction of data points whose ranged label (y_lower, y_upper)
    #              includes the corresponding predicted label (y_pred)
    acc_rec: List[float] = []

    class Callback(xgb.callback.TrainingCallback):
        """Record the in-range prediction fraction after every boosting round."""

        def after_iteration(
            self,
            model: xgb.Booster,
            epoch: int,
            evals_log: xgb.callback.TrainingCallback.EvalsLog,
        ) -> bool:
            y_pred = model.predict(dmat)
            in_range = np.logical_and(y_pred >= y_lower, y_pred <= y_upper)
            # Mean of the boolean mask is hits / count, computed with a single
            # division so a perfect score is exactly 1.0.
            acc_rec.append(np.mean(in_range))
            # Returning False tells the trainer not to stop early.
            return False

    evals_result: xgb.callback.TrainingCallback.EvalsLog = {}
    params = {
        "max_depth": 3,
        "objective": "survival:aft",
        "min_child_weight": 0,
        "tree_method": "exact",
    }
    bst = xgb.train(
        params,
        dmat,
        15,
        [(dmat, "train")],
        evals_result=evals_result,
        callbacks=[Callback()],
    )

    nloglik_rec = cast(List[float], evals_result["train"]["aft-nloglik"])
    # AFT metric (negative log likelihood) improves monotonically: compare each
    # round with the next one (pairing with [1:], not [:1], which would only
    # compare the first value with itself).
    assert all(p >= q for p, q in zip(nloglik_rec, nloglik_rec[1:]))
    # "Accuracy" improves monotonically.
    # Over time, XGBoost model makes predictions that fall within given label ranges.
    assert all(p <= q for p, q in zip(acc_rec, acc_rec[1:]))
    assert acc_rec[-1] == 1.0

    def gather_split_thresholds(tree: dict) -> set:
        """Return the set of split conditions used in ``tree`` and its subtrees."""
        if "split_condition" in tree:
            return (
                gather_split_thresholds(tree["children"][0])
                | gather_split_thresholds(tree["children"][1])
                | {tree["split_condition"]}
            )
        # Leaf node: no split condition.
        return set()

    # Only 2.5, 3.5, and 4.5 are used as split thresholds.
    model_json = [json.loads(e) for e in bst.get_dump(dump_format="json")]
    for tree in model_json:
        assert gather_split_thresholds(tree).issubset({2.5, 3.5, 4.5})


def test_aft_empty_dmatrix():
    """Smoke test: AFT training on a DMatrix with zero rows and two columns.

    There are no assertions; the test passes as long as building the matrix,
    attaching empty label bounds and training for two rounds do not raise.
    """
    empty_features = np.zeros((0, 2))
    lower = np.array([])
    upper = np.array([])

    empty_dmat = xgb.DMatrix(empty_features)
    empty_dmat.set_info(label_lower_bound=lower, label_upper_bound=upper)

    train_params = {"objective": "survival:aft", "tree_method": "hist"}
    xgb.train(
        train_params,
        empty_dmat,
        num_boost_round=2,
        evals=[(empty_dmat, "train")],
    )


@pytest.mark.skipif(**tm.no_pandas())
def test_aft_survival_demo_data() -> None:
    """Train AFT models on the veterans lung cancer data with each distribution.

    For "normal", "logistic" and "extreme" loss distributions, trains for 500
    rounds and checks that the training ``aft-nloglik`` never increases between
    consecutive rounds. Finally checks that only the normal distribution ends
    below a negative log likelihood of 4.9.
    """
    import pandas as pd

    df = pd.read_csv(os.path.join(dpath, "veterans_lung_cancer.csv"))

    y_lower_bound = df["Survival_label_lower_bound"]
    y_upper_bound = df["Survival_label_upper_bound"]
    X = df.drop(["Survival_label_lower_bound", "Survival_label_upper_bound"], axis=1)

    dtrain = xgb.DMatrix(X)
    dtrain.set_float_info("label_lower_bound", y_lower_bound)
    dtrain.set_float_info("label_upper_bound", y_upper_bound)

    base_params = {
        "verbosity": 0,
        "objective": "survival:aft",
        "eval_metric": "aft-nloglik",
        "tree_method": "hist",
        "learning_rate": 0.05,
        "aft_loss_distribution_scale": 1.20,
        "max_depth": 6,
        "lambda": 0.01,
        "alpha": 0.02,
    }
    nloglik_rec = {}
    for dist in ("normal", "logistic", "extreme"):
        # Build a fresh dict per run so the shared base parameters stay untouched.
        params = {**base_params, "aft_loss_distribution": dist}
        evals_result: xgb.callback.TrainingCallback.EvalsLog = {}
        xgb.train(
            params,
            dtrain,
            num_boost_round=500,
            evals=[(dtrain, "train")],
            evals_result=evals_result,
        )
        history = evals_result["train"]["aft-nloglik"]
        nloglik_rec[dist] = history
        # AFT metric (negative log likelihood) improves monotonically: compare
        # each round with the next one (pairing with [1:], not [:1], which would
        # only compare the first value with itself).
        assert all(p >= q for p, q in zip(history, history[1:]))
    # For this data, normal distribution works the best
    assert nloglik_rec["normal"][-1] < 4.9
    assert nloglik_rec["logistic"][-1] > 4.9
    assert nloglik_rec["extreme"][-1] > 4.9