-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathrequest_message.py
235 lines (207 loc) · 10.3 KB
/
request_message.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
import os,glob,datetime,argparse
import base64,json
import hashlib,codecs,struct
import requests
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.backends import default_backend
from pypush_gsa_icloud import icloud_login_mobileme, generate_anisette_headers
import time
import datetime
import os
from generate_keys import get_both_public_keys
# Template for the receive state of one message: 16 character slots, each
# holding its eight (still unknown) bits, the key pairs to query for those
# bits, and the keys that were actually found in reports.
initialState = dict(
    length=16,
    complete=False,
    message_id="",
    modem_id="",
    first_bit_time=None,
    last_bit_time=None,
    message="",
    content=[
        dict(char=None, bits=[None] * 8, keys=[], res_keys=[])
        for _ in range(16)
    ],
)
def getAuth(regenerate=False, second_factor='sms'):
    """Return the ``(dsid, searchPartyToken)`` pair used for authentication.

    The pair is read from ``auth.json`` located next to this script when
    that file exists and ``regenerate`` is false. Otherwise
    ``icloud_login_mobileme`` is called with ``second_factor`` and the
    values it yields are written to ``auth.json`` before being returned.
    """
    script_dir = os.path.dirname(os.path.realpath(__file__))
    config_path = script_dir + "/auth.json"

    cache_usable = os.path.exists(config_path) and not regenerate
    if cache_usable:
        with open(config_path, "r") as cache_file:
            credentials = json.load(cache_file)
        return (credentials['dsid'], credentials['searchPartyToken'])

    # No usable cache: log in and persist the two values needed later.
    login = icloud_login_mobileme(second_factor=second_factor)
    dsid = login['dsid']
    token = login['delegates']['com.apple.mobileme']['service-data']['tokens']['searchPartyToken']
    credentials = {'dsid': dsid, 'searchPartyToken': token}
    with open(config_path, "w") as cache_file:
        json.dump(credentials, cache_file)
    return (credentials['dsid'], credentials['searchPartyToken'])
def init_state(modem_id, msg_id):
    """Build a fresh receive state for one message of one modem.

    Args:
        modem_id: modem identifier, stored in the state and passed to
            ``get_both_public_keys``.
        msg_id: message identifier, stored in the state and passed to
            ``get_both_public_keys``.

    Returns:
        A new state dict shaped like ``initialState`` in which every
        character slot's ``keys`` entry holds eight ``[key1, key2]`` pairs.
        The key counter runs consecutively across all slots, starting at 0.
    """
    import copy  # function-scope import: only needed here

    # Deep-copy the template. Assigning ``initialState`` directly would make
    # every call mutate (and share) the same module-level dict and its
    # nested per-character lists.
    state = copy.deepcopy(initialState)
    state["message_id"] = msg_id
    state["modem_id"] = modem_id

    total_key_counter = 0
    for character in state["content"]:
        key_pairs = []
        # One pair of public keys per bit of the character.
        for _ in range(8):
            public_key1, public_key2 = get_both_public_keys(modem_id, total_key_counter, msg_id)
            key_pairs.append([public_key1, public_key2])
            total_key_counter += 1
        character['keys'] = key_pairs
    return state
