This repository has been archived by the owner on Jul 3, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 20
/
local_test_of_function.py
executable file
·114 lines (95 loc) · 4.07 KB
/
local_test_of_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import json
import logging
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean, func
from iotfunctions import bif
from custom.functions import MaximoAssetHTTP
from iotfunctions.metadata import EntityType
from iotfunctions.db import Database
#from iotfunctions.enginelog import EngineLogging
from custom import settings
#EngineLogging.configure_console_logging(logging.DEBUG)
'''
# Replace with a credentials dictionary or provide a credentials
# Explore > Usage > Watson IOT Platform Analytics > Copy to clipboard
# Past contents in a json file.
'''
#with open('credentials_staging.json', encoding='utf-8') as F:
with open('credentials.json', encoding='utf-8') as F:
#with open('credentials_dev.json', encoding='utf-8') as F:
credentials = json.loads(F.read())
'''
Developing Test Pipelines
-------------------------
When creating a set of functions you can test how they these functions will
work together by creating a test pipeline.
'''
'''
Create a database object to access Watson IOT Platform Analytics DB.
'''
db = Database(credentials = credentials)
db_schema = None # set if you are not using the default
'''
To do anything with IoT Platform Analytics, you will need one or more entity type.
You can create entity types through the IoT Platform or using the python API as shown below.
The database schema is only needed if you are not using the default schema. You can also rename the timestamp.
'''
entity_name = settings.ENTITY_NAME or 'buildings'
db_schema = settings.DB_SCHEMA or "BLUADMIN" # None # replace if you are not using the default schema
db.drop_table(entity_name, schema = db_schema)
# Credentials to access Building Insights API.
USERNAME = settings.USERNAME
PASSWORD = settings.PASSWORD
# TENANT_ID = settings.TENANT_ID
URL = settings.URL
entity = EntityType(entity_name, db,
# following columns can be dynamically generated based on meters associated with each asset
Column('building',String(50)),
# Column('energy_value',Float()),
# Column('energy_unit',String(50)),
Column('temperature',Float()),
Column('temperature_unit',String(50)),
MaximoAssetHTTP( username = USERNAME,
password = PASSWORD,
request='GET',
url=URL,
output_item = 'http_preload_done'),
# bif.PythonExpression(expression='df["energy_value"]*df["energy_value"]',
bif.PythonExpression(expression='df["temperature"]',
output_name = 'F'),
**{
'_timestamp' : 'evt_timestamp',
'_db_schema' : db_schema
})
'''
When creating an EntityType object you will need to specify the name of the entity, the database
object that will contain entity data
After creating an EntityType you will need to register it so that it visible in the Add Data to Entity Function UI.
To also register the functions and constants associated with the entity type, specify
'publish_kpis' = True.
'''
entity.register(raise_error=False)
db.register_functions([MaximoAssetHTTP])
'''
To test the execution of kpi calculations defined for the entity type locally
use this function.
A local test will not update the server job log or write kpi data to the AS data
lake. Instead kpi data is written to the local filesystem in csv form.
'''
entity.exec_local_pipeline()
'''
view entity data
'''
# can get entity metadata with following
#
# TODO, for some reason this returns data but also throws exception.
'''
meta = db.get_entity_type(entity_name)
db_schema = meta['schemaName']
table_name = meta['metricTableName']
'''
db_schema = settings.DB_SCHEMA or "BLUADMIN"
table_name = settings.TABLE_NAME or entity_name
df = db.read_table(table_name=table_name, schema=db_schema)
# df = db.read_table(table_name=entity_name, schema=db_schema)
print(df.head())
print ( 'Done registering entity %s ' %db.register_functions([MaximoAssetHTTP]) )