Source code for intelelm.base_elm

#!/usr/bin/env python
# Created by "Thieu" at 09:48, 17/08/2023 ----------%                                                                               
#       Email: nguyenthieu2102@gmail.com            %                                                    
#       Github: https://github.com/thieu1995        %                         
# --------------------------------------------------%

import pickle
import numpy as np
import pandas as pd
from pathlib import Path
from permetrics import RegressionMetric, ClassificationMetric
from sklearn.base import BaseEstimator
from mealpy import get_optimizer_by_name, Optimizer, get_all_optimizers, FloatVar
from intelelm.utils import activation, validator
from intelelm.utils.evaluator import get_all_regression_metrics, get_all_classification_metrics


[docs]class ELM: """Extreme Learning Machine This class defines the general ELM model Parameters ---------- size_input : int, default=5 The number of input nodes size_hidden : int, default=10 The number of hidden nodes size_output : int, default=1 The number of output nodes act_name : {"relu", "leaky_relu", "celu", "prelu", "gelu", "elu", "selu", "rrelu", "tanh", "hard_tanh", "sigmoid", "hard_sigmoid", "log_sigmoid", "silu", "swish", "hard_swish", "soft_plus", "mish", "soft_sign", "tanh_shrink", "soft_shrink", "hard_shrink", "softmin", "softmax", "log_softmax" }, default='sigmoid' Activation function for the hidden layer. seed: int, default=None Determines random number generation for weights and bias initialization. Pass an int for reproducible results across multiple function calls. """ def __init__(self, size_input=5, size_hidden=10, size_output=1, act_name='sigmoid', seed=None): self.generator = np.random.default_rng(seed) self.input_nodes = size_input self.hidden_nodes = size_hidden self.output_nodes = size_output self.size_w1 = self.input_nodes * self.hidden_nodes self.size_b = self.hidden_nodes self.size_w2 = self.hidden_nodes * self.output_nodes self.act_name = act_name self.act_func = getattr(activation, self.act_name) self.weights = { "w1": self.generator.random((self.input_nodes, self.hidden_nodes)), "b": self.generator.random(self.hidden_nodes), "w2": self.generator.random((self.hidden_nodes, self.output_nodes)) }
[docs] def fit(self, X, y): """Fit the model to data matrix X and target(s) y. Parameters ---------- X : ndarray or sparse matrix of shape (n_samples, n_features) The input data. y : ndarray of shape (n_samples,) or (n_samples, n_outputs) The target values (class labels in classification, real numbers in regression). Returns ------- self : object Returns a trained ELM model. """ H = self.act_func(np.dot(X, self.weights["w1"]) + self.weights["b"]) self.weights["w2"] = np.linalg.pinv(H) @ y return self
[docs] def predict(self, X): """Predict using the Extreme Learning Machine model. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) The input data. Returns ------- y : ndarray of shape (n_samples, n_outputs) The predicted values. """ H = self.act_func(np.dot(X, self.weights["w1"]) + self.weights["b"]) y_pred = np.dot(H, self.weights["w2"]) return y_pred
[docs] def get_weights(self): return self.weights
[docs] def set_weights(self, weights): self.weights = weights
[docs] def get_weights_size(self): return np.sum([item.size for item in self.weights.values()])
[docs] def update_weights_from_solution(self, solution, X, y): w1 = np.reshape(solution[:self.size_w1], (self.input_nodes, self.hidden_nodes)) b = np.reshape(solution[self.size_w1:self.size_w1 + self.size_b], self.hidden_nodes) H = self.act_func(np.dot(X, w1) + b) w2 = np.dot(np.linalg.pinv(H), y) self.set_weights({"w1": w1, "b": b, "w2": w2})
[docs]class BaseElm(BaseEstimator): """ Defines the most general class for ELM network that inherits the BaseEstimator class of Scikit-Learn library. Parameters ---------- hidden_size : int, default=10 The number of hidden nodes act_name : {"relu", "leaky_relu", "celu", "prelu", "gelu", "elu", "selu", "rrelu", "tanh", "hard_tanh", "sigmoid", "hard_sigmoid", "log_sigmoid", "silu", "swish", "hard_swish", "soft_plus", "mish", "soft_sign", "tanh_shrink", "soft_shrink", "hard_shrink", "softmin", "softmax", "log_softmax" }, default='sigmoid' Activation function for the hidden layer. """ SUPPORTED_CLS_METRICS = get_all_classification_metrics() SUPPORTED_REG_METRICS = get_all_regression_metrics() CLS_OBJ_LOSSES = None def __init__(self, hidden_size=10, act_name="elu"): super().__init__() self.hidden_size = hidden_size self.act_name = act_name self.weights = {} self.network, self.obj_scaler, self.loss_train = None, None, None self.n_labels, self.obj_scaler = None, None @staticmethod def _check_method(method=None, list_supported_methods=None): if type(method) is str: return validator.check_str("method", method, list_supported_methods) else: raise ValueError(f"method should be a string and belongs to {list_supported_methods}")
[docs] def create_network(self, X, y): return None, None
[docs] def fit(self, X, y): self.network, self.obj_scaler = self.create_network(X, y) y_scaled = self.obj_scaler.transform(y) self.network.fit(X, y_scaled) return self
[docs] def predict(self, X, return_prob=False): """ Inherit the predict function from BaseElm class, with 1 more parameter `return_prob`. Parameters ---------- X : {array-like, sparse matrix} of shape (n_samples, n_features) The input data. return_prob : bool, default=False It is used for classification problem: - If True, the returned results are the probability for each sample - If False, the returned results are the predicted labels """ pred = self.network.predict(X) if return_prob: return pred return self.obj_scaler.inverse_transform(pred)
def __evaluate_reg(self, y_true, y_pred, list_metrics=("MSE", "MAE")): rm = RegressionMetric(y_true=y_true, y_pred=y_pred) return rm.get_metrics_by_list_names(list_metrics) def __evaluate_cls(self, y_true, y_pred, list_metrics=("AS", "RS")): cm = ClassificationMetric(y_true, y_pred) return cm.get_metrics_by_list_names(list_metrics) def __score_reg(self, X, y, method="RMSE"): method = self._check_method(method, list(self.SUPPORTED_REG_METRICS.keys())) y_pred = self.network.predict(X) return RegressionMetric(y, y_pred).get_metric_by_name(method)[method] def __scores_reg(self, X, y, list_methods=("MSE", "MAE")): y_pred = self.network.predict(X) return self.__evaluate_reg(y_true=y, y_pred=y_pred, list_metrics=list_methods) def __score_cls(self, X, y, method="AS"): method = self._check_method(method, list(self.SUPPORTED_CLS_METRICS.keys())) return_prob = False if self.n_labels > 2: if method in self.CLS_OBJ_LOSSES: return_prob = True y_pred = self.predict(X, return_prob=return_prob) cm = ClassificationMetric(y_true=y, y_pred=y_pred) return cm.get_metric_by_name(method)[method] def __scores_cls(self, X, y, list_methods=("AS", "RS")): list_errors = list(set(list_methods) & set(self.CLS_OBJ_LOSSES)) list_scores = list((set(self.SUPPORTED_CLS_METRICS.keys()) - set(self.CLS_OBJ_LOSSES)) & set(list_methods)) t1 = {} if len(list_errors) > 0: return_prob = False if self.n_labels > 2: return_prob = True y_pred = self.predict(X, return_prob=return_prob) t1 = self.__evaluate_cls(y_true=y, y_pred=y_pred, list_metrics=list_errors) y_pred = self.predict(X, return_prob=False) t2 = self.__evaluate_cls(y_true=y, y_pred=y_pred, list_metrics=list_scores) return {**t2, **t1}
[docs] def evaluate(self, y_true, y_pred, list_metrics=None): """Return the list of performance metrics of the prediction. Parameters ---------- y_true : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. y_pred : array-like of shape (n_samples,) or (n_samples, n_outputs) Predicted values for `X`. list_metrics : list You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- results : dict The results of the list metrics """ pass
[docs] def score(self, X, y, method=None): """Return the metric of the prediction. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. method : str, default="RMSE" You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- result : float The result of selected metric """ pass
[docs] def scores(self, X, y, list_methods=None): """Return the list of metrics of the prediction. Parameters ---------- X : array-like of shape (n_samples, n_features) Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape ``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator. y : array-like of shape (n_samples,) or (n_samples, n_outputs) True values for `X`. list_methods : list, default=("MSE", "MAE") You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics Returns ------- results : dict The results of the list metrics """ pass
[docs] def save_loss_train(self, save_path="history", filename="loss.csv"): """ Save the loss (convergence) during the training process to csv file. Parameters ---------- save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".csv" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) if self.loss_train is None: print(f"{self.__class__.__name__} model doesn't have training loss!") else: data = {"epoch": list(range(1, len(self.loss_train) + 1)), "loss": self.loss_train} pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_metrics(self, y_true, y_pred, list_metrics=("RMSE", "MAE"), save_path="history", filename="metrics.csv"): """ Save evaluation metrics to csv file Parameters ---------- y_true : ground truth data y_pred : predicted output list_metrics : list of evaluation metrics save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".csv" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) results = self.evaluate(y_true, y_pred, list_metrics) df = pd.DataFrame.from_dict(results, orient='index').T df.to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_y_predicted(self, X, y_true, save_path="history", filename="y_predicted.csv"): """ Save the predicted results to csv file Parameters ---------- X : The features data, nd.ndarray y_true : The ground truth data save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".csv" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) y_pred = self.predict(X, return_prob=False) data = {"y_true": np.squeeze(np.asarray(y_true)), "y_pred": np.squeeze(np.asarray(y_pred))} pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_model(self, save_path="history", filename="model.pkl"): """ Save model to pickle file Parameters ---------- save_path : saved path (relative path, consider from current executed script path) filename : name of the file, needs to have ".pkl" extension """ Path(save_path).mkdir(parents=True, exist_ok=True) if filename[-4:] != ".pkl": filename += ".pkl" pickle.dump(self, open(f"{save_path}/{filename}", 'wb'))
[docs] @staticmethod def load_model(load_path="history", filename="model.pkl"): if filename[-4:] != ".pkl": filename += ".pkl" return pickle.load(open(f"{load_path}/{filename}", 'rb'))
[docs]class BaseMhaElm(BaseElm): """ Defines the most general class for Metaheuristic-based ELM model that inherits the BaseELM class Parameters ---------- hidden_size : int, default=10 The number of hidden nodes act_name : {"relu", "leaky_relu", "celu", "prelu", "gelu", "elu", "selu", "rrelu", "tanh", "hard_tanh", "sigmoid", "hard_sigmoid", "log_sigmoid", "silu", "swish", "hard_swish", "soft_plus", "mish", "soft_sign", "tanh_shrink", "soft_shrink", "hard_shrink", "softmin", "softmax", "log_softmax" }, default='sigmoid' Activation function for the hidden layer. obj_name : None or str, default=None The name of objective for the problem, also depend on the problem is classification and regression. optimizer : str or instance of Optimizer class (from Mealpy library), default = "BaseGA" The Metaheuristic Algorithm that use to solve the feature selection problem. Current supported list, please check it here: https://github.com/thieu1995/mealpy. If a custom optimizer is passed, make sure it is an instance of `Optimizer` class. optimizer_paras : None or dict of parameter, default=None The parameter for the `optimizer` object. If `None`, the default parameters of optimizer is used (defined in https://github.com/thieu1995/mealpy.) If `dict` is passed, make sure it has at least `epoch` and `pop_size` parameters. verbose : bool, default=True Whether to print progress messages to stdout. seed: int, default=None Determines random number generation for weights and bias initialization. Pass an int for reproducible results across multiple function calls. """ SUPPORTED_OPTIMIZERS = list(get_all_optimizers().keys()) SUPPORTED_CLS_OBJECTIVES = get_all_classification_metrics() SUPPORTED_REG_OBJECTIVES = get_all_regression_metrics() def __init__(self, hidden_size=10, act_name="elu", obj_name=None, optimizer="BaseGA", optimizer_paras=None, verbose=True, seed=None): super().__init__(hidden_size=hidden_size, act_name=act_name) self.obj_name = obj_name self.optimizer_paras = optimizer_paras self.optimizer = self._set_optimizer(optimizer, optimizer_paras) self.verbose = verbose self.seed = seed self.network, self.obj_scaler = None, None self.obj_weights = None def _set_optimizer(self, optimizer=None, optimizer_paras=None): if type(optimizer) is str: opt_class = get_optimizer_by_name(optimizer) if type(optimizer_paras) is dict: return opt_class(**optimizer_paras) else: return opt_class(epoch=500, pop_size=50) elif isinstance(optimizer, Optimizer): if type(optimizer_paras) is dict: return optimizer.set_parameters(optimizer_paras) return optimizer else: raise TypeError(f"optimizer needs to set as a string and supported by Mealpy library.") def _get_history_loss(self, optimizer=None): list_global_best = optimizer.history.list_global_best # 2D array / matrix 2D global_obj_list = np.array([agent.target.objectives for agent in list_global_best]) # Make each obj_list as an element in array for drawing return global_obj_list[:, 0]
[docs] def fitness_function(self, solution=None): pass
[docs] def fit(self, X, y, lb=(-10.0, ), ub=(10.0, ), mode="single", n_workers=None, termination=None, save_population=False): """ Parameters ---------- X : The features data, np.ndarray y : The ground truth data lb : The lower bound for decision variables in optimization problem (The weights and biases of network) ub : The upper bound for decision variables in optimization problem (The weights and biases of network) mode: Parallel: 'process', 'thread'; Sequential: 'swarm', 'single'. * 'process': The parallel mode with multiple cores run the tasks * 'thread': The parallel mode with multiple threads run the tasks * 'swarm': The sequential mode that no effect on updating phase of other agents * 'single': The sequential mode that effect on updating phase of other agents, this is default mode n_workers: The number of workers (cores or threads) to do the tasks (effect only on parallel mode) termination: The termination dictionary or an instance of Termination class in Mealpy library save_population : Save the population of search agents (Don't set it to True when you don't know how to use it) """ self.network, self.obj_scaler = self.create_network(X, y) y_scaled = self.obj_scaler.transform(y) self.X_temp, self.y_temp = X, y_scaled problem_size = X.shape[1] * self.hidden_size + self.hidden_size if type(lb) in (list, tuple, np.ndarray) and type(ub) in (list, tuple, np.ndarray): if len(lb) == len(ub): if len(lb) == 1: lb = np.array(lb * problem_size, dtype=float) ub = np.array(ub * problem_size, dtype=float) elif len(lb) != problem_size: raise ValueError(f"Invalid lb and ub. Their length should be equal to 1 or problem_size.") else: raise ValueError(f"Invalid lb and ub. They should have the same length.") elif type(lb) in (int, float) and type(ub) in (int, float): lb = (float(lb), ) * problem_size ub = (float(ub), ) * problem_size else: raise ValueError(f"Invalid lb and ub. They should be a number of list/tuple/np.ndarray with size equal to problem_size") log_to = "console" if self.verbose else "None" if self.obj_name is None: raise ValueError("obj_name can't be None") else: if self.obj_name in self.SUPPORTED_REG_OBJECTIVES.keys(): minmax = self.SUPPORTED_REG_OBJECTIVES[self.obj_name] elif self.obj_name in self.SUPPORTED_CLS_OBJECTIVES.keys(): minmax = self.SUPPORTED_CLS_OBJECTIVES[self.obj_name] else: raise ValueError("obj_name is not supported. Please check the library: permetrics to see the supported objective function.") problem = { "obj_func": self.fitness_function, "bounds": FloatVar(lb=lb, ub=ub), "minmax": minmax, "log_to": log_to, "save_population": save_population, "obj_weights": self.obj_weights } g_best = self.optimizer.solve(problem, mode=mode, n_workers=n_workers, termination=termination, seed=self.seed) self.solution, self.best_fit = g_best.solution, g_best.target.fitness self.network.update_weights_from_solution(self.solution, self.X_temp, self.y_temp) self.loss_train = self._get_history_loss(optimizer=self.optimizer) return self