粒子群算法(PSO)
参考教程:https://cloud.tencent.com/developer/article/1424756
粒子群算法是一种利用群体智能建立的一个简化模型,基于群体中个体对信息的共享使整个群体的运动趋向最优解。粒子群算法使用一群粒子,粒子拥有位置和速度两种属性,根据自身已经找到的最优解和参考整个共享于整个集群中找到的最优解去改变自己的飞行速度和位置。
粒子速度和位置
粒子群算法使用一定数量的粒子,粒子$ i $在$ N $维空间中的位置和速度表示为$ x_i=(x_1,x_2,\cdots,x_N) $ 和 $ v_i=(v_1,v_2,\cdots,v_N) $。每个粒子都有一个目标函数决定的适应值(fitness value),并且知道自己到目前为止发现的最好位置(pbest)和现在的位置$ x_i $。这个可以看作是粒子自己的飞行经验。除此之外,每个粒子还知道到目前为止整个群体中所有粒子发现的最好位置(gbest)(gbest是pbest中的最好值),这个可以看作是粒子同伴的经验。粒子就是通过自己的经验和同伴中最好的经验来决定下一步的运动。
粒子速度和位置更新公式
粒子群算法会给每个粒子一个初始的随机解,并通过迭代寻找最优解,迭代规则如下图所示:
迭代公式中的第一部分和第二部分分别反映了粒子对自身经验和群体经验的学习。基于上述公式还可以对当前的速度引入更新权重
PSO算法运行流程
1)初始化一群微粒(群体规模为N),包括随机位置和速度;
2)评价每个微粒的适应度;
3)对每个微粒,将其适应值与其经过的最好位置pbest作比较,如果较好,则将其作为当前的最好位置pbest;
4)对每个微粒,将其适应值与其经过的最好位置gbest作比较,如果较好,则将其作为当前的最好位置gbest;
5)根据公式(2)、(3)调整微粒速度和位置;
6)未达到结束条件则转第2)步。
迭代终止条件根据具体问题一般选为最大迭代次数Gk或(和)微粒群迄今为止搜索到的最优位置满足预定最小适应阈值。
PSO算法求解映射问题
映射问题本质上可以看作一个序列的排序问题,使用PSO算法求解映射问题时,粒子位置对应一个序列的排序。这里$ pbest-x_i^{t-1} $对应将$ x_i^{t-1} $转换成pbest所需要的交换序列,这里的迭代公式的含义变为对于原来的解序列$ x_i^{t-1} $,
- 先按照$ c_2 $的概率使用$ pbest-x_i^{t-1} $的交换序列对$ x_i^{t-1} $进行交换获得$ (x_i^{t-1})^{'} $,
- 再按照$ c_3 $的概率使用$ gbest-(x_i^{t-1})^{'} $的交换序列对$ (x_i^{t-1})^{'} $进行交换获得$ x_i^{t} $.
PSO求解映射代码
# -*- coding: utf-8 -*-
# Lishengxie
# 2023-02-19
# Implementation of a PSO algorithm for crossbar mapping/placing
import numpy as np
import copy
import sys
class PsoPlacer:
"""
PSO algorithm
-------------
Class used to find the mapping from computing cores to routers on NoC which minizes the communication cost
"""
def __init__(self, dim_x, dim_y, spike_table, num_particle=50, s1=1.0, s2=0.04, s3=0.02):
"""
Initialization of PsoPlacer.
Args:
dim_x: int, the `x` dimension on NoC
dim_y: int, the `y` dimension on NoC
spike_table: 2D-array, the number of spikes between core `i` and core `j`
num_particle: int, number of particles in the PSO algorithm
"""
self.dim_x = dim_x
self.dim_y = dim_y
self.dim_size = dim_x * dim_y
self.num_particle = num_particle
self.s1 = s1
self.s2 = s2
self.s3 = s3
self.init_particle(spike_table)
def init_particle(self, spike_table):
"""
Init spike table, particles, local and global best solution
Args:
spike_table: 2D-array, the number of spikes between core `i` and core `j`
"""
# init the spike table, adding `0`s if the dimension of `spike_table` not equal to `dim_size`
self.spike_table = np.zeros((self.dim_size, self.dim_size))
h, w = spike_table.shape
for i in range(h):
for j in range(w):
self.spike_table[i,j] = spike_table[i,j]
# init particle vectors with number of `num_particle`, each vector is a mappinng solution
self.solution = np.zeros((self.num_particle, self.dim_size), dtype=np.int32)
for i in range(self.num_particle):
self.solution[i] = np.arange(self.dim_size)
np.random.shuffle(self.solution[i])
# init hop distance table
self.hop_xy()
# init the local/global best solution and their cost
self.local_best_solution = copy.deepcopy(self.solution)
self.local_best_cost = np.zeros((self.num_particle,))
self.global_best_solution = np.arange(self.dim_size)
self.global_best_cost = sys.maxsize
for i in range(self.num_particle):
self.local_best_cost[i] = self.fitnessFuntcion(self.local_best_solution[i])
if self.local_best_cost[i] < self.global_best_cost:
self.global_best_solution = self.local_best_solution[i]
self.global_best_cost = self.local_best_cost[i]
print("Init, gloabal best cost:{}".format(self.global_best_cost))
def hop_xy(self):
"""
The hop distance between router `x` and `y` in the 2D-Mesh NoC using XY routing
"""
self.hop_dist = np.zeros((self.dim_size, self.dim_size))
for i in range(self.dim_size):
for j in range(self.dim_size):
self.hop_dist[i,j] = abs(i % self.dim_x - j % self.dim_x) \
+ abs(i // self.dim_x - j // self.dim_x)
def fitnessFuntcion(self, solution):
"""
Compute the communication cost for each mapping solution
Args:
solution: List, a possible mapping solution
"""
cost = 0
for i in range(self.dim_size):
for j in range(self.dim_size):
cost += self.spike_table[i,j] * self.hop_dist[solution[i], solution[j]]
return cost
def compute_swap_sequence(self, src_seq, dst_seq, seq_len):
# TODO: Optimize this algorithm.
"""
Find the swap sequence from source sequecne to destination sequence.
Args:
src_seq: List, the source sequence
dst_seq: List, the destination sequence
seq_len: int, the length of source and destination sequence
Return:
swap_sequence: the index of each point for `src_seq` in `dst_seq`
"""
swap_seq = np.zeros((seq_len,))
for i in range(seq_len):
swap_seq[i] = np.where(dst_seq == src_seq[i])[0]
return swap_seq
def applySwap(self, swap_seq, src_seq, seq_len):
"""
Apply swap sequence to the source sequence
Args:
src_seq: List, the source sequence
swap_sequence: the index of each point for `src_seq` in `dst_seq`
seq_len: int, the length of source and destination sequence
Return:
dst_seq: List, the destination sequence
"""
dst_seq = np.zeros((seq_len,))
for i in range(seq_len):
dst_seq[swap_seq[i]] = src_seq[i]
return dst_seq
def place(self, num_iter=10):
"""
Funciton to execute pso algorithm.
Args:
num_iter: int, max number of iterations to execute pso update
Return:
global best solution after `num_iter` iterations
"""
for iter in range(num_iter):
for i in range(self.num_particle):
tmp_solution = copy.deepcopy(self.solution[i])
if np.random.random() <= self.s2:
# print("local")
swap_seq_local = self.compute_swap_sequence(tmp_solution, self.local_best_solution[i], self.dim_size)
self.solution[i] = self.applySwap(self.solution[i], swap_seq_local, self.dim_size)
if np.random.random() <= self.s3:
# print("global")
swap_seq_global = self.compute_swap_sequence(tmp_solution, self.global_best_solution, self.dim_size)
self.solution[i] = self.applySwap(self.solution[i], swap_seq_global, self.dim_size)
if np.random.random() <= 0.0001:
np.random.shuffle(self.solution[i])
cost = self.fitnessFuntcion(self.solution[i])
if cost < self.local_best_cost[i]:
self.local_best_cost[i] = cost
self.local_best_solution[i] = copy.deepcopy(self.solution[i])
if self.local_best_cost[i] < self.global_best_cost:
self.global_best_cost = self.local_best_cost[i]
self.global_best_solution = copy.deepcopy(self.local_best_solution[i])
print("Iteration:{}, global best cost:{}".format(iter, self.global_best_cost))
return self.global_best_solution
评论 (0)