Skip to content

Commit

Permalink
Add support for RIFF metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
chummersone committed Oct 15, 2022
1 parent d3b82f8 commit 138c231
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 5 deletions.
10 changes: 9 additions & 1 deletion src/wavfile/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import builtins
import os
from abc import ABC
from typing import Any, IO, List, Optional, Tuple, Union
from typing import Any, Dict, IO, List, Optional, Tuple, Union

from . import chunk

Expand All @@ -28,6 +28,7 @@ def __init__(self) -> None:
self._should_close_file = False
self._riff_chunk = None
self._data_chunk = None
self._list_chunk = None

def _init_fp(self, f: Union[str, os.PathLike, IO], mode: str) -> None:
"""
Expand Down Expand Up @@ -153,6 +154,13 @@ def _block_align(self) -> int:
"""Number of audio frames in the file"""
return self._data_chunk.fmt_chunk.block_align

@property
def metadata(self) -> Optional[Dict[str, str]]:
    """
    Metadata of the .wav file.
    :return: The ``info`` mapping of the LIST chunk, or None when no LIST chunk is attached.
    """
    list_chunk = self._list_chunk
    return None if list_chunk is None else list_chunk.info

@staticmethod
def _buffer_max_abs(data: List[List[Union[float, int]]]) -> Union[float, int]:
"""
Expand Down
84 changes: 83 additions & 1 deletion src/wavfile/chunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import struct
import sys
from enum import Enum
from typing import IO, List, Optional, Union
from typing import IO, List, Optional, Union, Dict
try:
from typing import Literal
except ImportError:
Expand All @@ -26,6 +26,7 @@ class ChunkID(Enum):
RIFF_CHUNK: 'ChunkID' = b'RIFF'
FMT_CHUNK: 'ChunkID' = b'fmt '
DATA_CHUNK: 'ChunkID' = b'data'
LIST_CHUNK: 'ChunkID' = b'LIST'
UNKNOWN_CHUNK: 'ChunkID' = b' '


Expand All @@ -35,6 +36,22 @@ class WavFormat(Enum):
IEEE_FLOAT: 'WavFormat' = 0x0003


class ListType(Enum):
    """List types that can be found at the start of a LIST chunk's content."""

    # metadata list; the items it may hold are enumerated by InfoItem
    INFO: 'ListType' = b'INFO'


class InfoItem(Enum):
    """
    Items of the INFO chunk.

    Each member name is the metadata field name; each value is the four-character item
    identifier stored in the file.
    """

    track: 'InfoItem' = b'INAM'
    album: 'InfoItem' = b'IPRD'
    artist: 'InfoItem' = b'IART'
    date: 'InfoItem' = b'ICRD'
    track_number: 'InfoItem' = b'ITRK'
    comment: 'InfoItem' = b'ICMT'
    genre: 'InfoItem' = b'IGNR'


class Chunk:
"""Chunk read and write"""

Expand Down Expand Up @@ -520,3 +537,68 @@ def tell(self) -> int:
else:
return (self.fp.tell() - self.content_start) // \
self.fmt_chunk.block_align


class ListChunk(Chunk):
    """
    List chunk read and write.

    Only the INFO list type is interpreted, and within it only the items named by InfoItem.
    """

    # INFO items keyed by InfoItem member name; None when no INFO list has been read or
    # assigned. 'track_number' is stored as an int when its text parses as one.
    info: Optional[Dict[str, Union[str, int]]]

    def __init__(self, fp: IO) -> None:
        """
        Initialise the chunk from a file pointer. When the file is open for reading, the
        supported items of an INFO list are parsed into ``info``.
        :param fp: Open file pointer.
        :raises exception.ReadError: If the chunk read from the file is not a LIST chunk.
        """
        self.chunk_id = ChunkID.LIST_CHUNK
        Chunk.__init__(self, fp, bigendian=False)
        self.info = None

        if 'r' in self.fp.mode:
            if self.chunk_id != ChunkID.LIST_CHUNK:
                raise exception.ReadError('Chunk is not a LIST chunk')
            subchnk = self.read(4)
            # read the INFO content of the LIST
            if subchnk == ListType.INFO.value:
                self.info = {}
                # build the identifier -> field name lookup once, not once per item
                fields = {e.value: e.name for e in InfoItem}
                while self.fp.tell() < self.content_start + self.size:
                    key = self.read(4)
                    size = self.read_int(4)
                    # NOTE(review): this equals the number of padding bytes only when the
                    # alignment is 2 — confirm against Chunk.align
                    pad = size % self.align
                    field = fields.get(key)
                    if field is None:
                        # unsupported item: skip its content and padding
                        self.fp.seek(size + pad, 1)
                        continue
                    # Files in the wild may carry non-ASCII text; substitute undecodable
                    # bytes instead of failing the whole file open with UnicodeDecodeError.
                    data = self.read(size).decode('ascii', errors='replace').rstrip('\x00')
                    if field == 'track_number':
                        try:
                            data = int(data)
                        except ValueError:
                            # keep non-numeric track numbers as text
                            pass
                    self.info[field] = data
                    self.fp.seek(pad, 1)

    def write_info(self) -> None:
        """
        Write the INFO to the LIST chunk. Nothing is written when ``info`` is None.
        :raises exception.WriteError: If ``info`` contains a key that is not the name of an
            InfoItem member. The check runs before anything is written.
        """
        if self.info is None:
            return

        # validate every key up front so that a bad key cannot leave a partially written
        # INFO list in the file
        for key in self.info:
            if key not in InfoItem.__members__:
                raise exception.WriteError('Unknown metadata field. Valid fields are: ' +
                                           ', '.join([e.name for e in InfoItem]))

        self.fp.seek(self.content_start)
        self.write(ListType.INFO.value)
        # write each item
        for key, val in self.info.items():
            if key == 'track_number':
                # track numbers may be given as integers; they are stored as text
                val = str(val)
            data = val.encode('ascii')
            self.write(InfoItem[key].value)
            self.write_int(len(data), 4)
            self.write(data)
            # align next item
            pad = self.fp.tell() % self.align
            if pad > 0:
                # NOTE(review): the padding is written without updating the chunk size —
                # confirm the LIST size is still meant to exclude item padding
                self.write(bytearray(pad), update_size=False)
