第一个可用的多目标进化策略

This commit is contained in:
deastern 2025-04-15 18:32:09 +08:00
parent 95f1467ed7
commit 92ef835422
14 changed files with 445 additions and 149 deletions

View File

@ -1,4 +1,5 @@
import os import os
from typing import Dict, Any, List
# 项目根目录 # 项目根目录
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
@ -16,10 +17,35 @@ DEFAULT_LLM_CONFIG = {
"max_tokens": 1000 "max_tokens": 1000
} }
# 默认问题配置模板
DEFAULT_PROBLEM_CONFIG = {
"description": "问题描述",
"function_name": "solution",
"input_format": "输入格式描述",
"output_format": "输出格式描述",
"evaluation_timeout": 30,
"multi_objective": False, # 是否使用多目标优化
"objective_names": ["f1", "f2"], # 多目标名称列表
"llm_config": DEFAULT_LLM_CONFIG,
"evolution_params": {
"algorithm": "TU", # TU: Tournament, DE: Differential Evolution, MO: Multi-Objective
"population_size": 8,
"generations": 10,
"mutation_rate": 0.3,
"crossover_rate": 0.5,
"tournament_size": 3,
"F": 0.5, # DE参数
"CR": 0.7 # DE参数
}
}
# 默认进化参数 # 默认进化参数
DEFAULT_EVOLUTION_PARAMS = { DEFAULT_EVOLUTION_PARAMS = {
"population_size": 8, "population_size": 8,
"generations": 10, "generations": 10,
"mutation_rate": 0.3, "mutation_rate": 0.3,
"crossover_rate": 0.5 "crossover_rate": 0.5,
"tournament_size": 3,
"F": 0.5, # DE参数
"CR": 0.7 # DE参数
} }

View File

@ -1,4 +1,5 @@
from .tournament import TournamentEvolution from .tournament import TournamentEvolution
from .base import BaseEvolutionAlgorithm from .base import BaseEvolutionAlgorithm
from .multi_objective import MultiObjectiveEvolution
__all__ = ['TournamentEvolution', 'BaseEvolutionAlgorithm'] __all__ = ['TournamentEvolution', 'BaseEvolutionAlgorithm', 'MultiObjectiveEvolution']

View File

