Basic idea: MC Basic keeps the policy-iteration framework but replaces the model-based policy-evaluation step with Monte Carlo estimation: for every state-action pair (s, a), sample episodes that start from (s, a) and then follow the current policy, average their discounted returns to estimate q(s, a), and improve the policy greedily with respect to these estimates.
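Restating what the code below computes (N is the number of episodes sampled per (s, a), called `samples` in the code, and g^{(i)} is the discounted return of the i-th episode):

$$
q_{\pi_k}(s,a) \approx \frac{1}{N}\sum_{i=1}^{N} g^{(i)}(s,a),
\qquad
g^{(i)}(s,a) = \sum_{t \ge 0} \gamma^{t} r^{(i)}_{t},
\qquad
\pi_{k+1}(s) = \arg\max_{a} q_{\pi_k}(s,a)
$$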
MC_basic.py
Note: because the improvement step is greedy, stochastic policies are not supported.
```python
from collections import defaultdict
import numpy as np
from env import GridWorldEnv
from utils import drow_policy


class MonteCarloPolicyIteration(object):
    def __init__(self, env: GridWorldEnv, gamma=0.9, samples=1):
        self.env = env
        self.action_space_size = self.env.num_actions  # up, down, left, right, stay
        self.reward_space_size = self.env.reward_space_size  # number of distinct reward values
        self.state_space_size = self.env.num_states
        self.reward_list = self.env.reward_list
        self.gamma = gamma
        self.samples = samples
        self.policy = np.zeros((self.state_space_size, self.action_space_size))
        self.policy[:, 0] = 1.0  # the initial action in every state is "up"
        self.action_value = np.zeros((self.state_space_size, self.action_space_size, self.samples))  # action value q(s,a)
        self.state_value = np.zeros((self.env.size, self.env.size))

    def solve(self, iterations=20):
        for i in range(iterations):
            for s in range(self.state_space_size):
                for a in range(self.action_space_size):
                    for k in range(self.samples):
                        # starting from (s,a) and following the current policy, sample an episode and record its return
                        episodes = self.generate_episodes(s, a)
                        returns = self.compute_returns(episodes)
                        self.action_value[s, a, k] = returns
            qvalues = self.action_value.mean(axis=2)  # average over the k samples, shape: [state_size, 5]
            best_a = np.argmax(qvalues, axis=1)  # action index with the largest average return in each state, shape: [state_size]
            for s in range(self.state_space_size):
                if s in self.env.terminal:
                    self.policy[s] = np.eye(self.action_space_size)[4]
                    continue
                else:
                    a_star = best_a[s]  # index of the optimal action in state s
                    self.policy[s] = np.eye(self.action_space_size)[a_star]
            self.state_value = np.sum(self.policy * qvalues, axis=1).reshape(self.env.size, self.env.size)

    def generate_episodes(self, start_state, start_action, max_steps=200):
        '''
        :param start_state: state_id of the starting state
        :param start_action: the starting action
        :return: [(state_id, action, reward), (...)]
        '''
        episode = []
        state = start_state
        action = start_action
        for _ in range(max_steps):
            next_state, reward, done = self.env.step(state, action)
            episode.append((state, action, reward))
            if done:
                break
            state = next_state
            # sample an action from [0, action_space_size) with the probabilities given by policy[state]
            action = np.random.choice(self.action_space_size, p=self.policy[state])
        return episode

    def compute_returns(self, episodes):
        '''
        :param episodes: one trajectory [(state_id, action, reward), (...)]
        :return: the discounted return of the trajectory
        '''
        G = 0
        for (_, _, r) in reversed(episodes):
            G = r + self.gamma * G  # accumulate from the last step back to the first step
        return G


if __name__ == '__main__':
    # transition_prob = {
    #     "forward": 0.8,
    #     "left": 0.05,
    #     "right": 0.15
    # }
    env = GridWorldEnv(
        size=5,
        forbidden=[(1, 2), (2, 4)],
        terminal=[(4, 4)],
        r_boundary=-10,
        r_other=0,
        r_terminal=1,
        r_forbidden=-1
    )
    vi = MonteCarloPolicyIteration(env=env, gamma=0.9)
    vi.solve(iterations=20)
    print("\n state value: ")
    print(vi.state_value)
    print("\n policy π(s):")
    print(vi.policy)
    drow_policy(vi.policy, env)
```
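As the note above says, the greedy improvement step always yields a deterministic policy. A standard way to obtain a stochastic policy instead is the MC ε-greedy variant, where the improvement step spreads a small probability ε over all actions. The helper below is only a sketch of such a step and is not part of MC_basic.py; epsilon_greedy_improvement is a hypothetical name, and it would replace the greedy argmax loop inside solve:

```python
import numpy as np

def epsilon_greedy_improvement(qvalues: np.ndarray, epsilon: float = 0.1) -> np.ndarray:
    '''Sketch of an epsilon-greedy improvement step (MC epsilon-greedy variant).

    qvalues: Monte Carlo estimates of q(s, a), shape [num_states, num_actions].
    Returns a stochastic policy of the same shape.
    '''
    num_states, num_actions = qvalues.shape
    # every action receives probability epsilon / |A| ...
    policy = np.full((num_states, num_actions), epsilon / num_actions)
    # ... and the greedy action additionally receives 1 - epsilon
    best_a = np.argmax(qvalues, axis=1)
    policy[np.arange(num_states), best_a] += 1.0 - epsilon
    return policy
```

generate_episodes already samples actions with np.random.choice(..., p=self.policy[state]), so it would work unchanged with such a stochastic policy.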
Modifications to env.py:

1. self.terminal and self.forbidden store the state_ids of the terminal and forbidden cells rather than the lists passed in. After this change, every other place that uses self.terminal and self.forbidden has to be updated accordingly.
```python
self.terminal = {self.state_id(x, y) for (x, y) in terminal}
self.forbidden = {self.state_id(x, y) for (x, y) in forbidden}

def build_reward_list(self):
    reward = set()
    reward.add(self.r_forbidden)
    reward.add(self.r_other)
    reward.add(self.r_terminal)
    reward.add(self.r_boundary)
    self.reward_list = sorted(list(reward))
    self.reward_space_size = len(self.reward_list)
```

2. Set separate rewards for the boundary, ordinary cells, forbidden cells, and the terminal cell, instead of passing in a list of rewards.
```python
def __init__(self, size: int, terminal, forbidden,
             r_forbidden=-1, r_other=0, r_terminal=1, r_boundary=-1,
             transition_prob=None):
    ...
    self.r_forbidden = r_forbidden
    self.r_other = r_other
    self.r_terminal = r_terminal
    self.r_boundary = r_boundary
    ...
```
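The step method in item 3 below calls self.reward_func, which is not shown in these snippets. A minimal sketch of such a method, assuming it only relies on the state-id sets from item 1 and the reward fields from item 2 (the actual implementation in env.py may differ):

```python
def reward_func(self, state, next_state, action):
    # Sketch only: map a transition to one of the per-type rewards set in __init__.
    i, j = divmod(state, self.size)
    moves = {0: (-1, 0), 1: (0, 1), 2: (1, 0), 3: (0, -1), 4: (0, 0)}
    di, dj = moves[action]
    ni, nj = i + di, j + dj
    if not (0 <= ni < self.size and 0 <= nj < self.size):
        return self.r_boundary   # tried to move off the grid
    if next_state in self.forbidden:
        return self.r_forbidden  # stepped into a forbidden cell
    if next_state in self.terminal:
        return self.r_terminal   # reached the terminal cell
    return self.r_other          # any other transition
```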
3. Add a step method.

```python
def step(self, state, action):
    '''
    :param state: the state_id of the current state
    :param action: the action taken
    :return: next_state, reward, done: the state_id of the next state, the reward obtained,
             and whether the terminal state has been reached
    '''
    if state in self.terminal:
        return state, self.r_terminal, True
    i, j = divmod(state, self.size)
    moves = {
        0: (-1, 0),  # up
        1: (0, 1),   # right
        2: (1, 0),   # down
        3: (0, -1),  # left
        4: (0, 0)    # stay
    }
    di, dj = moves[action]
    ni, nj = i + di, j + dj
    if not (0 <= ni < self.size and 0 <= nj < self.size):
        next_state = self.state_id(i, j)  # hitting the boundary: stay in place
    else:
        next_state = self.state_id(ni, nj)
    reward = self.reward_func(state, next_state, action)
    done = next_state in self.terminal
    return next_state, reward, done
```

Results of policy iteration, value iteration, and MC policy iteration under the same configuration: