Project: Guide the four-axis aircraft learning flight¶

Design an agent that will allow the quadcopter to fly, then train it with the reinforcement learning algorithm of your choice!

Try to use what you have learned in this module to see which method works best, but of course you can come up with innovative methods and test them.

Description

Please review the files in the directory to better understand the project structure.

  • task.py: Define your task (environment) in this document.
  • agents/: This folder contains reinforcement learning agents.
    • policy_search.py: We have provided you with an agent template.
    • agent.py: Develop your agent in this document.
  • physics_sim.py: This file contains the quadcopter simulator. Do not modify this document .

In this project, you need to task.pydefine your mission. Although we have provided you with a task example to help you get started, you can change this file at will. In this notebook, you will also learn more about modifying this file.

You also need agent.pya strengthening agent in study design, to complete the task of your choice.

We also encourage you to create additional files to help you organize your code. For example, maybe you can define a model.pydefined neural network architecture you need other files.

Four-axis aircraft control

In the code below, we provide an example of an agent to demonstrate how to use the simulator to control a quadcopter. The agent in notebook than you need to test agent (in agents/policy_search.pymore simple in)!

This agent controls the aircraft by setting the speed on the four axes of the aircraft. Basic_AgentThe agents provided in the class will randomly specify actions for the four axes. These four speed by actreturned as a list of four floating-point method.

In this project, you will agents/agent.pyappreciate the intelligence to realize the specified actions in a more intelligent way.

Plot

In [1]:
import matplotlib.pyplot as plt
%matplotlib inline

def plot_run(results, standalone=True):
    if standalone:
        plt.subplots(figsize=(15, 15))
    
    
    #View the position change of the four-axis aircraft plt . subplot ( 3 ,  3 ,  1 ) 
    plt . title ( 'Position' ) 
    plt . plot ( results [ 'time' ],  results [ 'x' ],  label = 'x' ) 
    Plt . plot ( results [ 'time' ],  results [ 'y' ],  label = 'y' ) 
    plt . plot (results['time'], results['z'], label='z')
    plt.xlabel('time, seconds')
    plt.ylabel('Position')
    plt.grid(True)
    if standalone:
        plt.legend()

    # 
    四轴飞机的速度plt . subplot ( 3 ,  3 ,  2 ) 
    plt . title ( 'Velocity' ) 
    plt . plot ( results [ 'time' ],  results [ 'x_velocity' ],  label = 'x_hat' ) 
    plt . Plot ( results [ 'time' ],  results [ 'y_velocity' ],  label = 'y_hat' ) 
    plt . plot(results['time'], results['z_velocity'], label='z_hat')
    plt.xlabel('time, seconds')
    plt.ylabel('Velocity')
    plt.grid(True)
    if standalone:
        plt.legend()
    
    # 
    图图Euler angles (the rotation of the quadcopter around the x, y and z axes) plt . subplot ( 3 ,  3 ,  3 ) 
    plt . title ( 'Orientation ' ) 
    plt . plot ( results [ ' Time' ],  results [ 'phi' ],  label = 'phi' ) 
    plt . plot ( results [ 'time' ],  results [ 'theta' ],  label = 'theta' ) 
    plt.plot(results['time'], results['psi'], label='psi')
    plt.xlabel('time, seconds')
    plt.grid(True)
    if standalone:
        plt.legend()
    
    # draw the speed of each Euler angle (arc per second) 
    plt . subplot ( 3 ,  3 ,  4 ) 
    plt . title ( 'Angular Velocity ' ) 
    plt . plot ( results [ 'time' ],  results [ 'phi_velocity ' ],  label = 'phi' ) 
    plt . plot ( results [ 'time' ],  results [ 'theta_velocity' ],  label = 'theta' ) 
    plt .plot(results['time'], results['psi_velocity'], label='psi')
    plt.xlabel('time, seconds')
    plt.grid(True)
    if standalone:
        plt.legend()

    # Finally, you can use the code below to output the action selected by the agent. 
    PLT . the subplot ( . 3 ,  . 3 ,  . 5 ) 
    PLT . title ( 'the Rotor Speed' ) 
    PLT . Plot ( Results [ 'Time' ],  Results [ 'rotor_speed1' ],  label = 'the Rotor. 1' ) 
    PLT . Plot ( Results [ 'time' ],  results [ 'rotor_speed2' ],  label = 'Rotor 2' ) 
    plt.plot(results['time'], results['rotor_speed3'], label='Rotor 3')
    plt.plot(results['time'], results['rotor_speed4'], label='Rotor 4')
    plt.xlabel('time, seconds')
    plt.ylabel('Rotor Speed, revolutions / second')
    plt.grid(True)
    if  standalone : 
        plt . legend ()

    if  standalone : 
        plt . tight_layout () 
        plt . show ()
