Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor non-API routes into separate files. #2082

Merged
merged 7 commits into from
Oct 7, 2024
Empty file added anthias_app/__init__.py
Empty file.
94 changes: 94 additions & 0 deletions anthias_app/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import uuid
import yaml
from datetime import datetime
from flask import render_template
from os import getenv, path

from lib import assets_helper, db
from lib.github import is_up_to_date
from lib.utils import get_video_duration
from settings import settings


def template(template_name, **context):
"""
This is a template response wrapper that shares the
same function signature as Flask's render_template() method
but also injects some global context."""

# Add global contexts
context['date_format'] = settings['date_format']
context['default_duration'] = settings['default_duration']
context['default_streaming_duration'] = (
settings['default_streaming_duration'])
context['template_settings'] = {
'imports': ['from lib.utils import template_handle_unicode'],
'default_filters': ['template_handle_unicode'],
}
context['up_to_date'] = is_up_to_date()
context['use_24_hour_clock'] = settings['use_24_hour_clock']

return render_template(template_name, context=context)


def prepare_default_asset(**kwargs):
if kwargs['mimetype'] not in ['image', 'video', 'webpage']:
return

asset_id = 'default_{}'.format(uuid.uuid4().hex)
duration = (
int(get_video_duration(kwargs['uri']).total_seconds())
if "video" == kwargs['mimetype']
else kwargs['duration']
)

return {
'asset_id': asset_id,
'duration': duration,
'end_date': kwargs['end_date'],
'is_active': 1,
'is_enabled': True,
'is_processing': 0,
'mimetype': kwargs['mimetype'],
'name': kwargs['name'],
'nocache': 0,
'play_order': 0,
'skip_asset_check': 0,
'start_date': kwargs['start_date'],
'uri': kwargs['uri']
}


def add_default_assets():
settings.load()

datetime_now = datetime.now()
default_asset_settings = {
'start_date': datetime_now,
'end_date': datetime_now.replace(year=datetime_now.year + 6),
'duration': settings['default_duration']
}

default_assets_yaml = path.join(
getenv('HOME'), '.screenly/default_assets.yml')

with open(default_assets_yaml, 'r') as yaml_file:
default_assets = yaml.safe_load(yaml_file).get('assets')
with db.conn(settings['database']) as conn:
for default_asset in default_assets:
default_asset_settings.update({
'name': default_asset.get('name'),
'uri': default_asset.get('uri'),
'mimetype': default_asset.get('mimetype')
})
asset = prepare_default_asset(**default_asset_settings)
if asset:
assets_helper.create(conn, asset)


def remove_default_assets():
settings.load()
with db.conn(settings['database']) as conn:
for asset in assets_helper.read(conn):
if asset['asset_id'].startswith('default_'):
assets_helper.delete(conn, asset['asset_id'])
266 changes: 266 additions & 0 deletions anthias_app/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
import ipaddress
import logging
import psutil
from datetime import timedelta
from flask import Blueprint, request
from hurry.filesize import size
from os import getenv, statvfs
from platform import machine
from urllib.parse import urlparse

from anthias_app.helpers import (
add_default_assets,
remove_default_assets,
template,
)
from lib import (
diagnostics,
raspberry_pi_helper,
)
from lib.auth import authorized
from lib.utils import (
connect_to_redis,
get_balena_supervisor_version,
get_node_ip,
get_node_mac_address,
is_balena_app,
is_demo_node,
is_docker,
)
from settings import (
CONFIGURABLE_SETTINGS,
DEFAULTS,
settings,
ZmqPublisher,
)

r = connect_to_redis()
anthias_app_bp = Blueprint('anthias_app', __name__)


@anthias_app_bp.route('/')
@authorized
def index():
player_name = settings['player_name']
my_ip = urlparse(request.host_url).hostname
is_demo = is_demo_node()
balena_uuid = getenv("BALENA_APP_UUID", None)

ws_addresses = []

if settings['use_ssl']:
ws_addresses.append('wss://' + my_ip + '/ws/')
else:
ws_addresses.append('ws://' + my_ip + '/ws/')

if balena_uuid:
ws_addresses.append(
'wss://{}.balena-devices.com/ws/'.format(balena_uuid))

return template(
'index.html',
ws_addresses=ws_addresses,
player_name=player_name,
is_demo=is_demo,
is_balena=is_balena_app(),
)


@anthias_app_bp.route('/settings', methods=["GET", "POST"])
@authorized
def settings_page():
context = {'flash': None}

if request.method == "POST":
try:
# Put some request variables in local variables to make them
# easier to read.
current_pass = request.form.get('current-password', '')
auth_backend = request.form.get('auth_backend', '')

if (
auth_backend != settings['auth_backend']
and settings['auth_backend']
):
if not current_pass:
raise ValueError(
"Must supply current password to change "
"authentication method"
)
if not settings.auth.check_password(current_pass):
raise ValueError("Incorrect current password.")

