Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds an audioplayer to the target side of the card using the AudioURL from the new duolingo API #80

Open
wants to merge 1 commit into
base: fix-from-new-library
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 79 additions & 43 deletions duolingo_sync/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,26 @@
from .duolingo_display_login_dialog import duolingo_display_login_dialog
from .duolingo_model import get_duolingo_model

import os
import requests

WORD_CHUNK_SIZE = 50
ADD_STATUS_TEMPLATE = "Importing from Duolingo: {} of {} complete."


@dataclass
class VocabResult:
identifier: str
text: str
audioURL: str
isNew: bool
translations: List[str] = field(default_factory=list)


@dataclass
class VocabRetrieveResult:
words_to_add: List[VocabResult]
success: bool = False
words_to_add: list = field(default_factory=list)
language_string: Optional[str] = None
lingo: Optional[Duolingo] = None

Expand Down Expand Up @@ -67,12 +79,7 @@ def login_and_retrieve_vocab(jwt) -> VocabRetrieveResult:

current_language = lingo.get_user_info()['learning_language_string']
language_abbreviation = lingo.get_abbreviation_of(current_language)
vocabs = lingo.get_vocabulary(language_abbreviation)

for vocab in vocabs:
# A prior version of the Duolingo API exposed vocabulary ids, which we used to
# de-duplicate vocabs. This version does not, so we build our own new ids.
vocab['id'] = vocab['text'] + "-" + language_abbreviation
vocabulary_response = lingo.get_vocabulary(language_abbreviation)

did = mw.col.decks.get(DEFAULT_DECK_ID)['id']
mw.col.decks.select(did)
Expand All @@ -81,9 +88,15 @@ def login_and_retrieve_vocab(jwt) -> VocabRetrieveResult:
deck['mid'] = model['id']
mw.col.decks.save(deck)

words_to_add = [vocab for vocab in vocabs if vocab['id'] not in gids_to_notes]
words_to_add_models = []
for vocab in vocabulary_response:
id = vocab['text'] + "-" + language_abbreviation
if id not in gids_to_notes:
words_to_add_models.append(VocabResult(identifier=id, text=vocab["text"], audioURL=vocab["audioURL"], isNew=vocab["isNew"],
translations=vocab["translations"]))

result.words_to_add = words_to_add_models
result.success = True
result.words_to_add = words_to_add
result.language_string = current_language

return result
Expand All @@ -103,46 +116,56 @@ def add_vocab(retrieve_result: VocabRetrieveResult) -> AddVocabResult:
result = AddVocabResult()

total_word_count = len(retrieve_result.words_to_add)
word_chunks = [retrieve_result.words_to_add[x:x + WORD_CHUNK_SIZE] for x in range(0, total_word_count, WORD_CHUNK_SIZE)]

aqt.mw.taskman.run_on_main(
lambda: mw.progress.update(label=ADD_STATUS_TEMPLATE.format(0, total_word_count), value=0, max=total_word_count)
)

def translations(vocab):
if vocab['translations']:
return '; '.join(vocab['translations'])
words_processed = 0
for vocabResult in retrieve_result.words_to_add:
n = mw.col.newNote()

# Update the underlying dictionary to accept more arguments for more customisable cards
n._fmap = defaultdict(str, n._fmap)

n['Gid'] = vocabResult.identifier
n['Gender'] = ''
n['Source'] = '; '.join(vocabResult.translations)
n['Pronunciation'] = ''
n['Target Language'] = retrieve_result.language_string
n.addTag(retrieve_result.language_string)
n.addTag('duolingo_sync')

# Add audio file to the target, this autoplays. I'm not sure if theres a better way to attach
# audio, as replay audio controls do not work
audio_filename = '{0}.mp3'.format(vocabResult.text)
audio_path = os.path.join(mw.col.media.dir(), audio_filename) # Path in Anki's media folder
audio_tag = f'<br><br><audio class="custom-audio" controls autoplay><source src="{audio_filename}" type="audio/mpeg"></audio>'
download_audio_from_url(vocabResult.audioURL, audio_path)

n['Target'] = vocabResult.text + audio_tag
css = """
<style>
.custom-audio {
height: 60px; /* Adjust height as needed */
}
</style>
"""
note_model = n.model()
note_model['css'] += css

num_cards = mw.col.addNote(n)

if num_cards:
result.notes_added += 1
else:
return "Provide the translation for '{}' from {}.".format(vocab['text'], retrieve_result.language_string)
result.problem_vocabs.append(vocabResult.text)
words_processed += 1

words_processed = 0
for word_chunk in word_chunks:
for vocab in word_chunk:
n = mw.col.newNote()

# Update the underlying dictionary to accept more arguments for more customisable cards
n._fmap = defaultdict(str, n._fmap)

n['Gid'] = vocab['id']
n['Gender'] = ''
n['Source'] = translations(vocab)
n['Target'] = vocab['text']
n['Pronunciation'] = ''
n['Target Language'] = retrieve_result.language_string
n.addTag(retrieve_result.language_string)
n.addTag('duolingo_sync')

num_cards = mw.col.addNote(n)

if num_cards:
result.notes_added += 1
else:
result.problem_vocabs.append(vocab['text'])
words_processed += 1

aqt.mw.taskman.run_on_main(
lambda: mw.progress.update(label=ADD_STATUS_TEMPLATE.format(result.notes_added, total_word_count), value=words_processed, max=total_word_count)
)
aqt.mw.taskman.run_on_main(
lambda: mw.progress.update(label=ADD_STATUS_TEMPLATE.format(result.notes_added, total_word_count),
value=words_processed, max=total_word_count)
)

aqt.mw.taskman.run_on_main(
lambda: mw.progress.finish()
Expand All @@ -151,12 +174,24 @@ def translations(vocab):
return result


# Downloads audio from a given URL and saves it on the file system
# used to then embed the audio links from the duolingo API to cards
def download_audio_from_url(url, save_path):
# Function to download audio file from URL
with requests.get(url, stream=True) as r:
r.raise_for_status()
with open(save_path, 'wb') as f:
for chunk in r.iter_content(chunk_size=8192):
f.write(chunk)


def on_retrieve_success(retrieve_result: VocabRetrieveResult):
if not retrieve_result.success:
return

if not retrieve_result.words_to_add:
showInfo(f"Successfully logged in to Duolingo, but no new words found in {retrieve_result.language_string} language.")
showInfo(
f"Successfully logged in to Duolingo, but no new words found in {retrieve_result.language_string} language.")
elif askUser(f"Add {len(retrieve_result.words_to_add)} notes from {retrieve_result.language_string} language?"):
op = QueryOp(
parent=mw,
Expand Down Expand Up @@ -207,6 +242,7 @@ def sync_duolingo():

op.with_progress(label="Logging in...").run_in_background()


action = QAction("Pull from Duolingo", mw)
qconnect(action.triggered, sync_duolingo)
mw.form.menuTools.addAction(action)