In [2]:
from mpl_toolkits.mplot3d.axes3d import Axes3D


def plot_point3d(ax, x, y, z, **kwargs):
    ax.scatter([x], [y], [z], **kwargs)
    ax.text(x, y, z, "({:.1f}, {:.1f}, {:.1f})".format(x, y, z))


def show_flight_path(results, target=None):
    results = np.array(results)
    
    fig = plt.figure(figsize=(10,10))
    ax = fig.gca(projection='3d')
    ax.set_xlabel('X-axis')
    ax.set_ylabel('Y-axis')
    ax.set_zlabel('Z-axis')

    ax.plot3D(results[:, 0], results[:, 1], results[:, 2], 'gray')
    
    if target is not None:
        plot_point3d(ax, *target[0:3], c='y', marker='x', s=100, label='target')
        
    plot_point3d(ax, *results[0, 0:3], c='g', marker='o', s=50, label='start')
    plot_point3d(ax, *results[-1, 0:3], c='r', marker='o', s=50, label='end')
    
    ax.legend()

random Agent

In [3]:
import  random

class Basic_Agent():
    def __init__(self, task):
        self.task = task
    
    def act(self):
        new_thrust = random.gauss(450., 25.)
        return [new_thrust + random.gauss(0., 1.) for x in range(4)]

Run the code below to let the agent specify the action to control the quadcopter.

Please free to change our offer runtime, init_pose, init_velocitiesand init_angle_velocitiesvalues to change the initial conditions of a four-axis aircraft.

The bottom of the labelslist as a comment simulation data. All information is stored in data.txtthe document, and save it in resultsthe directory.

In [4]:
% load_ext autoreload
 % autoreload 2

import csv
import numpy as np
from task import Task

# Modify the values below to give the quadcopter a different starting position.
runtime = 5.                                     # time limit of the episode
init_pose = np.array([0., 0., 10., 0., 0., 0.])  # initial pose
init_velocities = np.array([0., 0., 0.])         # initial velocities
init_angle_velocities = np.array([0., 0., 0.])   # initial angle velocities
file_output = 'data.txt'                         # file name for saved results

# Setup
task = Task(init_pose, init_velocities, init_angle_velocities, runtime)
agent = Basic_Agent(task)
done = False
labels = ['time', 'x', 'y', 'z', 'phi', 'theta', 'psi', 'x_velocity',
          'y_velocity', 'z_velocity', 'phi_velocity', 'theta_velocity',
          'psi_velocity', 'rotor_speed1', 'rotor_speed2', 'rotor_speed3', 'rotor_speed4']
results = {x : [] for x in labels}

# Run the simulation, and save the results.
with open(file_output, 'w') as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(labels)
    while True:
        rotor_speeds = agent.act()
        _, _, done = task.step(rotor_speeds)
        to_write = [task.sim.time] + list(task.sim.pose) + list(task.sim.v) + list(task.sim.angular_v) + list(rotor_speeds)
        for ii in range(len(labels)):
            results[labels[ii]].append(to_write[ii])
        writer.writerow(to_write)
        if done:
            break