@ -0,0 +1,134 @@
from typing import List
import random
import numpy as np
from .base import BaseEvolutionAlgorithm
from ..multi_objective.individual import MultiObjectiveIndividual
from ..llm_integration import LLMClient
from ..operators.multi_objective_crossover import MultiObjectiveCrossoverOperator
from ..operators.multi_objective_mutation import MultiObjectiveMutationOperator
from ..operators.verify_operator import VerifyOperator
class MultiObjectiveEvolution(BaseEvolutionAlgorithm):
"""多目标进化算法基于NSGA-II框架"""
def __init__(self, config: dict, llm_client: LLMClient):
super().__init__(config)
self.llm_client = llm_client
self.verify_operator = VerifyOperator(llm_client)
self.crossover_operator = MultiObjectiveCrossoverOperator(
llm_client, self.verify_operator)
self.mutation_operator = MultiObjectiveMutationOperator(
llm_client, self.verify_operator)
self.tournament_size = config.get("tournament_size", 3)
def select(self, population: List[MultiObjectiveIndividual],
num_parents: int) -> List[MultiObjectiveIndividual]:
"""基于非支配排序和拥挤距离的锦标赛选择"""
# 先进行非支配排序
fronts = self._non_dominated_sort(population)
# 计算拥挤距离
for front in fronts:
for ind in front:
ind.crowding_distance(front)
# 锦标赛选择
parents = []
for _ in range(num_parents):
candidates = random.sample(population, min(self.tournament_size, len(population)))
# 优先选择前沿等级高的,其次选择拥挤距离大的
winner = min(candidates, key=lambda x: (
x.rank,
-x.crowding_distance(fronts[x.rank])
))
parents.append(winner)
return parents
def crossover(self, parents: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""多目标交叉操作"""
if len(parents) < 2:
return parents
if random.random() < self.config.get("crossover_rate", 0.9):
# 获取目标名称(从第一个父代中获取)
objective_names = list(parents[0].fitnesses.keys())
# 执行多目标交叉
child_code, _ = self.crossover_operator.crossover(
parents[0], parents[1], objective_names)
if child_code:
return [MultiObjectiveIndividual(
child_code,
generation=parents[0].generation + 1
)]
return [parents[0]]
def mutate(self, individual: MultiObjectiveIndividual) -> MultiObjectiveIndividual:
"""多目标变异操作"""
if random.random() < self.config.get("mutation_rate", 0.1):
# 获取目标名称
objective_names = list(individual.fitnesses.keys())
# 执行多目标变异
mutated_code, _ = self.mutation_operator.mutate(
individual, objective_names)
if mutated_code:
return MultiObjectiveIndividual(
mutated_code,
generation=individual.generation
)
return individual
def survive(self, population: List[MultiObjectiveIndividual],
offspring: List[MultiObjectiveIndividual],
pop_size: int) -> List[MultiObjectiveIndividual]:
"""基于非支配排序和拥挤距离的精英保留策略"""
combined_pop = population + offspring
# 非支配排序
fronts = self._non_dominated_sort(combined_pop)
# 构建新种群
new_pop = []
for front in fronts:
if len(new_pop) + len(front) <= pop_size:
new_pop.extend(front)
else:
# 计算拥挤距离并选择
for ind in front:
ind.crowding_distance(front)
front = sorted(front, key=lambda x: x.crowding_distance(front), reverse=True)
new_pop.extend(front[:pop_size - len(new_pop)])
break
return new_pop[:pop_size]
def _non_dominated_sort(self, population: List[MultiObjectiveIndividual]) -> List[List[MultiObjectiveIndividual]]:
"""非支配排序"""
fronts = [[]]
for ind in population:
ind.domination_count = 0
ind.dominated_set = []
for other in population:
if ind.dominates(other):
ind.dominated_set.append(other)
elif other.dominates(ind):
ind.domination_count += 1
if ind.domination_count == 0:
fronts[0].append(ind)
ind.rank = 0
i = 0
while fronts[i]:
next_front = []
for ind in fronts[i]:
for dominated_ind in ind.dominated_set:
dominated_ind.domination_count -= 1
if dominated_ind.domination_count == 0:
dominated_ind.rank = i + 1
next_front.append(dominated_ind)
i += 1
fronts.append(next_front)
return fronts[:-1] # 最后一个是空的

View File

@ -1,12 +1,13 @@
import os import os
import json import json
import numpy as np from typing import List
from typing import List, Dict, Any
from ...utils.multi_objective.evaluator import MultiObjectiveEvaluator from ...utils.multi_objective.evaluator import MultiObjectiveEvaluator
from ...utils.multi_objective.storage import MultiObjectiveGenerationStorage from ...utils.multi_objective.storage import MultiObjectiveGenerationStorage
from .individual import MultiObjectiveIndividual from .individual import MultiObjectiveIndividual
from ..llm_integration import LLMClient from ..llm_integration import LLMClient
from ...config.settings import DEFAULT_EVOLUTION_PARAMS from ...config.settings import DEFAULT_EVOLUTION_PARAMS
from ..evolution_algorithms.multi_objective import MultiObjectiveEvolution
from ..operators.initialize_operator import InitializeOperator
class MultiObjectiveEvolutionEngine: class MultiObjectiveEvolutionEngine:
"""多目标进化引擎""" """多目标进化引擎"""
@ -16,6 +17,7 @@ class MultiObjectiveEvolutionEngine:
self.storage = MultiObjectiveGenerationStorage(problem_path) self.storage = MultiObjectiveGenerationStorage(problem_path)
self.evaluator = MultiObjectiveEvaluator(problem_path) self.evaluator = MultiObjectiveEvaluator(problem_path)
self.llm_client = LLMClient.from_config(problem_path) self.llm_client = LLMClient.from_config(problem_path)
self.initialize_operator = InitializeOperator(self.llm_client)
# 加载进化参数 # 加载进化参数
config = self._load_problem_config() config = self._load_problem_config()
@ -23,16 +25,30 @@ class MultiObjectiveEvolutionEngine:
**DEFAULT_EVOLUTION_PARAMS, **DEFAULT_EVOLUTION_PARAMS,
**config.get("evolution_params", {}) **config.get("evolution_params", {})
} }
# 初始化多目标进化算法
self.evolution_algorithm = MultiObjectiveEvolution(
self.evolution_params,
self.llm_client
)
print(f"多目标进化参数:{self.evolution_params}") print(f"多目标进化参数:{self.evolution_params}")
def initialize_population(self, size: int) -> List[MultiObjectiveIndividual]: def initialize_population(self, size: int) -> List[MultiObjectiveIndividual]:
"""初始化种群""" """使用LLM生成初始种群"""
# 这里可以集成LLM生成初始代码 problem_config = self._load_problem_config()
population = [] population = []
for _ in range(size):
# 示例代码实际应使用LLM生成 while len(population) < size:
code = "def solution(input):\n return {'f1': 0.0, 'f2': 0.0}" code = self.initialize_operator.generate_initial_code(
population.append(MultiObjectiveIndividual(code, generation=0)) problem_config["description"],
problem_config["function_name"],
problem_config["input_format"],
problem_config["output_format"]
)
if code:
population.append(MultiObjectiveIndividual(code, generation=0))
return population return population
def run_evolution(self, generations: int = None, population_size: int = None): def run_evolution(self, generations: int = None, population_size: int = None):
@ -54,18 +70,19 @@ class MultiObjectiveEvolutionEngine:
print(f"\n开始第 {gen+1}/{generations} 代进化") print(f"\n开始第 {gen+1}/{generations} 代进化")
# 生成子代 # 生成子代
offspring = self._generate_offspring(population) offspring = []
while len(offspring) < population_size:
parents = self.evolution_algorithm.select(population, 2)
children = self.evolution_algorithm.crossover(parents)
for child in children:
mutated_child = self.evolution_algorithm.mutate(child)
if mutated_child.code:
mutated_child.fitnesses = self.evaluator.evaluate(mutated_child.code)
print(f"新个体适应度:{mutated_child.fitnesses}")
offspring.append(mutated_child)
# 评估子代 # 生存选择
for ind in offspring: population = self.evolution_algorithm.survive(population, offspring, population_size)
ind.fitnesses = self.evaluator.evaluate(ind.code)
print(f"新个体适应度:{ind.fitnesses}")
# 合并父代和子代
combined_pop = population + offspring
# 非支配排序和选择
population = self._select_new_population(combined_pop, population_size)
# 保存当前代信息 # 保存当前代信息
self.storage.save_generation(gen, population) self.storage.save_generation(gen, population)
@ -77,81 +94,6 @@ class MultiObjectiveEvolutionEngine:
print("\n多目标进化完成!") print("\n多目标进化完成!")
return population return population
def _generate_offspring(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""生成子代个体"""
offspring = []
for _ in range(len(population) // 2):
# 选择父代
parents = self._select_parents(population)
# 交叉 (这里可以集成LLM)
child_code = self._crossover(parents[0].code, parents[1].code)
# 变异 (这里可以集成LLM)
mutated_code = self._mutate(child_code)
offspring.append(MultiObjectiveIndividual(mutated_code, generation=parents[0].generation + 1))
return offspring
def _select_parents(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""使用锦标赛选择父代"""
tournament_size = min(5, len(population))
parents = []
for _ in range(2):
candidates = np.random.choice(population, tournament_size, replace=False)
# 基于非支配排序和拥挤距离选择
candidates = sorted(candidates, key=lambda x: (
-x.rank, # 优先选择前沿等级高的
x.crowding_distance(population) # 其次选择拥挤距离大的
))
parents.append(candidates[0])
return parents
def _select_new_population(self, combined_pop: List[MultiObjectiveIndividual], size: int) -> List[MultiObjectiveIndividual]:
"""选择新一代种群"""
fronts = self._non_dominated_sort(combined_pop)
new_pop = []
for front in fronts:
if len(new_pop) + len(front) <= size:
new_pop.extend(front)
else:
# 计算拥挤距离并选择
for ind in front:
ind.crowding_distance(front)
front = sorted(front, key=lambda x: x.crowding_distance(front), reverse=True)
new_pop.extend(front[:size - len(new_pop)])
break
return new_pop
def _non_dominated_sort(self, population: List[MultiObjectiveIndividual]) -> List[List[MultiObjectiveIndividual]]:
"""非支配排序"""
fronts = [[]]
for ind in population:
ind.domination_count = 0
ind.dominated_set = []
for other in population:
if ind.dominates(other):
ind.dominated_set.append(other)
elif other.dominates(ind):
ind.domination_count += 1
if ind.domination_count == 0:
fronts[0].append(ind)
ind.rank = 0
i = 0
while fronts[i]:
next_front = []
for ind in fronts[i]:
for dominated_ind in ind.dominated_set:
dominated_ind.domination_count -= 1
if dominated_ind.domination_count == 0:
dominated_ind.rank = i + 1
next_front.append(dominated_ind)
i += 1
fronts.append(next_front)
return fronts[:-1] # 最后一个是空的
def _calculate_pareto_front(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]: def _calculate_pareto_front(self, population: List[MultiObjectiveIndividual]) -> List[MultiObjectiveIndividual]:
"""计算Pareto前沿""" """计算Pareto前沿"""
front = [] front = []
@ -165,16 +107,6 @@ class MultiObjectiveEvolutionEngine:
front.append(ind) front.append(ind)
return front return front
def _crossover(self, code1: str, code2: str) -> str:
"""交叉操作"""
# 这里可以集成LLM的交叉操作
return code1 # 简化示例
def _mutate(self, code: str) -> str:
"""变异操作"""
# 这里可以集成LLM的变异操作
return code # 简化示例
def _load_problem_config(self) -> dict: def _load_problem_config(self) -> dict:
"""加载问题配置""" """加载问题配置"""
config_path = os.path.join(self.problem_path, "problem_config.json") config_path = os.path.join(self.problem_path, "problem_config.json")

View File

@ -14,14 +14,16 @@ class CrossoverAnalysisOperator:
"dominance": "parent1"|"parent2"|"non_dominated", "dominance": "parent1"|"parent2"|"non_dominated",
"target_objective": str, # 主要优化目标 "target_objective": str, # 主要优化目标
"parent1_strengths": List[str], # parent1优势目标 "parent1_strengths": List[str], # parent1优势目标
"parent2_strengths": List[str] # parent2优势目标 "parent2_strengths": List[str], # parent2优势目标
"function_name": str # 函数名
} }
""" """
result = { result = {
"dominance": self._check_dominance(parent1, parent2), "dominance": self._check_dominance(parent1, parent2),
"target_objective": None, "target_objective": None,
"parent1_strengths": [], "parent1_strengths": [],
"parent2_strengths": [] "parent2_strengths": [],
"function_name": self._extract_function_name(parent1.code)
} }
# 比较各目标值,找出各自的优势目标 # 比较各目标值,找出各自的优势目标
@ -59,3 +61,10 @@ class CrossoverAnalysisOperator:
elif ind2.dominates(ind1): elif ind2.dominates(ind1):
return "parent2" return "parent2"
return "non_dominated" return "non_dominated"
def _extract_function_name(self, code: str) -> str:
"""从代码中提取函数名"""
for line in code.split('\n'):
if line.strip().startswith('def '):
return line.split()[1].split('(')[0]
return "solution" # 默认函数名

View File

@ -32,7 +32,7 @@ class MultiObjectiveCrossoverOperator:
new_code = self.llm_client._call_llm(prompt, operator="crossover") new_code = self.llm_client._call_llm(prompt, operator="crossover")
# 4. 验证代码 # 4. 验证代码
verified_code = self._verify_code(new_code) verified_code = self._verify_code(new_code, analysis_info["function_name"])
return verified_code, analysis_info return verified_code, analysis_info
@ -57,6 +57,7 @@ class MultiObjectiveCrossoverOperator:
1. 保留两个父代在各自优势目标上的优点 1. 保留两个父代在各自优势目标上的优点
2. 特别优化目标 '{target_obj}' 的性能 2. 特别优化目标 '{target_obj}' 的性能
3. 确保代码完整且符合问题要求 3. 确保代码完整且符合问题要求
4. 函数名必须为: {analysis_info["function_name"]}
只返回最终的Python代码不要包含任何解释 只返回最终的Python代码不要包含任何解释
""" """

View File

@ -24,6 +24,10 @@ class MultiObjectiveMutationOperator:
analysis_info = self.analysis_operator.analyze( analysis_info = self.analysis_operator.analyze(
individual, objective_names, reference_front) individual, objective_names, reference_front)
# 确保target_objective有效
if not analysis_info["target_objective"]:
analysis_info["target_objective"] = objective_names[0] if objective_names else "error"
# 2. 构建多目标变异提示词 # 2. 构建多目标变异提示词
prompt = self._build_multi_obj_prompt( prompt = self._build_multi_obj_prompt(
individual, analysis_info) individual, analysis_info)
@ -33,7 +37,7 @@ class MultiObjectiveMutationOperator:
# 4. 验证代码 # 4. 验证代码
verified_code = self._verify_code( verified_code = self._verify_code(
new_code, analysis_info["target_objective"]) new_code, analysis_info["function_name"])
return verified_code, analysis_info return verified_code, analysis_info
@ -42,7 +46,7 @@ class MultiObjectiveMutationOperator:
analysis_info: Dict) -> str: analysis_info: Dict) -> str:
"""构建多目标变异提示词""" """构建多目标变异提示词"""
target_obj = analysis_info["target_objective"] target_obj = analysis_info["target_objective"]
improvement = analysis_info["improvement_potential"][target_obj] improvement = analysis_info["improvement_potential"].get(target_obj, 1.0)
return f""" return f"""
请对以下算法进行多目标优化变异重点改进目标 '{target_obj}' (当前改进潜力: {improvement:.2f}): 请对以下算法进行多目标优化变异重点改进目标 '{target_obj}' (当前改进潜力: {improvement:.2f}):
@ -55,6 +59,8 @@ class MultiObjectiveMutationOperator:
2. 专门优化目标 '{target_obj}' 2. 专门优化目标 '{target_obj}'
3. 可以引入新的数学概念或优化方法 3. 可以引入新的数学概念或优化方法
4. 确保代码完整且可执行 4. 确保代码完整且可执行
5. 避免使用递归或限制递归深度
6. 函数名必须保持为: {analysis_info["function_name"]}
只返回最终的Python代码不要包含任何解释 只返回最终的Python代码不要包含任何解释
""" """

View File

@ -13,30 +13,41 @@ class MutationAnalysisOperator:
{ {
"target_objective": str, # 主要优化目标 "target_objective": str, # 主要优化目标
"improvement_potential": Dict[str, float], # 各目标改进潜力 "improvement_potential": Dict[str, float], # 各目标改进潜力
"nearest_solution": Dict[str, float] # 参考前沿中最接近的解(可选) "nearest_solution": Dict[str, float], # 参考前沿中最接近的解(可选)
"function_name": str # 函数名
} }
""" """
result = { result = {
"target_objective": None, "target_objective": None,
"improvement_potential": {}, "improvement_potential": {},
"nearest_solution": None "nearest_solution": None,
"function_name": self._extract_function_name(individual.code)
} }
# 处理空fitnesses的情况
if not individual.fitnesses:
return result
# 计算各目标的改进潜力 # 计算各目标的改进潜力
for obj in objective_names: for obj in objective_names:
if reference_front: if obj in individual.fitnesses:
# 如果有参考前沿,计算与前沿中最好解的差距 if reference_front:
best_val = min(sol[obj] for sol in reference_front) # 如果有参考前沿,计算与前沿中最好解的差距
result["improvement_potential"][obj] = individual.fitnesses[obj] - best_val best_val = min(sol[obj] for sol in reference_front)
else: result["improvement_potential"][obj] = individual.fitnesses[obj] - best_val
# 没有参考前沿,使用归一化值 else:
result["improvement_potential"][obj] = individual.fitnesses[obj] # 没有参考前沿,使用归一化值
result["improvement_potential"][obj] = individual.fitnesses[obj]
# 选择改进潜力最大的目标作为变异方向 # 选择改进潜力最大的目标作为变异方向
result["target_objective"] = max( if result["improvement_potential"]:
result["improvement_potential"].items(), result["target_objective"] = max(
key=lambda x: x[1] result["improvement_potential"].items(),
)[0] key=lambda x: x[1]
)[0]
else:
# 如果没有有效的改进潜力数据,选择第一个目标
result["target_objective"] = objective_names[0] if objective_names else None
# 如果有参考前沿,找到最接近的解 # 如果有参考前沿,找到最接近的解
if reference_front: if reference_front:
@ -44,10 +55,17 @@ class MutationAnalysisOperator:
for sol in reference_front: for sol in reference_front:
distance = sum( distance = sum(
(individual.fitnesses[obj] - sol[obj]) ** 2 (individual.fitnesses[obj] - sol[obj]) ** 2
for obj in objective_names for obj in objective_names if obj in individual.fitnesses
) )
if distance < min_distance: if distance < min_distance:
min_distance = distance min_distance = distance
result["nearest_solution"] = sol result["nearest_solution"] = sol
return result return result
def _extract_function_name(self, code: str) -> str:
"""从代码中提取函数名"""
for line in code.split('\n'):
if line.strip().startswith('def '):
return line.split()[1].split('(')[0]
return "solution" # 默认函数名

View File

@ -1,19 +1,27 @@
from typing import Callable, List, Tuple from typing import Callable, List, Tuple, Dict
import time import time
import sys
def evaluate(sort_func: Callable[[List[int]], List[int]], dataset: List[Tuple[List[int], List[int]]]) -> float: def evaluate(sort_func: Callable[[List[int]], List[int]], dataset: List[Tuple[List[int], List[int]]]) -> Dict[str, float]:
"""评估排序算法的性能 """多目标评估排序算法的性能
Args: Args:
sort_func: 排序函数 sort_func: 排序函数
dataset: [(输入数组, 正确排序结果), ...] dataset: [(输入数组, 正确排序结果), ...]
Returns: Returns:
float: 适应度分数分数越小表示性能越好 dict: 包含两个评估指标的字典所有指标都是越小越好
{
"time": 平均执行时间(),
"error": 错误率(0-1)
}
""" """
total_time = 0 total_time = 0
success_count = 0 success_count = 0
# 设置递归深度限制防止无限递归
sys.setrecursionlimit(1000)
for input_arr, expected in dataset: for input_arr, expected in dataset:
try: try:
# 复制输入数组,避免修改原数组 # 复制输入数组,避免修改原数组
@ -30,15 +38,24 @@ def evaluate(sort_func: Callable[[List[int]], List[int]], dataset: List[Tuple[Li
total_time += execution_time total_time += execution_time
success_count += 1 success_count += 1
except RecursionError:
print("递归深度超过限制,算法可能存在无限递归")
continue
except Exception as e: except Exception as e:
print(f"测试用例执行失败: {str(e)}") print(f"测试用例执行失败: {str(e)}")
continue continue
if success_count == 0: if success_count == 0:
return float('inf') # 如果全部失败返回无穷大 return {
"time": float('inf'),
"error": 1.0
}
# 计算最终适应度:平均执行时间 * (1 + 错误率惩罚) # 计算各项指标
avg_time = total_time / success_count avg_time = total_time / success_count
error_rate = (len(dataset) - success_count) / len(dataset) error_rate = (len(dataset) - success_count) / len(dataset)
return avg_time * (1 + error_rate * 2) # 错误率会增加最终得分 return {
"time": avg_time,
"error": error_rate
}

View File

@ -0,0 +1,2 @@
Generation 0000 | Pareto Front Size: 2 | Timestamp: 2025-04-15T18:08:58.280070
Generation 0001 | Pareto Front Size: 2 | Timestamp: 2025-04-15T18:10:29.667065

View File

@ -0,0 +1,67 @@
{
"generation": 0,
"population": [
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n \n def quicksort(arr, low, high):\n if low < high:\n pi = partition(arr, low, high)\n quicksort(arr, low, pi - 1)\n quicksort(arr, pi + 1, high)\n\n def partition(arr, low, high):\n pivot = arr[(low + high) // 2]\n i = low - 1\n j = high + 1\n\n while True:\n i += 1\n while arr[i] < pivot:\n i += 1\n j -= 1\n while arr[j] > pivot:\n j -= 1\n \n if i >= j:\n return j\n \n arr[i], arr[j] = arr[j], arr[i]\n\n def insertion_sort(arr):\n for i in range(1, len(arr)):\n key = arr[i]\n j = i - 1\n while j >= 0 and key < arr[j]:\n arr[j + 1] = arr[j]\n j -= 1\n arr[j + 1] = key\n\n if len(arr) <= 10:\n insertion_sort(arr)\n else:\n quicksort(arr, 0, len(arr) - 1)\n\n return arr",
"generation": 0,
"fitnesses": {
"time": 0.07625889778137207,
"error": 0.75
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(sub_arr):\n if len(sub_arr) <= 10:\n return insertion_sort(sub_arr)\n else:\n pivot = sub_arr[len(sub_arr) // 2]\n left = [x for x in sub_arr if x < pivot]\n middle = [x for x in sub_arr if x == pivot]\n right = [x for x in sub_arr if x > pivot]\n return quicksort(left) + middle + quicksort(right)\n\n return quicksort(arr)",
"generation": 1,
"fitnesses": {
"time": 0.08427751064300537,
"error": 0.0
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(arr, low, high):\n if low < high:\n pi = partition(arr, low, high)\n quicksort(arr, low, pi)\n quicksort(arr, pi + 1, high)\n\n def partition(arr, low, high):\n pivot = arr[(low + high) // 2]\n i = low - 1\n j = high + 1\n\n while True:\n i += 1\n while arr[i] < pivot:\n i += 1\n j -= 1\n while arr[j] > pivot:\n j -= 1\n \n if i >= j:\n return j\n \n arr[i], arr[j] = arr[j], arr[i]\n\n if len(arr) <= 10:\n return insertion_sort(arr)\n else:\n quicksort(arr, 0, len(arr) - 1)\n\n return arr",
"generation": 1,
"fitnesses": {
"time": 0.1091681718826294,
"error": 0.0
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(sub_arr):\n if len(sub_arr) <= 10:\n return insertion_sort(sub_arr)\n else:\n pivot = sub_arr[len(sub_arr) // 2]\n left = [x for x in sub_arr if x < pivot]\n middle = [x for x in sub_arr if x == pivot]\n right = [x for x in sub_arr if x > pivot]\n return quicksort(left) + middle + quicksort(right)\n\n return quicksort(arr)",
"generation": 0,
"fitnesses": {
"time": 0.17589008808135986,
"error": 0.0
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def counting_sort(arr):\n max_val = max(arr)\n min_val = min(arr)\n range_of_elements = max_val - min_val + 1\n count = [0] * range_of_elements\n output = [0] * len(arr)\n\n for num in arr:\n count[num - min_val] += 1\n\n for i in range(1, len(count)):\n count[i] += count[i - 1]\n\n for num in reversed(arr):\n output[count[num - min_val] - 1] = num\n count[num - min_val] -= 1\n\n return output\n\n def quick_sort(arr):\n if len(arr) < 2:\n return arr\n\n pivot = arr[len(arr) // 2]\n left = [x for x in arr if x < pivot]\n middle = [x for x in arr if x == pivot]\n right = [x for x in arr if x > pivot]\n\n return quick_sort(left) + middle + quick_sort(right)\n\n unique_elements = set(arr)\n \n if len(unique_elements) * 2 < len(arr):\n return counting_sort(arr)\n else:\n return quick_sort(arr)",
"generation": 0,
"fitnesses": {
"time": 0.2675272822380066,
"error": 0.0
}
}
],
"pareto_front": [
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n \n def quicksort(arr, low, high):\n if low < high:\n pi = partition(arr, low, high)\n quicksort(arr, low, pi - 1)\n quicksort(arr, pi + 1, high)\n\n def partition(arr, low, high):\n pivot = arr[(low + high) // 2]\n i = low - 1\n j = high + 1\n\n while True:\n i += 1\n while arr[i] < pivot:\n i += 1\n j -= 1\n while arr[j] > pivot:\n j -= 1\n \n if i >= j:\n return j\n \n arr[i], arr[j] = arr[j], arr[i]\n\n def insertion_sort(arr):\n for i in range(1, len(arr)):\n key = arr[i]\n j = i - 1\n while j >= 0 and key < arr[j]:\n arr[j + 1] = arr[j]\n j -= 1\n arr[j + 1] = key\n\n if len(arr) <= 10:\n insertion_sort(arr)\n else:\n quicksort(arr, 0, len(arr) - 1)\n\n return arr",
"generation": 0,
"fitnesses": {
"time": 0.07625889778137207,
"error": 0.75
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(sub_arr):\n if len(sub_arr) <= 10:\n return insertion_sort(sub_arr)\n else:\n pivot = sub_arr[len(sub_arr) // 2]\n left = [x for x in sub_arr if x < pivot]\n middle = [x for x in sub_arr if x == pivot]\n right = [x for x in sub_arr if x > pivot]\n return quicksort(left) + middle + quicksort(right)\n\n return quicksort(arr)",
"generation": 1,
"fitnesses": {
"time": 0.08427751064300537,
"error": 0.0
}
}
],
"objective_names": [
"time",
"error"
]
}

View File

@ -0,0 +1,67 @@
{
"generation": 1,
"population": [
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n \n def quicksort(arr, low, high):\n if low < high:\n pi = partition(arr, low, high)\n quicksort(arr, low, pi - 1)\n quicksort(arr, pi + 1, high)\n\n def partition(arr, low, high):\n pivot = arr[(low + high) // 2]\n i = low - 1\n j = high + 1\n\n while True:\n i += 1\n while arr[i] < pivot:\n i += 1\n j -= 1\n while arr[j] > pivot:\n j -= 1\n \n if i >= j:\n return j\n \n arr[i], arr[j] = arr[j], arr[i]\n\n def insertion_sort(arr):\n for i in range(1, len(arr)):\n key = arr[i]\n j = i - 1\n while j >= 0 and key < arr[j]:\n arr[j + 1] = arr[j]\n j -= 1\n arr[j + 1] = key\n\n if len(arr) <= 10:\n insertion_sort(arr)\n else:\n quicksort(arr, 0, len(arr) - 1)\n\n return arr",
"generation": 0,
"fitnesses": {
"time": 0.07625889778137207,
"error": 0.75
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(sub_arr):\n if len(sub_arr) <= 10:\n return insertion_sort(sub_arr)\n else:\n pivot = sub_arr[len(sub_arr) // 2]\n left = [x for x in sub_arr if x < pivot]\n middle = [x for x in sub_arr if x == pivot]\n right = [x for x in sub_arr if x > pivot]\n return quicksort(left) + middle + quicksort(right)\n\n return quicksort(arr)",
"generation": 1,
"fitnesses": {
"time": 0.08427751064300537,
"error": 0.0
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(sub_arr):\n if len(sub_arr) <= 10:\n return insertion_sort(sub_arr)\n else:\n pivot = sub_arr[len(sub_arr) // 2]\n left = [x for x in sub_arr if x < pivot]\n middle = [x for x in sub_arr if x == pivot]\n right = [x for x in sub_arr if x > pivot]\n return quicksort(left) + middle + quicksort(right)\n\n return quicksort(arr)",
"generation": 0,
"fitnesses": {
"time": 0.17589008808135986,
"error": 0.0
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(arr, low, high):\n if low < high:\n pi = partition(arr, low, high)\n quicksort(arr, low, pi)\n quicksort(arr, pi + 1, high)\n\n def partition(arr, low, high):\n pivot = arr[(low + high) // 2]\n i = low - 1\n j = high + 1\n\n while True:\n i += 1\n while arr[i] < pivot:\n i += 1\n j -= 1\n while arr[j] > pivot:\n j -= 1\n \n if i >= j:\n return j\n \n arr[i], arr[j] = arr[j], arr[i]\n\n if len(arr) <= 10:\n return insertion_sort(arr)\n else:\n quicksort(arr, 0, len(arr) - 1)\n\n return arr",
"generation": 1,
"fitnesses": {
"time": 0.3470318913459778,
"error": 0.0
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n \n def quicksort(arr, low, high):\n if low < high:\n pi = partition(arr, low, high)\n quicksort(arr, low, pi - 1)\n quicksort(arr, pi + 1, high)\n\n def partition(arr, low, high):\n pivot = arr[(low + high) // 2]\n i = low - 1\n j = high + 1\n\n while True:\n i += 1\n while i < high and arr[i] < pivot:\n i += 1\n j -= 1\n while j > low and arr[j] > pivot:\n j -= 1\n \n if i >= j:\n return j\n \n arr[i], arr[j] = arr[j], arr[i]\n\n def insertion_sort(arr):\n for i in range(1, len(arr)):\n key = arr[i]\n j = i - 1\n while j >= 0 and key < arr[j]:\n arr[j + 1] = arr[j]\n j -= 1\n arr[j + 1] = key\n\n if len(arr) <= 10:\n insertion_sort(arr)\n else:\n quicksort(arr, 0, len(arr) - 1)\n\n return arr",
"generation": 1,
"fitnesses": {
"time": 0.24364137649536133,
"error": 0.75
}
}
],
"pareto_front": [
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n \n def quicksort(arr, low, high):\n if low < high:\n pi = partition(arr, low, high)\n quicksort(arr, low, pi - 1)\n quicksort(arr, pi + 1, high)\n\n def partition(arr, low, high):\n pivot = arr[(low + high) // 2]\n i = low - 1\n j = high + 1\n\n while True:\n i += 1\n while arr[i] < pivot:\n i += 1\n j -= 1\n while arr[j] > pivot:\n j -= 1\n \n if i >= j:\n return j\n \n arr[i], arr[j] = arr[j], arr[i]\n\n def insertion_sort(arr):\n for i in range(1, len(arr)):\n key = arr[i]\n j = i - 1\n while j >= 0 and key < arr[j]:\n arr[j + 1] = arr[j]\n j -= 1\n arr[j + 1] = key\n\n if len(arr) <= 10:\n insertion_sort(arr)\n else:\n quicksort(arr, 0, len(arr) - 1)\n\n return arr",
"generation": 0,
"fitnesses": {
"time": 0.07625889778137207,
"error": 0.75
}
},
{
"code": "def advanced_sort(arr):\n if len(arr) < 2:\n return arr\n\n def insertion_sort(sub_arr):\n for i in range(1, len(sub_arr)):\n key = sub_arr[i]\n j = i - 1\n while j >= 0 and sub_arr[j] > key:\n sub_arr[j + 1] = sub_arr[j]\n j -= 1\n sub_arr[j + 1] = key\n return sub_arr\n\n def quicksort(sub_arr):\n if len(sub_arr) <= 10:\n return insertion_sort(sub_arr)\n else:\n pivot = sub_arr[len(sub_arr) // 2]\n left = [x for x in sub_arr if x < pivot]\n middle = [x for x in sub_arr if x == pivot]\n right = [x for x in sub_arr if x > pivot]\n return quicksort(left) + middle + quicksort(right)\n\n return quicksort(arr)",
"generation": 1,
"fitnesses": {
"time": 0.08427751064300537,
"error": 0.0
}
}
],
"objective_names": [
"time",
"error"
]
}

View File

@ -1,23 +1,24 @@
{ {
"description": "实现一个高效的排序算法,要求时间复杂度尽可能低。算法需要处理各种情况:随机数组、接近有序的数组、逆序数组、包含大量重复元素的数组等。同时由于你拥有的评估时间仅有2秒请确保你的算法在2秒内能够完成排序。而且你只能使用python语言。注意我们用于评估你的测试用例的数组大小为100000请确保你的算法在100000个元素的数组上也能在2秒内完成排序。", "description": "实现一个高效的排序算法,需要同时优化执行时间和正确性。算法需要处理各种情况:随机数组、接近有序的数组、逆序数组、包含大量重复元素的数组等。",
"function_name": "advanced_sort", "function_name": "advanced_sort",
"input_format": "一个整数列表arr包含需要排序的数字", "input_format": "一个整数列表arr包含需要排序的数字",
"output_format": "返回排序后的列表,按照从小到大的顺序排列", "output_format": "返回排序后的列表,按照从小到大的顺序排列",
"evaluation_timeout": 2, "evaluation_timeout": 2,
"multi_objective": true,
"objective_names": ["time", "error"],
"llm_config": { "llm_config": {
"api_key": "sk-5wHuJRuC2KDMINeYERru0zCCMrRTzmkrOMwpvBQr4EULV0Tv", "api_key": "sk-5wHuJRuC2KDMINeYERru0zCCMrRTzmkrOMwpvBQr4EULV0Tv",
"base_url": "https://xiaohumini.site/v1/chat/completions", "base_url": "https://xiaohumini.site/v1/chat/completions",
"model": "gemini-1.5-flash-latest", "model": "gpt-4o-mini",
"temperature": 0.5, "temperature": 0.5,
"max_tokens": 4096 "max_tokens": 4096
}, },
"evolution_params": { "evolution_params": {
"algorithm": "TU", "algorithm": "MO",
"population_size": 5, "population_size": 5,
"generations": 2, "generations": 2,
"mutation_rate": 0.3, "mutation_rate": 0.3,
"crossover_rate": 0.5, "crossover_rate": 0.7,
"F": 0.2, "tournament_size": 3
"CR": 0.5
} }
} }

View File

@ -1,7 +1,9 @@
import os import os
import sys import sys
import argparse import argparse
import json
from lead.core.evolution import EvolutionEngine from lead.core.evolution import EvolutionEngine
from lead.core.multi_objective.evolution import MultiObjectiveEvolutionEngine
from lead.config.settings import PROBLEMS_DIR from lead.config.settings import PROBLEMS_DIR
def parse_args(): def parse_args():
@ -14,6 +16,19 @@ def parse_args():
help="种群大小") help="种群大小")
return parser.parse_args() return parser.parse_args()
def load_engine_config(problem_path):
"""加载问题配置并决定使用哪种引擎"""
config_path = os.path.join(problem_path, "problem_config.json")
with open(config_path, "r", encoding="utf-8") as f:
config = json.load(f)
if config.get("multi_objective", False):
print("使用多目标进化引擎")
return MultiObjectiveEvolutionEngine(problem_path)
else:
print("使用单目标进化引擎")
return EvolutionEngine(problem_path)
def main(): def main():
args = parse_args() args = parse_args()
problem_path = os.path.join(PROBLEMS_DIR, args.problem) problem_path = os.path.join(PROBLEMS_DIR, args.problem)
@ -27,7 +42,7 @@ def main():
print(f"开始进化求解问题: {args.problem}") print(f"开始进化求解问题: {args.problem}")
print(f"配置: 代数={args.generations or '默认'},种群大小={args.population or '默认'}") print(f"配置: 代数={args.generations or '默认'},种群大小={args.population or '默认'}")
engine = EvolutionEngine(problem_path) engine = load_engine_config(problem_path)
engine.run_evolution( engine.run_evolution(
generations=args.generations, generations=args.generations,
population_size=args.population population_size=args.population