-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
dadjokes.py
111 lines (85 loc) · 2.84 KB
/
dadjokes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python3
"""Dad Jokes from Fatherhood.gov (https://fatherhood.gov/for-dads/dad-jokes)"""
import argparse
import logging
import random
import time
from typing import List
from typing import NamedTuple
from typing import Set
import httpx
import vesta
class Joke(NamedTuple):
    """A single dad joke split into its two displayed parts."""

    # First part of the joke (the line that sets it up).
    opener: str
    # Second part of the joke (the reply shown after the opener).
    response: str
def fetch_jokes(limit: int) -> List["Joke"]:
    """Fetch up to *limit* unique dad jokes from the Fatherhood.gov JSON:API.

    Pages are requested one at a time, following each response's
    ``links.next`` entry, until *limit* unique jokes have been collected,
    a page contains no ``data`` entries, or there is no next page.

    Args:
        limit: Maximum number of jokes to return.

    Returns:
        A list of at most *limit* unique jokes, in arbitrary order.

    Raises:
        RuntimeError: If fewer than two unique jokes could be loaded.
        httpx.HTTPStatusError: If a page request returns an error status.
    """
    # A set de-duplicates jokes that appear on more than one page.
    jokes: Set["Joke"] = set()
    next_url = "https://fatherhood.gov/jsonapi/node/dad_jokes"

    while next_url and len(jokes) < limit:
        logging.info("Fetching %s", next_url)
        resp = httpx.get(next_url)
        resp.raise_for_status()
        payload = resp.json()

        data = payload.get("data", [])
        if not data:
            break

        jokes.update(
            Joke(
                opener=joke["attributes"]["field_joke_opener"],
                response=joke["attributes"]["field_joke_response"],
            )
            for joke in data
        )

        # JSON:API allows a link to be an object with "href", a bare URL
        # string, or null; tolerate all three instead of crashing on
        # ``None.get``.
        next_link = (payload.get("links") or {}).get("next") or {}
        if isinstance(next_link, dict):
            next_url = next_link.get("href")
        else:
            next_url = next_link

    if len(jokes) < 2:
        # Format the message eagerly: exception constructors do not apply
        # %-style arguments the way logging calls do.
        raise RuntimeError(f"Failed to load enough jokes ({len(jokes)})")

    return list(jokes)[:limit]
def main(args: argparse.Namespace):
    """Continuously post random dad jokes to a Vestaboard subscription.

    Fetches ``args.count`` jokes once, then loops forever: pick a joke that
    differs from the one last shown, encode it, optionally pretty-print it,
    send it to subscription ``args.sub``, and sleep ``args.interval``
    seconds.

    Args:
        args: Parsed command-line options (``key``, ``secret``, ``sub``,
            ``pprint``, ``count``, ``interval``).

    Raises:
        RuntimeError: If none of the fetched jokes can be encoded.
    """
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.INFO,
    )

    client = vesta.SubscriptionClient(args.key, args.secret)
    jokes = fetch_jokes(args.count)
    logging.info("Loaded %d jokes", len(jokes))

    joke = random.choice(jokes)
    last = None
    while True:
        # Avoid showing the same joke twice in a row. With only one joke in
        # the pool there is no alternative, so don't spin looking for one.
        while joke == last and len(jokes) > 1:
            joke = random.choice(jokes)

        try:
            chars = vesta.encode_text(
                f"{joke.opener}\n\n{joke.response}",
                valign="middle",
            )
        except ValueError as exc:
            # Skip jokes that cannot be encoded (e.g. too many characters).
            # The joke must be dropped and replaced here: retrying the same
            # joke would fail the same way and busy-loop forever.
            logging.warning("Skipping joke that cannot be encoded: %s", joke)
            jokes.remove(joke)
            if not jokes:
                raise RuntimeError(
                    "None of the fetched jokes can be encoded"
                ) from exc
            joke = random.choice(jokes)
            continue

        logging.info("%s", joke)
        if args.pprint:
            vesta.pprint(chars)

        try:
            client.send_message(args.sub, chars)
        except httpx.HTTPError as e:
            # httpx.HTTPError covers both error status codes and transport
            # failures (timeouts, connection errors), so a transient network
            # problem does not kill the loop. ``last`` is left unchanged so
            # the same joke is retried after a short pause.
            logging.error("Failed to send message: %s", e)
            time.sleep(60)
            continue

        last = joke
        time.sleep(args.interval)
if __name__ == "__main__":
    # Command-line entry point: build the option parser, then hand the
    # parsed namespace to main().
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description=__doc__,
    )

    # Mandatory credentials and target subscription, all plain strings.
    for _flag, _help in (
        ("--key", "API key"),
        ("--secret", "API secret"),
        ("--sub", "subscription ID"),
    ):
        parser.add_argument(_flag, required=True, help=_help)

    # Optional behavior switches and tuning knobs.
    parser.add_argument("--pprint", help="print the board", action="store_true")
    parser.add_argument(
        "--count",
        default=100,
        type=int,
        help="number of jokes to fetch",
    )
    parser.add_argument(
        "--interval",
        default=10 * 60,
        type=int,
        help="update interval in seconds",
    )

    args = parser.parse_args()
    main(args)