-
Notifications
You must be signed in to change notification settings - Fork 0
/
ansible_api.py
263 lines (236 loc) · 9.14 KB
/
ansible_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
import json
import shutil
from collections import namedtuple
from ansible.parsing.dataloader import DataLoader
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.playbook.play import Play
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.plugins.callback import CallbackBase
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.utils.ssh_functions import check_for_controlpersist
import ansible.constants as C
import redis
import datetime
import logging, logging.handlers
from callback_plugins.redis2 import CallbackModule
# from tools.config import REDIS_ADDR, REDIS_PORT, REDIS_PD, ansible_result_redis_db
from backend.settings import REDIS_ADDR, REDIS_PORT, REDIS_PD, ansible_result_redis_db
# Remote user name that the API functions in this file pass as `remote_user`
# when they build their Options tuples.
ansible_remote_user = 'root'
class ResultCallback(CallbackBase):
    """Callback that appends per-host task results to a Redis list.

    Both the ad-hoc API and the playbook API in this file pass an instance of
    this class as their stdout callback. Each saved result is a JSON string
    pushed (RPUSH) onto the Redis list whose key is the task id supplied at
    construction time.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'stdout'

    def __init__(self, id):
        """Store the task id and create the Redis client.

        ``id`` is the task identifier; it is also used as the Redis list key.
        """
        super(ResultCallback, self).__init__()
        self.id = id
        self.r = redis.Redis(
            host=REDIS_ADDR,
            port=REDIS_PORT,
            password=REDIS_PD,
            db=ansible_result_redis_db,
        )

    def _write_to_save(self, data):
        """Serialize ``data`` as JSON and append it to this task's Redis list."""
        print(data)
        payload = json.dumps(data, ensure_ascii=False)
        self.r.rpush(self.id, payload)
        # Only the first 50 characters of the stored string are echoed, to
        # keep the console output readable.
        print("\33[34m写入Redis:%.50s......\33[0m" % payload)

    def _save_outcome(self, result, status):
        """Build the record for one host/task result and store it."""
        self._write_to_save({
            "host": result._host.name,
            "result": result._result,
            "task": result.task_name,
            "status": status,
        })

    def v2_playbook_on_play_start(self, play):
        """Print a ``PLAY`` banner, including the play name when it has one."""
        title = play.get_name().strip()
        print(u"PLAY [%s]" % title if title else u"PLAY")

    def v2_runner_on_ok(self, result, **kwargs):
        """Store a successful result, except when it carries ``ansible_facts``."""
        # Results containing facts (the setup step) are deliberately not saved.
        if "ansible_facts" in result._result:
            print("\33[32mSetUp 操作,不Save结果\33[0m")
            return
        print(result)
        self._save_outcome(result, "success")

    def v2_runner_on_failed(self, result, ignore_errors=False, **kwargs):
        """Store a failed result.

        A failure can be ignored by the play, so two statuses are possible:
        ``"ignoring"`` when ``ignore_errors`` is truthy, ``'failed'`` otherwise.
        """
        self._save_outcome(result, "ignoring" if ignore_errors else 'failed')

    def v2_runner_on_skipped(self, result, *args, **kwargs):
        """Store the result of a skipped task."""
        self._save_outcome(result, "skipped")

    def v2_runner_on_unreachable(self, result, **kwargs):
        """Store the result for a host that could not be reached."""
        self._save_outcome(result, "unreachable")

    def v2_playbook_on_notify(self, handler, host):
        """Ignore handler notifications."""
        return None

    def v2_playbook_on_no_hosts_matched(self):
        """Ignore the "no hosts matched" event."""
        return None

    def v2_playbook_on_no_hosts_remaining(self):
        """Ignore the "no hosts remaining" event."""
        return None

    def v2_playbook_on_start(self, playbook):
        """Ignore the playbook start event."""
        return None
class MyTaskQueueManager(TaskQueueManager):
    """TaskQueueManager variant whose callback loading step does nothing."""

    def load_callbacks(self):
        """Skip callback loading so only the callback supplied by this API is kept."""
        return None
def ansible_exec_api(tid, hosts, tasks, sources, extra_vars=None):
    """Run an ad-hoc play built from ``tasks`` and report results via ResultCallback.

    Args:
        tid: Task id handed to ``ResultCallback``; results are pushed to the
            Redis list stored under this key.
        hosts: Host pattern used as the play's ``hosts`` value.
        tasks: List of task definitions used as the play's ``tasks`` value.
        sources: Inventory source(s) passed to ``InventoryManager``.
        extra_vars: Optional dict of extra variables. ``None`` (the default)
            means no extra variables.

    Returns:
        The value returned by ``tqm.run(play)``.
    """
    # A fresh dict per call: a `{}` default would be one object shared by
    # every invocation of this function.
    if extra_vars is None:
        extra_vars = {}

    Options = namedtuple('Options',
                         ['remote_user',
                          'connection',
                          'module_path',
                          'forks',
                          'become',
                          'become_method',
                          'become_user',
                          'check',
                          'diff'])
    options = Options(remote_user=ansible_remote_user,
                      connection='paramiko',
                      module_path=['/to/mymodules'],
                      forks=10,
                      become=None,
                      become_method=None,
                      become_user=None,
                      check=False,
                      diff=False)

    loader = DataLoader()
    passwords = dict(vault_pass='secret')

    inventory = InventoryManager(loader=loader, sources=sources)
    # NOTE(review): this unconditionally adds a host named "haha" to the
    # inventory; it looks like a debugging leftover -- confirm before removing.
    inventory.add_host("haha")

    variable_manager = VariableManager(loader=loader, inventory=inventory)
    variable_manager.extra_vars = extra_vars

    play_source = dict(name="Ansible Play", hosts=hosts, gather_facts='no', tasks=tasks)
    play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

    tqm = None
    try:
        tqm = MyTaskQueueManager(
            inventory=inventory,
            variable_manager=variable_manager,
            loader=loader,
            options=options,
            passwords=passwords,
            stdout_callback=ResultCallback(tid),  # the task id selects the Redis key
        )
        return tqm.run(play)
    finally:
        # Runs on success and on error alike: release the queue manager and
        # remove Ansible's local temporary directory.
        if tqm is not None:
            tqm.cleanup()
        shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
