Skip to content

Commit

Permalink
embed audio capture time into audio samples stacks in order to remove…
Browse files Browse the repository at this point in the history
… issues with time offset due to latency
  • Loading branch information
nicolas-f committed Aug 22, 2023
1 parent f731d87 commit acd8dc2
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 30 deletions.
21 changes: 17 additions & 4 deletions services/ansible_openvpn/playbooks/update_noisesensor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@
dest: /home/pi/noisesensor/
single_branch: yes
version: yamnet_trigger
- name: Restart noisesensor services
- name: Stop noisesensor services
become: true
ansible.builtin.systemd:
name: "{{ item }}"
daemon_reload: true
enabled: true
state: restarted
state: stopped
with_items:
- zeroble.service
- zerogps.service
Expand All @@ -22,3 +20,18 @@
- zerostorage.service
- zerostorage_indicators.service
- zerotrigger.service
- name: Start noisesensor services
become: true
ansible.builtin.systemd:
name: "{{ item }}"
daemon_reload: true
enabled: true
state: started
with_items:
- zerogps.service
- zerostorage.service
- zerostorage_indicators.service
- zerorecord.service
- zeroindicators.service
- zerotrigger.service
- zeroble.service
26 changes: 14 additions & 12 deletions services/zero_indicators.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import datetime
import os.path
import struct
import time
from noisesensor.spectrumchannel import SpectrumChannel, compute_leq
import sys
Expand Down Expand Up @@ -76,7 +77,6 @@ def __init__(self, config):
self.config = config
self.epoch = datetime.datetime.utcfromtimestamp(0)
self.total_read = 0
self.current_stack_time = 0
self.filter_config = json.load(open(self.config.configuration_file,
"r"))
self.channel = SpectrumChannel(self.filter_config, use_scipy=False,
Expand Down Expand Up @@ -112,22 +112,25 @@ def run(self):
current_window = np.zeros(shape=window_samples, dtype=np.single)
cursor = 0
while True:
audio_data_bytes = np.frombuffer(self.socket.recv(),
time_bytes, audio_data_bytes = self.socket.recv_multipart()
audio_data_samples = np.frombuffer(audio_data_bytes,
dtype=np.single)
if self.current_stack_time == 0:
self.current_stack_time = time.time() - len(audio_data_bytes)\
/ self.filter_config["configuration"]["sample_rate"]
frame_time = struct.unpack("d", time_bytes)[0]
first_stack_time = frame_time
buffer_cursor = 0
while buffer_cursor < len(audio_data_bytes):
while buffer_cursor < len(audio_data_samples):
# Process part of samples to fit configured windows
to_read = min(window_samples-cursor, len(audio_data_bytes)
to_read = min(window_samples-cursor, len(audio_data_samples)
- buffer_cursor)
current_window[cursor:cursor+to_read] = \
audio_data_bytes[buffer_cursor:buffer_cursor+to_read]
audio_data_samples[buffer_cursor:buffer_cursor+to_read]
cursor += to_read
buffer_cursor += to_read
self.total_read += to_read
if cursor == window_samples:
window_time = frame_time + buffer_cursor / \
self.filter_config["configuration"][
"sample_rate"]
# analysis window filled
cursor = 0
lzeq = round(compute_leq(current_window) + db_delta, 2)
Expand All @@ -147,20 +150,19 @@ def run(self):
round(lzeq + db_delta, 2))
if self.config.output_time:
self.current_stack_dict["timestamps"].append(
round(time.time() - window_samples/sample_rate, 3))
round(window_time, 3))
self.stack_count += 1
if self.stack_count == self.config.output_stack:
# stack of noise indicator complete
# send the full document
self.stack_count = 0
self.current_stack_dict[
"date_start"] = epoch_to_elasticsearch_date(
self.current_stack_time)
first_stack_time)
self.current_stack_dict[
"date_end"] = epoch_to_elasticsearch_date(
time.time())
window_time)
self.socket_out.send_json(self.current_stack_dict)
self.current_stack_time = 0
fields, stack_dict = generate_stack_dict(
self.filter_config, self.config.output_time)
self.current_stack_dict = stack_dict
Expand Down
4 changes: 2 additions & 2 deletions services/zero_play.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import sys
import zmq
import argparse

import struct

