diff --git a/src/lead/core/multi_objective/__init__.py b/src/lead/core/multi_objective/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/lead/core/multi_objective/evolution.py b/src/lead/core/multi_objective/evolution.py new file mode 100644 index 0000000..95acecb --- /dev/null +++ b/src/lead/core/multi_objective/evolution.py @@ -0,0 +1,182 @@ +import os +import json +import numpy as np +from typing import List, Dict, Any +from ...utils.multi_objective.evaluator import MultiObjectiveEvaluator +from ...utils.multi_objective.storage import MultiObjectiveGenerationStorage +from .individual import MultiObjectiveIndividual +from ..llm_integration import LLMClient +from ...config.settings import DEFAULT_EVOLUTION_PARAMS + +class MultiObjectiveEvolutionEngine: + """多目标进化引擎""" + + def __init__(self, problem_path: str): + self.problem_path = problem_path + self.storage = MultiObjectiveGenerationStorage(problem_path) + self.evaluator = MultiObjectiveEvaluator(problem_path) + self.llm_client = LLMClient.from_config(problem_path) + + # 加载进化参数 + config = self._load_problem_config() + self.evolution_params = { + **DEFAULT_EVOLUTION_PARAMS, + **config.get("evolution_params", {}) + } + print(f"多目标进化参数:{self.evolution_params}") + + def initialize_population(self, size: int) -> List[MultiObjectiveIndividual]: + """初始化种群""" + # 这里可以集成LLM生成初始代码 + population = [] + for _ in range(size): + # 示例代码,实际应使用LLM生成 + code = "def solution(input):\n return {'f1': 0.0, 'f2': 0.0}" + population.append(MultiObjectiveIndividual(code, generation=0)) + return population + + def run_evolution(self, generations: int = None, population_size: int = None): + """运行多目标进化""" + generations = generations or self.evolution_params["generations"] + population_size = population_size or self.evolution_params["population_size"] + + print(f"开始多目标进化,代数:{generations},种群大小:{population_size}") + population = self.initialize_population(population_size) + + # 评估初始种群 + print("\n评估初始种群...") + for ind in population: + ind.fitnesses = self.evaluator.evaluate(ind.code) + print(f"初始个体适应度:{ind.fitnesses}") + + # 主进化循环 + for gen in range(generations): + print(f"\n开始第 {gen+1}/{generations} 代进化") + + # 生成子代 + offspring = self._generate_offspring(population) + + # 评估子代 + for ind in offspring: + ind.fitnesses = self.evaluator.evaluate(ind.code) + print(f"新个体适应度:{ind.fitnesses}") + + # 合并父代和子代 + combined_pop = population + offspring + + # 非支配排序和选择 + population = self._select_new_population(combined_pop, population_size) + + # 保存当前代信息 + self.storage.save_generation(gen, population) + + # 打印前沿信息 + front = self._calculate_pareto_front(population) + print(f"当前代Pareto前沿大小:{len(front)}") + + print("\n多目标进化完成!") + return population + + def _generate_offspring(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]: + """生成子代个体""" + offspring = [] + for _ in range(len(population) // 2): + # 选择父代 + parents = self._select_parents(population) + + # 交叉 (这里可以集成LLM) + child_code = self._crossover(parents[0].code, parents[1].code) + + # 变异 (这里可以集成LLM) + mutated_code = self._mutate(child_code) + + offspring.append(MultiObjectiveIndividual(mutated_code, generation=parents[0].generation + 1)) + return offspring + + def _select_parents(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]: + """使用锦标赛选择父代""" + tournament_size = min(5, len(population)) + parents = [] + for _ in range(2): + candidates = np.random.choice(population, tournament_size, replace=False) + # 基于非支配排序和拥挤距离选择 + candidates = sorted(candidates, key=lambda x: ( + -x.rank, # 优先选择前沿等级高的 + x.crowding_distance(population) # 其次选择拥挤距离大的 + )) + parents.append(candidates[0]) + return parents + + def _select_new_population(self, combined_pop: List[MultiObjectiveIndividual], size: int) -> List[MultiObjectiveIndividual]: + """选择新一代种群""" + fronts = self._non_dominated_sort(combined_pop) + new_pop = [] + for front in fronts: + if len(new_pop) + len(front) <= size: + new_pop.extend(front) + else: + # 计算拥挤距离并选择 + for ind in front: + ind.crowding_distance(front) + front = sorted(front, key=lambda x: x.crowding_distance(front), reverse=True) + new_pop.extend(front[:size - len(new_pop)]) + break + return new_pop + + def _non_dominated_sort(self, population: List[MultiObjectiveIndividual]) -> List[List[MultiObjectiveIndividual]]: + """非支配排序""" + fronts = [[]] + for ind in population: + ind.domination_count = 0 + ind.dominated_set = [] + for other in population: + if ind.dominates(other): + ind.dominated_set.append(other) + elif other.dominates(ind): + ind.domination_count += 1 + if ind.domination_count == 0: + fronts[0].append(ind) + ind.rank = 0 + + i = 0 + while fronts[i]: + next_front = [] + for ind in fronts[i]: + for dominated_ind in ind.dominated_set: + dominated_ind.domination_count -= 1 + if dominated_ind.domination_count == 0: + dominated_ind.rank = i + 1 + next_front.append(dominated_ind) + i += 1 + fronts.append(next_front) + + return fronts[:-1] # 最后一个是空的 + + def _calculate_pareto_front(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]: + """计算Pareto前沿""" + front = [] + for ind in population: + is_dominated = False + for other in population: + if other.dominates(ind): + is_dominated = True + break + if not is_dominated: + front.append(ind) + return front + + def _crossover(self, code1: str, code2: str) -> str: + """交叉操作""" + # 这里可以集成LLM的交叉操作 + return code1 # 简化示例 + + def _mutate(self, code: str) -> str: + """变异操作""" + # 这里可以集成LLM的变异操作 + return code # 简化示例 + + def _load_problem_config(self) -> dict: + """加载问题配置""" + config_path = os.path.join(self.problem_path, "problem_config.json") + with open(config_path, "r", encoding="utf-8") as f: + return json.load(f) \ No newline at end of file diff --git a/src/lead/core/multi_objective/individual.py b/src/lead/core/multi_objective/individual.py new file mode 100644 index 0000000..f71f289 --- /dev/null +++ b/src/lead/core/multi_objective/individual.py @@ -0,0 +1,63 @@ +from dataclasses import dataclass, field +from typing import Dict, Any, List +import numpy as np + +@dataclass +class MultiObjectiveIndividual: + """多目标优化个体表示""" + code: str + generation: int + fitnesses: Dict[str, float] = field(default_factory=dict) + + def dominates(self, other: 'MultiObjectiveIndividual') -> bool: + """检查当前个体是否Pareto支配另一个个体""" + if not self.fitnesses or not other.fitnesses: + return False + + at_least_one_better = False + for key in self.fitnesses: + if self.fitnesses[key] > other.fitnesses[key]: # 假设都是最小化 + return False + if self.fitnesses[key] < other.fitnesses[key]: + at_least_one_better = True + return at_least_one_better + + def to_dict(self) -> Dict[str, Any]: + """转换为字典格式""" + return { + "code": self.code, + "generation": self.generation, + "fitnesses": self.fitnesses + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'MultiObjectiveIndividual': + """从字典创建个体""" + return cls( + code=data["code"], + generation=data["generation"], + fitnesses=data.get("fitnesses", {}) + ) + + def crowding_distance(self, front: List['MultiObjectiveIndividual']) -> float: + """计算拥挤距离""" + if not front or len(front) == 1: + return float('inf') + + distance = 0.0 + for obj in self.fitnesses: + # 按当前目标值排序 + sorted_front = sorted(front, key=lambda x: x.fitnesses[obj]) + min_val = sorted_front[0].fitnesses[obj] + max_val = sorted_front[-1].fitnesses[obj] + + # 找到当前个体在排序中的位置 + for i, ind in enumerate(sorted_front): + if ind == self: + if i == 0 or i == len(sorted_front) - 1: + distance += float('inf') + else: + distance += (sorted_front[i+1].fitnesses[obj] - + sorted_front[i-1].fitnesses[obj]) / (max_val - min_val) + break + return distance \ No newline at end of file diff --git a/src/lead/core/operators/crossover_analysis.py b/src/lead/core/operators/crossover_analysis.py new file mode 100644 index 0000000..a2e0c24 --- /dev/null +++ b/src/lead/core/operators/crossover_analysis.py @@ -0,0 +1,61 @@ +from typing import Dict, List, Tuple +from lead.core.multi_objective.individual import MultiObjectiveIndividual + +class CrossoverAnalysisOperator: + """交叉分析算子:分析两个父代个体的目标值和支配关系""" + + def analyze(self, parent1: MultiObjectiveIndividual, + parent2: MultiObjectiveIndividual, + objective_names: List[str]) -> Dict: + """ + 分析两个父代个体,确定交叉优化方向 + 返回: + { + "dominance": "parent1"|"parent2"|"non_dominated", + "target_objective": str, # 主要优化目标 + "parent1_strengths": List[str], # parent1优势目标 + "parent2_strengths": List[str] # parent2优势目标 + } + """ + result = { + "dominance": self._check_dominance(parent1, parent2), + "target_objective": None, + "parent1_strengths": [], + "parent2_strengths": [] + } + + # 比较各目标值,找出各自的优势目标 + for obj in objective_names: + if parent1.fitnesses[obj] < parent2.fitnesses[obj]: # 假设最小化 + result["parent1_strengths"].append(obj) + elif parent1.fitnesses[obj] > parent2.fitnesses[obj]: + result["parent2_strengths"].append(obj) + + # 选择交叉优化的主要目标 + if result["dominance"] == "parent1": + # 如果parent1支配parent2,优化parent2的最差目标 + worst_obj = max(parent2.fitnesses.items(), key=lambda x: x[1])[0] + result["target_objective"] = worst_obj + elif result["dominance"] == "parent2": + # 如果parent2支配parent1,优化parent1的最差目标 + worst_obj = max(parent1.fitnesses.items(), key=lambda x: x[1])[0] + result["target_objective"] = worst_obj + else: + # 非支配关系,选择差异最大的目标进行优化 + max_diff = 0 + for obj in objective_names: + diff = abs(parent1.fitnesses[obj] - parent2.fitnesses[obj]) + if diff > max_diff: + max_diff = diff + result["target_objective"] = obj + + return result + + def _check_dominance(self, ind1: MultiObjectiveIndividual, + ind2: MultiObjectiveIndividual) -> str: + """检查支配关系""" + if ind1.dominates(ind2): + return "parent1" + elif ind2.dominates(ind1): + return "parent2" + return "non_dominated" \ No newline at end of file diff --git a/src/lead/core/operators/multi_objective_crossover.py b/src/lead/core/operators/multi_objective_crossover.py new file mode 100644 index 0000000..df2c681 --- /dev/null +++ b/src/lead/core/operators/multi_objective_crossover.py @@ -0,0 +1,66 @@ +from typing import Tuple, List, Dict +from lead.core.multi_objective.individual import MultiObjectiveIndividual +from .crossover_analysis import CrossoverAnalysisOperator +from ..llm_integration import LLMClient +from .verify_operator import VerifyOperator + +class MultiObjectiveCrossoverOperator: + """多目标交叉算子""" + + def __init__(self, llm_client: LLMClient, verify_operator: VerifyOperator): + self.llm_client = llm_client + self.analysis_operator = CrossoverAnalysisOperator() + self.verify_operator = verify_operator + + def crossover(self, parent1: MultiObjectiveIndividual, + parent2: MultiObjectiveIndividual, + objective_names: List[str]) -> Tuple[str, Dict]: + """ + 执行多目标交叉 + 返回: + (new_code, analysis_info) + """ + # 1. 分析父代个体 + analysis_info = self.analysis_operator.analyze( + parent1, parent2, objective_names) + + # 2. 构建多目标交叉提示词 + prompt = self._build_multi_obj_prompt( + parent1, parent2, analysis_info) + + # 3. 生成新代码 + new_code = self.llm_client._call_llm(prompt, operator="crossover") + + # 4. 验证代码 + verified_code = self._verify_code(new_code) + + return verified_code, analysis_info + + def _build_multi_obj_prompt(self, + parent1: MultiObjectiveIndividual, + parent2: MultiObjectiveIndividual, + analysis_info: Dict) -> str: + """构建多目标交叉提示词""" + target_obj = analysis_info["target_objective"] + return f""" + 请基于以下两个父代算法进行多目标优化交叉,重点关注目标 '{target_obj}': + + 父代1优势: {', '.join(analysis_info["parent1_strengths"])} + 父代1代码: + {parent1.code} + + 父代2优势: {', '.join(analysis_info["parent2_strengths"])} + 父代2代码: + {parent2.code} + + 请生成一个新算法,要求: + 1. 保留两个父代在各自优势目标上的优点 + 2. 特别优化目标 '{target_obj}' 的性能 + 3. 确保代码完整且符合问题要求 + + 只返回最终的Python代码,不要包含任何解释。 + """ + + def _verify_code(self, code: str, function_name: str) -> str: + """使用VerifyOperator验证代码""" + return self.verify_operator.verify_code_format(code, function_name) \ No newline at end of file diff --git a/src/lead/core/operators/multi_objective_mutation.py b/src/lead/core/operators/multi_objective_mutation.py new file mode 100644 index 0000000..6291a94 --- /dev/null +++ b/src/lead/core/operators/multi_objective_mutation.py @@ -0,0 +1,64 @@ +from typing import Tuple, List, Dict +from lead.core.multi_objective.individual import MultiObjectiveIndividual +from .mutation_analysis import MutationAnalysisOperator +from ..llm_integration import LLMClient +from .verify_operator import VerifyOperator + +class MultiObjectiveMutationOperator: + """多目标变异算子""" + + def __init__(self, llm_client: LLMClient, verify_operator: VerifyOperator): + self.llm_client = llm_client + self.analysis_operator = MutationAnalysisOperator() + self.verify_operator = verify_operator + + def mutate(self, individual: MultiObjectiveIndividual, + objective_names: List[str], + reference_front: List[Dict] = None) -> Tuple[str, Dict]: + """ + 执行多目标变异 + 返回: + (new_code, analysis_info) + """ + # 1. 分析变异方向 + analysis_info = self.analysis_operator.analyze( + individual, objective_names, reference_front) + + # 2. 构建多目标变异提示词 + prompt = self._build_multi_obj_prompt( + individual, analysis_info) + + # 3. 生成变异代码 + new_code = self.llm_client._call_llm(prompt, operator="mutation") + + # 4. 验证代码 + verified_code = self._verify_code( + new_code, analysis_info["target_objective"]) + + return verified_code, analysis_info + + def _build_multi_obj_prompt(self, + individual: MultiObjectiveIndividual, + analysis_info: Dict) -> str: + """构建多目标变异提示词""" + target_obj = analysis_info["target_objective"] + improvement = analysis_info["improvement_potential"][target_obj] + + return f""" + 请对以下算法进行多目标优化变异,重点改进目标 '{target_obj}' (当前改进潜力: {improvement:.2f}): + + 原始代码: + {individual.code} + + 请: + 1. 保持在其他目标上的表现 + 2. 专门优化目标 '{target_obj}' + 3. 可以引入新的数学概念或优化方法 + 4. 确保代码完整且可执行 + + 只返回最终的Python代码,不要包含任何解释。 + """ + + def _verify_code(self, code: str, function_name: str) -> str: + """使用VerifyOperator验证代码""" + return self.verify_operator.verify_code_format(code, function_name) \ No newline at end of file diff --git a/src/lead/core/operators/mutation_analysis.py b/src/lead/core/operators/mutation_analysis.py new file mode 100644 index 0000000..e0345b2 --- /dev/null +++ b/src/lead/core/operators/mutation_analysis.py @@ -0,0 +1,53 @@ +from typing import Dict, List +from lead.core.multi_objective.individual import MultiObjectiveIndividual + +class MutationAnalysisOperator: + """变异分析算子:分析单个个体的优化潜力""" + + def analyze(self, individual: MultiObjectiveIndividual, + objective_names: List[str], + reference_front: List[Dict[str, float]] = None) -> Dict: + """ + 分析个体,确定变异优化方向 + 返回: + { + "target_objective": str, # 主要优化目标 + "improvement_potential": Dict[str, float], # 各目标改进潜力 + "nearest_solution": Dict[str, float] # 参考前沿中最接近的解(可选) + } + """ + result = { + "target_objective": None, + "improvement_potential": {}, + "nearest_solution": None + } + + # 计算各目标的改进潜力 + for obj in objective_names: + if reference_front: + # 如果有参考前沿,计算与前沿中最好解的差距 + best_val = min(sol[obj] for sol in reference_front) + result["improvement_potential"][obj] = individual.fitnesses[obj] - best_val + else: + # 没有参考前沿,使用归一化值 + result["improvement_potential"][obj] = individual.fitnesses[obj] + + # 选择改进潜力最大的目标作为变异方向 + result["target_objective"] = max( + result["improvement_potential"].items(), + key=lambda x: x[1] + )[0] + + # 如果有参考前沿,找到最接近的解 + if reference_front: + min_distance = float('inf') + for sol in reference_front: + distance = sum( + (individual.fitnesses[obj] - sol[obj]) ** 2 + for obj in objective_names + ) + if distance < min_distance: + min_distance = distance + result["nearest_solution"] = sol + + return result \ No newline at end of file diff --git a/src/lead/utils/multi_objective/evaluator.py b/src/lead/utils/multi_objective/evaluator.py new file mode 100644 index 0000000..1be493f --- /dev/null +++ b/src/lead/utils/multi_objective/evaluator.py @@ -0,0 +1,91 @@ +import os +import json +import importlib.util +import traceback +from typing import Dict, List, Any +from ..data_loader import DataLoader +from ..timeout import Timeout, TimeoutError +from lead.core.multi_objective.individual import MultiObjectiveIndividual + +class MultiObjectiveEvaluator: + """多目标评估器""" + + def __init__(self, problem_path: str): + self.problem_path = problem_path + self.evaluation_module = self._load_evaluation_module() + self.data_loader = self._load_data_loader() + self.objective_names = self._load_objective_names() + + # 加载数据集 + if self.data_loader: + self.dataset = self.data_loader.get_data() + else: + self.dataset = None + + # 从配置文件加载超时设置,默认30秒 + config = self._load_problem_config() + self.timeout = config.get("evaluation_timeout", 30) + + def evaluate(self, code: str) -> Dict[str, float]: + """评估算法代码的多个目标值""" + try: + # 动态编译代码 + namespace = {} + exec(code, namespace) + + # 获取函数对象 + config = self._load_problem_config() + func = namespace[config["function_name"]] + + # 使用超时控制调用评估函数 + timeout = Timeout(self.timeout) + if self.dataset is not None: + results = timeout.run(self.evaluation_module.evaluate, func, self.dataset) + else: + results = timeout.run(self.evaluation_module.evaluate, func) + + # 确保返回结果与目标名称匹配 + if isinstance(results, dict): + return {name: results[name] for name in self.objective_names} + elif isinstance(results, (list, tuple)): + return {name: results[i] for i, name in enumerate(self.objective_names)} + else: + raise ValueError("评估函数应返回字典或列表/元组") + + except TimeoutError: + print(f"评估超时({self.timeout}秒)") + return {name: float('inf') for name in self.objective_names} + except Exception as e: + print(f"评估失败:{str(e)}") + print(f"错误信息:{traceback.format_exc()}") + return {name: float('inf') for name in self.objective_names} + + def _load_objective_names(self) -> List[str]: + """从配置加载目标名称""" + config = self._load_problem_config() + return config.get("objective_names", ["f1", "f2"]) + + def _load_evaluation_module(self): + """加载评估模块""" + eval_path = os.path.join(self.problem_path, "evaluation.py") + spec = importlib.util.spec_from_file_location("evaluation", eval_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module + + def _load_data_loader(self): + """加载数据加载器""" + loader_path = os.path.join(self.problem_path, "dataset", "data_loader.py") + if not os.path.exists(loader_path): + return None + + spec = importlib.util.spec_from_file_location("data_loader", loader_path) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + return module.ProblemDataLoader(self.problem_path) + + def _load_problem_config(self) -> dict: + """加载问题配置""" + config_path = os.path.join(self.problem_path, "problem_config.json") + with open(config_path, "r", encoding="utf-8") as f: + return json.load(f) \ No newline at end of file diff --git a/src/lead/utils/multi_objective/storage.py b/src/lead/utils/multi_objective/storage.py new file mode 100644 index 0000000..4f7edb1 --- /dev/null +++ b/src/lead/utils/multi_objective/storage.py @@ -0,0 +1,116 @@ +import os +import json +from datetime import datetime +from typing import List, Dict, Any +import numpy as np +from lead.core.multi_objective.individual import MultiObjectiveIndividual + +class MultiObjectiveGenerationStorage: + """多目标进化历史存储""" + + def __init__(self, problem_path: str): + self.problem_path = problem_path + # 创建存储目录 + self.storage_dir = os.path.join(problem_path, "multi_objective_history", + datetime.now().strftime("%Y%m%d_%H%M%S")) + os.makedirs(self.storage_dir, exist_ok=True) + + # 创建进化日志文件 + self.log_file = os.path.join(problem_path, "multi_objective_evolution.log") + + # 从配置加载目标名称 + self.objective_names = self._load_objective_names() + + def save_generation(self, generation: int, population: List[MultiObjectiveIndividual]): + """保存一代种群信息""" + # 计算当前代的Pareto前沿 + pareto_front = self._calculate_pareto_front(population) + + # 准备存储数据 + data = { + "generation": generation, + "population": [ind.to_dict() for ind in population], + "pareto_front": [ind.to_dict() for ind in pareto_front], + "objective_names": self.objective_names + } + + # 保存到文件 + file_path = os.path.join(self.storage_dir, f"generation_{generation}.json") + with open(file_path, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + + # 记录日志 + self._log_generation(generation, pareto_front) + + def get_pareto_front(self) -> List[Dict[str, Any]]: + """获取历史Pareto前沿""" + all_fronts = [] + + for file in os.listdir(self.storage_dir): + if file.startswith("generation_") and file.endswith(".json"): + with open(os.path.join(self.storage_dir, file), "r", encoding="utf-8") as f: + data = json.load(f) + all_fronts.extend(data["pareto_front"]) + + # 合并所有前沿并去重 + unique_front = [] + seen_codes = set() + for ind in all_fronts: + if ind["code"] not in seen_codes: + unique_front.append(ind) + seen_codes.add(ind["code"]) + + # 计算全局Pareto前沿 + return self._calculate_global_pareto_front(unique_front) + + def _calculate_pareto_front(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]: + """计算当前代的Pareto前沿""" + front = [] + for ind in population: + is_dominated = False + for other in population: + if other.dominates(ind): + is_dominated = True + break + if not is_dominated: + front.append(ind) + return front + + def _calculate_global_pareto_front(self, individuals: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """从历史数据计算全局Pareto前沿""" + if not individuals: + return [] + + front = [] + for ind in individuals: + is_dominated = False + for other in individuals: + # 检查other是否支配ind + dominates = True + for obj in self.objective_names: + if other["fitnesses"][obj] > ind["fitnesses"][obj]: # 假设都是最小化 + dominates = False + break + if dominates and any(other["fitnesses"][obj] < ind["fitnesses"][obj] for obj in self.objective_names): + is_dominated = True + break + if not is_dominated: + front.append(ind) + return front + + def _load_objective_names(self) -> List[str]: + """从配置加载目标名称""" + config_path = os.path.join(self.problem_path, "problem_config.json") + with open(config_path, "r", encoding="utf-8") as f: + config = json.load(f) + return config.get("objective_names", ["f1", "f2"]) + + def _log_generation(self, generation: int, pareto_front: List[MultiObjectiveIndividual]): + """记录进化日志""" + log_entry = ( + f"Generation {generation:04d} | " + f"Pareto Front Size: {len(pareto_front)} | " + f"Timestamp: {datetime.now().isoformat()}\n" + ) + with open(self.log_file, "a", encoding="utf-8") as f: + f.write(log_entry) \ No newline at end of file