Source code for pandaprosumer.controller.models.stratified_heat_storage

"""
Module containing the StratifiedHeatStorageController class.
"""

import numpy as np
import pandas as pd
from functools import partial
import matplotlib.pyplot as plt
from numba import njit

from pandaprosumer.controller.base import BasicProsumerController

from pandaprosumer.mapping import FluidMixMapping
from pandaprosumer.constants import CELSIUS_TO_K, TEMPERATURE_CONVERGENCE_THRESHOLD_C
from operator import add



@njit
def TVDResolutionInTime(layer_temps_c,
                             mdot_charge_kg_per_s,
                             t_charge_c,
                             mdot_discharge_kg_per_s,
                             t_return_c,
                             t_ext_c,
                             cp_j_per_kgk,
                             rho_kg_per_m3,
                             A_m2,
                             dz_m,
                             Sl_m2,
                             S1_m2,
                             SN_m2,
                             k_star_w_per_mk,
                             U_w_per_m2k,
                             U1_w_per_m2k,
                             UN_w_per_m2k,
                             resol,
                             max_dt_s):
    if resol < max_dt_s:
        # If the resolution is too small, we use a timestep
        # that is a multiple of the resolution
        nb_intervals = 1
        dt_s = resol
    else:
        # Calculate the number of intervals needed to reach the resolution
        offset = 1 if resol // max_dt_s != resol / max_dt_s else 0
        nb_intervals = int(resol // max_dt_s + offset)
        dt_s = resol / nb_intervals

    # Repeat the calculation so the timestep is not too big
    for i in range(nb_intervals):
        layer_temps_c = tvdConvectionStep(layer_temps_c,
                              mdot_charge_kg_per_s,
                              t_charge_c,
                              mdot_discharge_kg_per_s,
                              t_return_c,
                              t_ext_c,
                              cp_j_per_kgk,
                              rho_kg_per_m3,
                              A_m2,
                              dz_m,
                              Sl_m2,
                              S1_m2,
                              SN_m2,
                              k_star_w_per_mk,
                              U_w_per_m2k,
                              U1_w_per_m2k,
                              UN_w_per_m2k,
                              dt_s)
    # layer_temps_c = np.linalg.solve(a, b).tolist()
    return layer_temps_c


@njit
def phi(r):
    """
    For the moment, only superbee is introduced.

    r is the ratio of successive gradients on the solution; see r_{i}={\frac {u_{i}-u_{i-1}}{u_{i+1}-u_{i}}}
    """
    limiterName = "superbee"
    if limiterName == "superbee":
        return np.maximum(0, np.maximum(np.minimum(1., 2 * r), np.minimum(2., r)))
    elif limiterName == "vanleer":#Not used
        return (r + abs(r)) / (1 + abs(r))
    else:
        raise Exception("only superbee and van Leer are supported")


@njit
def tvdConvectionStep(layer_temps_c,
                       mdot_charge_kg_per_s,
                       t_charge_c,
                       mdot_discharge_kg_per_s,
                       t_return_c,
                       t_ext_c,
                       cp_j_per_kgk,
                       rho_kg_per_m3,
                       A_m2,
                       dz_m,
                       Sl_m2,
                       S1_m2,
                       SN_m2,
                       k_star_w_per_mk,
                       U_w_per_m2k,
                       U1_w_per_m2k,
                       UN_w_per_m2k,
                       resol):
    #
    # We introduce a constant time step
    #
    ## bottom layer, see equation (3) in the paper
    #
    # print("time step and: ", timeStep, temp_charge_c, temp_return_c, mass_flow_charge_kg_per_s, mass_flow_discharge_kg_per_s);input()
    T = layer_temps_c
    deltaT = np.diff(T, 1)
    T_return = t_return_c
    T_charge = t_charge_c
    T_amb = t_ext_c
    T_1 = T[0]
    T_2 = T[1]
    T_N = T[-1]
    T_N_minus_1 = T[-2]
    m_cC_p = mdot_charge_kg_per_s * cp_j_per_kgk
    m_dC_p = mdot_discharge_kg_per_s * cp_j_per_kgk
    m_eC_p = (mdot_charge_kg_per_s - mdot_discharge_kg_per_s) * cp_j_per_kgk

    k_starA_wm_per_k = k_star_w_per_mk * A_m2
    k_starAoverDz_w_per_k = k_starA_wm_per_k / dz_m

    term_1 = U1_w_per_m2k * S1_m2 * (T_amb - T_1)  # heat losses to the env
    term_2 = (4 / 3) * k_starAoverDz_w_per_k * deltaT[0]  # diffusive terms
    # term_3 = m_cC_p * (T_2 - T_1)                                                    # convective terms
    term_3 = m_cC_p * (deltaT[0])
    term_4 = m_dC_p * (T_return - T_1)
    delta_layer_0 = term_1 + term_2 + term_3 + term_4
    # print("delta_layer_0: ", delta_layer_0)
    #
    theta = np.ones_like(T)
    limiter = np.ones_like(T)
    num = np.zeros_like(T)
    den = np.ones_like(T)
    den[2:] = -deltaT[1:]
    # print("den.size: ", deltaT[1:].size)
    if m_eC_p > 0:
        num[:-1] = -deltaT[:]
    else:
        num[2:] = -deltaT[:-1]

    eps = 1.e-10
    den1 = np.abs(den)
    for i in range(den.size):
        if den1[i] >= eps:
            theta[i] = num[i] / den[i]

    scheme = "superbee"
    limiter = phi(theta)

    # T = np.array(T)
    deltaT = np.diff(T, 1)
    term_1 = U_w_per_m2k * Sl_m2 * (t_ext_c - T[1:-1])
    term_2 = k_starAoverDz_w_per_k * (deltaT[1:] - deltaT[:-1])
    mass_flow_balance_J_per_K_per_s = (
            mdot_charge_kg_per_s - mdot_discharge_kg_per_s)  # this term can be negative
    c = mass_flow_balance_J_per_K_per_s / rho_kg_per_m3 / A_m2

    if c > 0:
        oneMinusCfl = 1. - (c * resol / dz_m)
        term_3 = mass_flow_balance_J_per_K_per_s * deltaT[1:]
        # term_33 =-0.5 * mass_flow_balance_J_per_K_per_s * oneMinusCfl * ((self.T[i - 1] - self.T[i]) * self.limiter[i] - (self.T[i] - self.T[i + 1]) * self.limiter[i + 1] )
        term_33 = -0.5 * mass_flow_balance_J_per_K_per_s * oneMinusCfl * (
                    -deltaT[:-1] * limiter[1:-1] + deltaT[1:] * limiter[2:])
    else:
        oneMinusCfl = 1. - (-c * resol / dz_m)
        term_3 = mass_flow_balance_J_per_K_per_s * deltaT[:-1]
        term_33 = +0.5 * mass_flow_balance_J_per_K_per_s * oneMinusCfl * (
                    deltaT[1:] * limiter[1:-1] - deltaT[:-1] * limiter[0:-2])

    layer_temp_deltas = term_1 + term_2 + cp_j_per_kgk * (term_3 + term_33)
    # layer_temp_deltas = layer_temp_deltas.tolist()

    #
    term_1 = UN_w_per_m2k * SN_m2 * (T_amb - T_N)  # heat losses to the env
    # term_2 = (4 / 3) * self.k_starAoverDz * (T_N_minus_1 - T_N)                       # diffusive terms
    term_2 = -(4 / 3) * k_starAoverDz_w_per_k * deltaT[-1]  # diffusive terms

    # convective terms
    if m_eC_p > 0:
        term_3 = m_eC_p * (T_charge - T_N)
        # delta_layer_N_l = timeStep * (term_1 + term_2 + term_3) * self.denominator
        delta_layer_N_l = term_1 + term_2 + term_3
    else:
        # term_4 = m_dC_p * (T_N_minus_1 - T_N)
        term_4 = -m_dC_p * deltaT[-1]
        # delta_layer_N_l = timeStep * (term_1 + term_2 + term_4) * self.denominator
        delta_layer_N_l = term_1 + term_2 + term_4
    #
    # Assuming layer_temp_deltas is a NumPy array
    new_layer_temp_deltas = np.empty(len(layer_temp_deltas) + 2, dtype=layer_temp_deltas.dtype)

    # Assign the first and last layers
    new_layer_temp_deltas[0] = delta_layer_0
    new_layer_temp_deltas[-1] = delta_layer_N_l

    # Copy the original array into the middle
    new_layer_temp_deltas[1:-1] = layer_temp_deltas
    #
    denominator_k_per_j = 1. / (rho_kg_per_m3 * cp_j_per_kgk * A_m2 * dz_m)
    return T + resol * new_layer_temp_deltas * denominator_k_per_j  # ToDo: Divide ?


[docs] class StratifiedHeatStorageController(BasicProsumerController): """ Controller for stratified heat storage systems. The stratified heat storage is implemented from *Untrau et al., A fast and accurate 1-dimensional model for dynamic simulation and optimization of a stratified thermal energy storage, 2023* Model a stratified thermal energy storage, which uses one single tank that is charged from the top with hot fluid while the cold fluid returning to the storage is charged from the bottom. The model assumes a vertical discretization of the tank with layers of uniform temperatures When charging with the mapped feed temperature and mass flow, the return temperature is the one on the bottom layer of the storage. When discharging with the return temperature and mass flow required by the downstream elements, the discharge temperature is the one on top of the storage. Note that the time step should not be too long compared to the volume of a layer and the (dis)charging mass flow so that less than the volume of a layer will be displaced in a time step. :param prosumer: The prosumer object :param stratified_heat_storage_object: The stratified heat storage object :param order: The order of the controller :param level: The level of the controller :param init_soc: Initial state of charge :param in_service: The in-service status of the controller :param index: The index of the controller :param kwargs: Additional keyword arguments """ @classmethod def name(cls): return "stratified_heat_storage" def __init__(self, prosumer, stratified_heat_storage_object, order, level, init_layer_temps_c=None, plot=False, bypass=True, in_service=True, index=None, name=None, **kwargs): """ Constructor method """ super().__init__(prosumer, stratified_heat_storage_object, order=order, level=level, in_service=in_service, index=index, name=name, **kwargs) # Not in the paper nomenclature, is the number of layers self.N_l = int(self._get_element_param(prosumer, 'n_layers')) t_ext_c = float(self._get_element_param(prosumer, 't_ext_c')) if init_layer_temps_c is None: self._layer_temps_c = [t_ext_c] * self.N_l elif not np.iterable(init_layer_temps_c): self._layer_temps_c = [float(init_layer_temps_c)] * self.N_l else: self._layer_temps_c = init_layer_temps_c self.fluid = prosumer.fluid # TODO: use thermal conductivity of prosumer.fluid k_w_per_mk = self._get_element_param(prosumer, 'k_fluid_w_per_mk') # 0.598 # water self.H_m = self._get_element_param(prosumer, 'tank_height_m') r_int_m = self._get_element_param(prosumer, 'tank_internal_radius_m') d_insu_m = self._get_element_param(prosumer, 'insulation_thickness_m') r_ext_m = self._get_element_param(prosumer, 'tank_external_radius_m') self.A_m2 = np.pi * r_int_m ** 2 perimeter_m = np.pi * r_int_m * 2 k_insu_w_per_mk = self._get_element_param(prosumer, 'k_insu_w_per_mk') # 45 # steel k_wall_w_per_mk = self._get_element_param(prosumer, 'k_wall_w_per_mk') self.k_star_w_per_mk = k_w_per_mk + k_wall_w_per_mk * ((r_ext_m ** 2 - r_int_m ** 2) / r_int_m ** 2) h_ext_w_per_m2k = self._get_element_param(prosumer, 'h_ext_w_per_m2k') # natural convection # Heat transfer coefficient with the environment (diffusion through insulation + convection with ambient air) self.U_w_per_m2k = 1 / ((1 / h_ext_w_per_m2k) + (d_insu_m / k_insu_w_per_mk)) # see eq. 6 # We assume that the tank fluid to overall heat transfer coefficient for the top and bottom layers is the same # as the overall heat transfer coefficient self.U1_w_per_m2k = self.UN_w_per_m2k = self.U_w_per_m2k self.dz_m = self.H_m / self.N_l # 𝛥z (layer high) # Heat exchange areas self.SN_m2 = perimeter_m * self.dz_m self.S1_m2 = self.Sl_m2 = self.A_m2 + self.SN_m2 # initial temperatures are stored for computing stored energy (see eq. 8 in the paper) self.init_layer_temps_c = self._layer_temps_c.copy() self.t_previous_out_charge_c = np.nan self.t_previous_in_charge_c = np.nan self.mdot_previous_in_kg_per_s = np.nan # Used for debugging self.plot = plot self.bypass = bypass if self.plot: self._layer_temps_tab_c = [] self.t_charge_tab_c = [] self.t_discharge_tab_c = [] self.mdot_charge_tab_kg_per_s = [] self.mdot_discharge_tab_kg_per_s = [] @property def _t_received_in_c(self): if not np.isnan(self.input_mass_flow_with_temp[FluidMixMapping.TEMPERATURE_KEY]): return self.input_mass_flow_with_temp[FluidMixMapping.TEMPERATURE_KEY] else: return np.nan @property def _mdot_received_kg_per_s(self): if not np.isnan(self.input_mass_flow_with_temp[FluidMixMapping.MASS_FLOW_KEY]): return self.input_mass_flow_with_temp[FluidMixMapping.MASS_FLOW_KEY] else: return np.nan def _t_m_to_receive_init(self, prosumer): """ Return the expected received Feed temperature, return temperature and mass flow in °C and kg/s :param prosumer: The prosumer object :return: A Tuple (Feed temperature, return temperature and mass flow) """ t_demand_out_c, t_demand_in_c, mdot_demand_tab_required_kg_per_s = self.t_m_to_deliver(prosumer) mdot_demand_kg_per_s = np.sum(mdot_demand_tab_required_kg_per_s) t_charge_in_c = self._get_element_param(prosumer, 'min_useful_temp_c') # self._layer_temps_c[-1] # If the required feed downstream temperature is null, it means that there is no downstream mapped controller # so ask treturn_required_c charge the storage at the min_useful_temp_c (or current top layer temperature ?) if t_demand_out_c < 1e-6 or mdot_demand_kg_per_s < 1e-6: t_required_in_c = t_charge_in_c else: t_required_in_c = t_demand_out_c # TODO: Is the return temp the bottom layer temp or the responders' t_return ? t_charge_out_c = self._t_charge_out # If the storage is full -> do not required heat from upstream for charging remaining_capacity_kwh = self._get_max_stored_energy_kwh(t_charge_in_c, t_required_in_c) - \ self._get_stored_energy_kwh(t_charge_in_c) if mdot_demand_kg_per_s > 0 and remaining_capacity_kwh < self._get_element_param(prosumer, "max_remaining_capacity_kwh"): # FixMe: Go here if min_useful_temp_c = t_charge_in_c > t_required_in_c = t_demand_out_c return t_demand_out_c, t_demand_in_c, mdot_demand_kg_per_s # For charging, Ask to receive a mass flow corresponding to the volume of all layers in the storage for which # the temperature is smaller than the feed temperature rho_mean_kg_per_m3 = self.fluid.get_density(CELSIUS_TO_K + np.mean(self._layer_temps_c)) m_layer_kg = self.A_m2 * self.dz_m * rho_mean_kg_per_m3 nb_cold_layers = np.sum(np.array(self._layer_temps_c) < t_required_in_c) mdot_charge_kg_per_s = nb_cold_layers * m_layer_kg / self.resol mdot_required_kg_per_s = mdot_demand_kg_per_s + mdot_charge_kg_per_s t_required_out_c = (mdot_demand_kg_per_s * t_demand_in_c + mdot_charge_kg_per_s * t_charge_out_c) / mdot_required_kg_per_s if not np.isnan(self.t_previous_out_charge_c): mdot_charge_kg_per_s = mdot_demand_kg_per_s * (t_demand_in_c - t_required_out_c) / (t_required_out_c - t_charge_out_c) return self.t_previous_in_charge_c, self.t_previous_out_charge_c, mdot_charge_kg_per_s return t_required_in_c, t_required_out_c, mdot_required_kg_per_s @property def _t_charge_out(self): """ Temperature of the fluid returned to the initiator in °C """ return self._layer_temps_c[0] def _get_max_stored_energy_kwh(self, extraction_temp_c, t_charge_c): """ Maximal storable energy E if charging at the temperature temp_charge_c. Calculated by comparing a case with uniform temperature to the initial state. See equation (8) in the paper :param extraction_temp_c: Useful Extraction temperature (only energy above this level is considered) (°C) :param t_charge_c: Temperature charge (°C) :return: Maximum storable energy E (kWh) """ # 3.6e6 is the conversion factor from J to kWh ret = sum([float(self.fluid.get_density(CELSIUS_TO_K + t_charge_c)) * self.A_m2 * float(self.fluid.get_heat_capacity(CELSIUS_TO_K + t_charge_c)) * (t_charge_c - self.init_layer_temps_c[z]) * self.dz_m for z in range(self.N_l) if t_charge_c >= extraction_temp_c - .1]) / 3.6e6 return ret def _get_stored_energy_kwh(self, t_extraction_c): """ Stored energy E(t) at any time Calculated by comparing current temperatures to the initial state. See equation (8) in the paper :param t_extraction_c: Useful Extraction temperature (only energy above this level is considered) (°C) :type t_extraction_c: float :return: Stored energy E(t) compared to initial state (kWh) """ # 3.6e6 is the conversion factor from J to kWh ret = sum([float(self.fluid.get_density(CELSIUS_TO_K + self._layer_temps_c[z])) * self.A_m2 * float(self.fluid.get_heat_capacity(CELSIUS_TO_K + self._layer_temps_c[z])) * (self._layer_temps_c[z] - self.init_layer_temps_c[z]) * self.dz_m for z in range(self.N_l) if self._layer_temps_c[z] >= t_extraction_c - .1]) / 3.6e6 return ret def _calculate_heat_storage(self, prosumer, mdot_demand_kg_per_s, t_received_in_c, t_demand_out_c, t_demand_in_c, t_discharge_out_c, mdot_received_kg_per_s, t_charge_out_c): if not self.bypass: mdot_charge_kg_per_s = mdot_received_kg_per_s mdot_discharge_kg_per_s = mdot_demand_kg_per_s mdot_bypass_kg_per_s = 0 t_bypass_in_c = t_received_in_c else: # mass flow to provide at self._t_charge_c to provide the same energy to the demand if t_received_in_c - t_demand_out_c > 0: delta_t_demand_c = t_demand_out_c - t_demand_in_c mdot_toprovide_kg_per_s = mdot_demand_kg_per_s * delta_t_demand_c / (t_received_in_c - t_demand_out_c) else: mdot_toprovide_kg_per_s = mdot_demand_kg_per_s if mdot_toprovide_kg_per_s > mdot_received_kg_per_s: # If this energy cannot be provided, no charging but discharge mdot_bypass_kg_per_s = mdot_received_kg_per_s t_bypass_in_c = t_received_in_c mdot_charge_kg_per_s = 0 # mdot_discharge_kg_per_s = sum(mdot_demand_tab_kg_per_s) - mdot_bypass_kg_per_s # If the required output temperature is between the feed and top layer temperature, calculate the # discharge mass flow to get this required temperature if abs(t_demand_out_c - t_discharge_out_c) > self._get_element_param(prosumer, "t_discharge_out_tol_c"): mdot_discharge_kg_per_s = mdot_bypass_kg_per_s * (t_received_in_c - t_demand_out_c) / ( t_demand_out_c - t_discharge_out_c) else: mdot_discharge_kg_per_s = mdot_demand_kg_per_s - mdot_bypass_kg_per_s if mdot_discharge_kg_per_s < 0: # else, calculate the discharge mass flow to get the same energy # (considering that the return temperature would be the same) e_demand = mdot_demand_kg_per_s * (t_demand_out_c - t_demand_in_c) e_bypass = mdot_bypass_kg_per_s * (t_received_in_c - t_demand_out_c) mdot_discharge_kg_per_s = (e_demand - e_bypass) / (t_discharge_out_c - t_demand_in_c) else: # If this energy can be provided, charging with the extra mass flow mdot_bypass_kg_per_s = mdot_toprovide_kg_per_s mdot_charge_kg_per_s = mdot_received_kg_per_s - mdot_bypass_kg_per_s mdot_discharge_kg_per_s = 0 t_bypass_in_c = t_received_in_c # for i, m in enumerate(mdot_demand_tab_kg_per_s): # if (t_discharge_c - t_return_demand_c) > 0: # # Compensate the value of mass flow to provide the same energy # # Because the discharge temperature is the one on the extraction layer # # which can be different from the required feed temperature # # FixMe: Take into account the temperature that will go through the storage from the production # mdot_demand_tab_kg_per_s[i] = m * (t_feed_demand_c - t_return_demand_c) / (t_discharge_c - t_return_demand_c) # else: # # If the temperature in the storage is lower than the return temperature, it cannot provide any power # mdot_demand_tab_kg_per_s[i] = 0 # # mdot_discharge_kg_per_s = np.sum(mdot_demand_tab_kg_per_s) height_charge_in_m = self._get_element_param(prosumer, 'height_charge_in_m') height_charge_out_m = self._get_element_param(prosumer, 'height_charge_out_m') height_discharge_out_m = self._get_element_param(prosumer, 'height_discharge_out_m') height_discharge_in_m = self._get_element_param(prosumer, 'height_discharge_in_m') layer_charge_in = height_charge_in_m / self.H_m * self.N_l if not np.isnan(height_charge_in_m) else None layer_charge_out = height_charge_out_m / self.H_m * self.N_l if not np.isnan(height_charge_out_m) else None layer_discharge_out = height_discharge_out_m / self.H_m * self.N_l if not np.isnan(height_discharge_out_m) else None layer_discharge_in = height_discharge_in_m / self.H_m * self.N_l if not np.isnan(height_discharge_in_m) else None cp_discharge_j_per_kgk = self.fluid.get_heat_capacity(CELSIUS_TO_K + np.mean(self._layer_temps_c)) rho_kg_per_m3 = self.fluid.get_density(CELSIUS_TO_K + np.mean(self._layer_temps_c)) t_ext_c = self._get_element_param(prosumer, 't_ext_c') k_fluid_w_per_mk = self._get_element_param(prosumer, 'k_fluid_w_per_mk') # dt_max_s = (self.dz_m ** 2 * rho_kg_per_m3 * cp_discharge_j_per_kgk) / (2 * k_fluid_w_per_mk) if np.isnan(self._get_element_param(prosumer, 'max_dt_s')): max_dt_s = self.resol else: max_dt_s = self._get_element_param(prosumer, 'max_dt_s') self._layer_temps_c = TVDResolutionInTime(layer_temps_c=np.array(self._layer_temps_c), mdot_charge_kg_per_s=mdot_charge_kg_per_s, t_charge_c=t_received_in_c, mdot_discharge_kg_per_s=mdot_discharge_kg_per_s, t_return_c=t_demand_in_c, t_ext_c=t_ext_c, cp_j_per_kgk=cp_discharge_j_per_kgk, rho_kg_per_m3=rho_kg_per_m3, A_m2=self.A_m2, dz_m=self.dz_m, Sl_m2=self.Sl_m2, S1_m2=self.S1_m2, SN_m2=self.SN_m2, k_star_w_per_mk=self.k_star_w_per_mk, U_w_per_m2k=self.U_w_per_m2k, U1_w_per_m2k=self.U1_w_per_m2k, UN_w_per_m2k=self.UN_w_per_m2k, resol=self.resol, max_dt_s=max_dt_s) assert (self._layer_temps_c > 0).all(), (f"The SHS model has diverged - " f"Negative temperature in the storage for " f"timestep {self.time} layers= {self._layer_temps_c}") cp_discharge_j_per_kgk = float(self.fluid.get_heat_capacity(CELSIUS_TO_K + (t_discharge_out_c + t_demand_in_c) / 2)) q_discharge_kw = mdot_discharge_kg_per_s * cp_discharge_j_per_kgk * (t_discharge_out_c - t_demand_in_c) / 1e3 min_useful_temp_c = self._get_element_param(prosumer, 'min_useful_temp_c') e_stored_kwh = self._get_stored_energy_kwh(min_useful_temp_c) cp_bypass_j_per_kgk = self.fluid.get_heat_capacity(CELSIUS_TO_K + (t_bypass_in_c + t_demand_in_c) / 2) q_bypass_kw = mdot_bypass_kg_per_s * cp_bypass_j_per_kgk * (t_bypass_in_c - t_demand_in_c) / 1e3 q_delivered_kw = q_discharge_kw + q_bypass_kw mdot_delivered_kg_per_s = mdot_bypass_kg_per_s + mdot_discharge_kg_per_s if mdot_delivered_kg_per_s > 0: t_delivered_out_c = (t_bypass_in_c * mdot_bypass_kg_per_s + t_discharge_out_c * mdot_discharge_kg_per_s) / mdot_delivered_kg_per_s else: t_delivered_out_c = t_discharge_out_c if abs(mdot_charge_kg_per_s + mdot_bypass_kg_per_s) < 1e-3: t_received_out_c = self._t_received_in_c else: t_received_out_c = (mdot_charge_kg_per_s * t_charge_out_c + mdot_bypass_kg_per_s * t_demand_in_c) / (mdot_charge_kg_per_s + mdot_bypass_kg_per_s) return (q_delivered_kw, q_bypass_kw, q_discharge_kw, e_stored_kwh, mdot_received_kg_per_s, t_received_in_c, t_received_out_c, mdot_delivered_kg_per_s, t_demand_in_c, t_delivered_out_c, mdot_charge_kg_per_s, t_charge_out_c, mdot_discharge_kg_per_s, t_discharge_out_c)
[docs] def control_step(self, prosumer): """ Executes the control step for the controller. :param prosumer: The prosumer object """ super().control_step(prosumer) if not self._are_initiators_converged(prosumer): # If some of the initiators are not converged, do not run the control step self._unapply_initiators(prosumer) self.input_mass_flow_with_temp = {FluidMixMapping.TEMPERATURE_KEY: np.nan, FluidMixMapping.MASS_FLOW_KEY: np.nan} return # The discharge temperature is the one on top of the storage # Note that the time step should not be too long compared to the volume of this top layer and the mass flow t_discharge_out_c = self._layer_temps_c[-1] # When discharging, the temperature of the fluid that will be added at the bottom of the storage is the # temperature returned by the downstream elements. # Simple average of the temperatures but will have to be weighted by the mass flows in the future t_demand_out_c, t_demand_in_c, mdot_demand_tab_kg_per_s = self.t_m_to_deliver(prosumer) mdot_demand_kg_per_s = sum(mdot_demand_tab_kg_per_s) assert mdot_demand_kg_per_s >= 0, f"SHS {self.name} mdot_demand_kg_per_s is negative ({mdot_demand_kg_per_s}) for timestep {self.time} in prosumer {prosumer.name}" assert t_demand_out_c >= t_demand_in_c, f"SHS {self.name} t_demand_out_c < t_demand_in_c is negative ({t_demand_out_c} < {t_demand_in_c}) for timestep {self.time} in prosumer {prosumer.name}" assert t_demand_in_c >= 0, f"SHS {self.name} t_demand_in_c is negative ({t_demand_in_c}) for timestep {self.time} in prosumer {prosumer.name}" layer_temp_init_c = self._layer_temps_c.copy() rerun = True while rerun: self._layer_temps_c = layer_temp_init_c.copy() (q_delivered_kw, q_bypass_kw, q_discharge_kw, e_stored_kwh, mdot_received_kg_per_s, t_received_in_c, t_received_out_c, mdot_delivered_kg_per_s, t_demand_in_c, t_delivered_out_c, mdot_charge_kg_per_s, t_charge_out_c, mdot_discharge_kg_per_s, t_discharge_out_c) = self._calculate_heat_storage(prosumer, mdot_demand_kg_per_s, self._t_received_in_c, t_demand_out_c, t_demand_in_c, t_discharge_out_c, self._mdot_received_kg_per_s, self._t_charge_out) result_mdot_tab_kg_per_s = self._merit_order_mass_flow(prosumer, mdot_delivered_kg_per_s, mdot_demand_tab_kg_per_s) rerun = False if len(self._get_mapped_responders(prosumer)) > 1 and mdot_delivered_kg_per_s < mdot_demand_kg_per_s: # If the heat Pump is not able to deliver the required mass flow, # recalculate the condenser input temperature, considering that all the downstream elements will be # still return the same temperature, even if the mass flow delivered to them by the Heat Pump is lower t_return_tab_c = self.get_treturn_tab_c(prosumer) if abs(mdot_delivered_kg_per_s) > 1e-8: t_return_demand_new_c = np.sum(result_mdot_tab_kg_per_s * t_return_tab_c) / mdot_delivered_kg_per_s else: t_return_demand_new_c = t_demand_in_c if abs(t_return_demand_new_c - t_demand_in_c) > 1: # If this recalculation changes the condenser input temperature, rerun the calculation # with the new temperature t_demand_in_c = t_return_demand_new_c rerun = True # result = np.array([[mdot_discharge_kg_per_s, # t_discharge_out_c, # q_delivered_kw, # e_stored_kwh]]) cp_received_j_per_kgk = self.fluid.get_heat_capacity(CELSIUS_TO_K + (t_received_in_c + t_received_out_c) / 2) q_received_kw = mdot_received_kg_per_s * cp_received_j_per_kgk * (t_received_in_c - t_received_out_c) / 1e3 cp_charge_j_per_kgk = self.fluid.get_heat_capacity(CELSIUS_TO_K + (t_received_in_c + t_charge_out_c) / 2) q_charge_kw = mdot_charge_kg_per_s * cp_charge_j_per_kgk * (t_received_in_c - t_charge_out_c) / 1e3 result = np.array([[mdot_discharge_kg_per_s, t_discharge_out_c, q_discharge_kw, e_stored_kwh]]) result_fluid_mix = [] for mdot_kg_per_s in result_mdot_tab_kg_per_s: result_fluid_mix.append({FluidMixMapping.TEMPERATURE_KEY: t_delivered_out_c, FluidMixMapping.MASS_FLOW_KEY: mdot_kg_per_s}) # Used for debugging if self.plot: self._layer_temps_tab_c += [self._layer_temps_c] # Store all the layers temps history for analysis self.t_charge_tab_c += [t_received_in_c] self.t_discharge_tab_c += [t_demand_in_c] self.mdot_charge_tab_kg_per_s += [mdot_charge_kg_per_s] self.mdot_discharge_tab_kg_per_s += [mdot_discharge_kg_per_s] #assert q_received_kw >= 0, f"SHS {self.name} q_received_kw is negative ({q_received_kw}) for timestep {self.time} in prosumer {prosumer.name}" #assert q_delivered_kw >= 0, f"SHS {self.name} q_delivered_kw is negative ({q_delivered_kw}) for timestep {self.time} in prosumer {prosumer.name}" #assert q_charge_kw >= 0, f"SHS {self.name} q_charge_kw is negative ({q_charge_kw}) for timestep {self.time} in prosumer {prosumer.name}" #assert q_discharge_kw >= 0, f"SHS {self.name} q_discharge_kw is negative ({q_discharge_kw}) for timestep {self.time} in prosumer {prosumer.name}" # assert e_stored_kwh >= 0, f"SHS {self.name} e_stored_kwh is negative ({e_stored_kwh}) for timestep {self.time} in prosumer {prosumer.name}" #assert mdot_received_kg_per_s >= 0, f"SHS {self.name} mdot_received_kg_per_s is negative ({mdot_received_kg_per_s}) for timestep {self.time} in prosumer {prosumer.name}" #assert mdot_delivered_kg_per_s >= 0, f"SHS {self.name} mdot_delivered_kg_per_s is negative ({mdot_delivered_kg_per_s}) for timestep {self.time} in prosumer {prosumer.name}" if np.isnan(self.t_keep_return_c) or mdot_received_kg_per_s == 0 or abs(t_received_out_c - self.t_keep_return_c) < TEMPERATURE_CONVERGENCE_THRESHOLD_C or len(self._get_mapped_initiators_on_same_level(prosumer)) == 0: # If the actual output temperature is the same as the promised one, the storage is correctly applied self.finalize(prosumer, result, result_fluid_mix) self.applied = True self.t_previous_out_charge_c = np.nan self.t_previous_in_charge_c = np.nan self.mdot_previous_in_kg_per_s = np.nan else: # Else, reapply the upstream controllers with the new temperature so no energy appears or disappears self._layer_temps_c = layer_temp_init_c self._unapply_initiators(prosumer) self.t_previous_out_charge_c = t_received_out_c self.t_previous_in_charge_c = t_received_in_c self.mdot_previous_in_kg_per_s = mdot_delivered_kg_per_s self.input_mass_flow_with_temp = {FluidMixMapping.TEMPERATURE_KEY: np.nan, FluidMixMapping.MASS_FLOW_KEY: np.nan}