Skip to content

Commit

Permalink
🐛 Something still weird in BiRP, keep fixing
Browse files Browse the repository at this point in the history
  • Loading branch information
Skylark0924 committed Mar 6, 2024
1 parent da402fd commit 7793fc7
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 36 deletions.
4 changes: 2 additions & 2 deletions examples/learning_ml/example_tpgmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

# Reproductions for new situations: set the endpoint as the start point to make a cycled motion
ref_demo_idx = 2
start_xdx = [Repr.demos_xdx[ref_demo_idx][0]]
end_xdx = [Repr.demos_xdx[ref_demo_idx][0]]
start_xdx = [Repr.demos_xdx[ref_demo_idx][9]]
end_xdx = [Repr.demos_xdx[ref_demo_idx][9]]
Repr.task_params = {'frame_origins': [start_xdx, end_xdx], 'frame_names': ['start', 'end']}
traj, _ = Repr.generate(model, ref_demo_idx)
7 changes: 4 additions & 3 deletions examples/learning_ml/example_tpgmmbi_RPCtrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
end_xdx_r = end_xdx_l

Repr.task_params = {"left": {"frame_origins": [start_xdx_l, end_xdx_l], "frame_names": ["start", "end"]},
"right": {"frame_origins": [start_xdx_r, end_xdx_r], "frame_names": ["start", "end"]}}
traj_l, traj_r, _, _ = Repr.generate([model_l, model_r, model_c], ref_demo_idx=0)

"right": {"frame_origins": [start_xdx_r, end_xdx_r], "frame_names": ["start", "end"]},
"relative": {"frame_origins": [[start_xdx_l[0] - start_xdx_r[0]], [end_xdx_l[0] - end_xdx_r[0]]],
"frame_names": ["start", "end"]}}
traj_l, traj_r, _, _ = Repr.generate([model_l, model_r, model_c], ref_demo_idx=1)
10 changes: 10 additions & 0 deletions examples/learning_ml/example_tpgmmbi_RPRepr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,13 @@

# Reproductions for the same situations
Repr.reproduce([model_l, model_r], show_demo_idx=2)

# Reproductions for new situations
start_xdx_l = [np.array([-0.5, 1, 0, 0])]
end_xdx_l = [np.array([4, 4, 0, 0])]
start_xdx_r = [np.array([6.5, 7, 0, 0])]
end_xdx_r = end_xdx_l

Repr.task_params = {"left": {"frame_origins": [start_xdx_l, end_xdx_l], "frame_names": ["start", "end"]},
"right": {"frame_origins": [start_xdx_r, end_xdx_r], "frame_names": ["start", "end"]}}
traj_l, traj_r, _, _ = Repr.generate([model_l, model_r], ref_demo_idx=0)
86 changes: 68 additions & 18 deletions rofunc/learning/ml/tpgmm.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ def get_dx(self):
for j in range(len(self.demos_x[i])):
if 0 < j < len(self.demos_x[i]) - 1:
dx = (self.demos_x[i][j + 1] - self.demos_x[i][j - 1]) / 2
elif j == len(self.demos_x[i]) - 1:
dx = self.demos_x[i][j] - self.demos_x[i][j - 1]
else:
dx = self.demos_x[i][j + 1] - self.demos_x[i][j]
elif j == len(self.demos_x[i]) - 1: # final state
# dx = self.demos_x[i][j] - self.demos_x[i][j - 1]
dx = np.zeros(self.nb_dim)
else: # initial state j = 0
# dx = self.demos_x[i][j + 1] - self.demos_x[i][j]
dx = np.zeros(self.nb_dim)
dx = dx / 0.01
demo_dx.append(dx)
demos_dx.append(np.array(demo_dx))
Expand Down Expand Up @@ -160,7 +162,7 @@ def poe(self, model: HMM, show_demo_idx: int) -> GMM:
# We use the transformation of the first timestep as they are constant
if len(self.task_params['frame_origins'][0]) == 1: # For new task parameters generation
A, b = self.demo_A_xdx[0][0], self.demo_b_xdx[0][0] # Attention: here we use self.demo_A_xdx not
# self.demos_A_xdx cause we just called get_A_b not get_related_matrix
# self.demos_A_xdx, because we just called get_A_b not get_related_matrix
else:
A, b = self.demos_A_xdx[show_demo_idx][0], self.demos_b_xdx[show_demo_idx][0]

Expand All @@ -180,13 +182,17 @@ def poe(self, model: HMM, show_demo_idx: int) -> GMM:
plt.show()
return prod

