-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
4 changed files
with
126 additions
and
103 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,77 +1,62 @@ | ||
""" AudioBuffer class for storing and analyzing audio samples. """"" | ||
from collections import deque | ||
import numpy as np | ||
from pyloudnorm import Meter | ||
from av import AudioFrame | ||
from logger import log | ||
|
||
BUFFER_LENGTH_SECONDS = 1 | ||
SAMPLE_RATE = 48000 | ||
NUM_CHANNELS = 2 | ||
BUFFER_SIZE = BUFFER_LENGTH_SECONDS * SAMPLE_RATE | ||
|
||
SAMPLE_FORMATS = { | ||
's16': (np.int16, np.iinfo(np.int16).max), | ||
's32': (np.int32, np.iinfo(np.int32).max), | ||
'fltp': (np.float32, 1.0), | ||
'dbl': (np.float64, 1.0) | ||
} | ||
|
||
|
||
def _rms(buffer): | ||
return 20 * np.log10(np.sqrt(np.mean(np.square(buffer)))) | ||
|
||
|
||
def _samples_to_numpy(frame: AudioFrame, channel: int): | ||
dtype, scale_factor = _get_dtype_and_scale_factor(frame.format.name) | ||
dtype, scale_factor = SAMPLE_FORMATS.get(frame.format.name, (None, None)) | ||
if dtype is None: | ||
raise ValueError(f"Unsupported sample format: {frame.format.name}") | ||
audio_samples = np.frombuffer(frame.planes[channel], dtype) | ||
if scale_factor != 1.0: | ||
audio_samples = audio_samples.astype(np.float32) / scale_factor | ||
return audio_samples | ||
|
||
|
||
def _get_dtype_and_scale_factor(sample_format): | ||
if sample_format == 's16': | ||
dtype = np.int16 | ||
scale_factor = np.iinfo(dtype).max | ||
elif sample_format == 's32': | ||
dtype = np.int32 | ||
scale_factor = np.iinfo(dtype).max | ||
elif sample_format == 'fltp': | ||
dtype = np.float32 | ||
scale_factor = 1.0 | ||
elif sample_format == 'dbl': | ||
dtype = np.float64 | ||
scale_factor = 1.0 | ||
else: | ||
raise ValueError(f"Unsupported sample format: {sample_format}") | ||
return dtype, scale_factor | ||
|
||
|
||
class AudioBuffer(): | ||
""" A circular buffer for audio samples. Includes LUFS and dBFS analysis. """ | ||
left_buffer = deque(maxlen=BUFFER_SIZE) | ||
right_buffer = deque(maxlen=BUFFER_SIZE) | ||
meter = Meter(SAMPLE_RATE) | ||
|
||
def __init__(self): | ||
self.buffer = np.zeros((NUM_CHANNELS, BUFFER_SIZE), dtype=np.float32) | ||
self.meter = Meter(SAMPLE_RATE) | ||
|
||
def append(self, frame: AudioFrame): | ||
""" Appends a frame to the buffer.""" | ||
# Only stereo supported for now | ||
assert len(frame.planes) == 2 | ||
|
||
self.left_buffer.extend(_samples_to_numpy(frame, 0)) | ||
self.right_buffer.extend(_samples_to_numpy(frame, 1)) | ||
num_samples = frame.samples | ||
|
||
self.buffer[0] = np.roll(self.buffer[0], shift=-num_samples) | ||
self.buffer[0][-num_samples:] = _samples_to_numpy(frame, 0) | ||
self.buffer[1] = np.roll(self.buffer[1], shift=-num_samples) | ||
self.buffer[1][-num_samples:] = _samples_to_numpy(frame, 1) | ||
|
||
def lufs(self): | ||
""" Returns the LUFS value for the given channel. """ | ||
|
||
if (len(self.left_buffer) < BUFFER_SIZE or len(self.right_buffer) < BUFFER_SIZE): | ||
log.info( | ||
f"Buffer {len(self.left_buffer)} too small to calculate LUFS") | ||
return 0 | ||
|
||
log.debug("Calculating LUFS") | ||
left_array = np.array(self.left_buffer) | ||
right_array = np.array(self.right_buffer) | ||
return self.meter.integrated_loudness(np.array([left_array, right_array]).T) | ||
return self.meter.integrated_loudness(self.buffer.T) | ||
|
||
def dbfs(self, channel): | ||
""" Returns the RMS value in dBFS for the given channel. """ | ||
if channel == 0: | ||
return _rms(self.left_buffer) | ||
return _rms(self.buffer[0]) | ||
if channel == 1: | ||
return _rms(self.right_buffer) | ||
|
||
return _rms(self.buffer[1]) | ||
raise ValueError(f"Unsupported channel: {channel}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,56 +1,79 @@ | ||
""" Video analysis """ | ||
from logger import log | ||
from collections import deque | ||
import numpy as np | ||
import numpy.typing as npt | ||
from av import VideoFrame | ||
from itertools import islice | ||
|
||
BUFFER_SIZE = 10 | ||
FRAME_RATE = 50 | ||
|
||
|
||
def _motion(buffer: deque[npt.NDArray[np.uint8]]): | ||
""" Returns the average motion between the given frames. """ | ||
if len(buffer) < 2: | ||
log.warning("Buffer too small to calculate motion") | ||
return 0 | ||
def compute_frame_difference(frame1: npt.NDArray[np.uint8], frame2: npt.NDArray[np.uint8]) -> float: | ||
"""Compute the difference between two frames.""" | ||
diff = np.subtract(frame1, frame2, dtype=np.int16) | ||
return np.sum(np.abs(diff)) | ||
|
||
frame_diffs = [ | ||
np.mean(abs(buffer[i].astype(np.int16) - frame).astype(np.uint8)) | ||
for i, frame in enumerate(islice(buffer, 1, None)) | ||
] | ||
|
||
# Return the mean of the absolute differences | ||
return np.mean(frame_diffs) / 255 | ||
def compute_frame_brightness(frame: npt.NDArray[np.uint8]) -> float: | ||
"""Compute the brightness of a frame.""" | ||
return np.sum(frame) | ||
|
||
|
||
def _avg_brightness(buffer: deque[npt.NDArray[np.uint8]]): | ||
""" Returns the average brightness of the given frames. """ | ||
|
||
if len(buffer) == 0: | ||
log.warning("Buffer empty, returning 0") | ||
return 0 | ||
class VideoBuffer(): | ||
height = None | ||
width = None | ||
|
||
return np.mean(buffer) / 255 | ||
""" A buffer for video frames. Contains analysis functions. """ | ||
|
||
def __init__(self): | ||
self.buffer_size = BUFFER_SIZE * FRAME_RATE | ||
self.video_buffer = np.empty( | ||
(self.buffer_size, 1, 1), dtype=np.uint8) | ||
self.index = 0 | ||
self.total_brightness = 0.0 | ||
self.total_motion = 0.0 | ||
|
||
class VideoBuffer(): | ||
""" A circular buffer for video frames. | ||
Contains analysis functions. """ | ||
video_buffer = deque(maxlen=BUFFER_SIZE * FRAME_RATE) | ||
def _initialize_buffer(self, height, width): | ||
""" Reinitialize the video buffer with a new resolution. """ | ||
self.video_buffer = np.empty( | ||
(self.buffer_size, height, width), dtype=np.uint8) | ||
self.index = 0 | ||
self.height = height | ||
self.width = width | ||
log.info( | ||
"Initialized video buffer with resolution: {}x{}".format(height, width)) | ||
|
||
def append(self, frame: VideoFrame): | ||
""" Appends a frame to the buffer. """ | ||
self.video_buffer.append(frame.reformat( | ||
format='gray8').to_ndarray().astype(np.uint8)) | ||
new_frame = frame.reformat(format='gray8').to_ndarray() | ||
frame_height, frame_width = new_frame.shape | ||
|
||
# Check if the resolution of the incoming frame matches the buffer's resolution | ||
if frame_height != self.height or frame_width != self.width: | ||
self._initialize_buffer(frame_height, frame_width) | ||
|
||
# Adjust total brightness | ||
self.total_brightness -= compute_frame_brightness( | ||
self.video_buffer[self.index]) | ||
self.total_brightness += compute_frame_brightness(new_frame) | ||
|
||
# Adjust total motion if there's a previous frame | ||
if self.index > 0: | ||
prev_index = self.index - 1 | ||
else: | ||
prev_index = self.buffer_size - 1 | ||
self.total_motion -= compute_frame_difference( | ||
self.video_buffer[prev_index], self.video_buffer[self.index]) | ||
self.total_motion += compute_frame_difference( | ||
new_frame, self.video_buffer[prev_index]) | ||
|
||
self.video_buffer[self.index] = new_frame | ||
self.index = (self.index + 1) % self.buffer_size | ||
|
||
@property | ||
def avg_brightness(self): | ||
""" Returns the average brightness of the frames in the buffer. """ | ||
return _avg_brightness(self.video_buffer) | ||
return self.total_brightness / (self.buffer_size * self.height * self.width) | ||
|
||
@property | ||
def motion(self): | ||
""" Returns the average motion between the frames in the buffer. """ | ||
return _motion(self.video_buffer) | ||
return self.total_motion / (self.buffer_size * self.height * self.width) |