In [5]:
plot_run(results)
In [6]:
path = [[results['x'][i], results['y'][i], results['z'][i]] for i in range(len(results['x']))]
show_flight_path(path, target=None)

Before you specify a task, you need to derive the state of the environment in the simulator. Run the code below to output the following variable values ​​at the end of the simulation:

  • task.sim.pose: Four-way aircraft at ( x,Y,with) The position in the coordinate system and the Euler angle.
  • task.sim.v: Quadcopter in ( x,Y,with) The speed in the coordinate system.
  • task.sim.angular_v: Radius of three Euler angles per second.
In [7]:
# the pose, velocity, and angular velocity of the quadcopter at the end of the episode
print(task.sim.pose)
print(task.sim.v)
print(task.sim.angular_v)
[ 12.53793773 -27.05667129 24.71770869 6.01897845 5.89467343 0. ]
[ 8.07873718 -16.00327905 0.70099963]
[ 0.01539283 -0.04806114 0. ]

In the task.pytask of examples, we use six dimensions of action to build a four-axis aircraft environmental status of each time step. However, you can also change the task as you wish. You can add speed information to expand the state vector, or use any combination of motion, velocity, and angular velocity to construct an environment state that is appropriate for your task.

Task

In the task.pymiddle, we provide an example of a task for you. Please open this file in a new window.

Use __init__()method to initialize specify several variables needed for this task.

  • As the simulator PhysicsSimclasses (from physics_sim.pyexample file) is initialized.
  • Inspired by the research methods in the DDPG paper, we used a method of repeatedly calling actions. For each time step agent, we will use the action_repeatstime step to simulate. If you are not familiar with this approach, you can read the conclusions of the DDPG paper .
  • We set the value of each component in the state vector. In the task example, we only set the action information for six dimensions. In order to set the vector size ( state_size), we must consider repeated actions.
  • The task environment is usually a four-dimensional action space with one input ( action_size=4) per axis . You can set the minimum ( action_low) and maximum ( action_high) values ​​for each input .
  • The task examples we provide in the file will bring the agent to the target location. We set the target location to a variable.

reset()The method will reset the simulator. This method is called by the agent whenever the phase ends. You can see the examples in the code below.

step()The method is the most important one. It will receive the action selected by the agent rotor_speedsand prepare for the next state, returning to the agent. Then, you will pass get_reward()calculated reward value. This phase will be considered complete when the specified time is exceeded or when the quadcopter reaches the edge of the simulator.

Next, you'll learn how to test the performance of the agent in this task.

Agent

agents/policy_search.pyThe example of the agent provided in the file uses a very simple linear strategy that treats the motion vector directly as a dot product of the state vector and the matrix weight. It then randomly interferes with the parameters by adding some Gaussian noise to produce different strategies. Based on the average reward value ( score) obtained at each stage , it will record the best set of parameters found so far and the state of change of the score, and adjust the scale factor accordingly to increase or decrease the noise.

Please run the code below to see the performance of the agent in the task example.

In [8]:
import sys
import pandas as p
In [9]:
from agents.policy_search import PolicySearch_Agent
from task import Task

labels = ['time', 'x', 'y', 'z', 'phi', 'theta', 'psi', 'x_velocity',
          'y_velocity', 'z_velocity', 'phi_velocity', 'theta_velocity',
          'psi_velocity', 'rotor_speed1', 'rotor_speed2', 'rotor_speed3', 'rotor_speed4']
results = {x : [] for x in labels}

num_episodes = 1
target_pos = np.array([0., 0., 10.])
task = Task(target_pos=target_pos)
agent = PolicySearch_Agent(task) 

for i_episode in range(1, num_episodes+1):
    state = agent.reset_episode() # start a new episode
    while True:
        action = agent.act(state) 
        next_state, reward, done = task.step(action)
        if i_episode == num_episodes:
            to_write = [task.sim.time] + list(task.sim.pose) + list(task.sim.v) + list(task.sim.angular_v) + list(action)
            for ii in range(len(labels)):
                results[labels[ii]].append(to_write[ii])
        agent.step(reward, done)
        state = next_state
        if done:
            print("\rEpisode = {:4d}, score = {:7.3f} (best = {:7.3f}), noise_scale = {}".format(
                i_episode, agent.score, agent.best_score, agent.noise_scale), end="")  # [debug]
            break
    sys.stdout.flush()
