import random
import time
from collections import defaultdict
class TraditionalECMP:
    """Classic ECMP: static load balancing driven by a 5-tuple hash.

    Every flow is pinned to one path chosen purely from its 5-tuple; the
    current load of the paths is never consulted.

    Attributes:
        paths: Path names ``Path0`` .. ``Path{path_count - 1}``.
        path_load: Accumulated flow size per path name (missing paths read
            as 0 because this is a ``defaultdict(int)``).
    """

    # Order of the 5-tuple fields inside the hash key.
    _FIVE_TUPLE = ('src_ip', 'dst_ip', 'src_port', 'dst_port', 'protocol')

    def __init__(self, path_count=4):
        """Create ``path_count`` equal-cost paths with zero load."""
        self.paths = [f"Path{i}" for i in range(path_count)]
        self.path_load = defaultdict(int)

    def hash_function(self, flow_info):
        """Map a flow's 5-tuple to a path index.

        Args:
            flow_info: Mapping with the keys ``src_ip``, ``dst_ip``,
                ``src_port``, ``dst_port`` and ``protocol``.

        Returns:
            An index into ``self.paths``.

        Raises:
            KeyError: If one of the 5-tuple keys is missing.
        """
        # Function-scope import keeps this block self-contained.
        import zlib

        hash_str = ":".join(str(flow_info[field]) for field in self._FIVE_TUPLE)
        # The built-in hash() of a str is salted per interpreter process, so
        # it would send the same flow to different paths on different runs.
        # CRC32 is stable, which keeps the simulation reproducible.
        return zlib.crc32(hash_str.encode("utf-8")) % len(self.paths)

    def route_flow(self, flow_info, flow_size):
        """Route one flow and account its size on the chosen path.

        Args:
            flow_info: 5-tuple mapping, see ``hash_function``.
            flow_size: Load units to add to the chosen path.

        Returns:
            The name of the path the flow was assigned to.
        """
        path = self.paths[self.hash_function(flow_info)]
        self.path_load[path] += flow_size
        return path

    def print_load_distribution(self):
        """Print the load and the share of total load for every path."""
        print("传统 ECMP 负载分布:")
        total_load = sum(self.path_load.values())
        for path in self.paths:
            load = self.path_load[path]
            # Guard the percentage against an all-idle router.
            percentage = (load / total_load * 100) if total_load > 0 else 0
            print(f" {path}: {load} units ({percentage:.1f}%)")
class AIDrivenDynamicRouter:
    """Dynamic router that schedules flows from the current path load.

    Attributes:
        paths: Path names ``Path0`` .. ``Path{path_count - 1}``.
        path_load: Accumulated flow size per path name.
        path_capacity: Capacity per path, 1000 units each by default.
        path_latency: Base latency per path, 10 each by default.
        congestion_threshold: Absolute load (same units as ``path_load``)
            above which a path is reported as congested.
    """

    def __init__(self, path_count=4):
        """Create ``path_count`` idle paths with default capacity and latency."""
        self.paths = [f"Path{i}" for i in range(path_count)]
        self.path_load = defaultdict(int)
        self.path_capacity = {path: 1000 for path in self.paths}
        self.path_latency = {path: 10 for path in self.paths}
        self.congestion_threshold = 800

    def get_path_metrics(self):
        """Compute the current metrics of every path.

        Returns:
            A dict keyed by path name. Each value is a dict with ``load``,
            ``load_ratio`` (load / capacity), ``latency`` (base latency
            scaled by ``1 + 2 * load_ratio``) and ``is_congested``.
        """
        metrics = {}
        for path in self.paths:
            load = self.path_load[path]
            load_ratio = load / self.path_capacity[path]
            metrics[path] = {
                'load': load,
                'load_ratio': load_ratio,
                'latency': self.path_latency[path] * (1 + load_ratio * 2),
                # Driven by the configurable threshold rather than a
                # hard-coded 0.8 ratio. With the defaults (capacity 1000,
                # threshold 800) both conditions are the same.
                'is_congested': load > self.congestion_threshold,
            }
        return metrics

    def ai_route_decision(self, flow_info, flow_size, metrics):
        """Pick the path for a flow.

        Args:
            flow_info: Flow description; not consulted by the decision.
            flow_size: Flow size; not consulted by the decision.
            metrics: Result of ``get_path_metrics``.

        Returns:
            The non-congested path with the lowest estimated latency, or,
            when every path is congested, the path with the lowest load
            ratio. Ties go to the path listed first in ``metrics``.
        """
        available_paths = [
            (path, metric['latency'])
            for path, metric in metrics.items()
            if not metric['is_congested']
        ]
        if available_paths:
            # min() returns the first minimum, matching a stable sort's head.
            return min(available_paths, key=lambda item: item[1])[0]
        return min(metrics.items(), key=lambda item: item[1]['load_ratio'])[0]

    def route_flow(self, flow_info, flow_size):
        """Route one flow and account its size on the chosen path.

        Returns:
            The name of the path the flow was assigned to.
        """
        metrics = self.get_path_metrics()
        path = self.ai_route_decision(flow_info, flow_size, metrics)
        self.path_load[path] += flow_size
        return path

    def print_load_distribution(self):
        """Print load, share of total load, latency and status per path."""
        print("AI 驱动动态路由负载分布:")
        total_load = sum(self.path_load.values())
        metrics = self.get_path_metrics()
        for path in self.paths:
            load = self.path_load[path]
            # Guard the percentage against an all-idle router.
            percentage = (load / total_load * 100) if total_load > 0 else 0
            latency = metrics[path]['latency']
            congested = "拥塞" if metrics[path]['is_congested'] else "正常"
            print(f" {path}: {load} units ({percentage:.1f}%), 延迟:{latency:.1f}ms, 状态:{congested}")
