diff options
Diffstat (limited to 'System_Python/system_swingup_test.py')
-rw-r--r-- | System_Python/system_swingup_test.py | 275 |
1 files changed, 275 insertions, 0 deletions
diff --git a/System_Python/system_swingup_test.py b/System_Python/system_swingup_test.py new file mode 100644 index 0000000..1b30c41 --- /dev/null +++ b/System_Python/system_swingup_test.py @@ -0,0 +1,275 @@ +import numpy as np
+import numpy.random as rnd
+import torch as pt
+
+import math
+from gym import spaces, logger
+from gym.utils import seeding
+
+from system import System
+import time
+
+class SwingUpEnv():
+ """
+ Description:
+ A pole is attached by an un-actuated joint to a cart, which moves along a frictionless track. The pendulum starts upright, and the goal is to prevent it from falling over by increasing and reducing the cart's velocity.
+
+ Source:
+ This environment corresponds to the version of the cart-pole problem described by Barto, Sutton, and Anderson
+
+ Observation:
+ Type: Box(4)
+ Num Observation Min Max
+ 0 Cart Position -4.8 4.8
+ 1 Cart Velocity -Inf Inf
+ 2 Pole Angle -Inf Inf
+ 3 Pole Velocity At Tip -Inf Inf
+
+ Actions:
+ Type: Box(1)
+ Num Action Min Max
+ 0 Push cart -1 1
+
+ Note: The amount the velocity that is reduced or increased is not fixed; it depends on the angle the pole is pointing. This is because the center of gravity of the pole increases the amount of energy needed to move the cart underneath it
+
+ Reward:
+ Reward is 1 for every step taken, including the termination step
+
+ Starting State:
+ All observations are assigned a uniform random value in [-0.05..0.05]
+
+ Episode Termination:
+ Pole Angle is more than 12 degrees
+ Cart Position is more than 2.4 (center of the cart reaches the edge of the display)
+ Episode length is greater than 200
+ Solved Requirements
+ Considered solved when the average reward is greater than or equal to 195.0 over 100 consecutive trials.
+ """
+
+ metadata = {
+ 'render.modes': ['human', 'rgb_array'],
+ 'video.frames_per_second' : 50
+ }
+
+ def __init__(self):
+ self.sys = System()
+
+ self.force_mag = 10.0
+ self.tau = 0.02 # seconds between state updates
+
+ # Angle at which to fail the episode
+ self.x_threshold = 10.
+ self.x_dot_threshold = 10.
+ self.theta_dot_threshold = 3*np.pi
+
+ # Angle limit set to 2 * theta_threshold_radians so failing observation is still within bounds
+ high = np.array([self.x_threshold*2, self.x_dot_threshold, np.finfo(np.float32).max, np.finfo(np.float32).max])
+
+
+ self.action_space = spaces.Box(-np.ones(1), np.ones(1), dtype = np.float32)
+
+ self.seed()
+ self.state = None
+
+ self.steps_beyond_done = None
+
+ def seed(self, seed=None):
+ self.np_random, seed = seeding.np_random(seed)
+ return [seed]
+
+ def step(self, action):
+ assert self.action_space.contains(action), "%r (%s) invalid"%(action, type(action))
+ state = self.state
+ x, x_dot, theta, theta_dot = state
+ force = self.force_mag * action[0]
+ self.sys.adjust(force)
+
+ costheta = math.cos(theta)
+ sintheta = math.sin(theta)
+
+ if costheta > 0:
+ self.up_time += 1
+ self.max_up_time = np.max([self.up_time, self.max_up_time])
+
+ else:
+ self.up_time = 0
+
+ new_theta, new_x = self.sys.measure()
+ new_theta = radians(new_theta)
+ theta_dot = (new_theta - theta) / self.tau
+ x_dot = (new_x - x) / self.tau
+ self.state = (new_x, x_dot, new_theta, theta_dot)
+
+ done = x < -self.x_threshold \
+ or x > self.x_threshold \
+ or theta_dot < -self.theta_dot_threshold \
+ or theta_dot > self.theta_dot_threshold
+ done = bool(done)
+
+ if not done:
+ reward = np.ceil(costheta)
+ elif self.steps_beyond_done is None:
+ # Pole just fell!
+ self.steps_beyond_done = 0
+ reward = -( 100 * (np.abs(x_dot) + np.abs(theta_dot)) )
+ else:
+ if self.steps_beyond_done == 0:
+ logger.warn("You are calling 'step()' even though this environment has already returned done = True. You should always call 'reset()' once you receive 'done = True' -- any further steps are undefined behavior.")
+ self.steps_beyond_done += 1
+ reward = 0.0
+
+ return np.array(self.state), reward, done, {'max_up_time' : self.max_up_time}
+
+ def reset(self):
+ self.sys.return_home()
+ time.sleep(1)
+ self.state = (0, 0, np.pi, 0)
+
+ self.up_time = 0
+ self.max_up_time = 0
+ self.up = False
+ self.steps_beyond_done = None
+ return np.array(self.state)
+
+
+class nnQ(pt.nn.Module):
+ """
+ Here is a basic neural network with for representing a policy
+ """
+
+ def __init__(self, stateDim, numActions, numHiddenUnits, numLayers):
+ super().__init__()
+
+ InputLayer = [pt.nn.Linear(stateDim + numActions, numHiddenUnits),
+ pt.nn.ReLU()]
+
+ HiddenLayers = []
+ for _ in range(numLayers - 1):
+ HiddenLayers.append(pt.nn.Linear(numHiddenUnits, numHiddenUnits))
+ HiddenLayers.append(pt.nn.ReLU())
+
+
+ OutputLayer = [pt.nn.Linear(numHiddenUnits, 1)]
+
+ AllLayers = InputLayer + HiddenLayers + OutputLayer
+ self.net = pt.nn.Sequential(*AllLayers)
+
+ self.numActions = numActions
+
+ def forward(self,x,a):
+ x = pt.tensor(x, dtype = pt.float32)
+
+ b = pt.nn.functional.one_hot(pt.tensor(a), self.numActions)
+
+ c = b.float().detach()
+ y = pt.cat([x, c])
+
+ return self.net(y)
+
+
+class sarsaAgent:
+ def __init__(self, stateDim, numActions, numHiddenUnits, numLayers,
+ epsilon = .1, gamma = .9, alpha = .1):
+ self.Q = nnQ(stateDim, numActions, numHiddenUnits, numLayers)
+ self.gamma = gamma
+ self.epsilon = epsilon
+ self.alpha = alpha
+ self.numActions = numActions
+ self.s_last = None
+
+ def action(self, x):
+ # This is an epsilon greedy selection
+ if rnd.rand() < self.epsilon:
+ a = rnd.randint(numActions)
+ else:
+ qBest = -np.inf
+ for aTest in range(self.numActions):
+ qTest = self.Q(x, aTest).detach().numpy()[0]
+ if qTest > qBest:
+ qBest = qTest
+ a = aTest
+ return a
+
+ def update(self, s, a, r, s_next,done):
+ # Compute the TD error, if there is enough data
+ update = True
+ if done:
+ Q_cur = self.Q(s, a).detach().numpy()[0]
+ delta = r - Q_cur
+ self.s_last = None
+ Q_diff = self.Q(s, a)
+ elif self.s_last is not None:
+ Q_next = self.Q(s, a).detach().numpy()[0]
+ Q_cur = self.Q(self.s_last, self.a_last).detach().numpy()[0]
+ delta = self.r_last + self.gamma * Q_next - Q_cur
+ Q_diff = self.Q(self.s_last, self.a_last)
+ else:
+ update = False
+
+ # Update the parameter via the semi-gradient method
+ if update:
+ self.Q.zero_grad()
+ Q_diff.backward()
+ for p in self.Q.parameters():
+ p.data.add_(self.alpha * delta, p.grad.data)
+
+ if not done:
+ self.s_last = np.copy(s)
+ self.a_last = np.copy(a)
+ self.r_last = np.copy(r)
+
+# This is the environment
+env = swingUp.SwingUpEnv()
+
+# For simplicity, we only consider forces of -1 and 1
+numActions = 2
+Actions = np.linspace(-1, 1, numActions)
+
+# This is our learning agent
+gamma = .95
+agent = sarsaAgent(5, numActions, 20, 1, epsilon = 5e-2, gamma = gamma, alpha = 1e-5)
+
+maxSteps = 2e5
+
+# This is a helper to deal with the fact that x[2] is actually an angle
+x_to_y = lambda x : np.array([x[0], x[1], np.cos(x[2]), np.sin(x[2]), x[3]])
+
+R = []
+UpTime = []
+
+step = 0
+ep = 0
+while step < maxSteps:
+ ep += 1
+ x = env.reset()
+ C = 0.
+
+ done = False
+ t = 1
+ while not done:
+ t += 1
+ step += 1
+ y = x_to_y(x)
+ a = agent.action(y)
+ u = Actions[a:a+1]
+ env.render()
+ x_next, c, done, info = env.step(u)
+
+ max_up_time = info['max_up_time']
+ y_next = x_to_y(x_next)
+
+ C += (1./t) * (c - C)
+ agent.update(y, a, c, y_next, done)
+ x = x_next
+ if done:
+ break
+
+ if step >= maxSteps:
+ break
+
+
+ R.append(C)
+ UpTime.append(max_up_time)
+ #print('t:',ep+1,', R:',C,', L:',t-1,', G:',G,', Q:', Q_est, 'U:', max_up_time)
+ print('Episode:',ep, 'Total Steps:',step, ', Ave. Reward:',C, ', Episode Length:',t-1, 'Max Up-Time:',max_up_time)
+env.close()
|