Episode =    1, score =  -0.993 (best =  -0.993), noise_scale = 0.05
In [10]:
plot_run(results)

path = [[results['x'][i], results['y'][i], results['z'][i]] for i in range(len(results['x']))]
show_flight_path(path, target=target_pos)

The performance of this agent must be very bad! Now it is your turn to play!

Define tasks, design and train your agents!

Modify the task.pyfile to specify the task of your choice. If you are not sure what to choose, you can teach your quadcopter to take off, hover, land or reach a designated location.

After the specified task, agents/policy_search.pyAgent example as a template to in agents/agent.pythe definition of your own intelligence body. You can freely borrowed elements you need from the agent examples, including how to modularize your code (use act(), learn()and reset_episode_vars()other auxiliary methods).

Please note that the first agent and task you specify will most likely not be able to learn smoothly. You will need to improve the different hyperparameters and reward functions until you get good results.

When developing an agent, you also need to pay attention to its performance. Refer to the code below to establish a mechanism to store the total reward value for each stage. If the stage reward value is gradually increasing, your agent is learning.

In [11]:
# From Udacity
import numpy as np
import random
from collections import namedtuple, deque
from agents.agent import *
from task import *
In [17]:
labels = ['time', 'x', 'y', 'z', 'phi', 'theta', 'psi', 'x_velocity',
          'y_velocity', 'z_velocity', 'phi_velocity', 'theta_velocity',
          'psi_velocity', 'rotor_speed1', 'rotor_speed2', 'rotor_speed3', 'rotor_speed4']
results = {x : [] for x in labels}

labels2 = ['epoch', 'reward']
results2 = {x: [] for x in labels2}

runtime = 5.0                                    # time limit of the episode
init_pose = np.array([0., 0., 10., 0., 0., 0.])  # initial pose
init_velocities = np.array([0., 0., 0.])         # initial velocities
init_angle_velocities = np.array([0., 0., 0.])   # initial angle velocities
target_pos = np.array([0., 0., 10.])

# Setup
task = Task(init_pose=init_pose, init_velocities=init_velocities, 
        init_angle_velocities=init_angle_velocities, runtime=runtime, target_pos=target_pos) #, init_velocities, init_angle_velocities, runtime
agent = DDPG(task) 
best_reward = None
i_episode = 0
In [18]:
from tqdm import tqdm_notebook as tqdm

num_episodes = 2000
try:
    to_write = None
    pbar = tqdm(range(1, num_episodes+1))
    for i_episode_now in pbar:
        i_episode = i_episode+1
        state = agent.reset_episode() # start a new episode
        current_reward = 0
        time = 0
        while True:
            time = time +1
            action = agent.act(state) 
            next_state, reward, done = task.step(action)

            """Update Reward"""
            current_reward = current_reward+reward
            """Write Final Result"""
            to_write = [task.sim.time] + list(task.sim.pose) + list(task.sim.v) + list(task.sim.angular_v) + list(action)
            if i_episode_now == num_episodes:
                for ii in range(len(labels)):
                    results[labels[ii]].append(to_write[ii])

            agent.step(action, reward, next_state, done) # add action, next_state
            state = next_state
            if done:
                if best_reward == None or current_reward > best_reward:
                    best_reward = current_reward
                pbar.set_description("\rEpisode = {:4d}/{:4d}, score = {:7.3f}/{:3.3f} (best = {:7.3f}) z={:.3f}, v={:.3f}".format(
                    i_episode_now, i_episode, current_reward, current_reward/time, best_reward, task.sim.pose[2], task.sim.v[2]))
                break

        to_write2 = [i_episode] + [current_reward]
        for iii in range(len(labels2)):
            results2[labels2[iii]].append(to_write2[iii])

        sys.stdout.flush()
