Reinforcement Learning#

Reinforcement learning is a subfield of machine learning that focuses on training agents to make sequential decisions in an environment to maximize a reward signal.

One of the challenges in reinforcement learning is defining the target value for optimization. Unlike classification tasks where we can use the correct label as a target, in reinforcement learning, a correct label is not readily available.

In this notebook, we will be using the gymnasium library, which provides a collection of environments for developing and testing reinforcement learning algorithms. Specifically, we will be working with the CartPole environment, where the goal is to balance a pole on a cart by applying appropriate actions.

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import imageio

First of all, we are going to load the environment and investigate it.

env = gym.make('CartPole-v1', render_mode='rgb_array')
num_states = env.observation_space.shape[0]
num_actions = env.action_space.n
# The four states are: cart position, cart velocity, pole angle, pole angular velocity
# The two actions are: push cart to the left, push cart to the right
print('observation space:', num_states)
print('action space:', num_actions)
print('observation limit top', env.observation_space.high)
print('observation limit low', env.observation_space.low)
observation space: 4
action space: 2
observation limit top [4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
observation limit low [-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]
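
Before adding any learning, it helps to see the interaction loop itself. The following is a minimal sketch (not part of the original notebook) that runs one episode with uniformly random actions:

# One throwaway episode with uniformly random actions, just to see the reset/step API
state, _ = env.reset()
done = False
steps = 0
while not done:
    action = env.action_space.sample()  # randomly choose 0 (push left) or 1 (push right)
    state, reward, terminated, truncated, _ = env.step(action)
    done = terminated or truncated
    steps += 1
print('random policy survived for', steps, 'steps')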

Policy Learning#

The approach above is very simple and clearly does not scale to more complex problems. In a sense, we built a very limited neural network, but instead of training it with gradient descent, we relied on random search. We can do better than that.

Policy Gradient methods are a family of reinforcement learning algorithms that directly optimize the policy (the strategy the agent uses to choose the next action given the current state). The key idea is to push up the probabilities of actions that lead to higher rewards and push down the probabilities of actions that lead to lower rewards. Over time, this should yield a policy that selects good actions more frequently.
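
For reference, the quantity these methods ascend is usually written as the REINFORCE gradient (a standard textbook formula, stated here for orientation rather than derived):

$$
\nabla_\theta J(\theta) = \mathbb{E}_{\pi_\theta}\!\left[\sum_t \nabla_\theta \log \pi_\theta(a_t \mid s_t)\, G_t\right],
\qquad
G_t = \sum_{k=0}^{\infty} \gamma^k\, r_{t+k},
$$

where $G_t$ is the discounted return from time step $t$ onward. The implementation below follows this idea in spirit: a per-step error signal (`prob - action`) is weighted by the discounted, normalized returns, so actions from high-return stretches of an episode are reinforced and those from low-return stretches are discouraged.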

First of all, we are going to build up our MLP again.

class FCLayer:
    def __init__(self, input_size, output_size, activation=None):
        self.relu = activation == 'relu'
        self.sigmoid = activation == 'sigmoid'
        self.tanh = activation == 'tanh'
        self.weights = np.random.randn(
            input_size, output_size) / np.sqrt(input_size)  # Xavier initialization
        self.bias = np.zeros(output_size)
        self.weight_update = np.zeros_like(
            self.weights)  # Gradients of the weight
        self.bias_update = np.zeros_like(self.bias)  # Gradients of the bias
        self.grad_counter = 0  # Number of gradients accumulated, used to average the gradients

    def forward(self, input):
        self.input = input.copy()
        self.y = np.dot(self.input, self.weights) + self.bias
        if self.relu:
            self.y[self.y < 0] = 0
        if self.sigmoid:
            self.y = 1.0 / (1.0 + np.exp(-self.y))
        if self.tanh:
            self.y = np.tanh(self.y)
        return self.y

    def backward(self, grad):
        if self.relu:
            grad[self.y <= 0] = 0
        if self.sigmoid:
            grad = grad * self.y * (1 - self.y)
        if self.tanh:
            grad = grad * (1 - self.y ** 2)
        self.weight_update += np.outer(self.input, grad)
        self.bias_update += grad
        self.grad_counter += 1
        return np.dot(grad, self.weights.T)

    def update_weights(self, learning_rate):
        self.weights -= learning_rate * self.weight_update / self.grad_counter
        self.bias -= learning_rate * self.bias_update / self.grad_counter
        self.grad_counter = 0  # Reset the gradient counter
        self.weight_update = np.zeros_like(self.weights)
        self.bias_update = np.zeros_like(self.bias)


class Network:
    def __init__(self, topology, learning_rate):
        self.learning_rate = learning_rate
        self.topology = topology

    def update_weights(self):
        for layer in self.topology:
            layer.update_weights(self.learning_rate)

    def forward(self, x):
        for layer in self.topology:
            x = layer.forward(x)
        return x

    def backward(self, x, y):
        for layer in self.topology:
            x = layer.forward(x)
        for layer in reversed(self.topology):
            y = layer.backward(y)

This time the network has genuine hidden layers; previously, we only had an input layer and an output layer.

The final layer has a single output with a sigmoid activation. The sigmoid is a good choice here because it squashes the output into the range (0, 1), so we can interpret it as the probability of taking one of the two actions.

topology = [
    FCLayer(4, 10, activation='relu'),
    FCLayer(10, 10),
    FCLayer(10, 1, activation='sigmoid')]
net = Network(topology=topology, learning_rate=0.5)
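
As a quick sanity check (purely illustrative, using only the objects defined above), we can push an arbitrary state through the untrained network and confirm that it produces a single value between 0 and 1:

# Even untrained, the network should output a single value in (0, 1),
# which we will later read as the probability of pushing the cart to the right
print(net.forward(np.zeros(num_states)))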

In reinforcement learning, an agent is an entity that interacts with an environment to learn and make decisions. The agent receives observations from the environment and takes actions based on those observations. The goal of the agent is to maximize a notion of cumulative reward over time.

To achieve this, the agent follows a policy, which is a mapping from states to actions. The policy guides the agent’s decision-making process by determining which action to take in a given state. The agent learns and improves its policy through a trial-and-error process, where it explores different actions and observes the resulting rewards.

Agents in reinforcement learning can be implemented using various algorithms, such as Q-learning, Deep Q-Networks (DQN), or policy gradients. Here, we will be looking at policy gradient learning.

class Agent():
    def __init__(self, net):
        self.net = net

    def act(self, state):
        # We interpret the output of the network as the probability of taking action 1 (going to the right)
        prob = self.net.forward(state)
        action = 1 if np.random.rand() < prob else 0
        return action, prob

    def _discount_rewards(self, rewards, gamma):
        discounted = np.zeros_like(rewards)
        reward = 0
        # We walk backwards from the end of the episode
        for t in reversed(range(len(rewards))):
            # discounted[t] = rewards[t] + gamma * rewards[t+1] + gamma^2 * rewards[t+2] + ...
            # Each future reward is weighted by gamma^k, where k is how many steps ahead it lies,
            # so rewards further in the future contribute less to the return at time t
            reward = reward * gamma + rewards[t]
            discounted[t] = reward
        # By normalizing the rewards, we can make the training more stable
        discounted -= np.mean(discounted)
        discounted /= np.std(discounted)
        return discounted

    def train(self, episodes, gamma=0.99):
        total_rewards = []

        for _ in range(episodes):
            error_gradients = []
            episode_states = []
            episode_rewards = []
            done = False

            state, _ = env.reset()
            while not done:
                action, prob = self.act(state)
                # We nudge the network toward the action that was actually taken:
                # if action 1 (right) was taken, the predicted probability should move toward 1;
                # if action 0 (left) was taken, the predicted probability of action 1 should move toward 0.
                # The error is prob - action:
                # Action 1, Probability 1 -> Error: 0
                # Action 0, Probability 0 -> Error: 0
                # Action 1, Probability 0 -> Error: -1
                # Action 0, Probability 1 -> Error: 1
                # Whether this nudge ends up reinforced or reversed is decided later,
                # when it is weighted by the normalized discounted return.
                error_gradients.append(prob - action)
                episode_states.append(state)
                # gymnasium returns separate terminated and truncated flags; either one ends the episode
                state, reward, terminated, truncated, _ = env.step(action)
                done = terminated or truncated
                episode_rewards.append(reward)

            # After every episode, we update the weights
            total_rewards.append(sum(episode_rewards))
            # We multiply the error gradients with the discounted rewards
            # This way, we encourage the actions that lead to a higher reward
            error_gradients = np.vstack(
                error_gradients) * self._discount_rewards(np.vstack(episode_rewards), gamma)

            for state, gradient in zip(episode_states, error_gradients):
                self.net.backward(state, gradient)
            # We update the weights in a batch
            # This helps to stabilize the training, since the weights are updated less frequently
            # and a single example does not have a big impact on the weights
            self.net.update_weights()

        return total_rewards
agent = Agent(net)
rewards = agent.train(1000)
plot_rewards(rewards)
(Plot: total reward per training episode.)
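
The plot_rewards helper is assumed to have been defined earlier in the notebook; if it is not available, a minimal stand-in could look like this (a sketch, not the original implementation):

def plot_rewards(rewards):
    # Total reward collected in each training episode
    plt.plot(rewards)
    plt.xlabel('Episode')
    plt.ylabel('Total reward')
    plt.show()
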
frames = []
for _ in range(5):
    state, _ = env.reset()
    done = False
    while not done:
        frame = env.render()
        frames.append(frame)
        action, _ = agent.act(state)
        state, _, terminated, truncated, _ = env.step(action)
        done = terminated or truncated

# Save all collected frames as a single GIF once the rollout episodes are finished
imageio.mimsave('images/cartpole_policy_learning.gif', frames, fps=30)

(Animation: the trained agent balancing the pole, from images/cartpole_policy_learning.gif.)

While there are more sophisticated algorithms for tackling reinforcement learning problems, such as Deep Q-Networks (DQN), they share a common principle with our approach: the notion of future rewards. A DQN, for instance, uses a neural network to approximate the action-value (Q) function. The network is trained to predict the expected future reward for each possible action in a given state, and the agent selects the action with the highest predicted value.
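
Concretely, the Q-network in a DQN is regressed toward the bootstrapped target (a standard formula, shown here only for comparison with our return-based weighting, not something implemented in this notebook):

$$
y = r + \gamma \max_{a'} Q(s', a'; \theta^{-}),
$$

where $s'$ is the next state and $\theta^{-}$ denotes the parameters of a periodically updated target network.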

This concept of future rewards is fundamental to reinforcement learning. It’s based on the idea that actions taken now can have long-term consequences, and the goal is to choose actions that maximize the total reward over time, not just the immediate reward. This is what allows reinforcement learning agents to learn complex strategies that involve delayed gratification and planning ahead.