多目标算子实现,交叉、变异、分析等

This commit is contained in:
deastern 2025-04-15 11:24:42 +08:00
parent 8e5a4710d6
commit 95f1467ed7
9 changed files with 696 additions and 0 deletions

View File

@ -0,0 +1,182 @@
import os
import json
import numpy as np
from typing import List, Dict, Any
from ...utils.multi_objective.evaluator import MultiObjectiveEvaluator
from ...utils.multi_objective.storage import MultiObjectiveGenerationStorage
from .individual import MultiObjectiveIndividual
from ..llm_integration import LLMClient
from ...config.settings import DEFAULT_EVOLUTION_PARAMS
class MultiObjectiveEvolutionEngine:
"""多目标进化引擎"""
def __init__(self, problem_path: str):
self.problem_path = problem_path
self.storage = MultiObjectiveGenerationStorage(problem_path)
self.evaluator = MultiObjectiveEvaluator(problem_path)
self.llm_client = LLMClient.from_config(problem_path)
# 加载进化参数
config = self._load_problem_config()
self.evolution_params = {
**DEFAULT_EVOLUTION_PARAMS,
**config.get("evolution_params", {})
}
print(f"多目标进化参数:{self.evolution_params}")
def initialize_population(self, size: int) -> List[MultiObjectiveIndividual]:
"""初始化种群"""
# 这里可以集成LLM生成初始代码
population = []
for _ in range(size):
# 示例代码实际应使用LLM生成
code = "def solution(input):\n return {'f1': 0.0, 'f2': 0.0}"
population.append(MultiObjectiveIndividual(code, generation=0))
return population
def run_evolution(self, generations: int = None, population_size: int = None):
"""运行多目标进化"""
generations = generations or self.evolution_params["generations"]
population_size = population_size or self.evolution_params["population_size"]
print(f"开始多目标进化,代数:{generations},种群大小:{population_size}")
population = self.initialize_population(population_size)
# 评估初始种群
print("\n评估初始种群...")
for ind in population:
ind.fitnesses = self.evaluator.evaluate(ind.code)
print(f"初始个体适应度:{ind.fitnesses}")
# 主进化循环
for gen in range(generations):
print(f"\n开始第 {gen+1}/{generations} 代进化")
# 生成子代
offspring = self._generate_offspring(population)
# 评估子代
for ind in offspring:
ind.fitnesses = self.evaluator.evaluate(ind.code)
print(f"新个体适应度:{ind.fitnesses}")
# 合并父代和子代
combined_pop = population + offspring
# 非支配排序和选择
population = self._select_new_population(combined_pop, population_size)
# 保存当前代信息
self.storage.save_generation(gen, population)
# 打印前沿信息
front = self._calculate_pareto_front(population)
print(f"当前代Pareto前沿大小{len(front)}")
print("\n多目标进化完成!")
return population
def _generate_offspring(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""生成子代个体"""
offspring = []
for _ in range(len(population) // 2):
# 选择父代
parents = self._select_parents(population)
# 交叉 (这里可以集成LLM)
child_code = self._crossover(parents[0].code, parents[1].code)
# 变异 (这里可以集成LLM)
mutated_code = self._mutate(child_code)
offspring.append(MultiObjectiveIndividual(mutated_code, generation=parents[0].generation + 1))
return offspring
def _select_parents(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""使用锦标赛选择父代"""
tournament_size = min(5, len(population))
parents = []
for _ in range(2):
candidates = np.random.choice(population, tournament_size, replace=False)
# 基于非支配排序和拥挤距离选择
candidates = sorted(candidates, key=lambda x: (
-x.rank, # 优先选择前沿等级高的
x.crowding_distance(population) # 其次选择拥挤距离大的
))
parents.append(candidates[0])
return parents
def _select_new_population(self, combined_pop: List[MultiObjectiveIndividual], size: int) -> List[MultiObjectiveIndividual]:
"""选择新一代种群"""
fronts = self._non_dominated_sort(combined_pop)
new_pop = []
for front in fronts:
if len(new_pop) + len(front) <= size:
new_pop.extend(front)
else:
# 计算拥挤距离并选择
for ind in front:
ind.crowding_distance(front)
front = sorted(front, key=lambda x: x.crowding_distance(front), reverse=True)
new_pop.extend(front[:size - len(new_pop)])
break
return new_pop
def _non_dominated_sort(self, population: List[MultiObjectiveIndividual]) -> List[List[MultiObjectiveIndividual]]:
"""非支配排序"""
fronts = [[]]
for ind in population:
ind.domination_count = 0
ind.dominated_set = []
for other in population:
if ind.dominates(other):
ind.dominated_set.append(other)
elif other.dominates(ind):
ind.domination_count += 1
if ind.domination_count == 0:
fronts[0].append(ind)
ind.rank = 0
i = 0
while fronts[i]:
next_front = []
for ind in fronts[i]:
for dominated_ind in ind.dominated_set:
dominated_ind.domination_count -= 1
if dominated_ind.domination_count == 0:
dominated_ind.rank = i + 1
next_front.append(dominated_ind)
i += 1
fronts.append(next_front)
return fronts[:-1] # 最后一个是空的
def _calculate_pareto_front(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""计算Pareto前沿"""
front = []
for ind in population:
is_dominated = False
for other in population:
if other.dominates(ind):
is_dominated = True
break
if not is_dominated:
front.append(ind)
return front
def _crossover(self, code1: str, code2: str) -> str:
"""交叉操作"""
# 这里可以集成LLM的交叉操作
return code1 # 简化示例
def _mutate(self, code: str) -> str:
"""变异操作"""
# 这里可以集成LLM的变异操作
return code # 简化示例
def _load_problem_config(self) -> dict:
"""加载问题配置"""
config_path = os.path.join(self.problem_path, "problem_config.json")
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)

View File

@ -0,0 +1,63 @@
from dataclasses import dataclass, field
from typing import Dict, Any, List
import numpy as np
@dataclass
class MultiObjectiveIndividual:
"""多目标优化个体表示"""
code: str
generation: int
fitnesses: Dict[str, float] = field(default_factory=dict)
def dominates(self, other: 'MultiObjectiveIndividual') -> bool:
"""检查当前个体是否Pareto支配另一个个体"""
if not self.fitnesses or not other.fitnesses:
return False
at_least_one_better = False
for key in self.fitnesses:
if self.fitnesses[key] > other.fitnesses[key]: # 假设都是最小化
return False
if self.fitnesses[key] < other.fitnesses[key]:
at_least_one_better = True
return at_least_one_better
def to_dict(self) -> Dict[str, Any]:
"""转换为字典格式"""
return {
"code": self.code,
"generation": self.generation,
"fitnesses": self.fitnesses
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'MultiObjectiveIndividual':
"""从字典创建个体"""
return cls(
code=data["code"],
generation=data["generation"],
fitnesses=data.get("fitnesses", {})
)
def crowding_distance(self, front: List['MultiObjectiveIndividual']) -> float:
"""计算拥挤距离"""
if not front or len(front) == 1:
return float('inf')
distance = 0.0
for obj in self.fitnesses:
# 按当前目标值排序
sorted_front = sorted(front, key=lambda x: x.fitnesses[obj])
min_val = sorted_front[0].fitnesses[obj]
max_val = sorted_front[-1].fitnesses[obj]
# 找到当前个体在排序中的位置
for i, ind in enumerate(sorted_front):
if ind == self:
if i == 0 or i == len(sorted_front) - 1:
distance += float('inf')
else:
distance += (sorted_front[i+1].fitnesses[obj] -
sorted_front[i-1].fitnesses[obj]) / (max_val - min_val)
break
return distance

View File

@ -0,0 +1,61 @@
from typing import Dict, List, Tuple
from lead.core.multi_objective.individual import MultiObjectiveIndividual
class CrossoverAnalysisOperator:
"""交叉分析算子:分析两个父代个体的目标值和支配关系"""
def analyze(self, parent1: MultiObjectiveIndividual,
parent2: MultiObjectiveIndividual,
objective_names: List[str]) -> Dict:
"""
分析两个父代个体确定交叉优化方向
返回:
{
"dominance": "parent1"|"parent2"|"non_dominated",
"target_objective": str, # 主要优化目标
"parent1_strengths": List[str], # parent1优势目标
"parent2_strengths": List[str] # parent2优势目标
}
"""
result = {
"dominance": self._check_dominance(parent1, parent2),
"target_objective": None,
"parent1_strengths": [],
"parent2_strengths": []
}
# 比较各目标值,找出各自的优势目标
for obj in objective_names:
if parent1.fitnesses[obj] < parent2.fitnesses[obj]: # 假设最小化
result["parent1_strengths"].append(obj)
elif parent1.fitnesses[obj] > parent2.fitnesses[obj]:
result["parent2_strengths"].append(obj)
# 选择交叉优化的主要目标
if result["dominance"] == "parent1":
# 如果parent1支配parent2优化parent2的最差目标
worst_obj = max(parent2.fitnesses.items(), key=lambda x: x[1])[0]
result["target_objective"] = worst_obj
elif result["dominance"] == "parent2":
# 如果parent2支配parent1优化parent1的最差目标
worst_obj = max(parent1.fitnesses.items(), key=lambda x: x[1])[0]
result["target_objective"] = worst_obj
else:
# 非支配关系,选择差异最大的目标进行优化
max_diff = 0
for obj in objective_names:
diff = abs(parent1.fitnesses[obj] - parent2.fitnesses[obj])
if diff > max_diff:
max_diff = diff
result["target_objective"] = obj
return result
def _check_dominance(self, ind1: MultiObjectiveIndividual,
ind2: MultiObjectiveIndividual) -> str:
"""检查支配关系"""
if ind1.dominates(ind2):
return "parent1"
elif ind2.dominates(ind1):
return "parent2"
return "non_dominated"

View File

@ -0,0 +1,66 @@
from typing import Tuple, List, Dict
from lead.core.multi_objective.individual import MultiObjectiveIndividual
from .crossover_analysis import CrossoverAnalysisOperator
from ..llm_integration import LLMClient
from .verify_operator import VerifyOperator
class MultiObjectiveCrossoverOperator:
"""多目标交叉算子"""
def __init__(self, llm_client: LLMClient, verify_operator: VerifyOperator):
self.llm_client = llm_client
self.analysis_operator = CrossoverAnalysisOperator()
self.verify_operator = verify_operator
def crossover(self, parent1: MultiObjectiveIndividual,
parent2: MultiObjectiveIndividual,
objective_names: List[str]) -> Tuple[str, Dict]:
"""
执行多目标交叉
返回:
(new_code, analysis_info)
"""
# 1. 分析父代个体
analysis_info = self.analysis_operator.analyze(
parent1, parent2, objective_names)
# 2. 构建多目标交叉提示词
prompt = self._build_multi_obj_prompt(
parent1, parent2, analysis_info)
# 3. 生成新代码
new_code = self.llm_client._call_llm(prompt, operator="crossover")
# 4. 验证代码
verified_code = self._verify_code(new_code)
return verified_code, analysis_info
def _build_multi_obj_prompt(self,
parent1: MultiObjectiveIndividual,
parent2: MultiObjectiveIndividual,
analysis_info: Dict) -> str:
"""构建多目标交叉提示词"""
target_obj = analysis_info["target_objective"]
return f"""
请基于以下两个父代算法进行多目标优化交叉重点关注目标 '{target_obj}':
父代1优势: {', '.join(analysis_info["parent1_strengths"])}
父代1代码:
{parent1.code}
父代2优势: {', '.join(analysis_info["parent2_strengths"])}
父代2代码:
{parent2.code}
请生成一个新算法要求:
1. 保留两个父代在各自优势目标上的优点
2. 特别优化目标 '{target_obj}' 的性能
3. 确保代码完整且符合问题要求
只返回最终的Python代码不要包含任何解释
"""
def _verify_code(self, code: str, function_name: str) -> str:
"""使用VerifyOperator验证代码"""
return self.verify_operator.verify_code_format(code, function_name)

View File

@ -0,0 +1,64 @@
from typing import Tuple, List, Dict
from lead.core.multi_objective.individual import MultiObjectiveIndividual
from .mutation_analysis import MutationAnalysisOperator
from ..llm_integration import LLMClient
from .verify_operator import VerifyOperator
class MultiObjectiveMutationOperator:
"""多目标变异算子"""
def __init__(self, llm_client: LLMClient, verify_operator: VerifyOperator):
self.llm_client = llm_client
self.analysis_operator = MutationAnalysisOperator()
self.verify_operator = verify_operator
def mutate(self, individual: MultiObjectiveIndividual,
objective_names: List[str],
reference_front: List[Dict] = None) -> Tuple[str, Dict]:
"""
执行多目标变异
返回:
(new_code, analysis_info)
"""
# 1. 分析变异方向
analysis_info = self.analysis_operator.analyze(
individual, objective_names, reference_front)
# 2. 构建多目标变异提示词
prompt = self._build_multi_obj_prompt(
individual, analysis_info)
# 3. 生成变异代码
new_code = self.llm_client._call_llm(prompt, operator="mutation")
# 4. 验证代码
verified_code = self._verify_code(
new_code, analysis_info["target_objective"])
return verified_code, analysis_info
def _build_multi_obj_prompt(self,
individual: MultiObjectiveIndividual,
analysis_info: Dict) -> str:
"""构建多目标变异提示词"""
target_obj = analysis_info["target_objective"]
improvement = analysis_info["improvement_potential"][target_obj]
return f"""
请对以下算法进行多目标优化变异重点改进目标 '{target_obj}' (当前改进潜力: {improvement:.2f}):
原始代码:
{individual.code}
:
1. 保持在其他目标上的表现
2. 专门优化目标 '{target_obj}'
3. 可以引入新的数学概念或优化方法
4. 确保代码完整且可执行
只返回最终的Python代码不要包含任何解释
"""
def _verify_code(self, code: str, function_name: str) -> str:
"""使用VerifyOperator验证代码"""
return self.verify_operator.verify_code_format(code, function_name)

View File

@ -0,0 +1,53 @@
from typing import Dict, List
from lead.core.multi_objective.individual import MultiObjectiveIndividual
class MutationAnalysisOperator:
"""变异分析算子:分析单个个体的优化潜力"""
def analyze(self, individual: MultiObjectiveIndividual,
objective_names: List[str],
reference_front: List[Dict[str, float]] = None) -> Dict:
"""
分析个体确定变异优化方向
返回:
{
"target_objective": str, # 主要优化目标
"improvement_potential": Dict[str, float], # 各目标改进潜力
"nearest_solution": Dict[str, float] # 参考前沿中最接近的解(可选)
}
"""
result = {
"target_objective": None,
"improvement_potential": {},
"nearest_solution": None
}
# 计算各目标的改进潜力
for obj in objective_names:
if reference_front:
# 如果有参考前沿,计算与前沿中最好解的差距
best_val = min(sol[obj] for sol in reference_front)
result["improvement_potential"][obj] = individual.fitnesses[obj] - best_val
else:
# 没有参考前沿,使用归一化值
result["improvement_potential"][obj] = individual.fitnesses[obj]
# 选择改进潜力最大的目标作为变异方向
result["target_objective"] = max(
result["improvement_potential"].items(),
key=lambda x: x[1]
)[0]
# 如果有参考前沿,找到最接近的解
if reference_front:
min_distance = float('inf')
for sol in reference_front:
distance = sum(
(individual.fitnesses[obj] - sol[obj]) ** 2
for obj in objective_names
)
if distance < min_distance:
min_distance = distance
result["nearest_solution"] = sol
return result

View File

@ -0,0 +1,91 @@
import os
import json
import importlib.util
import traceback
from typing import Dict, List, Any
from ..data_loader import DataLoader
from ..timeout import Timeout, TimeoutError
from lead.core.multi_objective.individual import MultiObjectiveIndividual
class MultiObjectiveEvaluator:
"""多目标评估器"""
def __init__(self, problem_path: str):
self.problem_path = problem_path
self.evaluation_module = self._load_evaluation_module()
self.data_loader = self._load_data_loader()
self.objective_names = self._load_objective_names()
# 加载数据集
if self.data_loader:
self.dataset = self.data_loader.get_data()
else:
self.dataset = None
# 从配置文件加载超时设置默认30秒
config = self._load_problem_config()
self.timeout = config.get("evaluation_timeout", 30)
def evaluate(self, code: str) -> Dict[str, float]:
"""评估算法代码的多个目标值"""
try:
# 动态编译代码
namespace = {}
exec(code, namespace)
# 获取函数对象
config = self._load_problem_config()
func = namespace[config["function_name"]]
# 使用超时控制调用评估函数
timeout = Timeout(self.timeout)
if self.dataset is not None:
results = timeout.run(self.evaluation_module.evaluate, func, self.dataset)
else:
results = timeout.run(self.evaluation_module.evaluate, func)
# 确保返回结果与目标名称匹配
if isinstance(results, dict):
return {name: results[name] for name in self.objective_names}
elif isinstance(results, (list, tuple)):
return {name: results[i] for i, name in enumerate(self.objective_names)}
else:
raise ValueError("评估函数应返回字典或列表/元组")
except TimeoutError:
print(f"评估超时({self.timeout}秒)")
return {name: float('inf') for name in self.objective_names}
except Exception as e:
print(f"评估失败:{str(e)}")
print(f"错误信息:{traceback.format_exc()}")
return {name: float('inf') for name in self.objective_names}
def _load_objective_names(self) -> List[str]:
"""从配置加载目标名称"""
config = self._load_problem_config()
return config.get("objective_names", ["f1", "f2"])
def _load_evaluation_module(self):
"""加载评估模块"""
eval_path = os.path.join(self.problem_path, "evaluation.py")
spec = importlib.util.spec_from_file_location("evaluation", eval_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def _load_data_loader(self):
"""加载数据加载器"""
loader_path = os.path.join(self.problem_path, "dataset", "data_loader.py")
if not os.path.exists(loader_path):
return None
spec = importlib.util.spec_from_file_location("data_loader", loader_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module.ProblemDataLoader(self.problem_path)
def _load_problem_config(self) -> dict:
"""加载问题配置"""
config_path = os.path.join(self.problem_path, "problem_config.json")
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)

View File

@ -0,0 +1,116 @@
import os
import json
from datetime import datetime
from typing import List, Dict, Any
import numpy as np
from lead.core.multi_objective.individual import MultiObjectiveIndividual
class MultiObjectiveGenerationStorage:
"""多目标进化历史存储"""
def __init__(self, problem_path: str):
self.problem_path = problem_path
# 创建存储目录
self.storage_dir = os.path.join(problem_path, "multi_objective_history",
datetime.now().strftime("%Y%m%d_%H%M%S"))
os.makedirs(self.storage_dir, exist_ok=True)
# 创建进化日志文件
self.log_file = os.path.join(problem_path, "multi_objective_evolution.log")
# 从配置加载目标名称
self.objective_names = self._load_objective_names()
def save_generation(self, generation: int, population: List[MultiObjectiveIndividual]):
"""保存一代种群信息"""
# 计算当前代的Pareto前沿
pareto_front = self._calculate_pareto_front(population)
# 准备存储数据
data = {
"generation": generation,
"population": [ind.to_dict() for ind in population],
"pareto_front": [ind.to_dict() for ind in pareto_front],
"objective_names": self.objective_names
}
# 保存到文件
file_path = os.path.join(self.storage_dir, f"generation_{generation}.json")
with open(file_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
# 记录日志
self._log_generation(generation, pareto_front)
def get_pareto_front(self) -> List[Dict[str, Any]]:
"""获取历史Pareto前沿"""
all_fronts = []
for file in os.listdir(self.storage_dir):
if file.startswith("generation_") and file.endswith(".json"):
with open(os.path.join(self.storage_dir, file), "r", encoding="utf-8") as f:
data = json.load(f)
all_fronts.extend(data["pareto_front"])
# 合并所有前沿并去重
unique_front = []
seen_codes = set()
for ind in all_fronts:
if ind["code"] not in seen_codes:
unique_front.append(ind)
seen_codes.add(ind["code"])
# 计算全局Pareto前沿
return self._calculate_global_pareto_front(unique_front)
def _calculate_pareto_front(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""计算当前代的Pareto前沿"""
front = []
for ind in population:
is_dominated = False
for other in population:
if other.dominates(ind):
is_dominated = True
break
if not is_dominated:
front.append(ind)
return front
def _calculate_global_pareto_front(self, individuals: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""从历史数据计算全局Pareto前沿"""
if not individuals:
return []
front = []
for ind in individuals:
is_dominated = False
for other in individuals:
# 检查other是否支配ind
dominates = True
for obj in self.objective_names:
if other["fitnesses"][obj] > ind["fitnesses"][obj]: # 假设都是最小化
dominates = False
break
if dominates and any(other["fitnesses"][obj] < ind["fitnesses"][obj] for obj in self.objective_names):
is_dominated = True
break
if not is_dominated:
front.append(ind)
return front
def _load_objective_names(self) -> List[str]:
"""从配置加载目标名称"""
config_path = os.path.join(self.problem_path, "problem_config.json")
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
return config.get("objective_names", ["f1", "f2"])
def _log_generation(self, generation: int, pareto_front: List[MultiObjectiveIndividual]):
"""记录进化日志"""
log_entry = (
f"Generation {generation:04d} | "
f"Pareto Front Size: {len(pareto_front)} | "
f"Timestamp: {datetime.now().isoformat()}\n"
)
with open(self.log_file, "a", encoding="utf-8") as f:
f.write(log_entry)