"""Supervised learning with ensembles and model router."""
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin, check_array, check_is_fitted
from sklearn.model_selection import check_cv
from sklearn.utils.multiclass import type_of_target, unique_labels
try:
from sklearn.utils.parallel import delayed, Parallel
except ModuleNotFoundError:
from joblib import Parallel
# pylint: disable=ungrouped-imports
from sklearn.utils.fixes import delayed
from ..utils._legacy import (
_fit_context,
HasMethods,
Integral,
Interval,
Real,
validate_params,
)
from ._common import fit_one, fit_and_predict_one_on_test
from ..core.array import earray
[docs]class RoutingClassifier(BaseEstimator, ClassifierMixin):
"""Defers input to the most appropriate classifier chosen through semantic routing.
Trains a pool of `estimators` and a `router` that learns to select the best
estimator for each input based on a `costs` vector. The router is trained
using cross-validated predictions from the estimators to determine which
estimator is most appropriate for each input.
Parameters
----------
estimators : list of objects
List of candidate estimators to choose from.
router : object
Classifier used to route inputs to estimators.
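costs : float or list of float, default=None
Cost associated with each estimator (positive, higher is more costly).
When routing targets are built, each estimator's cross-validated log-loss is
multiplied by its cost and the estimator with the lowest result is chosen.
If scalar, costs are uniform. If None, defaults to uniform 1.0.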
cv : int, cross-validation generator or an iterable, default=None
Cross-validation strategy for training estimators and router.
return_earray : bool, default=False
Whether to return :class:`~skfb.core.array.ENDArray` of predicted classes
or plain numpy ndarray. ENDArray tracks which estimator made each prediction.
n_jobs : int, default=None
Number of jobs to run in parallel when fitting the estimators, both inside
each cross-validation fold and for the final fit on the full data.
If None, use 1.
verbose : int or bool, default=0
Verbosity of parallel jobs.
Attributes
----------
router_ : object
Router trained on estimators' signals.
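router_class_ratios_ : dict, int -> float
Keys are estimator indices and values are the fractions of training samples
for which that estimator was selected as the routing target. Estimators that
were never selected have no entry.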
Examples
--------
>>> from skfb.ensemble import RoutingClassifier
>>> from sklearn.datasets import make_classification
>>> from sklearn.linear_model import LogisticRegression
>>> from sklearn.naive_bayes import GaussianNB
>>> from sklearn.svm import SVC
>>> X, y = make_classification(
... n_samples=300, n_features=100, n_redundant=90, class_sep=0.3,
... random_state=0)
>>> maxent = LogisticRegression(random_state=0)
>>> nb = GaussianNB()
>>> svm = SVC(kernel="linear", probability=True, random_state=0)
>>> router = LogisticRegression(random_state=0)
>>> routing = RoutingClassifier(
... estimators=[maxent, nb, svm],
... router=router,
... cv=3,
... return_earray=True).fit(X, y)
>>> routing.router_class_ratios_
{np.int64(0): np.float64(0.05),
np.int64(1): np.float64(0.8566666666666667),
np.int64(2): np.float64(0.09333333333333334)}
>>> routing.predict(X[:5])
ENDArray([1, 0, 1, 0, 1])
>>> routing.set_params(return_earray=False).predict(X[:5])
array([1, 0, 1, 0, 1])
>>> routing.set_params(return_earray=True).predict(X).acceptance_rates
array([0. , 0.97333333, 0.02666667])
"""
# Declarative constraints, one entry per constructor parameter.
# NOTE(review): presumably consumed by the `_fit_context` decorator applied to
# `fit` — confirm against `..utils._legacy`.
_parameter_constraints = {
    # Pool of candidate estimators.
    "estimators": ["array-like"],
    # The router must expose both `fit` and `predict`.
    "router": [HasMethods(["fit", "predict"])],
    # A sequence of costs, a single real number >= 0, or None.
    "costs": ["array-like", Interval(Real, 0, None, closed="left"), None],
    # Cross-validation specification, or None.
    "cv": ["cv_object", None],
    "return_earray": ["boolean"],
    # An integer >= -1, or None.
    "n_jobs": [Interval(Integral, -1, None, closed="left"), None],
    "verbose": ["verbose"],
}
def __init__(
self,
estimators,
router,
costs=None,
cv=None,
return_earray=False,
n_jobs=None,
verbose=0,
):
self.estimators = estimators
self.router = router
self.costs = costs
self.cv = cv
self.return_earray = return_earray
self.n_jobs = n_jobs
self.verbose = verbose
@_fit_context(prefer_skip_nested_validation=False)
@validate_params(
    {
        "X": ["array-like", "sparse matrix"],
        "y": ["array-like"],
        "sample_weight": ["array-like", None],
    },
    prefer_skip_nested_validation=True,
)
def fit(self, X, y, sample_weight=None):
    """Trains estimators and router.

    Steps:

    - Use cross-validated predictions from candidate estimators to
      build routing targets (best estimator index per sample).
    - Train the router on full data and store it in `self.router_`
      to predict chosen estimator index.
    - Fit all candidate estimators on full data and store them in
      `self.estimators_` for inference.

    Parameters
    ----------
    X : {array-like, sparse matrix}, shape (n_samples, n_features)
        The training input samples.
    y : array-like, shape (n_samples,)
        The target values.
    sample_weight : array-like, shape (n_samples,), default=None
        Sample weights. If None, then samples are equally weighted.

    Returns
    -------
    self : object
        Returns self.

    Raises
    ------
    ValueError
        If `costs` is neither a scalar nor broadcastable to one value per
        estimator.
    """
    self.classes_ = unique_labels(y)
    n_estimators = len(self.estimators)

    # region Normalize costs
    # Any real scalar (Python int/float or NumPy scalar) means "same cost for
    # every estimator". Broadcasting covers that case, the None default and a
    # per-estimator sequence in one step. A length-1 sequence keeps working
    # as a uniform cost, as it did before through implicit broadcasting.
    raw_costs = 1.0 if self.costs is None else self.costs
    cost_array = np.asarray(raw_costs, dtype=np.float64)
    try:
        self.costs_ = np.broadcast_to(cost_array, (n_estimators,)).copy()
    except ValueError as err:
        raise ValueError(
            "`costs` must be a scalar or provide one cost per estimator "
            f"({n_estimators}); got an array of shape {cost_array.shape}."
        ) from err
    # endregion

    # Validate and/or create cv.
    self.cv_ = check_cv(self.cv, y=y, classifier=True)

    # Build router targets using cross-validated predictions
    y_route = self._make_router_targets(X, y, sample_weight=sample_weight)

    # Fit router on inputs and router targets
    self.router_ = fit_one(self.router, X, y_route, sample_weight=sample_weight)

    # region Fit final estimators on full data
    self.estimators_ = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
        delayed(fit_one)(estimator, X, y, sample_weight)
        for estimator in self.estimators
    )
    # endregion

    self.is_fitted_ = True
    # Target type of the *routing* labels (estimator indices), not of `y`.
    self._route_target_type = type_of_target(y_route)
    return self
def _make_router_targets(
    self,
    X,
    y,
    sample_weight=None,
):
    """Builds data for router training.

    For every split produced by `self.cv_`, each candidate estimator is fitted
    on the training part and asked for `predict_proba` on the test part. The
    out-of-fold probabilities are averaged per sample (a sample may appear in
    several test folds) and handed to `_collect_target_labels`.

    Parameters
    ----------
    X : {array-like, sparse matrix}, shape (n_samples, n_features)
        The training input samples.
    y : array-like, shape (n_samples,)
        The target values.
    sample_weight : array-like, shape (n_samples,), default=None
        Sample weights; only the training part of each fold is forwarded.

    Returns
    -------
    y_route : ndarray, shape (n_samples,)
        Index of the estimator chosen for each sample, as returned by
        `_collect_target_labels`.
    """
    # Local import keeps the module-level import block untouched.
    from scipy.sparse import issparse

    sparse_input = issparse(X)
    # `len()` and `np.take` do not work on sparse matrices, so use `shape`
    # and row indexing on a CSR copy for them. Dense inputs keep `np.take`.
    X_rows = X.tocsr() if sparse_input else X
    n_samples = X.shape[0] if hasattr(X, "shape") else len(X)
    n_estimators = len(self.estimators)
    n_classes = len(np.unique(y))

    def take_rows(indices):
        if sparse_input:
            return X_rows[indices]
        return np.take(X, indices, axis=0)

    # Accumulate probabilities (handle repeated CV folds)
    y_proba_sum = np.zeros((n_estimators, n_samples, n_classes))
    fold_counts = np.zeros(n_samples, dtype=int)

    for train_idx, test_idx in self.cv_.split(X, y):
        X_train = take_rows(train_idx)
        X_test = take_rows(test_idx)
        y_train = np.take(y, train_idx, axis=0)
        sw_train = (
            None
            if sample_weight is None
            else np.take(sample_weight, train_idx, axis=0)
        )

        # Get probability predictions from each estimator; verbosity follows
        # the same setting as the final fits in `fit`.
        probas = Parallel(n_jobs=self.n_jobs, verbose=self.verbose)(
            delayed(fit_and_predict_one_on_test)(
                estimator,
                X_train,
                y_train,
                sw_train,
                X_test,
                "predict_proba",
            )
            for estimator in self.estimators
        )

        # Accumulate for averaging
        for i, proba in enumerate(probas):
            y_proba_sum[i, test_idx, :] += proba
        fold_counts[test_idx] += 1

    # Average probabilities across folds.
    # NOTE(review): a sample that never lands in a test fold has a zero count
    # and yields NaN here — confirm whether such splitters should be rejected.
    y_proba = y_proba_sum / fold_counts[np.newaxis, :, np.newaxis]
    return self._collect_target_labels(y, y_proba)
def _collect_target_labels(self, y_true, y_proba):
"""Collects routing target labels based on estimator predictions and costs."""
n_estimators, n_samples = y_proba.shape[0], y_proba.shape[1]
sample_idx = np.arange(n_samples)
# Compute log-loss: -log(P(true_class))
log_losses = np.zeros((n_estimators, n_samples))
for i in range(n_estimators):
true_class_proba = y_proba[i, sample_idx, y_true]
# Clip to avoid log(0) / log(epsilon)
true_class_proba = np.clip(true_class_proba, 1e-15, 1.0 - 1e-15)
log_losses[i] = -np.log(true_class_proba)
# Scale losses by cost and find best estimator per sample
cost_scaled_losses = log_losses * self.costs_[:, np.newaxis]
y_route = cost_scaled_losses.argmin(axis=0)
router_classes, router_class_counts = np.unique(y_route, return_counts=True)
self.router_class_ratios_ = dict(
zip(router_classes, router_class_counts / sum(router_class_counts))
)
return y_route
@validate_params(
    {
        "X": ["array-like", "sparse matrix"],
    },
    prefer_skip_nested_validation=True,
)
def predict(self, X):
    """Predicts class labels for samples in X.

    Each sample is handled by the estimator the router selects for it.

    Parameters
    ----------
    X : {array-like, sparse matrix}, shape (n_samples, n_features)
        Samples to classify.

    Returns
    -------
    y_pred : ndarray, shape (n_samples,)
        Predicted class labels.
    """
    y_pred = self._predict(X, method="predict")
    return y_pred
@validate_params(
    {
        "X": ["array-like", "sparse matrix"],
    },
    prefer_skip_nested_validation=True,
)
def predict_proba(self, X):
    """Predicts class probabilities for X.

    Each sample is handled by the estimator the router selects for it.

    Parameters
    ----------
    X : {array-like, sparse matrix}, shape (n_samples, n_features)
        Samples to classify.

    Returns
    -------
    y_proba : ndarray, shape (n_samples, n_classes)
        Class probabilities.
    """
    y_proba = self._predict(X, method="predict_proba")
    return y_proba
@validate_params(
    {
        "X": ["array-like", "sparse matrix"],
    },
    prefer_skip_nested_validation=True,
)
def predict_log_proba(self, X):
    """Predicts log class probabilities for X.

    Computed as the element-wise natural logarithm of `predict_proba(X)`.

    Parameters
    ----------
    X : {array-like, sparse matrix}, shape (n_samples, n_features)
        Samples to classify.

    Returns
    -------
    y_log_proba : ndarray, shape (n_samples, n_classes)
        Log class probabilities.
    """
    y_proba = self.predict_proba(X)
    return np.log(y_proba)
@validate_params(
    {
        "X": ["array-like", "sparse matrix"],
    },
    prefer_skip_nested_validation=True,
)
def decision_function(self, X):
    """Compute decision function for X.

    Each sample is scored by the estimator the router selects for it.

    Parameters
    ----------
    X : {array-like, sparse matrix}, shape (n_samples, n_features)
        Samples to evaluate.

    Returns
    -------
    y_score : ndarray, shape (n_samples, n_classes) or (n_samples,)
        Decision function values.
    """
    y_score = self._predict(X, method="decision_function")
    return y_score
def _predict(self, X, method):
    """Main function to route inputs and make predictions.

    The fitted router predicts an estimator index for every sample; each
    selected estimator is then called once, via `method`, on the samples
    assigned to it, and the results are written back in input order.

    Parameters
    ----------
    X : {array-like, sparse matrix}, shape (n_samples, n_features)
        Samples to process.
    method : str
        Name of the estimator method to call: "predict", "predict_proba" or
        "decision_function".

    Returns
    -------
    y_pred : ndarray or earray
        Output of `method` for every sample. If `self.return_earray` is True,
        the result of `earray(y_pred, ensemble_mask)` is returned instead,
        where `ensemble_mask[i, j]` is True when estimator `j` handled
        sample `i`.
    """
    check_is_fitted(self, attributes="is_fitted_")

    # Ensure we can extract input metadata
    validation_kwargs = {
        "accept_sparse": True,
        "dtype": None,
        "ensure_2d": False,
        "allow_nd": True,
    }
    try:
        X = check_array(X, ensure_all_finite=False, **validation_kwargs)
    except TypeError:
        # Fall back to the `force_all_finite` spelling of the same option.
        X = check_array(X, force_all_finite=False, **validation_kwargs)

    # Choose estimators
    y_route = self.router_.predict(X)

    # Create prediction matrix. `shape[0]` is used rather than `len`, which
    # raises for sparse matrices.
    n_samples, n_estimators = X.shape[0], len(self.estimators_)
    if method == "predict":
        y_pred = np.empty(n_samples, dtype=self.classes_.dtype)
    else:
        # Binary classifiers return 1-D decision scores. That depends on the
        # number of target classes, not on how many estimators were routed to.
        n_classes = len(self.classes_)
        if method == "decision_function" and n_classes <= 2:
            output_shape = (n_samples,)
        else:
            output_shape = (n_samples, n_classes)
        y_pred = np.zeros(output_shape, dtype=np.float64)

    # Ensemble mask is only built if `self.return_earray` is True.
    ensemble_mask = (
        np.zeros((n_samples, n_estimators), dtype=np.bool_)
        if self.return_earray
        else None
    )

    # Predict with each chosen estimator. Every index from `np.unique` matches
    # at least one sample, so no emptiness check is needed.
    for estimator_idx in np.unique(y_route):
        estimator_mask = y_route == estimator_idx
        estimator = self.estimators_[estimator_idx]
        y_pred[estimator_mask] = getattr(estimator, method)(X[estimator_mask])
        if ensemble_mask is not None:
            ensemble_mask[estimator_mask, estimator_idx] = True

    if ensemble_mask is not None:
        return earray(y_pred, ensemble_mask)
    return y_pred