-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathdvrip_test
executable file
·70 lines (57 loc) · 2.19 KB
/
dvrip_test
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python3
# pylint: disable=wildcard-import,unused-wildcard-import
from datetime import datetime, timedelta
from socket import AF_INET, SOCK_STREAM, socket as Socket
from sys import argv, stderr
from dvrip import DVRIP_PORT
from dvrip.io import DVRIPClient
from dvrip.files import FileType
from dvrip.monitor import *
from dvrip.ptz import PTZButton
import dvrip.packet
def _read(file, length, _read=dvrip.packet._read): # pylint: disable=protected-access
data = _read(file, length)
print('recv', bytes(data) if data[:1].isascii() else data.hex(),
file=stderr, flush=True)
return data
dvrip.packet._read = _read # pylint: disable=protected-access
def _write(file, data, _write=dvrip.packet._write): # pylint: disable=protected-access
print('send', bytes(data) if data[:1].isascii() else data.hex(),
file=stderr, flush=True)
return _write(file, data)
dvrip.packet._write = _write # pylint: disable=protected-access
print(list(DVRIPClient.discover('', 1.0)))
conn = DVRIPClient(Socket(AF_INET, SOCK_STREAM))
conn.connect((argv[1], DVRIP_PORT), argv[2], argv[3])
print(conn.systeminfo())
print(conn.storageinfo())
print(conn.time(conn.time() - timedelta(seconds=0)))
now = datetime.now()
print(list(conn.files(start=now - timedelta(hours=5),
end=now,
channel=0,
type=FileType.VIDEO)))
entries = list(conn.log(start=now - timedelta(days=30), end=now))
for e in entries:
print(e)
print(e.data)
for p in set((e.type, e.data) for e in entries if isinstance(e.data, str)):
print(p)
conn.button(0, PTZButton.MENU)
#name = '/idea0/2019-05-01/001/01.04.07-01.04.23[M][@15b7][0].h264'
#sock = Socket(AF_INET, SOCK_STREAM)
#sock.connect((argv[1], DVRIP_PORT))
#monitor = Monitor(action=MonitorAction.START,
# params=MonitorParams(channel=0,
# stream=Stream.HD))
#claim = MonitorClaim(session=conn.session, monitor=monitor)
#request = DoMonitor(session=conn.session, monitor=monitor)
#in_ = conn.reader(sock, claim, request)
#with open('test.264', 'wb') as out:
# while True:
# chunk = in_.read(16)
# if not chunk: break
# out.write(chunk)
# out.flush()
#conn.reboot()
conn.logout()