-
Notifications
You must be signed in to change notification settings - Fork 0
/
tweet.py
124 lines (94 loc) · 3.18 KB
/
tweet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import datetime
import shelve
import sys
from io import BytesIO
from time import strftime
from twython import Twython
from mastodon import Mastodon
import jsonpath_rw_ext as jp
import requests
from config import *
def sensor_json(url):
return requests.get(url).json()
def sensor_item(sensor_json_list, sensor_id):
return [item for item in sensor_json_list if item['sensor']['id'] == sensor_id][-1]
def extract_value_from_json_item(json, key):
query = '$.sensordatavalues[?(value_type="{}")].value'.format(key)
matches = jp.match(query, json)
return float(matches[-1])
def read_last_tweet_ts():
db = shelve.open(conf_storage)
if not 'tweet_ts' in db:
db['tweet_ts'] = datetime.datetime(year=1970, month=1, day=1)
tweet_ts = db['tweet_ts']
db.close()
return tweet_ts
def write_last_tweet_ts(tweet_ts):
db = shelve.open(conf_storage)
db['tweet_ts'] = tweet_ts
db.close()
def quiet_period_exceeded():
return datetime.datetime.now() - datetime.timedelta(hours=conf_quiet_period_in_hours) > read_last_tweet_ts()
def update_quiet_period():
write_last_tweet_ts(datetime.datetime.now())
if not quiet_period_exceeded():
print("noop, within quiet period")
sys.exit(0)
pm_sensor_item = sensor_item(
sensor_json(conf_url_pm_sensor),
conf_particle_sensor_id)
th_sensor_item = sensor_item(
sensor_json(conf_url_th_sensor),
conf_temperature_sensor_id)
value_pm100 = extract_value_from_json_item(pm_sensor_item, "P1")
value_pm025 = extract_value_from_json_item(pm_sensor_item, "P2")
value_temperature = extract_value_from_json_item(th_sensor_item, "temperature")
value_humidity = extract_value_from_json_item(th_sensor_item, "humidity")
print([value_pm100, value_pm025, value_temperature, value_humidity])
current_time = strftime("%d.%m.%Y %H:%M:%S")
message = '''
{}
⚠ PM10: {}µg/m³, PM2.5: {}µg/m³, {}°C, RH:{}% ({})
Details: {}
This is a #bot. Code available on github aschuma/air_tweets
'''.format(
conf_msg_preamble,
value_pm100,
value_pm025,
value_temperature,
value_humidity,
current_time,
conf_luftdaten_map_url
)
print(message)
if value_pm100 < conf_limit_pm_10_0:
print("noop, limit not exceeded, current=",
value_pm100, " limit=", conf_limit_pm_10_0)
sys.exit(0)
imageBytes = requests.get(conf_luftdaten_graph_url).content
if twitter_enabled:
twitter = Twython(
twitter_consumer_key,
twitter_consumer_secret,
twitter_access_token,
twitter_access_token_secret)
twitter.verify_credentials()
twitter_upload_response = twitter.upload_media(
media=BytesIO(imageBytes))
twitter.update_status(
status=message,
media_ids=[twitter_upload_response['media_id']])
print("Twitter: Done")
if mastodon_enabled:
mastodon = Mastodon(
access_token=mastodon_access_token,
api_base_url=mastodon_api_base_url)
mastodon_upload_response = mastodon.media_post(
media_file=BytesIO(imageBytes),
mime_type=conf_luftdaten_graph_mime_type)
mastodon.status_post(
status=message,
media_ids=[mastodon_upload_response])
print("Mastodon: Done")
update_quiet_period()
print("Done")