From 1337de59b712c30c447b2b45de9ec54ecd6d8ea4 Mon Sep 17 00:00:00 2001 From: Matt Strapp Date: Fri, 18 Feb 2022 12:41:51 -0600 Subject: Hopefully get the system able to be packaged Signed-off-by: Matt Strapp --- System/system_swingup_test_2.py | 395 ---------------------------------------- 1 file changed, 395 deletions(-) delete mode 100644 System/system_swingup_test_2.py (limited to 'System/system_swingup_test_2.py') diff --git a/System/system_swingup_test_2.py b/System/system_swingup_test_2.py deleted file mode 100644 index fc02ed6..0000000 --- a/System/system_swingup_test_2.py +++ /dev/null @@ -1,395 +0,0 @@ -import numpy as np -import numpy.random as rnd -import torch as pt - -import math -from gym import spaces, logger -from gym.utils import seeding - -### -import sys -sys.path.insert(0, '/home/pi/pendulum/System') -### - - -from System.system import System -#from . import System -import time -from sys import exit - -class SwingUpEnv(): - """ - Description: - A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The pendulum starts upright, and the goal is to prevent it from falling over by increasing and reducing the cart's velocity. - - Source: - This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson - - Observation: - Type: Box(4) - Num Observation Min Max - 0 Cart Position -4.8 4.8 - 1 Cart Velocity -Inf Inf - 2 Pole Angle -Inf Inf - 3 Pole Velocity At Tip -Inf Inf - - Actions: - Type: Box(1) - Num Action Min Max - 0 Push cart -1 1 - - Note: The amount the velocity that is reduced or increased is not fixed; it depends on the angle the pole is pointing. This is because the center of gravity of the pole increases the amount of energy needed to move the cart underneath it - - Reward: - Reward is 1 for every step taken, including the termination step - - Starting State: - All observations are assigned a uniform random value in [-0.05..0.05] - - Episode Termination: - Pole Angle is more than 12 degrees - Cart Position is more than 2.4 (center of the cart reaches the edge of the display) - Episode length is greater than 200 - Solved Requirements - Considered solved when the average reward is greater than or equal to 195.0 over 100 consecutive trials. - """ - - metadata = { - 'render.modes': ['human', 'rgb_array'], - 'video.frames_per_second' : 50 - } - - def __init__(self): - self.x_threshold = 14. - self.sys = System(angular_units='Radians', positive_limit=self.x_threshold, negative_limit=-self.x_threshold, sw_limit_routine=self.x_threshold_routine) - - self.force_mag = 11. - self.last_time = time.time() # time for seconds between updates - - # Angle at which to fail the episode - self.x_dot_threshold = 10. - self.theta_dot_threshold = 3*np.pi - - # Angle limit set to 2 * theta_threshold_radians so failing observation is still within bounds - high = np.array([self.x_threshold*2, self.x_dot_threshold, np.finfo(np.float32).max, np.finfo(np.float32).max]) - - - self.action_space = spaces.Box(-np.ones(1), np.ones(1), dtype = np.float32) - - self.seed() - self.state = None - - self.steps_beyond_done = None - self.done = False - - def seed(self, seed=None): - self.np_random, seed = seeding.np_random(seed) - return [seed] - - def step(self, action): - assert self.action_space.contains(action), "%r (%s) invalid"%(action, type(action)) - state = self.state - x, x_dot, theta, theta_dot = state - force = self.force_mag * action[0] - # Do not adjust the motor further if the x_threshold has been triggered by the SW limit - if self.done == False: - self.sys.adjust(force) - - costheta = math.cos(theta) - sintheta = math.sin(theta) - - if costheta > 0: - self.up_time += 1 - self.max_up_time = np.max([self.up_time, self.max_up_time]) - - else: - self.up_time = 0 - - current_time = time.time() - tau = current_time - self.last_time - self.last_time = current_time - - new_theta, new_x = self.sys.measure() - if (theta >= 0 and theta < math.pi/2.) and (new_theta > 3.*math.pi/2.): - theta_dot = (new_theta - (theta + 2.*math.pi)) / tau - elif (new_theta >= 0 and new_theta < math.pi/2.) and (theta > 3.*math.pi/2.): - theta_dot = ((new_theta + 2.*math.pi) - theta) / tau - else: - theta_dot = (new_theta - theta) / tau - x_dot = (new_x - x) / tau - self.state = (new_x, x_dot, new_theta, theta_dot) - self.sys.add_results(new_theta, new_x, force) - - '''done = theta_dot < -self.theta_dot_threshold \ - or theta_dot > self.theta_dot_threshold \ - or self.done == True''' - done = self.done - - '''done = x < -self.x_threshold \ - or x > self.x_threshold \ - or theta_dot < -self.theta_dot_threshold \ - or theta_dot > self.theta_dot_threshold \ - or self.done == True''' - done = bool(done) - - if not done: - reward = np.ceil(costheta) - elif self.steps_beyond_done is None: - # Pole just fell! - self.steps_beyond_done = 0 - reward = -( 100 * (np.abs(x_dot) + np.abs(theta_dot)) ) - else: - if self.steps_beyond_done == 0: - logger.warn("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.") - self.steps_beyond_done += 1 - reward = 0.0 - - return np.array(self.state), reward, done, {'max_up_time' : self.max_up_time} - - def x_threshold_routine(self): - print("SW Limit reached!") - self.done = True - self.sys.adjust(0) - - def reset(self, home = True): - if home == True: - self.sys.return_home() - time.sleep(1) - init_ang, lin = self.sys.measure() - time.sleep(0.05) - ang, lin = self.sys.measure() - self.state = (0, 0, ang, (ang-init_ang)/0.05) - - self.up_time = 0 - self.max_up_time = 0 - self.up = False - self.steps_beyond_done = None - self.done = False - return np.array(self.state) - - def end(self): - self.sys.deinitialize() - - def log(self, message): - self.sys.add_log(message) - print(message) - - -class nnQ(pt.nn.Module): - """ - Here is a basic neural network with for representing a policy - """ - - def __init__(self, stateDim, numActions, numHiddenUnits, numLayers): - super().__init__() - - InputLayer = [pt.nn.Linear(stateDim + numActions, numHiddenUnits), - pt.nn.ReLU()] - - HiddenLayers = [] - for _ in range(numLayers - 1): - HiddenLayers.append(pt.nn.Linear(numHiddenUnits, numHiddenUnits)) - HiddenLayers.append(pt.nn.ReLU()) - - - OutputLayer = [pt.nn.Linear(numHiddenUnits, 1)] - - AllLayers = InputLayer + HiddenLayers + OutputLayer - self.net = pt.nn.Sequential(*AllLayers) - - self.numActions = numActions - - def forward(self,x,a): - x = pt.tensor(x, dtype = pt.float32) - - b = pt.nn.functional.one_hot(pt.tensor(a).long(), self.numActions) - - c = b.float().detach() - y = pt.cat([x, c]) - - return self.net(y) - -class deepQagent: - def __init__(self,stateDim,numActions,numHiddenUnits,numLayers,epsilon=.1,gamma=.9,alpha=.1, - c = 100,batch_size=20): - self.Q = nnQ(stateDim,numActions,numHiddenUnits,numLayers) - self.Q_target = nnQ(stateDim,numActions,numHiddenUnits,numLayers) - - self.alpha = alpha - self.gamma = gamma - self.epsilon = epsilon - self.numActions = numActions - - self.D = [] - self.batch_size = batch_size - self.c = c - self.optimizer = pt.optim.SGD(self.Q.parameters(),lr=alpha) - self.counter = 0 - - def action(self,x): - # This is an epsilon greedy selection - if rnd.rand() < self.epsilon: - a = rnd.randint(numActions) - else: - qBest = -np.inf - for aTest in range(self.numActions): - qTest = self.Q(x,aTest).detach().numpy()[0] - if qTest > qBest: - qBest = qTest - a = aTest - return a - - def update(self,s,a,r,s_next,done): - self.counter += 1 - self.D.append((s,a,r,s_next,done)) - - B_ind = rnd.choice(len(self.D),size=self.batch_size) - - loss = 0. - - #B_ind = [-1] - for j in B_ind: - sj,aj,rj,s_next_j,done_j = self.D[j] - Q_cur = self.Q(sj,aj) - if done_j: - y = rj - else: - - Q_vals = [] - for a_next in range(self.numActions): - - Q_vals.append(self.Q_target(s_next_j,a_next).detach().numpy()[0]) - - - y = rj + self.gamma * np.max(Q_vals) - loss += .5 * (y-Q_cur)**2 / self.batch_size - self.optimizer.zero_grad() - #self.Q.zero_grad() - loss.backward() - self.optimizer.step() - - - if (self.counter % self.c) == 0: - for p, p_target in zip(self.Q.parameters(),self.Q_target.parameters()): - p_target.data = p.data.clone().detach() - -class sarsaAgent: - def __init__(self, stateDim, numActions, numHiddenUnits, numLayers, - epsilon = .1, gamma = .9, alpha = .1): - self.Q = nnQ(stateDim, numActions, numHiddenUnits, numLayers) - self.gamma = gamma - self.epsilon = epsilon - self.alpha = alpha - self.numActions = numActions - self.s_last = None - - def action(self, x): - # This is an epsilon greedy selection - a = 0 - if rnd.rand() < self.epsilon: - a = rnd.randint(0, numActions) - else: - qBest = -np.inf - for aTest in range(self.numActions): - qTest = self.Q(x, aTest).detach().numpy()[0] - if qTest > qBest: - qBest = qTest - a = aTest - return a - - def update(self, s, a, r, s_next,done): - # Compute the TD error, if there is enough data - update = True - if done: - Q_cur = self.Q(s, a).detach().numpy()[0] - delta = r - Q_cur - self.s_last = None - Q_diff = self.Q(s, a) - elif self.s_last is not None: - Q_next = self.Q(s, a).detach().numpy()[0] - Q_cur = self.Q(self.s_last, self.a_last).detach().numpy()[0] - delta = self.r_last + self.gamma * Q_next - Q_cur - Q_diff = self.Q(self.s_last, self.a_last) - else: - update = False - - # Update the parameter via the semi-gradient method - if update: - self.Q.zero_grad() - Q_diff.backward() - for p in self.Q.parameters(): - p.data.add_(self.alpha * delta, p.grad.data) - - if not done: - self.s_last = np.copy(s) - self.a_last = np.copy(a) - self.r_last = np.copy(r) - -# This is the environment -env = SwingUpEnv() - -# For simplicity, we only consider forces of -1 and 1 -numActions = 2 -Actions = np.linspace(-1, 1, numActions) - -# This is our learning agent -gamma = .95 -#agent = sarsaAgent(5, numActions, 20, 1, epsilon = 5e-2, gamma = gamma, alpha = 1e-5) -agent = deepQagent(5,numActions,20,2,epsilon=5e-2,gamma=gamma,batch_size=20, - c= 100,alpha=1e-4) - -maxSteps = 2e6 - -# This is a helper to deal with the fact that x[2] is actually an angle -x_to_y = lambda x : np.array([x[0], x[1], np.cos(x[2]), np.sin(x[2]), x[3]]) - -R = [] -UpTime = [] - -step = 0 -ep = 0 -maxLen = 500 -try: - while step < maxSteps: - ep += 1 - x = env.reset(home = ep > 1) - C = 0. - - done = False - t = 1 - while not done: - t += 1 - step += 1 - y = x_to_y(x) - a = agent.action(y) - u = Actions[a:a+1] - x_next, c, done, info = env.step(u) - - max_up_time = info['max_up_time'] - y_next = x_to_y(x_next) - - C += (1./t) * (c - C) - agent.update(y, a, c, y_next, done) - x = x_next - if done: - break - - if step >= maxSteps: - break - - if t > maxLen: - agent.s_last = None - break - - - R.append(C) - UpTime.append(max_up_time) - #print('t:',ep+1,', R:',C,', L:',t-1,', G:',G,', Q:', Q_est, 'U:', max_up_time) - log = "Episode:" + str(ep) + " Total Steps:" + str(step) + " Ave. Reward:" + str(C) + " Episode Length:" + str(t-1) + " Max Up-Time:" + str(max_up_time) - env.log(log) -except: - env.end() - exit(-1) -finally: - env.end() - exit(0) -- cgit v1.2.3