[FEATURE] GUI to demonstrate and control SOF components on target hardware #9223

Open
alexb3103 opened this issue Jun 12, 2024 · 4 comments

@alexb3103
Contributor

Problem Statement:

There is currently no easy way to demonstrate SOF on target hardware, especially to non-developers and people who are not familiar with DSP.

Proposed Solution:

Develop a generic GUI and TUI that interact with SOF components on target HW, using sof-ctl for real-time control.

The UI will be developed in Python, with the GUI built on the Python GTK library.
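
A minimal sketch of how the UI layer might wrap sof-ctl, assuming the `-D` (device), `-n` (control numid) and `-s` (settings file) options; these flag names are an assumption and should be checked against `sof-ctl -h` on the target. The helper names and the example file name are hypothetical.

```python
import shlex
import subprocess
from typing import List, Optional


def build_sof_ctl_argv(device: str, numid: int,
                       blob_path: Optional[str] = None) -> List[str]:
    """Build an argv list for sof-ctl (flag names are assumptions)."""
    if numid < 0:
        raise ValueError("numid must be non-negative")
    argv = ["sof-ctl", "-D", device, "-n", str(numid)]
    if blob_path is not None:
        # Assumed: -s writes the given settings file to the control.
        argv += ["-s", blob_path]
    return argv


def run_sof_ctl(argv: List[str], timeout_s: float = 5.0) -> str:
    """Run the command and return stdout; raises on a non-zero exit status."""
    result = subprocess.run(argv, capture_output=True, text=True,
                            check=True, timeout=timeout_s)
    return result.stdout


if __name__ == "__main__":
    # Print the command instead of running it, so this works off-target.
    print(shlex.join(build_sof_ctl_argv("hw:0", 22, "eq_iir_flat.txt")))
```

Keeping command construction separate from execution lets the GUI and TUI share one code path and lets the argument handling be tested without target hardware.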

The initial features for the first version of the UI (estimated completion mid-July) will be:

  • Real-time control of audio playback and recording through ALSA
  • Real-time control of the Volume, EQ, and DRC SOF components
  • Tested functionality on NXP target HW

The features for the final version (to be completed in late August) will be:

  • All previous features
  • Clean/professional graphics and intuitive control
  • Functionality on all supported HW (some may only work through the TUI)
  • Clear interfaces and procedures for adding new SOF components to the UI

Alternatives

alsamixer and sof-ctl provide some of this functionality; however, on their own they offer little customization and are not very presentable to an interested party.

Additional Context

Hi SOF community, I'm a new GSoC contributor for SOF through the Linux Foundation this summer and will be developing this feature as my project for that. I'm very excited to get started.

@alexb3103 alexb3103 added the enhancement New feature or request label Jun 12, 2024
@lgirdwood lgirdwood added this to the v2.11 milestone Jun 12, 2024
@lgirdwood
Member

@alexb3103 btw, @singalsu has Octave scripts that generate the coefficients for all modules (to change processing settings at runtime). I also have some Python that parses topology and creates a Gtk4 ListView of all objects (I will clean it up and post it soon).

@lgirdwood
Member

@alexb3103 I was going to paste it here, but GH won't allow me to post Python. I will create a staging repo for Python tooling that can share common libraries. In the meantime, I've pasted the topology2 parser here.

#!/usr/bin/python3
# SPDX-License-Identifier: BSD-3-Clause

import os
import re

# import the Gtk module
import gi
gi.require_version('Gtk', '4.0')
gi.require_version('Adw', '1')
from gi.repository import Gtk, Adw, Gio, Gdk, GLib, GObject

glb_search_dirs = [""]
glb_root_objects = []
glb_root_tokens = []

# keywords to search for in the topology file
tplg_keywords = ["DefineAttribute",
            "DefineArgument",
            "Class",
            "Control",
            "attributes",
            "constraints",
            "[", "]",
            "{", "}",
            ".",
            "!",
            "Object",
            "constructor",
            "immutable",
            "unique",
            "deprecated",
            "mandatory",
            "Define",
            "IncludeByKey"]

def obj_name(line):
    """Extracts the object name from a line."""
    start = line.find('"')
    end = line[start+1:].find('"')
    name = line[start + 1:start + 1 + end]
    return name

def file_name(line):
    """Extracts the file name from a line."""
    start = line.find('<')
    end = line[start+1:].find('>')
    name = line[start + 1:start + 1 + end]
    return name

def search_dir(line):
    """Extracts the search directory name from a line."""
    start = line.find('<searchdir:') # 11chars long
    end = line[start + 11:].find('>')
    name = line[start + 11:start + 11 + end]
    return name

# get the start and end index of a section
def section_get_limits(tokens, idx):
    """Get the start and end index of a section."""
    count = 0
    start = 0
    end = 0
    for i in range(idx, len(tokens)):
        token = tokens[i].get("token")
        if token == "{":
            count += 1
            if count == 1:
                start = i
            continue
        elif token == "}":
            count -= 1
            if count == 0:
                end = i
                break

    return start, end

# get the start and end index of an array
def array_get_limits(tokens, idx):
    """Get the start and end index of an array."""
    count = 0
    start = 0
    end = 0
    for i in range(idx, len(tokens)):
        token = tokens[i].get("token")
        if token == "[":
            count += 1
            if count == 1:
                start = i
            continue
        elif token == "]":
            count -= 1
            if count == 0:
                end = i
                break

    return start, end

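# get the preamble comments for the object
def section_get_preamble(tokens, idx):
    """Return the index of the first token after the previous closing "}"."""

    # walk backwards from idx to the end of the previous section
    for i in range(idx - 1, -1, -1):
        if tokens[i].get("token") == "}":
            return i + 1

    # no previous section: the preamble starts at the first token
    return 0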

def open_file_from_directories(filename):
    """
    Attempts to open a file by searching through a list of directories.

    Args:
        filename: The name of the file to open.

    The directories searched are taken from the global glb_search_dirs list.

    Returns:
        file object: If the file is found and successfully opened, returns a file object.
        None: If the file cannot be found or opened in any of the directories.
    """
    for directory in glb_search_dirs:
        filepath = os.path.join(directory, filename)
        if os.path.exists(filepath):  # Check if the file exists
            try:
                file = open(filepath, "r", encoding="utf-8")  # Open the file in read mode
                return file  # Return the file object if opened successfully
            except OSError as e:
                print(f"Error opening file '{filename}' in '{filepath}': {e}")

    return None  # File not found or couldn't be opened in any directory

def load_file_into_buffer(filename):
    """Loads the entire content of a text file into a string buffer."""
    filename = filename.strip("'\"")
    f = open_file_from_directories(filename)
    if f is None:
        # fail with a clear error instead of using None as a context manager
        raise FileNotFoundError(f"'{filename}' not found in {glb_search_dirs}")
    with f:
        return f.read()

def tokenize_with_metadata(filename, text):
    """Tokenizes text, preserving all tokens with detailed metadata."""

    # Escape keywords for regex and sort by length (longest first)
    pattern = "|".join(map(re.escape, sorted(tplg_keywords, key=len, reverse=True)))
    pattern = f"({pattern})"  # Capture keywords

    current_line_number = 1
    key_check = False

    for line in text.splitlines(keepends=False):  # Discard newline characters

        string = ""
        line = line.expandtabs(4)

        # lines starting with # are whole line comments
        if re.match(r"\s*#", line):
            glb_root_tokens.append({
                "token": line[line.find('#'):].strip(),
                "type": "comment",
                "line_number": current_line_number,
                "file": filename
            })
            current_line_number += 1
            continue

        matches = re.finditer(pattern + r"|\S+", line)  # \S+ matches any non-whitespace

        for match in matches:
            token = match.group().strip()
            if not token:
                continue  # Skip empty tokens

            if string != "":
                string += " " + token
                if token.endswith("\"") or token.endswith("'"):
                    glb_root_tokens.append({
                        "token": string,
                        "type": "string",
                        "line_number": current_line_number,
                        "file": filename
                    })
                    string = ""
                continue

            # Identify token type (keyword, comment, or other)
            token_type = "keyword" if token in tplg_keywords else "other"
            if token.startswith("#"):
                glb_root_tokens.append({
                "token": line[match.start():].strip(),
                "type": "comment",
                "line_number": current_line_number,
                "file": filename
                })
                break
            elif token.startswith("\"") or token.startswith("'"):
                token_type = "string"
                if not token.endswith("\"") and not token.endswith("'"):
                    string = token
                    continue
            elif token.startswith("<searchdir:"):
                token_type = "searchdir"
                glb_search_dirs.append(search_dir(line))
            elif token.startswith("<"):
                token_type = "file"
                buffer = load_file_into_buffer(file_name(token))
                tokenize_with_metadata(file_name(token), buffer)
            elif token.startswith("IncludeByKey"):
                token_type = "key"
            elif token.startswith("["):
                token_type = "array"
            elif token.startswith("]"):
                token_type = "eoa"
            elif token.startswith("{"):
                token_type = "section"
            elif token.startswith("}"):
                token_type = "eos"

            # Store token info with line number and position
            glb_root_tokens.append({
                "token": token.strip(),
                "type": token_type,
                "line_number": current_line_number,
                "file": filename
            })

        current_line_number += 1

    return glb_root_tokens

class t2Base(GObject.Object):
    name = GObject.Property(type=str)
    def __init__(self, preamble, depth):
        super().__init__()
        self.tokens = []
        self.head = []
        self.tail = []
        self.objects = []
        self.cond_objects = []
        self.preamble = preamble
        self.depth = depth
        self.type = None
        self.value = None
        self.ui_data = None

    def print(self):
        print("Base")
        for token in self.tokens:
            print(token)

    def draw(self):
        pass
    def write(self):
        pass

    # get the define value by recursively searching the objects for the key
    def get_define(self, key):
        for obj in self.objects:
            value = obj.get_define(key)
            if value is not None:
                return value
            if isinstance(obj, t2Attribute):
                if obj.tokens[obj.attr_idx].get("token") == key:
                    return obj.tokens[obj.value_idx].get("token")
        return None


    # print the tokens in topology format
    def print_tplg_tokens(self, tokens):

        if(len(tokens) == 0):
            return

        last_line = int(tokens[0].get("line_number"))
        last_file = tokens[0].get("file")
        line = ""
        strip = False

        # for each token in the object
        for i in range(len(tokens)):

            this_token = tokens[i].get("token")
            this_line = int(tokens[i].get("line_number"))
            this_type = tokens[i].get("type")
            this_file = tokens[i].get("file")

            # if the file changes, flush the current line
            if this_file != last_file:
                print(line)
                line = ""
                last_line = this_line

            # if the line number changes, flush the current line and emit
            # one blank line for each additional source line skipped
            for _ in range(last_line, this_line):
                print(line)
                line = ""

            # if the token is a comment add it to the line
            if this_type == "comment":
                if line != "":
                    # if the line is not empty then add the comment after a tab
                    line = f"{line}\t{this_token}"
                else:
                    # if the line is empty, indent and add the comment
                    line = "\t" * self.depth + this_token
                last_line = this_line
                continue

            # handle special token types
            match this_token:
                # new section or array
                case "{" | "[":
                    if line == "":
                        line = "\t" * self.depth + this_token
                    else:
                        line = f"{line} {this_token}"
                    self.depth += 1
                # end of section or array
                case "}" | "]":
                    self.depth -= 1
                    line = "\t" * self.depth + this_token
                # join two strings with a .
                case ".":
                    line = line.rstrip()
                    line = f"{line}{this_token}"
                    strip = True
                # join postfix string
                case "!":
                    line += "\t" * self.depth + this_token
                    strip = True
                # default case
                case _:
                    # if the last token was a . or ! then join the strings
                    if strip:
                        strip = False
                        line = f"{line}{this_token}"
                    elif line == "":
                        # if the line is empty, indent and add the token
                        line = "\t" * self.depth + this_token
                    else:
                        # if the line is not empty then add the token after tab
                        line = f"{line}\t{this_token}"

            # update the last line and file
            last_line = this_line
            last_file = this_file

        # print the last tokens and line
        print(f"{line}", end="\n")

    # print the section in topology format
    def print_tplg_object(self):

        # print the head
        self.print_tplg_tokens(self.head)

        # print the tokens in the object recursively
        for object in self.objects:
            object.print_tplg()
        self.print_tplg_tokens(self.tokens)

        # print the tail
        self.print_tplg_tokens(self.tail)

        # print the conditional objects
        for object in self.cond_objects:
            object.print_tplg()

    def print_tplg(self):
        self.print_tplg_object()

    # print the object tree (name and value), indented by depth
    def print_objects(self):

        # print the head
        print(" " * self.depth, self.name, self.value)

        # print the tokens in the object recursively
        for object in self.objects:
            object.print_object()

        # print the conditional objects
        #for object in self.cond_objects:
        #    object.print_tplg()

    def print_object(self):
        self.print_objects()


def t2_child_model(obj, data):  # Function to create child models on demand
    if isinstance(obj, Gtk.TreeListRow):
        obj = obj.get_item()

    if obj.objects != []:
        child_store = Gio.ListStore(item_type=t2Base)
        for child in obj.objects:
            child_store.append(child)
        return Gtk.TreeListModel.new(child_store, False, False, t2_child_model, None)  # Use new directly
    return None  # No children


# Array definition in topology using []
class t2Array(t2Base):
    def __init__(self, parent_tokens, preamble, parent_depth):
        print("->New Array")
        super().__init__(preamble, parent_depth)

        # get the array limits
        array_start, array_end = array_get_limits(parent_tokens, preamble)

        # skip "[" token
        array_start += 1

        # set the tokens for this object
        self.tokens = parent_tokens[array_start:array_end]
        self.tail = parent_tokens[array_end:array_end+1]
        self.head = parent_tokens[0:array_start]

        # set the name
        self.name = self.head[preamble + 1].get("token")

        # remove our tokens from the list
        del parent_tokens[:array_end + 1]

        # get the next objects in the array
        while True:
            object = tokenizer_next(self.tokens, self.depth + 1)
            if object == None:
                break
            self.objects.append(object)

class t2Attribute(t2Base):
    def __init__(self, parent_tokens, preamble, parent_depth):
        print("->New Attribute")
        super().__init__(preamble, parent_depth)

        self.attr_idx = 0
        self.value_idx = 0
        self.type = "Attribute"

        # get the attribute and value tokens
        count = self.get_tuple(parent_tokens)

        # validate attr and value (valid indices are 0 .. count - 1)
        if not 0 <= self.attr_idx < count:
            raise ValueError(f"attr index {self.attr_idx} out of range {count}")
        if not 0 <= self.value_idx < count:
            raise ValueError(f"value index {self.value_idx} out of range {count}")

        # set the tokens for this object
        self.tokens = parent_tokens[0:count]

        # set the name and value
        self.name = self.tokens[self.attr_idx].get("token")
        self.value = self.tokens[self.value_idx].get("token")

        # remove our tokens from the list
        del parent_tokens[:len(self.tokens)]


    # get the attribute and value tokens alongside any comments for this tuple
    def get_tuple(self, tokens):
        count = 0
        end = len(tokens)

        # preceding comments
        while count < end and tokens[count].get("type") == "comment":
            count += 1

        # attribute
        if count < end:
            self.attr_idx = count
            count += 1

        # value
        if count < end:
            current_line = tokens[count].get("line_number")
            self.value_idx = count
            count += 1

        # trailing comments on the same line
        while count < end and tokens[count].get("type") == "comment" and current_line == tokens[count].get("line_number"):
            count += 1
        return count

class t2Section(t2Base):
    def __init__(self, parent_tokens, preamble, parent_depth):
        print("->New Section")
        super().__init__(preamble, parent_depth)

        # get the section limits
        sect_start, sect_end = section_get_limits(parent_tokens, preamble)

        # skip "{" token
        sect_start += 1

        self.start = 0
        self.tokens = parent_tokens[sect_start:sect_end]
        self.tail = parent_tokens[sect_end:sect_end+1]
        self.head = parent_tokens[0:sect_start]

        # set the name
        for token in self.head:
            #print("head", self.head)
            if (token.get("type") == "string" or token.get("type") == "other"):
                self.name += token.get("token")

        if self.name == "":
            self.name = self.head[preamble].get("token")
        self.name = self.name.strip("\"")
        #print("**Section", self.name)
        #print("head", self.head)
        #print("preamble", preamble) 

        # remove our tokens from the list
        del parent_tokens[:sect_end + 1]

        # get the next objects in the section
        while True:
            object = tokenizer_next(self.tokens, self.depth + 1)
            if object == None:
                break
            self.objects.append(object)

#
# Define {
#	NUM_HDMIS 3
#}
class t2Define(t2Section):
    def __init__(self, parent_tokens, preamble, parent_depth):
        print("->New Define")
        super().__init__(parent_tokens, preamble, parent_depth)
        self.type = "Define"

# DefineAttribute."priority" {
class t2DefineAttribute(t2Section):
    def __init__(self, parent_tokens, preamble, parent_depth):
        print("->New DefineAttribute")
        super().__init__(parent_tokens, preamble, parent_depth)
        self.type = "DefineAttribute"

# Class.Widget."pipeline" {
class t2Class(t2Section):
    def __init__(self, parent_tokens, preamble, parent_depth):
        print("->New Class")
        super().__init__(parent_tokens, preamble, parent_depth)
        self.type = "Class"

# Object instantiation section, i.e. a section whose head starts with Object
class t2Object(t2Section):
    def __init__(self, parent_tokens, preamble, parent_depth):
        print("->New Object")
        super().__init__(parent_tokens, preamble, parent_depth)
        self.type = "Object"

# get the value of a key in the objects
def key_get_value(key):
    for obj in glb_root_objects:
        value = obj.get_define(key)
        if value is not None:
            return value
    # key not defined in any root object
    return None

class t2IncludeKey(t2Section):
    def __init__(self, parent_tokens, preamble, parent_depth):
        t2Base.__init__(self, preamble, parent_depth)

        self.type = "IncludeByKey"

        valid_condition = False
        key_tokens = []

        # get the section limits
        sect_start, sect_end = section_get_limits(parent_tokens, preamble)

        # skip "{" token
        sect_start += 1

        self.start = 0
        self.tokens = parent_tokens[sect_start:sect_end]
        self.tail = parent_tokens[sect_end:sect_end+1]
        self.head = parent_tokens[0:sect_start]

        # get the key
        self.key = self.head[2 + preamble].get("token")

        # remove our tokens from the list
        del parent_tokens[:sect_end + 1]

        # get the key value
        self.key_value = key_get_value(self.key)
        if self.key_value == None:
            print(f"Key {self.key} not found")
            return

        self.key_value = self.key_value.strip("'\"")

        # get the condition and statement
        i = 0
        while i < len(self.tokens):
            condition = self.tokens[i].get("token").strip("'\"")

            # check if the condition matches the key
            if condition.isdecimal():
                # numeric key to match; a non-numeric key value never matches
                valid_condition = (self.key_value.isdecimal() and
                                   int(condition) == int(self.key_value))
            elif re.match(r"\[\d+-\d+\]$", condition):
                # range key to match [x-y]
                start, end = map(int, condition[1:-1].split("-"))

                # Check if the number is within the range
                valid_condition = (self.key_value.isdecimal() and
                                   start <= int(self.key_value) <= end)
            else:
                # string key to match
                valid_condition = condition == self.key_value

            i += 1
            if (i >= len(self.tokens)):
                break

            # get the statement
            statement = self.tokens[i].get("token")

            # check if the statement is a section
            is_section = statement == "{"
            if is_section:
                # Find the section's closing brace; i now indexes the "}" token
                sect_start, i = section_get_limits(self.tokens, i)

            # check if the condition is valid
            if not valid_condition:
                # Skip the statement
                i += 1
                continue

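            # Process the statement
            if not is_section:
                # tokenize_with_metadata() appends to (and returns) the global
                # token list, so keep only the tokens this file added
                mark = len(glb_root_tokens)
                buffer = load_file_into_buffer(statement)
                tokenize_with_metadata(statement, buffer)
                key_tokens = glb_root_tokens[mark:]
                del glb_root_tokens[mark:]
            else:
                key_tokens = self.tokens[sect_start + 1:i]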

            # get the next objects in the section
            while True:
                object = tokenizer_next(key_tokens, self.depth)
                if object == None:
                    break
                self.cond_objects.append(object)

            # go to the next token
            i += 1

# get the next object in the tokens
def tokenizer_next(tokens, depth):

    # preamble is the number of comments before the object
    preamble = 0
    is_attribute = 0

    # get the next object in the tokens
    while (tokens and len(tokens) > preamble):

        # get the next token
        token = tokens[preamble] # cant pop on recursion
        token_name = token.get("token")
        token_type = token.get("type")

        #print("%%:" + " " * depth, preamble, is_attribute, token)

        # skip comments but keep track of the preamble and create a new attribute
        match token_type:
            case "comment":
                preamble += 1
                is_attribute = 0
                continue
            case "other" | "string":
                is_attribute += 1
                if is_attribute > 1:
                    return t2Attribute(tokens, preamble, depth)
            case _:
                is_attribute = 0

        # check the token name and create the object
        match token_name:
            case "Define":
                return t2Define(tokens, preamble, depth)
            case "DefineAttribute":
                return t2DefineAttribute(tokens, preamble, depth)
            case "Class":
                return t2Class(tokens, preamble, depth)
            case "Object":
                return t2Object(tokens, preamble, depth)
            case "IncludeByKey":
                return t2IncludeKey(tokens, preamble, depth)
            case "attributes" | "constraints":
                return t2Section(tokens, preamble, depth)
            case "!":
                return t2Array(tokens, preamble, depth)
            case "{":
                return t2Section(tokens, preamble, depth)
            case _:
                preamble += 1
                continue

    # no more tokens
    return None

def tplg_reset():
    glb_root_objects.clear()
    glb_root_tokens.clear()
    glb_search_dirs.clear()
    glb_search_dirs.append("")

def import_tplg_as_objects(filename, verbose):

    tplg_reset()

    # Load the topology file into a buffer
    buffer = load_file_into_buffer(filename)
    tokens = tokenize_with_metadata(filename, buffer)

    # get the next object in the tokens
    while True:
        object = tokenizer_next(tokens, 0)
        if object == None:
            break
        glb_root_objects.append(object)

    # print the objects
    if verbose:
        for object in glb_root_objects:
            object.print_tplg()

    return glb_root_objects
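
The IncludeByKey condition test in t2IncludeKey (numeric, `[x-y]` range, or plain string) could also be pulled out into a pure function so it can be tested without Gtk or topology files. The following is a sketch of that idea, not part of the parser above; `condition_matches` is a hypothetical name, and it treats a non-numeric key value as a non-match for numeric and range conditions.

```python
import re

# Matches a range condition such as [2-4]
_RANGE_RE = re.compile(r"\[(\d+)-(\d+)\]$")


def condition_matches(condition: str, key_value: str) -> bool:
    """Return True if an IncludeByKey condition selects the given key value."""
    condition = condition.strip("'\"")
    key_value = key_value.strip("'\"")

    # Numeric condition: compare as integers.
    if condition.isdecimal():
        return key_value.isdecimal() and int(condition) == int(key_value)

    # Range condition [x-y]: inclusive on both ends.
    range_match = _RANGE_RE.match(condition)
    if range_match:
        low, high = map(int, range_match.groups())
        return key_value.isdecimal() and low <= int(key_value) <= high

    # Anything else is compared as a plain string.
    return condition == key_value
```

For example, `condition_matches("[1-4]", "2")` selects the statement, while `condition_matches("[1-4]", "5")` skips it.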

@alexb3103
Contributor Author

Wow, this is incredibly useful, I appreciate it a lot. Definitely something for further along in the project but it'll be invaluable for sure.

Let me know when you create the repo. I'm very open to contributing to it as well if I make anything useful for it.

@singalsu
Collaborator

> @alexb3103 btw, @singalsu has Octave scripts that generate the coefficients for all modules (to change processing settings at runtime). I also have some Python that parses topology and creates a Gtk4 ListView of all objects (I will clean it up and post it soon).

Yep. It should be quite straightforward to convert the Matlab/Octave scripts to Python code using SciPy and NumPy. As a first step, retrieving the current settings blobs and displaying them graphically (as frequency responses, etc.) would also be quite simple. I already have a decoder script for some of the component blobs.
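
As a rough illustration of the "display as frequency response" step, the sketch below evaluates the magnitude response of one biquad section from already-decoded floating-point coefficients. It uses only the standard library so it runs anywhere; in practice `scipy.signal.freqz` would do the same job. The function name is hypothetical, and decoding the SOF blob into `b` and `a` is assumed to have happened elsewhere.

```python
import cmath
import math
from typing import List, Sequence


def biquad_magnitude_db(b: Sequence[float], a: Sequence[float],
                        freqs_hz: Sequence[float], fs_hz: float) -> List[float]:
    """Magnitude response in dB of H(z) = (b0 + b1 z^-1 + b2 z^-2) /
    (a0 + a1 z^-1 + a2 z^-2), evaluated on the unit circle.

    Assumes a stable filter, i.e. no poles on the unit circle.
    """
    response = []
    for f in freqs_hz:
        # z^-1 at this frequency
        z_inv = cmath.exp(-2j * math.pi * f / fs_hz)
        num = b[0] + b[1] * z_inv + b[2] * z_inv ** 2
        den = a[0] + a[1] * z_inv + a[2] * z_inv ** 2
        magnitude = abs(num / den)
        # Floor the magnitude so exact nulls do not produce log10(0).
        response.append(20.0 * math.log10(max(magnitude, 1e-12)))
    return response


if __name__ == "__main__":
    # Two-tap averaging filter: unity gain at DC, a null at Nyquist.
    freqs = [0.0, 1000.0, 12000.0, 24000.0]
    for f, db in zip(freqs, biquad_magnitude_db([0.5, 0.5, 0.0],
                                                [1.0, 0.0, 0.0],
                                                freqs, 48000.0)):
        print(f"{f:8.1f} Hz  {db:8.2f} dB")
```

A GUI could feed the returned list straight into a plot widget, recomputing it whenever a new blob is read back from the device.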
