From 1337de59b712c30c447b2b45de9ec54ecd6d8ea4 Mon Sep 17 00:00:00 2001
From: Matt Strapp
Date: Fri, 18 Feb 2022 12:41:51 -0600
Subject: Hopefully get the system able to be packaged

Signed-off-by: Matt Strapp
---
 System/Pendulum/system_swingup_test.py | 293 +++++++++++++++++++++++++++++++++
 1 file changed, 293 insertions(+)
 create mode 100644 System/Pendulum/system_swingup_test.py

diff --git a/System/Pendulum/system_swingup_test.py b/System/Pendulum/system_swingup_test.py
new file mode 100644
index 0000000..e13c7ca
--- /dev/null
+++ b/System/Pendulum/system_swingup_test.py
@@ -0,0 +1,293 @@
+import numpy as np
+import numpy.random as rnd
+import torch as pt
+
+import math
+from gym import spaces, logger
+from gym.utils import seeding
+
+from system import System
+import time
+from sys import exit
+
+class SwingUpEnv():
+    """
+    Description:
+        A pole is attached by an un-actuated joint to a cart, which moves along
+        a frictionless track. The pendulum starts hanging down, and the goal is
+        to swing it up and keep it balanced by increasing and reducing the
+        cart's velocity.
+
+    Source:
+        This environment is adapted from the version of the cart-pole problem
+        described by Barto, Sutton, and Anderson.
+
+    Observation:
+        Type: Box(4)
+        Num    Observation               Min     Max
+        0      Cart Position             -20     20
+        1      Cart Velocity             -10     10
+        2      Pole Angle                -Inf    Inf
+        3      Pole Angular Velocity     -Inf    Inf
+
+    Actions:
+        Type: Box(1)
+        Num    Action                    Min     Max
+        0      Push cart                 -1      1
+
+        Note: The amount the velocity is reduced or increased is not fixed; it
+        depends on the angle the pole is pointing, because the center of
+        gravity of the pole changes the amount of energy needed to move the
+        cart underneath it.
+
+    Reward:
+        Reward is 1 for every step the pole spends above the horizontal
+        (cos(theta) > 0) and 0 otherwise. A large negative reward is given on
+        the step the episode terminates.
+
+    Starting State:
+        The state is read from the physical system after it returns home.
+
+    Episode Termination:
+        Cart Position is more than 10 in absolute value
+        Pole Angular Velocity is more than 3*pi in absolute value
+    """
+
+    metadata = {
+        'render.modes': ['human', 'rgb_array'],
+        'video.frames_per_second': 50
+    }
+
+    def __init__(self):
+        self.sys = System(angular_units='Radians')
+
+        self.force_mag = 10.
+        self.last_time = time.time()  # used to measure seconds between updates
+
+        # Position and velocity limits at which to fail the episode
+        self.x_threshold = 10.
+        self.x_dot_threshold = 10.
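+        # Fail the episode if the pole spins faster than 3*pi rad/s (1.5 rev/s)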
+        self.theta_dot_threshold = 3*np.pi
+
+        # Observation bound set to 2 * x_threshold so a failing observation is still within bounds
+        high = np.array([self.x_threshold*2, self.x_dot_threshold, np.finfo(np.float32).max, np.finfo(np.float32).max])
+
+        self.action_space = spaces.Box(-np.ones(1), np.ones(1), dtype=np.float32)
+        self.observation_space = spaces.Box(-high, high, dtype=np.float32)
+
+        self.seed()
+        self.state = None
+
+        self.up_time = 0
+        self.max_up_time = 0
+        self.steps_beyond_done = None
+
+    def seed(self, seed=None):
+        self.np_random, seed = seeding.np_random(seed)
+        return [seed]
+
+    def step(self, action):
+        assert self.action_space.contains(action), "%r (%s) invalid" % (action, type(action))
+        state = self.state
+        x, x_dot, theta, theta_dot = state
+        force = self.force_mag * action[0]
+        self.sys.adjust(force)
+
+        costheta = math.cos(theta)
+        sintheta = math.sin(theta)
+
+        # Track how long the pole has stayed above the horizontal
+        if costheta > 0:
+            self.up_time += 1
+            self.max_up_time = np.max([self.up_time, self.max_up_time])
+        else:
+            self.up_time = 0
+
+        # Use the wall-clock time between calls to estimate velocities
+        current_time = time.time()
+        tau = current_time - self.last_time
+        self.last_time = current_time
+
+        new_theta, new_x = self.sys.measure()
+        theta_dot = (new_theta - theta) / tau
+        x_dot = (new_x - x) / tau
+        self.state = (new_x, x_dot, new_theta, theta_dot)
+        self.sys.add_results(new_theta, new_x, force)
+
+        done = new_x < -self.x_threshold \
+               or new_x > self.x_threshold \
+               or theta_dot < -self.theta_dot_threshold \
+               or theta_dot > self.theta_dot_threshold
+        done = bool(done)
+
+        if not done:
+            # 1 while the pole is above the horizontal, 0 otherwise
+            reward = np.ceil(costheta)
+        elif self.steps_beyond_done is None:
+            # Pole just fell!
+            self.steps_beyond_done = 0
+            reward = -(100 * (np.abs(x_dot) + np.abs(theta_dot)))
+        else:
+            if self.steps_beyond_done == 0:
+                logger.warn("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.")
+            self.steps_beyond_done += 1
+            reward = 0.0
+
+        return np.array(self.state), reward, done, {'max_up_time': self.max_up_time}
+
+    def reset(self, home=True):
+        if home:
+            self.sys.return_home()
+            time.sleep(1)
+        # Estimate the initial angular velocity from two measurements 50 ms apart
+        init_ang, lin = self.sys.measure()
+        time.sleep(0.05)
+        ang, lin = self.sys.measure()
+        self.state = (0, 0, ang, (ang - init_ang) / 0.05)
+
+        self.up_time = 0
+        self.max_up_time = 0
+        self.up = False
+        self.steps_beyond_done = None
+        return np.array(self.state)
+
+    def end(self):
+        self.sys.deinitialize()
+
+
+class nnQ(pt.nn.Module):
+    """
+    A basic feed-forward neural network for approximating the Q-function.
+    The action is passed in as an index, one-hot encoded, and concatenated
+    with the state before being fed to the network.
+    """
+
+    def __init__(self, stateDim, numActions, numHiddenUnits, numLayers):
+        super().__init__()
+
+        InputLayer = [pt.nn.Linear(stateDim + numActions, numHiddenUnits),
+                      pt.nn.ReLU()]
+
+        HiddenLayers = []
+        for _ in range(numLayers - 1):
+            HiddenLayers.append(pt.nn.Linear(numHiddenUnits, numHiddenUnits))
+            HiddenLayers.append(pt.nn.ReLU())
+
+        OutputLayer = [pt.nn.Linear(numHiddenUnits, 1)]
+
+        AllLayers = InputLayer + HiddenLayers + OutputLayer
+        self.net = pt.nn.Sequential(*AllLayers)
+
+        self.numActions = numActions
+
+    def forward(self, x, a):
+        x = pt.tensor(x, dtype=pt.float32)
+
+        b = pt.nn.functional.one_hot(pt.tensor(a).long(), self.numActions)
+        c = b.float().detach()
+        y = pt.cat([x, c])
+
+        return self.net(y)
+
+
+class sarsaAgent:
+    def __init__(self, stateDim, numActions, numHiddenUnits, numLayers,
+                 epsilon=.1, gamma=.9, alpha=.1):
+        self.Q = nnQ(stateDim, numActions, numHiddenUnits, numLayers)
+        self.gamma = gamma
+        self.epsilon = epsilon
+        self.alpha = alpha
+        self.numActions = numActions
+        self.s_last = None
+
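+    # Choose an action for state x with an epsilon-greedy rule: explore with
+    # probability epsilon, otherwise take the greedy action under the current Q.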
+    def action(self, x):
+        a = 0
+        if rnd.rand() < self.epsilon:
+            a = rnd.randint(0, self.numActions)
+        else:
+            qBest = -np.inf
+            for aTest in range(self.numActions):
+                qTest = self.Q(x, aTest).detach().numpy()[0]
+                if qTest > qBest:
+                    qBest = qTest
+                    a = aTest
+        return a
+
+    def update(self, s, a, r, s_next, done):
+        # Compute the TD error, if there is enough data
+        do_update = True
+        if done:
+            Q_cur = self.Q(s, a).detach().numpy()[0]
+            delta = r - Q_cur
+            self.s_last = None
+            Q_diff = self.Q(s, a)
+        elif self.s_last is not None:
+            Q_next = self.Q(s, a).detach().numpy()[0]
+            Q_cur = self.Q(self.s_last, self.a_last).detach().numpy()[0]
+            delta = self.r_last + self.gamma * Q_next - Q_cur
+            Q_diff = self.Q(self.s_last, self.a_last)
+        else:
+            do_update = False
+
+        # Update the parameters via the semi-gradient method
+        if do_update:
+            self.Q.zero_grad()
+            Q_diff.backward()
+            for p in self.Q.parameters():
+                p.data.add_(p.grad.data, alpha=float(self.alpha * delta))
+
+        if not done:
+            self.s_last = np.copy(s)
+            self.a_last = np.copy(a)
+            self.r_last = np.copy(r)
+
+
+# This is the environment
+env = SwingUpEnv()
+
+# Discretize the continuous force command into numActions evenly spaced levels in [-1, 1]
+numActions = 5
+Actions = np.linspace(-1, 1, numActions)
+
+# This is our learning agent
+gamma = .95
+agent = sarsaAgent(5, numActions, 20, 1, epsilon=5e-2, gamma=gamma, alpha=1e-5)
+
+maxSteps = 5e4
+
+# This is a helper to deal with the fact that x[2] is actually an angle:
+# embed it as (cos, sin) so the network never sees a discontinuity
+x_to_y = lambda x: np.array([x[0], x[1], np.cos(x[2]), np.sin(x[2]), x[3]])
+
+R = []
+UpTime = []
+
+step = 0
+ep = 0
+try:
+    while step < maxSteps:
+        ep += 1
+        x = env.reset(home=(ep > 1))
+        C = 0.
+
+        done = False
+        t = 1
+        while not done:
+            t += 1
+            step += 1
+            y = x_to_y(x)
+            a = agent.action(y)
+            u = Actions[a:a+1]
+            x_next, c, done, info = env.step(u)
+
+            max_up_time = info['max_up_time']
+            y_next = x_to_y(x_next)
+
+            # Running average of the reward over this episode
+            C += (1./t) * (c - C)
+            agent.update(y, a, c, y_next, done)
+            x = x_next
+            if done:
+                break
+
+        if step >= maxSteps:
+            break
+
+        R.append(C)
+        UpTime.append(max_up_time)
+        #print('t:',ep+1,', R:',C,', L:',t-1,', G:',G,', Q:', Q_est, 'U:', max_up_time)
+        print('Episode:', ep, 'Total Steps:', step, ', Ave. Reward:', C, ', Episode Length:', t-1, 'Max Up-Time:', max_up_time)
+except Exception:
+    # Signal failure; hardware cleanup happens in the finally block
+    exit(-1)
+finally:
+    env.end()
\ No newline at end of file
--
cgit v1.2.3
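For reference, the parameter update in sarsaAgent.update() above is a semi-gradient SARSA(0) step: the TD error delta = r + gamma*Q(s', a') - Q(s, a) scales the gradient of Q(s, a) alone, with the bootstrap target held fixed. Below is a minimal standalone sketch of the same rule for a network shaped like nnQ; the function name and arguments are illustrative, not part of the patch.

    import torch as pt

    def semi_gradient_sarsa_step(qnet, s, a, r, s_next, a_next, gamma, alpha):
        # Bootstrap target is computed without tracking gradients, so no
        # gradient flows through Q(s', a') -- hence "semi-gradient".
        with pt.no_grad():
            q_next = qnet(s_next, a_next)
        q_sa = qnet(s, a)
        delta = r + gamma * q_next.item() - q_sa.item()  # TD error
        qnet.zero_grad()
        q_sa.backward()  # gradient of Q(s, a) w.r.t. the parameters
        with pt.no_grad():
            for p in qnet.parameters():
                # theta <- theta + alpha * delta * grad Q(s, a)
                p.add_(alpha * delta * p.grad)

Detaching the target is what distinguishes this from a true gradient of the squared TD error; the patch's use of .detach().numpy() for Q_next and Q_cur, with backward() called only on Q_diff, has the same effect.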