-
Notifications
You must be signed in to change notification settings - Fork 50
/
Copy pathInputFile.py
173 lines (138 loc) · 5.87 KB
/
InputFile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
import struct
import numpy as np
import tempfile
import shutil
import subprocess
import os
import time
class InputFile:
def __init__(self, filename):
"""Open an Audio file with the given file path.
Supported formats: WAVE, MP3.
All MP3 files will be converted to WAVE using the LAME program
This document http://www-mmsp.ece.mcgill.ca/documents/AudioFormats/WAVE/WAVE.html
was used as a spec for files. We implement a limited subset
of the WAVE format. We assume a RIFF chunk contains a fmt
chunk and then a data chunk, and do not read past that.
We also will only open WAVE_FORMAT_PCM files.
At the end of this constructor. self.wav_file will be positioned
at the first byte of audio data in the file."""
lame = os.path.dirname(os.path.abspath(__file__)) + "/lame"
original_name = filename
self.wav_file = open( filename, "r" )
#try to use lame to convert
self.workingdir = tempfile.mkdtemp()
if not self.__is_wave_file( self.wav_file ):
self.wav_file.close()
canonical_form = self.workingdir + str( time.time() )
#make sure the filename has a ".mp3" extension before sending to lame
if filename[-4:] != ".mp3" :
# create a copy of the file in
temp_file_name = self.workingdir + str(time.time()) + ".mp3"
shutil.copyfile( filename, temp_file_name )
filename = temp_file_name
# Use lame to make a wav representation of the mp3 file to be analyzed
lame = [lame, '--silent', '--decode', filename, canonical_form]
return_code = subprocess.call(lame, shell=False)
if return_code != 0 or not os.path.exists(canonical_form):
raise IOError("{f} 's format is not supported".format(f=original_name))
# At this point, we should be confident that "lame" create a correct WAVE file
self.wav_file = open( canonical_form , "r")
# At this point, audio file should have the canonical form(WAVE)
self.wav_file.seek(4,0)
riff_size = InputFile.__read_size(self.wav_file)
self.wav_file.seek(16,0)
fmt_chunk_size = InputFile.__read_size(self.wav_file)
fmt_data = self.wav_file.read(fmt_chunk_size)
# get some info from the file header
self.channels = self.__read_ushort(fmt_data[2:4])
self.sample_rate = self.__read_uint(fmt_data[4:8])
self.block_align = self.__read_ushort(fmt_data[12:14])
self.wav_file.seek( 40 , 0 )
self.data_chunk_size = InputFile.__read_size(self.wav_file)
self.total_samples = (self.data_chunk_size / self.block_align)
@staticmethod
def __is_wave_file(file):
if ( not InputFile.__check_riff_format(file) ):
return False
if ( not InputFile.__check_wave_id(file) ):
return False
if ( not InputFile.__check_fmt(file) ):
return False
file.seek( 20 )
data = file.read( 2 )
file.seek( 0 )
if( not InputFile.__check_fmt_valid(data) ):
return False
return InputFile.__check_data(file)
@staticmethod
def __check_riff_format(file):
RIFF = file.read(4)
file.seek(0)
return RIFF == "RIFF"
@staticmethod
def __check_wave_id(file):
file.seek(8)
WAVE = file.read(4)
file.seek(0)
return WAVE == "WAVE"
@staticmethod
def __check_fmt(file):
file.seek(12)
fmt = file.read(4)
file.seek(0)
return fmt == "fmt "
@staticmethod
def __check_data(file):
file.seek(36)
data = file.read(4)
file.seek(0)
return data == "data"
@staticmethod
def __check_fmt_valid(data):
format_tag = InputFile.__read_ushort(data[0:2])
return format_tag == 1
@staticmethod
def __read_size(file):
"""Read a 4 byte uint from the file."""
return InputFile.__read_uint(file.read(4))
@staticmethod
def __read_ushort(data):
"""Turn a 2-byte little endian number into a Python number."""
return struct.unpack("<H", data)[0]
@staticmethod
def __read_uint(data):
"""Turn a 4-byte little endian number into a Python number."""
return struct.unpack("<I", data)[0]
def get_audio_samples(self, n):
"""Get n audio samples from each channel.
Returns an array of arrays. There will be one
array for each channel, each with n samples in it.
If we encounter end of file, we may return less than
n samples. If we were already at end of file, we raise
EOFError.
This function assumes that self.wav_file is positioned
at the place in the file you want to read from."""
data = np.fromfile(self.wav_file, dtype=np.int16, count=n*self.channels)
result = np.zeros((self.channels, n), dtype=int)
for c in range(self.channels):
result[c] = data[c::self.channels]
return result
def get_channels(self):
"""Returns the number of channels in the file."""
return self.channels
def get_block_align(self):
"""Returns the number of bytes used in the file
to represent a sample, multiplied by the number of channels."""
return self.block_align
def get_sample_rate(self):
"""Returns the numbers of samples per second, per channel."""
return self.sample_rate
def get_total_samples(self):
"""Returns the total number of samples per channel."""
return self.total_samples
def close(self):
"""Close the input file."""
self.wav_file.close()
#Delete temporary working directory and its contents.
shutil.rmtree(self.workingdir)