# Source code for kappaml_core.meta.base
import logging
from collections import deque
from copy import deepcopy
from typing import List

import numpy as np
from pymfe.mfe import MFE
from river.base import Classifier, Regressor
from river.metrics import MAE
from river.metrics.base import Metric
from river.model_selection.base import ModelSelector
from river.tree import HoeffdingTreeClassifier
# [docs]
class MetaEstimator(ModelSelector):
    """Meta-estimator for model selection using meta-learning.

    This implements a meta-estimator that uses a list of base estimator models
    and a meta learner. The meta learner uses meta-features extracted from a
    window of recent stream samples to select the best base estimator at a
    given point in time.

    Parameters
    ----------
    models: list of Estimator
        A list of base estimator models.
    meta_learner: Classifier (default=None)
        Meta learner used to predict the index of the best base estimator.
        When ``None``, a fresh ``HoeffdingTreeClassifier`` is created for this
        instance.
    metric: Metric (default=None)
        Metric used to evaluate the performance of the base estimators.
        When ``None``, a fresh ``MAE`` is created for this instance.
    mfe_groups: list (default=None)
        Groups of meta-features to use from PyMFE. When ``None``,
        ``["general"]`` is used.
    window_size: int (default=200)
        The size of the window used for extracting meta-features.
    meta_update_frequency: int (default=50)
        How frequently to extract meta-features and update the meta-learner.
        Higher values mean less frequent updates but more stable meta-model.
    """

    def __init__(
        self,
        models: List[Regressor | Classifier],
        meta_learner: Classifier | None = None,
        metric: Metric | None = None,
        mfe_groups: list | None = None,
        window_size: int = 200,
        meta_update_frequency: int = 50,
    ):
        # Defaults are built per instance. Default argument values are
        # evaluated once at definition time, so an instance used as a default
        # would be shared (and trained) by every MetaEstimator relying on it.
        if meta_learner is None:
            meta_learner = HoeffdingTreeClassifier()
        if metric is None:
            metric = MAE()
        if mfe_groups is None:
            mfe_groups = ["general"]

        super().__init__(models, metric)
        self.meta_learner = meta_learner
        self.mfe_groups = mfe_groups
        self.window_size = window_size
        self.meta_update_frequency = meta_update_frequency

        # Track performance of each model globally
        self.metrics = [deepcopy(metric) for _ in range(len(self))]
        self.mfe = MFE(groups=self.mfe_groups, suppress_warnings=True)

        # Window of (x, y) pairs for meta-feature extraction
        self.window_data_x = deque(maxlen=window_size)
        self.window_data_y = deque(maxlen=window_size)

        # Track performance of each model since the last meta-update
        self.window_metrics = [deepcopy(metric) for _ in range(len(self))]

        # Counter to track samples for meta-update frequency
        self.sample_counter = 0

        # Track the best model predicted by the meta-learner
        self._best_model = models[0]

    def _extract_meta_features(self):
        """Extract meta-features from the current window.

        Returns
        -------
        dict or None
            Mapping of meta-feature name to value with NaN entries removed,
            or ``None`` when the window is not yet full or extraction failed.
        """
        if len(self.window_data_x) < self.window_size:
            return None

        # Convert the window to the array format expected by PyMFE
        X = np.array(
            [
                list(x.values()) if isinstance(x, dict) else list(x)
                for x in self.window_data_x
            ]
        )
        y = np.array(self.window_data_y)

        try:
            self.mfe.fit(X, y, suppress_warnings=True)
            extracted = self.mfe.extract(suppress_warnings=True)
            names, values = extracted[0], extracted[1]
            # Build a River-style feature dict, dropping NaN values
            return {
                name: value
                for name, value in zip(names, values)
                if not np.isnan(value)
            }
        except Exception as e:
            # Extraction is best-effort: a failure must not interrupt the
            # stream, so it is reported through logging and skipped.
            logging.getLogger(__name__).warning(
                "Error extracting meta-features: %s", e
            )
            return None

    @staticmethod
    def _best_metric_index(metrics):
        """Return ``(index, value)`` of the best metric in ``metrics``.

        Ties keep the earliest index because a later metric only replaces the
        current best when ``is_better_than`` reports a strict improvement.
        """
        best_index = 0
        for i, metric in enumerate(metrics):
            if metric.is_better_than(metrics[best_index]):
                best_index = i
        return best_index, metrics[best_index].get()

    def _get_best_window_model_index(self):
        """Get the index of the best performing model on the current window."""
        return self._best_metric_index(self.window_metrics)

    def _get_best_global_model_index(self):
        """Get the index of the best performing model over the whole stream."""
        return self._best_metric_index(self.metrics)

    def learn_one(self, x, y):
        """Update every base model with ``(x, y)`` and periodically the meta-learner.

        Each model first predicts ``x`` so its global and window metrics are
        updated before it learns from the sample. Once the window is full and
        at least ``meta_update_frequency`` samples have been seen since the
        last update, meta-features are extracted, the meta-learner is trained
        on the index of the best window model, and its prediction becomes the
        new best model.

        Returns
        -------
        self
        """
        # Store data in window
        self.window_data_x.append(x)
        self.window_data_y.append(y)
        self.sample_counter += 1

        # Update all models and their metrics (predict before learning)
        for model, metric, window_metric in zip(
            self, self.metrics, self.window_metrics
        ):
            y_pred = model.predict_one(x)
            metric.update(y, y_pred)
            window_metric.update(y, y_pred)
            model.learn_one(x, y)

        # Only extract meta-features and update meta-learner periodically
        window_full = len(self.window_data_x) >= self.window_size
        if window_full and self.sample_counter >= self.meta_update_frequency:
            meta_features = self._extract_meta_features()

            if meta_features:
                # Train meta-learner to predict the best model index
                best_model_idx, _ = self._get_best_window_model_index()
                self.meta_learner.learn_one(meta_features, best_model_idx)

                # Predict the best model using the meta-learner
                prediction = self.meta_learner.predict_one(meta_features)
                if prediction is not None:
                    predicted_model_idx = int(round(prediction))
                    # Keep the current best model when the meta-learner
                    # returns a label that is not a valid model index.
                    if 0 <= predicted_model_idx < len(self):
                        self._best_model = self.models[predicted_model_idx]

            # Reset in every periodic pass, even when extraction failed, so a
            # failure does not trigger a re-extraction on each new sample.
            self.window_metrics = [deepcopy(self.metric) for _ in range(len(self))]
            self.sample_counter = 0

        return self

    @property
    def best_model(self):
        """The base model currently selected by the meta-learner."""
        return self._best_model