
Learning Heuristic Optimization Algorithms: PSO

lishengxie
2023-02-25

Particle Swarm Optimization (PSO)

Reference tutorial: https://cloud.tencent.com/developer/article/1424756

Particle swarm optimization is a simplified model built on swarm intelligence: individuals in the swarm share information so that the motion of the whole swarm converges towards the optimal solution. PSO maintains a swarm of particles, each with a position and a velocity; a particle adjusts its velocity and position according to the best solution it has found itself and the best solution shared across the whole swarm.

Particle velocity and position

PSO uses a fixed number of particles. The position and velocity of particle $ i $ in $ N $-dimensional space are written as $ x_i=(x_{i1},x_{i2},\cdots,x_{iN}) $ and $ v_i=(v_{i1},v_{i2},\cdots,v_{iN}) $. Each particle has a fitness value determined by the objective function, and it knows the best position it has found so far (pbest) as well as its current position $ x_i $; this can be seen as the particle's own flight experience. In addition, each particle knows the best position found so far by any particle in the whole swarm (gbest, i.e. the best among all pbest values); this can be seen as the experience of its companions. A particle decides its next move by combining its own experience with the best experience of its companions.

Velocity and position update formulas

PSO assigns each particle an initial random solution and searches for the optimum iteratively; the update rule is shown in the figure below:
[Figure: PSO update rule]
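The update rule appears only as an image in the original post; presumably it is the standard PSO update, written here with $ c_2, c_3 $ as the learning factors to match the notation of the mapping section below and $ r_2, r_3 $ uniform random numbers in $ [0,1] $:

$$ v_i^{t} = v_i^{t-1} + c_2 r_2 \left( pbest_i - x_i^{t-1} \right) + c_3 r_3 \left( gbest - x_i^{t-1} \right) \qquad (2) $$

$$ x_i^{t} = x_i^{t-1} + v_i^{t} \qquad (3) $$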
The pbest term and the gbest term in the update formula reflect the particle's learning from its own experience and from the swarm's experience, respectively. Building on the formula above, a weight can also be introduced for the current velocity:
[Figure: PSO update rule with inertia weight]
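A common weighted form (again a reconstruction of the image, assuming an inertia weight $ w $ on the previous velocity) is:

$$ v_i^{t} = w \, v_i^{t-1} + c_2 r_2 \left( pbest_i - x_i^{t-1} \right) + c_3 r_3 \left( gbest - x_i^{t-1} \right) $$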

PSO algorithm flow

1) Initialize a swarm of particles (swarm size N) with random positions and velocities;
2) Evaluate the fitness of each particle;
3) For each particle, compare its fitness with that of the best position it has visited so far (pbest); if the current position is better, take it as the new pbest;
4) For each particle, compare its pbest fitness with that of the best position found so far by the whole swarm (gbest); if it is better, take it as the new gbest;
5) Adjust the velocity and position of each particle according to formulas (2) and (3);
6) If the termination condition has not been met, go back to step 2).
The termination condition is usually chosen, depending on the problem, as reaching a maximum number of iterations $ G_k $ and/or the best position found by the swarm so far satisfying a predefined minimum fitness threshold. A minimal sketch of this loop for a continuous objective is given below.
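The sketch below follows the six steps for a continuous objective. It is illustrative only and is not the mapping code from this post; the function name pso_minimize, the sphere objective and all parameter values are assumptions made for the example.

# Minimal continuous-variable PSO sketch (illustrative only): minimize f(x) = sum(x^2)
import numpy as np

def pso_minimize(fitness, dim, num_particle=30, num_iter=100, w=0.7, c2=1.5, c3=1.5):
    # Step 1: random initial positions and velocities
    x = np.random.uniform(-5.0, 5.0, (num_particle, dim))
    v = np.random.uniform(-1.0, 1.0, (num_particle, dim))
    # Steps 2-4: evaluate fitness, initialise pbest and gbest
    pbest = x.copy()
    pbest_cost = np.array([fitness(p) for p in x])
    g = np.argmin(pbest_cost)
    gbest, gbest_cost = pbest[g].copy(), pbest_cost[g]
    for _ in range(num_iter):
        r2 = np.random.random((num_particle, dim))
        r3 = np.random.random((num_particle, dim))
        # Step 5: velocity and position updates, formulas (2) and (3) with inertia weight w
        v = w * v + c2 * r2 * (pbest - x) + c3 * r3 * (gbest - x)
        x = x + v
        # Steps 2-4: re-evaluate and update pbest / gbest
        cost = np.array([fitness(p) for p in x])
        improved = cost < pbest_cost
        pbest[improved] = x[improved]
        pbest_cost[improved] = cost[improved]
        g = np.argmin(pbest_cost)
        if pbest_cost[g] < gbest_cost:
            gbest, gbest_cost = pbest[g].copy(), pbest_cost[g]
    # Step 6: stop after num_iter iterations
    return gbest, gbest_cost

best_x, best_cost = pso_minimize(lambda p: np.sum(p ** 2), dim=5)
print(best_x, best_cost)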

Solving the mapping problem with PSO

The mapping problem is essentially a problem of ordering a sequence. When PSO is used to solve it, a particle's position corresponds to an ordering of the sequence. Here $ pbest-x_i^{t-1} $ corresponds to the swap sequence needed to turn $ x_i^{t-1} $ into pbest, and the update formula is reinterpreted as follows: for the current solution sequence $ x_i^{t-1} $,

  • first, with probability $ c_2 $, apply the swap sequence $ pbest-x_i^{t-1} $ to $ x_i^{t-1} $ to obtain $ (x_i^{t-1})^{'} $;
  • then, with probability $ c_3 $, apply the swap sequence $ gbest-(x_i^{t-1})^{'} $ to $ (x_i^{t-1})^{'} $ to obtain $ x_i^{t} $ (a toy example of this swap-sequence update is sketched below).
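
The toy example below uses made-up sequences; the two operations mirror the compute_swap_sequence and applySwap helpers defined in the code further down.

import numpy as np

x = np.array([2, 0, 1])        # current ordering x_i^{t-1}
pbest = np.array([1, 2, 0])    # best ordering found so far by this particle

# "pbest - x": for each element of x, the index at which it sits in pbest
swap_seq = np.array([np.where(pbest == v)[0][0] for v in x])   # -> [1, 2, 0]

# applying the swap sequence moves every element to that index, reproducing pbest;
# in the algorithm this step is only taken with probability c_2
x_new = np.zeros_like(x)
x_new[swap_seq] = x            # -> [1, 2, 0]
print(swap_seq, x_new)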

PSO code for the mapping problem

# -*- coding: utf-8 -*-
# Lishengxie
# 2023-02-19
# Implementation of a PSO algorithm for crossbar mapping/placing

import numpy as np
import copy
import sys

class PsoPlacer:
    """
        PSO algorithm
        -------------
        Class used to find the mapping from computing cores to routers on the NoC which minimizes the communication cost
    """
    def __init__(self, dim_x, dim_y, spike_table, num_particle=50, s1=1.0, s2=0.04, s3=0.02):
        """
            Initialization of PsoPlacer.
            Args:
                dim_x: int, the `x` dimension on NoC 
                dim_y: int, the `y` dimension on NoC 
                spike_table: 2D-array, the number of spikes between core `i` and core `j`
                num_particle: int, number of particles in the PSO algorithm
                s1: float, coefficient kept for the velocity term (stored but not used in the update below)
                s2: float, probability of applying the swap sequence towards the local best (pbest)
                s3: float, probability of applying the swap sequence towards the global best (gbest)
        """
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.dim_size = dim_x * dim_y
        self.num_particle = num_particle
        self.s1 = s1
        self.s2 = s2
        self.s3 = s3
        self.init_particle(spike_table)

    def init_particle(self, spike_table):
        """
            Init spike table, particles, local and global best solution
            Args:
                spike_table: 2D-array, the number of spikes between core `i` and core `j`
        """
        # init the spike table, padding with zeros if the dimensions of `spike_table` are smaller than `dim_size`
        self.spike_table = np.zeros((self.dim_size, self.dim_size))
        h, w = spike_table.shape
        for i in range(h):
            for j in range(w):
                self.spike_table[i,j] = spike_table[i,j]

        # init `num_particle` particle vectors; each vector is a mapping solution (a permutation of router indices)
        self.solution = np.zeros((self.num_particle, self.dim_size), dtype=np.int32)
        for i in range(self.num_particle):
            self.solution[i] = np.arange(self.dim_size)
            np.random.shuffle(self.solution[i])

        # init hop distance table
        self.hop_xy()

        # init the local/global best solution and their cost
        self.local_best_solution = copy.deepcopy(self.solution)
        self.local_best_cost = np.zeros((self.num_particle,))
        self.global_best_solution = np.arange(self.dim_size)
        self.global_best_cost = sys.maxsize
        for i in range(self.num_particle):
            self.local_best_cost[i] = self.fitnessFunction(self.local_best_solution[i])
            if self.local_best_cost[i] < self.global_best_cost:
                # copy, so that later in-place updates of the local best do not alias the global best
                self.global_best_solution = self.local_best_solution[i].copy()
                self.global_best_cost = self.local_best_cost[i]
        print("Init, global best cost: {}".format(self.global_best_cost))


    def hop_xy(self):
        """
            The hop distance between router `x` and `y` in the 2D-Mesh NoC using XY routing
        """
        self.hop_dist = np.zeros((self.dim_size, self.dim_size))
        for i in range(self.dim_size):
            for j in range(self.dim_size):
                self.hop_dist[i,j] = abs(i % self.dim_x - j % self.dim_x) \
                                + abs(i // self.dim_x - j // self.dim_x)

    def fitnessFunction(self, solution):
        """
            Compute the communication cost for each mapping solution
            Args:
                solution: List, a possible mapping solution
        """
        cost = 0
        for i in range(self.dim_size):
            for j in range(self.dim_size):
                cost +=  self.spike_table[i,j] * self.hop_dist[solution[i], solution[j]]
        return cost

    def compute_swap_sequence(self, src_seq, dst_seq, seq_len):
        # TODO: Optimize this algorithm.
        """
            Find the swap sequence from source sequecne to destination sequence.
            Args:
                src_seq: List, the source sequence
                dst_seq: List, the destination sequence
                seq_len: int, the length of source and destination sequence
            Return:
                swap_sequence: the index of each point for `src_seq` in `dst_seq`
        """
        swap_seq = np.zeros((seq_len,))
        for i in range(seq_len):
            swap_seq[i] = np.where(dst_seq == src_seq[i])[0]
        return swap_seq

    def applySwap(self, src_seq, swap_seq, seq_len):
        """
            Apply a swap sequence to the source sequence
            Args:
                src_seq: List, the source sequence
                swap_seq: the index of each element of `src_seq` in the destination sequence
                seq_len: int, the length of the source and destination sequences
            Return:
                dst_seq: List, the destination sequence
        """
        dst_seq = np.zeros((seq_len,), dtype=np.int64)
        for i in range(seq_len):
            # move element src_seq[i] to its target position swap_seq[i]
            dst_seq[swap_seq[i]] = src_seq[i]
        return dst_seq

    def place(self, num_iter=10):
        """
            Function to execute the PSO algorithm.
            Args:
                num_iter: int, max number of iterations to execute pso update
            Return:
                global best solution after `num_iter` iterations
        """
        for iter in range(num_iter):
            for i in range(self.num_particle):
                tmp_solution = copy.deepcopy(self.solution[i])
                if np.random.random() <= self.s2:
                    # print("local")
                    swap_seq_local = self.compute_swap_sequence(tmp_solution, self.local_best_solution[i], self.dim_size)
                    self.solution[i] = self.applySwap(self.solution[i], swap_seq_local, self.dim_size)
                if np.random.random() <= self.s3:
                    # print("global")
                    swap_seq_global = self.compute_swap_sequence(tmp_solution, self.global_best_solution, self.dim_size)
                    self.solution[i] = self.applySwap(self.solution[i], swap_seq_global, self.dim_size)

                # occasionally re-shuffle a particle (mutation with a small probability) to escape local optima
                if np.random.random() <= 0.0001:
                    np.random.shuffle(self.solution[i])

                cost = self.fitnessFunction(self.solution[i])
                if cost < self.local_best_cost[i]:
                    self.local_best_cost[i] = cost
                    self.local_best_solution[i] = copy.deepcopy(self.solution[i])

                if self.local_best_cost[i] < self.global_best_cost:
                    self.global_best_cost = self.local_best_cost[i]
                    self.global_best_solution = copy.deepcopy(self.local_best_solution[i])
            print("Iteration:{}, global best cost:{}".format(iter, self.global_best_cost))
        return self.global_best_solution
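
A possible driver for the class above (a sketch only; the 4x4 mesh and the random spike_table are made-up inputs, since the original post does not include a driver script):

if __name__ == "__main__":
    np.random.seed(0)
    num_cores = 16
    # hypothetical traffic matrix: number of spikes sent from core i to core j
    spike_table = np.random.randint(0, 100, size=(num_cores, num_cores))
    np.fill_diagonal(spike_table, 0)

    placer = PsoPlacer(dim_x=4, dim_y=4, spike_table=spike_table, num_particle=50)
    best_mapping = placer.place(num_iter=100)
    print("Best mapping (core -> router):", best_mapping)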