Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
morellexf13 committed Oct 20, 2023
2 parents 5ca55f3 + 483a287 commit 5689a9e
Show file tree
Hide file tree
Showing 19 changed files with 289 additions and 257 deletions.
18 changes: 16 additions & 2 deletions pymobiledevice3/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,21 @@

logger = logging.getLogger(__name__)

INVALID_SERVICE_MESSAGE = """Failed to start service. Possible reasons are:
- If you were trying to access a developer service (developer subcommand):
- Make sure the DeveloperDiskImage/PersonalizedImage is mounted via:
> python3 -m pymobiledevice3 mounter auto-mount
- If your device iOS version >= 17.0:
- Make sure you passed the --rsd option to the subcommand
https://github.com/doronz88/pymobiledevice3#working-with-developer-tools-ios--170
- Apple removed this service
- A bug. Please file a bug report:
https://github.com/doronz88/pymobiledevice3/issues/new?assignees=&labels=&projects=&template=bug_report.md&title=
"""


def cli():
cli_commands = click.CommandCollection(sources=[
Expand Down Expand Up @@ -88,8 +103,7 @@ def cli():
logger.error('Developer Mode is disabled. You can try to enable it using: '
'python3 -m pymobiledevice3 amfi enable-developer-mode')
except InvalidServiceError:
logger.error('Failed to access an invalid lockdown service, possibly from DeveloperDiskImage.dmg or a Cryptex. '
'You may try: python3 -m pymobiledevice3 mounter auto-mount')
logger.error(INVALID_SERVICE_MESSAGE)
except NoDeviceSelectedError:
return
except PasswordRequiredError:
Expand Down
70 changes: 34 additions & 36 deletions pymobiledevice3/cli/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@
from typing import List, TextIO

import click
from cryptography.hazmat.primitives.asymmetric import rsa

from pymobiledevice3.cli.cli_common import RSDCommand, print_json, prompt_device_list
from pymobiledevice3.exceptions import NoDeviceConnectedError
from pymobiledevice3.remote.bonjour import get_remoted_addresses
from pymobiledevice3.remote.remote_service_discovery import RSD_PORT, RemoteServiceDiscoveryService
from pymobiledevice3.remote.utils import resume_remoted_if_required, stop_remoted, stop_remoted_if_required
from pymobiledevice3.remote.utils import stop_remoted

logger = logging.getLogger(__name__)

try:
from pymobiledevice3.remote.core_device_tunnel_service import create_core_device_tunnel_service
from pymobiledevice3.remote.core_device_tunnel_service import start_quic_tunnel
except ImportError:
start_quic_tunnel = None
logger.warning(
'create_core_device_tunnel_service failed to be imported. Some feature may not work.\n'
'start_quic_tunnel failed to be imported. Some feature may not work.\n'
'You can debug this by trying the import yourself:\n\n'
'from pymobiledevice3.remote.core_device_tunnel_service import create_core_device_tunnel_service')

Expand Down Expand Up @@ -68,38 +68,36 @@ def rsd_info(service_provider: RemoteServiceDiscoveryService, color: bool):
print_json(service_provider.peer_info, colored=color)


async def start_quic_tunnel(service_provider: RemoteServiceDiscoveryService, secrets: TextIO,
script_mode: bool = False) -> None:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
async def tunnel_task(service_provider: RemoteServiceDiscoveryService, secrets: TextIO,
script_mode: bool = False) -> None:
if start_quic_tunnel is None:
raise NotImplementedError('failed to start the QUIC tunnel on your platform')

stop_remoted_if_required()
with create_core_device_tunnel_service(service_provider, autopair=True) as service:
async with service.start_quic_tunnel(private_key, secrets_log_file=secrets) as tunnel_result:
resume_remoted_if_required()
if script_mode:
print(f'{tunnel_result.address} {tunnel_result.port}')
else:
if secrets is not None:
print(click.style('Secrets: ', bold=True, fg='magenta') +
click.style(secrets.name, bold=True, fg='white'))
print(click.style('UDID: ', bold=True, fg='yellow') +
click.style(service_provider.udid, bold=True, fg='white'))
print(click.style('ProductType: ', bold=True, fg='yellow') +
click.style(service_provider.product_type, bold=True, fg='white'))
print(click.style('ProductVersion: ', bold=True, fg='yellow') +
click.style(service_provider.product_version, bold=True, fg='white'))
print(click.style('Interface: ', bold=True, fg='yellow') +
click.style(tunnel_result.interface, bold=True, fg='white'))
print(click.style('RSD Address: ', bold=True, fg='yellow') +
click.style(tunnel_result.address, bold=True, fg='white'))
print(click.style('RSD Port: ', bold=True, fg='yellow') +
click.style(tunnel_result.port, bold=True, fg='white'))
print(click.style('Use the follow connection option:\n', bold=True, fg='yellow') +
click.style(f'--rsd {tunnel_result.address} {tunnel_result.port}', bold=True, fg='cyan'))

while True:
# wait user input while the asyncio tasks execute
await asyncio.sleep(.5)
async with start_quic_tunnel(service_provider, secrets=secrets) as tunnel_result:
if script_mode:
print(f'{tunnel_result.address} {tunnel_result.port}')
else:
if secrets is not None:
print(click.style('Secrets: ', bold=True, fg='magenta') +
click.style(secrets.name, bold=True, fg='white'))
print(click.style('UDID: ', bold=True, fg='yellow') +
click.style(service_provider.udid, bold=True, fg='white'))
print(click.style('ProductType: ', bold=True, fg='yellow') +
click.style(service_provider.product_type, bold=True, fg='white'))
print(click.style('ProductVersion: ', bold=True, fg='yellow') +
click.style(service_provider.product_version, bold=True, fg='white'))
print(click.style('Interface: ', bold=True, fg='yellow') +
click.style(tunnel_result.interface, bold=True, fg='white'))
print(click.style('RSD Address: ', bold=True, fg='yellow') +
click.style(tunnel_result.address, bold=True, fg='white'))
print(click.style('RSD Port: ', bold=True, fg='yellow') +
click.style(tunnel_result.port, bold=True, fg='white'))
print(click.style('Use the follow connection option:\n', bold=True, fg='yellow') +
click.style(f'--rsd {tunnel_result.address} {tunnel_result.port}', bold=True, fg='cyan'))

while True:
# wait user input while the asyncio tasks execute
await asyncio.sleep(.5)


@remote_cli.command('start-quic-tunnel')
Expand Down Expand Up @@ -131,7 +129,7 @@ def cli_start_quic_tunnel(udid: str, secrets: TextIO, script_mode: bool):
if udid is not None and rsd.udid != udid:
raise NoDeviceConnectedError()

asyncio.run(start_quic_tunnel(rsd, secrets, script_mode=script_mode), debug=True)
asyncio.run(tunnel_task(rsd, secrets, script_mode), debug=True)


@remote_cli.command('service', cls=RSDCommand)
Expand Down
8 changes: 3 additions & 5 deletions pymobiledevice3/cli/syslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ def format_line(color, pid, syslog_entry, include_label):
@click.option('-e', '--regex', multiple=True, help='filter only lines matching given regex')
@click.option('-ei', '--insensitive-regex', multiple=True, help='filter only lines matching given regex (insensitive)')
def syslog_live(service_provider: LockdownClient, out, color, pid, process_name, match, match_insensitive,
include_label, regex,
insensitive_regex):
include_label, regex, insensitive_regex):
""" view live syslog lines """

match_regex = [re.compile(f'.*({r}).*') for r in regex]
match_regex += [re.compile(f'.*({r}).*', re.IGNORECASE) for r in insensitive_regex]
match_regex = [re.compile(f'.*({r}).*', re.DOTALL) for r in regex]
match_regex += [re.compile(f'.*({r}).*', re.IGNORECASE | re.DOTALL) for r in insensitive_regex]

def replace(m):
if len(m.groups()):
Expand Down Expand Up @@ -140,7 +139,6 @@ def replace(m):
for r in match_regex:
if not r.findall(line):
continue

line = re.sub(r, replace, line)
skip = False

Expand Down
17 changes: 17 additions & 0 deletions pymobiledevice3/remote/core_device_tunnel_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from construct import Const, Container, Enum, GreedyBytes, GreedyRange, Int8ul, Int16ub, Int64ul, Prefixed, Struct
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives._serialization import Encoding, PublicFormat
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
Expand All @@ -42,6 +43,7 @@
from pymobiledevice3.pair_records import create_pairing_records_cache_folder, generate_host_id
from pymobiledevice3.remote.remote_service import RemoteService
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService
from pymobiledevice3.remote.utils import resume_remoted_if_required, stop_remoted_if_required
from pymobiledevice3.remote.xpc_message import XpcInt64Type, XpcUInt64Type
from pymobiledevice3.utils import asyncio_print_traceback

Expand Down Expand Up @@ -565,3 +567,18 @@ def create_core_device_tunnel_service(rsd: RemoteServiceDiscoveryService, autopa
service = CoreDeviceTunnelService(rsd)
service.connect(autopair=autopair)
return service


@asynccontextmanager
async def start_quic_tunnel(service_provider: RemoteServiceDiscoveryService, secrets: Optional[TextIO] = None) \
-> AsyncGenerator[TunnelResult, None]:
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
stop_remoted_if_required()
with create_core_device_tunnel_service(service_provider, autopair=True) as service:
async with service.start_quic_tunnel(private_key, secrets_log_file=secrets) as tunnel_result:
resume_remoted_if_required()
yield tunnel_result

while True:
# wait user input while the asyncio tasks execute
await asyncio.sleep(.5)
42 changes: 21 additions & 21 deletions pymobiledevice3/services/crash_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,27 +134,27 @@ def get_new_sysdiagnose(self, out: str, erase: bool = True) -> None:
"""
sysdiagnose_filename = None

for syslog_entry in OsTraceService(lockdown=self.lockdown).syslog():
if (posixpath.basename(syslog_entry.filename) not in SYSDIAGNOSE_PROCESS_NAMES) or \
(posixpath.basename(syslog_entry.image_name) not in SYSDIAGNOSE_PROCESS_NAMES):
# filter only sysdianose lines
continue

message = syslog_entry.message

if message.startswith('SDArchive: Successfully created tar at '):
self.logger.info('sysdiagnose creation has begun')
for filename in self.ls('DiagnosticLogs/sysdiagnose'):
# search for an IN_PROGRESS archive
if 'IN_PROGRESS_' in filename:
for ext in self.IN_PROGRESS_SYSDIAGNOSE_EXTENSIONS:
if filename.endswith(ext):
sysdiagnose_filename = filename.rsplit(ext)[0]
sysdiagnose_filename = sysdiagnose_filename.replace('IN_PROGRESS_', '')
sysdiagnose_filename = f'{sysdiagnose_filename}.tar.gz'
break
break

now = None
if isinstance(self.lockdown, LockdownClient):
now = self.lockdown.date

while sysdiagnose_filename is None:
self.afc.wait_exists('DiagnosticLogs/sysdiagnose')
for filename in self.ls('DiagnosticLogs/sysdiagnose'):
if now is not None and now.strftime('%Y.%m.%d') not in filename:
# filter out files that weren't created now
continue

# search for an IN_PROGRESS archive
if 'IN_PROGRESS_' in filename:
for ext in self.IN_PROGRESS_SYSDIAGNOSE_EXTENSIONS:
if filename.endswith(ext):
sysdiagnose_filename = filename.rsplit(ext)[0]
sysdiagnose_filename = sysdiagnose_filename.replace('IN_PROGRESS_', '')
sysdiagnose_filename = f'{sysdiagnose_filename}.tar.gz'
break
break
self.logger.info('sysdiagnose tarball creation has been started')
self.afc.wait_exists(sysdiagnose_filename)
time.sleep(IOS17_SYSDIAGNOSE_DELAY)
self.pull(out, entry=sysdiagnose_filename, erase=erase)
Expand Down
6 changes: 2 additions & 4 deletions pymobiledevice3/services/dvt/dvt_secure_socket_proxy.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Union

from packaging.version import Version

from pymobiledevice3.lockdown import LockdownClient
from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService
from pymobiledevice3.services.remote_server import RemoteServer

Expand All @@ -12,7 +10,7 @@ class DvtSecureSocketProxyService(RemoteServer):
OLD_SERVICE_NAME = 'com.apple.instruments.remoteserver'
RSD_SERVICE_NAME = 'com.apple.instruments.dtservicehub'

def __init__(self, lockdown: Union[RemoteServiceDiscoveryService, LockdownClient]):
def __init__(self, lockdown: LockdownServiceProvider):
if isinstance(lockdown, RemoteServiceDiscoveryService):
service_name = self.RSD_SERVICE_NAME
remove_ssl_context = False
Expand Down
9 changes: 6 additions & 3 deletions pymobiledevice3/services/os_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import tempfile
import typing
from datetime import datetime
from pathlib import Path
from tarfile import TarFile

from construct import Adapter, Byte, Bytes, Computed, Enum, Int16ul, Int32ul, Optional, RepeatUntil, Struct, this
Expand Down Expand Up @@ -117,9 +118,11 @@ def collect(self, out: str, size_limit: int = None, age_limit: int = None, start
"""
Collect the system logs into a .logarchive that can be viewed later with tools such as log or Console.
"""
with tempfile.NamedTemporaryFile() as tar:
self.create_archive(tar, size_limit=size_limit, age_limit=age_limit, start_time=start_time)
TarFile(tar.name).extractall(out)
with tempfile.TemporaryDirectory() as temp_dir:
file = Path(temp_dir) / 'foo.tar'
with open(file, 'wb') as f:
self.create_archive(f, size_limit=size_limit, age_limit=age_limit, start_time=start_time)
TarFile(file).extractall(out)

def syslog(self, pid=-1):
self.service.send_plist({'Request': 'StartActivity', 'MessageFilter': 65535, 'Pid': pid, 'StreamFlags': 60})
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "pymobiledevice3"
version = "2.16.0"
version = "2.17.2"
description = "Pure python3 implementation for working with iDevices (iPhone, etc...)"
readme = "README.md"
requires-python = ">=3.8"
Expand Down
47 changes: 47 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import pytest

from pymobiledevice3.exceptions import InvalidServiceError
from pymobiledevice3.lockdown import LockdownClient, create_using_usbmux
from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider
from pymobiledevice3.remote.remote_service_discovery import RemoteServiceDiscoveryService
from pymobiledevice3.services.dvt.dvt_secure_socket_proxy import DvtSecureSocketProxyService


def pytest_addoption(parser):
parser.addoption('--rsd', default=None, type=str, nargs=2, action='store')


@pytest.fixture(scope='function')
def service_provider(request) -> LockdownServiceProvider:
"""
Creates a new LockdownServiceProvider client for each test.
"""
rsd = request.config.getoption('--rsd')

if rsd is not None:
with RemoteServiceDiscoveryService(rsd) as rsd:
yield rsd
else:
with create_using_usbmux() as client:
yield client


@pytest.fixture(scope='function')
def dvt(service_provider) -> DvtSecureSocketProxyService:
"""
Creates a new DvtSecureSocketProxyService client for each test.
"""
try:
with DvtSecureSocketProxyService(lockdown=service_provider) as dvt:
yield dvt
except InvalidServiceError:
pytest.skip('Skipping DVT-based test since the service isn\'t accessible')


@pytest.fixture(scope='function')
def lockdown(request) -> LockdownClient:
"""
Creates a new lockdown client for each test.
"""
with create_using_usbmux() as client:
yield client
12 changes: 0 additions & 12 deletions tests/services/conftest.py

This file was deleted.

9 changes: 3 additions & 6 deletions tests/services/instruments/test_core_profile_session.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
from pymobiledevice3.services.dvt.dvt_secure_socket_proxy import DvtSecureSocketProxyService
from pymobiledevice3.services.dvt.instruments.core_profile_session_tap import CoreProfileSessionTap


def test_stackshot(lockdown):
def test_stackshot(dvt):
"""
Test getting stackshot.
:param pymobiledevice3.lockdown.LockdownClient lockdown: Lockdown client.
"""
with DvtSecureSocketProxyService(lockdown=lockdown) as dvt:
with CoreProfileSessionTap(dvt, CoreProfileSessionTap.get_time_config(dvt)) as tap:
data = tap.get_stackshot()
with CoreProfileSessionTap(dvt, CoreProfileSessionTap.get_time_config(dvt)) as tap:
data = tap.get_stackshot()

assert 'Darwin Kernel' in data['osversion']
# Constant kernel task data.
Expand Down
Loading

0 comments on commit 5689a9e

Please sign in to comment.