except KeyboardInterrupt as e:
    for ii in range(len(labels)):
        results[labels[ii]].append(to_write[ii])

Drawing stage reward¶

Please plot the total rewards that the agent gets in each phase, which can be a single run bonus value or an average of multiple runs.

In [19]:
# ## TODO: Plot the rewards.

# import matplotlib.pyplot as plt
# %matplotlib inline

# plt.plot(results['episod'], results['total_reward'])
# #plt.legend()

# _ = plt.ylim()
plot_run(results)

path = [[results['x'][i], results['y'][i], results['z'][i]] for i in range(len(results['x']))]
show_flight_path(path, target=target_pos)
In [20]:
plt.plot(results2['epoch'], results2['reward'], label='total_reward/epoch')
# plt.yscale('log')
plt.legend()
plt.show()
In [ ]:
smooth_value = int(len(results2['reward']) * 0.01)
smoothed = np.convolve(results2['reward'], np.ones(smooth_value)/smooth_value)
# smoothed = smooth(results2['reward'] ,window_len=smooth_value,window='hanning')
x = list(range(len(smoothed))) #results2['epoch']
plt.plot(x[:-10], smoothed[:-10], label='total_reward/epoch')
plt.legend()
plt.show()
In [ ]:
# Storage
%store task
%store agent
%store results
%store results2
%store best_reward
%store i_episode

Review

Question 1 : Please describe in your task.pyassigned tasks. How do you design a reward function?

Answer :

The task of my agent is the default task - going to a specific location.
init_pose = np.array([0., 0., 10., 0., 0., 0.]) # initial pose init_velocities = np.array([0., 0., 0.]) # initial velocities init_angle_velocities = np.array([0., 0., 0.]) # initial angle velocities target_pos = np.array([0., 0., 10.])
As the code above shows, it starts from pos [0,0,10] to [0,0,10], tyring keep balance up on the air.
I used the default reward provided by Udacity but edicted so that it takes to the account of having a smaller unnesscary rotations. I punish the agent by not allowing it to turn too much. reward = 1.0-0.3*(abs(self.sim.pose[:3] - self.target_pos)).sum() + 1.0 - abs(0.1*(self.sim.pose[3:])).sum()

Question 2 : Please briefly describe your agent, you can refer to the following questions:

  • What learning algorithms have you tried? Which is the best?
  • Which hyperparameters did you finally choose (such as a, c, e Wait)?
  • What kind of neural network structure (if any) did you use? Please describe the number of layers, size and activation function.

回答: I tried different architecture of the neuro-network using CNN architecture. I found that there is no nessecity of adding the Dropout Layer after Dense layer - maybe because you can never overfit the environment - unlike image recognition problems. But since there is not down side of adding it, I decides to keep it. Generally, for Dense layer, it would be better to increase the width of the network instead of the depth, so I did not add the depth of the network (unlike CNN). This attempt also reduced the number of paremeters I need to train. The following is the network for Actor:

`

    net = layers.Dense(units=32, kernel_regularizer=regularizers.l2(0.001), activity_regularizer=regularizers.l1(0.001))(net)
    net = layers.LeakyReLU(alpha=0.3)(net)
    net = layers.BatchNormalization()(net)
    net = layers.Dropout(0.4)(net)

    net = layers.Dense(units=1024, kernel_regularizer=regularizers.l2(0.01), activity_regularizer=regularizers.l1(0.01))(net)
    net = layers.LeakyReLU(alpha=0.3)(net)
    net = layers.BatchNormalization()(net)
    net = layers.Dropout(0.4)(net)

`
Here I am using 5 layers with typical 32>1024 layers. Trying LeakyReLU instead of ReLu. BatchNormalization after each layer.

The following is the network for Critic:

