-
Notifications
You must be signed in to change notification settings - Fork 14
/
gymenv_qsimpy.py
333 lines (288 loc) · 12.4 KB
/
gymenv_qsimpy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
"""
QSimPy 0.1.0 - Gymnasium Environment
Author: Hoa Nguyen
Sample Gymasium environment for QSimPy with the scenario of 5 IBM QNodes (washington, kolkata, hanoi, perth, lagos)
and continuous QTasks arrival, generated from a quantum circuit dataset.
"""
import gymnasium as gym
from gymnasium.spaces import Box, Discrete
import numpy as np
from numpy.random import default_rng
from qsimpy import Broker, QTask, TaskStatus, IBMQNode, Dataset, Log
import simpy
class QSimPyEnv(gym.Env):
MAX_ROUNDS = 999 # maximum number of rounds in the QTask dataset
"""
Gym environment for QSimPy.
Args:
config (dict): Configuration parameters for the environment.
dataset (str): Path to the QTask dataset.
mode (str): Selection for simulation/emulation mode.
Attributes:
n_qtasks (int): Number of qtasks.
n_qnodes (int): Number of qnodes.
qtasks (list): List of qtasks.
qnodes (list): List of qnodes.
obs_dim (int): Dimension of the observation space.
observation_space (gym.spaces.Box): Observation space.
action_space (gym.spaces.Discrete): Action space.
qtask_dataset (Dataset): QTasks dataset.
rng (numpy.random.Generator): Random number generator.
qsp_env (simpy.Environment): QSimPy environment.
round (int): Current round.
seed (int): Seed for the random number generator.
"""
def __init__(
self,
config=None,
dataset=None,
mode="simulate"
):
"""
Initialize the QSimPyEnv class.
Args:
config (dict): Configuration parameters for the environment.
dataset (str): Path to the dataset.
"""
super().__init__()
# OBSERVATION SPACE
# Each observation is a dict of qtask_attributes and qnode_attributes
# QTask attrributes = [arrivaltime, qt_qubits, cl]
# QNode attributes = [qn_qubits, d1cps, next_available_time]
self.n_qtasks = 25
self.n_qnodes = 5 # number of qnodes
self.qtasks = []
self.qnodes = []
self.mode = mode
self.obs_dim = 4 + self.n_qnodes * 3
self.observation_space = Box(
low=np.ones((self.obs_dim,), dtype=np.float32) * -np.inf,
high=np.ones((self.obs_dim,), dtype=np.float32) * np.inf,
dtype=np.float32,
)
# Example max values for each type of observation
max_time = 10000000 # Max next available time
max_qubits = 500 # Max number of qubits
max_layers = 1000000 # Max number of layers in a circuit
max_clops = 10000 # Max computational load
max_rescheduling_count = 1000 # Max number of rescheduling
# Assuming the observation consists of [arrival_time, qubit_number, circuit_layers] for tasks
# and [qubit_number, clops, next_available_time] for each node
task_obs_low = np.array([0, 0, 0, 0], dtype=np.float64)
task_obs_high = np.array(
[max_time, max_qubits, max_layers, max_rescheduling_count], dtype=np.float64
)
node_obs_low = np.array([0, 0, -1] * self.n_qnodes, dtype=np.float64)
node_obs_high = np.array(
[max_qubits, max_clops, max_time] * self.n_qnodes, dtype=np.float64
)
# Combine to form the complete observation space
obs_low = np.concatenate([task_obs_low, node_obs_low]).astype(np.float64)
obs_high = np.concatenate([task_obs_high, node_obs_high]).astype(np.float64)
self.observation_space = Box(low=obs_low, high=obs_high, dtype=np.float64)
self.current_obs = None
# ACTION SPACE
self.action_space = Discrete(self.n_qnodes)
# Load QTasks dataset
if dataset is None:
raise ValueError("Dataset is not specified")
self.qtask_dataset = Dataset(dataset)
self.rng = default_rng(seed=22)
# QSimPy environment
self.qsp_env = simpy.Environment()
self.setup_quantum_resources()
# Round
self.round = 1
self.seed = 22
self.round_robin_index = 0
self.results = []
# Set rescheduling time, default = 0.01s
self.rescheduling_time = 0.01
# Check if evaluation is set
self.evaluation = config.get("evaluation", False)
self.policy = config.get("policy", "UnknownPolicy")
def _get_obs(self):
"""
Get the current observation of the environment.
Returns:
np.ndarray: The current observation, which includes the observation of the current quantum task and all quantum nodes.
"""
# Get the current observation of quantum task
if self.current_qtask is None:
self.qtask_obs = np.array([0, 0, 0, 0], dtype=np.float64)
else:
self.qtask_obs = np.array(
[
self.current_qtask.arrival_time,
self.current_qtask.qubit_number,
self.current_qtask.circuit_layers,
self.current_qtask.rescheduling_count,
],
dtype=np.float64,
)
# Get the current observation of quantum nodes
self.qnode_obs = []
for qnode in self.qnodes:
qnode_obs = np.array(
[
qnode.qubit_number,
qnode.clops,
qnode.next_available_time,
],
dtype=np.float64,
)
self.qnode_obs.append(qnode_obs)
# Flatten the qnode observations and concatenate with qtask observations
qnode_obs_flat = np.concatenate(self.qnode_obs).astype(np.float64)
self.current_obs = np.concatenate(
(self.qtask_obs, qnode_obs_flat), dtype=np.float64
)
return self.current_obs
def setup_quantum_resources(self):
# Create a list of 10 IBM QNodes
qnode_ids = range(self.n_qnodes)
qnode_names = [
"washington",
"kolkata",
"hanoi",
"perth",
"lagos",
]
self.qnodes = [
IBMQNode.create_ibmq_node(self.qsp_env, qid, qname)
for qid, qname in zip(qnode_ids, qnode_names)
]
# Create a Broker
self.broker = Broker(self.qsp_env, self.qnodes, self.mode)
def generate_qtasks(self):
"""Generate a list of QTasks from the QTask dataset, following Poisson distribution of arrival time."""
# QTask IDs
qtask_ids = list(
f"{self.round:04d}" + f"{id:02d}" for id in range(self.n_qtasks)
)
# Get QTask from the subset of the dataset
qtasks = self.qtask_dataset.get_subset_data(self.round)
# Option 1: Random arrival time of qtasks (original implementation)
n_qtasks = len(qtasks) # number of qtasks
qtask_arrival = self.rng.uniform(
low=0.1 + self.round * 60, high=59.9 + self.round * 60, size=n_qtasks
)
# Set the arrival time of the first qtask to 0
qtask_arrival.sort()
# Option 2: Get arrival time from the datasets
# qtask_arrival = [qtask["arrival_time"] for qtask in qtasks.values()]
# Extract only the values (dictionaries) from qtasks
qtask_values = list(qtasks.values())
self.qtasks = [
QTask(
id=tid,
arrival_time=arrival_time,
qtask_data=qdata,
)
for tid, arrival_time, qdata in zip(qtask_ids, qtask_arrival, qtask_values)
]
# Set current qtask to the first qtask in the list
if len(self.qtasks) > 0:
self.current_qtask = self.qtasks.pop(0)
self.prev_qtask = None
self.round += 1
def submit_task_to_qnode(self, qtask, qnode_id=None):
reward = 0
if qnode_id is None:
qnode_id = self.round_robin_index % self.n_qnodes
self.round_robin_index += 1
qtask, waiting_time, execution_time = self.broker.preprocess_qtask(
qtask, self.qnodes[qnode_id]
)
if qtask.status == TaskStatus.ERROR:
# Apply large penalty to the reward if QTask constraints are not satisfied
# Beside, this task need to be rescheduled to another QNode until it can be executed
# Put this task back to the queue
qtask.status = TaskStatus.QUEUED
qtask.QNode = None
qtask.rescheduling_count += 1
qtask.arrival_time += 1
# Find the index to insert the qtask based on arrival_time
index = 0
while (
index < len(self.qtasks)
and self.qtasks[index].arrival_time < qtask.arrival_time
):
index += 1
self.qtasks.insert(index, qtask)
return -0.1, qtask.rescheduling_count
# Submit the qtask to the qnode following the action
qtask_execution = self.broker.submit_qtask_to_qnode(
qtask, self.qnodes[qnode_id]
)
self.qsp_env.process(qtask_execution)
# Delay time is the time from initial arrival time to the time the task started to be placed in the QNode
delay_time = qtask.arrival_time - qtask.init_arrival_time
# print(f"Estimated waiting time: {waiting_time}")
# print(f"Estimated execution time: {execution_time}")
self.results.append({
'qtask_id': qtask.id,
'qnode_id': qnode_id,
'waiting_time': waiting_time,
'execution_time': execution_time,
'rescheduling_count': qtask.rescheduling_count, # Store the actual count from the task
})
reward = delay_time + waiting_time + execution_time
return reward, qtask.rescheduling_count
def reset(self, *, seed=None, options=None):
super().reset(seed=22)
self.generate_qtasks()
self.current_obs = self._get_obs().astype(np.float32)
info = {}
return self.current_obs, info
def collect_results(self):
Log.print_simulation_results(self.qnodes)
"""Collect and summarize results from the QTask executions."""
total_qtasks = len(self.results)
total_waiting_time = sum(result['waiting_time'] for result in self.results)
total_execution_time = sum(result['execution_time'] for result in self.results)
total_rescheduling_count = sum(result['rescheduling_count'] for result in self.results)
if total_qtasks > 0:
avg_waiting_time = total_waiting_time / total_qtasks
avg_execution_time = total_execution_time / total_qtasks
avg_rescheduling_count = total_rescheduling_count / total_qtasks
else:
avg_waiting_time = avg_execution_time = avg_rescheduling_count = 0
summary = {
'total_qtasks': total_qtasks,
'total_waiting_time': total_waiting_time,
'total_execution_time': total_execution_time,
'avg_waiting_time': avg_waiting_time,
'avg_execution_time': avg_execution_time,
'avg_rescheduling_count': avg_rescheduling_count,
}
return summary
def step(self, action):
# Submit the current qtask to the selected qnode
# action is qnode_id
# Intermediately reward is the inverse of completion time
# Sample Objective: Minimize the total completion time of all qtasks
time_reward, _ = self.submit_task_to_qnode(
self.current_qtask, action
)
reward = 1/time_reward
scheduled_qtask = self.current_qtask
# Get the next observation
# Check if there are more qtasks, if yes, get the next qtask, otherwise set terminated to True
if len(self.qtasks) > 0:
self.current_qtask = self.qtasks.pop(0)
terminated = False
else:
self.current_qtask = None
terminated = True
self.current_obs = self._get_obs()
return self.current_obs, reward, terminated, False, {"scheduled_qtask": scheduled_qtask}
def close(self):
# If the evaluation is set, run the environment and export the results
if self.evaluation:
Log.log = True
self.qsp_env.run()
Log.print_simulation_results(self.qnodes)
self.collect_results()
# Log.export_simulation_results(self.qnodes, output_file=self.policy)
pass