samples_to_files.py
# -*- coding: utf-8 -*-
import argparse
from lib.audio_utils import *
from lib.collection_utils import *
from lib.io_utils import *
from lib.math_utils import *
from lib.processing_utils import *
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
import os
from pprint import pprint
import sys
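
# The wildcard imports above pull in the repo's helper functions used below
# (e.g. readCsv, getAudio, getAudioClip, applyAudioProperties, writeCsv);
# exactly which lib module defines each helper is not shown in this file.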
# input
parser = argparse.ArgumentParser()
parser.add_argument('-in', dest="INPUT_FILE", default="tmp/samples.csv", help="Input file")
parser.add_argument('-dir', dest="MEDIA_DIRECTORY", default="media/sample/", help="Media directory")
parser.add_argument('-out', dest="OUTPUT_FILE", default="output/sample_%s.mp3", help="Audio output pattern")
parser.add_argument('-dout', dest="OUTPUT_DATA_FILE", default="output/samples.csv", help="CSV output file")
parser.add_argument('-rvb', dest="REVERB", default=0, type=int, help="Reverberance (0-100)")
parser.add_argument('-mdb', dest="MATCH_DB", default=-9999, type=int, help="Match decibels, -9999 for none")
parser.add_argument('-fadein', dest="FADE_IN", default=0.1, type=float, help="Fade in as a percentage of clip duration")
parser.add_argument('-fadeout', dest="FADE_OUT", default=0.1, type=float, help="Fade out as a percentage of clip duration")
parser.add_argument('-maxd', dest="MAX_DUR", default=-1, type=int, help="Maximum duration in milliseconds, -1 for no limit")
parser.add_argument('-overwrite', dest="OVERWRITE", action="store_true", help="Overwrite existing data?")
parser.add_argument('-index', dest="INDEX_STYLE", action="store_true", help="Name output files by zero-padded sample index?")
parser.add_argument('-fkey', dest="FILE_KEY", default="", help="Use this column for naming files; leave blank to use the index/default naming")
parser.add_argument('-threads', dest="THREADS", default=1, type=int, help="Number of threads")
a = parser.parse_args()
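
# Example invocation (illustrative only; the paths and thread count here are
# assumptions, not values required by the script):
#   python samples_to_files.py -in tmp/samples.csv -dir media/sample/ \
#     -out "output/sample_%s.mp3" -dout output/samples.csv -threads 4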
# Read files
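# Each input row is expected to provide at least a filename (relative to -dir),
# a start time in milliseconds, and a duration ("dur") in milliseconds; -fkey
# naming additionally requires that column to exist in the CSV.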
fieldNames, samples = readCsv(a.INPUT_FILE)
sampleCount = len(samples)
samples = addIndices(samples)
samples = prependAll(samples, ("filename", a.MEDIA_DIRECTORY))

# Make sure output dirs exist
makeDirectories(a.OUTPUT_FILE)

# group by filename
params = groupList(samples, "filename")
fileCount = len(params)
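
# As used in samplesToFiles below, each entry from groupList carries the keys
# "filename" (the shared media path) and "items" (that file's sample rows).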
def samplesToFiles(p):
    global sampleCount
    global a

    fn = p["filename"]
    samples = p["items"]
    audio = getAudio(fn)
    audioDurationMs = len(audio)
    newSamples = []
    samples = sorted(samples, key=lambda s: s["start"])
    fsampleCount = len(samples)
    print("Creating %s samples for %s..." % (len(samples), fn))
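    # Write each sample out as its own clip; the output filename comes from one
    # of three schemes: a zero-padded global index (-index), a caller-supplied
    # key column (-fkey), or source basename + per-file index + start timestamp.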
    for i, sample in enumerate(samples):
        sdur = min(sample["dur"], a.MAX_DUR) if a.MAX_DUR > 0 else sample["dur"]
        outfilename = ""
        if a.INDEX_STYLE:
            outfilename = a.OUTPUT_FILE % zeroPad(sample["index"], sampleCount)
        elif len(a.FILE_KEY) > 0:
            outfilename = a.OUTPUT_FILE % sample[a.FILE_KEY]
        else:
            basename = getBasename(fn) + "_" + zeroPad(i+1, fsampleCount+1) + "_" + formatSeconds(sample["start"]/1000.0, separator="-", retainHours=True)
            outfilename = a.OUTPUT_FILE % basename
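        # Record the clip's metadata for the output CSV; "start" is reset to 0
        # because each exported file begins at the start of its own clip.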
        newSample = sample.copy()
        newSample["sourceFilename"] = os.path.basename(fn)
        newSample["sourceStart"] = sample["start"]
        newSample["filename"] = os.path.basename(outfilename)
        newSample["id"] = getBasename(outfilename)
        newSample["start"] = 0
        newSample["dur"] = sdur
        newSamples.append(newSample)

        if os.path.isfile(outfilename) and not a.OVERWRITE:
            continue
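        # Cut the clip out of the source audio and apply gain matching, fades
        # and reverb before exporting; the export format is inferred from the
        # output extension. (The clip object behaves like a pydub AudioSegment
        # here, though that depends on what getAudio actually returns.)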
        clipAudio = getAudioClip(audio, sample["start"], sdur, audioDurationMs)
        clipAudio = applyAudioProperties(clipAudio, {
            "matchDb": a.MATCH_DB,
            "fadeIn": roundInt(sample["dur"] * a.FADE_IN),
            "fadeOut": roundInt(sample["dur"] * a.FADE_OUT),
            "reverb": a.REVERB
        })
        format = outfilename.split(".")[-1]
        clipAudio.export(outfilename, format=format)

    return newSamples
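
# Note: multiprocessing.dummy.Pool is a thread pool (all workers share one
# process), so it mainly overlaps I/O-bound work like reading and exporting
# audio; CPU-bound Python code is still serialized by the GIL.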
threads = getThreadCount(a.THREADS)
pool = ThreadPool(threads)
data = pool.map(samplesToFiles, params)
pool.close()
pool.join()
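
# Write a manifest CSV for the new clips, adding the provenance columns
# ("id", "sourceFilename", "sourceStart") to the original headings.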
if len(a.OUTPUT_DATA_FILE) > 0:
    for h in ["id", "sourceFilename", "sourceStart"]:
        if h not in fieldNames:
            fieldNames.append(h)
    newSamples = flattenList(data)
    writeCsv(a.OUTPUT_DATA_FILE, newSamples, headings=fieldNames)