def simulate_traffic(router, num_flows=1000):
    """Generate random flows and push them through ``router``.

    Up to 10 large flows (size 100-500, TCP to port 80) are mixed with small
    flows (size 1-50), shuffled, and routed one by one.

    Args:
        router: Any object exposing ``route_flow(flow_info, flow_size)``.
        num_flows: Total number of flows to route. Values below 10 reduce
            the number of large flows so the total is never exceeded.

    Returns:
        The same ``router`` object, after all flows have been routed.
    """
    print(f"\n模拟{num_flows}个流量...")

    # Never create more large flows than the caller asked for in total.
    large_count = max(0, min(10, num_flows))

    all_flows = []
    for _ in range(large_count):
        flow_info = {
            'src_ip': f'10.0.0.{random.randint(1, 50)}',
            'dst_ip': f'10.0.1.{random.randint(1, 50)}',
            'src_port': random.randint(10000, 60000),
            'dst_port': 80,
            'protocol': 'TCP'
        }
        all_flows.append((flow_info, random.randint(100, 500)))

    for _ in range(num_flows - large_count):
        flow_info = {
            'src_ip': f'10.0.0.{random.randint(1, 100)}',
            'dst_ip': f'10.0.1.{random.randint(1, 100)}',
            'src_port': random.randint(10000, 60000),
            'dst_port': random.choice([80, 443, 8080]),
            'protocol': random.choice(['TCP', 'UDP'])
        }
        all_flows.append((flow_info, random.randint(1, 50)))

    # Interleave large and small flows so arrival order is random.
    random.shuffle(all_flows)
    for flow_info, flow_size in all_flows:
        router.route_flow(flow_info, flow_size)
    return router
# ECMP comparison demo: runs when the module is executed or imported.
print("=" * 60, "ECMP 演进对比测试", "=" * 60, sep="\n")

# simulate_traffic() returns the router it was given, so construction and
# simulation are chained into one assignment.
traditional_ecmp = simulate_traffic(TraditionalECMP(path_count=4), 1000)
traditional_ecmp.print_load_distribution()

print("\n" + "-" * 60)

