From 3973da62a683b34548d07acb48e334ac5b4cfe6d Mon Sep 17 00:00:00 2001 From: David Teather <34144122+davidteather@users.noreply.github.com> Date: Sun, 13 Feb 2022 20:19:59 -0600 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=8A=20V5.0.0=20=F0=9F=8E=8A=20(#803)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Initial changes for v5 of the project. * Thanks @Daan-Grashoff for some of the helpers.extract_tag_contents code from a comment on #787 * High Level Modeling of Classes * update tests & auto call object * Fix package-test * simplify info_full * Logging Improvements * Logging changes from #804 by @zokalo * Improve docs * Fix some mypy typing issues * Move old search functions to trending & fix process_kwargs * Private self variables Co-authored-by: davidteather Co-authored-by: Daan Grashoff <9222025+Daan-Grashoff@users.noreply.github.com> Co-authored-by: Dmitriy <16061619+zokalo@users.noreply.github.com> --- .github/ISSUE_TEMPLATE/bug_report.md | 10 +- .github/ISSUE_TEMPLATE/installation-help.md | 3 + .github/workflows/package-test.yml | 4 +- CITATION.cff | 10 +- README.md | 122 +- TikTokApi/__init__.py | 1 - TikTokApi/api/__init__.py | 19 + TikTokApi/api/hashtag.py | 156 + TikTokApi/api/search.py | 122 + TikTokApi/api/sound.py | 181 + TikTokApi/api/trending.py | 70 + TikTokApi/api/user.py | 287 + TikTokApi/api/video.py | 145 + TikTokApi/browser_utilities/browser.py | 47 +- .../browser_utilities/browser_interface.py | 2 +- .../browser_utilities/browser_selenium.py | 224 - TikTokApi/browser_utilities/get_acrawler.py | 4 +- TikTokApi/browser_utilities/stealth.py | 502 -- TikTokApi/exceptions.py | 2 +- TikTokApi/helpers.py | 41 + TikTokApi/tiktok.py | 1549 +--- TikTokApi/tiktokuser.py | 101 - TikTokApi/utilities.py | 3 +- docs/TikTokApi.html | 506 +- docs/TikTokApi/api.html | 301 + docs/TikTokApi/api/hashtag.html | 816 ++ docs/TikTokApi/api/search.html | 721 ++ docs/TikTokApi/api/sound.html | 890 ++ 
docs/TikTokApi/api/trending.html | 477 ++ docs/TikTokApi/api/user.html | 1234 +++ docs/TikTokApi/api/video.html | 790 ++ docs/TikTokApi/browser_utilities.html | 360 +- docs/TikTokApi/browser_utilities/browser.html | 575 +- .../browser_utilities/browser_interface.html | 386 +- .../browser_utilities/browser_selenium.html | 1037 --- .../browser_utilities/get_acrawler.html | 406 +- docs/TikTokApi/browser_utilities/stealth.html | 356 +- docs/TikTokApi/exceptions.html | 368 +- docs/TikTokApi/helpers.html | 348 + docs/TikTokApi/tiktok.html | 7389 +++++------------ docs/TikTokApi/tiktokuser.html | 590 -- docs/TikTokApi/utilities.html | 373 +- docs/index.html | 258 +- docs/search.js | 46 + docs/search.json | 1 - examples/demo_user_pager.py | 49 - examples/discover.py | 15 - examples/download_tiktok.py | 19 - examples/external_signer.py | 57 - examples/get_a_users_videos.py | 20 - examples/get_tiktoks_by_hashtag.py | 10 - examples/get_tiktoks_by_sound.py | 13 - examples/get_tiktoks_by_username.py | 10 - examples/get_trending.py | 17 - examples/hashtag_example.py | 11 + examples/search_example.py | 14 + examples/sound_example.py | 9 + examples/trending_example.py | 7 + examples/user_example.py | 12 + examples/user_id_crawler.py | 12 - examples/video_example.py | 12 + setup.py | 7 +- tests/test_by_hashtag.py | 26 - tests/test_by_sound.py | 12 - tests/test_by_username.py | 12 - tests/test_get_music_object_full_by_api.py | 12 - tests/test_get_object_routes.py | 30 - tests/test_hashtag.py | 28 + tests/test_integration.py | 25 + tests/test_search.py | 24 + tests/test_search_for.py | 12 - tests/test_sound.py | 22 + tests/test_trending.py | 14 +- tests/test_user.py | 87 +- tests/test_user_pager.py | 55 - tests/test_video.py | 36 + 76 files changed, 11047 insertions(+), 11475 deletions(-) create mode 100644 TikTokApi/api/__init__.py create mode 100644 TikTokApi/api/hashtag.py create mode 100644 TikTokApi/api/search.py create mode 100644 TikTokApi/api/sound.py create mode 100644 
TikTokApi/api/trending.py create mode 100644 TikTokApi/api/user.py create mode 100644 TikTokApi/api/video.py delete mode 100644 TikTokApi/browser_utilities/browser_selenium.py delete mode 100644 TikTokApi/browser_utilities/stealth.py create mode 100644 TikTokApi/helpers.py delete mode 100644 TikTokApi/tiktokuser.py create mode 100644 docs/TikTokApi/api.html create mode 100644 docs/TikTokApi/api/hashtag.html create mode 100644 docs/TikTokApi/api/search.html create mode 100644 docs/TikTokApi/api/sound.html create mode 100644 docs/TikTokApi/api/trending.html create mode 100644 docs/TikTokApi/api/user.html create mode 100644 docs/TikTokApi/api/video.html delete mode 100644 docs/TikTokApi/browser_utilities/browser_selenium.html create mode 100644 docs/TikTokApi/helpers.html delete mode 100644 docs/TikTokApi/tiktokuser.html create mode 100644 docs/search.js delete mode 100644 docs/search.json delete mode 100644 examples/demo_user_pager.py delete mode 100644 examples/discover.py delete mode 100644 examples/download_tiktok.py delete mode 100644 examples/external_signer.py delete mode 100644 examples/get_a_users_videos.py delete mode 100644 examples/get_tiktoks_by_hashtag.py delete mode 100644 examples/get_tiktoks_by_sound.py delete mode 100644 examples/get_tiktoks_by_username.py delete mode 100644 examples/get_trending.py create mode 100644 examples/hashtag_example.py create mode 100644 examples/search_example.py create mode 100644 examples/sound_example.py create mode 100644 examples/trending_example.py create mode 100644 examples/user_example.py delete mode 100644 examples/user_id_crawler.py create mode 100644 examples/video_example.py delete mode 100644 tests/test_by_hashtag.py delete mode 100644 tests/test_by_sound.py delete mode 100644 tests/test_by_username.py delete mode 100644 tests/test_get_music_object_full_by_api.py delete mode 100644 tests/test_get_object_routes.py create mode 100644 tests/test_hashtag.py create mode 100644 tests/test_integration.py create mode 
100644 tests/test_search.py delete mode 100644 tests/test_search_for.py create mode 100644 tests/test_sound.py delete mode 100644 tests/test_user_pager.py create mode 100644 tests/test_video.py diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md index 10de3c52..efaded62 100644 --- a/.github/ISSUE_TEMPLATE/bug_report.md +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -6,9 +6,7 @@ labels: bug assignees: '' --- - -# Read Below!!! If this doesn't fix your issue delete these two lines -**You may need to install chromedriver for your machine globally. Download it [here](https://sites.google.com/a/chromium.org/chromedriver/) and add it to your path.** +Fill Out the template :) **Describe the bug** @@ -16,7 +14,9 @@ A clear and concise description of what the bug is. **The buggy code** -Please insert the code that is throwing errors or is giving you weird unexpected results. +Please add any relevant code that is giving you unexpected results. + +Preferably the smallest amount of code to reproduce the issue. ``` # Code Goes Here @@ -35,7 +35,7 @@ Put the error trace below if there's any error thrown. **Desktop (please complete the following information):** - OS: [e.g. Windows 10] - - TikTokApi Version [e.g. 3.3.1] - if out of date upgrade before posting an issue + - TikTokApi Version [e.g. 5.0.0] - if out of date upgrade before posting an issue **Additional context** diff --git a/.github/ISSUE_TEMPLATE/installation-help.md b/.github/ISSUE_TEMPLATE/installation-help.md index 182b201f..caeea644 100644 --- a/.github/ISSUE_TEMPLATE/installation-help.md +++ b/.github/ISSUE_TEMPLATE/installation-help.md @@ -8,6 +8,9 @@ assignees: '' --- +Please first check the closed issues on GitHub for people with similar problems to you. +If you'd like more instant help from the community consider joining the [discord](https://discord.gg/yyPhbfma6f) + **Describe the error** Put the error trace here. 
diff --git a/.github/workflows/package-test.yml b/.github/workflows/package-test.yml index 9eb1f3a1..27ff8ca3 100644 --- a/.github/workflows/package-test.yml +++ b/.github/workflows/package-test.yml @@ -7,7 +7,7 @@ on: branches: - master - nightly - - 'releases/*' + - "releases/*" jobs: Unit-Tests: @@ -17,7 +17,7 @@ jobs: fail-fast: false matrix: os: [macos-latest] - python-version: [3.7, 3.9] + python-version: [3.7, "3.10"] steps: - uses: actions/checkout@v2 - uses: microsoft/playwright-github-action@v1 diff --git a/CITATION.cff b/CITATION.cff index e21b1c8f..79e7ddea 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -1,9 +1,9 @@ cff-version: 1.2.0 authors: -- family-names: "Teather" - given-names: "David" - orcid: "https://orcid.org/0000-0002-9467-4676" + - family-names: "Teather" + given-names: "David" + orcid: "https://orcid.org/0000-0002-9467-4676" title: "TikTokAPI" url: "https://github.com/davidteather/tiktok-api" -version: 4.1.0 -date-released: 2021-12-11 \ No newline at end of file +version: 5.0.0 +date-released: 2022-2-11 diff --git a/README.md b/README.md index c6713c05..4409116e 100644 --- a/README.md +++ b/README.md @@ -8,46 +8,46 @@ This is an unofficial api wrapper for TikTok.com in python. With this api you ar ## Sponsors These sponsors have paid to be placed here and beyond that I do not have any affiliation with them, the TikTokAPI package will always be free and open-source. If you wish to be a sponsor of this project check out my [GitHub sponsors page](https://github.com/sponsors/davidteather). -[![TikAPI](imgs/logo128.png)](https://tikapi.io/?ref=davidteather) | **[TikAPI](https://tikapi.io/?ref=davidteather)** is a paid TikTok API service providing an full out-of-the-box solution for developers, trusted by 100+ companies. 
[Learn more](https://tikapi.io/?ref=davidteather) +[![TikAPI](https://raw.githubusercontent.com/davidteather/TikTok-Api/master/imgs/logo128.png)](https://tikapi.io/?ref=davidteather) | **[TikAPI](https://tikapi.io/?ref=davidteather)** is a paid TikTok API service providing an full out-of-the-box solution for developers, trusted by 100+ companies. [Learn more](https://tikapi.io/?ref=davidteather) :-------------------------:|:-------------------------: ## Table of Contents +- [Documentation](#documentation) - [Getting Started](#getting-started) + - [How to Support The Project](#how-to-support-the-project) - [Installing](#installing) - [Common Issues](#common-issues) - [Quick Start Guide](#quick-start-guide) - [Examples](https://github.com/davidteather/TikTok-Api/tree/master/examples) -- [Documentation](#documentation) -- [Built With](#built-with) -- [Authors](#authors) -- [License](#license) +[**Upgrading from V4 to V5**](#upgrading-from-v4-to-v5) + +## Documentation + +You can find the full documentation [here](https://davidteather.github.io/TikTok-Api/docs/TikTokApi.html), the [TikTokApi Class](https://davidteather.github.io/TikTok-Api/docs/TikTokApi/tiktok.html) is where you'll probably spend most of your time. ## Getting Started To get started using this api follow the instructions below. -#### How to support the project -* Feel free to sponsor me on GitHub -* Feel free to tip the project using the brave browser +### How to Support The Project +* Star the repo 😎 +* Consider [sponsoring](https://github.com/sponsors/davidteather) me on GitHub +* Send me an email or a [LinkedIn](https://www.linkedin.com/in/davidteather/) message telling me what you're using the API for, I really like hearing what people are using it for. * Submit PRs for issues :) ### Installing -If you run into an issue please check the closed issues on the github. You're most likely not the first person to experience this issue. If nothing works feel free to open an issue. 
+If you run into an issue please check the closed issues on the github, although feel free to re-open a new issue if you find an issue that's been closed for a few months. The codebase can and does run into similar issues as it has before, because TikTok changes things up. ```sh pip install TikTokApi python -m playwright install ``` -If you would prefer a video walk through of setting up this package I created a [YouTube video](https://www.youtube.com/watch?v=zwLmLfVI-VQ) just for that. - - - -If you're on MacOS you may need to install [XCode Developer Tools](https://webkit.org/build-tools/) +If you would prefer a video walk through of setting up this package I created a currently semi-outdated (TODO: new one for v5 coming soon) [YouTube video](https://www.youtube.com/watch?v=zwLmLfVI-VQ) just for that. #### Docker Installation -Clone this repository onto a local machine then run the following commands. +Clone this repository onto a local machine (or just the Dockerfile since it installs TikTokApi from pip) then run the following commands. ```sh docker pull mcr.microsoft.com/playwright:focal @@ -61,47 +61,93 @@ docker run -v TikTokApi --rm tiktokapi:latest python3 your_script.py Please don't open an issue if you're experiencing one of these just comment if the provided solution do not work for you. -* **Browser Has no Attribute** - make sure you ran `python3 -m playwright install`, if your error persists try the [playwright](https://github.com/microsoft/playwright-python) quickstart guide and diagnose issues from there. +* **Browser Has no Attribute** - make sure you ran `python3 -m playwright install`, if your error persists try the [playwright-python](https://github.com/microsoft/playwright-python) quickstart guide and diagnose issues from there. ## Quick Start Guide -Here's a quick bit of code to get the most recent trending on TikTok. There's more examples in the examples directory. +Here's a quick bit of code to get the most recent trending videos on TikTok. 
There's more examples in the [examples](https://github.com/davidteather/TikTok-Api/tree/master/examples) directory. ```py from TikTokApi import TikTokApi -api = TikTokApi.get_instance() -results = 10 -# Since TikTok changed their API you need to use the custom_verifyFp option. -# In your web browser you will need to go to TikTok, Log in and get the s_v_web_id value. -trending = api.by_trending(count=results, custom_verifyFp="") +# In your web browser you will need to go to TikTok, check the cookies +# and under www.tiktok.com s_v_web_id should exist, and use that value +# as input to custom_verify_fp +# Or watch https://www.youtube.com/watch?v=zwLmLfVI-VQ for a visual +# TODO: Update link +api = TikTokApi(custom_verify_fp="") -for tiktok in trending: - # Prints the id of the tiktok - print(tiktok['id']) - -print(len(trending)) +for trending_video in api.trending.videos(count=50): + # Prints the author's username of the trending video. + print(trending_video.author.username) ``` -To run the example scripts from the repository root, make sure you use the -module form of python the interpreter - +To run the example scripts from the repository root, make sure you use the `-m` option on python. ```sh python -m examples.get_trending ``` -[Here's](https://gist.github.com/davidteather/7c30780bbc30772ba11ec9e0b909e99d) an example of what a TikTok dictionary looks like. +You can access the dictionary type of an object using `.as_dict`. On a video this may look like +[this](https://gist.github.com/davidteather/7c30780bbc30772ba11ec9e0b909e99d), although TikTok changes their structure from time to time so it's worth investigating the structure of the dictionary when you use this package. -## Documentation +## Upgrading from V4 to V5 + +All changes will be noted on [#803](https://github.com/davidteather/TikTok-Api/pull/803) if you want more information. 
+ +### Motivation -You can find the documentation [here](https://davidteather.github.io/TikTok-Api/docs/TikTokApi.html) (you'll likely just need the TikTokApi section of the docs), I will be making this documentation more complete overtime as it's not super great right now, but better than just having it in the readme! +This package has been difficult to maintain due to its structure, difficult to work with since the user of the package must write parsing methods to extract information from dictionaries, more memory intensive than it needs to be (although this can be further improved), and in general just difficult to work with for new users. -## Authors +As a result, I've decided to at least attempt to remedy some of these issues, the biggest changes are that +1. The package has shifted to using classes for different TikTok objects resulting in an easier, higher-level programming experience. +2. All methods that used to return a list of objects have been switched to using iterators, to hopefully decrease memory utilization for most users. -* **David Teather** - *Initial work* - [davidteather](https://github.com/davidteather) -See also the list of [contributors](https://github.com/davidteather/TikTok-Api/contributors) who participated in this project. +### Upgrading Examples -## License -This project is licensed under the MIT License +#### Accessing Dictionary on Objects (similar to V4) + +TODO: Make video upgrading from V4-V5? + +You'll probably need to use this beyond just for legacy support, since not all attributes are parsed out and attached +to the different objects. + +You may want to use this as a workaround for legacy applications while you upgrade the rest of the app. I'd suggest that you do eventually upgrade to using the higher-level approach fully. 
+```py +user = api.user(username='therock') +user.as_dict # -> dict of the user_object +for video in user.videos(): + video.as_dict # -> dict of TikTok's video object as found when requesting the videos endpoint +``` + +Here's a few more examples that help illustrate the differences in the flow of the usage of the package with V5. + +```py +# V4 +api = TikTokApi.get_instance() +trending_videos = api.by_trending() + +#V5 +api = TikTokApi() # .get_instance no longer exists +for trending_video in api.trending.videos(): + # do something +``` + +Where in V4 you had to extract information yourself, the package now handles that for you. So it's much easier to do chained related function calls. +```py +# V4 +trending_videos = api.by_trending() +for video in trending_videos: + # The dictionary responses are also different depending on what endpoint you got them from + # So, it's usually more painful than this to deal with + trending_user = api.get_user(id=video['author']['id'], secUid=video['author']['secUid']) + + +# V5 +# This is more complicated than above, but it illustrates the simplified approach +for trending_video in api.trending.videos(): + user_stats = trending_video.author.info_full()['stats'] + if user_stats['followerCount'] >= 10000: + # maybe save the user in a database +``` diff --git a/TikTokApi/__init__.py b/TikTokApi/__init__.py index fa9e85e0..af368e66 100644 --- a/TikTokApi/__init__.py +++ b/TikTokApi/__init__.py @@ -4,4 +4,3 @@ __docformat__ = "restructuredtext" from TikTokApi.tiktok import TikTokApi -from TikTokApi.tiktokuser import TikTokUser diff --git a/TikTokApi/api/__init__.py b/TikTokApi/api/__init__.py new file mode 100644 index 00000000..d75df9bd --- /dev/null +++ b/TikTokApi/api/__init__.py @@ -0,0 +1,19 @@ +""" +This module contains classes that all represent different types of data sent back by the TikTok servers. + +The files within this module correspond to what type of object is described and all have different methods associated with them. 
+ + +### How To Interpret TikTok Data +There are quite a few ambiguous keys in the JSON that TikTok returns so here's a section that tries to document some of them. + +**Note**: These are incomplete, if you get confused about something feel free to add it here as a PR once you figure it out. + +| JSON Key | Description | +|------------------|-------------| +| createTime | The [unix epoch](https://docs.python.org/3/library/datetime.html#datetime.date.fromtimestamp) of creation, all other time fields are also unix epochs. | +| secUid & (userId or id) | Two different unique attributes that are used in conjunction to reference a specific account, so if you're storing users somewhere in a database, you should store both secUid & userId. | +| id | A unique attribute used to reference a non-user object like video, hashtag, etc | +| diggCount | The likes for a specific video. | +| digged | Used to check if the current user has liked/digged a video, this will always be false since this package doesn't support logged-in user functions. | +""" diff --git a/TikTokApi/api/hashtag.py b/TikTokApi/api/hashtag.py new file mode 100644 index 00000000..aba2fad4 --- /dev/null +++ b/TikTokApi/api/hashtag.py @@ -0,0 +1,156 @@ +from __future__ import annotations +import logging + +from urllib.parse import urlencode +from ..exceptions import * + +from typing import TYPE_CHECKING, ClassVar, Iterator, Optional + +if TYPE_CHECKING: + from ..tiktok import TikTokApi + from .video import Video + + +class Hashtag: + """ + A TikTok Hashtag/Challenge. 
+ + Example Usage + ```py + hashtag = api.hashtag(name='funny') + ``` + """ + + parent: ClassVar[TikTokApi] + + id: Optional[str] + """The ID of the hashtag""" + name: Optional[str] + """The name of the hashtag (omiting the #)""" + as_dict: dict + """The raw data associated with this hashtag.""" + + def __init__( + self, + name: Optional[str] = None, + id: Optional[str] = None, + data: Optional[dict] = None, + ): + """ + You must provide the name or id of the hashtag. + """ + self.name = name + self.id = id + + if data is not None: + self.as_dict = data + self.__extract_from_data() + + def info(self, **kwargs) -> dict: + """ + Returns TikTok's dictionary representation of the hashtag object. + """ + return self.info_full(**kwargs)["challengeInfo"]["challenge"] + + def info_full(self, **kwargs) -> dict: + """ + Returns all information sent by TikTok related to this hashtag. + + Example Usage + ```py + hashtag_data = api.hashtag(name='funny').info_full() + ``` + """ + processed = self.parent._process_kwargs(kwargs) + kwargs["custom_device_id"] = processed.device_id + + if self.name is not None: + query = {"challengeName": self.name} + elif self.id is not None: + query = {"challengeId": self.id} + else: + self.parent.logger.warning("Malformed Hashtag Object") + return {} + + path = "api/challenge/detail/?{}&{}".format( + self.parent._add_url_params(), urlencode(query) + ) + + data = self.parent.get_data(path, **kwargs) + + if data["challengeInfo"].get("challenge") is None: + raise TikTokNotFoundError("Challenge {} does not exist".format(self.name)) + + return data + + def videos(self, count=30, offset=0, **kwargs) -> Iterator[Video]: + """Returns a dictionary listing TikToks with a specific hashtag. + + - Parameters: + - count (int): The amount of videos you want returned. + - offset (int): The the offset of videos from 0 you want to get. 
+ + Example Usage + ```py + for video in api.hashtag(name='funny').videos(): + # do something + ``` + """ + processed = self.parent._process_kwargs(kwargs) + kwargs["custom_device_id"] = processed.device_id + + if self.id is None: + self.id = self.info()["id"] + + cursor = offset + page_size = 30 + + while cursor - offset < count: + query = { + "count": page_size, + "challengeID": self.id, + "cursor": cursor, + } + path = "api/challenge/item_list/?{}&{}".format( + self.parent._add_url_params(), urlencode(query) + ) + res = self.parent.get_data(path, **kwargs) + + for result in res.get("itemList", []): + yield self.parent.video(data=result) + + if not res.get("hasMore", False): + self.parent.logger.info( + "TikTok isn't sending more TikToks beyond this point." + ) + return + + cursor = int(res["cursor"]) + + def __extract_from_data(self): + data = self.as_dict + keys = data.keys() + + if "title" in keys: + self.id = data["id"] + self.name = data["title"] + + if None in (self.name, self.id): + Hashtag.parent.logger.error( + f"Failed to create Hashtag with data: {data}\nwhich has keys {data.keys()}" + ) + + def __repr__(self): + return self.__str__() + + def __str__(self): + return f"TikTokApi.hashtag(id='{self.id}', name='{self.name}')" + + def __getattr__(self, name): + # TODO: Maybe switch to using @property instead + if name in ["id", "name", "as_dict"]: + self.as_dict = self.info() + self.__extract_from_data() + return self.__getattribute__(name) + + raise AttributeError(f"{name} doesn't exist on TikTokApi.api.Hashtag") diff --git a/TikTokApi/api/search.py b/TikTokApi/api/search.py new file mode 100644 index 00000000..6a70f219 --- /dev/null +++ b/TikTokApi/api/search.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +from urllib.parse import urlencode + +from typing import TYPE_CHECKING, Iterator, Type + +from .user import User +from .sound import Sound +from .hashtag import Hashtag +from .video import Video + +if TYPE_CHECKING: + from ..tiktok import 
TikTokApi + +import requests + + +class Search: + """Contains static methods about searching.""" + + parent: TikTokApi + + @staticmethod + def videos(search_term, count=28, offset=0, **kwargs) -> Iterator[Video]: + """ + Searches for Videos + + - Parameters: + - search_term (str): The phrase you want to search for. + - count (int): The amount of videos you want returned. + - offset (int): The offset of videos from your data you want returned. + + Example Usage + ```py + for video in api.search.videos('therock'): + # do something + ``` + """ + return Search.search_type( + search_term, "item", count=count, offset=offset, **kwargs + ) + + @staticmethod + def users(search_term, count=28, offset=0, **kwargs) -> Iterator[User]: + """ + Searches for users using an alternate endpoint than Search.users + + - Parameters: + - search_term (str): The phrase you want to search for. + - count (int): The amount of videos you want returned. + + Example Usage + ```py + for user in api.search.users_alternate('therock'): + # do something + ``` + """ + return Search.search_type( + search_term, "user", count=count, offset=offset, **kwargs + ) + + @staticmethod + def search_type(search_term, obj_type, count=28, offset=0, **kwargs) -> Iterator: + """ + Searches for users using an alternate endpoint than Search.users + + - Parameters: + - search_term (str): The phrase you want to search for. + - count (int): The amount of videos you want returned. + - obj_type (str): user | item + + Just use .video & .users + ``` + """ + processed = Search.parent._process_kwargs(kwargs) + kwargs["custom_device_id"] = processed.device_id + + cursor = offset + + spawn = requests.head( + "https://www.tiktok.com", + proxies=Search.parent._format_proxy(processed.proxy), + **Search.parent._requests_extra_kwargs + ) + ttwid = spawn.cookies["ttwid"] + + # For some reason when <= it can be off by one. 
+ while cursor - offset <= count: + query = { + "keyword": search_term, + "cursor": cursor, + "app_language": Search.parent._language, + } + path = "api/search/{}/full/?{}&{}".format( + obj_type, Search.parent._add_url_params(), urlencode(query) + ) + + if obj_type == "user": + subdomain = "www" + elif obj_type == "item": + subdomain = "us" + else: + raise TypeError("invalid obj_type") + + api_response = Search.parent.get_data( + path, subdomain=subdomain, ttwid=ttwid, **kwargs + ) + + # When I move to 3.10+ support make this a match switch. + for result in api_response.get("user_list", []): + yield User(data=result) + + for result in api_response.get("item_list", []): + yield Video(data=result) + + if api_response.get("has_more", 0) == 0: + Search.parent.logger.info( + "TikTok is not sending videos beyond this point." + ) + return + + cursor = int(api_response.get("cursor", cursor)) diff --git a/TikTokApi/api/sound.py b/TikTokApi/api/sound.py new file mode 100644 index 00000000..69770c25 --- /dev/null +++ b/TikTokApi/api/sound.py @@ -0,0 +1,181 @@ +from __future__ import annotations +from os import path + +import requests +import json + +from urllib.parse import quote, urlencode + +from ..helpers import extract_tag_contents +from ..exceptions import * + +from typing import TYPE_CHECKING, ClassVar, Iterator, Optional + +if TYPE_CHECKING: + from ..tiktok import TikTokApi + from .user import User + from .video import Video + + +class Sound: + """ + A TikTok Sound/Music/Song. + + Example Usage + ```py + song = api.song(id='7016547803243022337') + ``` + """ + + parent: ClassVar[TikTokApi] + + id: str + """TikTok's ID for the sound""" + title: Optional[str] + """The title of the song.""" + author: Optional[User] + """The author of the song (if it exists)""" + + def __init__(self, id: Optional[str] = None, data: Optional[str] = None): + """ + You must provide the id of the sound or it will not work. 
+ """ + if data is not None: + self.as_dict = data + self.__extract_from_data() + elif id is None: + raise TypeError("You must provide id parameter.") + else: + self.id = id + + def info(self, use_html=False, **kwargs) -> dict: + """ + Returns a dictionary of TikTok's Sound/Music object. + + - Parameters: + - use_html (bool): If you want to perform an HTML request or not. + Defaults to False to use an API call, which shouldn't get detected + as often as an HTML request. + + + Example Usage + ```py + sound_data = api.sound(id='7016547803243022337').info() + ``` + """ + if use_html: + return self.info_full(**kwargs)["musicInfo"] + + processed = self.parent._process_kwargs(kwargs) + kwargs["custom_device_id"] = processed.device_id + + path = "node/share/music/-{}?{}".format(self.id, self.parent._add_url_params()) + res = self.parent.get_data(path, **kwargs) + + if res.get("statusCode", 200) == 10203: + raise TikTokNotFoundError() + + return res["musicInfo"]["music"] + + def info_full(self, **kwargs) -> dict: + """ + Returns all the data associated with a TikTok Sound. 
+ + This makes an API request, there is no HTML request option, as such + with Sound.info() + + Example Usage + ```py + sound_data = api.sound(id='7016547803243022337').info_full() + ``` + """ + r = requests.get( + "https://www.tiktok.com/music/-{}".format(self.id), + headers={ + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", + "Accept-Encoding": "gzip, deflate", + "Connection": "keep-alive", + "User-Agent": self.parent._user_agent, + }, + proxies=self.parent._format_proxy(kwargs.get("proxy", None)), + cookies=self.parent._get_cookies(**kwargs), + **self.parent._requests_extra_kwargs, + ) + + data = extract_tag_contents(r.text) + return json.loads(data)["props"]["pageProps"]["musicInfo"] + + def videos(self, count=30, offset=0, **kwargs) -> Iterator[Video]: + """ + Returns Video objects of videos created with this sound. + + - Parameters: + - count (int): The amount of videos you want returned. + - offset (int): The offset of videos you want returned. + + Example Usage + ```py + for video in api.sound(id='7016547803243022337').videos(): + # do something + ``` + """ + processed = self.parent._process_kwargs(kwargs) + kwargs["custom_device_id"] = processed.device_id + + cursor = offset + page_size = 30 + + while cursor - offset < count: + query = { + "secUid": "", + "musicID": self.id, + "cursor": cursor, + "shareUid": "", + "count": page_size, + } + path = "api/music/item_list/?{}&{}".format( + self.parent._add_url_params(), urlencode(query) + ) + + res = self.parent.get_data(path, send_tt_params=True, **kwargs) + + for result in res.get("itemList", []): + yield self.parent.video(data=result) + + if not res.get("hasMore", False): + self.parent.logger.info( + "TikTok isn't sending more TikToks beyond this point." 
+ ) + return + + cursor = int(res["cursor"]) + + def __extract_from_data(self): + data = self.as_dict + keys = data.keys() + + if "authorName" in keys: + self.id = data["id"] + self.title = data["title"] + + if data.get("authorName") is not None: + self.author = self.parent.user(username=data["authorName"]) + + if self.id is None: + Sound.parent.logger.error( + f"Failed to create Sound with data: {data}\nwhich has keys {data.keys()}" + ) + + def __repr__(self): + return self.__str__() + + def __str__(self): + return f"TikTokApi.sound(id='{self.id}')" + + def __getattr__(self, name): + if name in ["title", "author", "as_dict"]: + self.as_dict = self.info() + self.__extract_from_data() + return self.__getattribute__(name) + + raise AttributeError(f"{name} doesn't exist on TikTokApi.api.Sound") diff --git a/TikTokApi/api/trending.py b/TikTokApi/api/trending.py new file mode 100644 index 00000000..7841a5b5 --- /dev/null +++ b/TikTokApi/api/trending.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +import logging +import requests +from urllib.parse import urlencode + +from .video import Video +from .sound import Sound +from .user import User +from .hashtag import Hashtag + +from typing import TYPE_CHECKING, Iterator + +if TYPE_CHECKING: + from ..tiktok import TikTokApi + + +class Trending: + """Contains static methods related to trending.""" + + parent: TikTokApi + + @staticmethod + def videos(count=30, **kwargs) -> Iterator[Video]: + """ + Returns Videos that are trending on TikTok. + + - Parameters: + - count (int): The amount of videos you want returned. 
class User:
    """
    A TikTok User.

    Example Usage
    ```py
    user = api.user(username='therock')
    # or
    user_id = '5831967'
    sec_uid = 'MS4wLjABAAAA-VASjiXTh7wDDyXvjk10VFhMWUAoxr8bgfO1kAL1-9s'
    user = api.user(user_id=user_id, sec_uid=sec_uid)
    ```

    """

    parent: ClassVar[TikTokApi]

    user_id: str
    """The user ID of the user."""
    sec_uid: str
    """The sec UID of the user."""
    username: str
    """The username of the user."""
    as_dict: dict
    """The raw data associated with this user."""

    def __init__(
        self,
        username: Optional[str] = None,
        user_id: Optional[str] = None,
        sec_uid: Optional[str] = None,
        data: Optional[dict] = None,
    ):
        """
        You must provide the username or (user_id and sec_uid) otherwise this
        will not function correctly.

        When `data` (a raw TikTok user dictionary) is given it is stored as
        `as_dict` and the identifiers are extracted from it.
        """
        self.__update_id_sec_uid_username(user_id, sec_uid, username)
        if data is not None:
            self.as_dict = data
            self.__extract_from_data()

    def info(self, **kwargs):
        """
        Returns a dictionary of TikTok's User object

        This is the "user" entry of `info_full()`.

        Example Usage
        ```py
        user_data = api.user(username='therock').info()
        ```
        """
        return self.info_full(**kwargs)["user"]

    def info_full(self, **kwargs) -> dict:
        """
        Returns a dictionary of information associated with this User.
        Includes statistics about this user.

        Raises TypeError when the User has no username, and
        TikTokNotFoundError when TikTok reports a 404 for the profile page.

        Example Usage
        ```py
        user_data = api.user(username='therock').info_full()
        ```
        """

        # TODO: Find the one using only user_id & sec_uid
        if not self.username:
            raise TypeError(
                "You must provide the username when creating this class to use this method."
            )

        quoted_username = quote(self.username)
        r = requests.get(
            "https://tiktok.com/@{}?lang=en".format(quoted_username),
            headers={
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
                "path": "/@{}".format(quoted_username),
                "Accept-Encoding": "gzip, deflate",
                "Connection": "keep-alive",
                "User-Agent": self.parent._user_agent,
            },
            proxies=User.parent._format_proxy(kwargs.get("proxy", None)),
            cookies=User.parent._get_cookies(**kwargs),
            **User.parent._requests_extra_kwargs,
        )

        # The profile data is embedded as JSON inside the returned HTML page.
        data = extract_tag_contents(r.text)
        user = json.loads(data)

        user_props = user["props"]["pageProps"]
        if user_props["statusCode"] == 404:
            raise TikTokNotFoundError(
                "TikTok user with username {} does not exist".format(self.username)
            )

        return user_props["userInfo"]

    def videos(self, count=30, cursor=0, **kwargs) -> Iterator[Video]:
        """
        Returns an iterator yielding Video objects.

        - Parameters:
            - count (int): The amount of videos you want returned.
            - cursor (int): The unix epoch to get uploaded videos since.

        Example Usage
        ```py
        user = api.user(username='therock')
        for video in user.videos(count=100):
            # do something
        ```
        """
        processed = User.parent._process_kwargs(kwargs)
        kwargs["custom_device_id"] = processed.device_id

        if not self.user_id and not self.sec_uid:
            self.__find_attributes()

        first = True
        amount_yielded = 0

        while amount_yielded < count:
            query = {
                "count": 30,
                "id": self.user_id,
                "cursor": cursor,
                "type": 1,
                "secUid": self.sec_uid,
                "sourceType": 8,
                "appId": 1233,
                "region": processed.region,
                "priority_region": processed.region,
                "language": processed.language,
            }
            path = "api/post/item_list/?{}&{}".format(
                User.parent._add_url_params(), urlencode(query)
            )

            res = User.parent.get_data(path, send_tt_params=True, **kwargs)

            videos = res.get("itemList", [])
            amount_yielded += len(videos)
            for video in videos:
                yield self.parent.video(data=video)

            # A missing/false hasMore is only trusted after the first page.
            if not res.get("hasMore", False) and not first:
                User.parent.logger.info(
                    "TikTok isn't sending more TikToks beyond this point."
                )
                return

            cursor = res["cursor"]
            first = False

    def liked(self, count: int = 30, cursor: int = 0, **kwargs) -> Iterator[Video]:
        """
        Returns an iterator yielding the Video objects that this user has liked.

        **Note**: The user's likes must be **public** (which is not the default option)

        - Parameters:
            - count (int): The amount of videos you want returned.
            - cursor (int): The unix epoch to get uploaded videos since.

        Example Usage
        ```py
        for liked_video in api.user(username='public_likes').liked():
            # do something
        ```
        """
        processed = User.parent._process_kwargs(kwargs)
        kwargs["custom_device_id"] = processed.device_id

        amount_yielded = 0
        first = True

        if self.user_id is None and self.sec_uid is None:
            self.__find_attributes()

        while amount_yielded < count:
            query = {
                "count": 30,
                "id": self.user_id,
                "type": 2,
                "secUid": self.sec_uid,
                "cursor": cursor,
                "sourceType": 9,
                "appId": 1233,
                "region": processed.region,
                "priority_region": processed.region,
                "language": processed.language,
            }
            path = "api/favorite/item_list/?{}&{}".format(
                User.parent._add_url_params(), urlencode(query)
            )

            res = self.parent.get_data(path, **kwargs)

            if "itemList" not in res:
                # No list at all on the very first page most likely means
                # that the likes are not public.
                if first:
                    User.parent.logger.error("User's likes are most likely private")
                return

            videos = res.get("itemList", [])
            # Count each video exactly once (the page size), matching videos().
            amount_yielded += len(videos)
            for video in videos:
                yield self.parent.video(data=video)

            # A missing/false hasMore is only trusted after the first page.
            if not res.get("hasMore", False) and not first:
                User.parent.logger.info(
                    "TikTok isn't sending more TikToks beyond this point."
                )
                return

            cursor = res["cursor"]
            first = False

    def __extract_from_data(self):
        """Fill user_id, sec_uid and username from the raw `as_dict` data."""
        data = self.as_dict
        keys = data.keys()

        # Two payload layouts are handled: a nested "user_info" entry, or a
        # flat dictionary carrying "uniqueId".
        if "user_info" in keys:
            self.__update_id_sec_uid_username(
                data["user_info"]["uid"],
                data["user_info"]["sec_uid"],
                data["user_info"]["unique_id"],
            )
        elif "uniqueId" in keys:
            self.__update_id_sec_uid_username(
                data["id"], data["secUid"], data["uniqueId"]
            )

        if None in (self.username, self.user_id, self.sec_uid):
            User.parent.logger.error(
                f"Failed to create User with data: {data}\nwhich has keys {data.keys()}"
            )

    def __update_id_sec_uid_username(self, user_id, sec_uid, username):
        """Set the three identifying attributes in one place."""
        self.user_id = user_id
        self.sec_uid = sec_uid
        self.username = username

    def __find_attributes(self) -> None:
        """Look up user_id and sec_uid for this username."""
        # It is more efficient to check search first, since self.info() makes an HTML request.
        found = False
        for u in self.parent.search.users(self.username):
            if u.username == self.username:
                found = True
                self.__update_id_sec_uid_username(u.user_id, u.sec_uid, u.username)
                break

        if not found:
            user_object = self.info()
            self.__update_id_sec_uid_username(
                user_object["id"], user_object["secUid"], user_object["uniqueId"]
            )

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return f"TikTokApi.user(username='{self.username}', user_id='{self.user_id}', sec_uid='{self.sec_uid}')"

    def __getattr__(self, name):
        # Only reached when normal lookup fails: as_dict is fetched lazily.
        if name in ["as_dict"]:
            self.as_dict = self.info()
            self.__extract_from_data()
            return self.__getattribute__(name)

        raise AttributeError(f"{name} doesn't exist on TikTokApi.api.User")
class Video:
    """
    A TikTok Video class

    Example Usage
    ```py
    video = api.video(id='7041997751718137094')
    ```
    """

    parent: ClassVar[TikTokApi]

    id: Optional[str]
    """TikTok's ID of the Video"""
    author: Optional[User]
    """The User who created the Video"""
    sound: Optional[Sound]
    """The Sound that is associated with the Video"""
    hashtags: Optional[list[Hashtag]]
    """A List of Hashtags on the Video"""
    as_dict: dict
    """The raw data associated with this Video."""

    def __init__(
        self,
        id: Optional[str] = None,
        url: Optional[str] = None,
        data: Optional[dict] = None,
    ):
        """
        You must provide the id or a valid url, else this will fail.

        When `data` (a raw TikTok video dictionary) is given it takes
        precedence over `url` and the attributes are extracted from it.
        Raises TypeError when no id could be determined.
        """
        self.id = id
        if data is not None:
            self.as_dict = data
            self.__extract_from_data()
        elif url is not None:
            self.id = extract_video_id_from_url(url)

        if self.id is None:
            raise TypeError("You must provide id or url parameter.")

    def info(self, **kwargs) -> dict:
        """
        Returns a dictionary of TikTok's Video object.

        This is the ["itemInfo"]["itemStruct"] entry of `info_full()`.

        Example Usage
        ```py
        video_data = api.video(id='7041997751718137094').info()
        ```
        """
        return self.info_full(**kwargs)["itemInfo"]["itemStruct"]

    def info_full(self, **kwargs) -> dict:
        """
        Returns a dictionary of all data associated with a TikTok Video.

        Example Usage
        ```py
        video_data = api.video(id='7041997751718137094').info_full()
        ```
        """
        # Pin the device id chosen for this call so the request reuses it.
        processed = self.parent._process_kwargs(kwargs)
        kwargs["custom_device_id"] = processed.device_id

        query = {
            "itemId": self.id,
        }
        path = "api/item/detail/?{}&{}".format(
            self.parent._add_url_params(), urlencode(query)
        )

        return self.parent.get_data(path, **kwargs)

    def bytes(self, **kwargs) -> bytes:
        """
        Returns the bytes of a TikTok Video.

        The download address is read from ["video"]["playAddr"] of `info()`.

        Example Usage
        ```py
        video_bytes = api.video(id='7041997751718137094').bytes()

        # Saving The Video
        with open('saved_video.mp4', 'wb') as output:
            output.write(video_bytes)
        ```
        """
        processed = self.parent._process_kwargs(kwargs)
        kwargs["custom_device_id"] = processed.device_id

        video_data = self.info(**kwargs)
        download_url = video_data["video"]["playAddr"]

        return self.parent.get_bytes(url=download_url, **kwargs)

    def __extract_from_data(self) -> None:
        """Fill id, author, sound and hashtags from the raw `as_dict` data."""
        data = self.as_dict
        keys = data.keys()

        # Only payloads that carry an "author" entry are unpacked.
        if "author" in keys:
            self.id = data["id"]
            self.author = self.parent.user(data=data["author"])
            self.sound = self.parent.sound(data=data["music"])

            self.hashtags = [
                self.parent.hashtag(data=hashtag)
                for hashtag in data.get("challenges", [])
            ]

        if self.id is None:
            Video.parent.logger.error(
                f"Failed to create Video with data: {data}\nwhich has keys {data.keys()}"
            )

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        return f"TikTokApi.video(id='{self.id}')"

    def __getattr__(self, name):
        # Only reached when normal lookup fails: these attributes are fetched
        # lazily from TikTok the first time they are requested.
        # Handle author, sound, hashtags, as_dict
        if name in ["author", "sound", "hashtags", "as_dict"]:
            self.as_dict = self.info()
            self.__extract_from_data()
            return self.__getattribute__(name)

        raise AttributeError(f"{name} doesn't exist on TikTokApi.api.Video")
playwright = None +logger = logging.getLogger(LOGGER_NAME) + def get_playwright(): global playwright @@ -40,7 +41,7 @@ def __init__( self.api_url = kwargs.get("api_url", None) self.referrer = kwargs.get("referer", "https://www.tiktok.com/") self.language = kwargs.get("language", "en") - self.executablePath = kwargs.get("executablePath", None) + self.executable_path = kwargs.get("executable_path", None) self.device_id = kwargs.get("custom_device_id", None) args = kwargs.get("browser_args", []) @@ -72,18 +73,18 @@ def __init__( self.options.update(options) - if self.executablePath is not None: - self.options["executablePath"] = self.executablePath + if self.executable_path is not None: + self.options["executable_path"] = self.executable_path try: self.browser = get_playwright().webkit.launch( args=self.args, **self.options ) except Exception as e: - logging.critical(e) + logger.critical("Webkit launch failed", exc_info=True) raise e - context = self.create_context(set_useragent=True) + context = self._create_context(set_useragent=True) page = context.new_page() self.get_params(page) context.close() @@ -118,7 +119,7 @@ def get_params(self, page) -> None: self.width = page.evaluate("""() => { return screen.width; }""") self.height = page.evaluate("""() => { return screen.height; }""") - def create_context(self, set_useragent=False): + def _create_context(self, set_useragent=False): iphone = playwright.devices["iPhone 11 Pro"] iphone["viewport"] = { "width": random.randint(320, 1920), @@ -132,11 +133,11 @@ def create_context(self, set_useragent=False): context = self.browser.new_context(**iphone) if set_useragent: - self.userAgent = iphone["user_agent"] + self.user_agent = iphone["user_agent"] return context - def base36encode(self, number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"): + def _base36encode(self, number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"): """Converts an integer to a base36 string.""" base36 = "" sign = "" @@ -157,7 +158,7 @@ def 
base36encode(self, number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"): def gen_verifyFp(self): chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"[:] chars_len = len(chars) - scenario_title = self.base36encode(int(time.time() * 1000)) + scenario_title = self._base36encode(int(time.time() * 1000)) uuid = [0] * 36 uuid[8] = "_" uuid[13] = "_" @@ -173,16 +174,12 @@ def gen_verifyFp(self): return f'verify_{scenario_title.lower()}_{"".join(uuid)}' - def sign_url(self, calc_tt_params=False, **kwargs): + def sign_url(self, url, calc_tt_params=False, **kwargs): def process(route): route.abort() - url = kwargs.get("url", None) - if url is None: - raise Exception("sign_url required a url parameter") - tt_params = None - context = self.create_context() + context = self._create_context() page = context.new_page() if calc_tt_params: @@ -202,7 +199,7 @@ def process(route): verifyFp = self.gen_verifyFp() else: verifyFp = kwargs.get( - "custom_verifyFp", + "custom_verify_fp", "verify_khgp4f49_V12d4mRX_MdCO_4Wzt_Ar0k_z4RCQC9pUDpX", ) @@ -215,7 +212,7 @@ def process(route): url = "{}&verifyFp={}&device_id={}".format(url, verifyFp, device_id) - page.add_script_tag(content=get_acrawler()) + page.add_script_tag(content=_get_acrawler()) evaluatedPage = page.evaluate( '''() => { var url = "''' @@ -230,12 +227,12 @@ def process(route): url = "{}&_signature={}".format(url, evaluatedPage) if calc_tt_params: - page.add_script_tag(content=get_tt_params_script()) + page.add_script_tag(content=_get_tt_params_script()) tt_params = page.evaluate( """() => { return window.genXTTParams(""" - + json.dumps(dict(parse_qsl(splitquery(url)[1]))) + + json.dumps(dict(parse_qsl(urlparse(url).query))) + """); }""" @@ -244,11 +241,11 @@ def process(route): context.close() return (verifyFp, device_id, evaluatedPage, tt_params) - def clean_up(self): + def _clean_up(self): try: self.browser.close() except Exception: - logging.info("cleanup failed") + logger.exception("cleanup failed") 
# playwright.stop() def find_redirect(self, url): diff --git a/TikTokApi/browser_utilities/browser_interface.py b/TikTokApi/browser_utilities/browser_interface.py index a453c61a..fb4e9332 100644 --- a/TikTokApi/browser_utilities/browser_interface.py +++ b/TikTokApi/browser_utilities/browser_interface.py @@ -16,5 +16,5 @@ def sign_url(self, calc_tt_params=False, **kwargs): pass @abc.abstractmethod - def clean_up(self) -> None: + def _clean_up(self) -> None: pass diff --git a/TikTokApi/browser_utilities/browser_selenium.py b/TikTokApi/browser_utilities/browser_selenium.py deleted file mode 100644 index 1b79df91..00000000 --- a/TikTokApi/browser_utilities/browser_selenium.py +++ /dev/null @@ -1,224 +0,0 @@ -import random -import time -import requests -import logging -from threading import Thread -import time -import re -import random -import json -from .browser_interface import BrowserInterface -from selenium_stealth import stealth -from selenium import webdriver -from .get_acrawler import get_acrawler, get_tt_params_script -from urllib.parse import splitquery, parse_qs, parse_qsl - - -class browser(BrowserInterface): - def __init__( - self, - **kwargs, - ): - self.kwargs = kwargs - self.debug = kwargs.get("debug", False) - self.proxy = kwargs.get("proxy", None) - self.api_url = kwargs.get("api_url", None) - self.referrer = kwargs.get("referer", "https://www.tiktok.com/") - self.language = kwargs.get("language", "en") - self.executablePath = kwargs.get("executablePath", "chromedriver") - self.device_id = kwargs.get("custom_device_id", None) - - args = kwargs.get("browser_args", []) - options = kwargs.get("browser_options", {}) - - if len(args) == 0: - self.args = [] - else: - self.args = args - - options = webdriver.ChromeOptions() - options.add_argument("--headless") - options.add_argument("log-level=2") - self.options = { - "headless": True, - "handleSIGINT": True, - "handleSIGTERM": True, - "handleSIGHUP": True, - } - - if self.proxy is not None: - if "@" in 
self.proxy: - server_prefix = self.proxy.split("://")[0] - address = self.proxy.split("@")[1] - self.options["proxy"] = { - "server": server_prefix + "://" + address, - "username": self.proxy.split("://")[1].split(":")[0], - "password": self.proxy.split("://")[1].split("@")[0].split(":")[1], - } - else: - self.options["proxy"] = {"server": self.proxy} - - # self.options.update(options) - - if self.executablePath is not None: - self.options["executablePath"] = self.executablePath - - try: - self.browser = webdriver.Chrome( - executable_path=self.executablePath, chrome_options=options - ) - except Exception as e: - raise e - - # Page avoidance - self.setup_browser() - # page.close() - - def setup_browser(self): - stealth( - self.browser, - languages=["en-US", "en"], - vendor="Google Inc.", - platform="Win32", - webgl_vendor="Intel Inc.", - renderer="Intel Iris OpenGL Engine", - fix_hairline=True, - ) - - self.get_params(self.browser) - # NOTE: Slower than playwright at loading this because playwright can ignore unneeded files. 
- self.browser.get("https://www.tiktok.com/@redbull") - self.browser.execute_script(get_acrawler()) - self.browser.execute_script(get_tt_params_script()) - - def get_params(self, page) -> None: - self.userAgent = page.execute_script("""return navigator.userAgent""") - self.browser_language = self.kwargs.get( - "browser_language", ("""return navigator.language""") - ) - self.browser_version = """return window.navigator.appVersion""" - - if len(self.browser_language.split("-")) == 0: - self.region = self.kwargs.get("region", "US") - self.language = self.kwargs.get("language", "en") - elif len(self.browser_language.split("-")) == 1: - self.region = self.kwargs.get("region", "US") - self.language = self.browser_language.split("-")[0] - else: - self.region = self.kwargs.get("region", self.browser_language.split("-")[1]) - self.language = self.kwargs.get( - "language", self.browser_language.split("-")[0] - ) - - self.timezone_name = self.kwargs.get( - "timezone_name", - ("""return Intl.DateTimeFormat().resolvedOptions().timeZone"""), - ) - self.width = """return screen.width""" - self.height = """return screen.height""" - - def base36encode(self, number, alphabet="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"): - """Converts an integer to a base36 string.""" - base36 = "" - sign = "" - - if number < 0: - sign = "-" - number = -number - - if 0 <= number < len(alphabet): - return sign + alphabet[number] - - while number != 0: - number, i = divmod(number, len(alphabet)) - base36 = alphabet[i] + base36 - - return sign + base36 - - def gen_verifyFp(self): - chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"[:] - chars_len = len(chars) - scenario_title = self.base36encode(int(time.time() * 1000)) - uuid = [0] * 36 - uuid[8] = "_" - uuid[13] = "_" - uuid[18] = "_" - uuid[23] = "_" - uuid[14] = "4" - - for i in range(36): - if uuid[i] != 0: - continue - r = int(random.random() * chars_len) - uuid[i] = chars[int((3 & r) | 8 if i == 19 else r)] - - return 
f'verify_{scenario_title.lower()}_{"".join(uuid)}' - - def sign_url(self, calc_tt_params=False, **kwargs): - url = kwargs.get("url", None) - if url is None: - raise Exception("sign_url required a url parameter") - - tt_params = None - if kwargs.get("gen_new_verifyFp", False): - verifyFp = self.gen_verifyFp() - else: - verifyFp = kwargs.get( - "custom_verifyFp", - "verify_khgp4f49_V12d4mRX_MdCO_4Wzt_Ar0k_z4RCQC9pUDpX", - ) - - if kwargs.get("custom_device_id") is not None: - device_id = kwargs.get("custom_device_id", None) - elif self.device_id is None: - device_id = str(random.randint(10000, 999999999)) - else: - device_id = self.device_id - - url = "{}&verifyFp={}&device_id={}".format(url, verifyFp, device_id) - # self.browser.execute_script(content=get_acrawler()) - # Should be covered by an earlier addition of get_acrawler. - evaluatedPage = ( - self.browser.execute_script( - ''' - var url = "''' - + url - + """" - var token = window.byted_acrawler.sign({url: url}); - return token; - """ - ), - ) - - url = "{}&_signature={}".format(url, evaluatedPage) - # self.browser.execute_script(content=get_tt_params_script()) - # Should be covered by an earlier addition of get_acrawler. 
- - tt_params = self.browser.execute_script( - """() => { - return window.genXTTParams(""" - + json.dumps(dict(parse_qsl(splitquery(url)[1]))) - + """); - - }""" - ) - - return (verifyFp, device_id, evaluatedPage, tt_params) - - def clean_up(self): - try: - self.browser.close() - except: - logging.warning("cleanup of browser failed") - - def __format_proxy(self, proxy): - if proxy is not None: - return {"http": proxy, "https": proxy} - else: - return None - - def __get_js(self): - return requests.get( - "https://sf16-muse-va.ibytedtos.com/obj/rc-web-sdk-gcs/acrawler.js", - proxies=self.__format_proxy(self.proxy), - ).text diff --git a/TikTokApi/browser_utilities/get_acrawler.py b/TikTokApi/browser_utilities/get_acrawler.py index f1a51f16..7996dcfe 100644 --- a/TikTokApi/browser_utilities/get_acrawler.py +++ b/TikTokApi/browser_utilities/get_acrawler.py @@ -1,6 +1,6 @@ -def get_tt_params_script(): +def _get_tt_params_script(): return """var CryptoJS=CryptoJS||function(e,t){var r={},n=r.lib={},i=n.Base=function(){function e(){}return{extend:function(t){e.prototype=this;var r=new e;return t&&r.mixIn(t),r.hasOwnProperty("init")&&this.init!==r.init||(r.init=function(){r.$super.init.apply(this,arguments)}),r.init.prototype=r,r.$super=this,r},create:function(){var e=this.extend();return e.init.apply(e,arguments),e},init:function(){},mixIn:function(e){for(var t in e)e.hasOwnProperty(t)&&(this[t]=e[t]);e.hasOwnProperty("toString")&&(this.toString=e.toString)},clone:function(){return this.init.prototype.extend(this)}}}(),c=n.WordArray=i.extend({init:function(e,t){e=this.words=e||[],this.sigBytes=null!=t?t:4*e.length},toString:function(e){return(e||f).stringify(this)},concat:function(e){var t=this.words,r=e.words,n=this.sigBytes,i=e.sigBytes;if(this.clamp(),n%4)for(var c=0;c>>2]>>>24-c%4*8&255;t[n+c>>>2]|=o<<24-(n+c)%4*8}else if(r.length>65535)for(c=0;c>>2]=r[c>>>2];else t.push.apply(t,r);return this.sigBytes+=i,this},clamp:function(){var 
t=this.words,r=this.sigBytes;t[r>>>2]&=4294967295<<32-r%4*8,t.length=e.ceil(r/4)},clone:function(){var e=i.clone.call(this);return e.words=this.words.slice(0),e},random:function(t){for(var r,n=[],i=function(t){t=t;var r=987654321,n=4294967295;return function(){var i=((r=36969*(65535&r)+(r>>16)&n)<<16)+(t=18e3*(65535&t)+(t>>16)&n)&n;return i/=4294967296,(i+=.5)*(e.random()>.5?1:-1)}},o=0;o>>2]>>>24-i%4*8&255;n.push((c>>>4).toString(16)),n.push((15&c).toString(16))}return n.join("")},parse:function(e){for(var t=e.length,r=[],n=0;n>>3]|=parseInt(e.substr(n,2),16)<<24-n%8*4;return new c.init(r,t/2)}},a=o.Latin1={stringify:function(e){for(var t=e.words,r=e.sigBytes,n=[],i=0;i>>2]>>>24-i%4*8&255;n.push(String.fromCharCode(c))}return n.join("")},parse:function(e){for(var t=e.length,r=[],n=0;n>>2]|=(255&e.charCodeAt(n))<<24-n%4*8;return new c.init(r,t)}},s=o.Utf8={stringify:function(e){try{return decodeURIComponent(escape(a.stringify(e)))}catch(e){throw new Error("Malformed UTF-8 data")}},parse:function(e){return a.parse(unescape(encodeURIComponent(e)))}},u=n.BufferedBlockAlgorithm=i.extend({reset:function(){this._data=new c.init,this._nDataBytes=0},_append:function(e){"string"==typeof e&&(e=s.parse(e)),this._data.concat(e),this._nDataBytes+=e.sigBytes},_process:function(t){var r=this._data,n=r.words,i=r.sigBytes,o=this.blockSize,f=i/(4*o),a=(f=t?e.ceil(f):e.max((0|f)-this._minBufferSize,0))*o,s=e.min(4*a,i);if(a){for(var u=0;u>>2]>>>24-c%4*8&255)<<16|(t[c+1>>>2]>>>24-(c+1)%4*8&255)<<8|t[c+2>>>2]>>>24-(c+2)%4*8&255,f=0;f<4&&c+.75*f>>6*(3-f)&63));var a=n.charAt(64);if(a)for(;i.length%4;)i.push(a);return i.join("")},parse:function(e){var r=e.length,n=this._map,i=this._reverseMap;if(!i){i=this._reverseMap=[];for(var c=0;c>>6-o%4*2;i[c>>>2]|=(f|a)<<24-c%4*8,c++}return t.create(i,c)}(e,r,i)},_map:"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/="}}(),CryptoJS.lib.Cipher||function(e){var 
t=CryptoJS,r=t.lib,n=r.Base,i=r.WordArray,c=r.BufferedBlockAlgorithm,o=t.enc,f=(o.Utf8,o.Base64),a=t.algo.EvpKDF,s=r.Cipher=c.extend({cfg:n.extend(),createEncryptor:function(e,t){return this.create(this._ENC_XFORM_MODE,e,t)},createDecryptor:function(e,t){return this.create(this._DEC_XFORM_MODE,e,t)},init:function(e,t,r){this.cfg=this.cfg.extend(r),this._xformMode=e,this._key=t,this.reset()},reset:function(){c.reset.call(this),this._doReset()},process:function(e){return this._append(e),this._process()},finalize:function(e){return e&&this._append(e),this._doFinalize()},keySize:4,ivSize:4,_ENC_XFORM_MODE:1,_DEC_XFORM_MODE:2,_createHelper:function(){function e(e){return"string"==typeof e?_:v}return function(t){return{encrypt:function(r,n,i){return e(n).encrypt(t,r,n,i)},decrypt:function(r,n,i){return e(n).decrypt(t,r,n,i)}}}}()}),u=(r.StreamCipher=s.extend({_doFinalize:function(){return this._process(!0)},blockSize:1}),t.mode={}),d=r.BlockCipherMode=n.extend({createEncryptor:function(e,t){return this.Encryptor.create(e,t)},createDecryptor:function(e,t){return this.Decryptor.create(e,t)},init:function(e,t){this._cipher=e,this._iv=t}}),l=u.CBC=function(){var t=d.extend();function r(t,r,n){var i=this._iv;if(i){var c=i;this._iv=e}else c=this._prevBlock;for(var o=0;o>>2];e.sigBytes-=t}},h=(r.BlockCipher=s.extend({cfg:s.cfg.extend({mode:l,padding:p}),reset:function(){s.reset.call(this);var e=this.cfg,t=e.iv,r=e.mode;if(this._xformMode==this._ENC_XFORM_MODE)var n=r.createEncryptor;else{n=r.createDecryptor;this._minBufferSize=1}this._mode&&this._mode.__creator==n?this._mode.init(this,t&&t.words):(this._mode=n.call(r,this,t&&t.words),this._mode.__creator=n)},_doProcessBlock:function(e,t){this._mode.processBlock(e,t)},_doFinalize:function(){var e=this.cfg.padding;if(this._xformMode==this._ENC_XFORM_MODE){e.pad(this._data,this.blockSize);var t=this._process(!0)}else{t=this._process(!0);e.unpad(t)}return 
t},blockSize:4}),r.CipherParams=n.extend({init:function(e){this.mixIn(e)},toString:function(e){return(e||this.formatter).stringify(this)}})),y=(t.format={}).OpenSSL={stringify:function(e){var t=e.ciphertext,r=e.salt;if(r)var n=i.create([1398893684,1701076831]).concat(r).concat(t);else n=t;return n.toString(f)},parse:function(e){var t=f.parse(e),r=t.words;if(1398893684==r[0]&&1701076831==r[1]){var n=i.create(r.slice(2,4));r.splice(0,4),t.sigBytes-=16}return h.create({ciphertext:t,salt:n})}},v=r.SerializableCipher=n.extend({cfg:n.extend({format:y}),encrypt:function(e,t,r,n){n=this.cfg.extend(n);var i=e.createEncryptor(r,n),c=i.finalize(t),o=i.cfg;return h.create({ciphertext:c,key:r,iv:o.iv,algorithm:e,mode:o.mode,padding:o.padding,blockSize:e.blockSize,formatter:n.format})},decrypt:function(e,t,r,n){return n=this.cfg.extend(n),t=this._parse(t,n.format),e.createDecryptor(r,n).finalize(t.ciphertext)},_parse:function(e,t){return"string"==typeof e?t.parse(e,this):e}}),b=(t.kdf={}).OpenSSL={execute:function(e,t,r,n){n||(n=i.random(8));var c=a.create({keySize:t+r}).compute(e,n),o=i.create(c.words.slice(t),4*r);return c.sigBytes=4*t,h.create({key:c,iv:o,salt:n})}},_=r.PasswordBasedCipher=v.extend({cfg:v.cfg.extend({kdf:b}),encrypt:function(e,t,r,n){var i=(n=this.cfg.extend(n)).kdf.execute(r,e.keySize,e.ivSize);n.iv=i.iv;var c=v.encrypt.call(this,e,t,i.key,n);return c.mixIn(i),c},decrypt:function(e,t,r,n){n=this.cfg.extend(n),t=this._parse(t,n.format);var i=n.kdf.execute(r,e.keySize,e.ivSize,t.salt);return n.iv=i.iv,v.decrypt.call(this,e,t,i.key,n)}})}(),CryptoJS.mode.ECB=function(){var e=CryptoJS.lib.BlockCipherMode.extend();return e.Encryptor=e.extend({processBlock:function(e,t){this._cipher.encryptBlock(e,t)}}),e.Decryptor=e.extend({processBlock:function(e,t){this._cipher.decryptBlock(e,t)}}),e}(),function(){var e=CryptoJS,t=e.lib.BlockCipher,r=e.algo,n=[],i=[],c=[],o=[],f=[],a=[],s=[],u=[],d=[],l=[];!function(){for(var e=[],t=0;t<256;t++)e[t]=t<128?t<<1:t<<1^283;var 
r=0,p=0;for(t=0;t<256;t++){var h=p^p<<1^p<<2^p<<3^p<<4;h=h>>>8^255&h^99,n[r]=h,i[h]=r;var y=e[r],v=e[y],b=e[v],_=257*e[h]^16843008*h;c[r]=_<<24|_>>>8,o[r]=_<<16|_>>>16,f[r]=_<<8|_>>>24,a[r]=_;_=16843009*b^65537*v^257*y^16843008*r;s[h]=_<<24|_>>>8,u[h]=_<<16|_>>>16,d[h]=_<<8|_>>>24,l[h]=_,r?(r=y^e[e[e[b^y]]],p^=e[e[p]]):r=p=1}}();var p=[0,1,2,4,8,16,32,64,128,27,54],h=r.AES=t.extend({_doReset:function(){if(!this._nRounds||this._keyPriorReset!==this._key){for(var e=this._keyPriorReset=this._key,t=e.words,r=e.sigBytes/4,i=4*((this._nRounds=r+6)+1),c=this._keySchedule=[],o=0;o6&&o%r==4&&(f=n[f>>>24]<<24|n[f>>>16&255]<<16|n[f>>>8&255]<<8|n[255&f]):(f=n[(f=f<<8|f>>>24)>>>24]<<24|n[f>>>16&255]<<16|n[f>>>8&255]<<8|n[255&f],f^=p[o/r|0]<<24),c[o]=c[o-r]^f}for(var a=this._invKeySchedule=[],h=0;h>>24]]^u[n[f>>>16&255]]^d[n[f>>>8&255]]^l[n[255&f]]}}},encryptBlock:function(e,t){this._doCryptBlock(e,t,this._keySchedule,c,o,f,a,n)},decryptBlock:function(e,t){var r=e[t+1];e[t+1]=e[t+3],e[t+3]=r,this._doCryptBlock(e,t,this._invKeySchedule,s,u,d,l,i);r=e[t+1];e[t+1]=e[t+3],e[t+3]=r},_doCryptBlock:function(e,t,r,n,i,c,o,f){for(var a=this._nRounds,s=e[t]^r[0],u=e[t+1]^r[1],d=e[t+2]^r[2],l=e[t+3]^r[3],p=4,h=1;h>>24]^i[u>>>16&255]^c[d>>>8&255]^o[255&l]^r[p++],v=n[u>>>24]^i[d>>>16&255]^c[l>>>8&255]^o[255&s]^r[p++],b=n[d>>>24]^i[l>>>16&255]^c[s>>>8&255]^o[255&u]^r[p++],_=n[l>>>24]^i[s>>>16&255]^c[u>>>8&255]^o[255&d]^r[p++];s=y,u=v,d=b,l=_}y=(f[s>>>24]<<24|f[u>>>16&255]<<16|f[d>>>8&255]<<8|f[255&l])^r[p++],v=(f[u>>>24]<<24|f[d>>>16&255]<<16|f[l>>>8&255]<<8|f[255&s])^r[p++],b=(f[d>>>24]<<24|f[l>>>16&255]<<16|f[s>>>8&255]<<8|f[255&u])^r[p++],_=(f[l>>>24]<<24|f[s>>>16&255]<<16|f[u>>>8&255]<<8|f[255&d])^r[p++];e[t]=y,e[t+1]=v,e[t+2]=b,e[t+3]=_},keySize:8});e.AES=t._createHelper(h)}();var a,i={};i.CryptoJS=CryptoJS,window._$jsvmprt=function(e,t,r){function n(e,t,r){return(n=function(){if("undefined"==typeof 
Reflect||!Reflect.construct)return!1;if(Reflect.construct.sham)return!1;if("function"==typeof Proxy)return!0;try{return Date.prototype.toString.call(Reflect.construct(Date,[],function(){})),!0}catch(e){return!1}}()?Reflect.construct:function(e,t,r){var n=[null];n.push.apply(n,t);var i=new(Function.bind.apply(e,n));return r&&function(e,t){(Object.setPrototypeOf||function(e,t){return e.__proto__=t,e})(e,t)}(i,r.prototype),i}).apply(null,arguments)}function i(e){return function(e){if(Array.isArray(e)){for(var t=0,r=new Array(e.length);t>7==0)return[1,i];if(i>>6==2){var c=parseInt(""+e[++t]+e[++t],16);return i&=63,[2,c=(i<<=8)+c]}if(i>>6==3){var o=parseInt(""+e[++t]+e[++t],16),f=parseInt(""+e[++t]+e[++t],16);return i&=63,[3,f=(i<<=16)+(o<<=8)+f]}},u=function(e,t){var r=parseInt(""+e[t]+e[t+1],16);return r>127?-256+r:r},d=function(e,t){var r=parseInt(""+e[t]+e[t+1]+e[t+2]+e[t+3],16);return r>32767?-65536+r:r},l=function(e,t){var r=parseInt(""+e[t]+e[t+1]+e[t+2]+e[t+3]+e[t+4]+e[t+5]+e[t+6]+e[t+7],16);return r>2147483647?0+r:r},p=function(e,t){return parseInt(""+e[t]+e[t+1],16)},h=function(e,t){return parseInt(""+e[t]+e[t+1]+e[t+2]+e[t+3],16)},y=y||this||window,v=(Object.keys,e.length,0),b="",_=v;_>=2,O>2)O=3&B,B>>=2,O<1?(O=B)<4?(g=k[C--],k[C]=k[C]-g):O<6?(g=k[C--],k[C]=k[C]===g):O<15&&(g=k[C],k[C]=k[C-1],k[C-1]=g):O<2?(O=B)<5&&(x=p(e,w),w+=2,g=l[x],k[++C]=g):O<3?(O=B)<6||(O<8?g=k[C--]:O<12&&(x=d(e,w),f[++a]=[[w+4,x-3],0,0],w+=2*x-2)):(O=B)<2?(g=k[C--],k[C]=k[C]1)if(O=3&B,B>>=2,O>2)(O=B)>5?(x=p(e,w),w+=2,k[++C]=l["$"+x]):O>3&&(x=d(e,w),f[a][0]&&!f[a][2]?f[a][1]=[w+4,x-3]:f[a++]=[0,[w+4,x-3],0],w+=2*x-2);else if(O>1){if((O=B)>2)if(k[C--])w+=4;else{if((x=d(e,w))<0){_=1,X(e,t,2*r),w+=2*x-2;break}w+=2*x-2}else if(O>0){for(x=h(e,w),g="",A=c.q[x][0];A0?(O=B)>1?(g=k[C--],k[C]=k[C]+g):O>-1&&(k[++C]=y):(O=B)>9?(x=p(e,w),w+=2,g=k[C--],l[x]=g):O>7?(x=h(e,w),w+=4,m=C+1,k[C-=x-1]=x?k.slice(C,m):[]):O>0&&(g=k[C--],k[C]=k[C]>g);else if(O>0){if(O=3&B,B>>=2,O<1){if((O=B)>9);else 
if(O>5)x=p(e,w),w+=2,k[C-=x]=0===x?new k[C]:n(k[C],i(k.slice(C+1,C+x+1)));else if(O>3){x=d(e,w);try{if(f[a][2]=1,1==(g=N(e,w+4,x-3,[],l,v,null,0))[0])return g}catch(b){if(f[a]&&f[a][1]&&1==(g=N(e,f[a][1][0],f[a][1][1],[],l,v,b,0))[0])return g}finally{if(f[a]&&f[a][0]&&1==(g=N(e,f[a][0][0],f[a][0][1],[],l,v,null,0))[0])return g;f[a]=0,a--}w+=2*x-2}}else if(O<2){if((O=B)>12)k[++C]=u(e,w),w+=2;else if(O>8){for(x=h(e,w),O="",A=c.q[x][0];A11?(g=k[C],k[++C]=g):O>0&&(k[++C]=g);else if((O=B)<1)k[C]=!k[C];else if(O<3){if((x=d(e,w))<0){_=1,X(e,t,2*r),w+=2*x-2;break}w+=2*x-2}}else if(O=3&B,B>>=2,O>2)(O=B)<1&&(k[++C]=null);else if(O>1){if((O=B)<9){for(g=k[C--],x=h(e,w),O="",A=c.q[x][0];A0)(O=B)<4?(m=k[C--],(O=k[C]).x===N?O.y>=1?k[C]=U(e,O.c,O.l,[m],O.z,S,null,1):(k[C]=U(e,O.c,O.l,[m],O.z,S,null,0),O.y++):k[C]=O(m)):O<6&&(k[C-=1]=k[C][k[C+1]]);else{if((O=B)<1)return[1,k[C--]];O<14?(m=k[C--],S=k[C--],(O=k[C--]).x===N?O.y>=1?k[++C]=U(e,O.c,O.l,m,O.z,S,null,1):(k[++C]=U(e,O.c,O.l,m,O.z,S,null,0),O.y++):k[++C]=O.apply(S,m)):O<16&&(x=d(e,w),(I=function t(){var r=arguments;return t.y>0||t.y++,U(e,t.c,t.l,r,t.z,this,null,0)}).c=w+4,I.l=x-2,I.x=N,I.y=0,I.z=l,k[C]=I,w+=2*x-2)}}if(_)for(;w>=2,O<1)if(O=3&B,B>>=2,O>2)(O=B)<1&&(k[++C]=null);else if(O>1){if((O=B)<9){for(g=k[C--],x=$[w],O="",A=c.q[x][0];A0)(O=B)<4?(m=k[C--],(O=k[C]).x===N?O.y>=1?k[C]=U(e,O.c,O.l,[m],O.z,S,null,1):(k[C]=U(e,O.c,O.l,[m],O.z,S,null,0),O.y++):k[C]=O(m)):O<6&&(k[C-=1]=k[C][k[C+1]]);else{var I;if((O=B)>14)x=$[w],(I=function t(){var r=arguments;return t.y>0||t.y++,U(e,t.c,t.l,r,t.z,this,null,0)}).c=w+4,I.l=x-2,I.x=N,I.y=0,I.z=l,k[C]=I,w+=2*x-2;else if(O>12)m=k[C--],S=k[C--],(O=k[C--]).x===N?O.y>=1?k[++C]=U(e,O.c,O.l,m,O.z,S,null,1):(k[++C]=U(e,O.c,O.l,m,O.z,S,null,0),O.y++):k[++C]=O.apply(S,m);else if(O>-1)return[1,k[C--]]}else if(O<2)if(O=3&B,B>>=2,O>2)(O=B)<1?k[C]=!k[C]:O<3&&(w+=2*(x=$[w])-2);else if(O>1)(O=B)<2?k[++C]=g:O<13&&(g=k[C],k[++C]=g);else 
if(O>0)if((O=B)<10){for(x=$[w],O="",A=c.q[x][0];A>=2,O<1)(O=B)>9?(x=$[w],w+=2,g=k[C--],l[x]=g):O>7?(x=$[w],w+=4,m=C+1,k[C-=x-1]=x?k.slice(C,m):[]):O>0&&(g=k[C--],k[C]=k[C]>g);else if(O<2)(O=B)>1?(g=k[C--],k[C]=k[C]+g):O>-1&&(k[++C]=y);else if(O<3)if((O=B)<2){for(x=$[w],g="",A=c.q[x][0];A5?(x=$[w],w+=2,k[++C]=l["$"+x]):O>3&&(x=$[w],f[a][0]&&!f[a][2]?f[a][1]=[w+4,x-3]:f[a++]=[0,[w+4,x-3],0],w+=2*x-2);else O=3&B,B>>=2,O<1?(O=B)<4?(g=k[C--],k[C]=k[C]-g):O<6?(g=k[C--],k[C]=k[C]===g):O<15&&(g=k[C],k[C]=k[C-1],k[C-1]=g):O<2?(O=B)<5&&(x=$[w],w+=2,g=l[x],k[++C]=g):O<3?(O=B)>10?(x=$[w],f[++a]=[[w+4,x-3],0,0],w+=2*x-2):O>6&&(g=k[C--]):(O=B)<2?(g=k[C--],k[C]=k[C]>=1,0==z.position&&(z.position=a,z.val=y(z.index,o|=(T>0?1:0)*r,r<<=1;switch(o){case 0:for(o=0,l=Math.pow(2,8),r=1;r!=l;)T=z.val&z.position,z.position>>=1,0==z.position&&(z.position=a,z.val=y(z.index,o|=(T>0?1:0)*r,r<<=1;k=K(o)1:for(o=0,l=Math.pow(2,16),r=1;r!=l;)T=z.val&z.position,z.position>>=1,0==z.position&&(z.position=a,z.val=y(z.index,o|=(T>0?1:0)*r,r<<=1;k=K(o)2:""}for(i[3]=k,m=k,b.push(k);;){if(z.index>S)"";for(o=0,l=Math.pow(2,j),r=1;r!=l;)T=z.val&z.position,z.position>>=1,0==z.position&&(z.position=a,z.val=y(z.index,o|=(T>0?1:0)*r,r<<=1;switch(k=o){case 0:for(o=0,l=Math.pow(2,8),r=1;r!=l;)T=z.val&z.position,z.position>>=1,0==z.position&&(z.position=a,z.val=y(z.index,o|=(T>0?1:0)*r,r<<=1;i[q]=K(o),k=q-1,J--1:for(o=0,l=Math.pow(2,16),r=1;r!=l;)T=z.val&z.position,z.position>>=1,0==z.position&&(z.position=a,z.val=y(z.index,o|=(T>0?1:0)*r,r<<=1;i[q]=K(o),k=q-1,J--2:b.join("")}if(0==J&&(J=Math.pow(2,j),j),i[k])I=i[k]if(k!==q)null;I=m+m.charAt(0)}b.push(I),i[q]=m+I.charAt(0),m=I,0==--J&&(J=Math.pow(2,j),j)}}};y};""==typeof define&&define.amd?define({w}):"undefined"!=typeof module&&null!=module?module.exports=w:"undefined"!=typeof angular&&null!=angular&&angular.module("w",[]).factory("w",{w}),eval(w.x("";'.replace(/[-]/g,function(m){return t[m.charCodeAt(0)&15]})}("var function ().length++return ));break;case 
;else{".split("")))();""" diff --git a/TikTokApi/browser_utilities/stealth.py b/TikTokApi/browser_utilities/stealth.py deleted file mode 100644 index a9dc9e89..00000000 --- a/TikTokApi/browser_utilities/stealth.py +++ /dev/null @@ -1,502 +0,0 @@ -import re - - -def chrome_runtime(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - window.chrome = { - runtime: {} - } -} -""" - ) - - -def console_debug(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - window.console.debug = () => { - return null - } -} -""" - ) - - -def iframe_content_window(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - try { - // Adds a contentWindow proxy to the provided iframe element - const addContentWindowProxy = iframe => { - const contentWindowProxy = { - get(target, key) { - // Now to the interesting part: - // We actually make this thing behave like a regular iframe window, - // by intercepting calls to e.g. `.self` and redirect it to the correct thing. :) - // That makes it possible for these assertions to be correct: - // iframe.contentWindow.self === window.top // must be false - if (key === 'self') { - return this - } - // iframe.contentWindow.frameElement === iframe // must be true - if (key === 'frameElement') { - return iframe - } - return Reflect.get(target, key) - } - } - if (!iframe.contentWindow) { - const proxy = new Proxy(window, contentWindowProxy) - Object.defineProperty(iframe, 'contentWindow', { - get() { - return proxy - }, - set(newValue) { - return newValue // contentWindow is immutable - }, - enumerable: true, - configurable: false - }) - } - } - // Handles iframe element creation, augments `srcdoc` property so we can intercept further - const handleIframeCreation = (target, thisArg, args) => { - const iframe = target.apply(thisArg, args) - // We need to keep the originals around - const _iframe = iframe - const _srcdoc = _iframe.srcdoc - // Add hook for the srcdoc property - // We need to be very surgical here to not break other 
iframes by accident - Object.defineProperty(iframe, 'srcdoc', { - configurable: true, // Important, so we can reset this later - get: function() { - return _iframe.srcdoc - }, - set: function(newValue) { - addContentWindowProxy(this) - // Reset property, the hook is only needed once - Object.defineProperty(iframe, 'srcdoc', { - configurable: false, - writable: false, - value: _srcdoc - }) - _iframe.srcdoc = newValue - } - }) - return iframe - } - // Adds a hook to intercept iframe creation events - const addIframeCreationSniffer = () => { - /* global document */ - const createElement = { - // Make toString() native - get(target, key) { - return Reflect.get(target, key) - }, - apply: function(target, thisArg, args) { - const isIframe = - args && args.length && `${args[0]}`.toLowerCase() === 'iframe' - if (!isIframe) { - // Everything as usual - return target.apply(thisArg, args) - } else { - return handleIframeCreation(target, thisArg, args) - } - } - } - // All this just due to iframes with srcdoc bug - document.createElement = new Proxy( - document.createElement, - createElement - ) - } - // Let's go - addIframeCreationSniffer() - } catch (err) { - // console.warn(err) - } -} -""" - ) - - -def media_codecs(page) -> None: - page.evaluateOnNewDocument( - """ - () => { - try { - /** - * Input might look funky, we need to normalize it so e.g. whitespace isn't an issue for our spoofing. 
- * - * @example - * video/webm; codecs="vp8, vorbis" - * video/mp4; codecs="avc1.42E01E" - * audio/x-m4a; - * audio/ogg; codecs="vorbis" - * @param {String} arg - */ - const parseInput = arg => { - const [mime, codecStr] = arg.trim().split(';') - let codecs = [] - if (codecStr && codecStr.includes('codecs="')) { - codecs = codecStr - .trim() - .replace(`codecs="`, '') - .replace(`"`, '') - .trim() - .split(',') - .filter(x => !!x) - .map(x => x.trim()) - } - return { mime, codecStr, codecs } - } - /* global HTMLMediaElement */ - const canPlayType = { - // Make toString() native - get(target, key) { - // Mitigate Chromium bug (#130) - if (typeof target[key] === 'function') { - return target[key].bind(target) - } - return Reflect.get(target, key) - }, - // Intercept certain requests - apply: function(target, ctx, args) { - if (!args || !args.length) { - return target.apply(ctx, args) - } - const { mime, codecs } = parseInput(args[0]) - // This specific mp4 codec is missing in Chromium - if (mime === 'video/mp4') { - if (codecs.includes('avc1.42E01E')) { - return 'probably' - } - } - // This mimetype is only supported if no codecs are specified - if (mime === 'audio/x-m4a' && !codecs.length) { - return 'maybe' - } - // This mimetype is only supported if no codecs are specified - if (mime === 'audio/aac' && !codecs.length) { - return 'probably' - } - // Everything else as usual - return target.apply(ctx, args) - } - } - HTMLMediaElement.prototype.canPlayType = new Proxy( - HTMLMediaElement.prototype.canPlayType, - canPlayType - ) - } catch (err) {} -} -""" - ) - - -def navigator_languages(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - Object.defineProperty(navigator, 'languages', { - get: () => ['en-US', 'en'] - }) -} - """ - ) - - -def navigator_permissions(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - const originalQuery = window.navigator.permissions.query - window.navigator.permissions.__proto__.query = parameters => - 
parameters.name === 'notifications' - ? Promise.resolve({ state: Notification.permission }) - : originalQuery(parameters) - const oldCall = Function.prototype.call - function call () { - return oldCall.apply(this, arguments) - } - Function.prototype.call = call - const nativeToStringFunctionString = Error.toString().replace( - /Error/g, - 'toString' - ) - const oldToString = Function.prototype.toString - function functionToString () { - if (this === window.navigator.permissions.query) { - return 'function query() { [native code] }' - } - if (this === functionToString) { - return nativeToStringFunctionString - } - return oldCall.call(oldToString, this) - } - Function.prototype.toString = functionToString -} - """ - ) - - -def navigator_plugins(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - function mockPluginsAndMimeTypes() { - const makeFnsNative = (fns = []) => { - const oldCall = Function.prototype.call - function call() { - return oldCall.apply(this, arguments) - } - Function.prototype.call = call - const nativeToStringFunctionString = Error.toString().replace( - /Error/g, - 'toString' - ) - const oldToString = Function.prototype.toString - function functionToString() { - for (const fn of fns) { - if (this === fn.ref) { - return `function ${fn.name}() { [native code] }` - } - } - if (this === functionToString) { - return nativeToStringFunctionString - } - return oldCall.call(oldToString, this) - } - Function.prototype.toString = functionToString - } - const mockedFns = [] - const fakeData = { - mimeTypes: [ - { - type: 'application/pdf', - suffixes: 'pdf', - description: '', - __pluginName: 'Chrome PDF Viewer' - }, - { - type: 'application/x-google-chrome-pdf', - suffixes: 'pdf', - description: 'Portable Document Format', - __pluginName: 'Chrome PDF Plugin' - }, - { - type: 'application/x-nacl', - suffixes: '', - description: 'Native Client Executable', - enabledPlugin: Plugin, - __pluginName: 'Native Client' - }, - { - type: 
'application/x-pnacl', - suffixes: '', - description: 'Portable Native Client Executable', - __pluginName: 'Native Client' - } - ], - plugins: [ - { - name: 'Chrome PDF Plugin', - filename: 'internal-pdf-viewer', - description: 'Portable Document Format' - }, - { - name: 'Chrome PDF Viewer', - filename: 'mhjfbmdgcfjbbpaeojofohoefgiehjai', - description: '' - }, - { - name: 'Native Client', - filename: 'internal-nacl-plugin', - description: '' - } - ], - fns: { - namedItem: instanceName => { - const fn = function (name) { - if (!arguments.length) { - throw new TypeError( - `Failed to execute 'namedItem' on '${instanceName}': 1 argument required, but only 0 present.` - ) - } - return this[name] || null - } - mockedFns.push({ ref: fn, name: 'namedItem' }) - return fn - }, - item: instanceName => { - const fn = function (index) { - if (!arguments.length) { - throw new TypeError( - `Failed to execute 'namedItem' on '${instanceName}': 1 argument required, but only 0 present.` - ) - } - return this[index] || null - } - mockedFns.push({ ref: fn, name: 'item' }) - return fn - }, - refresh: instanceName => { - const fn = function () { - return undefined - } - mockedFns.push({ ref: fn, name: 'refresh' }) - return fn - } - } - } - const getSubset = (keys, obj) => - keys.reduce((a, c) => ({ ...a, [c]: obj[c] }), {}) - function generateMimeTypeArray() { - const arr = fakeData.mimeTypes - .map(obj => getSubset(['type', 'suffixes', 'description'], obj)) - .map(obj => Object.setPrototypeOf(obj, MimeType.prototype)) - arr.forEach(obj => { - arr[obj.type] = obj - }) - arr.namedItem = fakeData.fns.namedItem('MimeTypeArray') - arr.item = fakeData.fns.item('MimeTypeArray') - return Object.setPrototypeOf(arr, MimeTypeArray.prototype) - } - const mimeTypeArray = generateMimeTypeArray() - Object.defineProperty(navigator, 'mimeTypes', { - get: () => mimeTypeArray - }) - function generatePluginArray() { - const arr = fakeData.plugins - .map(obj => getSubset(['name', 'filename', 
'description'], obj)) - .map(obj => { - const mimes = fakeData.mimeTypes.filter( - m => m.__pluginName === obj.name - ) - mimes.forEach((mime, index) => { - navigator.mimeTypes[mime.type].enabledPlugin = obj - obj[mime.type] = navigator.mimeTypes[mime.type] - obj[index] = navigator.mimeTypes[mime.type] - }) - obj.length = mimes.length - return obj - }) - .map(obj => { - obj.namedItem = fakeData.fns.namedItem('Plugin') - obj.item = fakeData.fns.item('Plugin') - return obj - }) - .map(obj => Object.setPrototypeOf(obj, Plugin.prototype)) - arr.forEach(obj => { - arr[obj.name] = obj - }) - arr.namedItem = fakeData.fns.namedItem('PluginArray') - arr.item = fakeData.fns.item('PluginArray') - arr.refresh = fakeData.fns.refresh('PluginArray') - return Object.setPrototypeOf(arr, PluginArray.prototype) - } - const pluginArray = generatePluginArray() - Object.defineProperty(navigator, 'plugins', { - get: () => pluginArray - }) - makeFnsNative(mockedFns) - } - try { - const isPluginArray = navigator.plugins instanceof PluginArray - const hasPlugins = isPluginArray && navigator.plugins.length > 0 - if (isPluginArray && hasPlugins) { - return - } - mockPluginsAndMimeTypes() - } catch (err) { } -} -""" - ) - - -def navigator_webdriver(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - Object.defineProperty(window, 'navigator', { - value: new Proxy(navigator, { - has: (target, key) => (key === 'webdriver' ? false : key in target), - get: (target, key) => - key === 'webdriver' - ? undefined - : typeof target[key] === 'function' - ? 
target[key].bind(target) - : target[key] - }) - }) -} - """ - ) - - -def user_agent(page) -> None: - return - ua = page.browser.userAgent() - ua = ua.replace("HeadlessChrome", "Chrome") # hide headless nature - ua = re.sub( - r"\(([^)]+)\)", "(Windows NT 10.0; Win64; x64)", ua, 1 - ) # ensure windows - - page.setUserAgent(ua) - - -def webgl_vendor(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - try { - const getParameter = WebGLRenderingContext.prototype.getParameter - WebGLRenderingContext.prototype.getParameter = function (parameter) { - if (parameter === 37445) { - return 'Intel Inc.' - } - if (parameter === 37446) { - return 'Intel Iris OpenGL Engine' - } - return getParameter.apply(this, [parameter]) - } - } catch (err) {} -} -""" - ) - - -def window_outerdimensions(page) -> None: - page.evaluateOnNewDocument( - """ -() => { - try { - if (window.outerWidth && window.outerHeight) { - return - } - const windowFrame = 85 - window.outerWidth = window.innerWidth - window.outerHeight = window.innerHeight + windowFrame - } catch (err) { } -} -""" - ) - - -def stealth(page) -> None: - # chrome_runtime(page) - console_debug(page) - iframe_content_window(page) - # navigator_languages(page) - navigator_permissions(page) - navigator_plugins(page) - navigator_webdriver(page) - # navigator_vendor(page) - user_agent(page) - webgl_vendor(page) - window_outerdimensions(page) - media_codecs(page) diff --git a/TikTokApi/exceptions.py b/TikTokApi/exceptions.py index 8c7c6460..8f0d79bb 100644 --- a/TikTokApi/exceptions.py +++ b/TikTokApi/exceptions.py @@ -1,7 +1,7 @@ class TikTokCaptchaError(Exception): def __init__( self, - message="TikTok blocks this request displaying a Captcha \nTip: Consider using a proxy or a custom_verifyFp as method parameters", + message="TikTok blocks this request displaying a Captcha \nTip: Consider using a proxy or a custom_verify_fp as method parameters", ): self.message = message super().__init__(self.message) diff --git 
a/TikTokApi/helpers.py b/TikTokApi/helpers.py new file mode 100644 index 00000000..048c40fe --- /dev/null +++ b/TikTokApi/helpers.py @@ -0,0 +1,41 @@ +from TikTokApi.browser_utilities.browser import browser +from urllib.parse import quote, urlencode +from .exceptions import * + +import re +import requests + + +def extract_tag_contents(html): + next_json = re.search( + r"id=\"__NEXT_DATA__\"\s+type=\"application\/json\"\s*[^>]+>\s*(?P[^<]+)", + html, + ) + if next_json: + nonce_start = '' + nonce = html.split(nonce_start)[1].split(nonce_end)[0] + j_raw = html.split( + '")[0] + return j_raw + else: + sigi_json = re.search( + r'>\s*window\[[\'"]SIGI_STATE[\'"]\]\s*=\s*(?P{.+});', html + ) + if sigi_json: + return sigi_json.group(1) + else: + raise TikTokCaptchaError() + + +def extract_video_id_from_url(url): + url = requests.head(url=url, allow_redirects=True).url + if "@" in url and "/video/" in url: + return url.split("/video/")[1].split("?")[0] + else: + raise TypeError( + "URL format not supported. 
Below is an example of a supported url.\n" + "https://www.tiktok.com/@therock/video/6829267836783971589" + ) diff --git a/TikTokApi/tiktok.py b/TikTokApi/tiktok.py index 100ce1ce..3b3460f2 100644 --- a/TikTokApi/tiktok.py +++ b/TikTokApi/tiktok.py @@ -4,91 +4,56 @@ import random import string import time +from typing import ClassVar, Optional +from urllib import request from urllib.parse import quote, urlencode import requests +from .api.sound import Sound +from .api.user import User +from .api.search import Search +from .api.hashtag import Hashtag +from .api.video import Video +from .api.trending import Trending + from playwright.sync_api import sync_playwright from .exceptions import * -from .utilities import update_messager +from .utilities import LOGGER_NAME, update_messager +from .browser_utilities.browser import browser +from dataclasses import dataclass os.environ["no_proxy"] = "127.0.0.1,localhost" BASE_URL = "https://m.tiktok.com/" +DESKTOP_BASE_URL = "https://www.tiktok.com/" class TikTokApi: - __instance = None - - def __init__(self, **kwargs): - """The TikTokApi class. 
Used to interact with TikTok, use get_instance NOT this.""" - # Forces Singleton - if TikTokApi.__instance is None: - TikTokApi.__instance = self - else: - raise Exception("Only one TikTokApi object is allowed") - logging.basicConfig(level=kwargs.get("logging_level", logging.WARNING)) - logging.info("Class initalized") - - # Some Instance Vars - self.executablePath = kwargs.get("executablePath", None) + _instance = None + logger: ClassVar[logging.Logger] = logging.getLogger(LOGGER_NAME) - if kwargs.get("custom_did") != None: - raise Exception("Please use custom_device_id instead of custom_device_id") - self.custom_device_id = kwargs.get("custom_device_id", None) - self.userAgent = ( - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " - "AppleWebKit/537.36 (KHTML, like Gecko) " - "Chrome/86.0.4240.111 Safari/537.36" - ) - self.proxy = kwargs.get("proxy", None) - self.custom_verifyFp = kwargs.get("custom_verifyFp") - self.signer_url = kwargs.get("external_signer", None) - self.request_delay = kwargs.get("request_delay", None) - self.requests_extra_kwargs = kwargs.get("requests_extra_kwargs", {}) - - if kwargs.get("use_test_endpoints", False): - global BASE_URL - BASE_URL = "https://t.tiktok.com/" - if kwargs.get("use_selenium", False): - from .browser_utilities.browser_selenium import browser - else: - from .browser_utilities.browser import browser - - if kwargs.get("generate_static_device_id", False): - self.custom_device_id = "".join( - random.choice(string.digits) for num in range(19) - ) - - if self.signer_url is None: - self.browser = browser(**kwargs) - self.userAgent = self.browser.userAgent - - try: - self.timezone_name = self.__format_new_params__(self.browser.timezone_name) - self.browser_language = self.__format_new_params__( - self.browser.browser_language - ) - self.width = self.browser.width - self.height = self.browser.height - self.region = self.browser.region - self.language = self.browser.language - except Exception as e: - logging.exception(e) - 
logging.warning( - "An error ocurred while opening your browser but it was ignored." - ) - logging.warning("Are you sure you ran python -m playwright install") - - self.timezone_name = "" - self.browser_language = "" - self.width = "1920" - self.height = "1080" - self.region = "US" - self.language = "en" + user = User + search = Search + sound = Sound + hashtag = Hashtag + video = Video + trending = Trending @staticmethod - def get_instance(**kwargs): + def __new__( + cls, + logging_level=logging.WARNING, + request_delay: Optional[int] = None, + custom_device_id: Optional[str] = None, + generate_static_device_id: Optional[bool] = False, + custom_verify_fp: Optional[str] = None, + use_test_endpoints: Optional[bool] = False, + proxy: Optional[str] = None, + executable_path: Optional[str] = None, + *args, + **kwargs, + ): """The TikTokApi class. Used to interact with TikTok. This is a singleton class to prevent issues from arising with playwright @@ -111,7 +76,7 @@ class to prevent issues from arising with playwright Use this if you want to download videos from a script but don't want to generate your own custom_device_id parameter. - * custom_verifyFp: A TikTok parameter needed to work most of the time, optional + * custom_verify_fp: A TikTok parameter needed to work most of the time, optional To get this parameter look at [this video](https://youtu.be/zwLmLfVI-VQ?t=117) I recommend watching the entire thing, as it will help setup this package. All the methods take this as a optional parameter, however it's cleaner code @@ -124,7 +89,7 @@ class to prevent issues from arising with playwright * use_test_endpoints: Send requests to TikTok's test endpoints, optional This parameter when set to true will make requests to TikTok's testing endpoints instead of the live site. I can't guarantee this will work - in the future, however currently basically any custom_verifyFp will + in the future, however currently basically any custom_verify_fp will work here which is helpful. 
* proxy: A string containing your proxy address, optional @@ -137,12 +102,7 @@ class to prevent issues from arising with playwright to store this at the instance level. You can override this at the specific methods. - * use_selenium: Option to use selenium over playwright, optional - Playwright is selected by default and is the one that I'm designing the - package to be compatable for, however if playwright doesn't work on - your machine feel free to set this to True. - - * executablePath: The location of the driver, optional + * executable_path: The location of the driver, optional This shouldn't be needed if you're using playwright * **kwargs @@ -150,125 +110,145 @@ class to prevent issues from arising with playwright that interact with this main class. These may or may not be documented in other places. """ - if not TikTokApi.__instance: - TikTokApi(**kwargs) - return TikTokApi.__instance - def clean_up(self): - """A basic cleanup method, called automatically from the code""" - self.__del__() + if cls._instance is None: + cls._instance = super(TikTokApi, cls).__new__(cls) + cls._instance._initialize( + logging_level=logging_level, + request_delay=request_delay, + custom_device_id=custom_device_id, + generate_static_device_id=generate_static_device_id, + custom_verify_fp=custom_verify_fp, + use_test_endpoints=use_test_endpoints, + proxy=proxy, + executable_path=executable_path, + *args, + **kwargs, + ) + return cls._instance - def __del__(self): - """A basic cleanup method, called automatically from the code""" - try: - self.browser.clean_up() - except Exception: - pass - try: - get_playwright().stop() - except Exception: - pass - TikTokApi.__instance = None + def _initialize(self, logging_level=logging.WARNING, **kwargs): + # Add classes from the api folder + User.parent = self + Search.parent = self + Sound.parent = self + Hashtag.parent = self + Video.parent = self + Trending.parent = self - def external_signer(self, url, custom_device_id=None, verifyFp=None): 
- """Makes requests to an external signer instead of using a browser. + self.logger.setLevel(level=logging_level) - ##### Parameters - * url: The server to make requests to - This server is designed to sign requests. You can find an example - of this signature server in the examples folder. + # Some Instance Vars + self._executable_path = kwargs.get("executable_path", None) - * custom_device_id: A TikTok parameter needed to download videos - The code generates these and handles these pretty well itself, however - for some things such as video download you will need to set a consistent - one of these. + if kwargs.get("custom_did") != None: + raise Exception("Please use 'custom_device_id' instead of 'custom_did'") + self._custom_device_id = kwargs.get("custom_device_id", None) + self._user_agent = "5.0+(iPhone%3B+CPU+iPhone+OS+14_8+like+Mac+OS+X)+AppleWebKit%2F605.1.15+(KHTML,+like+Gecko)+Version%2F14.1.2+Mobile%2F15E148+Safari%2F604.1" + self._proxy = kwargs.get("proxy", None) + self._custom_verify_fp = kwargs.get("custom_verify_fp") + self._signer_url = kwargs.get("external_signer", None) + self._request_delay = kwargs.get("request_delay", None) + self._requests_extra_kwargs = kwargs.get("requests_extra_kwargs", {}) - * custom_verifyFp: A TikTok parameter needed to work most of the time, - To get this parameter look at [this video](https://youtu.be/zwLmLfVI-VQ?t=117) - I recommend watching the entire thing, as it will help setup this package. 
- """ - if custom_device_id is not None: - query = { - "url": url, - "custom_device_id": custom_device_id, - "verifyFp": verifyFp, - } - else: - query = {"url": url, "verifyFp": verifyFp} - data = requests.get( - self.signer_url + "?{}".format(urlencode(query)), - **self.requests_extra_kwargs, - ) - parsed_data = data.json() + if kwargs.get("use_test_endpoints", False): + global BASE_URL + BASE_URL = "https://t.tiktok.com/" - return ( - parsed_data["verifyFp"], - parsed_data["device_id"], - parsed_data["_signature"], - parsed_data["userAgent"], - parsed_data["referrer"], - ) + if kwargs.get("generate_static_device_id", False): + self._custom_device_id = "".join( + random.choice(string.digits) for num in range(19) + ) + + if self._signer_url is None: + self._browser = browser(**kwargs) + self._user_agent = self._browser.user_agent + + try: + self._timezone_name = self._browser.timezone_name + self._browser_language = self._browser.browser_language + self._width = self._browser.width + self._height = self._browser.height + self._region = self._browser.region + self._language = self._browser.language + except Exception as e: + self.logger.exception( + "An error occurred while opening your browser, but it was ignored\n", + "Are you sure you ran python -m playwright install?", + ) + + self._timezone_name = "" + self._browser_language = "" + self._width = "1920" + self._height = "1080" + self._region = "US" + self._language = "en" - def get_data(self, **kwargs) -> dict: + def get_data(self, path, subdomain="m", **kwargs) -> dict: """Makes requests to TikTok and returns their JSON. This is all handled by the package so it's unlikely you will need to use this. 
""" - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - if self.request_delay is not None: - time.sleep(self.request_delay) - - if self.proxy is not None: - proxy = self.proxy - - if kwargs.get("custom_verifyFp") == None: - if self.custom_verifyFp != None: - verifyFp = self.custom_verifyFp + processed = self._process_kwargs(kwargs) + kwargs["custom_device_id"] = processed.device_id + if self._request_delay is not None: + time.sleep(self._request_delay) + + if self._proxy is not None: + proxy = self._proxy + + if kwargs.get("custom_verify_fp") == None: + if self._custom_verify_fp != None: + verifyFp = self._custom_verify_fp else: verifyFp = "verify_khr3jabg_V7ucdslq_Vrw9_4KPb_AJ1b_Ks706M8zIJTq" else: - verifyFp = kwargs.get("custom_verifyFp") + verifyFp = kwargs.get("custom_verify_fp") tt_params = None send_tt_params = kwargs.get("send_tt_params", False) - if self.signer_url is None: - kwargs["custom_verifyFp"] = verifyFp - verify_fp, device_id, signature, tt_params = self.browser.sign_url( - calc_tt_params=send_tt_params, **kwargs + full_url = f"https://{subdomain}.tiktok.com/" + path + + if self._signer_url is None: + kwargs["custom_verify_fp"] = verifyFp + verify_fp, device_id, signature, tt_params = self._browser.sign_url( + full_url, calc_tt_params=send_tt_params, **kwargs ) - userAgent = self.browser.userAgent - referrer = self.browser.referrer + user_agent = self._browser.user_agent + referrer = self._browser.referrer else: - verify_fp, device_id, signature, userAgent, referrer = self.external_signer( - kwargs["url"], + ( + verify_fp, + device_id, + signature, + user_agent, + referrer, + ) = self.external_signer( + full_url, custom_device_id=kwargs.get("custom_device_id"), - verifyFp=kwargs.get("custom_verifyFp", verifyFp), + verifyFp=kwargs.get("custom_verify_fp", verifyFp), ) if not kwargs.get("send_tt_params", False): tt_params = None query = {"verifyFp": verify_fp, 
"device_id": device_id, "_signature": signature} - url = "{}&{}".format(kwargs["url"], urlencode(query)) + url = "{}&{}".format(full_url, urlencode(query)) h = requests.head( url, headers={"x-secsdk-csrf-version": "1.2.5", "x-secsdk-csrf-request": "1"}, - proxies=self.__format_proxy(proxy), - **self.requests_extra_kwargs, + proxies=self._format_proxy(processed.proxy), + **self._requests_extra_kwargs, ) - csrf_session_id = h.cookies["csrf_session_id"] - csrf_token = h.headers["X-Ware-Csrf-Token"].split(",")[1] - kwargs["csrf_session_id"] = csrf_session_id + + csrf_token = None + if subdomain == "m": + csrf_session_id = h.cookies["csrf_session_id"] + csrf_token = h.headers["X-Ware-Csrf-Token"].split(",")[1] + kwargs["csrf_session_id"] = csrf_session_id headers = { "authority": "m.tiktok.com", @@ -276,37 +256,40 @@ def get_data(self, **kwargs) -> dict: "path": url.split("tiktok.com")[1], "scheme": "https", "accept": "application/json, text/plain, */*", - "accept-encoding": "gzip, deflate, br", + "accept-encoding": "gzip", "accept-language": "en-US,en;q=0.9", "origin": referrer, "referer": referrer, "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", - "sec-fetch-site": "same-site", + "sec-fetch-site": "none", "sec-gpc": "1", - "user-agent": userAgent, + "user-agent": user_agent, "x-secsdk-csrf-token": csrf_token, "x-tt-params": tt_params, } - logging.info(f"GET: {url}\n\theaders: {headers}") + self.logger.info(f"GET: %s\n\theaders: %s", url, headers) r = requests.get( url, headers=headers, - cookies=self.get_cookies(**kwargs), - proxies=self.__format_proxy(proxy), - **self.requests_extra_kwargs, + cookies=self._get_cookies(**kwargs), + proxies=self._format_proxy(processed.proxy), + **self._requests_extra_kwargs, ) + try: - json = r.json() + parsed_data = r.json() if ( - json.get("type") == "verify" - or json.get("verifyConfig", {}).get("type", "") == "verify" + parsed_data.get("type") == "verify" + or parsed_data.get("verifyConfig", {}).get("type", "") == "verify" ): 
- logging.error( - "Tiktok wants to display a catcha. Response is:\n" + r.text + self.logger.error( + "Tiktok wants to display a captcha.\nResponse:\n%s\nCookies:\n%s\nURL:\n%s", + r.text, + self._get_cookies(**kwargs), + url, ) - logging.error(self.get_cookies(**kwargs)) raise TikTokCaptchaError() # statusCode from props->pageProps->statusCode thanks @adiantek on #403 @@ -350,8 +333,8 @@ def get_data(self, **kwargs) -> dict: "10404": "FYP_VIDEO_LIST_LIMIT", "undefined": "MEDIA_ERROR", } - statusCode = json.get("statusCode", 0) - logging.info(f"TikTok Returned: {json}") + statusCode = parsed_data.get("statusCode", 0) + self.logger.info(f"TikTok Returned: %s", json) if statusCode == 10201: # Invalid Entity raise TikTokNotFoundError( @@ -370,29 +353,79 @@ def get_data(self, **kwargs) -> dict: return r.json() except ValueError as e: text = r.text - logging.error("TikTok response: " + text) + self.logger.info("TikTok response: %s", text) if len(text) == 0: raise EmptyResponseError( "Empty response from Tiktok to " + url ) from None else: - logging.error("Converting response to JSON failed") - logging.error(e) + self.logger.exception("Converting response to JSON failed") raise JSONDecodeFailure() from e - def get_cookies(self, **kwargs): + def __del__(self): + """A basic cleanup method, called automatically from the code""" + try: + self._browser._clean_up() + except Exception: + pass + try: + get_playwright().stop() + except Exception: + pass + TikTokApi._instance = None + + def external_signer(self, url, custom_device_id=None, verifyFp=None): + """Makes requests to an external signer instead of using a browser. + + ##### Parameters + * url: The server to make requests to + This server is designed to sign requests. You can find an example + of this signature server in the examples folder. 
+ + * custom_device_id: A TikTok parameter needed to download videos + The code generates these and handles these pretty well itself, however + for some things such as video download you will need to set a consistent + one of these. + + * custom_verify_fp: A TikTok parameter needed to work most of the time, + To get this parameter look at [this video](https://youtu.be/zwLmLfVI-VQ?t=117) + I recommend watching the entire thing, as it will help setup this package. + """ + if custom_device_id is not None: + query = { + "url": url, + "custom_device_id": custom_device_id, + "verifyFp": verifyFp, + } + else: + query = {"url": url, "verifyFp": verifyFp} + data = requests.get( + self._signer_url + "?{}".format(urlencode(query)), + **self._requests_extra_kwargs, + ) + parsed_data = data.json() + + return ( + parsed_data["verifyFp"], + parsed_data["device_id"], + parsed_data["_signature"], + parsed_data["user_agent"], + parsed_data["referrer"], + ) + + def _get_cookies(self, **kwargs): """Extracts cookies from the kwargs passed to the function for get_data""" device_id = kwargs.get( "custom_device_id", "".join(random.choice(string.digits) for num in range(19)), ) - if kwargs.get("custom_verifyFp") == None: - if self.custom_verifyFp != None: - verifyFp = self.custom_verifyFp + if kwargs.get("custom_verify_fp") is None: + if self._custom_verify_fp is not None: + verifyFp = self._custom_verify_fp else: - verifyFp = "verify_khr3jabg_V7ucdslq_Vrw9_4KPb_AJ1b_Ks706M8zIJTq" + verifyFp = None else: - verifyFp = kwargs.get("custom_verifyFp") + verifyFp = kwargs.get("custom_verify_fp") if kwargs.get("force_verify_fp_on_cookie_header", False): return { @@ -420,22 +453,22 @@ def get_cookies(self, **kwargs): def get_bytes(self, **kwargs) -> bytes: """Returns TikTok's response as bytes, similar to get_data""" - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - if self.signer_url is None: - verify_fp, 
device_id, signature, _ = self.browser.sign_url( + processed = self._process_kwargs(kwargs) + kwargs["custom_device_id"] = processed.device_id + if self._signer_url is None: + verify_fp, device_id, signature, _ = self._browser.sign_url( calc_tt_params=False, **kwargs ) - userAgent = self.browser.userAgent - referrer = self.browser.referrer + user_agent = self._browser.user_agent + referrer = self._browser.referrer else: - verify_fp, device_id, signature, userAgent, referrer = self.external_signer( + ( + verify_fp, + device_id, + signature, + user_agent, + referrer, + ) = self.external_signer( kwargs["url"], custom_device_id=kwargs.get("custom_device_id", None) ) query = {"verifyFp": verify_fp, "_signature": signature} @@ -452,1043 +485,13 @@ def get_bytes(self, **kwargs) -> bytes: "Pragma": "no-cache", "Range": "bytes=0-", "Referer": "https://www.tiktok.com/", - "User-Agent": userAgent, + "User-Agent": user_agent, }, - proxies=self.__format_proxy(proxy), - cookies=self.get_cookies(**kwargs), + proxies=self._format_proxy(processed.proxy), + cookies=self._get_cookies(**kwargs), ) return r.content - def by_trending(self, count=30, **kwargs) -> dict: - """ - Gets trending TikToks - - ##### Parameters - * count: The amount of TikToks you want returned, optional - - Note: TikTok seems to only support at MOST ~2000 TikToks - from a single endpoint. 
- """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - spawn = requests.head( - "https://www.tiktok.com", - proxies=self.__format_proxy(proxy), - **self.requests_extra_kwargs, - ) - ttwid = spawn.cookies["ttwid"] - - response = [] - first = True - - while len(response) < count: - if count < maxCount: - realCount = count - else: - realCount = maxCount - - query = { - "count": realCount, - "id": 1, - "sourceType": 12, - "itemID": 1, - "insertedItemID": "", - "region": region, - "priority_region": region, - "language": language, - } - api_url = "{}api/recommend/item_list/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - res = self.get_data(url=api_url, ttwid=ttwid, **kwargs) - for t in res.get("itemList", []): - response.append(t) - - if not res.get("hasMore", False) and not first: - logging.info("TikTok isn't sending more TikToks beyond this point.") - return response[:count] - - realCount = count - len(response) - - first = False - - return response[:count] - - def search_for_users(self, search_term, count=28, **kwargs) -> list: - """Returns a list of users that match the search_term - - ##### Parameters - * search_term: The string to search for users by - This string is the term you want to search for users by. - - * count: The number of users to return - Note: maximum is around 28 for this type of endpoint. - """ - return self.discover_type(search_term, prefix="user", count=count, **kwargs) - - def search_for_music(self, search_term, count=28, **kwargs) -> list: - """Returns a list of music that match the search_term - - ##### Parameters - * search_term: The string to search for music by - This string is the term you want to search for music by. - - * count: The number of music to return - Note: maximum is around 28 for this type of endpoint. 
- """ - return self.discover_type(search_term, prefix="music", count=count, **kwargs) - - def search_for_hashtags(self, search_term, count=28, **kwargs) -> list: - """Returns a list of hashtags that match the search_term - - ##### Parameters - * search_term: The string to search for music by - This string is the term you want to search for music by. - - * count: The number of music to return - Note: maximum is around 28 for this type of endpoint. - """ - return self.discover_type( - search_term, prefix="challenge", count=count, **kwargs - ) - - def discover_type(self, search_term, prefix, count=28, offset=0, **kwargs) -> list: - """Returns a list of whatever the prefix type you pass in - - ##### Parameters - * search_term: The string to search by - - * prefix: The prefix of what to search for - - * count: The number search results to return - Note: maximum is around 28 for this type of endpoint. - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - response = [] - while len(response) < count: - query = { - "discoverType": 0, - "needItemList": False, - "keyWord": search_term, - "offset": offset, - "count": count, - "useRecommend": False, - "language": "en", - } - api_url = "{}api/discover/{}/?{}&{}".format( - BASE_URL, prefix, self.__add_url_params__(), urlencode(query) - ) - data = self.get_data(url=api_url, **kwargs) - - if "userInfoList" in data.keys(): - for x in data["userInfoList"]: - response.append(x) - elif "musicInfoList" in data.keys(): - for x in data["musicInfoList"]: - response.append(x) - elif "challengeInfoList" in data.keys(): - for x in data["challengeInfoList"]: - response.append(x) - else: - logging.info("TikTok is not sending videos beyond this point.") - break - - offset += maxCount - - return response[:count] - - def user_posts(self, userID, secUID, count=30, cursor=0, **kwargs) -> dict: - """Returns an array of dictionaries representing TikToks for 
a user. - - ##### Parameters - * userID: The userID of the user, which TikTok assigns - - You can find this from utilizing other methods or - just use by_username to find it. - * secUID: The secUID of the user, which TikTok assigns - - You can find this from utilizing other methods or - just use by_username to find it. - * count: The number of posts to return - - Note: seems to only support up to ~2,000 - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - response = [] - first = True - - while len(response) < count: - if count < maxCount: - realCount = count - else: - realCount = maxCount - - query = { - "count": realCount, - "id": userID, - "cursor": cursor, - "type": 1, - "secUid": secUID, - "sourceType": 8, - "appId": 1233, - "region": region, - "priority_region": region, - "language": language, - } - api_url = "{}api/post/item_list/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - - res = self.get_data(url=api_url, send_tt_params=True, **kwargs) - - if "itemList" in res.keys(): - for t in res.get("itemList", []): - response.append(t) - - if not res.get("hasMore", False) and not first: - logging.info("TikTok isn't sending more TikToks beyond this point.") - return response - - realCount = count - len(response) - cursor = res["cursor"] - - first = False - - return response[:count] - - def by_username(self, username, count=30, **kwargs) -> dict: - """Returns a dictionary listing TikToks given a user's username. 
- - ##### Parameters - * username: The username of the TikTok user to get TikToks from - - * count: The number of posts to return - - Note: seems to only support up to ~2,000 - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - data = self.get_user_object(username, **kwargs) - return self.user_posts( - data["id"], - data["secUid"], - count=count, - **kwargs, - ) - - def user_page(self, userID, secUID, page_size=30, cursor=0, **kwargs) -> dict: - """Returns a dictionary listing of one page of TikToks given a user's ID and secUID - - ##### Parameters - * userID: The userID of the user, which TikTok assigns - - You can find this from utilizing other methods or - just use by_username to find it. - * secUID: The secUID of the user, which TikTok assigns - - You can find this from utilizing other methods or - just use by_username to find it. - * page_size: The number of posts to return per page - - Gets a specific page of a user, doesn't iterate. 
- * cursor: The offset of a page - - The offset to return new videos from - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - api_url = ( - BASE_URL + "api/post/item_list/?{}&count={}&id={}&type=1&secUid={}" - "&cursor={}&sourceType=8&appId=1233®ion={}&language={}".format( - self.__add_url_params__(), - page_size, - str(userID), - str(secUID), - cursor, - region, - language, - ) - ) - - return self.get_data(url=api_url, send_tt_params=True, **kwargs) - - def get_user_pager(self, username, page_size=30, cursor=0, **kwargs): - """Returns a generator to page through a user's feed - - ##### Parameters - * username: The username of the user - - * page_size: The number of posts to return in a page - - * cursor: The offset of a page - - The offset to return new videos from - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - data = self.get_user_object(username, **kwargs) - - while True: - resp = self.user_page( - data["id"], - data["secUid"], - page_size=page_size, - cursor=cursor, - **kwargs, - ) - - try: - page = resp["itemList"] - except KeyError: - # No mo results - return - - cursor = resp["cursor"] - - yield page - - if not resp["hasMore"]: - return # all done - - def user_liked(self, userID, secUID, count=30, cursor=0, **kwargs) -> dict: - """Returns a dictionary listing TikToks that a given a user has liked. 
- Note: The user's likes must be public - - ##### Parameters - * userID: The userID of the user, which TikTok assigns - - * secUID: The secUID of the user, which TikTok assigns - - * count: The number of posts to return - - Note: seems to only support up to ~2,000 - * cursor: The offset of a page - - The offset to return new videos from - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - response = [] - first = True - - while len(response) < count: - if count < maxCount: - realCount = count - else: - realCount = maxCount - - query = { - "count": realCount, - "id": userID, - "type": 2, - "secUid": secUID, - "cursor": cursor, - "sourceType": 9, - "appId": 1233, - "region": region, - "priority_region": region, - "language": language, - } - api_url = "{}api/favorite/item_list/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - - res = self.get_data(url=api_url, **kwargs) - - try: - res["itemList"] - except Exception: - logging.error("User's likes are most likely private") - return [] - - for t in res.get("itemList", []): - response.append(t) - - if not res.get("hasMore", False) and not first: - logging.info("TikTok isn't sending more TikToks beyond this point.") - return response - - realCount = count - len(response) - cursor = res["cursor"] - - first = False - - return response[:count] - - def user_liked_by_username(self, username, count=30, **kwargs) -> dict: - """Returns a dictionary listing TikToks a user has liked by username. 
- Note: The user's likes must be public - - ##### Parameters - * username: The username of the user - - * count: The number of posts to return - - Note: seems to only support up to ~2,000 - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - data = self.get_user_object(username, **kwargs) - return self.user_liked( - data["id"], - data["secUid"], - count=count, - **kwargs, - ) - - def by_sound(self, id, count=30, offset=0, **kwargs) -> dict: - """Returns a dictionary listing TikToks with a specific sound. - - ##### Parameters - * id: The sound id to search by - - Note: Can be found in the URL of the sound specific page or with other methods. - * count: The number of posts to return - - Note: seems to only support up to ~2,000 - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - response = [] - - while len(response) < count: - if count < maxCount: - realCount = count - else: - realCount = maxCount - - query = { - "secUid": "", - "musicID": str(id), - "count": str(realCount), - "cursor": offset, - "shareUid": "", - "language": language, - } - api_url = "{}api/music/item_list/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - - res = self.get_data(url=api_url, send_tt_params=True, **kwargs) - - try: - for t in res["items"]: - response.append(t) - except KeyError: - for t in res.get("itemList", []): - response.append(t) - - if not res.get("hasMore", False): - logging.info("TikTok isn't sending more TikToks beyond this point.") - return response - - realCount = count - len(response) - offset = res["cursor"] - - return response[:count] - - def by_sound_page(self, id, page_size=30, cursor=0, **kwargs) -> dict: - """Returns a page of tiktoks with a specific sound. 
- - Parameters - ---------- - id: The sound id to search by - Note: Can be found in the URL of the sound specific page or with other methods. - cursor: offset for pagination - page_size: The number of posts to return - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - query = { - "musicID": str(id), - "count": str(page_size), - "cursor": cursor, - "language": language, - } - api_url = "{}api/music/item_list/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - - return self.get_data(url=api_url, send_tt_params=True, **kwargs) - - def get_music_object(self, id, **kwargs) -> dict: - """Returns a music object for a specific sound id. - - ##### Parameters - * id: The sound id to get the object for - - This can be found by using other methods. - """ - return self.get_music_object_full(id, **kwargs)["music"] - - def get_music_object_full(self, id, **kwargs): - """Returns a music object for a specific sound id. - - ##### Parameters - * id: The sound id to get the object for - - This can be found by using other methods. - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - r = requests.get( - "https://www.tiktok.com/music/-{}".format(id), - headers={ - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - "User-Agent": self.userAgent, - }, - proxies=self.__format_proxy(kwargs.get("proxy", None)), - cookies=self.get_cookies(**kwargs), - **self.requests_extra_kwargs, - ) - - j_raw = self.__extract_tag_contents(r.text) - return json.loads(j_raw)["props"]["pageProps"]["musicInfo"] - - def get_music_object_full_by_api(self, id, **kwargs): - """Returns a music object for a specific sound id, but using the API rather than HTML requests. 
- - ##### Parameters - * id: The sound id to get the object for - - This can be found by using other methods. - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - api_url = "{}node/share/music/-{}?{}".format( - BASE_URL, id, self.__add_url_params__() - ) - res = self.get_data(url=api_url, **kwargs) - - if res.get("statusCode", 200) == 10203: - raise TikTokNotFoundError() - - return res["musicInfo"] - - def by_hashtag(self, hashtag, count=30, offset=0, **kwargs) -> dict: - """Returns a dictionary listing TikToks with a specific hashtag. - - ##### Parameters - * hashtag: The hashtag to search by - - Without the # symbol - - A valid string is "funny" - * count: The number of posts to return - Note: seems to only support up to ~2,000 - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - id = self.get_hashtag_object(hashtag)["challengeInfo"]["challenge"]["id"] - response = [] - - required_count = count - - while len(response) < required_count: - if count > maxCount: - count = maxCount - query = { - "count": count, - "challengeID": id, - "type": 3, - "secUid": "", - "cursor": offset, - "priority_region": "", - } - api_url = "{}api/challenge/item_list/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - res = self.get_data(url=api_url, **kwargs) - - for t in res.get("itemList", []): - response.append(t) - - if not res.get("hasMore", False): - logging.info("TikTok isn't sending more TikToks beyond this point.") - return response - - offset += maxCount - - return response[:required_count] - - def get_hashtag_object(self, hashtag, **kwargs) -> dict: - """Returns a hashtag object. 
- - ##### Parameters - * hashtag: The hashtag to search by - - Without the # symbol - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - query = {"name": hashtag, "isName": True, "lang": language} - api_url = "{}node/share/tag/{}?{}&{}".format( - BASE_URL, quote(hashtag), self.__add_url_params__(), urlencode(query) - ) - data = self.get_data(url=api_url, **kwargs) - if data["challengeInfo"].get("challenge") is None: - raise TikTokNotFoundError("Challenge {} does not exist".format(hashtag)) - return data - - def get_recommended_tiktoks_by_video_id(self, id, count=30, **kwargs) -> dict: - """Returns a dictionary listing reccomended TikToks for a specific TikTok video. - - - ##### Parameters - * id: The id of the video to get suggestions for - - Can be found using other methods - * count: The count of results you want to return - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - response = [] - first = True - - while len(response) < count: - if count < maxCount: - realCount = count - else: - realCount = maxCount - - query = { - "count": realCount, - "id": 1, - "secUid": "", - "sourceType": 12, - "appId": 1233, - "region": region, - "priority_region": region, - "language": language, - } - api_url = "{}api/recommend/item_list/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - - res = self.get_data(url=api_url, **kwargs) - - for t in res.get("itemList", []): - response.append(t) - - if not res.get("hasMore", False) and not first: - logging.info("TikTok isn't sending more TikToks beyond this point.") - return response[:count] - - realCount = count - len(response) - - first = False - - return response[:count] - - def get_tiktok_by_id(self, id, **kwargs) -> dict: - """Returns a dictionary of a specific TikTok. 
- - ##### Parameters - * id: The id of the TikTok you want to get the object for - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - device_id = kwargs.get("custom_device_id", None) - query = { - "itemId": id, - "language": language, - } - api_url = "{}api/item/detail/?{}&{}".format( - BASE_URL, self.__add_url_params__(), urlencode(query) - ) - - return self.get_data(url=api_url, **kwargs) - - def get_tiktok_by_url(self, url, **kwargs) -> dict: - """Returns a dictionary of a TikTok object by url. - - - ##### Parameters - * url: The TikTok url you want to retrieve - - """ - - url = requests.head(url=url, allow_redirects=True).url - - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - custom_device_id = kwargs.get("custom_device_id", None) - if "@" in url and "/video/" in url: - post_id = url.split("/video/")[1].split("?")[0] - else: - raise Exception( - "URL format not supported. Below is an example of a supported url.\n" - "https://www.tiktok.com/@therock/video/6829267836783971589" - ) - - return self.get_tiktok_by_id( - post_id, - **kwargs, - ) - - def get_tiktok_by_html(self, url, **kwargs) -> dict: - """This method retrieves a TikTok using the html - endpoints rather than the API based ones. 
- - ##### Parameters - * url: The url of the TikTok to get - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - - r = requests.get( - url, - headers={ - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", - "path": url.split("tiktok.com")[1], - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36", - }, - proxies=self.__format_proxy(kwargs.get("proxy", None)), - cookies=self.get_cookies(**kwargs), - **self.requests_extra_kwargs, - ) - - t = r.text - try: - j_raw = self.__extract_tag_contents(r.text) - except IndexError: - if not t: - logging.error("TikTok response is empty") - else: - logging.error("TikTok response: \n " + t) - raise TikTokCaptchaError() - - data = json.loads(j_raw)["props"]["pageProps"] - - if data["serverCode"] == 404: - raise TikTokNotFoundError("TikTok with that url doesn't exist") - - return data - - def get_user_object(self, username, **kwargs) -> dict: - """Gets a user object (dictionary) - - ##### Parameters - * username: The username of the user - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - return self.get_user(username, **kwargs)["userInfo"]["user"] - - def get_user(self, username, **kwargs) -> dict: - """Gets the full exposed user object - - ##### Parameters - * username: The username of the user - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - r = requests.get( - "https://tiktok.com/@{}?lang=en".format(quote(username)), - headers={ - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", - 
"path": "/@{}".format(quote(username)), - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36", - }, - proxies=self.__format_proxy(kwargs.get("proxy", None)), - cookies=self.get_cookies(**kwargs), - **self.requests_extra_kwargs, - ) - - t = r.text - - try: - j_raw = self.__extract_tag_contents(r.text) - except IndexError: - if not t: - logging.error("Tiktok response is empty") - else: - logging.error("Tiktok response: \n " + t) - raise TikTokCaptchaError() - - user = json.loads(j_raw)["props"]["pageProps"] - - if user["serverCode"] == 404: - raise TikTokNotFoundError( - "TikTok user with username {} does not exist".format(username) - ) - - return user - - def get_video_by_tiktok(self, data, **kwargs) -> bytes: - """Downloads video from TikTok using a TikTok object. - - You will need to set a custom_device_id to do this for anything but trending. - To do this, this is pretty simple you can either generate one yourself or, - you can pass the generate_static_device_id=True into the constructor of the - TikTokApi class. - - ##### Parameters - * data: A TikTok object - - A TikTok JSON object from any other method. 
- """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - try: - api_url = data["video"]["downloadAddr"] - except Exception: - try: - api_url = data["itemInfos"]["video"]["urls"][0] - except Exception: - api_url = data["itemInfo"]["itemStruct"]["video"]["playAddr"] - return self.get_video_by_download_url(api_url, **kwargs) - - def get_video_by_download_url(self, download_url, **kwargs) -> bytes: - """Downloads video from TikTok using download url in a TikTok object - - ##### Parameters - * download_url: The download url key value in a TikTok object - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - return self.get_bytes(url=download_url, **kwargs) - - def get_video_by_url(self, video_url, **kwargs) -> bytes: - """Downloads a TikTok video by a URL - - ##### Parameters - * video_url: The TikTok url to download the video from - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - kwargs["custom_device_id"] = device_id - - tiktok_schema = self.get_tiktok_by_url(video_url, **kwargs) - download_url = tiktok_schema["itemInfo"]["itemStruct"]["video"]["downloadAddr"] - - return self.get_bytes(url=download_url, **kwargs) - - def get_video_no_watermark(self, video_url, return_bytes=1, **kwargs) -> bytes: - """Gets the video with no watermark - .. 
deprecated:: - - Deprecated due to TikTok fixing this - - ##### Parameters - * video_url: The url of the video you want to download - - * return_bytes: Set this to 0 if you want url, 1 if you want bytes - """ - ( - region, - language, - proxy, - maxCount, - device_id, - ) = self.__process_kwargs__(kwargs) - raise Exception("Deprecated method, TikTok fixed this.") - - def get_music_title(self, id, **kwargs): - """Retrieves a music title given an ID - - ##### Parameters - * id: The music id to get the title for - """ - r = requests.get( - "https://www.tiktok.com/music/-{}".format(id), - headers={ - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.111 Safari/537.36", - }, - proxies=self.__format_proxy(kwargs.get("proxy", None)), - cookies=self.get_cookies(**kwargs), - **self.requests_extra_kwargs, - ) - t = r.text - j_raw = self.__extract_tag_contents(r.text) - - music_object = json.loads(j_raw)["props"]["pageProps"]["musicInfo"] - if not music_object.get("title", None): - raise TikTokNotFoundError("Song of {} id does not exist".format(str(id))) - - return music_object["title"] - - def get_secuid(self, username, **kwargs): - """Gets the secUid for a specific username - - ##### Parameters - * username: The username to get the secUid for - """ - r = requests.get( - "https://tiktok.com/@{}?lang=en".format(quote(username)), - headers={ - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9", - "path": "/@{}".format(quote(username)), - "Accept-Encoding": "gzip, deflate", - "Connection": "keep-alive", - "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like 
Gecko) Chrome/86.0.4240.111 Safari/537.36", - }, - proxies=self.__format_proxy( - kwargs.get("proxy", None), cookies=self.get_cookies(**kwargs) - ), - **self.requests_extra_kwargs, - ) - try: - return r.text.split('"secUid":"')[1].split('","secret":')[0] - except IndexError as e: - logging.info(r.text) - logging.error(e) - raise Exception( - "Retrieving the user secUid failed. Likely due to TikTok wanting captcha validation. Try to use a proxy." - ) - @staticmethod def generate_device_id(): """Generates a valid device_id for other methods. Pass this as the custom_device_id field to download videos""" @@ -1498,77 +501,65 @@ def generate_device_id(): # PRIVATE METHODS # - def __format_proxy(self, proxy) -> dict: + def _format_proxy(self, proxy) -> Optional[dict]: """ Formats the proxy object """ - if proxy is None and self.proxy is not None: - proxy = self.proxy + if proxy is None and self._proxy is not None: + proxy = self._proxy if proxy is not None: return {"http": proxy, "https": proxy} else: return None - def __get_js(self, proxy=None) -> str: - return requests.get( - "https://sf16-muse-va.ibytedtos.com/obj/rc-web-sdk-gcs/acrawler.js", - proxies=self.__format_proxy(proxy), - **self.requests_extra_kwargs, - ).text + # Process the kwargs + def _process_kwargs(self, kwargs): + region = kwargs.get("region", "US") + language = kwargs.get("language", "en") + proxy = kwargs.get("proxy", None) + + if kwargs.get("custom_device_id", None) != None: + device_id = kwargs.get("custom_device_id") + else: + if self._custom_device_id != None: + device_id = self._custom_device_id + else: + device_id = "".join(random.choice(string.digits) for num in range(19)) + + @dataclass + class ProcessedKwargs: + region: str + language: str + proxy: str + device_id: str - def __format_new_params__(self, parm) -> str: - # TODO: Maybe try not doing this? It should be handled by the urlencode. 
- return parm.replace("/", "%2F").replace(" ", "+").replace(";", "%3B") + return ProcessedKwargs( + region=region, language=language, proxy=proxy, device_id=device_id + ) - def __add_url_params__(self) -> str: + def _add_url_params(self) -> str: query = { "aid": 1988, "app_name": "tiktok_web", "device_platform": "web_mobile", - "region": self.region or "US", + "region": self._region or "US", "priority_region": "", "os": "ios", "referer": "", - "root_referer": "", "cookie_enabled": "true", - "screen_width": self.width, - "screen_height": self.height, - "browser_language": self.browser_language.lower() or "en-us", + "screen_width": self._width, + "screen_height": self._height, + "browser_language": self._browser_language.lower() or "en-us", "browser_platform": "iPhone", "browser_name": "Mozilla", - "browser_version": self.__format_new_params__(self.userAgent), + "browser_version": self._user_agent, "browser_online": "true", - "timezone_name": self.timezone_name or "America/Chicago", + "timezone_name": self._timezone_name or "America/Chicago", "is_page_visible": "true", "focus_state": "true", "is_fullscreen": "false", "history_len": random.randint(0, 30), - "language": self.language or "en", + "language": self._language or "en", } - return urlencode(query) - - def __extract_tag_contents(self, html): - nonce_start = '' - nonce = html.split(nonce_start)[1].split(nonce_end)[0] - j_raw = html.split( - '")[0] - return j_raw - - # Process the kwargs - def __process_kwargs__(self, kwargs): - region = kwargs.get("region", "US") - language = kwargs.get("language", "en") - proxy = kwargs.get("proxy", None) - maxCount = kwargs.get("maxCount", 35) - if kwargs.get("custom_device_id", None) != None: - device_id = kwargs.get("custom_device_id") - else: - if self.custom_device_id != None: - device_id = self.custom_device_id - else: - device_id = "".join(random.choice(string.digits) for num in range(19)) - return region, language, proxy, maxCount, device_id + return urlencode(query) 
diff --git a/TikTokApi/tiktokuser.py b/TikTokApi/tiktokuser.py deleted file mode 100644 index 87be56be..00000000 --- a/TikTokApi/tiktokuser.py +++ /dev/null @@ -1,101 +0,0 @@ -import requests - - -class TikTokUser: - def __init__(self, user_cookie, debug=False, proxy=None): - """A TikTok User Class. Represents a single user that is logged in. - - :param user_cookie: The cookies from a signed in session of TikTok. - Sign into TikTok.com and run document.cookie in the javascript console - and then copy the string and place it into this parameter. - """ - self.cookies = user_cookie - self.debug = debug - self.proxy = proxy - - def get_insights(self, videoID, username=None, proxy=None) -> dict: - """Get insights/analytics for a video. - - :param videoID: The TikTok ID to look up the insights for. - """ - api_url = "https://api.tiktok.com/aweme/v1/data/insighs/?tz_offset=-25200&aid=1233&carrier_region=US" - if username is not None: - referrer = "https://www.tiktok.com/@" + username + "/video/" + videoID - else: - referrer = "https://www.tiktok.com/" - insights = [ - "video_info", - "video_page_percent", - "video_region_percent", - "video_total_duration", - "video_per_duration", - ] - # Note: this list of parameters has to be in exactly this order with exactly this format - # or else you will get "Invalid parameters" - - def build_insight(insight, videoID): - return '{"insigh_type":"' + insight + '","aweme_id":"' + videoID + '"}' - - insight_string = ",".join([build_insight(i, videoID) for i in insights]) - insight_string = ( - insight_string - + ',{"insigh_type": "user_info"}' - + ',{"insigh_type":"video_uv","aweme_id":"' - + videoID - + '"}' - + ',{"insigh_type":"vv_history","days":8}' - + ',{"insigh_type":"follower_num_history","days":9}' - + ',{"insigh_type":"follower_num"}' - + ',{"insigh_type":"user_info"}' - ) - r = requests.post( - api_url, - headers={ - "accept": "*/*", - "accept-language": "en-US,en;q=0.9", - "content-type": "application/x-www-form-urlencoded", 
- "sec-fetch-dest": "empty", - "sec-fetch-mode": "cors", - "sec-fetch-site": "same-site", - "referrer": referrer, - "referrerPolicy": "no-referrer-when-downgrade", - "method": "POST", - "mode": "cors", - "credentials": "include", - }, - data="type_requests=[" + insight_string + "]", - proxies=self.__format_proxy(proxy), - cookies=self.__cookies_to_json(self.cookies), - ) - try: - return r.json() - except Exception: - if debug: - print(f"Failed converting following to JSON\n{r.text}") - raise Exception("Invalid Response (from TikTok)") - - # - # PRIVATE METHODS - # - def __format_proxy(self, proxy) -> dict: - """ - Formats the proxy object - """ - if proxy is not None: - return {"http": proxy, "https": proxy} - else: - return None - - def __cookies_to_json(self, cookie_string) -> dict: - """ - Turns a cookie string into a dict for - use in the requests module - """ - if isinstance(cookie_string, dict): - return cookie_string - - cookie_dict = {} - for cookie in cookie_string.split("; "): - cookie_dict[cookie.split("=")[0]] = cookie.split("=")[1] - - return cookie_dict diff --git a/TikTokApi/utilities.py b/TikTokApi/utilities.py index 337b94f9..4e3315df 100644 --- a/TikTokApi/utilities.py +++ b/TikTokApi/utilities.py @@ -1,6 +1,7 @@ import subprocess import sys -import pkg_resources + +LOGGER_NAME: str = "TikTokApi" def update_messager(): diff --git a/docs/TikTokApi.html b/docs/TikTokApi.html index c7ca002e..1bd7472b 100644 --- a/docs/TikTokApi.html +++ b/docs/TikTokApi.html @@ -3,64 +3,50 @@ - + TikTokApi API documentation - +