def main():
parser = argparse.ArgumentParser(description='This program read audio stream from zeromq and push to stdout',
Expand All @@ -24,7 +24,7 @@ def main():
out_buffer = sys.stdout.buffer

while True:
audio_data_bytes = socket.recv()
time_bytes, audio_data_bytes = socket.recv_multipart()
out_buffer.write(audio_data_bytes)


Expand Down
8 changes: 5 additions & 3 deletions services/zero_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import os
import collections
import threading
import struct

ALERT_STACK_BYTES = 960000
ALERT_DELAY = 5.0
Expand All @@ -56,7 +57,7 @@ def __init__(self, args):
self.last_warning = 0

def push_bytes(self, samples_bytes):
self.samples_queue.append(samples_bytes)
self.samples_queue.append([time.time(), samples_bytes])
sum_bytes = sum([len(element) for element in self.samples_queue])
if sum_bytes > ALERT_STACK_BYTES and \
time.time() - self.last_warning > ALERT_DELAY:
Expand All @@ -75,8 +76,9 @@ def run(self):
print(address)
while self.args.running or len(self.samples_queue) > 0:
while len(self.samples_queue) > 0:
audio_data_bytes = self.samples_queue.popleft()
socket.send(audio_data_bytes)
capture_time, audio_data_bytes = self.samples_queue.popleft()
socket.send_multipart([struct.pack("d", capture_time),
audio_data_bytes])
time.sleep(0.05)


Expand Down
21 changes: 12 additions & 9 deletions services/zero_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import tflite_runtime.interpreter as tflite
import resampy
from importlib.resources import files

import struct

class Params:
"""
Expand Down Expand Up @@ -168,6 +168,7 @@ class TriggerProcessor:
"""

def __init__(self, config):
self.frame_time = 0
self.config = config
self.total_read = 0 # Total audio samples read
self.sample_rate = self.config.sample_rate
Expand Down Expand Up @@ -251,16 +252,18 @@ def fetch_audio_data(self, feed_cache=True):
if feed_cache:
if self.samples_stack is None:
self.samples_stack = collections.deque()
audio_data_bytes = np.frombuffer(self.socket.recv(), dtype=np.single)
self.total_read += len(audio_data_bytes)
time_bytes, audio_data_bytes = self.socket.recv_multipart()
audio_data_samples = np.frombuffer(audio_data_bytes, dtype=np.single)
self.frame_time = struct.unpack("d", time_bytes)[0]
self.total_read += len(audio_data_samples)
if feed_cache:
self.samples_stack.append(audio_data_bytes)
self.samples_stack.append(audio_data_samples)
# will keep keep_only_samples samples, and drop older stack elements
keep_only_samples = max(self.config.cached_length, self.yamnet_config.patch_window_seconds) * \
self.config.sample_rate
while sum([len(s) for s in self.samples_stack]) > keep_only_samples + len(audio_data_bytes):
while sum([len(s) for s in self.samples_stack]) > keep_only_samples + len(audio_data_samples):
self.samples_stack.popleft()
return audio_data_bytes
return audio_data_samples

def run(self):
reference_pressure = 1 / 10 ** (
Expand All @@ -271,7 +274,6 @@ def run(self):
document = {}
processing_time = 0
while True:
cur_time = time.time()
if last_day_of_year != datetime.datetime.now().timetuple().tm_yday\
and "trigger_count" in self.config:
# reset trigger counter each day
Expand Down Expand Up @@ -300,7 +302,7 @@ def run(self):
if self.yamnet_samples_index < len(self.yamnet_samples):
# window is not complete so wait for more samples
continue
self.yamnet_samples_index = 0 # reset index
self.yamnet_samples_index = 0 # reset index
sum_samples = np.sum(
np.power(waveform / reference_pressure, 2.0))
leq = 10 * math.log10(sum_samples / len(waveform))
Expand Down Expand Up @@ -367,7 +369,8 @@ def run(self):
"scores_perc": scores_percentage,
"scores_time": threshold_time,
"leq": round(leq, 2),
"date": epoch_to_elasticsearch_date(cur_time)}
"date": epoch_to_elasticsearch_date(
self.frame_time)}
if self.config.add_spectrogram:
document["spectrogram"] = base64.b64encode(
spectrogram.astype(np.float16).
Expand Down

0 comments on commit acd8dc2

Please sign in to comment.