"""
Module containing the HeatDemandController class.
"""
import numpy as np
import logging
from pandaprosumer.controller.base import BasicProsumerController
from pandaprosumer.mapping.fluid_mix import FluidMixMapping
from pandaprosumer.constants import CELSIUS_TO_K, TEMPERATURE_CONVERGENCE_THRESHOLD_C
logger = logging.getLogger(__name__)
[docs]
class HeatDemandController(BasicProsumerController):
"""
Controller for Heat Demand.
:param prosumer: The prosumer object
:param heat_demand_object: The heat demand object
:param order: The order of the controller
:param level: The level of the controller
:param scale_factor: The scale factor for the controller
:param in_service: The in-service status of the controller
:param index: The index of the controller
:param name: The name of the controller
:param kwargs: Additional keyword arguments
"""
@classmethod
def name(cls):
return "heat_demand_controller"
def __init__(self, prosumer, heat_demand_object, order=-1, level=-1,
in_service=True, index=None, name=None, **kwargs):
"""
Initializes the HeatDemandController.
"""
super().__init__(prosumer, heat_demand_object, order=order, level=level, in_service=in_service,
index=index, name=name, **kwargs)
self.t_previous_out_c = np.nan
self.t_previous_in_c = np.nan
self.mdot_previous_in_kg_per_s = np.nan
@property
def _q_demand_kw(self):
return self._get_input('q_demand_kw')
@property
def _mdot_demand_kg_per_s(self):
return self._get_input('mdot_demand_kg_per_s')
def _t_feed_demand_c(self,prosumer):
return self._get_input('t_feed_demand_c',prosumer)
@property
def _t_return_demand_c(self):
return self._get_input('t_return_demand_c')
@property
def _t_in_c(self):
if not np.isnan(self.input_mass_flow_with_temp[FluidMixMapping.TEMPERATURE_KEY]):
return self.input_mass_flow_with_temp[FluidMixMapping.TEMPERATURE_KEY]
else:
return np.nan
# return self.inputs[:, self.input_columns.index("fluid_mix")][0][FluidMixMapping.TEMPERATURE_KEY]
@property
def _mdot_received_kg_per_s(self):
if not np.isnan(self.input_mass_flow_with_temp[FluidMixMapping.MASS_FLOW_KEY]):
return self.input_mass_flow_with_temp[FluidMixMapping.MASS_FLOW_KEY]
else:
return np.nan
# if np.isnan(self.inputs[:, self.input_columns.index("fluid_mix")][0]):
# return np.nan
# return self.inputs[:, self.input_columns.index("fluid_mix")][0][FluidMixMapping.MASS_FLOW_KEY]
def q_to_receive_kw(self, prosumer):
"""
Calculates the heat to receive in kW.
:param prosumer: The prosumer object
:return: Heat to receive in kW
"""
self.applied = False
q_to_receive_kw = 0.
for responder in self._get_generic_mapped_responders(prosumer):
# The demand is normally not mapped to anything
q_to_receive_kw += responder.q_to_receive_kw(prosumer)
q_to_receive_kw += self._q_demand_kw # The actual demand
if not np.isnan(self._get_input('q_received_kw')):
# If there is already some power in the input, don't require it again
q_received_kw = self._get_input('q_received_kw')
q_to_receive_kw -= q_received_kw
q_to_receive_kw = max(0., q_to_receive_kw)
return q_to_receive_kw
def _demand_q_tf_tr_m(self, prosumer):
if not (np.isnan(self._t_feed_demand_c(prosumer)) or np.isnan(self._t_return_demand_c)
or np.isnan(self._q_demand_kw) or np.isnan(self._mdot_demand_kg_per_s)):
raise ValueError("Should not provide a value for all Heat Demand inputs")
if np.isnan(self._t_feed_demand_c(prosumer)):
if np.isnan(self._t_return_demand_c) or np.isnan(self._q_demand_kw) or np.isnan(self._mdot_demand_kg_per_s):
t_feed_demand_c = self.element_instance.t_in_set_c[self.element_index[0]]#TODO : error if t_in_set_c do not exists
else:
cp = float(prosumer.fluid.get_heat_capacity(CELSIUS_TO_K + self._t_return_demand_c)) / 1000
t_feed_demand_c = self._t_return_demand_c + self._q_demand_kw / (self._mdot_demand_kg_per_s * cp)
else:
t_feed_demand_c = self._t_feed_demand_c(prosumer)
if np.isnan(self._t_return_demand_c):
if np.isnan(self._q_demand_kw) or np.isnan(self._mdot_demand_kg_per_s):
t_return_demand_c = self.element_instance.t_out_set_c[self.element_index[0]]#TODO : error if t_out_set_c do not exists
else:
cp = float(prosumer.fluid.get_heat_capacity(CELSIUS_TO_K + t_feed_demand_c)) / 1000
t_return_demand_c = t_feed_demand_c - self._q_demand_kw / (self._mdot_demand_kg_per_s * cp)
else:
t_return_demand_c = self._t_return_demand_c
if np.isnan(self._mdot_demand_kg_per_s):
if np.isnan(self._q_demand_kw):
raise ValueError("Should provide at least mdot_demand_kg_per_s or q_demand_kw as Heat Demand input")
else:
t_mean_c = (t_feed_demand_c + t_return_demand_c) / 2
cp = float(prosumer.fluid.get_heat_capacity(CELSIUS_TO_K + t_mean_c)) / 1000
if abs(t_feed_demand_c - t_return_demand_c) < 1e-12:
mdot_demand_kg_per_s = 0
else:
mdot_demand_kg_per_s = self._q_demand_kw / (cp * (t_feed_demand_c - t_return_demand_c))
else:
mdot_demand_kg_per_s = self._mdot_demand_kg_per_s
if np.isnan(self._q_demand_kw):
t_mean_c = (t_feed_demand_c + t_return_demand_c) / 2
cp = float(prosumer.fluid.get_heat_capacity(CELSIUS_TO_K + t_mean_c)) / 1000
q_demand_kw = mdot_demand_kg_per_s * cp * (t_feed_demand_c - t_return_demand_c)
else:
q_demand_kw = self._q_demand_kw
# if mdot_demand_kg_per_s < 1e-3:
# mdot_demand_kg_per_s = 0
# t_return_demand_c = t_feed_demand_c
return q_demand_kw, t_feed_demand_c, t_return_demand_c, mdot_demand_kg_per_s
def _t_m_to_receive_init(self, prosumer):
"""
Return the expected received Feed temperature, return temperature and mass flow in °C and kg/s
:param prosumer: The prosumer object
:return: A Tuple (Feed temperature, return temperature and mass flow)
"""
q_demand_kw, t_feed_demand_c, t_return_demand_c, mdot_demand_kg_per_s = self._demand_q_tf_tr_m(prosumer)
if not np.isnan(self.t_previous_out_c):
return self.t_previous_in_c, self.t_previous_out_c, self.mdot_previous_in_kg_per_s
else:
if t_feed_demand_c <= t_return_demand_c or mdot_demand_kg_per_s < 1e-12:
t_feed_demand_c = t_return_demand_c
mdot_demand_kg_per_s = 0
assert not np.isnan(t_feed_demand_c)
assert not np.isnan(t_return_demand_c)
assert not np.isnan(mdot_demand_kg_per_s)
assert mdot_demand_kg_per_s >= 0
assert t_feed_demand_c >= t_return_demand_c
return t_feed_demand_c, t_return_demand_c, mdot_demand_kg_per_s
def control_step(self, prosumer):
"""
Executes the control step for the controller.
:param prosumer: The prosumer object
"""
super().control_step(prosumer)
if not self._are_initiators_converged(prosumer):
# If some of the initiators are not converged, do not run the control step
self._unapply_initiators(prosumer)
self.input_mass_flow_with_temp = {FluidMixMapping.TEMPERATURE_KEY: np.nan,
FluidMixMapping.MASS_FLOW_KEY: np.nan}
return
if not np.isnan(self._get_input('q_received_kw')):
q_received_kw = self._get_input('q_received_kw')
q_uncovered_kw = self._q_demand_kw - q_received_kw
result = np.array([[q_received_kw, q_uncovered_kw, 0, 0, 0]])
self.finalize(prosumer, result)
self.applied = True
return
q_demand_kw, t_feed_demand_c, t_return_demand_c, mdot_demand_kg_per_s = self._demand_q_tf_tr_m(prosumer)
# ToDo: If t_in < t_out, return t_in, not t_out
assert not np.isnan(self._t_in_c), f"Heat Demand {self.name} t_in_c is NaN for timestep {self.time} in prosumer {prosumer.name}"
assert not np.isnan(t_feed_demand_c), f"Heat Demand {self.name} t_feed_demand_c is NaN for timestep {self.time} in prosumer {prosumer.name}"
assert not np.isnan(t_return_demand_c), f"Heat Demand {self.name} t_return_demand_c is NaN for timestep {self.time} in prosumer {prosumer.name}"
assert not np.isnan(q_demand_kw), f"Heat Demand {self.name} q_demand_kw is NaN for timestep {self.time} in prosumer {prosumer.name}"
assert not np.isnan(mdot_demand_kg_per_s), f"Heat Demand {self.name} mdot_demand_kg_per_s is NaN for timestep {self.time} in prosumer {prosumer.name}"
t_mean_c = (self._t_in_c + t_return_demand_c) / 2
cp_kj_per_kgk = float(prosumer.fluid.get_heat_capacity(CELSIUS_TO_K + t_mean_c)) / 1000
if np.isnan(self._mdot_received_kg_per_s):
q_received_kw = q_demand_kw
mdot_received_kg_per_s = q_demand_kw / (cp_kj_per_kgk * (self._t_in_c - t_return_demand_c))
else:
mdot_received_kg_per_s = self._mdot_received_kg_per_s
q_received_kw = cp_kj_per_kgk * mdot_received_kg_per_s * (self._t_in_c - t_return_demand_c)
t_out_c = t_return_demand_c
# Calculate the difference between the received and the required power, wo considering the temperature level
# FixMe: Consider the temperature level in the output
q_uncovered_kw = q_demand_kw - q_received_kw
result = np.array([[q_received_kw, q_uncovered_kw, mdot_received_kg_per_s, self._t_in_c, t_out_c]])
if np.isnan(result).any():
self.input_mass_flow_with_temp = {FluidMixMapping.TEMPERATURE_KEY: np.nan,
FluidMixMapping.MASS_FLOW_KEY: np.nan}
else:
if np.isnan(self.t_keep_return_c) or mdot_received_kg_per_s == 0 or abs(t_out_c - self.t_keep_return_c) < TEMPERATURE_CONVERGENCE_THRESHOLD_C: # or len(self._get_mapped_initiators_on_same_level(prosumer)) == 0:
# If the actual output temperature is the same as the promised one, the storage is correctly applied
self.finalize(prosumer, result)
self.applied = True
self.t_previous_out_c = np.nan
self.t_previous_in_c = np.nan
self.mdot_previous_in_kg_per_s = np.nan
else:
# Else, reapply the upstream controllers with the new temperature so no energy appears or disappears
self._unapply_initiators(prosumer)
self.t_previous_out_c = t_out_c
self.t_previous_in_c = self._t_in_c
self.mdot_previous_in_kg_per_s = mdot_received_kg_per_s
self.input_mass_flow_with_temp = {FluidMixMapping.TEMPERATURE_KEY: np.nan,
FluidMixMapping.MASS_FLOW_KEY: np.nan}