ai_router = simulate_traffic(AIDrivenDynamicRouter(path_count=4), 1000)
ai_router.print_load_distribution()
def analyze_performance(traditional, ai):
    """Print a load-balance comparison between the two routers.

    Balance is measured as the population standard deviation of the per-path
    loads; a lower value means a more even distribution.

    Args:
        traditional: Router exposing ``path_load`` (mapping of path name to
            load) and, optionally, ``paths`` (list of path names).
        ai: Router exposing ``paths`` and ``get_path_metrics()``.

    Returns:
        None. The results are printed.
    """
    def calculate_std(values):
        """Return the population standard deviation (0.0 for no values)."""
        if not values:
            return 0.0
        mean = sum(values) / len(values)
        variance = sum((x - mean) ** 2 for x in values) / len(values)
        return variance ** 0.5

    ai_metrics = ai.get_path_metrics()

    # Walk the declared paths rather than path_load.values(): a path that
    # never received a flow has no entry in the load mapping, and leaving it
    # out would understate the imbalance. .get() avoids inserting keys.
    traditional_paths = getattr(traditional, 'paths', None) or list(traditional.path_load)
    traditional_loads = [traditional.path_load.get(path, 0) for path in traditional_paths]
    ai_loads = [ai_metrics[path]['load'] for path in ai.paths]

    traditional_std = calculate_std(traditional_loads)
    ai_std = calculate_std(ai_loads)

    print("\n" + "="*60)
    print("性能分析结果:")
    print("="*60)
    print(f"传统 ECMP 负载标准差:{traditional_std:.2f}")
    print(f"AI 动态路由负载标准差:{ai_std:.2f}")
    if traditional_std > 0:
        print(f"负载均衡改善:{(traditional_std - ai_std)/traditional_std*100:.1f}%")
    else:
        # A perfectly balanced baseline has no relative improvement to report.
        print("负载均衡改善:N/A")
    congested_paths = sum(1 for path in ai.paths if ai_metrics[path]['is_congested'])
    print(f"AI 路由拥塞路径数:{congested_paths}/{len(ai.paths)}")
# Print the load-balance comparison for the two routers simulated above.
analyze_performance(traditional_ecmp, ai_router)
import threading
import time
import json
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import heapq
class BGPState(Enum):
    """Enumeration of BGP states."""
    # IDLE is the state that BGPNeighbor and AIBGPRouter start in.
    IDLE = 1
    # CONNECT, ACTIVE, OPENSENT and OPENCONFIRM are not assigned anywhere in
    # the visible code. NOTE(review): presumably kept for completeness.
    CONNECT = 2
    ACTIVE = 3
    OPENSENT = 4
    OPENCONFIRM = 5
    # ESTABLISHED is set by establish_session() and checked before routes are
    # advertised or keepalives are refreshed.
    ESTABLISHED = 6
class RouteSelectionCriteria(Enum):
    """Route selection criteria."""
    # NOTE(review): not referenced elsewhere in the visible code. The first
    # four names mirror the fields used by the traditional sort key, and
    # AI_SCORE mirrors BGPRoute.ai_score. Confirm whether the numeric values
    # are meant to express a priority order.
    LOCAL_PREF = 1
    AS_PATH_LENGTH = 2
    ORIGIN = 3
    MED = 4
    AI_SCORE = 5
@dataclass
class BGPRoute:
    """One BGP route plus the measurements that feed its AI score."""

    prefix: str
    next_hop: str
    as_path: List[int]
    local_pref: int = 100
    med: int = 0
    origin: str = "IGP"
    communities: List[str] = None
    ai_score: float = 0.0
    latency: float = 0.0
    reliability: float = 1.0

    def __post_init__(self):
        # None means "no communities given"; each instance then gets its own
        # fresh list instead of sharing a mutable default.
        self.communities = [] if self.communities is None else self.communities

    def get_as_path_length(self):
        """Return the number of AS hops in ``as_path``."""
        hop_count = len(self.as_path)
        return hop_count

    def calculate_ai_score(self, network_state):
        """Compute, store and return the weighted AI score of this route.

        Four components are combined, each clamped at 0 from below:
        latency (weight 0.35, reaches 0 at latency 100), reliability
        (weight 0.25, used as is), AS-path length (weight 0.20, reaches 0
        at 20 hops) and congestion (weight 0.20, ``1 - congestion``).

        Args:
            network_state: Mapping; only its optional ``congestion`` entry
                is read (treated as 0 when absent).

        Returns:
            The new value of ``self.ai_score``.
        """
        congestion_level = network_state.get('congestion', 0)

        weighted_latency = max(0, 1 - self.latency / 100) * 0.35
        weighted_reliability = self.reliability * 0.25
        weighted_path_length = max(0, 1 - self.get_as_path_length() / 20) * 0.20
        weighted_congestion = max(0, 1 - congestion_level) * 0.20

        # Terms are added in a fixed left-to-right order so the floating
        # point result stays bit-for-bit stable.
        total = weighted_latency + weighted_reliability
        total = total + weighted_path_length
        total = total + weighted_congestion

        self.ai_score = total
        return total
