#!/usr/bin/env python
# Created by "Thieu" at 9:23 PM, 08/10/2024 --------%
# Email: nguyenthieu2102@gmail.com %
# Github: https://github.com/thieu1995 %
# --------------------------------------------------%
import numbers
import pickle
import numpy as np
import pandas as pd
from pathlib import Path
from typing import Optional
from sklearn.base import BaseEstimator
from permetrics import RegressionMetric, ClassificationMetric
from mealpy import get_optimizer_by_class, Optimizer, get_all_optimizers, FloatVar
from intelelm.utils import activation, validator
from intelelm.utils.evaluator import get_all_regression_metrics, get_all_classification_metrics
[docs]class MultiLayerELM:
"""
Initializes the Multi-Layer ELM model.
Parameters
----------
layer_sizes : list of int
List of integers, where each integer represents the number of neurons in the respective hidden layers.
act_name : str, optional
Activation function to be used in the hidden layers ("relu", "leaky_relu", "celu", "prelu", "gelu",
"elu", "selu", "rrelu", "tanh", "hard_tanh", "sigmoid", "hard_sigmoid", "log_sigmoid", "silu",
"swish", "hard_swish", "soft_plus", "mish", "soft_sign", "tanh_shrink", "soft_shrink",
"hard_shrink", "softmin", "softmax", "log_softmax"). Default is 'relu'.
seed : int, optional
Seed for random number generator. Default is None.
"""
def __init__(self, layer_sizes=(10, ), act_name='relu', seed=None):
"""
Initializes the Multi-Layer ELM model.
Parameters:
- layer_sizes: List of integers, where each integer represents the number of neurons in the respective hidden layers. Default is (10, )
- act_name: Activation function to be used in the hidden layers. Default is 'relu'.
"""
if not isinstance(layer_sizes, (list, tuple, np.ndarray, int)):
raise ValueError(f"layer_sizes should be an int, list, tuple, or np.ndarray. Got {type(layer_sizes)}")
self.layer_sizes = layer_sizes if isinstance(layer_sizes, (list, tuple, np.ndarray)) else [layer_sizes]
self.act_name = act_name
self.act_func = getattr(activation, self.act_name)
self.generator = np.random.default_rng(seed)
self.weights = []
self.biases = []
self.beta = None
self.input_size, self.obj_scaler = None, None
def _initialize_weights(self, input_size):
self.weights = []
self.biases = []
for size in self.layer_sizes:
weight = self.generator.standard_normal(size=(input_size, size))
bias = self.generator.standard_normal(size)
self.weights.append(weight)
self.biases.append(bias)
input_size = size
def _forward(self, X):
# Forward pass through multiple layers
for i in range(len(self.layer_sizes)):
X = self.act_func(np.dot(X, self.weights[i]) + self.biases[i])
return X
[docs] def fit(self, X, y):
"""Fit the model to data matrix X and target(s) y.
Parameters
----------
X : ndarray or sparse matrix of shape (n_samples, n_features)
The input data.
y : ndarray of shape (n_samples,) or (n_samples, n_outputs)
The target values (class labels in classification, real numbers in regression).
Returns
-------
self : object
Returns a trained ELM model.
"""
# Initialize random weights for each layer
self.input_size = X.shape[1]
self._initialize_weights(input_size=self.input_size)
# Forward pass to compute hidden layer output
H = self._forward(X)
# Compute output weights (beta) using Moore-Penrose pseudoinverse
self.beta = np.dot(np.linalg.pinv(H), y)
return self
[docs] def predict(self, X):
"""Predict using the Extreme Learning Machine model.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
The input data.
Returns
-------
y : ndarray of shape (n_samples, n_outputs)
The predicted values.
"""
H = self._forward(X)
return np.dot(H, self.beta)
[docs] def encode(self):
"""
Encode the current weights and biases into a 1-D vector (solution vector).
Returns:
- A 1-D numpy array containing all the weights and biases of the network.
"""
flat_weights = [w.flatten() for w in self.weights] # Flatten each weight matrix
flat_biases = [b.flatten() for b in self.biases] # Flatten each bias vector
solution_vector = np.concatenate(flat_weights + flat_biases) # Concatenate all into a 1-D vector
return solution_vector
[docs] def decode(self, solution_vector, X, y):
"""
Decode a 1-D solution vector into the weights and biases of the network.
Parameters:
- solution_vector: 1-D numpy array containing the flattened weights and biases.
"""
start = 0
input_size = self.input_size
self.weights = []
self.biases = []
# Decode weights and biases for each layer
for i, size in enumerate(self.layer_sizes):
weight_size = input_size * size
bias_size = size
# Extract the weights and reshape them to the correct matrix shape
weight = solution_vector[start:start + weight_size].reshape((input_size, size))
self.weights.append(weight)
start += weight_size
# Extract the biases
bias = solution_vector[start:start + bias_size]
self.biases.append(bias)
start += bias_size
input_size = size
# Update beta
H = self._forward(X)
self.beta = np.dot(np.linalg.pinv(H), y)
[docs] def get_ndim(self):
"""
Get the total number of dimensions (weights + biases) in the network.
Returns:
- An integer representing the total number of parameters (weights + biases).
"""
total_params = 0
input_size = self.input_size
for i, size in enumerate(self.layer_sizes):
total_params += input_size * size # Add weights
total_params += size # Add biases
input_size = size
return total_params
[docs] def get_weights(self):
print( [w.shape for w in self.weights])
print(self.beta.shape)
return {
"w": self.weights,
"b": self.biases,
"beta": self.beta
}
[docs]class BaseElm(BaseEstimator):
"""
class BaseElm(BaseEstimator):
A base class for implementing Extreme Learning Machines (ELM) with support for both classification and regression tasks.
Attributes
----------
layer_sizes : list
List containing the sizes of each layer in the network.
act_name : str
The name of the activation function to be used.
network : object
The ELM network object.
loss_train : list
List of loss values recorded during training.
n_labels : int
Number of labels in the dataset.
"""
SUPPORTED_CLS_METRICS = get_all_classification_metrics()
SUPPORTED_REG_METRICS = get_all_regression_metrics()
CLS_OBJ_LOSSES = None
def __init__(self, layer_sizes=(10, ), act_name='relu'):
super().__init__()
# Directly assign layer_sizes without modification
if not isinstance(layer_sizes, (list, tuple, np.ndarray, int)):
raise ValueError(f"layer_sizes should be an int, list, tuple, or np.ndarray. Got {type(layer_sizes)}")
self.layer_sizes = layer_sizes if isinstance(layer_sizes, (list, tuple, np.ndarray)) else [layer_sizes]
self.act_name = act_name
self.network, self.loss_train, self.n_labels, self.input_size = None, None, None, None
@staticmethod
def _check_method(method=None, list_supported_methods=None):
if type(method) is str:
return validator.check_str("method", method, list_supported_methods)
else:
raise ValueError(f"method should be a string and belongs to {list_supported_methods}")
[docs] def get_weights(self):
"""
Retrieves the current weights of the neural network.
Returns
-------
list
A list containing the weights of the neural network.
"""
return self.network.get_weights()
[docs] def create_network(self, X, y) -> Optional["MultiLayerELM"]:
"""
Parameters
----------
X : ndarray
Input data used to train the network.
y : ndarray
Target values corresponding to the input data X.
Returns
-------
Optional["MultiLayerELM"]
Returns an instance of the MultiLayerELM class if creation is successful, otherwise, returns None.
"""
return None
[docs] def fit(self, X, y):
"""
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : array-like of shape (n_samples,) or (n_samples, n_outputs)
Target values.
"""
self.network = self.create_network(X, y)
y_scaled = self.network.obj_scaler.transform(y)
self.network.fit(X, y_scaled)
return self
[docs] def predict(self, X, return_prob=False):
"""
Inherit the predict function from BaseElm class, with 1 more parameter `return_prob`.
Parameters
----------
X : {array-like, sparse matrix} of shape (n_samples, n_features)
The input data.
return_prob : bool, default=False
It is used for classification problem:
- If True, the returned results are the probability for each sample
- If False, the returned results are the predicted labels
"""
pred = self.network.predict(X)
if return_prob:
return pred
return self.network.obj_scaler.inverse_transform(pred)
def __evaluate_reg(self, y_true, y_pred, list_metrics=("MSE", "MAE")):
"""
Parameters
----------
y_true : array-like
Ground truth (correct) target values.
y_pred : array-like
Estimated target values.
list_metrics : tuple/list of str, optional
List of metric names to evaluate. Default is ("MSE", "MAE").
"""
rm = RegressionMetric(y_true=y_true, y_pred=y_pred)
return rm.get_metrics_by_list_names(list_metrics)
def __evaluate_cls(self, y_true, y_pred, list_metrics=("AS", "RS")):
"""
Parameters
----------
y_true : array-like of shape (n_samples,)
True class labels.
y_pred : array-like of shape (n_samples,)
Predicted class labels by the classifier.
list_metrics : tuple/list of str, optional
List of metric names to evaluate, by default ("AS", "RS").
"""
cm = ClassificationMetric(y_true, y_pred)
return cm.get_metrics_by_list_names(list_metrics)
def __score_reg(self, X, y, method="RMSE"):
"""
Parameters
----------
X : array-like of shape (n_samples, n_features)
The input samples used for prediction.
y : array-like of shape (n_samples,)
The true target values.
method : str, optional, default="RMSE"
The regression metric to be used for scoring. Must be one of the supported metrics in SUPPORTED_REG_METRICS.
Returns
-------
float
The calculated regression metric based on the method provided.
"""
method = self._check_method(method, list(self.SUPPORTED_REG_METRICS.keys()))
y_pred = self.network.predict(X)
return RegressionMetric(y, y_pred).get_metric_by_name(method)[method]
def __scores_reg(self, X, y, list_methods=("MSE", "MAE")):
"""
Parameters
----------
X : array-like of shape (n_samples, n_features)
Input data.
y : array-like of shape (n_samples,)
True values for X.
list_methods : tuple of str, optional
List of evaluation metrics to be used. Default is ("MSE", "MAE").
"""
y_pred = self.network.predict(X)
return self.__evaluate_reg(y_true=y, y_pred=y_pred, list_metrics=list_methods)
def __score_cls(self, X, y, method="AS"):
"""
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples to score.
y : array-like of shape (n_samples,)
True labels for X.
method : str, default="AS"
Scoring method to use. Supported methods are determined by the keys in self.SUPPORTED_CLS_METRICS.
Returns
-------
float
Computed score based on the specified method.
"""
method = self._check_method(method, list(self.SUPPORTED_CLS_METRICS.keys()))
return_prob = False
if self.n_labels > 2:
if method in self.CLS_OBJ_LOSSES:
return_prob = True
y_pred = self.predict(X, return_prob=return_prob)
cm = ClassificationMetric(y_true=y, y_pred=y_pred)
return cm.get_metric_by_name(method)[method]
def __scores_cls(self, X, y, list_methods=("AS", "RS")):
"""
Parameters
----------
X : array-like of shape (n_samples, n_features)
Feature matrix for the samples for which predictions are to be made.
y : array-like of shape (n_samples,)
True labels for the samples.
list_methods : tuple of str, optional
List of method names to evaluate. Possible values include 'AS', 'RS', etc. Default is ('AS', 'RS').
Returns
-------
dict
A dictionary with the performance metrics from the selected methods listed in `list_methods`.
"""
list_errors = list(set(list_methods) & set(self.CLS_OBJ_LOSSES))
list_scores = list((set(self.SUPPORTED_CLS_METRICS.keys()) - set(self.CLS_OBJ_LOSSES)) & set(list_methods))
t1 = {}
if len(list_errors) > 0:
return_prob = False
if self.n_labels > 2:
return_prob = True
y_pred = self.predict(X, return_prob=return_prob)
t1 = self.__evaluate_cls(y_true=y, y_pred=y_pred, list_metrics=list_errors)
y_pred = self.predict(X, return_prob=False)
t2 = self.__evaluate_cls(y_true=y, y_pred=y_pred, list_metrics=list_scores)
return {**t2, **t1}
[docs] def evaluate(self, y_true, y_pred, list_metrics=None):
"""Return the list of performance metrics of the prediction.
Parameters
----------
y_true : array-like of shape (n_samples,) or (n_samples, n_outputs)
True values for `X`.
y_pred : array-like of shape (n_samples,) or (n_samples, n_outputs)
Predicted values for `X`.
list_metrics : list
You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics
Returns
-------
results : dict
The results of the list metrics
"""
pass
[docs] def score(self, X, y, method=None):
"""Return the metric of the prediction.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape
``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator.
y : array-like of shape (n_samples,) or (n_samples, n_outputs)
True values for `X`.
method : str, default="RMSE"
You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics
Returns
-------
result : float
The result of selected metric
"""
pass
[docs] def scores(self, X, y, list_methods=None):
"""Return the list of metrics of the prediction.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Test samples. For some estimators this may be a precomputed kernel matrix or a list of generic objects instead with shape
``(n_samples, n_samples_fitted)``, where ``n_samples_fitted`` is the number of samples used in the fitting for the estimator.
y : array-like of shape (n_samples,) or (n_samples, n_outputs)
True values for `X`.
list_methods : list, default=("MSE", "MAE")
You can get metrics from Permetrics library: https://github.com/thieu1995/permetrics
Returns
-------
results : dict
The results of the list metrics
"""
pass
[docs] def save_loss_train(self, save_path="history", filename="loss.csv"):
"""
Save the loss (convergence) during the training process to csv file.
Parameters
----------
save_path : saved path (relative path, consider from current executed script path)
filename : name of the file, needs to have ".csv" extension
"""
Path(save_path).mkdir(parents=True, exist_ok=True)
if self.loss_train is None:
print(f"{self.__class__.__name__} model doesn't have training loss!")
else:
data = {"epoch": list(range(1, len(self.loss_train) + 1)), "loss": self.loss_train}
pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_metrics(self, y_true, y_pred, list_metrics=("RMSE", "MAE"), save_path="history", filename="metrics.csv"):
"""
Save evaluation metrics to csv file
Parameters
----------
y_true : ground truth data
y_pred : predicted output
list_metrics : list of evaluation metrics
save_path : saved path (relative path, consider from current executed script path)
filename : name of the file, needs to have ".csv" extension
"""
Path(save_path).mkdir(parents=True, exist_ok=True)
results = self.evaluate(y_true, y_pred, list_metrics)
df = pd.DataFrame.from_dict(results, orient='index').T
df.to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_y_predicted(self, X, y_true, save_path="history", filename="y_predicted.csv"):
"""
Save the predicted results to csv file
Parameters
----------
X : The features data, nd.ndarray
y_true : The ground truth data
save_path : saved path (relative path, consider from current executed script path)
filename : name of the file, needs to have ".csv" extension
"""
Path(save_path).mkdir(parents=True, exist_ok=True)
y_pred = self.predict(X, return_prob=False)
data = {"y_true": np.squeeze(np.asarray(y_true)), "y_pred": np.squeeze(np.asarray(y_pred))}
pd.DataFrame(data).to_csv(f"{save_path}/{filename}", index=False)
[docs] def save_model(self, save_path="history", filename="model.pkl"):
"""
Save model to pickle file
Parameters
----------
save_path : saved path (relative path, consider from current executed script path)
filename : name of the file, needs to have ".pkl" extension
"""
Path(save_path).mkdir(parents=True, exist_ok=True)
if filename[-4:] != ".pkl":
filename += ".pkl"
pickle.dump(self, open(f"{save_path}/{filename}", 'wb'))
[docs] @staticmethod
def load_model(load_path="history", filename="model.pkl"):
"""
Parameters
----------
load_path : str, optional
Directory path where the model file is located. Defaults to "history".
filename : str
Name of the file to be loaded. If the filename doesn't end with ".pkl", the extension is automatically added.
Returns
-------
object
The model loaded from the specified pickle file.
"""
if filename[-4:] != ".pkl":
filename += ".pkl"
return pickle.load(open(f"{load_path}/{filename}", 'rb'))
[docs]class BaseMhaElm(BaseElm):
"""
BaseMhaElm Class
================
An optimization-based extension of the `BaseElm` class, designed for flexible optimization and parameter tuning
of networks utilizing various optimizers from the Mealpy library. It supports different activation functions,
objectives, and optimization strategies.
Attributes
----------
SUPPORTED_OPTIMIZERS : list
List of supported optimizers, obtained from the Mealpy library.
SUPPORTED_CLS_OBJECTIVES : dict
Dictionary of supported classification objectives.
SUPPORTED_REG_OBJECTIVES : dict
Dictionary of supported regression objectives.
obj_name : str
Name of the objective function used for optimization.
optim : str or Optimizer
Name or instance of the optimizer used for optimization.
optim_params : dict
Parameters for configuring the optimizer.
verbose : bool
Whether to display optimization logs.
seed : int
Seed for random number generation.
lb : list, tuple, np.ndarray, int, or float
Lower bounds for optimization variables.
ub : list, tuple, np.ndarray, int, or float
Upper bounds for optimization variables.
mode : str
Optimization mode, e.g., 'single' or 'multi'.
n_workers : int
Number of workers for parallel optimization.
termination : object
Termination criteria for the optimization process.
Methods
-------
__init__(layer_sizes=(10,), act_name="elu", obj_name=None, optim="BaseGA", optim_params=None, seed=None, verbose=True,
lb=None, ub=None, mode='single', n_workers=None, termination=None)
Initializes the `BaseMhaElm` with specified parameters.
get_name()
Returns the name of the optimizer used, appended with `-ELM`.
set_params(**params)
Sets the parameters for the optimizer and updates parent class parameters.
set_optimizer(optim)
Sets the optimizer attribute.
set_optim_paras(optim_params)
Sets the optimizer parameters attribute.
set_optimizer_object(optim=None, optim_params=None)
Sets the optimizer object with the specified parameters.
set_seed(seed)
Sets the random seed for the class.
objective_function(solution=None)
Placeholder fitness function to be overridden by the subclass or user.
set_lb_ub(lb=None, ub=None, n_dims=None)
Validates and sets the lower and upper bounds for optimization.
_get_minmax(obj_name=None)
Retrieves the minmax value for the specified objective name.
fit(X, y)
Fits the model to the provided data using the specified optimization parameters.
"""
SUPPORTED_OPTIMIZERS = list(get_all_optimizers(verbose=False).keys())
SUPPORTED_CLS_OBJECTIVES = get_all_classification_metrics()
SUPPORTED_REG_OBJECTIVES = get_all_regression_metrics()
def __init__(self, layer_sizes=(10, ), act_name="elu",
obj_name=None, optim="BaseGA", optim_params=None, seed=None, verbose=True,
lb=None, ub=None, mode='single', n_workers=None, termination=None):
super().__init__(layer_sizes=layer_sizes, act_name=act_name)
self.obj_name = obj_name
self.optim = optim
self.optim_params = optim_params
self.verbose = verbose
self.seed = seed
self.lb = lb
self.ub = ub
self.mode = mode
self.n_workers = n_workers
self.termination = termination
self.network, self.obj_weights = None, None
[docs] def get_name(self):
if type(self.optim) is str:
return f"{self.optim_params}-ELM"
return f"{self.optimizer.name}-ELM"
[docs] def set_params(self, **params):
if self.optim_params is None:
self.optim_params = {}
# Handle nested parameters for the optimizer
optimizer_params = {k.split('__')[1]: v for k, v in params.items() if k.startswith('optim_paras__')}
if optimizer_params:
self.optim_params.update(optimizer_params)
# Pass non-optimizer parameters to the parent class set_params
super_params = {k: v for k, v in params.items() if not k.startswith('optim_paras__')}
super().set_params(**super_params)
return self
[docs] def set_optimizer(self, optim):
self.optimizer = optim
[docs] def set_optim_paras(self, optim_params):
self.optim_params = optim_params
[docs] def set_optimizer_object(self, optim=None, optim_params=None):
"""
Validates the real optimizer based on the provided `optim` and `optim_pras`.
Parameters
----------
optim : str or Optimizer
The optimizer name or instance to be set.
optim_params : dict, optional
Parameters to configure the optimizer.
Returns
-------
Optimizer
An instance of the selected optimizer.
Raises
------
TypeError
If the provided optimizer is neither a string nor an instance of Optimizer.
"""
if isinstance(optim, str):
opt_class = get_optimizer_by_class(optim)
if isinstance(optim_params, dict):
return opt_class(**optim_params)
else:
return opt_class(epoch=300, pop_size=30)
elif isinstance(optim, Optimizer):
if isinstance(optim_params, dict):
if "name" in optim_params: # Check if key exists and remove it
optim.name = optim_params.pop("name")
optim.set_parameters(optim_params)
return optim
else:
raise TypeError(f"optimizer needs to set as a string and supported by Mealpy library.")
[docs] def set_seed(self, seed):
"""
Parameters
----------
seed : int
The seed value to initialize the random number generator.
"""
self.seed = seed
[docs] def objective_function(self, solution=None):
"""
Evaluates the fitness function for classification metrics based on the provided solution.
Parameters
----------
solution : np.ndarray, default=None
The proposed solution to evaluate.
Returns
-------
result : float
The fitness value, representing the loss for the current solution.
"""
pass
[docs] @staticmethod
def set_lb_ub(lb=None, ub=None, n_dims=None):
"""
Validates and sets the lower and upper bounds for optimization.
Parameters
----------
lb : list, tuple, np.ndarray, int, or float, optional
The lower bounds for weights and biases in network.
ub : list, tuple, np.ndarray, int, or float, optional
The upper bounds for weights and biases in network.
n_dims : int
The number of dimensions.
Returns
-------
tuple
A tuple containing validated lower and upper bounds.
Raises
------
ValueError
If the bounds are not valid.
"""
if lb is None:
lb = (-10.,) * n_dims
elif isinstance(lb, numbers.Number):
lb = (lb, ) * n_dims
elif isinstance(lb, (list, tuple, np.ndarray)):
if len(lb) == 1:
lb = np.array(lb * n_dims, dtype=float)
else:
lb = np.array(lb, dtype=float).ravel()
if ub is None:
ub = (10.,) * n_dims
elif isinstance(ub, numbers.Number):
ub = (ub, ) * n_dims
elif isinstance(ub, (list, tuple, np.ndarray)):
if len(ub) == 1:
ub = np.array(ub * n_dims, dtype=float)
else:
ub = np.array(ub, dtype=float).ravel()
if len(lb) != len(ub):
raise ValueError(f"Invalid lb and ub. Their length should be equal to 1 or {n_dims}.")
return np.array(lb).ravel(), np.array(ub).ravel()
[docs] def fit(self, X, y):
"""
Parameters
----------
X : The features data, np.ndarray
y : The ground truth data
"""
self.network = self.create_network(X, y)
y_scaled = self.network.obj_scaler.transform(y)
self.X_temp, self.y_temp = X, y_scaled
problem_size = self.network.get_ndim()
lb, ub = self.set_lb_ub(self.lb, self.ub, problem_size)
log_to = "console" if self.verbose else "None"
if self.obj_name is None:
raise ValueError("obj_name can't be None")
else:
if self.obj_name in self.SUPPORTED_REG_OBJECTIVES.keys():
minmax = self.SUPPORTED_REG_OBJECTIVES[self.obj_name]
elif self.obj_name in self.SUPPORTED_CLS_OBJECTIVES.keys():
minmax = self.SUPPORTED_CLS_OBJECTIVES[self.obj_name]
else:
raise ValueError("obj_name is not supported. Please check the library: permetrics to see the supported objective function.")
problem = {
"obj_func": self.objective_function,
"bounds": FloatVar(lb=lb, ub=ub),
"minmax": minmax,
"log_to": log_to,
"obj_weights": self.obj_weights
}
self.optimizer = self.set_optimizer_object(self.optim, self.optim_params)
g_best = self.optimizer.solve(problem, mode=self.mode, n_workers=self.n_workers, termination=self.termination, seed=self.seed)
self.solution, self.best_fit = g_best.solution, g_best.target.fitness
self.network.decode(self.solution, self.X_temp, self.y_temp)
self.loss_train = np.array(self.optimizer.history.list_global_best_fit)
return self