Skip to content

Commit

Permalink
Merge pull request #120 from rdavydov/tv
Browse files Browse the repository at this point in the history
TV login flow
  • Loading branch information
rdavydov authored Jan 18, 2023
2 parents 06502db + 8239334 commit 5b6d748
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 298 deletions.
2 changes: 1 addition & 1 deletion TwitchChannelPointsMiner/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
__version__ = "1.6.0"
__version__ = "1.7.0"
from .TwitchChannelPointsMiner import TwitchChannelPointsMiner

__all__ = [
Expand Down
183 changes: 24 additions & 159 deletions TwitchChannelPointsMiner/classes/Twitch.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,10 @@
from secrets import choice, token_hex

import json
import pickle
from base64 import urlsafe_b64decode

from hashlib import sha256
from base64 import urlsafe_b64decode

import requests

from undetected_chromedriver import ChromeOptions
import seleniumwire.undetected_chromedriver.v2 as uc

from TwitchChannelPointsMiner.classes.entities.Campaign import Campaign
from TwitchChannelPointsMiner.classes.entities.Drop import Drop
from TwitchChannelPointsMiner.classes.Exceptions import (
Expand Down Expand Up @@ -54,71 +47,6 @@

logger = logging.getLogger(__name__)

HEX_CHARS = '0123456789abcdef'
difficulty_1 = 10
subchallengeCount = 2
platformInputs = "tp-v2-input"


def get_time_now() -> int:
time_now = f'{time.time()}'
return int(time_now.replace('.', '')[:13])


def random_string(k=32):
return ''.join(random.choices(HEX_CHARS, k=k))


def string_to_sha256(string_to: str):
return sha256(string_to.encode()).hexdigest()


def get_hash_difficulty(hash_string):
return 4503599627370496 / (int(f'0x{hash_string[:13]}', 16) + 1)


def prof_work(config, _id, work_time):
f_list = []
difficulty = config['difficulty'] / config['subchallengeCount']
start_string = string_to_sha256(
platformInputs + ',\x20' + str(work_time) + ',\x20' + _id)
for _ in range(config['subchallengeCount']):
d = 1
while True:
start_string2 = string_to_sha256(str(d) + ',\x20' + start_string)
if (get_hash_difficulty(start_string2) >= difficulty):
f_list.append(d)
start_string = start_string2
break
d += 1
return {
'answers': f_list,
'finalHash': start_string
}


def get_kpsdk_cd():
config = {
"platformInputs": "tp-v2-input",
"difficulty": 10,
"subchallengeCount": 2
}
t0 = time.perf_counter()
_id = random_string()
work_time = get_time_now() - 527
prof_of_work = prof_work(config, _id, work_time)
t1 = time.perf_counter_ns()

duration = round(1000 * (t1 - t0)) / 1000
return {
'answers': prof_of_work['answers'],
'rst': time.perf_counter_ns(),
'st': time.perf_counter_ns(),
'd': duration,
'id': _id,
'workTime': work_time,
}


class Twitch(object):
__slots__ = [
Expand All @@ -127,8 +55,8 @@ class Twitch(object):
"twitch_login",
"running",
"device_id",
"integrity",
"integrity_expire",
# "integrity",
# "integrity_expire",
"client_session",
"client_version",
"twilight_build_id_pattern",
Expand All @@ -146,8 +74,8 @@ def __init__(self, username, user_agent, password=None):
CLIENT_ID, self.device_id, username, self.user_agent, password=password
)
self.running = True
self.integrity = None
self.integrity_expire = 0
# self.integrity = None
# self.integrity_expire = 0
self.client_session = token_hex(16)
self.client_version = CLIENT_VERSION
self.twilight_build_id_pattern = re.compile(
Expand Down Expand Up @@ -201,8 +129,7 @@ def get_spade_url(self, streamer):
# fixes AttributeError: 'NoneType' object has no attribute 'group'
# headers = {"User-Agent": self.user_agent}
from TwitchChannelPointsMiner.constants import USER_AGENTS
# headers = {"User-Agent": USER_AGENTS["Linux"]["FIREFOX"]}
headers = {"User-Agent": USER_AGENTS["Windows"]["CHROME"]}
headers = {"User-Agent": USER_AGENTS["Linux"]["FIREFOX"]}

main_page_request = requests.get(
streamer.streamer_url, headers=headers)
Expand Down Expand Up @@ -345,7 +272,7 @@ def post_gql_request(self, json_data):
headers={
"Authorization": f"OAuth {self.twitch_login.get_auth_token()}",
"Client-Id": CLIENT_ID,
"Client-Integrity": self.post_integrity(),
# "Client-Integrity": self.post_integrity(),
"Client-Session-Id": self.client_session,
"Client-Version": self.update_client_version(),
"User-Agent": self.user_agent,
Expand All @@ -365,98 +292,36 @@ def post_gql_request(self, json_data):
# Request for Integrity Token
# Twitch needs Authorization, Client-Id, X-Device-Id to generate JWT which is used for authorize gql requests
# Regenerate Integrity Token 5 minutes before expire
def post_integrity(self):
"""def post_integrity(self):
if (
self.integrity_expire - datetime.now().timestamp() * 1000 > 5 * 60 * 1000
and self.integrity is not None
):
return self.integrity
try:
# is_bad_bot becomes true when I use headless mode.
HEADLESS = False

options = uc.ChromeOptions()
if HEADLESS is True:
options.add_argument('--headless')
options.add_argument('--log-level=3')
options.add_argument('--disable-web-security')
options.add_argument('--allow-running-insecure-content')
options.add_argument('--lang=en')
options.add_argument('--no-sandbox')
options.add_argument('--disable-gpu')
# options.add_argument("--user-agent=\"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36\"")
# options.add_argument("--window-size=1920,1080")
# options.set_capability("detach", True)

driver = uc.Chrome(
options=options, use_subprocess=True # , executable_path=EXECUTABLE_PATH
)
driver.minimize_window()
driver.get('https://www.twitch.tv/robots.txt')
cookies = pickle.load(open(self.cookies_file, "rb"))
for cookie in cookies:
driver.add_cookie(cookie)
driver.get('https://www.twitch.tv/settings/profile')

# Print request headers
# for request in driver.requests:
# logger.info(request.url) # <--------------- Request url
# logger.info(request.headers) # <----------- Request headers
# logger.info(request.response.headers) # <-- Response headers

# Set correct user agent
selenium_user_agent = driver.execute_script(
"return navigator.userAgent;")

self.user_agent = selenium_user_agent

session = requests.Session()
kpsdk_cd = json.dumps(get_kpsdk_cd())

for cookie in driver.get_cookies():
session.cookies.set(cookie['name'], cookie['value'],
domain=cookie['domain'])
if cookie["name"] == "KP_UIDz":
if cookie["value"] is not None:
kpsdk_ct = cookie["value"]
else:
logger.error("Can't extract kpsdk_ct")

headers = {
'Accept': '*/*',
'Accept-Language': 'en-US,en;q=0.9,pt;q=0.8',
"Authorization": f"OAuth {self.twitch_login.get_auth_token()}",
"Client-Id": CLIENT_ID,
"Client-Session-Id": self.client_session,
"Client-Version": self.update_client_version(),
'Origin': 'https://www.twitch.tv',
'Referer': 'https://www.twitch.tv/',
"User-Agent": self.user_agent,
# "User-Agent": selenium_user_agent,
"X-Device-Id": self.device_id,
'x-kpsdk-cd': kpsdk_cd,
'x-kpsdk-ct': kpsdk_ct
}

response = session.post(
response = requests.post(
GQLOperations.integrity_url,
json={},
headers=headers
headers={
"Authorization": f"OAuth {self.twitch_login.get_auth_token()}",
"Client-Id": CLIENT_ID,
"Client-Session-Id": self.client_session,
"Client-Version": self.update_client_version(),
"User-Agent": self.user_agent,
"X-Device-Id": self.device_id,
},
)

driver.close()
driver.quit()

integrity_json = response.json()

self.integrity = integrity_json.get("token", None)
logger.debug(
f"Data: [], Status code: {response.status_code}, Content: {response.text}"
)
self.integrity = response.json().get("token", None)
# logger.info(f"integrity: {self.integrity}")
if self.isBadBot(self.integrity) is True:
logger.error(
"Uh-oh, Twitch has detected this miner as a \"Bad Bot\"")
logger.info(
"Uh-oh, Twitch has detected this miner as a \"Bad Bot\". Don't worry.")
self.integrity_expire = integrity_json.get("expiration", 0)
self.integrity_expire = response.json().get("expiration", 0)
# logger.info(f"integrity_expire: {self.integrity_expire}")
return self.integrity
except requests.exceptions.RequestException as e:
Expand All @@ -478,7 +343,7 @@ def isBadBot(self, integrity):
if decoded_header.get("is_bad_bot", "false") != "false":
return True
else:
return False
return False"""

def update_client_version(self):
try:
Expand Down
Loading

0 comments on commit 5b6d748

Please sign in to comment.