class BGPNeighbor:
    """A BGP peer as tracked by the router that owns it."""

    def __init__(self, ip_address: str, as_number: int, local_router):
        """Record the peer's identity and start it in the IDLE state.

        Args:
            ip_address: Address of the peer; also passed along as the sender
                when a route is received.
            as_number: AS number of the peer.
            local_router: Router object that receives this peer's routes via
                ``receive_route_advertisement``.
        """
        # A new peer has no session yet; the keepalive clock starts at "now".
        self.state = BGPState.IDLE
        self.last_keepalive_received = time.time()
        self.local_router = local_router
        self.as_number = as_number
        self.ip_address = ip_address

    def establish_session(self):
        """Mark the session as established; always reports success."""
        self.state = BGPState.ESTABLISHED
        return True

    def check_state(self):
        """Periodic state check hook; currently does nothing."""
        return None

    def receive_route(self, route: BGPRoute):
        """Forward ``route`` to the owning router, tagged with this peer's IP."""
        owner = self.local_router
        owner.receive_route_advertisement(route, self.ip_address)
class AIBGPRouter:
    """BGP router simulation whose best-path choice can use an AI score.

    Creating a router starts a daemon background thread that periodically
    checks neighbors, refreshes keepalives and runs the AI model update
    hook. Call ``stop()`` to end that thread explicitly.

    Attributes:
        router_id: Identifier used in log output and as the next hop of
            advertised routes.
        as_number: Local AS number, prepended to advertised AS paths.
        neighbors: ``BGPNeighbor`` objects keyed by neighbor IP.
        routing_table: Received routes keyed by ``"<prefix>|<next_hop>"``.
        best_routes: Currently selected route per prefix.
        network_state: Telemetry-derived state (``congestion``,
            ``latency_trend``, ``reliability_history``).
        ai_enabled: Selects AI-score-based or traditional route selection.
    """

    # Seconds between two rounds of background maintenance.
    _MAINTENANCE_INTERVAL = 5

    def __init__(self, router_id: str, as_number: int):
        """Initialise the router state and start the maintenance thread."""
        self.router_id = router_id
        self.as_number = as_number
        self.state = BGPState.IDLE
        self.neighbors: Dict[str, "BGPNeighbor"] = {}
        self.routing_table: Dict[str, "BGPRoute"] = {}
        self.best_routes: Dict[str, "BGPRoute"] = {}
        self.network_state = {
            'congestion': 0.0,
            'latency_trend': 'stable',
            'reliability_history': []
        }
        self.ai_enabled = True
        self.learning_rate = 0.1
        self.prediction_horizon = 100
        self.advertisement_interval = 0
        self.hold_time = 30
        self._running = True
        # Lets stop() interrupt the wait between maintenance rounds.
        self._stop_event = threading.Event()
        # Daemon thread: a forgotten router must not keep the interpreter
        # alive forever once the main thread has finished.
        self._state_thread = threading.Thread(target=self._maintain_state, daemon=True)
        self._state_thread.start()

    def stop(self, timeout: Optional[float] = None):
        """Stop the background maintenance thread.

        Args:
            timeout: Maximum seconds to wait for the thread to finish;
                ``None`` waits until it has ended.
        """
        self._running = False
        self._stop_event.set()
        # A thread cannot join itself.
        if self._state_thread is not threading.current_thread():
            self._state_thread.join(timeout)

    def add_neighbor(self, neighbor_ip: str, neighbor_as: int):
        """Register a BGP neighbor, replacing any neighbor with the same IP."""
        neighbor = BGPNeighbor(
            ip_address=neighbor_ip,
            as_number=neighbor_as,
            local_router=self
        )
        self.neighbors[neighbor_ip] = neighbor
        print(f"[{self.router_id}] 添加邻居 {neighbor_ip} (AS{neighbor_as})")

    def establish_session(self, neighbor_ip: str):
        """Establish the BGP session with a previously added neighbor.

        On success the router itself also moves to ESTABLISHED. An unknown
        neighbor IP is reported on stdout and otherwise ignored.
        """
        if neighbor_ip in self.neighbors:
            neighbor = self.neighbors[neighbor_ip]
            success = neighbor.establish_session()
            if success:
                print(f"[{self.router_id}] 与 {neighbor_ip} 的 BGP 会话已建立")
                self.state = BGPState.ESTABLISHED
            else:
                print(f"[{self.router_id}] 无法建立与 {neighbor_ip} 的 BGP 会话")
        else:
            print(f"[{self.router_id}] 未找到邻居 {neighbor_ip}")

    def receive_route_advertisement(self, route: "BGPRoute", from_neighbor: str):
        """Store a received route, score it and re-run best-route selection.

        Args:
            route: The advertised route. A route with the same prefix and
                next hop replaces the stored one.
            from_neighbor: IP of the advertising neighbor (for logging).
        """
        route_key = f"{route.prefix}|{route.next_hop}"
        self.routing_table[route_key] = route
        if self.ai_enabled:
            route.calculate_ai_score(self.network_state)
        print(f"[{self.router_id}] 收到来自 {from_neighbor} 的路由:{route.prefix}")
        print(f" AS 路径:{route.as_path}, AI 评分:{route.ai_score:.3f}")
        self.select_best_routes()

    def select_best_routes(self):
        """Recompute ``best_routes`` for every prefix in the routing table."""
        routes_by_prefix: Dict[str, List["BGPRoute"]] = {}
        for route in self.routing_table.values():
            routes_by_prefix.setdefault(route.prefix, []).append(route)

        for prefix, routes in routes_by_prefix.items():
            if self.ai_enabled:
                best_route = self._select_best_route_ai(routes)
            else:
                best_route = self._select_best_route_traditional(routes)
            if best_route:
                self.best_routes[prefix] = best_route
                print(f"[{self.router_id}] 选择最佳路由 {prefix}: {best_route.next_hop}")
                print(f" 选择标准:{'AI 评分' if self.ai_enabled else '传统 BGP'}, 得分:{best_route.ai_score if self.ai_enabled else 'N/A'}")

    def _select_best_route_traditional(self, routes: List["BGPRoute"]) -> Optional["BGPRoute"]:
        """Pick the best route with the classic BGP attribute order.

        Preference, in order: highest local preference, shortest AS path,
        most preferred origin, lowest MED. Remaining ties keep the input
        order because ``sorted`` is stable.

        Returns:
            The winning route, or ``None`` for an empty list.
        """
        if not routes:
            return None

        def route_sort_key(route: "BGPRoute"):
            # "Higher is better" attributes are negated so that an ascending
            # sort puts the best route first.
            return (
                -route.local_pref,
                route.get_as_path_length(),
                -self._origin_preference(route.origin),
                route.med,
            )

        return sorted(routes, key=route_sort_key)[0]

    def _select_best_route_ai(self, routes: List["BGPRoute"]) -> Optional["BGPRoute"]:
        """Pick the best route by AI score, with a traditional tie-break.

        When the two best scores differ by less than 5% of the best score,
        the traditional algorithm decides between those two routes.

        Returns:
            The winning route, or ``None`` for an empty list.
        """
        if not routes:
            return None
        routes_sorted_by_ai = sorted(routes, key=lambda r: -r.ai_score)
        if len(routes_sorted_by_ai) > 1:
            best_score = routes_sorted_by_ai[0].ai_score
            second_best_score = routes_sorted_by_ai[1].ai_score
            # best_score > 0 also protects the division below.
            if best_score > 0 and (best_score - second_best_score) / best_score < 0.05:
                return self._select_best_route_traditional(routes_sorted_by_ai[:2])
        return routes_sorted_by_ai[0]

    def _origin_preference(self, origin: str) -> int:
        """Return the preference of an origin type (higher is better).

        Unknown origin strings rank the same as ``INCOMPLETE``.
        """
        preferences = {"IGP": 2, "EGP": 1, "INCOMPLETE": 0}
        return preferences.get(origin, 0)

    def advertise_routes(self):
        """Advertise every best route to all established neighbors.

        Each advertised copy has this router as next hop and the local AS
        number prepended to the AS path.
        """
        for neighbor in list(self.neighbors.values()):
            if neighbor.state != BGPState.ESTABLISHED:
                continue
            # Snapshot: receive_route() feeds back into select_best_routes(),
            # which writes to best_routes while this loop is running.
            for route in list(self.best_routes.values()):
                advertised_route = BGPRoute(
                    prefix=route.prefix,
                    next_hop=self.router_id,
                    as_path=[self.as_number] + route.as_path,
                    local_pref=route.local_pref,
                    med=route.med,
                    origin=route.origin,
                    communities=route.communities.copy(),
                    ai_score=route.ai_score,
                    latency=route.latency,
                    reliability=route.reliability
                )
                neighbor.receive_route(advertised_route)

    def update_network_state(self, telemetry_data: Dict):
        """Update ``network_state`` from telemetry data.

        Recognised keys (all optional):
            congestion_level: Stored as ``congestion``.
            latency_history: Sequence of samples; the last two decide
                ``latency_trend`` ("increasing", "decreasing" or "stable").
            reliability: Appended to ``reliability_history``, which keeps
                at most the latest 100 values.
        """
        if 'congestion_level' in telemetry_data:
            self.network_state['congestion'] = telemetry_data['congestion_level']

        if 'latency_history' in telemetry_data:
            history = telemetry_data['latency_history']
            if len(history) >= 2:
                latest, previous = history[-1], history[-2]
                if latest > previous:
                    trend = "increasing"
                elif latest < previous:
                    trend = "decreasing"
                else:
                    # Equal samples are flat, not a decrease.
                    trend = "stable"
                self.network_state['latency_trend'] = trend

        if 'reliability' in telemetry_data:
            reliability_history = self.network_state['reliability_history']
            reliability_history.append(telemetry_data['reliability'])
            if len(reliability_history) > 100:
                reliability_history.pop(0)

        print(f"[{self.router_id}] 网络状态更新:拥塞={self.network_state['congestion']:.2f}, "
              f"延迟趋势={self.network_state['latency_trend']}")

    def _maintain_state(self):
        """Background loop: check neighbors, send keepalives, update the model."""
        while self._running:
            # Snapshot: add_neighbor() may run on another thread, and
            # iterating a dict that changes size raises RuntimeError.
            for neighbor in list(self.neighbors.values()):
                neighbor.check_state()
            self._send_keepalive()
            if self.ai_enabled:
                self._update_ai_model()
            # Interruptible sleep so that stop() takes effect immediately.
            self._stop_event.wait(self._MAINTENANCE_INTERVAL)

    def _send_keepalive(self):
        """Refresh the keepalive timestamp of every established neighbor."""
        now = time.time()
        for neighbor in list(self.neighbors.values()):
            if neighbor.state == BGPState.ESTABLISHED:
                neighbor.last_keepalive_received = now

    def _update_ai_model(self):
        """Update the AI model (simplified placeholder, no effect yet)."""
        history = self.network_state['reliability_history']
        if len(history) >= 10:
            avg_reliability = sum(history[-10:]) / 10
            if avg_reliability < 0.9:
                # Placeholder: low average reliability triggers nothing yet.
                pass

    def enable_fast_convergence(self):
        """Switch to fast convergence: no advertisement delay, 3 s hold time."""
        self.advertisement_interval = 0
        self.hold_time = 3
        print(f"[{self.router_id}] 启用快速收敛模式")

    def add_path_diversity(self, prefix: str, max_paths: int = 4):
        """Select up to ``max_paths`` routes for ``prefix`` by AI score.

        Returns:
            A list with one entry (the route's prefix) per selected route,
            or ``[]`` when fewer than two routes exist for the prefix or AI
            selection is disabled.
        """
        routes_for_prefix = [r for r in self.routing_table.values() if r.prefix == prefix]
        if len(routes_for_prefix) <= 1 or not self.ai_enabled:
            return []
        sorted_routes = sorted(routes_for_prefix, key=lambda r: r.ai_score, reverse=True)
        selected = sorted_routes[:max_paths]
        print(f"[{self.router_id}] 为 {prefix} 添加 {len(selected)} 条路径多样性支持")
        # NOTE(review): every element equals ``prefix``; returning next hops
        # may have been intended. Kept as is for compatibility.
        return [r.prefix for r in selected]