class MyPlaybookExecutor(PlaybookExecutor):
    """PlaybookExecutor whose task queue manager reports through ResultCallback."""

    def __init__(self, tid, playbooks, inventory, variable_manager, loader, options, passwords):
        """Set up executor state; ``tid`` is the task id given to ResultCallback.

        The parent initializer is not called; the attributes are assigned
        here directly.
        """
        self._playbooks = playbooks
        self._inventory = inventory
        self._variable_manager = variable_manager
        self._loader = loader
        self._options = options
        self.passwords = passwords
        self._unreachable_hosts = dict()

        # No task queue manager is created when any of the listing or
        # syntax-check options is set.
        listing_or_syntax_only = (
            options.listhosts or options.listtasks or options.listtags or options.syntax
        )
        if not listing_or_syntax_only:
            self._tqm = MyTaskQueueManager(
                inventory=inventory,
                variable_manager=variable_manager,
                loader=loader,
                options=options,
                passwords=self.passwords,
                stdout_callback=ResultCallback(tid),  # the task id selects the Redis key
            )
        else:
            self._tqm = None

        # NOTE(review): presumably primes Ansible's ControlPersist check for
        # the configured ssh executable -- confirm against the Ansible source.
        check_for_controlpersist(C.ANSIBLE_SSH_EXECUTABLE)
def ansible_playbook_api(tid, hosts, playbooks, sources, extra_vars=None):
    """Run playbooks against the given hosts and report results via ResultCallback.

    Args:
        tid: Task id handed to ``ResultCallback`` (through
            ``MyPlaybookExecutor``); results are pushed to the Redis list
            stored under this key.
        hosts: Iterable of ``(host, port)`` pairs; each is added to the
            inventory group ``"all"``.
        playbooks: List of playbook paths passed to the executor.
        sources: Inventory source(s) passed to ``InventoryManager``.
        extra_vars: Optional dict of extra variables. ``None`` (the default)
            means no extra variables.

    Returns:
        The value returned by the executor's ``run()``.
    """
    # A fresh dict per call: a `{}` default would be one object shared by
    # every invocation of this function.
    if extra_vars is None:
        extra_vars = {}

    Options = namedtuple('Options', [
        'remote_user',
        'connection',
        'module_path',
        'forks',
        'become',
        'become_method',
        'become_user',
        'check',
        'diff',
        'listhosts',
        'listtasks',
        'listtags',
        'syntax',
    ])
    options = Options(
        remote_user=ansible_remote_user,
        connection='paramiko',
        module_path=['/to/mymodules'],
        forks=10,
        become=None,
        become_method=None,
        become_user=None,
        check=False,
        diff=False,
        listhosts=None,
        listtasks=None,
        listtags=None,
        syntax=None,
    )

    loader = DataLoader()
    passwords = dict(vault_pass='secret')

    inventory = InventoryManager(loader=loader, sources=sources)
    # Create the default host group and register every requested host in it.
    inventory.add_group("all")
    for host, port in hosts:
        print(host, port)
        inventory.add_host(host, group="all", port=port)

    variable_manager = VariableManager(loader=loader, inventory=inventory)
    variable_manager.extra_vars = extra_vars

    pb = MyPlaybookExecutor(tid=tid,
                            playbooks=playbooks,
                            inventory=inventory,
                            variable_manager=variable_manager,
                            loader=loader,
                            options=options,
                            passwords=passwords)
    return pb.run()
if __name__ == '__main__':
    # Manual smoke test: run one ad-hoc task, then one playbook.
    sources = 'scripts/inventory'
    stamp_format = "%Y%m%d-%H%M%S"

    # --- ad-hoc API ---
    # This variable is supplied from outside the play.
    exec_vars = {'content': '这个参数从外部传入'}
    # Alternative task for checking extra_vars:
    #   dict(action=dict(module='debug', args=dict(msg='{{ content }}')))
    tasks = [
        dict(action=dict(module='shell', args='ssh root@api ifconfig')),
    ]
    exec_tid = "AnsibleApi-%s" % datetime.datetime.now().strftime(stamp_format)
    ansible_exec_api(exec_tid, "localhost", tasks, sources, exec_vars)

    # --- playbook API ---
    playbook_vars = {'host': '192.13.2.4'}
    playbooks = ['playbooks/test.yml']
    hosts = [('127.0.0.1', 22)]
    playbook_tid = "AnsiblePlayBookApi-%s" % datetime.datetime.now().strftime(stamp_format)
    ansible_playbook_api(playbook_tid, hosts, playbooks, sources, playbook_vars)