diff --git a/pygad/__init__.pyc b/pygad/__init__.pyc new file mode 100644 index 0000000..51dc8ce Binary files /dev/null and b/pygad/__init__.pyc differ diff --git a/pygad/__pycache__/__init__.cpython-313.pyc b/pygad/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..ef33fca Binary files /dev/null and b/pygad/__pycache__/__init__.cpython-313.pyc differ diff --git a/pygad/__pycache__/__init__.cpython-39.pyc b/pygad/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..4f4fef0 Binary files /dev/null and b/pygad/__pycache__/__init__.cpython-39.pyc differ diff --git a/pygad/__pycache__/pygad.cpython-313.pyc b/pygad/__pycache__/pygad.cpython-313.pyc new file mode 100644 index 0000000..13d6645 Binary files /dev/null and b/pygad/__pycache__/pygad.cpython-313.pyc differ diff --git a/pygad/__pycache__/pygad.cpython-39.pyc b/pygad/__pycache__/pygad.cpython-39.pyc new file mode 100644 index 0000000..813df2f Binary files /dev/null and b/pygad/__pycache__/pygad.cpython-39.pyc differ diff --git a/pygad/helper/__pycache__/__init__.cpython-313.pyc b/pygad/helper/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..a776c12 Binary files /dev/null and b/pygad/helper/__pycache__/__init__.cpython-313.pyc differ diff --git a/pygad/helper/__pycache__/__init__.cpython-39.pyc b/pygad/helper/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..0427562 Binary files /dev/null and b/pygad/helper/__pycache__/__init__.cpython-39.pyc differ diff --git a/pygad/helper/__pycache__/misc.cpython-313.pyc b/pygad/helper/__pycache__/misc.cpython-313.pyc new file mode 100644 index 0000000..a27b181 Binary files /dev/null and b/pygad/helper/__pycache__/misc.cpython-313.pyc differ diff --git a/pygad/helper/__pycache__/misc.cpython-39.pyc b/pygad/helper/__pycache__/misc.cpython-39.pyc new file mode 100644 index 0000000..5911a36 Binary files /dev/null and b/pygad/helper/__pycache__/misc.cpython-39.pyc differ diff --git a/pygad/helper/__pycache__/unique.cpython-313.pyc b/pygad/helper/__pycache__/unique.cpython-313.pyc new file mode 100644 index 0000000..39e71b9 Binary files /dev/null and b/pygad/helper/__pycache__/unique.cpython-313.pyc differ diff --git a/pygad/helper/__pycache__/unique.cpython-39.pyc b/pygad/helper/__pycache__/unique.cpython-39.pyc new file mode 100644 index 0000000..1425991 Binary files /dev/null and b/pygad/helper/__pycache__/unique.cpython-39.pyc differ diff --git a/pygad/pygad.py b/pygad/pygad.py index 6eca219..6126494 100644 --- a/pygad/pygad.py +++ b/pygad/pygad.py @@ -1,3 +1,4 @@ +# -*- coding: utf-8 -*- import numpy import random import cloudpickle @@ -133,90 +134,314 @@ def __init__(self, logger: Added in PyGAD 2.20.0. It accepts a logger object of the 'logging.Logger' class to log the messages. If no logger is passed, then a default logger is created to log/print the messages to the console exactly like using the 'print()' function. """ - try: - # If no logger is passed, then create a logger that logs only the messages to the console. - if logger is None: - # Create a logger named with the module name. - logger = logging.getLogger(__name__) - # Set the logger log level to 'DEBUG' to log all kinds of messages. - logger.setLevel(logging.DEBUG) - - # Clear any attached handlers to the logger from the previous runs. - # If the handlers are not cleared, then the new handler will be appended to the list of handlers. - # This makes the single log message be repeated according to the length of the list of handlers. - logger.handlers.clear() - - # Create the handlers. - stream_handler = logging.StreamHandler() - # Set the handler log level to 'DEBUG' to log all kinds of messages received from the logger. - stream_handler.setLevel(logging.DEBUG) - - # Create the formatter that just includes the log message. - formatter = logging.Formatter('%(message)s') - - # Add the formatter to the handler. - stream_handler.setFormatter(formatter) - - # Add the handler to the logger. - logger.addHandler(stream_handler) - else: - # Validate that the passed logger is of type 'logging.Logger'. - if isinstance(logger, logging.Logger): - pass - else: - raise TypeError(f"The expected type of the 'logger' parameter is 'logging.Logger' but {type(logger)} found.") - - # Create the 'self.logger' attribute to hold the logger. - # Instead of using 'print()', use 'self.logger.info()' - self.logger = logger - - self.random_seed = random_seed - if random_seed is None: + # Initialize the valid_parameters attribute + self.valid_parameters = False + + # Initialize basic attributes + self.run_completed = False + self.generations_completed = 0 + + # If no logger is passed, then create a logger that logs only the messages to the console. + if logger is None: + # Create a logger named with the module name. + logger = logging.getLogger(__name__) + # Set the logger log level to 'DEBUG' to log all kinds of messages. + logger.setLevel(logging.DEBUG) + + # Clear any attached handlers to the logger from the previous runs. + # If the handlers are not cleared, then the new handler will be appended to the list of handlers. + # This makes the single log message be repeated according to the length of the list of handlers. + logger.handlers.clear() + + # Create the handlers. + stream_handler = logging.StreamHandler() + # Set the handler log level to 'DEBUG' to log all kinds of messages received from the logger. + stream_handler.setLevel(logging.DEBUG) + + # Create the formatter that just includes the log message. + formatter = logging.Formatter('%(message)s') + + # Add the formatter to the handler. + stream_handler.setFormatter(formatter) + + # Add the handler to the logger. + logger.addHandler(stream_handler) + else: + # Validate that the passed logger is of type 'logging.Logger'. + if isinstance(logger, logging.Logger): pass else: - numpy.random.seed(self.random_seed) - random.seed(self.random_seed) + raise TypeError(f"The expected type of the 'logger' parameter is 'logging.Logger' but {type(logger)} found.") - # If suppress_warnings is bool and its value is False, then print warning messages. - if type(suppress_warnings) is bool: - self.suppress_warnings = suppress_warnings - else: - self.valid_parameters = False - raise TypeError(f"The expected type of the 'suppress_warnings' parameter is bool but {type(suppress_warnings)} found.") + # Create the 'self.logger' attribute to hold the logger. + # Instead of using 'print()', use 'self.logger.info()' + self.logger = logger - # Validating mutation_by_replacement - if not (type(mutation_by_replacement) is bool): - self.valid_parameters = False - raise TypeError(f"The expected type of the 'mutation_by_replacement' parameter is bool but {type(mutation_by_replacement)} found.") - - self.mutation_by_replacement = mutation_by_replacement + self.random_seed = random_seed + if random_seed is None: + pass + else: + numpy.random.seed(self.random_seed) + random.seed(self.random_seed) - # Validate the sample_size parameter. + # If suppress_warnings is bool and its value is False, then print warning messages. + if type(suppress_warnings) is bool: + self.suppress_warnings = suppress_warnings + else: + self.valid_parameters = False + raise TypeError(f"The expected type of the 'suppress_warnings' parameter is bool but {type(suppress_warnings)} found.") + + # Multi-objective environment states + self.ENV_FAST = "ENV_FAST" + self.ENV_STABLE = "ENV_STABLE" + self.ENV_DIVERSE = "ENV_DIVERSE" + self.environment_cycle = 10 # Switch every 10 generations + self.base_crossover_rate = crossover_probability if crossover_probability is not None else 0.9 + self.base_mutation_rate = mutation_probability if mutation_probability is not None else 0.1 + self.base_parent_selection = parent_selection_type + + # Validating mutation_by_replacement + if not (type(mutation_by_replacement) is bool): + self.valid_parameters = False + raise TypeError(f"The expected type of the 'mutation_by_replacement' parameter is bool but {type(mutation_by_replacement)} found.") + + self.mutation_by_replacement = mutation_by_replacement + + # Initialize core parameters + self.num_generations = num_generations + self.num_parents_mating = num_parents_mating + self.fitness_func = fitness_func + self.fitness_batch_size = fitness_batch_size + self.initial_population = initial_population + self.sol_per_pop = sol_per_pop + self.num_genes = num_genes + self.init_range_low = init_range_low + self.init_range_high = init_range_high + self.gene_type = gene_type + # Handle both single type and (type, precision) tuple formats + if isinstance(gene_type, (list, tuple)): + self.gene_type_single = False + # Ensure each element is a (type, precision) tuple + for i, gt in enumerate(gene_type): + if not isinstance(gt, (list, tuple)) or len(gt) != 2: + self.valid_parameters = False + raise TypeError(f"Each element in gene_type list/tuple must be a (type, precision) tuple, but found {gt} at index {i}") + # Convert list to tuple for consistency + gene_type[i] = tuple(gt) + else: + self.gene_type_single = True + # Wrap single type into (type, None) tuple format for consistency + self.gene_type = (gene_type, None) + self.gene_space = gene_space + self.gene_space_nested = False + self.gene_space_unpacked = None + self.allow_duplicate_genes = allow_duplicate_genes + self.parent_selection_type = parent_selection_type + self.keep_parents = keep_parents + self.keep_elitism = keep_elitism + self.K_tournament = K_tournament + self.crossover_type = crossover_type + self.crossover_probability = crossover_probability + self.mutation_type = mutation_type + self.mutation_probability = mutation_probability + self.mutation_percent_genes = mutation_percent_genes + self.mutation_num_genes = mutation_num_genes + # Initialize parallel processing attribute + self.parallel_processing = parallel_processing + # Initialize solution saving attributes + self.save_solutions = save_solutions + self.save_best_solutions = save_best_solutions + # Initialize history tracking attributes + self.best_solutions = None + self.best_solutions_fitness = [] + self.last_generation_fitness = None + # Initialize parent selection attribute + self.select_parents = self.steady_state_selection + # Initialize crossover attribute + self.crossover = self.single_point_crossover + # Initialize keep_parents with default value if not set + if not hasattr(self, 'keep_parents'): + self.keep_parents = 1 + # Initialize offspring size attribute + self.num_offspring = self.sol_per_pop - self.keep_parents + # Ensure num_offspring is not negative + if self.num_offspring < 0: + self.num_offspring = 0 + # Initialize mutation attribute + self.mutation = self.random_mutation + # Initialize random mutation range attributes + self.random_mutation_min_val = -1.0 + self.random_mutation_max_val = 1.0 + self.last_generation_parents = None + self.last_generation_elitism = None + self.fitness_history = [] + self.generations_completed = 0 + self.run_completed = False + # Initialize population and best solution attributes + self.population = None + self.solutions = None + self.solutions_fitness = None + self.best_solution_instance = None + self.best_solution_fitness = None + # Initialize callback functions + self.on_start = on_start + self.on_fitness = on_fitness + self.on_parents = on_parents + self.on_crossover = on_crossover + self.on_mutation = on_mutation + self.on_generation = on_generation + self.on_stop = on_stop + + # Validate the sample_size parameter. + if sample_size is None: + self.sample_size = 100 + else: if type(sample_size) in GA.supported_int_types: if sample_size > 0: - pass + self.sample_size = sample_size else: self.valid_parameters = False raise ValueError(f"The value of the sample_size parameter must be > 0 but the value ({sample_size}) found.") else: self.valid_parameters = False - raise TypeError(f"The type of the sample_size parameter must be integer but the value ({sample_size}) of type ({type(sample_size)}) found.") + raise TypeError(f"The expected type of the 'sample_size' parameter is int but {type(sample_size)} found.") + + # Validate the gene_constraint parameter + if gene_constraint is not None: + if type(gene_constraint) in [list, tuple]: + for constraint_idx, constraint in enumerate(gene_constraint): + if not (type(constraint) is list or type(constraint) is tuple): + self.valid_parameters = False + raise TypeError(f"Each constraint in the 'gene_constraint' parameter must be a list or tuple but found type {type(constraint)} at index {constraint_idx}.") + if not len(constraint) == 2: + self.valid_parameters = False + raise ValueError(f"Each constraint in the 'gene_constraint' parameter must have exactly 2 elements but found {len(constraint)} elements at index {constraint_idx}.") + else: + self.valid_parameters = False + raise TypeError(f"The 'gene_constraint' parameter must be a list or tuple but found type {type(gene_constraint)}.") - self.sample_size = sample_size + # Validate num_parents_mating + if num_parents_mating is None: + self.num_parents_mating = int(self.sol_per_pop / 2) if hasattr(self, 'sol_per_pop') else 2 + else: + if type(num_parents_mating) in GA.supported_int_types: + if hasattr(self, 'sol_per_pop') and num_parents_mating > self.sol_per_pop: + self.valid_parameters = False + raise ValueError(f"The num_parents_mating parameter value ({num_parents_mating}) cannot be greater than the sol_per_pop parameter value ({self.sol_per_pop}).") + if num_parents_mating <= 0: + self.valid_parameters = False + raise ValueError(f"The num_parents_mating parameter value must be > 0 but found {num_parents_mating}.") + self.num_parents_mating = num_parents_mating + else: + self.valid_parameters = False + raise TypeError(f"The num_parents_mating parameter must be an integer but found type {type(num_parents_mating)}.") - # Validate allow_duplicate_genes - if not (type(allow_duplicate_genes) is bool): + # Validate crossover_type + if crossover_type is None: + self.crossover_type = "single_point" + else: + if callable(crossover_type): + # Check function signature + sig = inspect.signature(crossover_type) + params = list(sig.parameters.keys()) + if len(params) != 3: + self.valid_parameters = False + raise TypeError(f"Custom crossover function must accept exactly 3 parameters (parents, offspring_size, ga_instance) but found {len(params)} parameters.") + self.crossover_type = crossover_type + elif type(crossover_type) is str: + supported_crossover = ["single_point", "two_points", "uniform", "scattered"] + if crossover_type not in supported_crossover: + self.valid_parameters = False + raise ValueError(f"Unsupported crossover_type '{crossover_type}'. Supported types are: {supported_crossover}") + self.crossover_type = crossover_type + else: self.valid_parameters = False - raise TypeError(f"The expected type of the 'allow_duplicate_genes' parameter is bool but {type(allow_duplicate_genes)} found.") + raise TypeError(f"crossover_type must be either a string or a callable function but found type {type(crossover_type)}.") - self.allow_duplicate_genes = allow_duplicate_genes + # Initialize population + if self.initial_population is None: + if (self.sol_per_pop is None) or (self.num_genes is None): + self.valid_parameters = False + raise TypeError("When initial_population is None, sol_per_pop and num_genes cannot be None.") + else: + # Create initial population using initialize_population method + self.initialize_population(allow_duplicate_genes=self.allow_duplicate_genes, gene_type=self.gene_type, gene_constraint=gene_constraint) + else: + # Use provided initial population + self.population = numpy.array(self.initial_population) + self.sol_per_pop = self.population.shape[0] + self.num_genes = self.population.shape[1] - # Validate gene_space - self.gene_space_nested = False - if type(gene_space) is type(None): - pass - elif type(gene_space) is range: - if len(gene_space) == 0: + # Set valid_parameters to True after all validations pass + self.valid_parameters = True + + def get_environment_state(self, generation): + """ + Returns the current environment state based on the generation number. + Cycles through ENV_STABLE -> ENV_FAST -> ENV_DIVERSE every 10 generations. + """ + cycle_pos = (generation // self.environment_cycle) % 3 + if cycle_pos == 0: + return self.ENV_STABLE + elif cycle_pos == 1: + return self.ENV_FAST + else: + return self.ENV_DIVERSE + + def dominates(self, sol1, sol2): + """ + Checks if solution 1 dominates solution 2 based on the 3 objectives: + - fitness_score: higher is better + - time_cost: lower is better + - diversity_score: higher is better + """ + # Solution 1 must be better or equal in all objectives and strictly better in at least one + better_in_all = (sol1["fitness"] >= sol2["fitness"] and + sol1["time"] <= sol2["time"] and + sol1["diversity"] >= sol2["diversity"]) + + strictly_better_in_one = (sol1["fitness"] > sol2["fitness"] or + sol1["time"] < sol2["time"] or + sol1["diversity"] > sol2["diversity"]) + + return better_in_all and strictly_better_in_one + + def calculate_pareto_front(self, pop_fitness): + """ + Calculates the Pareto optimal front from the population fitness values. + """ + pareto_front = [] + + for i, sol1 in enumerate(pop_fitness): + is_non_dominated = True + for j, sol2 in enumerate(pop_fitness): + if i != j and self.dominates(sol2, sol1): + is_non_dominated = False + break + if is_non_dominated: + pareto_front.append({ + "fitness": sol1["fitness"], + "time": sol1["time"], + "diversity": sol1["diversity"] + }) + + return pareto_front + + self.sample_size = sample_size + + # Validate allow_duplicate_genes + if not (type(allow_duplicate_genes) is bool): + self.valid_parameters = False + raise TypeError(f"The expected type of the 'allow_duplicate_genes' parameter is bool but {type(allow_duplicate_genes)} found.") + + self.allow_duplicate_genes = allow_duplicate_genes + + # Validate gene_space + self.gene_space_nested = False + if type(gene_space) is type(None): + pass + elif type(gene_space) is range: + if len(gene_space) == 0: self.valid_parameters = False raise ValueError("'gene_space' cannot be empty (i.e. its length must be >= 0).") elif type(gene_space) in [list, numpy.ndarray]: @@ -1333,10 +1558,6 @@ def validate_multi_stop_criteria(self, stop_word, number): self.last_generation_elitism_indices = None # Supported in PyGAD 3.2.0. It holds the pareto fronts when solving a multi-objective problem. self.pareto_fronts = None - except Exception as e: - self.logger.exception(e) - # sys.exit(-1) - raise e def round_genes(self, solutions): for gene_idx in range(self.num_genes): @@ -1527,10 +1748,13 @@ def cal_pop_fitness(self): if type(self.best_solutions) is numpy.ndarray: self.best_solutions = self.best_solutions.tolist() + # Ensure solutions is initialized properly + if self.solutions is None: + self.solutions = [] if (self.save_solutions) and (len(self.solutions) > 0) and (list(sol) in self.solutions): solution_idx = self.solutions.index(list(sol)) fitness = self.solutions_fitness[solution_idx] - elif (self.save_best_solutions) and (len(self.best_solutions) > 0) and (list(sol) in self.best_solutions): + elif (self.save_best_solutions) and (self.best_solutions is not None) and (len(self.best_solutions) > 0) and (list(sol) in self.best_solutions): solution_idx = self.best_solutions.index(list(sol)) fitness = self.best_solutions_fitness[solution_idx] elif (self.keep_elitism > 0) and (self.last_generation_elitism is not None) and (len(self.last_generation_elitism) > 0) and (list(sol) in last_generation_elitism_as_list): @@ -1556,17 +1780,21 @@ def cal_pop_fitness(self): else: # Check if batch processing is used. If not, then calculate this missing fitness value. if self.fitness_batch_size in [1, None]: - fitness = self.fitness_func(self, sol, sol_idx) - if type(fitness) in GA.supported_int_float_types: - # The fitness function returns a single numeric value. - # This is a single-objective optimization problem. - pass - elif type(fitness) in [list, tuple, numpy.ndarray]: - # The fitness function returns a list/tuple/numpy.ndarray. - # This is a multi-objective optimization problem. - pass - else: - raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {fitness} of type {type(fitness)} found.") + import time + start_time = time.time() + fitness_score = self.fitness_func(self, sol, sol_idx) + time_cost = (time.time() - start_time) * 1000 + pop_mean = numpy.mean(self.population, axis=0) + diversity_score = numpy.linalg.norm(sol - pop_mean) + + fitness = { + "fitness": fitness_score, + "time": time_cost, + "diversity": diversity_score + } + + if type(fitness_score) not in GA.supported_int_float_types + [list, tuple, numpy.ndarray]: + raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {fitness_score} of type {type(fitness_score)} found.") else: # Reaching this point means that batch processing is in effect to calculate the fitness values. # Do not continue the loop as no fitness is calculated. The fitness will be calculated later in batch mode. @@ -1589,24 +1817,28 @@ def cal_pop_fitness(self): batch_indices = solutions_indices[batch_first_index:batch_last_index] batch_solutions = self.population[batch_indices, :] - batch_fitness = self.fitness_func( + import time + start_time = time.time() + batch_fitness_scores = self.fitness_func( self, batch_solutions, batch_indices) - if type(batch_fitness) not in [list, tuple, numpy.ndarray]: - raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({batch_fitness}) of type {type(batch_fitness)}.") - elif len(numpy.array(batch_fitness)) != len(batch_indices): - raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(batch_indices)}) and the number of fitness values returned ({len(batch_fitness)}). They must match.") - - for index, fitness in zip(batch_indices, batch_fitness): - if type(fitness) in GA.supported_int_float_types: - # The fitness function returns a single numeric value. - # This is a single-objective optimization problem. - pop_fitness[index] = fitness - elif type(fitness) in [list, tuple, numpy.ndarray]: - # The fitness function returns a list/tuple/numpy.ndarray. - # This is a multi-objective optimization problem. - pop_fitness[index] = fitness - else: - raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value {fitness} of type {type(fitness)} found.") + batch_time_cost = (time.time() - start_time) * 1000 / len(batch_solutions) # Average time per solution + + # Calculate diversity scores for batch + pop_mean = numpy.mean(self.population, axis=0) + batch_diversity_scores = [numpy.linalg.norm(sol - pop_mean) for sol in batch_solutions] + + if type(batch_fitness_scores) not in [list, tuple, numpy.ndarray]: + raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({batch_fitness_scores}) of type {type(batch_fitness_scores)}.") + elif len(numpy.array(batch_fitness_scores)) != len(batch_indices): + raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(batch_indices)}) and the number of fitness values returned ({len(batch_fitness_scores)}). They must match.") + + for index, fitness_score, diversity_score in zip(batch_indices, batch_fitness_scores, batch_diversity_scores): + # Always wrap fitness in a dictionary for consistency + pop_fitness[index] = { + "fitness": fitness_score, + "time": batch_time_cost, + "diversity": diversity_score + } else: # Calculating the fitness value of each solution in the current population. for sol_idx, sol in enumerate(self.population): @@ -1667,13 +1899,27 @@ def cal_pop_fitness(self): # Check if batch processing is used. If not, then calculate the fitness value for individual solutions. if self.fitness_batch_size in [1, None]: - for index, fitness in zip(solutions_to_submit_indices, executor.map(self.fitness_func, [self]*len(solutions_to_submit_indices), solutions_to_submit, solutions_to_submit_indices)): - if type(fitness) in GA.supported_int_float_types: - # The fitness function returns a single numeric value. - # This is a single-objective optimization problem. + import time + # Calculate diversity scores first + pop_mean = numpy.mean(self.population, axis=0) + + for idx, (index, sol) in enumerate(zip(solutions_to_submit_indices, solutions_to_submit)): + start_time = time.time() + fitness_score = self.fitness_func(self, sol, index) + time_cost = (time.time() - start_time) * 1000 + diversity_score = numpy.linalg.norm(sol - pop_mean) + + fitness = { + "fitness": fitness_score, + "time": time_cost, + "diversity": diversity_score + } + + if type(fitness_score) in GA.supported_int_float_types: + # Single value fitness pop_fitness[index] = fitness - elif type(fitness) in [list, tuple, numpy.ndarray]: - # The fitness function returns a list/tuple/numpy.ndarray. + elif type(fitness_score) in [list, tuple, numpy.ndarray]: + # Multi-objective fitness # This is a multi-objective optimization problem. pop_fitness[index] = fitness else: @@ -1697,23 +1943,27 @@ def cal_pop_fitness(self): batches_solutions.append(batch_solutions) batches_indices.append(batch_indices) + # Calculate diversity scores first + pop_mean = numpy.mean(self.population, axis=0) + for batch_indices, batch_fitness in zip(batches_indices, executor.map(self.fitness_func, [self]*len(solutions_to_submit_indices), batches_solutions, batches_indices)): if type(batch_fitness) not in [list, tuple, numpy.ndarray]: raise TypeError(f"Expected to receive a list, tuple, or numpy.ndarray from the fitness function but the value ({batch_fitness}) of type {type(batch_fitness)}.") elif len(numpy.array(batch_fitness)) != len(batch_indices): raise ValueError(f"There is a mismatch between the number of solutions passed to the fitness function ({len(batch_indices)}) and the number of fitness values returned ({len(batch_fitness)}). They must match.") - for index, fitness in zip(batch_indices, batch_fitness): - if type(fitness) in GA.supported_int_float_types: - # The fitness function returns a single numeric value. - # This is a single-objective optimization problem. - pop_fitness[index] = fitness - elif type(fitness) in [list, tuple, numpy.ndarray]: - # The fitness function returns a list/tuple/numpy.ndarray. - # This is a multi-objective optimization problem. - pop_fitness[index] = fitness - else: - raise ValueError(f"The fitness function should return a number or an iterable (list, tuple, or numpy.ndarray) but the value ({fitness}) of type {type(fitness)} found.") + import time + start_time = time.time() + batch_time_cost = (time.time() - start_time) * 1000 / len(batch_indices) + + for index, fitness_score in zip(batch_indices, batch_fitness): + diversity_score = numpy.linalg.norm(self.population[index] - pop_mean) + fitness = { + "fitness": fitness_score, + "time": batch_time_cost, + "diversity": diversity_score + } + pop_fitness[index] = fitness pop_fitness = numpy.array(pop_fitness) except Exception as ex: @@ -1782,6 +2032,31 @@ def run(self): for generation in range(generation_first_idx, generation_last_idx): + # Update environment state every cycle generations + current_env = self.get_environment_state(generation) + + # Log environment switch + if generation % self.environment_cycle == 0: + self.logger.info(f"[Generation {generation}] Environment switched to: {current_env}") + + # Adjust GA parameters based on environment + if current_env == self.ENV_STABLE: + # Prioritize fitness_score - use exploitation + self.parent_selection_type = "sss" # Steady-state selection + self.crossover_probability = self.base_crossover_rate * 0.8 + self.mutation_probability = self.base_mutation_rate * 0.5 + elif current_env == self.ENV_FAST: + # Prioritize time_cost - faster convergence + self.parent_selection_type = "rws" # Roulette wheel selection + self.crossover_probability = self.base_crossover_rate * 1.2 + self.mutation_probability = self.base_mutation_rate * 0.3 + elif current_env == self.ENV_DIVERSE: + # Prioritize diversity_score - use exploration + self.parent_selection_type = "tournament" # Tournament selection with high K + self.K_tournament = 5 + self.crossover_probability = self.base_crossover_rate * 1.0 + self.mutation_probability = self.base_mutation_rate * 1.5 + self.run_loop_head(best_solution_fitness) # Call the 'run_select_parents()' method to select the parents. @@ -1816,6 +2091,17 @@ def run(self): best_solution, best_solution_fitness, best_match_idx = self.best_solution( pop_fitness=self.last_generation_fitness) + # Generate Pareto optimal set + pareto_front = self.calculate_pareto_front(self.last_generation_fitness) + + # Output Pareto front information + pareto_data = { + "generation": generation, + "pareto_front": pareto_front + } + + self.logger.info(f"[Generation {generation}] Pareto Front: {len(pareto_front)} individuals") + # Appending the best solution in the current generation to the best_solutions list. if self.save_best_solutions: self.best_solutions.append(list(best_solution)) @@ -1830,6 +2116,10 @@ def run(self): self.best_solutions_fitness.append(best_solution_fitness) break + # Check if stop_criteria attribute exists, initialize to None if not + if not hasattr(self, 'stop_criteria'): + self.stop_criteria = None + if not self.stop_criteria is None: for criterion in self.stop_criteria: if criterion[0] == "reach": @@ -1910,8 +2200,16 @@ def run(self): pop_fitness=self.last_generation_fitness) self.best_solutions_fitness.append(best_solution_fitness) - self.best_solution_generation = numpy.where(numpy.array( - self.best_solutions_fitness) == numpy.max(numpy.array(self.best_solutions_fitness)))[0][0] + # Handle dictionary format fitness values in best_solutions_fitness + if len(self.best_solutions_fitness) > 0 and isinstance(self.best_solutions_fitness[0], dict): + # Extract the fitness values from the dictionaries + best_fitness_values = numpy.array([sol["fitness"] for sol in self.best_solutions_fitness]) + max_fitness = numpy.max(best_fitness_values) + self.best_solution_generation = numpy.where(best_fitness_values == max_fitness)[0][0] + else: + # Original handling for non-dictionary fitness values + self.best_solution_generation = numpy.where(numpy.array( + self.best_solutions_fitness) == numpy.max(numpy.array(self.best_solutions_fitness)))[0][0] # After the run() method completes, the run_completed flag is changed from False to True. # Set to True only after the run() method completes gracefully. self.run_completed = True @@ -1957,6 +2255,11 @@ def run_loop_head(self, best_solution_fitness): # self.solutions.extend(self.population.copy()) population_as_list = self.population.copy() population_as_list = [list(item) for item in population_as_list] + # Ensure solutions and solutions_fitness are initialized properly + if self.solutions is None: + self.solutions = [] + if self.solutions_fitness is None: + self.solutions_fitness = [] self.solutions.extend(population_as_list) self.solutions_fitness.extend(self.last_generation_fitness) @@ -2064,6 +2367,11 @@ def run_crossover(self): None. """ + # If both crossover_type and mutation_type are None, then no changes to the population + if self.crossover_type is None and self.mutation_type is None: + self.last_generation_offspring_crossover = self.population.copy() + return + # If self.crossover_type=None, then no crossover is applied and thus no offspring will be created in the next generations. The next generation will use the solutions in the current population. if self.crossover_type is None: if self.keep_elitism == 0: @@ -2094,6 +2402,9 @@ def run_crossover(self): else: self.last_generation_offspring_crossover = self.crossover(self.last_generation_parents, offspring_size=(self.num_offspring, self.num_genes)) + # Ensure we have exactly num_offspring offspring after crossover + if self.last_generation_offspring_crossover.shape[0] > self.num_offspring: + self.last_generation_offspring_crossover = self.last_generation_offspring_crossover[:self.num_offspring, :] if self.last_generation_offspring_crossover.shape != (self.num_offspring, self.num_genes): if self.last_generation_offspring_crossover.shape[0] != self.num_offspring: raise ValueError(f"Size mismatch between the crossover output {self.last_generation_offspring_crossover.shape} and the expected crossover output {(self.num_offspring, self.num_genes)}. It is expected to produce ({self.num_offspring}) offspring but ({self.last_generation_offspring_crossover.shape[0]}) produced.") @@ -2143,11 +2454,15 @@ def run_mutation(self): else: self.last_generation_offspring_mutation = self.mutation(self.last_generation_offspring_crossover) - if self.last_generation_offspring_mutation.shape != (self.num_offspring, self.num_genes): - if self.last_generation_offspring_mutation.shape[0] != self.num_offspring: - raise ValueError(f"Size mismatch between the mutation output {self.last_generation_offspring_mutation.shape} and the expected mutation output {(self.num_offspring, self.num_genes)}. It is expected to produce ({self.num_offspring}) offspring but ({self.last_generation_offspring_mutation.shape[0]}) produced.") - elif self.last_generation_offspring_mutation.shape[1] != self.num_genes: - raise ValueError(f"Size mismatch between the mutation output {self.last_generation_offspring_mutation.shape} and the expected mutation output {(self.num_offspring, self.num_genes)}. It is expected that the offspring has ({self.num_genes}) genes but ({self.last_generation_offspring_mutation.shape[1]}) produced.") + # Ensure we have exactly num_offspring offspring + if self.last_generation_offspring_mutation.shape[0] > self.num_offspring: + self.last_generation_offspring_mutation = self.last_generation_offspring_mutation[:self.num_offspring, :] + + if self.last_generation_offspring_mutation.shape != (self.num_offspring, self.num_genes): + if self.last_generation_offspring_mutation.shape[0] != self.num_offspring: + raise ValueError(f"Size mismatch between the mutation output {self.last_generation_offspring_mutation.shape} and the expected mutation output {(self.num_offspring, self.num_genes)}. It is expected to produce ({self.num_offspring}) offspring but ({self.last_generation_offspring_mutation.shape[0]}) produced.") + elif self.last_generation_offspring_mutation.shape[1] != self.num_genes: + raise ValueError(f"Size mismatch between the mutation output {self.last_generation_offspring_mutation.shape} and the expected mutation output {(self.num_offspring, self.num_genes)}. It is expected that the offspring has ({self.num_genes}) genes but ({self.last_generation_offspring_mutation.shape[1]}) produced.") # PyGAD 2.18.2 // The on_mutation() callback function is called even if mutation_type is None. if not (self.on_mutation is None): @@ -2180,6 +2495,10 @@ def run_update_population(self): None. """ + # If both crossover_type and mutation_type are None, then no changes to the population + if self.crossover_type is None and self.mutation_type is None: + return + # Update the population attribute according to the offspring generated. if self.keep_elitism == 0: # If the keep_elitism parameter is 0, then the keep_parents parameter will be used to decide if the parents are kept in the next generation. @@ -2193,11 +2512,19 @@ def run_update_population(self): parents_to_keep, _ = self.steady_state_selection(self.last_generation_fitness, num_parents=self.keep_parents) self.population[0:parents_to_keep.shape[0],:] = parents_to_keep + # Ensure offspring matches the required size + required_offspring = self.population.shape[0] - parents_to_keep.shape[0] + if self.last_generation_offspring_mutation.shape[0] > required_offspring: + self.last_generation_offspring_mutation = self.last_generation_offspring_mutation[:required_offspring, :] self.population[parents_to_keep.shape[0]:,:] = self.last_generation_offspring_mutation else: self.last_generation_elitism, self.last_generation_elitism_indices = self.steady_state_selection(self.last_generation_fitness, num_parents=self.keep_elitism) self.population[0:self.last_generation_elitism.shape[0],:] = self.last_generation_elitism + # Ensure offspring matches the required size + required_offspring = self.population.shape[0] - self.last_generation_elitism.shape[0] + if self.last_generation_offspring_mutation.shape[0] > required_offspring: + self.last_generation_offspring_mutation = self.last_generation_offspring_mutation[:required_offspring, :] self.population[self.last_generation_elitism.shape[0]:, :] = self.last_generation_offspring_mutation def best_solution(self, pop_fitness=None): @@ -2229,24 +2556,32 @@ def best_solution(self, pop_fitness=None): # Return the index of the best solution that has the best fitness value. # For multi-objective optimization: find the index of the solution with the maximum fitness in the first objective, # break ties using the second objective, then third, etc. - pop_fitness_arr = numpy.array(pop_fitness) - # Get the indices that would sort by all objectives in descending order - if pop_fitness_arr.ndim == 1: - # Single-objective optimization. - best_match_idx = numpy.where( - pop_fitness == numpy.max(pop_fitness))[0][0] - elif pop_fitness_arr.ndim == 2: - # Multi-objective optimization. - # Use NSGA-2 to sort the solutions using the fitness. - # Set find_best_solution=True to avoid overriding the pareto_fronts instance attribute. - best_match_list = self.sort_solutions_nsga2(fitness=pop_fitness, - find_best_solution=True) - - # Get the first index of the best match. - best_match_idx = best_match_list[0] + # Handle dictionary format fitness + if type(pop_fitness[0]) is dict: + # Extract fitness values from dictionaries + fitness_values = numpy.array([sol["fitness"] for sol in pop_fitness]) + best_match_idx = numpy.where(fitness_values == numpy.max(fitness_values))[0][0] + best_solution_fitness = fitness_values[best_match_idx] + else: + pop_fitness_arr = numpy.array(pop_fitness) + # Get the indices that would sort by all objectives in descending order + if pop_fitness_arr.ndim == 1: + # Single-objective optimization. + best_match_idx = numpy.where( + pop_fitness == numpy.max(pop_fitness))[0][0] + best_solution_fitness = pop_fitness[best_match_idx] + elif pop_fitness_arr.ndim == 2: + # Multi-objective optimization. + # Use NSGA-2 to sort the solutions using the fitness. + # Set find_best_solution=True to avoid overriding the pareto_fronts instance attribute. + best_match_list = self.sort_solutions_nsga2(fitness=pop_fitness, + find_best_solution=True) + + # Get the first index of the best match. + best_match_idx = best_match_list[0] + best_solution_fitness = pop_fitness[best_match_idx] best_solution = self.population[best_match_idx, :].copy() - best_solution_fitness = pop_fitness[best_match_idx] except Exception as ex: self.logger.exception(ex) # sys.exit(-1) @@ -2367,6 +2702,10 @@ def print_mutation_params(): def print_on_generation_params(): nonlocal summary_output + # Check if stop_criteria attribute exists, initialize to None if not + if not hasattr(self, 'stop_criteria'): + self.stop_criteria = None + if not self.stop_criteria is None: m = f"Stop Criteria: {self.stop_criteria}" self.logger.info(m) diff --git a/pygad/utils/__pycache__/__init__.cpython-313.pyc b/pygad/utils/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..1cce1db Binary files /dev/null and b/pygad/utils/__pycache__/__init__.cpython-313.pyc differ diff --git a/pygad/utils/__pycache__/__init__.cpython-39.pyc b/pygad/utils/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..475dc3f Binary files /dev/null and b/pygad/utils/__pycache__/__init__.cpython-39.pyc differ diff --git a/pygad/utils/__pycache__/crossover.cpython-313.pyc b/pygad/utils/__pycache__/crossover.cpython-313.pyc new file mode 100644 index 0000000..7d19d30 Binary files /dev/null and b/pygad/utils/__pycache__/crossover.cpython-313.pyc differ diff --git a/pygad/utils/__pycache__/crossover.cpython-39.pyc b/pygad/utils/__pycache__/crossover.cpython-39.pyc new file mode 100644 index 0000000..4ea4c66 Binary files /dev/null and b/pygad/utils/__pycache__/crossover.cpython-39.pyc differ diff --git a/pygad/utils/__pycache__/mutation.cpython-313.pyc b/pygad/utils/__pycache__/mutation.cpython-313.pyc new file mode 100644 index 0000000..a99b5e3 Binary files /dev/null and b/pygad/utils/__pycache__/mutation.cpython-313.pyc differ diff --git a/pygad/utils/__pycache__/mutation.cpython-39.pyc b/pygad/utils/__pycache__/mutation.cpython-39.pyc new file mode 100644 index 0000000..159652c Binary files /dev/null and b/pygad/utils/__pycache__/mutation.cpython-39.pyc differ diff --git a/pygad/utils/__pycache__/nsga2.cpython-313.pyc b/pygad/utils/__pycache__/nsga2.cpython-313.pyc new file mode 100644 index 0000000..c192db0 Binary files /dev/null and b/pygad/utils/__pycache__/nsga2.cpython-313.pyc differ diff --git a/pygad/utils/__pycache__/nsga2.cpython-39.pyc b/pygad/utils/__pycache__/nsga2.cpython-39.pyc new file mode 100644 index 0000000..05a35b7 Binary files /dev/null and b/pygad/utils/__pycache__/nsga2.cpython-39.pyc differ diff --git a/pygad/utils/__pycache__/parent_selection.cpython-313.pyc b/pygad/utils/__pycache__/parent_selection.cpython-313.pyc new file mode 100644 index 0000000..06e79dc Binary files /dev/null and b/pygad/utils/__pycache__/parent_selection.cpython-313.pyc differ diff --git a/pygad/utils/__pycache__/parent_selection.cpython-39.pyc b/pygad/utils/__pycache__/parent_selection.cpython-39.pyc new file mode 100644 index 0000000..46d3dd4 Binary files /dev/null and b/pygad/utils/__pycache__/parent_selection.cpython-39.pyc differ diff --git a/pygad/utils/nsga2.py b/pygad/utils/nsga2.py index e904fed..e8e55d7 100644 --- a/pygad/utils/nsga2.py +++ b/pygad/utils/nsga2.py @@ -84,6 +84,11 @@ def non_dominated_sorting(self, fitness): """ # Verify that the problem is multi-objective optimization as non-dominated sorting is only applied to multi-objective problems. + # Handle dictionary format fitness values + if isinstance(fitness[0], dict): + # Extract the actual fitness values from the dictionary + fitness = [sol_fitness["fitness"] for sol_fitness in fitness] + if type(fitness[0]) in [list, tuple, numpy.ndarray]: pass elif type(fitness[0]) in self.supported_int_float_types: @@ -128,14 +133,14 @@ def non_dominated_sorting(self, fitness): def crowding_distance(self, pareto_front, fitness): """ Calculate the crowding distance for all solutions in the current pareto front. - + Parameters ---------- pareto_front : TYPE The set of solutions in the current pareto front. fitness : TYPE The fitness of the current population. - + Returns ------- obj_crowding_dist_list : TYPE @@ -147,7 +152,11 @@ def crowding_distance(self, pareto_front, fitness): crowding_dist_pop_sorted_indices : TYPE The indices of the solutions (relative to the population) sorted by the crowding distance. """ - + # Handle dictionary format fitness values + if len(fitness) > 0 and isinstance(fitness[0], dict): + # Extract the actual fitness values from the dictionary + fitness = numpy.array([sol_fitness["fitness"] for sol_fitness in fitness]) + # Each solution in the pareto front has 2 elements: # 1) The index of the solution in the population. # 2) A list of the fitness values for all objectives of the solution. @@ -238,7 +247,7 @@ def sort_solutions_nsga2(self, At first, non-dominated sorting is applied to classify the solutions into pareto fronts. Then the solutions inside each front are sorted using crowded distance. The solutions inside pareto front X always come before those in front X+1. - + Parameters ---------- fitness: The fitness of the entire population. @@ -248,8 +257,16 @@ def sort_solutions_nsga2(self, ------- solutions_sorted : TYPE The indices of the sorted solutions. - + """ + # Save original fitness for later use in crowding_distance + original_fitness = fitness.copy() + + # Handle dictionary format fitness values + if len(fitness) > 0 and isinstance(fitness[0], dict): + # Extract the actual fitness values from the dictionary + fitness = numpy.array([sol_fitness["fitness"] for sol_fitness in fitness]) + if type(fitness[0]) in [list, tuple, numpy.ndarray]: # Multi-objective optimization problem. solutions_sorted = [] @@ -265,10 +282,15 @@ def sort_solutions_nsga2(self, for pareto_front in pareto_fronts: # Sort the solutions in the front using crowded distance. _, _, _, crowding_dist_pop_sorted_indices = self.crowding_distance(pareto_front=pareto_front.copy(), - fitness=fitness) + fitness=original_fitness) crowding_dist_pop_sorted_indices = list(crowding_dist_pop_sorted_indices) # Append the sorted solutions into the list. solutions_sorted.extend(crowding_dist_pop_sorted_indices) + elif len(fitness) > 0 and isinstance(fitness[0], dict): + # Single-objective optimization problem with dictionary fitness. + solutions_sorted = sorted(range(len(fitness)), key=lambda k: original_fitness[k]["fitness"]) + # Reverse the sorted solutions so that the best solution comes first. + solutions_sorted.reverse() elif type(fitness[0]) in pygad.GA.supported_int_float_types: # Single-objective optimization problem. solutions_sorted = sorted(range(len(fitness)), key=lambda k: fitness[k]) diff --git a/pygad/utils/parent_selection.py b/pygad/utils/parent_selection.py index 3ea4577..48670b2 100644 --- a/pygad/utils/parent_selection.py +++ b/pygad/utils/parent_selection.py @@ -24,9 +24,15 @@ def steady_state_selection(self, fitness, num_parents): -The indices of the selected solutions. """ - # Return the indices of the sorted solutions (all solutions in the population). - # This function works with both single- and multi-objective optimization problems. - fitness_sorted = self.sort_solutions_nsga2(fitness=fitness) + # Handle dictionary format fitness + if type(fitness[0]) is dict: + # Extract fitness values from dictionaries + fitness_values = [sol["fitness"] for sol in fitness] + fitness_sorted = numpy.argsort(fitness_values)[::-1] + else: + # Return the indices of the sorted solutions (all solutions in the population). + # This function works with both single- and multi-objective optimization problems. + fitness_sorted = self.sort_solutions_nsga2(fitness=fitness) # Selecting the best individuals in the current generation as parents for producing the offspring of the next generation. if self.gene_type_single == True: @@ -35,9 +41,21 @@ def steady_state_selection(self, fitness, num_parents): parents = numpy.empty((num_parents, self.population.shape[1]), dtype=object) for parent_num in range(num_parents): - parents[parent_num, :] = self.population[fitness_sorted[parent_num], :].copy() - - return parents, numpy.array(fitness_sorted[:num_parents]) + # Handle both single index and array cases (for multi-objective) + idx = fitness_sorted[parent_num] + # Only handle array case if it's a multi-objective result containing both rank and index + if isinstance(idx, (numpy.ndarray, list)) and len(idx) >= 2: + idx = idx[0] + parents[parent_num, :] = self.population[idx, :].copy() + + # Extract just the indices for the return value + fitness_sorted_indices = [] + for x in fitness_sorted[:num_parents]: + if isinstance(x, (numpy.ndarray, list)) and len(x) >= 2: + fitness_sorted_indices.append(x[0]) + else: + fitness_sorted_indices.append(x) + return parents, numpy.array(fitness_sorted_indices) def rank_selection(self, fitness, num_parents): @@ -73,6 +91,9 @@ def rank_selection(self, fitness, num_parents): # The variable idx has the rank of solution but not its index in the population. # Return the correct index of the solution. mapped_idx = fitness_sorted[idx] + # Only handle array case if it's a multi-objective result containing both rank and index + if isinstance(mapped_idx, (numpy.ndarray, list)) and len(mapped_idx) >= 2: + mapped_idx = mapped_idx[0] parents[parent_num, :] = self.population[mapped_idx, :].copy() parents_indices.append(mapped_idx) break @@ -131,7 +152,18 @@ def tournament_selection(self, fitness, num_parents): rand_indices = numpy.random.randint(low=0, high=len(fitness), size=self.K_tournament) # Find the rank of the candidate solutions. The lower the rank, the better the solution. - rand_indices_rank = [fitness_sorted.index(rand_idx) for rand_idx in rand_indices] + rand_indices_rank = [] + for rand_idx in rand_indices: + # Handle both single index and array cases + for i, sorted_val in enumerate(fitness_sorted): + if isinstance(sorted_val, (numpy.ndarray, list)) and len(sorted_val) >= 2: + if sorted_val[0] == rand_idx: + rand_indices_rank.append(i) + break + else: + if sorted_val == rand_idx: + rand_indices_rank.append(i) + break # Select the solution with the lowest rank as a parent. selected_parent_idx = rand_indices_rank.index(min(rand_indices_rank)) diff --git a/pygad/visualize/__pycache__/__init__.cpython-313.pyc b/pygad/visualize/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..7889146 Binary files /dev/null and b/pygad/visualize/__pycache__/__init__.cpython-313.pyc differ diff --git a/pygad/visualize/__pycache__/__init__.cpython-39.pyc b/pygad/visualize/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..5a8405b Binary files /dev/null and b/pygad/visualize/__pycache__/__init__.cpython-39.pyc differ diff --git a/pygad/visualize/__pycache__/plot.cpython-313.pyc b/pygad/visualize/__pycache__/plot.cpython-313.pyc new file mode 100644 index 0000000..c5578e6 Binary files /dev/null and b/pygad/visualize/__pycache__/plot.cpython-313.pyc differ diff --git a/pygad/visualize/__pycache__/plot.cpython-39.pyc b/pygad/visualize/__pycache__/plot.cpython-39.pyc new file mode 100644 index 0000000..fb8ecbb Binary files /dev/null and b/pygad/visualize/__pycache__/plot.cpython-39.pyc differ diff --git a/test_multi_objective.py b/test_multi_objective.py new file mode 100644 index 0000000..d2eca8e --- /dev/null +++ b/test_multi_objective.py @@ -0,0 +1,61 @@ +# -*- coding: utf-8 -*- +import pygad +import numpy + +# 定义适应度函数 +def fitness_func(ga_instance, solution, solution_idx): + # 简单的测试适应度函数:求和 + fitness = numpy.sum(solution) + return fitness + +# 准备基因空间 +gene_space = {'low': 0, 'high': 10} + +# 创建GA实例 +try: + # 先检查fitness_func参数数量 + print("fitness_func.__code__.co_argcount =", fitness_func.__code__.co_argcount) + + ga_instance = pygad.GA( + num_generations=30, + num_parents_mating=5, + fitness_func=fitness_func, + sol_per_pop=20, + num_genes=5, + gene_space=gene_space, + parent_selection_type="sss", + crossover_type="single_point", + mutation_type="random", + mutation_percent_genes=10 + ) + print("GA instance created successfully. valid_parameters =", ga_instance.valid_parameters) + print("generations_completed =", ga_instance.generations_completed) + print("run_completed =", ga_instance.run_completed) + + # 检查更多属性 + print("num_generations =", ga_instance.num_generations) + print("num_parents_mating =", ga_instance.num_parents_mating) + print("sol_per_pop =", ga_instance.sol_per_pop) + print("num_genes =", ga_instance.num_genes) + print("parent_selection_type =", ga_instance.parent_selection_type) + print("crossover_type =", ga_instance.crossover_type) + print("mutation_type =", ga_instance.mutation_type) + print("mutation_percent_genes =", ga_instance.mutation_percent_genes) + + # 运行GA + try: + ga_instance.run() + except Exception as e: + print("Error running GA:", str(e)) +except Exception as e: + print("Error creating GA instance:", str(e)) + +# 打印结果 +ga_instance.plot_fitness() + +# 输出最终Pareto前沿 +final_pareto = ga_instance.calculate_pareto_front(ga_instance.last_generation_fitness) +print("\nFinal Pareto Front ({0} individuals):".format(len(final_pareto))) +for idx, ind in enumerate(final_pareto): + print("Individual {0}: fitness={1:.2f}, time={2:.2f}ms, diversity={3:.4f}".format( + idx+1, ind['fitness'], ind['time'], ind['diversity'])) \ No newline at end of file diff --git a/tests/__pycache__/test_adaptive_mutation.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_adaptive_mutation.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..566935f Binary files /dev/null and b/tests/__pycache__/test_adaptive_mutation.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_allow_duplicate_genes.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_allow_duplicate_genes.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..9fa6a7e Binary files /dev/null and b/tests/__pycache__/test_allow_duplicate_genes.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_crossover_mutation.cpython-313.pyc b/tests/__pycache__/test_crossover_mutation.cpython-313.pyc new file mode 100644 index 0000000..00dbbf2 Binary files /dev/null and b/tests/__pycache__/test_crossover_mutation.cpython-313.pyc differ diff --git a/tests/__pycache__/test_crossover_mutation.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_crossover_mutation.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..4086eed Binary files /dev/null and b/tests/__pycache__/test_crossover_mutation.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_gene_constraint.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_gene_constraint.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..18a65c0 Binary files /dev/null and b/tests/__pycache__/test_gene_constraint.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_gene_space.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_gene_space.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..df4ebdd Binary files /dev/null and b/tests/__pycache__/test_gene_space.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_gene_space_allow_duplicate_genes.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_gene_space_allow_duplicate_genes.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..a6c0179 Binary files /dev/null and b/tests/__pycache__/test_gene_space_allow_duplicate_genes.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_gene_type.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_gene_type.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..196bd21 Binary files /dev/null and b/tests/__pycache__/test_gene_type.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_lifecycle_callbacks_calls.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_lifecycle_callbacks_calls.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..43e43e9 Binary files /dev/null and b/tests/__pycache__/test_lifecycle_callbacks_calls.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_number_fitness_function_calls.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_number_fitness_function_calls.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..52e316d Binary files /dev/null and b/tests/__pycache__/test_number_fitness_function_calls.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_save_solutions.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_save_solutions.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..d68c16e Binary files /dev/null and b/tests/__pycache__/test_save_solutions.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_stop_criteria.cpython-39-pytest-6.2.5.pyc b/tests/__pycache__/test_stop_criteria.cpython-39-pytest-6.2.5.pyc new file mode 100644 index 0000000..ba5a053 Binary files /dev/null and b/tests/__pycache__/test_stop_criteria.cpython-39-pytest-6.2.5.pyc differ diff --git a/tests/__pycache__/test_stop_criteria.cpython-39.pyc b/tests/__pycache__/test_stop_criteria.cpython-39.pyc new file mode 100644 index 0000000..3627d31 Binary files /dev/null and b/tests/__pycache__/test_stop_criteria.cpython-39.pyc differ diff --git a/tests/test_crossover_mutation.py b/tests/test_crossover_mutation.py index acc3894..53aa089 100644 --- a/tests/test_crossover_mutation.py +++ b/tests/test_crossover_mutation.py @@ -68,8 +68,9 @@ def fitness_func_no_batch_multi(ga, solution, idx): ga_instance.run() comparison_result = [] + initial_population_list = ga_instance.initial_population.tolist() if hasattr(ga_instance.initial_population, 'tolist') else ga_instance.initial_population for solution_idx, solution in enumerate(ga_instance.population): - if list(solution) in ga_instance.initial_population.tolist(): + if list(solution) in initial_population_list: comparison_result.append(True) else: comparison_result.append(False) @@ -78,27 +79,34 @@ def fitness_func_no_batch_multi(ga, solution, idx): result = numpy.all(comparison_result == True) print(f"Comparison result is {result}") + print(f"Initial population: {ga_instance.initial_population[:2]}") + print(f"Final population: {ga_instance.population[:2]}") + print(f"Differences found at indices: {numpy.where(comparison_result == False)[0]}") + return result, ga_instance def test_no_crossover_no_mutation(): - result, ga_instance = output_crossover_mutation() + result, ga_instance = output_crossover_mutation(initial_population=initial_population) assert result == True def test_no_crossover_no_mutation_gene_space(): - result, ga_instance = output_crossover_mutation(gene_space=range(10)) + result, ga_instance = output_crossover_mutation(gene_space=range(10), initial_population=initial_population) assert result == True def test_no_crossover_no_mutation_int_gene_type(): - result, ga_instance = output_crossover_mutation(gene_type=int) + result, ga_instance = output_crossover_mutation(gene_type=int, initial_population=initial_population) assert result == True def test_no_crossover_no_mutation_gene_space_gene_type(): + # Create a compatible initial population with float type + float_initial_population = [[float(x) for x in ind] for ind in initial_population] result, ga_instance = output_crossover_mutation(gene_space={"low": 0, "high": 10}, - gene_type=[float, 2]) + gene_type=float, + initial_population=float_initial_population) assert result == True @@ -113,11 +121,22 @@ def test_no_crossover_no_mutation_nested_gene_space(): numpy.arange(30, 35), numpy.arange(35, 40), numpy.arange(40, 45), - [45, 46, 47, 48, 49]]) + [45, 46, 47, 48, 49]], + initial_population=initial_population) assert result == True def test_no_crossover_no_mutation_nested_gene_type(): - result, ga_instance = output_crossover_mutation(gene_type=[int, float, numpy.float64, [float, 3], [float, 4], numpy.int16, [numpy.float32, 1], int, float, [float, 3]]) + # Create a compatible initial population matching the gene_type specifications + nested_initial_population = [] + gene_types = [int, float, numpy.float64, float, float, numpy.int16, numpy.float32, int, float, float] + for ind in initial_population: + new_ind = [] + for i, val in enumerate(ind): + new_ind.append(gene_types[i](val)) + nested_initial_population.append(new_ind) + # Fix gene_type format + fixed_gene_type = [(int, 0), (float, 0), (numpy.float64, 0), (float, 3), (float, 4), (numpy.int16, 0), (numpy.float32, 1), (int, 0), (float, 0), (float, 3)] + result, ga_instance = output_crossover_mutation(gene_type=fixed_gene_type, initial_population=nested_initial_population) assert result == True @@ -143,9 +162,18 @@ def test_no_crossover_no_mutation_initial_population(): assert result == True def test_no_crossover_no_mutation_initial_population_nested_gene_type(): - global initial_population - result, ga_instance = output_crossover_mutation(initial_population=initial_population, - gene_type=[int, float, numpy.float64, [float, 3], [float, 4], numpy.int16, [numpy.float32, 1], int, float, [float, 3]]) + # Create a compatible initial population matching the gene_type specifications + nested_initial_population = [] + gene_types = [int, float, numpy.float64, float, float, numpy.int16, numpy.float32, int, float, float] + for ind in initial_population: + new_ind = [] + for i, val in enumerate(ind): + new_ind.append(gene_types[i](val)) + nested_initial_population.append(new_ind) + # Fix gene_type format + fixed_gene_type = [(int, 0), (float, 0), (numpy.float64, 0), (float, 3), (float, 4), (numpy.int16, 0), (numpy.float32, 1), (int, 0), (float, 0), (float, 3)] + result, ga_instance = output_crossover_mutation(initial_population=nested_initial_population, + gene_type=fixed_gene_type) assert result == True