Skip to content

Commit

Permalink
Merge pull request #476 from axif0/workflow_query_forms
Browse files Browse the repository at this point in the history
Workflow query forms
  • Loading branch information
andrewtavis authored Oct 24, 2024
2 parents 8620582 + e6b3c20 commit 52c8363
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 8 deletions.
252 changes: 252 additions & 0 deletions src/scribe_data/check/check_query_forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
)


# MARK: Extract Forms


def extract_forms_from_sparql(file_path: Path) -> str:
"""
Extracts the QID from a SPARQL query file based on the provided pattern.
Expand Down Expand Up @@ -71,6 +74,9 @@ def extract_forms_from_sparql(file_path: Path) -> str:
return None


# MARK: Check Label


def check_form_label(form_text: str):
"""
Checks that the label of the form matches the representation label.
Expand Down Expand Up @@ -110,6 +116,9 @@ def check_form_label(form_text: str):
return form_rep_label == current_form_rep_label


# MARK: Get Label


def extract_form_rep_label(form_text: str):
"""
Extracts the representation label from an optional query form.
Expand All @@ -131,6 +140,9 @@ def extract_form_rep_label(form_text: str):
return label_match[1].strip()


# MARK: Get QIDs


def extract_form_qids(form_text: str):
"""
Extracts all QIDs from an optional query form.
Expand All @@ -150,6 +162,9 @@ def extract_form_qids(form_text: str):
return [q.split("wd:")[1].split(" .")[0] for q in match[0].split(", ")]


# MARK: Correct Label