def request_reports(hashed_adv_key1, hashed_adv_key2, time_window_minutes):
    """Query the iCloud fetch endpoint for recent reports of two ids.

    Args:
        hashed_adv_key1: first id placed in the search request.
        hashed_adv_key2: second id placed in the search request.
        time_window_minutes: only reports published within this many
            minutes before now are considered.

    Returns:
        The ``id`` of the last entry in the response's ``results`` whose
        ``datePublished`` lies inside the time window, or ``""`` when there
        is no such entry or the request failed (the error is printed).

    Note:
        Reads the module-level ``args`` (``regen`` and ``trusteddevice``)
        that the ``__main__`` section creates.
    """
    # Take one timestamp and derive both resolutions from it.
    # ``strftime('%s')`` is a platform-specific extension and is avoided.
    now = datetime.datetime.now().timestamp()
    now_s = int(now)
    now_ms = int(now * 1000)
    window_start_s = now_s - (60 * time_window_minutes)
    window_start_ms = now_ms - (60 * 1000 * time_window_minutes)

    data = {"search": [{"startDate": window_start_s * 1000, "endDate": now_s * 1000, "ids": [hashed_adv_key1, hashed_adv_key2]}]}
    try:
        r = requests.post("https://gateway.icloud.com/acsnservice/fetch",
                          auth=getAuth(regenerate=args.regen, second_factor='trusted_device' if args.trusteddevice else 'sms'),
                          headers=generate_anisette_headers(),
                          json=data,
                          timeout=30)  # never block forever on a stalled connection
        res = json.loads(r.content.decode())['results']
        # Keep only the reports published inside the requested window.
        recent = [entry for entry in res if entry['datePublished'] >= window_start_ms]
        if not recent:
            return ""
        return recent[-1]['id']
    except Exception as e:
        # Best effort: report the problem and let the caller poll again.
        print(f"An error occurred: {e}")
        return ""
# Function to extract keys from reports
def keys_from_reports(reports):
    """Return the ``id`` value of every report, in the order given."""
    collected = []
    for entry in reports:
        collected.append(entry['id'])
    return collected
def check_message_length(state):
    """Return how many character slots have received at least one bit.

    A slot counts as soon as any entry of its ``bits`` list is not ``None``.
    When no slot has received anything yet, 16 is returned instead of 0.
    """
    started_slots = [
        slot for slot in state["content"]
        if not all(bit is None for bit in slot["bits"])
    ]
    return len(started_slots) if started_slots else 16
def combine_message(state):
    """Store the joined decoded characters in ``state["message"]``.

    Slots whose ``char`` is still ``None`` are skipped; the remaining
    characters are concatenated in slot order. Nothing is returned.
    """
    pieces = []
    for slot in state["content"]:
        if slot["char"] is None:
            continue
        pieces.append(slot["char"])
    state["message"] = ''.join(pieces)
def validate_message(state):
    """Decide whether the message is complete and update ``state["length"]``.

    ``state["length"]`` is first set to the value of
    ``check_message_length``. The message counts as complete when either
    all 16 slots are decoded (length becomes 16), or every started slot is
    decoded and the last started slot consists of eight zero bits, which
    acts as the end marker (length then excludes that marker).

    Returns ``True`` when complete, ``False`` otherwise.
    """
    observed_length = check_message_length(state)
    state["length"] = observed_length

    decoded_count = len([slot for slot in state['content'] if slot['char'] is not None])

    # All 16 slots decoded: the message uses the full capacity.
    if decoded_count == 16:
        state["length"] = 16
        return True

    # Some started slot is not fully decoded yet.
    if decoded_count != state['length']:
        return False

    # An all-zero character signals the end of a shorter message.
    # INFO still needs to be implemented in the modem code
    terminator = [0] * 8
    if state["content"][observed_length - 1]["bits"] != terminator:
        return False

    state["length"] = observed_length - 1
    return True