prev_auth_backend = settings['auth_backend']
if not current_pass and prev_auth_backend:
current_pass_correct = None
else:
current_pass_correct = (
settings
.auth_backends[prev_auth_backend]
.check_password(current_pass)
)
next_auth_backend = settings.auth_backends[auth_backend]
next_auth_backend.update_settings(current_pass_correct)
settings['auth_backend'] = auth_backend

for field, default in list(CONFIGURABLE_SETTINGS.items()):
value = request.form.get(field, default)

if not value and field in [
'default_duration',
'default_streaming_duration',
]:
value = str(0)
if isinstance(default, bool):
value = value == 'on'

if field == 'default_assets' and settings[field] != value:
if value:
add_default_assets()
else:
remove_default_assets()

settings[field] = value

settings.save()
publisher = ZmqPublisher.get_instance()
publisher.send_to_viewer('reload')
context['flash'] = {
'class': "success",
'message': "Settings were successfully saved.",
}
except ValueError as e:
context['flash'] = {'class': "danger", 'message': e}
except IOError as e:
context['flash'] = {'class': "danger", 'message': e}
except OSError as e:
context['flash'] = {'class': "danger", 'message': e}
else:
settings.load()
for field, default in list(DEFAULTS['viewer'].items()):
context[field] = settings[field]

auth_backends = []
for backend in settings.auth_backends_list:
if backend.template:
html, ctx = backend.template
context.update(ctx)
else:
html = None
auth_backends.append({
'name': backend.name,
'text': backend.display_name,
'template': html,
'selected': (
'selected'
if settings['auth_backend'] == backend.name
else ''
)
})

try:
ip_addresses = get_node_ip().split()
except Exception as error:
logging.warning(f"Error getting IP addresses: {error}")
ip_addresses = ['IP_ADDRESS']

context.update({
'user': settings['user'],
'need_current_password': bool(settings['auth_backend']),
'is_balena': is_balena_app(),
'is_docker': is_docker(),
'auth_backend': settings['auth_backend'],
'auth_backends': auth_backends,
'ip_addresses': ip_addresses,
'host_user': getenv('HOST_USER')
})

return template('settings.html', **context)


@anthias_app_bp.route('/system-info')
@authorized
def system_info():
loadavg = diagnostics.get_load_avg()['15 min']
display_power = r.get('display_power')

# Calculate disk space
slash = statvfs("/")
free_space = size(slash.f_bavail * slash.f_frsize)

# Memory
virtual_memory = psutil.virtual_memory()
memory = {
'total': virtual_memory.total >> 20,
'used': virtual_memory.used >> 20,
'free': virtual_memory.free >> 20,
'shared': virtual_memory.shared >> 20,
'buff': virtual_memory.buffers >> 20,
'available': virtual_memory.available >> 20
}

# Get uptime
system_uptime = timedelta(seconds=diagnostics.get_uptime())

# Player name for title
player_name = settings['player_name']

device_model = raspberry_pi_helper.parse_cpu_info().get('model')

if device_model is None and machine() == 'x86_64':
device_model = 'Generic x86_64 Device'

version = '{}@{}'.format(
diagnostics.get_git_branch(),
diagnostics.get_git_short_hash()
)

return template(
'system-info.html',
player_name=player_name,
loadavg=loadavg,
free_space=free_space,
uptime=system_uptime,
memory=memory,
display_power=display_power,
device_model=device_model,
version=version,
mac_address=get_node_mac_address(),
is_balena=is_balena_app(),
)


@anthias_app_bp.route('/integrations')
@authorized
def integrations():

context = {
'player_name': settings['player_name'],
'is_balena': is_balena_app(),
}

if context['is_balena']:
context['balena_device_id'] = getenv('BALENA_DEVICE_UUID')
context['balena_app_id'] = getenv('BALENA_APP_ID')
context['balena_app_name'] = getenv('BALENA_APP_NAME')
context['balena_supervisor_version'] = get_balena_supervisor_version()
context['balena_host_os_version'] = getenv('BALENA_HOST_OS_VERSION')
context['balena_device_name_at_init'] = getenv(
'BALENA_DEVICE_NAME_AT_INIT')

return template('integrations.html', **context)


@anthias_app_bp.route('/splash-page')
def splash_page():
ip_addresses = []

for ip_address in get_node_ip().split():
ip_address_object = ipaddress.ip_address(ip_address)

if isinstance(ip_address_object, ipaddress.IPv6Address):
ip_addresses.append(f'http://[{ip_address}]')
else:
ip_addresses.append(f'http://{ip_address}')

return template('splash-page.html', ip_addresses=ip_addresses)
Empty file added api/__init__.py
Empty file.
Loading
Loading