2 changes: 2 additions & 0 deletions src/wavfile/wavread.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def _init_file(self) -> None:
if fmt_chunk is None:
raise exception.ReadError('DATA chunk read before FMT chunk')
self._data_chunk = chunk.WavDataChunk(self.fp, fmt_chunk)
elif chnk.chunk_id == chunk.ChunkID.LIST_CHUNK:
self._list_chunk = chunk.ListChunk(self.fp)

# skip superfluous bytes
if chnk.chunk_id != chunk.ChunkID.RIFF_CHUNK:
Expand Down
39 changes: 39 additions & 0 deletions src/wavfile/wavwrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from . import base
from . import chunk
from . import exception


class WavWrite(base.Wavfile):
Expand Down Expand Up @@ -44,6 +45,7 @@ def __init__(self, fp: Union[str, os.PathLike, IO], sample_rate: int = 44100,
if num_channels is not None:
self._data_chunk.fmt_chunk.num_channels = int(num_channels)
self._data_chunk.fmt_chunk.bits_per_sample = int(bits_per_sample)
self._list_chunk = None

# go to data chunk content start ready to write samples
self.fp.seek(self._data_chunk.content_start)
Expand Down Expand Up @@ -104,6 +106,43 @@ def write(self, audio: List[List[Union[int, float]]]) -> None:
else:
self.write_int(audio)

def add_metadata(self, **kwargs) -> None:
    """
    Add metadata to the wav file. Note that this method can only be called once, and the
    metadata cannot be updated once it is written. The metadata chunk will be written before or
    after the data chunk, depending on when this method is called.
    See chunk.InfoItem for a list of supported tags.
    :param kwargs: The metadata to write, provided as keyword arguments.
    :raises exception.WriteError: If metadata has already been added, or if a keyword is not
        a supported tag. In the latter case the file is left untouched.
    """

    if self._list_chunk is not None:
        raise exception.WriteError('Metadata already written to file. '
                                   'Editing is not currently supported.')

    # Reject unsupported tags before seeking or writing anything: failing later would leave
    # the (possibly overwritten) data chunk and self._list_chunk in an inconsistent state.
    if any(key not in chunk.InfoItem.__members__ for key in kwargs):
        raise exception.WriteError('Unknown metadata field. Valid fields are: ' +
                                   ', '.join([e.name for e in chunk.InfoItem]))

    # if the data chunk is empty, then overwrite it and recreate it after the metadata
    recreate_data_chunk = self._data_chunk.size == 0
    if recreate_data_chunk:
        # overwrite data chunk
        self.fp.seek(self._data_chunk.start)
    else:
        # write after data
        self.fp.seek(
            self._data_chunk.content_start +
            self._data_chunk.size +
            self._data_chunk.pad
        )
    self._list_chunk = chunk.ListChunk(self.fp)
    self._list_chunk.info = kwargs
    self._list_chunk.write_info()
    if recreate_data_chunk:
        # recreate data chunk
        fmt_chunk = self._data_chunk.fmt_chunk
        self._data_chunk = chunk.WavDataChunk(self.fp, fmt_chunk)

def close(self) -> None:
"""Close the file."""
num_align_bytes = self._data_chunk.size % chunk.Chunk.align
Expand Down
Binary file added tests/noise_44100_24bit_w_metadata.wav
Binary file not shown.
88 changes: 88 additions & 0 deletions tests/test_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Provides test cases for metadata read/write.
"""

import unittest

import wavfile

from test_module import test_file_path
from test_wavfile_write import WavfileWriteTestRunner


class TestReadMetadata(unittest.TestCase):
    """Metadata read from a reference file that carries an INFO list."""

    def test_read(self):
        # field name -> value stored in the reference file
        expected = {
            'artist': 'Joe Bloggs',
            'track': 'Noise',
            'album': 'pywavfile',
            'comment': 'This is a comment',
            'genre': 'postmodern',
            'track_number': 1,
        }
        path = test_file_path('noise_44100_24bit_w_metadata.wav')
        with wavfile.open(path, 'r') as wfp:
            for field, value in expected.items():
                self.assertEqual(value, wfp.metadata[field])


class TestWavfileWriteMetadata(WavfileWriteTestRunner):
    """Metadata written before or after the audio data, plus the write error cases."""

    def _write_and_check(self, metadata, metadata_mode):
        # round-trip a short two-channel, 16-bit, 48 kHz file together with the metadata
        frames = [
            [0, 0],
            [256, 512],
            [512, 256],
            [-256, -512],
            [-512, -256],
        ]
        self.run_test(frames, "read_int", 48000, 16, len(frames[0]),
                      metadata=metadata, metadata_mode=metadata_mode)

    def test_metadata_pre(self):
        # metadata is added before any audio is written
        tags = {
            'artist': 'Joey',
            'track': 'test_metadata_pre',
            'album': 'test suite',
            'date': 'today',
            'track_number': 1,
            'comment': 'Short comment',
            'genre': 'chillout'
        }
        self._write_and_check(tags, 'pre')

    def test_metadata_post(self):
        # metadata is added once the audio has been written
        tags = {
            'artist': 'Wavey McWaveface',
            'track': 'test_metadata_post',
            'album': 'test suite',
            'date': 'yesterday',
            'track_number': 'one',
            'comment': 'this is a slightly longer comment',
            'genre': 'postmodern'
        }
        self._write_and_check(tags, 'post')

    def test_metadata_rewrite(self):
        # a second call to add_metadata must be rejected
        with wavfile.open(test_file_path("tmp.wav"), 'w') as wfp:
            wfp: wavfile.wavwrite.WavWrite
            wfp.add_metadata(comment='test')
            with self.assertRaises(wavfile.exception.WriteError):
                wfp.add_metadata(comment='more')

    def test_metadata_write_invalid(self):
        # unsupported tag names must be rejected
        with wavfile.open(test_file_path("tmp.wav"), 'w') as wfp:
            wfp: wavfile.wavwrite.WavWrite
            with self.assertRaises(wavfile.exception.WriteError):
                wfp.add_metadata(invalid='test')
19 changes: 16 additions & 3 deletions tests/test_wavfile_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,18 @@
from test_module import test_file_path


class TestWavfileWrite(unittest.TestCase):
class WavfileWriteTestRunner(unittest.TestCase):

def run_test(self, audio_data_in, read_callback, sample_rate,
bits_per_sample, num_channels, reference=None):
def run_test(self, audio_data_in, read_callback, sample_rate, bits_per_sample, num_channels,
reference=None, metadata=None, metadata_mode='pre'):
filename = test_file_path("tmp.wav")
with wavfile.open(filename, 'w',
sample_rate=sample_rate,
bits_per_sample=bits_per_sample,
num_channels=num_channels) as wfp:
# write metadata before data
if metadata is not None and metadata_mode == 'pre':
wfp.add_metadata(**metadata)
wfp: wavfile.wavwrite.WavWrite
wfp.write(audio_data_in)
self.assertEqual(wfp.tell(), len(audio_data_in))
Expand All @@ -28,6 +31,9 @@ def run_test(self, audio_data_in, read_callback, sample_rate,
wfp.seek(0)
wfp.write(audio_data_in)
self.assertEqual(wfp.tell(), len(audio_data_in))
# write metadata after data
if metadata is not None and metadata_mode == 'post':
wfp.add_metadata(**metadata)

with wavfile.open(filename, 'r') as wfp:
audio_out = getattr(wfp, read_callback)()
Expand All @@ -47,6 +53,13 @@ def run_test(self, audio_data_in, read_callback, sample_rate,
self.assertEqual(wfp.num_channels, num_channels)
self.assertEqual(wfp.num_frames, len(reference))
self.assertEqual(wfp.duration, len(reference) / sample_rate)
if metadata is not None:
self.assertDictEqual(metadata, wfp.metadata)
else:
self.assertIsNone(wfp.metadata)


class TestWavfileWrite(WavfileWriteTestRunner):

def test_read_write_audio_short_int_1(self):
audio_data_in = [
Expand Down

0 comments on commit 138c231

Please sign in to comment.