# -*- python -*-
# -*- coding:utf-8 -*-
#
# Copyright 2020 INRAE-CIRAD
# Distributed under the Cecill-C License.
# See https://cecill.info/licences/Licence_CeCILL-C_V1-en.html
#
# ==============================================================================
import pandas
import numpy as np
import json
import xarray as xr
import matplotlib.pyplot as plt
[docs]def patch_call(instance, func, doc):
class _(type(instance)):
def __call__(self, *arg, **kwarg):
return func(*arg, **kwarg)
instance.__class__ = _
instance.__call__.__func__.__doc__ = doc
[docs]class DSS(object):
def __init__(self, name, meta, models, manager):
self.name = name
self.meta = meta
self.models=models
self._dssm = manager
@property
def name_(self):
return '_'.join(self.name.split('.'))
[docs] def as_package(self):
org = self.meta['organization']
institute = ', '.join([org['name'],org['address'], org['postal_code'], org['city'], org['country']])
metainfo=dict(
license='unknown',
version=self.meta['version'],
authors=', '.join([org['email'], org['url']]),
institutes=institute,
url=self.meta['url'],
description=self.meta['name']
)
package_dict = {k: self.get(k).as_node() for k,v in self.models.items()}
package = dict(name=self.name_,
metainfo=metainfo,
package_dict=package_dict)
return package
[docs] def get(self, model_name="PSILARTEMP"):
"""[Get model]
Parameters
----------
dss : str, optional
[description], by default "no.nibio.vips"
model : str, optional
[description], by default "PSILARTEMP"
"""
if model_name in self.models:
model = Model(self.models[model_name], self.name, self._dssm)
def _model_call(*args, **kwargs):
return kwargs
patch_call(model, _model_call, 'test_doc')
return model
else:
raise ValueError('Model ' + model_name + ' not found in ' + self.name)
[docs]class Model(object):
""" Model Class derived from Hub. It allows to displays informations and run model and plot output
Parameters
----------
Hub : class
Class allows to access IPM catalog and get one model
"""
def __init__(self, model, dss, manager):
"""Init of Model class model
Parameters
----------_
dss : str
id of the dss
model : str
id of the model
"""
self._model = model
self.dss = dss
self._dssm = manager
self._parameters = {p['id'] : p for p in self._dssm._ipm.get_parameter()}
@property
def model_id(self):
return self._model['id']
@property
def model(self):
return self._model['name']
@property
def meta(self):
return {k: v for k, v in self._model.items() if k not in ['input', 'output', 'execution']}
@property
def inputs(self):
inputm = {'parameters': {},
'weather_data': {},
'field_observations': {}}
_input_schema = self._model['execution']['input_schema'].copy()
_input = self._model['input'].copy()
for k in ('modelId', 'weatherData', 'fieldObservations'):
if k in _input_schema['properties']:
_input_schema['properties'].pop(k)
for p in _input_schema['properties'].values():
if isinstance(p, dict):
if 'properties' in p:
inputm['parameters'].update(p['properties'])
else:
inputm['parameters'].update(p)
if _input['weather_parameters'] is not None:
wp = _input['weather_parameters']
parameters = [elt['parameter_code'] for elt in wp]
inputm['weather_data'].update({p: self._parameters[p] for p in parameters})
# filter parameters that are used internaly to query weather data
for w in ('weather_data_period_start', 'weather_data_period_end'):
items = {item['determined_by']: item['value']
for item in _input[w]}
if 'INPUT_SCHEMA_PROPERTY' in items:
fields = items['INPUT_SCHEMA_PROPERTY'].split('.')
if len(fields) > 1:
name = fields[-1]
if name in inputm['parameters']:
inputm['parameters'].pop(name)
if _input['field_observation'] is not None:
for w in ('fieldObservations', 'fieldObservationQuantifications'):
if w in inputm['parameters']:
inputm['parameters'].pop(w)
observations = next(iter(_input_schema['definitions'].values()))
inputm['field_observations'] = observations['properties']
return inputm
@property
def outputs(self):
return [{k:v for k,v in d.items() if k != 'chart_info'} for d in self._model['output']['result_parameters']]
[docs] def as_node(self):
"""Construct inputs of an Openalea node representing the model"""
inputs = []
for p, v in self.inputs['weather_data'].items():
dp = {}
dp['name'] = p
dp['description'] = v['name']
dp['interface'] = 'Ifloat'
inputs.append(dp)
for p, v in self.inputs['field_observations'].items():
dp = {}
dp['name'] = p
dp['description'] = v['title']
dp['interface'] = 'I' + v.get('type', 'float')
inputs.append(dp)
for p,v in self.inputs['parameters'].items():
dp = {}
dp['name'] = p
dp['description'] = v.get('title', None)
dp['interface'] = v.get('type', None)
dp['value'] = v.get('default', None)
inputs.append(dp)
outputs=[]
for item in self.outputs:
dp={}
dp['name'] = item['id']
dp['description'] = item['title']
outputs.append(dp)
node = dict(
name=self.meta['id'],
category='DSS',
description=self.meta['name'],
inputs=inputs,
outputs=outputs,
nodemodule='_'.join(self.dss.split('.')) + '.py',
nodeclass='_'.join(self.dss.split('.')) + '_' + self.model_id,
authors=self.meta['authors']
)
return node
def __xarray_convert__(self, output):
""" Convert model output of the model in xarray dataset
Parameters
----------
output : dict
output of (run) model
Returns
-------
xarray.Dataset
return a xarray.Dataset containing output of model with meta-information (attribut)
"""
data = {str(var): vals for var, vals in zip(output['resultParameters'], zip(*output['locationResult'][0]['data']))}
data['warningStatus']=output['locationResult'][0]['warningStatus']
output_result = {key: data[key] for key in [item["id"] for item in self.informations()["output"]["result_parameters"]]}
times_index= pandas.date_range(start=output['timeStart'], end=output['timeEnd'], freq=str(output['interval'])+"s")
df=pandas.DataFrame(output_result, index=times_index.values)
# convert dataframe to xarray dataset
ds = xr.Dataset.from_dataframe(df)
ds = ds.rename({'index':'time'})
# add attribut
ds.time.attrs["name"]="time"
ds.time.attrs["units"]="days"
#variable attributs
source = self.informations()
data_vars_attrs={el['id']:{key:el[key] for key in ['title','description']} for el in source['output']['result_parameters']}
for el in list(ds.data_vars):
if el in list(data_vars_attrs):
ds.data_vars[el].attrs=data_vars_attrs[el]
# dataset attribut
attrs={}
attrs['name']=source['name']
attrs['id']=source['id']
attrs['version']=source['version']
attrs['authors']=source['authors'][0]
attrs['description']=source['description']
attrs['description_url']=source['description_URL']
ds.attrs = attrs
return ds
[docs] def run(self, weatherdata=None, fieldObservation=None,view="ds"):
""" Run model
Parameters
----------
weatherdata : object, optional
list of json containing weather data information from ipm weatherdata service, by default None
fieldObservation : object, optional
json object containing field observation, by default None
view : str, optional
parameter controlling the type of output (xarray.Dataset or json), by default "ds"
"""
def input(weatherdata=None, fieldObservation=None):
""" Fonction to create input data model according to IPM format
Parameters
----------
weatherdata : Object JSON, optional
weatherdata from weather data ipm services in json format, by default None
fieldObservation : Object JSON, optional
field observation data in json format according IPM format, by default None
Returns
-------
Object JSON
return input json object to run model
"""
if weatherdata is not None:
fieldObservation=None
d= {"modelId": self.model,
"configParameters": {
"timeZone": "UTC",
"timeStart": weatherdata[0]["timeStart"],
"timeEnd": weatherdata[0]["timeStart"]
},
"weatherData": weatherdata[0]
}
return d
else:
if type(fieldObservation) is pandas.DataFrame:
json_obs={"fieldObservations":
[
{"location":
{"type":"Point",
"coordinates":fieldObservation.attrs["location"]["coordinates"]
},
"time":fieldObservation.attrs["time"],
"pestEPPOCode":fieldObservation.attrs["pestEPPOCode"],
"cropEPPOCode":fieldObservation.attrs["cropEPPOCode"]
}
],
"fieldObservationQuantifications":
[
{"trapCountCropEdge":int(fieldObservation.trapCountCropEdge.dropna().values),
"trapCountCropInside":int(fieldObservation.trapCountCropInside.dropna().values)
}
]
}
field_observation_input={
"modelId":self.model,
"configParameters":
{"timeZone": fieldObservation.index.tz._tzname,
"startDateCalculation":np.datetime_as_string(pandas.Timestamp.to_datetime64(fieldObservation.index[0]), unit='D'),
"endDateCalculation":np.datetime_as_string(pandas.Timestamp.to_datetime64(fieldObservation.index[-1]), unit='D')
}
}
field_observation_input["configParameters"].update(json_obs)
return field_observation_input
output= self._dssm._ipm.run_model(self._model,
input_data= input(weatherdata=weatherdata,fieldObservation=fieldObservation))
if view== "ds":
return self.__xarray_convert__(output=output)
else:
return output
[docs] def plot(self,output):
"""Plot output result
Parameters
----------
output : xarray.Dataset
Return a plot from output model conform to the description of output information of the model
"""
output.fillna(0).to_dataframe().plot.line()
plt.ylabel(self.informations()["output"]['chart_groups'][0]["title"])
plt.suptitle(output.name)
[docs] def df_reader_fieldObservation(self,path,
longitude,
latitude,
timeZone,
sep,
dayfirst,
pestEPPOCode,
cropEPPOCode,
convert_name=None):
"""
Reader dataframe for field observations.It must contain:
Date of observation: start Date of experiment, End date experiment and date of Observation
Parameters
----------
path : str, optional
field observation datafile path,
longitude : float, optional
longitude in degree of experimental site, by default 11.025635
latitude : float, optional
latitude in degree of experimental site, by default 59.715791
timeZone : str, optional
timezone of experimental site, by default "Europe/Paris"
sep : str, optional
separator of file, by default ";"
dayfirst : bool, optional
if date begin by time True else False, by default True
PestEPPOCode: str
pestEPPOCode of the model selected here SEPTAP
cropEPPOCode: str
cropEPPOCode of the model selected here APUGD
Returns
-------
pandas.Dataframe
return a dataframe with attribute information need to compute observation file input
"""
data=pandas.read_csv(path,sep=sep)
time=pandas.to_datetime(data["time"],dayfirst=dayfirst).tolist()
data.index=pandas.date_range(start=time[0],end=time[-1],tz=timeZone)
data=data.drop(columns="time")
data.attrs={"time":np.datetime_as_string(data.dropna().index.tz_convert("UTC").values[0],timezone='UTC',unit="m"),"location":{"type":"Point","coordinates":[str(longitude), str(latitude)]},"pestEPPOCode":pestEPPOCode,"cropEPPOCode":cropEPPOCode}
if convert_name is not None:
data=data.rename(convert_name)
else:
data=data
return data