def return_correct_form_label(qids: list):
"""
Returns the correct label for a lexeme form representation given the QIDs that compose it.
Expand Down Expand Up @@ -183,14 +198,251 @@ def return_correct_form_label(qids: list):
return correct_label[:1].lower() + correct_label[1:]


# MARK: Return Forms


def check_unique_return_forms(query_text: str) -> "bool | str":
    """
    Checks that each form returned by the SELECT statement is unique.

    Parameters
    ----------
    query_text : str
        The full text of the SPARQL query.

    Returns
    -------
    bool or str
        True if all returned forms are unique (or if no ``SELECT ... WHERE``
        clause is found), otherwise an error message that lists each
        duplicated form once, in order of first appearance.
    """
    select_pattern = r"SELECT\s*(.*?)\s*WHERE"
    match = re.search(pattern=select_pattern, string=query_text, flags=re.DOTALL)
    if not match:
        # Nothing to validate when the query has no SELECT ... WHERE clause.
        return True

    # Extracting forms after '?' and handling cases where 'AS' is used for aliasing.
    return_forms = []
    for part in match[1].split():
        if "?" in part:
            form = part.split("?")[-1]
            if "AS" in form:
                form = form.split("AS")[0].strip()

            return_forms.append(form)

    # Count occurrences in a dict so that each duplicated form is reported
    # exactly once and the report keeps the order of first appearance.
    occurrences = {}
    for form in return_forms:
        occurrences[form] = occurrences.get(form, 0) + 1

    duplicates = [form for form, count in occurrences.items() if count > 1]
    if duplicates:
        return f"\nDuplicate forms found: {', '.join(duplicates)}"

    return True


# MARK: Unreturned Forms


def check_unreturned_optional_forms(query_text: str) -> str:
    """
    Reports representation variables bound in OPTIONAL blocks that the SELECT statement does not return.

    Parameters
    ----------
    query_text : str
        The full text of the SPARQL query.

    Returns
    -------
    str
        An error message with the sorted names of the unreturned forms, or an
        empty string when every optional form is returned.
    """
    # Names returned by SELECT: the text after the last '?' of each
    # whitespace-separated token, cut at 'AS' when the token contains it.
    returned = set()
    select_hit = re.search(r"SELECT\s*(.*?)\s*WHERE", query_text, re.DOTALL)
    if select_hit:
        for token in select_hit.group(1).split():
            if "?" not in token:
                continue

            name = token.rsplit("?", 1)[-1]
            if "AS" in name:
                name = name.partition("AS")[0].strip()

            returned.add(name)

    # Names bound through ontolex:representation inside each OPTIONAL { ... } block
    # (only the first such binding of a block is considered).
    representation_regex = r"ontolex:representation\s+\?([\w]+)\s*;"
    optional_bodies = re.findall(r"OPTIONAL\s*\{([^}]*)\}", query_text)
    representation_hits = (
        re.search(representation_regex, body) for body in optional_bodies
    )
    in_optional = {hit.group(1) for hit in representation_hits if hit}

    missing = in_optional - returned
    if not missing:
        return ""

    return f"Unreturned optional forms: {', '.join(sorted(missing))}"


# MARK: Undefined Forms


def check_undefined_return_forms(query_text: str) -> str:
    """
    Reports SELECT variables that never appear in the WHERE clause.

    The check only runs for queries without OPTIONAL blocks.

    Parameters
    ----------
    query_text : str
        The full text of the SPARQL query.

    Returns
    -------
    str
        An error message with the sorted names of the undefined forms, or an
        empty string when all returned forms are defined or the check is skipped.
    """
    # skip check for queries with OPTIONAL blocks
    if re.search(r"OPTIONAL\s*\{", query_text) is not None:
        return ""

    variable_regex = r"\?(\w+)"
    returned = set()
    aliases = set()

    # Walk the SELECT clause line by line, separating aliases from returned variables.
    select_hit = re.search(r"SELECT\s*(.*?)\s*WHERE", query_text, re.DOTALL)
    select_lines = select_hit.group(1).split("\n") if select_hit else []
    for raw_line in select_lines:
        line = raw_line.strip()
        if "?" not in line:
            continue

        names = re.findall(variable_regex, line)
        if "AS ?" in line:
            # REPLACE...AS lines: the name after AS is an alias created by the
            # query itself, so it is not expected to appear in WHERE.
            alias_hit = re.search(r"AS \?(\w+)", line)
            if alias_hit:
                aliases.add(alias_hit.group(1))

            names = [name for name in names if name not in aliases]

        returned.update(names)

    # Every variable mentioned in the WHERE body counts as defined.
    where_hit = re.search(
        r"WHERE\s*\{(.*?)\}(?:\s*ORDER BY|\s*$)", query_text, re.DOTALL
    )
    defined = set(re.findall(variable_regex, where_hit.group(1))) if where_hit else set()

    undefined = sorted(name for name in returned - defined if name not in aliases)
    if undefined:
        return f"Undefined forms in SELECT: {', '.join(undefined)}"

    return ""


# MARK: Defined Return Forms


def check_defined_return_forms(query_text: str) -> str:
    """
    Reports variables that appear in the WHERE clause but not in the SELECT clause.

    The check only runs for queries without OPTIONAL blocks.

    Parameters
    ----------
    query_text : str
        The full text of the SPARQL query.

    Returns
    -------
    str
        An error message with the sorted names of the defined but unreturned
        forms, or an empty string when all are returned or the check is skipped.
    """
    # skip check for queries with OPTIONAL blocks
    if re.search(r"OPTIONAL\s*\{", query_text):
        return ""

    variable_regex = r"\?(\w+)"

    # Variables in the WHERE body; the lazy match ends at the first closing brace.
    where_hit = re.search(r"WHERE\s*\{(.*?)\}", query_text, re.DOTALL)
    defined = set(re.findall(variable_regex, where_hit.group(1))) if where_hit else set()

    # Variables mentioned anywhere between SELECT and WHERE.
    select_hit = re.search(r"SELECT\s*(.*?)\s*WHERE", query_text, re.DOTALL)
    returned = (
        set(re.findall(variable_regex, select_hit.group(1))) if select_hit else set()
    )

    missing = sorted(defined - returned)
    if not missing:
        return ""

    return f"Defined but unreturned forms: {', '.join(missing)}"


# MARK: Main Query Forms Validation
def check_query_forms() -> None:
"""
Validates SPARQL queries in the language data directory to check for correct form QIDs.
"""

error_output = ""
index = 0
for query_file in LANGUAGE_DATA_EXTRACTION_DIR.glob("**/*.sparql"):
query_file_str = str(query_file)
with open(query_file, "r", encoding="utf-8") as file:
query_text = file.read()

# Check for unique return forms and handle the error message.
unique_check_result = check_unique_return_forms(query_text)
if unique_check_result is not True:
error_output += f"\n{index}. {query_file_str}: {unique_check_result}\n"
index += 1

if undefined_forms := check_undefined_return_forms(query_text):
error_output += f"\n{index}. {query_file_str}: {undefined_forms}\n"
index += 1

if unreturned_optional_forms := check_unreturned_optional_forms(query_text):
error_output += (
f"\n{index}. {query_file_str}: {unreturned_optional_forms}\n"
)
index += 1

if defined_unreturned_forms := check_defined_return_forms(query_text):
error_output += f"\n{index}. {query_file_str}: {defined_unreturned_forms}\n"
index += 1

if extract_forms_from_sparql(query_file):
query_form_check_dict = {}
for form_text in extract_forms_from_sparql(query_file):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
SELECT
(REPLACE(STR(?lexeme), "http://www.wikidata.org/entity/", "") AS ?lexemeID)
?adjective
?definiteSingularPositive
?pluralPositive
?pluralSuperlative
?comparative
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
SELECT
(REPLACE(STR(?lexeme), "http://www.wikidata.org/entity/", "") AS ?lexemeID)
?adjective
?femininePlural
?feminineSingular
?masculineSingular
?femininePlural
?masculinePlural
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,10 @@

SELECT
(REPLACE(STR(?lexeme), "http://www.wikidata.org/entity/", "") AS ?lexemeID)
?infinitive
?verb

WHERE {
?lexeme dct:language wd:Q8798 ;
wikibase:lexicalCategory wd:Q24905 .

# MARK: Infinitive
?lexeme ontolex:lexicalForm ?infinitiveForm .
?infinitiveForm ontolex:representation ?infinitive ;
wikibase:grammaticalFeature wd:Q179230 .
wikibase:lexicalCategory wd:Q24905 ;
wikibase:lemma ?verb .
}

0 comments on commit 52c8363

Please sign in to comment.