def _reproduce(self, model: HMM, prod: GMM, show_demo_idx: int, start_xdx: np.ndarray) -> np.ndarray:
def _reproduce(self, model: HMM, prod: GMM, show_demo_idx: int, start_xdx: np.ndarray, title: str = None,
label: str = None) -> np.ndarray:
"""
Reproduce the specific demo_idx from the learned model
:param model: learned model
:param prod: result of PoE
:param show_demo_idx: index of the specific demo to be reproduced
:param start_xdx: start state
:param title: title of the plot
:param label: label of the plot
:return:
"""
# get the most probable sequence of state for this demonstration
Expand All @@ -200,7 +206,7 @@ def _reproduce(self, model: HMM, prod: GMM, show_demo_idx: int, start_xdx: np.nd

xi = lqr.seq_xi

fig = rf.ml.gen_plot(self.nb_dim, xi, prod, self.demos_x, show_demo_idx)
fig = rf.ml.gen_plot(self.nb_dim, xi, prod, self.demos_x, show_demo_idx, title=title, label=label)
if self.save:
rf.visualab.save_img(fig, self.save_params['save_dir'], format=self.save_params['format'])
if self.plot:
Expand All @@ -223,7 +229,8 @@ def reproduce(self, model: HMM, show_demo_idx: int) -> Tuple[np.ndarray, GMM]:
beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')

prod = self.poe(model, show_demo_idx)
traj = self._reproduce(model, prod, show_demo_idx, self.demos_xdx[show_demo_idx][0])
traj = self._reproduce(model, prod, show_demo_idx, self.demos_xdx[show_demo_idx][0],
title="Trajectory reproduction", label="reproduced line")
return traj, prod

def generate(self, model: HMM, ref_demo_idx: int) -> Tuple[np.ndarray, GMM]:
Expand All @@ -235,7 +242,8 @@ def generate(self, model: HMM, ref_demo_idx: int) -> Tuple[np.ndarray, GMM]:
self.get_A_b()

prod = self.poe(model, ref_demo_idx)
traj = self._reproduce(model, prod, ref_demo_idx, self.task_params['frame_origins'][0][0])
traj = self._reproduce(model, prod, ref_demo_idx, self.task_params['frame_origins'][0][0],
title="Trajectory generation", label="generated line")
return traj, prod


Expand Down Expand Up @@ -324,7 +332,7 @@ def generate(self, models: List, ref_demo_idx: int) -> Tuple[ndarray, ndarray, G

class TPGMM_RPCtrl(TPGMMBi):
"""
Simple TPGMMBi (no coordination) with bimanual coordination in the LQR controller
Simple TPGMMBi (no coordination in the representation) with bimanual coordination in the LQR controller
"""

def __init__(self, demos_left_x, demos_right_x, task_params, nb_states: int = 4, reg: float = 1e-3,
Expand All @@ -350,7 +358,20 @@ def get_rel_demos(self):
rel_demos.append(rel_demo)
return rel_demos

def _bi_reproduce(self, model_l, prod_l, model_r, prod_r, model_c, prod_c, show_demo_idx):
def _bi_reproduce(self, model_l, prod_l, model_r, prod_r, model_c, prod_c, show_demo_idx, start_xdx_lst):
"""
Reproduce the specific demo_idx from the learned model
:param model_l: learned model for left arm
:param prod_l: result of PoE for left arm
:param model_r: learned model for right arm
:param prod_r: result of PoE for ri+588/5/8ght arm
:param model_c: learned model for relative movement
:param prod_c: result of PoE for relative movement
:param show_demo_idx: index of the specific demo to be reproduced
:param start_xdx_lst: the start states for both arms, List
:return:
"""
# get the most probable sequence of state for this demonstration
sq_l = model_l.viterbi(self.repr_l.demos_xdx_augm[show_demo_idx])
sq_r = model_r.viterbi(self.repr_r.demos_xdx_augm[show_demo_idx])
Expand All @@ -362,9 +383,9 @@ def _bi_reproduce(self, model_l, prod_l, model_r, prod_r, model_c, prod_c, show_
lqr.mvn_xi_r = prod_r.concatenate_gaussian(sq_r) # augmented version of gaussian
lqr.mvn_xi_c = prod_c.concatenate_gaussian(sq_c) # augmented version of gaussian
lqr.mvn_u = -4 # N(0, R_s) R_s: R^{DTxDT}
lqr.x0_l = self.repr_l.demos_xdx[show_demo_idx][0] # zeta_0 R^DC => 4
lqr.x0_r = self.repr_r.demos_xdx[show_demo_idx][0]
lqr.x0_c = self.repr_c.demos_xdx[show_demo_idx][0]
lqr.x0_l = start_xdx_lst[0] # zeta_0 R^DC => 4
lqr.x0_r = start_xdx_lst[1]
lqr.x0_c = start_xdx_lst[2]

xi_l, xi_r = lqr.seq_xi
return xi_l, xi_r
Expand All @@ -375,6 +396,13 @@ def fit(self) -> Tuple[HMM, HMM, HMM]:
return model_l, model_r, model_c

def reproduce(self, models, show_demo_idx: int) -> Tuple[ndarray, ndarray, GMM, GMM]:
"""
Reproduce the specific demo_idx from the learned model
:param models: List of learned models for left, right and relative movement
:param show_demo_idx: index of the specific demo to be reproduced
:return:
"""
beauty_print('reproduce {}-th demo from learned representation'.format(show_demo_idx), type='info')

model_l, model_r, model_c = models
Expand All @@ -383,7 +411,10 @@ def reproduce(self, models, show_demo_idx: int) -> Tuple[ndarray, ndarray, GMM,
prod_r = self.repr_r.poe(model_r, show_demo_idx=show_demo_idx)
prod_c = self.repr_c.poe(model_c, show_demo_idx=show_demo_idx)
# Coordinated trajectory
ctraj_l, ctraj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, show_demo_idx)
ctraj_l, ctraj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, show_demo_idx,
start_xdx_lst=[self.repr_l.demos_xdx[show_demo_idx][0],
self.repr_r.demos_xdx[show_demo_idx][0],
self.repr_c.demos_xdx[show_demo_idx][0]])

data_lst = [ctraj_l[:, :self.repr_l.nb_dim], ctraj_r[:, :self.repr_r.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Reproduced bimanual trajectories')
Expand All @@ -394,21 +425,36 @@ def reproduce(self, models, show_demo_idx: int) -> Tuple[ndarray, ndarray, GMM,
return ctraj_l, ctraj_r, prod_l, prod_r

def generate(self, models: List, ref_demo_idx: int) -> Tuple[ndarray, ndarray, GMM, GMM]:
"""
Generate a new trajectory from the learned model
:param models: List of learned models for left, right and relative movement
:param ref_demo_idx: index of the specific demo to be referenced
:return:
"""
beauty_print('generate trajectories from learned representation with new task parameters', type='info')

model_l, model_r, model_c = models

self.repr_l.task_params = self.task_params['left']
self.repr_r.task_params = self.task_params['right']
self.repr_c.task_params = self.task_params['relative']

self.repr_l.get_A_b()
self.repr_r.get_A_b()
self.repr_c.get_A_b()

prod_l = self.repr_l.poe(model_l, ref_demo_idx)
prod_r = self.repr_r.poe(model_r, ref_demo_idx)
prod_c = self.repr_c.poe(model_c, show_demo_idx=ref_demo_idx)
prod_c = self.repr_c.poe(model_c, ref_demo_idx)

ctraj_l, ctraj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, ref_demo_idx)
ctraj_l, ctraj_r = self._bi_reproduce(model_l, prod_l, model_r, prod_r, model_c, prod_c, ref_demo_idx,
# start_xdx_lst=[self.repr_l.task_params['frame_origins'][0][0],
# self.repr_r.task_params['frame_origins'][0][0],
# self.repr_c.task_params['frame_origins'][0][0]])
start_xdx_lst=[self.repr_l.demos_xdx[ref_demo_idx][0],
self.repr_r.demos_xdx[ref_demo_idx][0],
self.repr_c.demos_xdx[ref_demo_idx][0]])

data_lst = [ctraj_l[:, :self.repr_l.nb_dim], ctraj_r[:, :self.repr_r.nb_dim]]
fig = rf.visualab.traj_plot(data_lst, title='Generated bimanual trajectories')
Expand All @@ -421,7 +467,7 @@ def generate(self, models: List, ref_demo_idx: int) -> Tuple[ndarray, ndarray, G

class TPGMM_RPRepr(TPGMMBi):
"""
TPGMM for bimanual coordination
TPGMM for bimanual coordination in representation
"""

def __init__(self, demos_left_x, demos_right_x, task_params, nb_states: int = 4, reg: float = 1e-3,
Expand Down Expand Up @@ -655,6 +701,10 @@ def generate(self, models: List, ref_demo_idx: int, leader: str = None):


class TPGMM_RPAll(TPGMM_RPRepr, TPGMM_RPCtrl):
"""
TPGMM for bimanual coordination in both representation and LQR controller
"""

def __init__(self, demos_left_x, demos_right_x, nb_states: int = 4, reg: float = 1e-3, horizon: int = 150,
plot: bool = False, save: bool = False, save_params: dict = None, **kwargs):
TPGMM_RPRepr.__init__(self, demos_left_x, demos_right_x, nb_states, reg, horizon, plot=plot, save=save,
Expand Down
27 changes: 14 additions & 13 deletions rofunc/learning/ml/visualize.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def poe_plot(nb_dim, mod_list, prod, demos_x, show_demo_idx, task_params=None):
return fig


def gen_plot(nb_dim, xi, prod, demos_x, show_demo_idx):
def gen_plot(nb_dim, xi, prod, demos_x, show_demo_idx, title='Trajectory reproduction', label='reproduced line'):
if nb_dim == 2:
fig = generate_plot_2d(xi, prod, demos_x, show_demo_idx)
fig = generate_plot_2d(xi, prod, demos_x, show_demo_idx, title=title, label=label)
elif nb_dim > 2:
fig = generate_plot_3d(xi, prod, demos_x, show_demo_idx)
fig = generate_plot_3d(xi, prod, demos_x, show_demo_idx, title=title, label=label)
else:
raise Exception('Dimension is less than 2, cannot plot')
return fig
Expand Down Expand Up @@ -149,27 +149,28 @@ def poe_plot_3d(mod_list, prod, demos_x, demo_idx, task_params=None):
return fig


def generate_plot_2d(xi, prod, demos_x, demo_idx):
def generate_plot_2d(xi, prod, demos_x, demo_idx, title='Trajectory reproduction', label='reproduced line'):
fig = plt.figure()

plt.title('Trajectory reproduction')
plt.title(title)
rf.visualab.gmm_plot(prod.mu, prod.sigma, swap=True, dim=[0, 1], color='gold', alpha=0.5)
plt.plot(demos_x[demo_idx][:, 0], demos_x[demo_idx][:, 1], 'k--', lw=2, label='demo line')
plt.plot(xi[:, 0], xi[:, 1], color='r', lw=2, label='generated line')
plt.plot(xi[:, 0], xi[:, 1], color='r', lw=2, label=label)
plt.axis('equal')
plt.legend()
return fig


def generate_plot_3d(xi, prod, demos_x, demo_idx, scale=0.01, plot_gmm=False, plot_ori=True):
def generate_plot_3d(xi, prod, demos_x, demo_idx, scale=0.01, plot_gmm=False, plot_ori=True,
title='Trajectory reproduction', label='reproduced line'):
fig = plt.figure(figsize=(4, 4))
ax = fig.add_subplot(111, projection='3d', fc='white')

ax.set_title('Trajectory reproduction')
ax.set_title(title)
if plot_gmm:
rf.visualab.gmm_plot(prod.mu, prod.sigma, dim=[0, 1, 2], color='gold', scale=0.01, ax=ax)
ax.plot(demos_x[demo_idx][:, 0], demos_x[demo_idx][:, 1], demos_x[demo_idx][:, 2], 'k--', lw=2, label='demo line')
ax.plot(xi[:, 0], xi[:, 1], xi[:, 2], color='r', lw=2, label='generated line')
ax.plot(xi[:, 0], xi[:, 1], xi[:, 2], color='r', lw=2, label=label)
rf.visualab.set_axis(ax, data=[demos_x[demo_idx][:, 0], demos_x[demo_idx][:, 1], demos_x[demo_idx][:, 2]])
plt.legend()

Expand All @@ -178,21 +179,21 @@ def generate_plot_3d(xi, prod, demos_x, demo_idx, scale=0.01, plot_gmm=False, pl
plt.figure()
plt.subplot(2, 2, 1)
plt.plot(np.arange(len(demos_x[demo_idx][:, 3])), demos_x[demo_idx][:, 3], 'k--', lw=2, label='demo line')
plt.plot(t, xi[:, 3], color='r', lw=2, label='generated line')
plt.plot(t, xi[:, 3], color='r', lw=2, label=label)
plt.title('w-t')

plt.subplot(2, 2, 2)
plt.plot(np.arange(len(demos_x[demo_idx][:, 4])), demos_x[demo_idx][:, 4], 'k--', lw=2, label='demo line')
plt.plot(t, xi[:, 4], color='r', lw=2, label='generated line')
plt.plot(t, xi[:, 4], color='r', lw=2, label=label)
plt.title('x-t')

plt.subplot(2, 2, 3)
plt.plot(np.arange(len(demos_x[demo_idx][:, 5])), demos_x[demo_idx][:, 5], 'k--', lw=2, label='demo line')
plt.plot(t, xi[:, 5], color='r', lw=2, label='generated line')
plt.plot(t, xi[:, 5], color='r', lw=2, label=label)
plt.title('y-t')

plt.subplot(2, 2, 4)
plt.plot(np.arange(len(demos_x[demo_idx][:, 6])), demos_x[demo_idx][:, 6], 'k--', lw=2, label='demo line')
plt.plot(t, xi[:, 6], color='r', lw=2, label='generated line')
plt.plot(t, xi[:, 6], color='r', lw=2, label=label)
plt.title('z-t')
return fig

0 comments on commit 7793fc7

Please sign in to comment.