`

    net_states = states
    net_states = layers.Dense(units=32, kernel_regularizer=regularizers.l2(0.001), activity_regularizer=regularizers.l1(0.001))(net_states)
    net_states = layers.LeakyReLU(alpha=0.3)(net_states)
    net_states = layers.BatchNormalization()(net_states)
    net_states = layers.Dropout(0.4)(net_states)

    net_states = layers.Dense(units=64, kernel_regularizer=regularizers.l2(0.001), activity_regularizer=regularizers.l1(0.001))(net_states)
    net_states = layers.LeakyReLU(alpha=0.3)(net_states)
    net_states = layers.BatchNormalization()(net_states)
    net_states = layers.Dropout(0.4)(net_states)

    net_actions = actions
    net_actions = layers.Dense(units=32, kernel_regularizer=regularizers.l2(0.001), activity_regularizer=regularizers.l1(0.001))(net_actions)
    net_actions = layers.LeakyReLU(alpha=0.3)(net_actions)
    net_actions = layers.BatchNormalization()(net_actions)
    net_actions = layers.Dropout(0.4)(net_actions)

    net_actions = layers.Dense(units=64, kernel_regularizer=regularizers.l2(0.001), activity_regularizer=regularizers.l1(0.001))(net_actions)
    net_actions = layers.LeakyReLU(alpha=0.3)(net_actions)
    net_actions = layers.BatchNormalization()(net_actions)
    net_actions = layers.Dropout(0.4)(net_actions)

    net = layers.Add()([net_states, net_actions])

    net = layers.Dense(units=256, kernel_regularizer=regularizers.l2(0.001), activity_regularizer=regularizers.l1(0.001))(net)
    net = layers.LeakyReLU(alpha=0.3)(net)
    net = layers.BatchNormalization()(net)
    net = layers.Dropout(0.4)(net)

    net = layers.Dense(units=16, kernel_regularizer=regularizers.l2(0.001), activity_regularizer=regularizers.l1(0.001))(net)
    net = layers.LeakyReLU(alpha=0.3)(net)

`

Here I am using 2+3 layers with typical 32>64 layers. After merging the state and action data, I did 3 more layers of calculation 256>16>1. Trying LeakyReLU instead of ReLu. BatchNormalization after each layer.
Also, since my task is to keep balance, I decrease the run time significantly from the default setting to speed up the learning process: runtime = 2.0 I also increased the batch size to 128 to get more smooth updates. (However, the tutor suggest me to change back from runtime = 2.0 to runtime = 5.0) I cut down a lot of layers due to insufficient trainning time.

The hyperparemeters you mentioned are not that improtant! The key is learning rate! Actor_lr = 5e-9, Critic_lr = 1e-4.

Question 3 : Describe the learning situation of the agent based on the reward map you have drawn.

  • Is learning the task simple or difficult?
  • Is there a step-by-step or rapid rise in the learning curve?
  • How good is the final performance of the agent? (such as the average reward value of the last ten stages)

回答:
The task is easier than harder tasks like landing and going to a certain point, but it is still challenging. I purposely do not allow the learning curve to be too steep to make sure the learning rate for the actor is low enough. (implementing a good learning rate decay in the context is hard). It looks like that the agent is learning, and able to pass through local minimums around the 500th episode. The highest average award is about 50 and the award from the last few epochs are around 45.

Question 4 : Please briefly summarize your project experience. You can refer to the following questions:

  • What is the most difficult part of this project? (eg start project, run ROS, draw, specific tasks, etc.)
  • Do you have any interesting findings about the behavior of the quadcopter and your agent?

回答:
The hardest part of this project is to choose the right algorism and actually write a working solutions.
It is very time-consuming to come up with a solution that does not work. The hyperparemeter tuning is also the key of the trainning process. Thanks https://github.com/UdaStuDot/QuadcopterUdStunb for providing me a reasonable learning rate (I was using 0.01, far off from his solution - 1e-7). From my observation, the agent likes to fly up a little bit so that the gravity can drag it to the reward.

(Optional) Plot Actor and Critic structure

Recommended from keras.utils import plot_modelto display the model structure;

Original text