forked from centrefornetzero/domestic-heating-abm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
abm.py
147 lines (110 loc) · 3.87 KB
/
abm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import functools
import json
import time
from typing import (
Any,
Callable,
Dict,
Generic,
Iterable,
Iterator,
List,
Optional,
TextIO,
Tuple,
TypeVar,
)
import pandas as pd
import structlog
A = TypeVar("A", bound="Agent")
M = TypeVar("M", bound="AgentBasedModel")
T = TypeVar("T")
History = Iterable[Tuple[List[Dict[str, Any]], Dict[str, Any]]]
logger = structlog.getLogger()
class UnorderedSpace(Generic[A]):
def __init__(self) -> None:
self.agents: Dict[A, None] = dict()
def add_agent(self, agent: A) -> None:
self.agents[agent] = None
def __contains__(self, agent: A) -> bool:
return agent in self.agents
def __iter__(self) -> Iterator[A]:
yield from self.agents
class Agent:
def make_decisions(self, model: Optional["AgentBasedModel"] = None) -> None:
raise NotImplementedError
class AgentBasedModel(Generic[A]):
def __init__(self, space: Optional[UnorderedSpace[A]] = None) -> None:
self.space = space if space else UnorderedSpace[A]()
def add_agent(self, agent: A) -> None:
self.space.add_agent(agent)
def add_agents(self, agents: Iterable[A]) -> None:
for agent in agents:
self.add_agent(agent)
def increment_timestep(self) -> None:
raise NotImplementedError
def run(
self,
time_steps: int,
agent_callables: Optional[List[Callable[[A], Any]]] = None,
model_callables: Optional[List[Callable[["AgentBasedModel[A]"], Any]]] = None,
) -> History:
if agent_callables is None:
agent_callables = []
if model_callables is None:
model_callables = []
for step in range(time_steps):
start_time = time.time()
try:
self.increment_timestep()
except NotImplementedError:
pass
agent_data = []
for agent in self.space:
agent.make_decisions(self)
agent_datum = {
agent_callable.__name__: agent_callable(agent)
for agent_callable in agent_callables
if agent_callable(agent) is not None
}
agent_data.append(agent_datum)
model_data = {
model_callable.__name__: model_callable(self)
for model_callable in model_callables
}
logger.info(
"step completed",
step=step,
elapsed_time_seconds=time.time() - start_time,
)
yield agent_data, model_data
def collect_when(
model: M, condition: Callable[[M], bool]
) -> Callable[[Callable[..., T]], Callable[..., Optional[T]]]:
def collect_when_decorator(
callable: Callable[..., T]
) -> Callable[..., Optional[T]]:
@functools.wraps(callable)
def wrapper(*args: Any, **kwargs: Any) -> Optional[T]:
if condition(model):
return callable(*args, **kwargs)
return None
return wrapper
return collect_when_decorator
def write_jsonlines(history: History, file: TextIO) -> None:
for step in history:
file.write(json.dumps(step, default=str) + "\n")
def read_jsonlines(file: TextIO) -> History:
for line in file:
yield tuple(json.loads(line)) # type: ignore
def history_to_dataframes(history: History) -> Tuple[pd.DataFrame, pd.DataFrame]:
agent_history, model_history = zip(*history)
flattened_agent_history = []
for step, agents in enumerate(agent_history):
for agent in agents:
flattened_agent_history.append({"step": step, **agent})
agent_history_df = pd.DataFrame(flattened_agent_history)
model_history_df = (
pd.DataFrame(model_history).reset_index().rename({"index": "step"}, axis=1)
)
return agent_history_df, model_history_df