Feature/memmap buffer dir #62

Merged
6 commits merged on Jul 26, 2023
Changes from all commits
6 changes: 5 additions & 1 deletion sheeprl/algos/dreamer_v1/dreamer_v1.py
@@ -480,7 +480,11 @@ def main():
args.buffer_size // int(args.num_envs * fabric.world_size * args.action_repeat) if not args.dry_run else 2
)
rb = SequentialReplayBuffer(
buffer_size, args.num_envs, device=fabric.device if args.memmap_buffer else "cpu", memmap=args.memmap_buffer
buffer_size,
args.num_envs,
device=fabric.device if args.memmap_buffer else "cpu",
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
if args.checkpoint_path and args.checkpoint_buffer:
if isinstance(state["rb"], list) and fabric.world_size == len(state["rb"]):
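The same pattern repeats in every script touched by this PR: when the buffer is memory-mapped, its files now go under the run's log directory in a per-rank subfolder rather than an anonymous temporary location. A minimal sketch of the path construction, with hypothetical values standing in for the `log_dir` and `fabric.global_rank` each script already has in scope:

import os

log_dir = "logs/dreamer_v1/2023-07-26_12-00-00"  # hypothetical per-run log directory
global_rank = 0  # stands in for fabric.global_rank

# One directory per rank, so memmap files never collide across processes.
memmap_dir = os.path.join(log_dir, "memmap_buffer", f"rank_{global_rank}")
print(memmap_dir)  # logs/dreamer_v1/2023-07-26_12-00-00/memmap_buffer/rank_0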
26 changes: 19 additions & 7 deletions sheeprl/algos/dreamer_v2/dreamer_v2.py
@@ -254,10 +254,13 @@ def train(
imagined_trajectories[i] = imagined_latent_state

# predict values and rewards
predicted_target_values = target_critic(imagined_trajectories)
predicted_rewards = world_model.reward_model(imagined_trajectories)
with torch.no_grad():
predicted_target_values = Independent(Normal(target_critic(imagined_trajectories), 1), 1).mean
predicted_rewards = Independent(Normal(world_model.reward_model(imagined_trajectories), 1), 1).mean
if args.use_continues and world_model.continue_model:
continues = Independent(Bernoulli(logits=world_model.continue_model(imagined_trajectories)), 1).mean
continues = Independent(
Bernoulli(logits=world_model.continue_model(imagined_trajectories), validate_args=False), 1
).mean
true_done = (1 - data["dones"]).reshape(1, -1, 1) * args.gamma
continues = torch.cat((true_done, continues[1:]))
else:
@@ -317,8 +320,7 @@ def train(
dynamics = lambda_values[1:]

# Reinforce
baseline = target_critic(imagined_trajectories[:-2])
advantage = (lambda_values[1:] - baseline).detach()
advantage = (lambda_values[1:] - predicted_target_values).detach()
reinforce = (
torch.stack(
[
@@ -546,10 +548,20 @@ def main():
)
buffer_type = args.buffer_type.lower()
if buffer_type == "sequential":
rb = SequentialReplayBuffer(buffer_size, args.num_envs, device="cpu", memmap=args.memmap_buffer)
rb = SequentialReplayBuffer(
buffer_size,
args.num_envs,
device="cpu",
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
elif buffer_type == "episode":
rb = EpisodeBuffer(
buffer_size, sequence_length=args.per_rank_sequence_length, device="cpu", memmap=args.memmap_buffer
buffer_size,
sequence_length=args.per_rank_sequence_length,
device="cpu",
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
else:
raise ValueError(f"Unrecognized buffer type: must be one of `sequential` or `episode`, received: {buffer_type}")
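Besides the buffer construction, the dreamer_v2 hunks above change how the target critic and reward model outputs are read: they are wrapped in Normal/Independent distributions and evaluated under torch.no_grad(), and the Reinforce baseline reuses those predicted values instead of calling the target critic a second time. Since the mean of Normal(loc, 1) is loc itself, the wrapper does not change the predicted values; a tiny sketch (shapes are hypothetical):

import torch
from torch.distributions import Independent, Normal

critic_out = torch.randn(15, 1024, 1)  # e.g. (horizon, batch, 1) critic output

with torch.no_grad():
    values = Independent(Normal(critic_out, 1), 1).mean

# Normal(loc, scale).mean is loc, so the distribution wrapper is value-preserving.
assert torch.equal(values, critic_out)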
8 changes: 7 additions & 1 deletion sheeprl/algos/droq/droq.py
@@ -234,7 +234,13 @@ def main():
buffer_size = (
args.buffer_size // int(args.num_envs * fabric.world_size * args.action_repeat) if not args.dry_run else 1
)
rb = ReplayBuffer(buffer_size, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
buffer_size,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/p2e_dv1/p2e_dv1.py
@@ -533,7 +533,13 @@ def main():
buffer_size = (
args.buffer_size // int(args.num_envs * fabric.world_size * args.action_repeat) if not args.dry_run else 4
)
rb = SequentialReplayBuffer(buffer_size, args.num_envs, device="cpu", memmap=args.memmap_buffer)
rb = SequentialReplayBuffer(
buffer_size,
args.num_envs,
device="cpu",
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
if args.checkpoint_path and args.checkpoint_buffer:
if isinstance(state["rb"], list) and fabric.world_size == len(state["rb"]):
rb = state["rb"][fabric.global_rank]
14 changes: 12 additions & 2 deletions sheeprl/algos/p2e_dv2/p2e_dv2.py
@@ -691,10 +691,20 @@ def main():
)
buffer_type = args.buffer_type.lower()
if buffer_type == "sequential":
rb = SequentialReplayBuffer(buffer_size, args.num_envs, device="cpu", memmap=args.memmap_buffer)
rb = SequentialReplayBuffer(
buffer_size,
args.num_envs,
device="cpu",
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
elif buffer_type == "episode":
rb = EpisodeBuffer(
buffer_size, sequence_length=args.per_rank_sequence_length, device="cpu", memmap=args.memmap_buffer
buffer_size,
sequence_length=args.per_rank_sequence_length,
device="cpu",
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
else:
raise ValueError(f"Unrecognized buffer type: must be one of `sequential` or `episode`, received: {buffer_type}")
8 changes: 7 additions & 1 deletion sheeprl/algos/ppo/ppo.py
@@ -202,7 +202,13 @@ def main():
)

# Local data
rb = ReplayBuffer(args.rollout_steps, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
args.rollout_steps,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/ppo/ppo_decoupled.py
@@ -128,7 +128,13 @@ def player(args: PPOArgs, world_collective: TorchCollective, player_trainer_coll
)

# Local data
rb = ReplayBuffer(args.rollout_steps, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
args.rollout_steps,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(logger.log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/ppo_continuous/ppo_continuous.py
@@ -193,7 +193,13 @@ def main():
)

# Local data
rb = ReplayBuffer(args.rollout_steps, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
args.rollout_steps,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/ppo_pixel/ppo_atari.py
@@ -156,7 +156,13 @@ def player(args: PPOAtariArgs, world_collective: TorchCollective, player_trainer
)

# Local data
rb = ReplayBuffer(args.rollout_steps, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
args.rollout_steps,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(logger.log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/ppo_pixel/ppo_pixel_continuous.py
@@ -164,7 +164,13 @@ def player(args: PPOPixelContinuousArgs, world_collective: TorchCollective, play
)

# Local data
rb = ReplayBuffer(args.rollout_steps, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
args.rollout_steps,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(logger.log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/ppo_recurrent/ppo_recurrent.py
@@ -212,7 +212,13 @@ def main():
)

# Local data
rb = ReplayBuffer(args.rollout_steps, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
args.rollout_steps,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[1, args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/sac/sac.py
@@ -191,7 +191,13 @@ def main():

# Local data
buffer_size = args.buffer_size // int(args.num_envs * fabric.world_size) if not args.dry_run else 1
rb = ReplayBuffer(buffer_size, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
buffer_size,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
8 changes: 7 additions & 1 deletion sheeprl/algos/sac/sac_decoupled.py
@@ -106,7 +106,13 @@ def player(args: SACArgs, world_collective: TorchCollective, player_trainer_coll

# Local data
buffer_size = args.buffer_size // args.num_envs if not args.dry_run else 1
rb = ReplayBuffer(buffer_size, args.num_envs, device=device, memmap=args.memmap_buffer)
rb = ReplayBuffer(
buffer_size,
args.num_envs,
device=device,
memmap=args.memmap_buffer,
memmap_dir=os.path.join(logger.log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=device)

# Global variables
6 changes: 5 additions & 1 deletion sheeprl/algos/sac_pixel/sac_pixel_continuous.py
@@ -276,7 +276,11 @@ def main():
# Local data
buffer_size = args.buffer_size // int(args.num_envs * fabric.world_size) if not args.dry_run else 1
rb = ReplayBuffer(
buffer_size, args.num_envs, device=fabric.device if args.memmap_buffer else "cpu", memmap=args.memmap_buffer
buffer_size,
args.num_envs,
device=fabric.device if args.memmap_buffer else "cpu",
memmap=args.memmap_buffer,
memmap_dir=os.path.join(log_dir, "memmap_buffer", f"rank_{fabric.global_rank}"),
)
step_data = TensorDict({}, batch_size=[args.num_envs], device=fabric.device if args.memmap_buffer else "cpu")

67 changes: 60 additions & 7 deletions sheeprl/data/buffers.py
@@ -1,11 +1,16 @@
import os
import typing
import uuid
import warnings
from pathlib import Path
from typing import List, Optional, Union

import numpy as np
import torch
from tensordict import MemmapTensor, TensorDict
from tensordict.tensordict import TensorDictBase
from torch import Size, Tensor, device
import shutil


class ReplayBuffer:
@@ -15,6 +20,7 @@ def __init__(
n_envs: int = 1,
device: Union[device, str] = "cpu",
memmap: bool = False,
memmap_dir: Optional[Union[str, os.PathLike]] = None,
):
"""A replay buffer which internally uses a TensorDict.

@@ -34,7 +40,17 @@ def __init__(
device = torch.device(device=device)
self._device = device
self._memmap = memmap
self._memmap_dir = memmap_dir
if self._memmap:
if memmap_dir is None:
warnings.warn(
"The buffer will be memory-mapped into the `/tmp` folder, this means that there is the"
" possibility to lose the saved files. Set the `memmap_dir` to a known directory.",
UserWarning,
)
else:
self._memmap_dir = Path(self._memmap_dir)
self._memmap_dir.mkdir(parents=True, exist_ok=True)
self._buf = None
else:
self._buf = TensorDict({}, batch_size=[buffer_size, n_envs], device=device)
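A note on the fallback just added: with memmap=True and no memmap_dir, the buffer still works but is backed by files in a temporary location, and the UserWarning above is the only signal. A sketch of how that surfaces, assuming sheeprl is installed and importable:

import warnings

from sheeprl.data.buffers import ReplayBuffer

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    rb = ReplayBuffer(8, 1, memmap=True)  # no memmap_dir given

assert any("memory-mapped into the `/tmp` folder" in str(w.message) for w in caught)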
@@ -115,13 +131,18 @@ def add(self, data: Union["ReplayBuffer", TensorDictBase]) -> None:
if self._memmap and self._buf is None:
self._buf = TensorDict(
{
k: MemmapTensor((self._buffer_size, self._n_envs, *v.shape[2:]), dtype=v.dtype, device=v.device)
k: MemmapTensor(
(self._buffer_size, self._n_envs, *v.shape[2:]),
dtype=v.dtype,
device=v.device,
filename=None if self._memmap_dir is None else self._memmap_dir / f"{k}.memmap",
)
for k, v in data_to_store.items()
},
batch_size=[self._buffer_size, self._n_envs],
device=self.device,
)
self._buf.memmap_()
self._buf.memmap_(prefix=self._memmap_dir)
self._buf[idxes, :] = data_to_store
if self._pos + data_len >= self._buffer_size:
self._full = True
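With a memmap_dir set, the lazy buffer created on the first add() now points each key at an explicit <key>.memmap file and memory-maps the whole TensorDict with the same prefix. A rough end-to-end sketch of the ReplayBuffer side (key names and shapes are illustrative; the exact on-disk layout depends on the tensordict version the repo pins):

import torch
from tensordict import TensorDict

from sheeprl.data.buffers import ReplayBuffer

memmap_dir = "logs/example/memmap_buffer/rank_0"  # hypothetical path
rb = ReplayBuffer(16, 2, memmap=True, memmap_dir=memmap_dir)

# A 4-step chunk for 2 envs, shaped (sequence, n_envs, *feature_shape).
data = TensorDict(
    {"observations": torch.zeros(4, 2, 3), "rewards": torch.zeros(4, 2, 1)},
    batch_size=[4, 2],
)
rb.add(data)
# observations.memmap and rewards.memmap should now exist under memmap_dir.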
@@ -207,8 +228,9 @@ def __init__(
n_envs: int = 1,
device: Union[device, str] = "cpu",
memmap: bool = False,
memmap_dir: Optional[Union[str, os.PathLike]] = None,
):
super().__init__(buffer_size, n_envs, device, memmap)
super().__init__(buffer_size, n_envs, device, memmap, memmap_dir)

def sample(
self,
@@ -338,6 +360,7 @@ def __init__(
sequence_length: int,
device: Union[device, str] = "cpu",
memmap: bool = False,
memmap_dir: Optional[Union[str, os.PathLike]] = None,
) -> None:
if buffer_size <= 0:
raise ValueError(f"The buffer size must be greater than zero, got: {buffer_size}")
@@ -355,6 +378,16 @@ def __init__(
device = torch.device(device=device)
self._device = device
self._memmap = memmap
self._memmap_dir = memmap_dir
if memmap_dir is None:
warnings.warn(
"The buffer will be memory-mapped into the `/tmp` folder, this means that there is the"
" possibility to lose the saved files. Set the `memmap_dir` to a known directory.",
UserWarning,
)
else:
self._memmap_dir = Path(self._memmap_dir)
self._memmap_dir.mkdir(parents=True, exist_ok=True)
self._chunk_length = torch.arange(sequence_length, device=self.device).reshape(1, -1)

@property
@@ -418,14 +451,34 @@ def add(self, episode: TensorDictBase) -> None:
if self.full or len(self) + ep_len > self._buffer_size:
cum_lengths = np.array(self._cum_lengths)
mask = (len(self) - cum_lengths + ep_len) <= self._buffer_size
self._buf = self._buf[mask.argmax() + 1 :]
cum_lengths = cum_lengths[mask.argmax() + 1 :] - cum_lengths[mask.argmax()]
last_to_remove = mask.argmax()
# Remove all memmaped episodes
if self._memmap and self._memmap_dir is not None:
for _ in range(last_to_remove + 1):
filename = self._buf[0][self._buf[0].sorted_keys[0]].filename
for k in self._buf[0].sorted_keys:
f = self._buf[0][k].file
if f is not None:
f.close()
del self._buf[0]
shutil.rmtree(os.path.dirname(filename))
else:
self._buf = self._buf[last_to_remove + 1 :]
cum_lengths = cum_lengths[last_to_remove + 1 :] - cum_lengths[last_to_remove]
self._cum_lengths = cum_lengths.tolist()
self._cum_lengths.append(len(self) + ep_len)
if self._memmap:
episode_dir = None
if self._memmap_dir is not None:
episode_dir = self._memmap_dir / f"episode_{str(uuid.uuid4())}"
episode_dir.mkdir(parents=True, exist_ok=True)
for k, v in episode.items():
episode[k] = MemmapTensor.from_tensor(v)
episode.memmap_()
episode[k] = MemmapTensor.from_tensor(
v,
filename=None if episode_dir is None else episode_dir / f"{k}.memmap",
transfer_ownership=False,
)
episode.memmap_(prefix=episode_dir)
episode.to(self.device)
self._buf.append(episode)