def update_state(state, time_window_hours):
    """Poll once for every still-unknown bit and refresh ``state`` in place.

    For each character slot, every bit that is still ``None`` is requested
    through ``request_reports``. A result equal to the first key of the
    pair records bit 0, a result equal to the second key records bit 1, and
    anything else leaves the bit unknown. Fully known characters are
    decoded and printed; otherwise the current bit list is printed.
    Finally ``validate_message`` decides completeness and, if complete,
    ``combine_message`` fills in the message text.

    Args:
        state: receive state as produced by ``init_state``.
        time_window_hours: forwarded unchanged to ``request_reports``.
            NOTE(review): despite the name, ``request_reports`` interprets
            this value as minutes.

    Returns:
        The same ``state`` object that was passed in.
    """
    # Reset the terminal with the ESC-c sequence directly instead of
    # spawning a shell just to run ``printf``.
    print("\033c", end="")
    print()
    print()
    print("Receiving Message for Modem ID: ", f"0x{state['modem_id']:08x}" , "| Message ID: ", state["message_id"])
    print()

    for character in state["content"]:
        # One timestamp per character, taken before its bits are polled.
        current_time = datetime.datetime.now()

        for bit_index, keypair in enumerate(character["keys"]):
            # Only request the bits that are still unknown.
            if character["bits"][bit_index] is not None:
                continue

            found_key = request_reports(keypair[0], keypair[1], time_window_hours)
            if found_key == keypair[0]:
                bit_value = 0
            elif found_key == keypair[1]:
                bit_value = 1
            else:
                continue  # neither key reported yet

            character["bits"][bit_index] = bit_value
            character["res_keys"].append(found_key)
            if state["first_bit_time"] is None:
                state["first_bit_time"] = current_time
            state["last_bit_time"] = current_time

        if None not in character["bits"]:
            # All eight bits known: decode and remember the character.
            character_string = decode_char(character["bits"])
            print(character_string)
            character["char"] = character_string
        else:
            print(character["bits"])

    state["complete"] = validate_message(state)
    if state["complete"]:
        combine_message(state)
    return state
def decode_char(bits):
    """Turn a sequence of 0/1 values (most significant first) into a character.

    The values are written out as a binary numeral, parsed as an integer
    and converted with ``chr``.
    """
    binary_text = ''.join(str(bit) for bit in bits)
    return chr(int(binary_text, 2))
def hex_type(x):
    """Parse ``x`` as a base-16 integer (used as an argparse ``type``)."""
    parsed = int(x, base=16)
    return parsed
if __name__ == "__main__":
    # NOTE(review): nothing in this file reads this variable; kept unchanged
    # in case an imported module depends on it.
    os.environ['Test'] = '1312'

    parser = argparse.ArgumentParser()
    parser.add_argument('-M', '--minutes', help='Only show reports not older than these minutes', type=int, default=60)  # Defaulting to 60 minutes for 1 hour
    parser.add_argument('-r', '--regen', help='regenerate search-party-token', action='store_true')
    parser.add_argument('-i', '--message_id', help='message id to request', type=int, default=0)
    parser.add_argument('-t', '--trusteddevice', help='use trusted device for 2FA instead of SMS', action='store_true')
    parser.add_argument('-d', '--modem_id', help='Modem ID to use for communication', type=hex_type, default=0x11111111)
    # ``args`` must stay a module-level name: request_reports() reads
    # args.regen and args.trusteddevice from the global scope.
    args = parser.parse_args()

    start_time = datetime.datetime.now()
    state = init_state(args.modem_id, args.message_id)

    # Keep polling until the message has been validated as complete.
    while not state["complete"]:
        state = update_state(state, args.minutes)

    end_time = datetime.datetime.now()
    duration = end_time - start_time
    # Calculate minutes and seconds
    total_seconds = int(duration.total_seconds())
    minutes = total_seconds // 60
    seconds = total_seconds % 60

    print()
    print("MESSAGE COMPLETE:")
    print("-> Modem ID:", f"0x{state['modem_id']:08x}")
    print("-> Message ID:", state["message_id"])
    print("-> Length:", state["length"])
    print("-> Content:", state["message"])
    print("App started at:", start_time.strftime('%Y-%m-%d %H:%M:%S'))
    print(f"Runtime app: {minutes} minutes {seconds} seconds")
    print("First bit received at:", state["first_bit_time"].strftime('%Y-%m-%d %H:%M:%S') if state["first_bit_time"] else "N/A")
    print("Last bit received at:", state["last_bit_time"].strftime('%Y-%m-%d %H:%M:%S') if state["last_bit_time"] else "N/A")
    # Guard the subtraction the same way the two lines above guard strftime.
    if state["first_bit_time"] and state["last_bit_time"]:
        time_difference = state["last_bit_time"] - state["first_bit_time"]
        minutes, seconds = divmod(time_difference.total_seconds(), 60)
        print(f"Time between first and last bit: {int(minutes)} minutes and {int(seconds)} seconds")
    else:
        print("Time between first and last bit: N/A")
    print()