author     Matt Strapp <matt@mattstrapp.net>   2022-02-18 12:42:14 -0600
committer  Matt Strapp <matt@mattstrapp.net>   2022-02-18 12:42:14 -0600
commit     827e67dbd593d10e74336e5f699e29ddac80356e (patch)
tree       c22d868b2a443fc652f9bc9e887938ac176c018c /System/Pendulum/system_swingup_test_2.py
parent     Hopefully get the system able to be packaged (diff)
Change package name for consistency
Signed-off-by: Matt Strapp <matt@mattstrapp.net>
Diffstat (limited to 'System/Pendulum/system_swingup_test_2.py')
-rw-r--r--  System/Pendulum/system_swingup_test_2.py  395
1 file changed, 0 insertions(+), 395 deletions(-)
diff --git a/System/Pendulum/system_swingup_test_2.py b/System/Pendulum/system_swingup_test_2.py
deleted file mode 100644
index fc02ed6..0000000
--- a/System/Pendulum/system_swingup_test_2.py
+++ /dev/null
@@ -1,395 +0,0 @@
-import numpy as np
-import numpy.random as rnd
-import torch as pt
-
-import math
-from gym import spaces, logger
-from gym.utils import seeding
-
-###
-import sys
-sys.path.insert(0, '/home/pi/pendulum/System')
-###
-
-
-from System.system import System
-#from . import System
-import time
-from sys import exit
-
-class SwingUpEnv():
- """
- Description:
-        A pole is attached by an un-actuated joint to a cart, which moves along a track. The pendulum starts hanging down, and the goal is to swing it up and keep it balanced by increasing and reducing the cart's velocity.
-
- Source:
- This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson
-
- Observation:
- Type: Box(4)
-        Num   Observation               Min          Max
-        0     Cart Position             -14          14
-        1     Cart Velocity             -Inf         Inf
-        2     Pole Angle (radians)      0            2*pi
-        3     Pole Angular Velocity     -Inf         Inf
-
- Actions:
- Type: Box(1)
- Num Action Min Max
- 0 Push cart -1 1
-
-        Note: The amount by which the velocity is reduced or increased is not fixed; it depends on the angle at which the pole is pointing, because the pole's center of gravity changes the amount of energy needed to move the cart underneath it.
-
-    Reward:
-        Reward is 1 for every step in which the pole is above horizontal (cos(theta) > 0)
-        and 0 otherwise; the step that trips the software limit receives a large negative
-        reward proportional to the cart and pole speeds.
-
-    Starting State:
-        The cart is returned to its home position; the pendulum's angle and angular
-        velocity are estimated from two encoder measurements taken 0.05 s apart.
-
-    Episode Termination:
-        The cart reaches the software position limit (+/- x_threshold).
-        The training loop below additionally ends an episode after a fixed number of steps.
-
-    Solved Requirements:
-        No formal threshold is checked; the maximum continuous up-time logged for each
-        episode is used to judge progress instead.
- """
-
- metadata = {
- 'render.modes': ['human', 'rgb_array'],
- 'video.frames_per_second' : 50
- }
-
- def __init__(self):
- self.x_threshold = 14.
- self.sys = System(angular_units='Radians', positive_limit=self.x_threshold, negative_limit=-self.x_threshold, sw_limit_routine=self.x_threshold_routine)
-
- self.force_mag = 11.
- self.last_time = time.time() # time for seconds between updates
-
-        # Cart velocity and pole angular velocity limits
- self.x_dot_threshold = 10.
- self.theta_dot_threshold = 3*np.pi
-
-        # Position bound set to 2 * x_threshold so a failing observation is still within bounds
- high = np.array([self.x_threshold*2, self.x_dot_threshold, np.finfo(np.float32).max, np.finfo(np.float32).max])
-
-
- self.action_space = spaces.Box(-np.ones(1), np.ones(1), dtype = np.float32)
-
- self.seed()
- self.state = None
-
- self.steps_beyond_done = None
- self.done = False
-
- def seed(self, seed=None):
- self.np_random, seed = seeding.np_random(seed)
- return [seed]
-
- def step(self, action):
- assert self.action_space.contains(action), "%r (%s) invalid"%(action, type(action))
- state = self.state
- x, x_dot, theta, theta_dot = state
- force = self.force_mag * action[0]
- # Do not adjust the motor further if the x_threshold has been triggered by the SW limit
-        if not self.done:
- self.sys.adjust(force)
-
- costheta = math.cos(theta)
- sintheta = math.sin(theta)
-
- if costheta > 0:
- self.up_time += 1
- self.max_up_time = np.max([self.up_time, self.max_up_time])
-
- else:
- self.up_time = 0
-
- current_time = time.time()
- tau = current_time - self.last_time
- self.last_time = current_time
-
- new_theta, new_x = self.sys.measure()
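-        # The measured angle wraps around at 2*pi; when consecutive readings straddle
-        # the wrap point, shift one of them by 2*pi so the finite difference below
-        # gives a sensible angular velocity.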
- if (theta >= 0 and theta < math.pi/2.) and (new_theta > 3.*math.pi/2.):
- theta_dot = (new_theta - (theta + 2.*math.pi)) / tau
- elif (new_theta >= 0 and new_theta < math.pi/2.) and (theta > 3.*math.pi/2.):
- theta_dot = ((new_theta + 2.*math.pi) - theta) / tau
- else:
- theta_dot = (new_theta - theta) / tau
- x_dot = (new_x - x) / tau
- self.state = (new_x, x_dot, new_theta, theta_dot)
- self.sys.add_results(new_theta, new_x, force)
-
- '''done = theta_dot < -self.theta_dot_threshold \
- or theta_dot > self.theta_dot_threshold \
- or self.done == True'''
- done = self.done
-
- '''done = x < -self.x_threshold \
- or x > self.x_threshold \
- or theta_dot < -self.theta_dot_threshold \
- or theta_dot > self.theta_dot_threshold \
- or self.done == True'''
- done = bool(done)
-
- if not done:
- reward = np.ceil(costheta)
- elif self.steps_beyond_done is None:
- # Pole just fell!
- self.steps_beyond_done = 0
- reward = -( 100 * (np.abs(x_dot) + np.abs(theta_dot)) )
- else:
- if self.steps_beyond_done == 0:
- logger.warn("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.")
- self.steps_beyond_done += 1
- reward = 0.0
-
- return np.array(self.state), reward, done, {'max_up_time' : self.max_up_time}
-
- def x_threshold_routine(self):
- print("SW Limit reached!")
- self.done = True
- self.sys.adjust(0)
-
- def reset(self, home = True):
-        if home:
- self.sys.return_home()
- time.sleep(1)
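-        # Two angle readings taken 0.05 s apart give a rough estimate of the pendulum's
-        # initial angular velocity for the starting state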
- init_ang, lin = self.sys.measure()
- time.sleep(0.05)
- ang, lin = self.sys.measure()
- self.state = (0, 0, ang, (ang-init_ang)/0.05)
-
- self.up_time = 0
- self.max_up_time = 0
- self.up = False
- self.steps_beyond_done = None
- self.done = False
- return np.array(self.state)
-
- def end(self):
- self.sys.deinitialize()
-
- def log(self, message):
- self.sys.add_log(message)
- print(message)
-
-
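-# A minimal sketch (not called anywhere in this script) of how the SwingUpEnv API
-# above is typically driven; it assumes the System hardware layer is initialized
-# and reachable, and the helper name and step count are illustrative only.
-def _demo_random_rollout(env, n_steps=50):
-    info = {'max_up_time': 0}
-    x = env.reset()                      # home the cart and read the initial state
-    for _ in range(n_steps):
-        u = env.action_space.sample()    # random force in [-1, 1], shape (1,)
-        x, reward, done, info = env.step(u)
-        if done:                         # software limit tripped
-            break
-    return info['max_up_time']
-
-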
-class nnQ(pt.nn.Module):
- """
-    A basic neural network for representing an action-value function Q(s, a)
- """
-
- def __init__(self, stateDim, numActions, numHiddenUnits, numLayers):
- super().__init__()
-
- InputLayer = [pt.nn.Linear(stateDim + numActions, numHiddenUnits),
- pt.nn.ReLU()]
-
- HiddenLayers = []
- for _ in range(numLayers - 1):
- HiddenLayers.append(pt.nn.Linear(numHiddenUnits, numHiddenUnits))
- HiddenLayers.append(pt.nn.ReLU())
-
-
- OutputLayer = [pt.nn.Linear(numHiddenUnits, 1)]
-
- AllLayers = InputLayer + HiddenLayers + OutputLayer
- self.net = pt.nn.Sequential(*AllLayers)
-
- self.numActions = numActions
-
- def forward(self,x,a):
- x = pt.tensor(x, dtype = pt.float32)
-
- b = pt.nn.functional.one_hot(pt.tensor(a).long(), self.numActions)
-
- c = b.float().detach()
- y = pt.cat([x, c])
-
- return self.net(y)
-
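-
-# Illustrative sketch of how nnQ is evaluated: the state vector and a one-hot
-# encoding of the discrete action index are concatenated and passed through the
-# network, yielding a single Q-value. Sizes below are examples only.
-def _demo_q_network():
-    q = nnQ(stateDim=5, numActions=2, numHiddenUnits=20, numLayers=2)
-    state = np.zeros(5, dtype=np.float32)    # e.g. the embedded observation from x_to_y
-    q_value = q(state, 1)                    # Q(state, action=1), a tensor of shape (1,)
-    return float(q_value)
-
-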
-class deepQagent:
- def __init__(self,stateDim,numActions,numHiddenUnits,numLayers,epsilon=.1,gamma=.9,alpha=.1,
- c = 100,batch_size=20):
- self.Q = nnQ(stateDim,numActions,numHiddenUnits,numLayers)
- self.Q_target = nnQ(stateDim,numActions,numHiddenUnits,numLayers)
-
- self.alpha = alpha
- self.gamma = gamma
- self.epsilon = epsilon
- self.numActions = numActions
-
- self.D = []
- self.batch_size = batch_size
- self.c = c
- self.optimizer = pt.optim.SGD(self.Q.parameters(),lr=alpha)
- self.counter = 0
-
- def action(self,x):
- # This is an epsilon greedy selection
- if rnd.rand() < self.epsilon:
-            a = rnd.randint(self.numActions)
- else:
- qBest = -np.inf
- for aTest in range(self.numActions):
- qTest = self.Q(x,aTest).detach().numpy()[0]
- if qTest > qBest:
- qBest = qTest
- a = aTest
- return a
-
- def update(self,s,a,r,s_next,done):
- self.counter += 1
- self.D.append((s,a,r,s_next,done))
-
- B_ind = rnd.choice(len(self.D),size=self.batch_size)
-
- loss = 0.
-
- #B_ind = [-1]
- for j in B_ind:
- sj,aj,rj,s_next_j,done_j = self.D[j]
- Q_cur = self.Q(sj,aj)
- if done_j:
- y = rj
- else:
-
- Q_vals = []
- for a_next in range(self.numActions):
-
- Q_vals.append(self.Q_target(s_next_j,a_next).detach().numpy()[0])
-
-
- y = rj + self.gamma * np.max(Q_vals)
- loss += .5 * (y-Q_cur)**2 / self.batch_size
- self.optimizer.zero_grad()
- #self.Q.zero_grad()
- loss.backward()
- self.optimizer.step()
-
-
- if (self.counter % self.c) == 0:
- for p, p_target in zip(self.Q.parameters(),self.Q_target.parameters()):
- p_target.data = p.data.clone().detach()
-
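-
-# deepQagent above follows a DQN-style scheme: transitions are stored in the replay
-# list D, minibatches are sampled to reduce the squared error against the target
-# y = r + gamma * max_a' Q_target(s', a'), and every c updates the target network's
-# parameters are copied from Q. A minimal usage sketch with made-up inputs
-# (illustrative only, not called anywhere in this script):
-def _demo_deepq_update():
-    dq = deepQagent(stateDim=5, numActions=2, numHiddenUnits=20, numLayers=2,
-                    epsilon=5e-2, gamma=.95, alpha=1e-4, c=100, batch_size=4)
-    s = np.zeros(5, dtype=np.float32)
-    a = dq.action(s)                         # epsilon-greedy action index
-    s_next = np.ones(5, dtype=np.float32)
-    dq.update(s, a, 1.0, s_next, False)      # store the transition and take one SGD step
-
-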
-class sarsaAgent:
- def __init__(self, stateDim, numActions, numHiddenUnits, numLayers,
- epsilon = .1, gamma = .9, alpha = .1):
- self.Q = nnQ(stateDim, numActions, numHiddenUnits, numLayers)
- self.gamma = gamma
- self.epsilon = epsilon
- self.alpha = alpha
- self.numActions = numActions
- self.s_last = None
-
- def action(self, x):
- # This is an epsilon greedy selection
- a = 0
- if rnd.rand() < self.epsilon:
-            a = rnd.randint(0, self.numActions)
- else:
- qBest = -np.inf
- for aTest in range(self.numActions):
- qTest = self.Q(x, aTest).detach().numpy()[0]
- if qTest > qBest:
- qBest = qTest
- a = aTest
- return a
-
- def update(self, s, a, r, s_next,done):
- # Compute the TD error, if there is enough data
- update = True
- if done:
- Q_cur = self.Q(s, a).detach().numpy()[0]
- delta = r - Q_cur
- self.s_last = None
- Q_diff = self.Q(s, a)
- elif self.s_last is not None:
- Q_next = self.Q(s, a).detach().numpy()[0]
- Q_cur = self.Q(self.s_last, self.a_last).detach().numpy()[0]
- delta = self.r_last + self.gamma * Q_next - Q_cur
- Q_diff = self.Q(self.s_last, self.a_last)
- else:
- update = False
-
- # Update the parameter via the semi-gradient method
- if update:
- self.Q.zero_grad()
- Q_diff.backward()
- for p in self.Q.parameters():
-                p.data.add_(p.grad.data, alpha=float(self.alpha * delta))
-
- if not done:
- self.s_last = np.copy(s)
- self.a_last = np.copy(a)
- self.r_last = np.copy(r)
-
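-# sarsaAgent above implements semi-gradient SARSA: once two consecutive state-action
-# pairs are available it forms the TD error
-#     delta = r_last + gamma * Q(s, a) - Q(s_last, a_last)
-# (or delta = r - Q(s, a) on a terminal step) and moves the network parameters by
-# alpha * delta * grad Q, treating the bootstrapped target as a constant.
-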
-# This is the environment
-env = SwingUpEnv()
-
-# For simplicity, we only consider forces of -1 and 1
-numActions = 2
-Actions = np.linspace(-1, 1, numActions)
-
-# This is our learning agent
-gamma = .95
-#agent = sarsaAgent(5, numActions, 20, 1, epsilon = 5e-2, gamma = gamma, alpha = 1e-5)
-agent = deepQagent(5,numActions,20,2,epsilon=5e-2,gamma=gamma,batch_size=20,
- c= 100,alpha=1e-4)
-
-maxSteps = 2e6
-
-# This is a helper to deal with the fact that x[2] is actually an angle
-x_to_y = lambda x : np.array([x[0], x[1], np.cos(x[2]), np.sin(x[2]), x[3]])
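-# e.g. x = [x, x_dot, theta, theta_dot] becomes y = [x, x_dot, cos(theta), sin(theta), theta_dot],
-# a 5-dimensional input (hence stateDim = 5 above); the (cos, sin) embedding avoids the
-# discontinuity where the measured angle wraps at 2*pi.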
-
-R = []
-UpTime = []
-
-step = 0
-ep = 0
-maxLen = 500
-try:
- while step < maxSteps:
- ep += 1
- x = env.reset(home = ep > 1)
- C = 0.
-
- done = False
- t = 1
- while not done:
- t += 1
- step += 1
- y = x_to_y(x)
- a = agent.action(y)
- u = Actions[a:a+1]
- x_next, c, done, info = env.step(u)
-
- max_up_time = info['max_up_time']
- y_next = x_to_y(x_next)
-
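-            # Incremental update of the episode's average reward: C += (c - C) / t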
- C += (1./t) * (c - C)
- agent.update(y, a, c, y_next, done)
- x = x_next
- if done:
- break
-
- if step >= maxSteps:
- break
-
- if t > maxLen:
- agent.s_last = None
- break
-
-
- R.append(C)
- UpTime.append(max_up_time)
- #print('t:',ep+1,', R:',C,', L:',t-1,', G:',G,', Q:', Q_est, 'U:', max_up_time)
- log = "Episode:" + str(ep) + " Total Steps:" + str(step) + " Ave. Reward:" + str(C) + " Episode Length:" + str(t-1) + " Max Up-Time:" + str(max_up_time)
- env.log(log)
-except Exception as e:
-    # Report the failure, clean up the hardware once in the finally block, and re-raise
-    # so the process exits with a nonzero status
-    print("Run aborted:", repr(e))
-    raise
-finally:
-    env.end()