-
Notifications
You must be signed in to change notification settings - Fork 0
/
aviso-extremes-dt.py
57 lines (48 loc) · 1.66 KB
/
aviso-extremes-dt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import sys
from pprint import pprint as pp

from pyaviso import NotificationManager, user_config
# Constants

# Event name the listener subscribes to.
# NOTE(review): an earlier comment listed "mars" and "dissemination" as the
# options, yet the value in use is "data" -- confirm against the server schema.
LISTENER_EVENT = "data"

# Type of trigger attached to the listener.
TRIGGER_TYPE = "echo"

# Request configuration for the listener.
REQUEST = {
    "class": "d1",
    "expver": "0001",
    "stream": "wave",
    "step": [1, 2, 3],
    "levtype": "sfc",
    "type": "fc",
}

# Manually defined configuration. The notification and configuration engines
# use identical connection settings, so both entries are generated from one
# template; each engine still receives its own separate dict object.
CONFIG = {
    **{
        engine: {
            "host": "aviso.lumi.apps.dte.destination-earth.eu",
            "port": 443,
            "https": True,
        }
        for engine in ("notification_engine", "configuration_engine")
    },
    "schema_parser": "generic",
    "remote_schema": True,
    "auth_type": "none",
}
def create_listener():
    """
    Build the configuration dictionary for a single listener.

    Returns:
        A dict with three keys: "event" (LISTENER_EVENT), "request" (the
        module-level REQUEST object itself, not a copy) and "triggers"
        (a one-element list holding a trigger of type TRIGGER_TYPE).
    """
    only_trigger = dict(type=TRIGGER_TYPE)
    return dict(
        event=LISTENER_EVENT,
        request=REQUEST,
        triggers=[only_trigger],
    )
def main():
    """
    Set up the Aviso listener and start listening for notifications.

    Setup (building the user configuration and the NotificationManager) and
    listening are guarded separately so that each failure is reported with an
    accurate message. Errors go to stderr rather than stdout.

    Returns:
        0 if listening ended without raising, 1 if setup or listening failed.
    """
    listeners_config = {"listeners": [create_listener()]}

    # Phase 1: configuration and manager construction.
    try:
        config = user_config.UserConfig(**CONFIG)
        print("loaded config:")
        pp(CONFIG)
        nm = NotificationManager()
    except Exception as e:
        print(f"Failed to initialize the Notification Manager: {e}", file=sys.stderr)
        return 1

    # Phase 2: listening. A failure here is not an initialization failure,
    # so it gets its own message.
    try:
        nm.listen(listeners=listeners_config, config=config)
    except Exception as e:
        print(f"Notification listener failed: {e}", file=sys.stderr)
        return 1

    return 0


if __name__ == "__main__":
    # Propagate the outcome to the shell so failures yield a non-zero status.
    sys.exit(main())