Skip to content

Commit

Permalink
Add support for NHI GWO database (#319)
Browse files Browse the repository at this point in the history
* Add support for NHI GWO database

with extraction data

* Add timeout to solve codacy-issues

* docs typos

* Make nhi gwo methods more robust

Add checks for empty content
Determine empty lines in get_gwo_wells, to be used in skiprows

* Fix codacy issue

---------

Co-authored-by: Davíd Brakenhoff <d.brakenhoff@artesia-water.nl>
  • Loading branch information
rubencalje and dbrakenhoff authored Jan 31, 2024
1 parent 904f269 commit 30197dd
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,17 @@ jobs:
- name: Run notebooks
if: ${{ github.event_name == 'push' }}
env:
NHI_GWO_USERNAME: ${{ secrets.NHI_GWO_USERNAME}}
NHI_GWO_PASSWORD: ${{ secrets.NHI_GWO_PASSWORD}}
run: |
py.test ./tests -m "not notebooks"
- name: Run tests only
if: ${{ github.event_name == 'pull_request' }}
env:
NHI_GWO_USERNAME: ${{ secrets.NHI_GWO_USERNAME}}
NHI_GWO_PASSWORD: ${{ secrets.NHI_GWO_PASSWORD}}
run: |
py.test ./tests -m "not notebooks"
Expand Down
215 changes: 214 additions & 1 deletion nlmod/read/nhi.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import logging
import os
import io
import requests

import numpy as np
import requests
import pandas as pd
import geopandas as gpd

import rioxarray

from ..dims.resample import structured_da_to_ds
Expand Down Expand Up @@ -173,3 +177,212 @@ def add_buisdrainage(
ds[depth_var] = ds[depth_var] / 100.0

return ds


def get_gwo_wells(
    username,
    password,
    n_well_filters=1_000,
    well_site=None,
    organisation=None,
    status=None,
    well_index="Name",
    timeout=120,
    **kwargs,
):
    """
    Get metadata of extraction wells from the NHI GWO database.

    The database is queried page by page (``n_well_filters`` records per page) until
    a page is returned that is empty or contains fewer records than requested.

    Parameters
    ----------
    username : str
        The username of the NHI GWO database. To retrieve a username and password visit
        https://gwo.nhi.nu/register/.
    password : str
        The password of the NHI GWO database. To retrieve a username and password visit
        https://gwo.nhi.nu/register/.
    n_well_filters : int, optional
        The number of wells that are requested per page. This number determines in how
        many pieces the request is split. The default is 1000.
    well_site : str, optional
        The name of well site the wells belong to. If not None, the well site will be
        used to filter the wells. The default is None.
    organisation : str, optional
        The organisation that manages the wells. If not None, the organisation will be
        used to filter the wells. The default is None.
    status : str, optional
        The status of the wells. If not None, the status will be used to filter the
        wells. Possible values are "Active", "Inactive" or "Abandoned". The default is
        None.
    well_index : str, tuple or list, optional
        The column(s) in the resulting GeoDataFrame that is/are used as the index of
        this GeoDataFrame. The default is "Name".
    timeout : int, optional
        The timeout time (in seconds) for requests to the database. The default is
        120 seconds.
    **kwargs : dict
        Kwargs are passed as additional parameters in the request to the database. For
        available parameters see https://gwo.nhi.nu/api/v1/download/.

    Returns
    -------
    gdf : geopandas.GeoDataFrame
        A GeoDataFrame containing the properties of the wells and their filters.

    Raises
    ------
    ValueError
        If the first page of the response is empty, or if a response does not contain
        exactly one separator line (a line consisting only of ";").
    """
    # see https://gwo.nhi.nu/api/v1/download/
    url = "https://gwo.nhi.nu/api/v1/well_filters/"

    # the filter parameters do not change between pages, so build them only once
    optional_filters = {
        "well__status": status,
        "well__organization": organisation,
        "well__site": well_site,
    }
    extra_params = {
        key: value for key, value in optional_filters.items() if value is not None
    }
    extra_params.update(kwargs)

    properties = []
    page = 1
    while True:
        params = {
            "format": "csv",
            "n_well_filters": n_well_filters,
            "page": page,
            **extra_params,
        }
        r = requests.get(url, auth=(username, password), params=params, timeout=timeout)
        content = r.content.decode("utf-8")
        if not content:
            if page == 1:
                msg = "No extraction wells found for the requested parameters"
                raise ValueError(msg)
            # the number of wells is exactly a multiple of n_well_filters
            break

        # A line consisting only of ";" separates the leading lines from the table.
        # Line numbers are determined here so they can be passed to skiprows.
        lines = content.split("\n")
        separator_rows = [
            i for i, line in enumerate(lines) if set(line.rstrip("\r")) == {";"}
        ]
        if len(separator_rows) != 1:
            # raise explicitly: an assert would be stripped when running with -O
            msg = (
                "Returned extraction wells cannot be interpreted "
                f"(HTTP status {r.status_code})"
            )
            raise ValueError(msg)

        # Skip everything up to and including the separator line, keep the next line
        # as the column header and skip the line directly after the header
        # (presumably a units/description row -- verify against the API output).
        header_row = separator_rows[0] + 1
        skiprows = list(range(header_row)) + [header_row + 1]
        df = pd.read_csv(io.StringIO(content), skiprows=skiprows, sep=";")
        properties.append(df)

        # a page that is not completely filled is the last page
        if len(df) != n_well_filters:
            break
        page += 1

    df = pd.concat(properties)
    geometry = gpd.points_from_xy(df.XCoordinate, df.YCoordinate)
    gdf = gpd.GeoDataFrame(df, geometry=geometry)
    if well_index is not None:
        gdf = gdf.set_index(well_index)
    return gdf


def get_gwo_measurements(
    username,
    password,
    n_measurements=10_000,
    well_site=None,
    well_index="Name",
    measurement_index=("Name", "DateTime"),
    timeout=120,
    **kwargs,
):
    """
    Get extraction rates and metadata of wells from the NHI GWO database.

    The database is queried page by page (``n_measurements`` records per page) until
    a page is returned that is empty or contains fewer measurements than requested.
    Every page contains a table with well properties followed by a table with
    measurements.

    Parameters
    ----------
    username : str
        The username of the NHI GWO database. To retrieve a username and password visit
        https://gwo.nhi.nu/register/.
    password : str
        The password of the NHI GWO database. To retrieve a username and password visit
        https://gwo.nhi.nu/register/.
    n_measurements : int, optional
        The number of measurements that are requested per page, with a maximum of
        200,000. This number determines in how many pieces the request is split. The
        default is 10,000.
    well_site : str, optional
        The name of well site the wells belong to. If not None, the well site will be
        used to filter the wells. The default is None.
    well_index : str, tuple or list, optional
        The column(s) in the resulting GeoDataFrame that is/are used as the index of
        this GeoDataFrame. The default is "Name".
    measurement_index : str, tuple or list, optional
        The column(s) in the resulting measurement-DataFrame that is/are used as the
        index of this DataFrame. The default is ("Name", "DateTime").
    timeout : int, optional
        The timeout time (in seconds) of requests to the database. The default is
        120 seconds.
    **kwargs : dict
        Kwargs are passed as additional parameters in the request to the database. For
        available parameters see https://gwo.nhi.nu/api/v1/download/.

    Returns
    -------
    measurements : pandas.DataFrame
        A DataFrame containing the extraction rates of the wells in the database.
    gdf : geopandas.GeoDataFrame
        A GeoDataFrame containing the properties of the wells and their filters.

    Raises
    ------
    ValueError
        If the first page of the response is empty, or if a response does not contain
        exactly two separator lines (lines consisting only of ";").
    """
    # use https, so the credentials are not sent over an unencrypted connection
    url = "https://gwo.nhi.nu/api/v1/measurements/"

    # the filter parameters do not change between pages, so build them only once
    extra_params = {}
    if well_site is not None:
        extra_params["filter__well__site"] = well_site
    extra_params.update(kwargs)

    properties = []
    measurement_pages = []
    page = 1
    while True:
        params = {
            "format": "csv",
            "n_measurements": n_measurements,
            "page": page,
            **extra_params,
        }
        r = requests.get(url, auth=(username, password), params=params, timeout=timeout)

        content = r.content.decode("utf-8")
        if not content:
            if page == 1:
                msg = "No extraction rates found for the requested parameters"
                raise ValueError(msg)
            # the number of measurements is exactly a multiple of n_measurements
            break

        # Lines consisting only of ";" precede each of the two tables in the response.
        # Line numbers are determined here so they can be passed to skiprows.
        lines = content.split("\n")
        separator_rows = [
            i for i, line in enumerate(lines) if set(line.rstrip("\r")) == {";"}
        ]
        if len(separator_rows) != 2:
            # raise explicitly: an assert would be stripped when running with -O
            msg = (
                "Returned extraction rates cannot be interpreted "
                f"(HTTP status {r.status_code})"
            )
            raise ValueError(msg)
        first_sep, second_sep = separator_rows

        # Read properties: keep the line after the first separator as the header, skip
        # the line directly after the header (presumably a units/description row --
        # verify against the API output) and stop before the second separator line.
        skiprows = list(range(first_sep + 1)) + [first_sep + 2]
        nrows = second_sep - first_sep - 3
        df = pd.read_csv(io.StringIO(content), sep=";", skiprows=skiprows, nrows=nrows)
        properties.append(df)

        # read measurements: same layout, starting after the second separator line
        skiprows = list(range(second_sep + 1)) + [second_sep + 2]
        df = pd.read_csv(
            io.StringIO(content),
            skiprows=skiprows,
            sep=";",
            parse_dates=["DateTime"],
            dayfirst=True,
        )
        measurement_pages.append(df)

        # a page that is not completely filled is the last page
        if len(df) != n_measurements:
            break
        page += 1

    measurements = pd.concat(measurement_pages)
    # drop columns without measurements
    measurements = measurements.loc[:, ~measurements.isna().all()]
    if measurement_index is not None:
        # set_index interprets a tuple as a single key, so convert it to a list
        if isinstance(measurement_index, tuple):
            measurement_index = list(measurement_index)
        measurements = measurements.set_index(measurement_index)

    df = pd.concat(properties)
    geometry = gpd.points_from_xy(df.XCoordinate, df.YCoordinate)
    gdf = gpd.GeoDataFrame(df, geometry=geometry)
    if well_index is not None:
        gdf = gdf.set_index(well_index)
        # drop duplicate properties from multiple pages
        gdf = gdf[~gdf.index.duplicated()]
    return measurements, gdf
43 changes: 43 additions & 0 deletions tests/test_021_nhi.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os
import numpy as np
import geopandas as gpd
import tempfile
import nlmod
import pytest
import matplotlib.pyplot as plt

tmpdir = tempfile.gettempdir()

Expand All @@ -20,3 +22,44 @@ def test_buidrainage():
# assert that all locations with a positive conductance also have a specified depth
mask = ds["buisdrain_cond"] > 0
assert np.all(~np.isnan(ds["buisdrain_depth"].data[mask]))


def test_gwo():
    """Download wells and extraction rates from the NHI GWO database.

    The credentials are read from the environment variables NHI_GWO_USERNAME and
    NHI_GWO_PASSWORD.
    """
    user = os.environ["NHI_GWO_USERNAME"]
    secret = os.environ["NHI_GWO_PASSWORD"]

    # all wells that are managed by Brabant Water
    brabant_wells = nlmod.read.nhi.get_gwo_wells(
        username=user, password=secret, organisation="Brabant Water"
    )
    assert isinstance(brabant_wells, gpd.GeoDataFrame)

    # extraction rates of well "13-PP016" of pumping station Veghel
    rates, well_props = nlmod.read.nhi.get_gwo_measurements(
        user, secret, well_site="veghel", filter__well__name="13-PP016"
    )
    # every well with measurements must also be present in the well properties
    measured_names = rates.reset_index()["Name"]
    assert measured_names.isin(well_props.index).all()


@pytest.mark.skip("too slow")
def test_gwo_entire_pumping_station():
    """Download and plot the extraction rates of all wells of well site Veghel.

    The credentials are read from the environment variables NHI_GWO_USERNAME and
    NHI_GWO_PASSWORD.
    """
    user = os.environ["NHI_GWO_USERNAME"]
    secret = os.environ["NHI_GWO_PASSWORD"]
    rates, well_props = nlmod.read.nhi.get_gwo_measurements(
        user,
        secret,
        well_site="veghel",
    )
    # every well with measurements must also be present in the well properties
    measured_names = rates.reset_index()["Name"]
    assert measured_names.isin(well_props.index).all()

    # one subplot per well, on a grid with a fixed number of columns
    n_columns = 3
    n_rows = int(np.ceil(len(well_props.index) / n_columns))
    fig, axes_grid = plt.subplots(
        nrows=n_rows, ncols=n_columns, figsize=(10, 10), sharex=True, sharey=True
    )
    flat_axes = axes_grid.ravel()
    for well_name, axis in zip(well_props.index, flat_axes):
        rates.loc[well_name, "Volume"].plot(ax=axis)
        axis.set_xlabel("")
        axis.set_title(well_name)
    fig.tight_layout(pad=0.0)

0 comments on commit 30197dd

Please sign in to comment.