diff --git a/audiobuffer.py b/audiobuffer.py index e208767..0f1d00b 100644 --- a/audiobuffer.py +++ b/audiobuffer.py @@ -1,5 +1,3 @@ -""" AudioBuffer class for storing and analyzing audio samples. """"" -from collections import deque import numpy as np from pyloudnorm import Meter from av import AudioFrame @@ -7,71 +5,58 @@ BUFFER_LENGTH_SECONDS = 1 SAMPLE_RATE = 48000 +NUM_CHANNELS = 2 BUFFER_SIZE = BUFFER_LENGTH_SECONDS * SAMPLE_RATE +SAMPLE_FORMATS = { + 's16': (np.int16, np.iinfo(np.int16).max), + 's32': (np.int32, np.iinfo(np.int32).max), + 'fltp': (np.float32, 1.0), + 'dbl': (np.float64, 1.0) +} + def _rms(buffer): return 20 * np.log10(np.sqrt(np.mean(np.square(buffer)))) def _samples_to_numpy(frame: AudioFrame, channel: int): - dtype, scale_factor = _get_dtype_and_scale_factor(frame.format.name) + dtype, scale_factor = SAMPLE_FORMATS.get(frame.format.name, (None, None)) + if dtype is None: + raise ValueError(f"Unsupported sample format: {frame.format.name}") audio_samples = np.frombuffer(frame.planes[channel], dtype) if scale_factor != 1.0: audio_samples = audio_samples.astype(np.float32) / scale_factor return audio_samples -def _get_dtype_and_scale_factor(sample_format): - if sample_format == 's16': - dtype = np.int16 - scale_factor = np.iinfo(dtype).max - elif sample_format == 's32': - dtype = np.int32 - scale_factor = np.iinfo(dtype).max - elif sample_format == 'fltp': - dtype = np.float32 - scale_factor = 1.0 - elif sample_format == 'dbl': - dtype = np.float64 - scale_factor = 1.0 - else: - raise ValueError(f"Unsupported sample format: {sample_format}") - return dtype, scale_factor - - class AudioBuffer(): """ A circular buffer for audio samples. Includes LUFS and dBFS analysis. """ - left_buffer = deque(maxlen=BUFFER_SIZE) - right_buffer = deque(maxlen=BUFFER_SIZE) - meter = Meter(SAMPLE_RATE) + + def __init__(self): + self.buffer = np.zeros((NUM_CHANNELS, BUFFER_SIZE), dtype=np.float32) + self.meter = Meter(SAMPLE_RATE) def append(self, frame: AudioFrame): """ Appends a frame to the buffer.""" - # Only stereo supported for now assert len(frame.planes) == 2 - self.left_buffer.extend(_samples_to_numpy(frame, 0)) - self.right_buffer.extend(_samples_to_numpy(frame, 1)) + num_samples = frame.samples + + self.buffer[0] = np.roll(self.buffer[0], shift=-num_samples) + self.buffer[0][-num_samples:] = _samples_to_numpy(frame, 0) + self.buffer[1] = np.roll(self.buffer[1], shift=-num_samples) + self.buffer[1][-num_samples:] = _samples_to_numpy(frame, 1) def lufs(self): """ Returns the LUFS value for the given channel. """ - - if (len(self.left_buffer) < BUFFER_SIZE or len(self.right_buffer) < BUFFER_SIZE): - log.info( - f"Buffer {len(self.left_buffer)} too small to calculate LUFS") - return 0 - log.debug("Calculating LUFS") - left_array = np.array(self.left_buffer) - right_array = np.array(self.right_buffer) - return self.meter.integrated_loudness(np.array([left_array, right_array]).T) + return self.meter.integrated_loudness(self.buffer.T) def dbfs(self, channel): """ Returns the RMS value in dBFS for the given channel. """ if channel == 0: - return _rms(self.left_buffer) + return _rms(self.buffer[0]) if channel == 1: - return _rms(self.right_buffer) - + return _rms(self.buffer[1]) raise ValueError(f"Unsupported channel: {channel}") diff --git a/logger.py b/logger.py index ec170cd..2fe8536 100644 --- a/logger.py +++ b/logger.py @@ -6,6 +6,11 @@ if sys.stdout.isatty(): structlog.configure( processors=[ + structlog.processors.CallsiteParameterAdder( + [structlog.processors.CallsiteParameter.FILENAME, + structlog.processors.CallsiteParameter.FUNC_NAME, + structlog.processors.CallsiteParameter.LINENO], + ), structlog.stdlib.add_logger_name, structlog.stdlib.add_log_level, structlog.stdlib.PositionalArgumentsFormatter(), diff --git a/ts_probe.py b/ts_probe.py index 90152f3..e9afeff 100644 --- a/ts_probe.py +++ b/ts_probe.py @@ -12,8 +12,6 @@ prom = Prometheus() DEFAULT_VIDEO_URL = 'http://simula.frikanalen.no:9094/frikanalen.ts' PROMETHEUS_PORT = 8000 -BUFFER_SIZE = 100 - # MPEG-2 transport stream URL url = os.environ.get('VIDEO_URL', None) @@ -23,40 +21,52 @@ audio_buffer = AudioBuffer() video_buffer = VideoBuffer() -padding_packet_count: int = 0 -total_bytes_received: int = 0 START_TIME = time.time() # Start Prometheus HTTP server prom.listen(PROMETHEUS_PORT) -log.info("Opening stream: %s" % url) -stream = av.open(url) - -while True: - log.info("Stream is open") - try: - for frame in stream.decode(): - if isinstance(frame, VideoFrame): - prom.video_frame_count.inc() - video_buffer.append(frame) - prom.video_brightness_gauge.set(video_buffer.avg_brightness) - prom.motion_gauge.set(video_buffer.motion) - - elif isinstance(frame, AudioFrame): - audio_buffer.append(frame) - prom.audio_amplitude_lufs_gauge.set(audio_buffer.lufs()) - prom.audio_amplitude_dbfs_gauge.set(audio_buffer.dbfs(0)) - - except av.AVError as e: - log.error(e) - prom.decode_error_count.inc() - stream = av.open(url) - - except KeyboardInterrupt: - log.info("Keyboard interrupt") - break - - except Exception as e: - log.error(e) - break + +def run(): + VIDEO_PROBE_INTERVAL = 10 + AUDIO_PROBE_INTERVAL = 10 + log.info("Opening stream: %s" % url) + stream = av.open(url) + while True: + log.info("Stream is open") + try: + for frame in stream.decode(): + if isinstance(frame, VideoFrame): + prom.video_frame_count.inc() + video_buffer.append(frame) + + VIDEO_PROBE_INTERVAL -= 1 + if VIDEO_PROBE_INTERVAL == 0: + prom.video_brightness_gauge.set( + video_buffer.avg_brightness) + prom.motion_gauge.set( + video_buffer.motion) + VIDEO_PROBE_INTERVAL = 20 + + elif isinstance(frame, AudioFrame): + audio_buffer.append(frame) + AUDIO_PROBE_INTERVAL -= 1 + + if AUDIO_PROBE_INTERVAL == 0: + prom.audio_amplitude_lufs_gauge.set( + audio_buffer.lufs()) + prom.audio_amplitude_dbfs_gauge.set( + audio_buffer.dbfs(0)) + AUDIO_PROBE_INTERVAL = 20 + + except av.AVError as e: + log.error(e) + prom.decode_error_count.inc() + stream = av.open(url) + + except KeyboardInterrupt: + log.info("Keyboard interrupt") + break + + +run() diff --git a/videobuffer.py b/videobuffer.py index 00ef0d1..1f6f257 100644 --- a/videobuffer.py +++ b/videobuffer.py @@ -1,56 +1,79 @@ """ Video analysis """ from logger import log -from collections import deque import numpy as np import numpy.typing as npt from av import VideoFrame -from itertools import islice BUFFER_SIZE = 10 FRAME_RATE = 50 -def _motion(buffer: deque[npt.NDArray[np.uint8]]): - """ Returns the average motion between the given frames. """ - if len(buffer) < 2: - log.warning("Buffer too small to calculate motion") - return 0 +def compute_frame_difference(frame1: npt.NDArray[np.uint8], frame2: npt.NDArray[np.uint8]) -> float: + """Compute the difference between two frames.""" + diff = np.subtract(frame1, frame2, dtype=np.int16) + return np.sum(np.abs(diff)) - frame_diffs = [ - np.mean(abs(buffer[i].astype(np.int16) - frame).astype(np.uint8)) - for i, frame in enumerate(islice(buffer, 1, None)) - ] - # Return the mean of the absolute differences - return np.mean(frame_diffs) / 255 +def compute_frame_brightness(frame: npt.NDArray[np.uint8]) -> float: + """Compute the brightness of a frame.""" + return np.sum(frame) -def _avg_brightness(buffer: deque[npt.NDArray[np.uint8]]): - """ Returns the average brightness of the given frames. """ - - if len(buffer) == 0: - log.warning("Buffer empty, returning 0") - return 0 +class VideoBuffer(): + height = None + width = None - return np.mean(buffer) / 255 + """ A buffer for video frames. Contains analysis functions. """ + def __init__(self): + self.buffer_size = BUFFER_SIZE * FRAME_RATE + self.video_buffer = np.empty( + (self.buffer_size, 1, 1), dtype=np.uint8) + self.index = 0 + self.total_brightness = 0.0 + self.total_motion = 0.0 -class VideoBuffer(): - """ A circular buffer for video frames. - Contains analysis functions. """ - video_buffer = deque(maxlen=BUFFER_SIZE * FRAME_RATE) + def _initialize_buffer(self, height, width): + """ Reinitialize the video buffer with a new resolution. """ + self.video_buffer = np.empty( + (self.buffer_size, height, width), dtype=np.uint8) + self.index = 0 + self.height = height + self.width = width + log.info( + "Initialized video buffer with resolution: {}x{}".format(height, width)) def append(self, frame: VideoFrame): """ Appends a frame to the buffer. """ - self.video_buffer.append(frame.reformat( - format='gray8').to_ndarray().astype(np.uint8)) + new_frame = frame.reformat(format='gray8').to_ndarray() + frame_height, frame_width = new_frame.shape + + # Check if the resolution of the incoming frame matches the buffer's resolution + if frame_height != self.height or frame_width != self.width: + self._initialize_buffer(frame_height, frame_width) + + # Adjust total brightness + self.total_brightness -= compute_frame_brightness( + self.video_buffer[self.index]) + self.total_brightness += compute_frame_brightness(new_frame) + + # Adjust total motion if there's a previous frame + if self.index > 0: + prev_index = self.index - 1 + else: + prev_index = self.buffer_size - 1 + self.total_motion -= compute_frame_difference( + self.video_buffer[prev_index], self.video_buffer[self.index]) + self.total_motion += compute_frame_difference( + new_frame, self.video_buffer[prev_index]) + + self.video_buffer[self.index] = new_frame + self.index = (self.index + 1) % self.buffer_size @property def avg_brightness(self): - """ Returns the average brightness of the frames in the buffer. """ - return _avg_brightness(self.video_buffer) + return self.total_brightness / (self.buffer_size * self.height * self.width) @property def motion(self): - """ Returns the average motion between the frames in the buffer. """ - return _motion(self.video_buffer) + return self.total_motion / (self.buffer_size * self.height * self.width)