Figures 4-5-6-7-8-9-10-11: Uncertainty quantification for measurement errors#

Imports#

import dolfin  # https://fenicsproject.org
import logging; logging.getLogger('FFC').setLevel(logging.WARNING)
import matplotlib.pyplot as plt
import multiprocessing
import numpy
import os
import pandas as pd
import subprocess
import time
import warnings; warnings.filterwarnings("ignore")

import dolfin_mech as dmech

import compute_disp ### local module: computes the synthetic "measured" displacement fields
import helpers ### local module: directory setup, process/convergence checks, bookkeeping utilities

Parameters#

Fixed parameters#

### default parameter values 
alpha_healthy = 0.09 ### reference stiffness, in kPa
gamma = 0.5 ### [-]
c1, c2 = 0.2, 0.4 ### in kPa
pe, pi = -0.5, -2 ### end-exhalation and end-inhalation pleural pressures, in kPa

Parameters to identify#

### uncomment the parametrization corresponding to the figure to reproduce
## Figure 4
parameters_to_identify = {"alpha": 0.09}

# ## Figure 5
# parameters_to_identify = {"pi": -2}

# ## Figure 6 - top row
# parameters_to_identify = {"alpha_healthy": 0.09, "pi": -2}

# ## Figure 7 - top row
# parameters_to_identify = {"alpha_healthy": 0.09, "alpha_fibrose": 0.9}

# ## Figure 8 - top row
# parameters_to_identify = {"alpha_fibrose": 0.9, "pi": -2}

# ## Figure 9 - top row
# parameters_to_identify = {"alpha_fibrose_1": 0.9, "alpha_fibrose_2": 0.67, "pi": -2}

# ## Figure 10 / 12 - top row
# parameters_to_identify = {"alpha_fibrose_1": 1.1, "alpha_fibrose_2": 0.9, "alpha_fibrose_3": 0.67, "pi": -2}

# ## Figure 11 - top row
# parameters_to_identify = {"alpha_healthy": 0.09, "alpha_fibrose": 0.67, "pi": -2}

Synthetic data parameters#

noise_lst = [0., 0.05, 0.1, 0.2, 0.4] ### noise added to the synthetic data
SNR_lst = [(2/noise_lst[i+1] if noise_lst[i] == 0 else 1/noise_lst[i]) for i in range(len(noise_lst))] ### corresponding SNR for each noise level (SNR = 1/noise); the noise-free case is assigned twice the SNR of the next level so that it can still be placed on the (log-scale) SNR axis
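For reference, this mapping gives SNR values of 40, 20, 10, 5, and 2.5 for the five noise levels; a quick, purely illustrative check:

### illustrative check of the noise-to-SNR mapping defined above
for noise, SNR in zip(noise_lst, SNR_lst):
    print("noise =", noise, "-> SNR =", SNR)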

Geometry#

### the generic mesh is stored in the Data folder - note that the geometry used for the computations differs from the one used to generate the figures, for copyright reasons (the authors do not own the rights to the lung geometry)
cube_params = {"path_and_mesh_name": "./Data/mesh_cube.xdmf"}

Loading#

### position for the computations
position_lst = ["Supine", "Prone", "Supine+Prone"] ### supine only, prone only, and combined supine + prone
### unloading problem
### generic unloading problem, no gravity is applied
load_params_unloading_generic = {"type":"p_boundary_condition0", "f":0, "P0" : float(pe)} 

### loading problem
### generic end-exhalation configuration in prone and in supine positions
load_params_loading_prone = {"type":"p_boundary_condition", "f": -9.81e3, "P0" : float(pe)}
load_params_loading_supine = {"type":"p_boundary_condition", "f": 9.81e3, "P0" : float(pe)}
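The two loading dictionaries differ only by the sign of the gravity term "f", since the body is flipped between the prone and supine positions. A minimal sketch of a convenience wrapper making this explicit (make_load_params is hypothetical, not part of dolfin_mech):

### hypothetical helper (not part of dolfin_mech): build the loading dictionary
### for a given position, flipping the sign of the gravity term
def make_load_params(position, pe=pe, f_gravity=9.81e3):
    sign = -1. if position == "Prone" else +1.
    return {"type": "p_boundary_condition", "f": sign*f_gravity, "P0": float(pe)}

assert make_load_params("Prone") == load_params_loading_prone
assert make_load_params("Supine") == load_params_loading_supine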

Initialization: write reference end-exhalation configurations (prone and supine)#

### create necessary directories for computations
current_directory = os.getcwd() ### get name of current directory
directory_prone, directory_supine = helpers.initialize_directories(current_directory) ### if needed, create directories for computations; get name of directories for prone and supine computations

### creating the meshes for the end-exhalation prone and supine configurations
mesh_prone = dolfin.Mesh()
mesh_supine = dolfin.Mesh()
mesh_name = str(cube_params["path_and_mesh_name"])
dolfin.XDMFFile(mesh_name).read(mesh_prone)
dolfin.XDMFFile(mesh_name).read(mesh_supine)

### generic configurations, computed with one zone for the sake of simplicity
parameters = {"alpha": alpha_healthy, "gamma":gamma, "c1":c1, "c2":c2, "kappa":1e2, "eta":1e-5}
mat_params={"scaling": "linear", "parameters": parameters}

### computing unloaded configuration from initial configuration
U_unloading, phis_unloading, dV_unloading = dmech.run_RivlinCube_PoroHyperelasticity(
    dim = 3,
    inverse = 1,
    cube_params = cube_params,
    porosity_params = {"type": "mesh_function_random_xml"},
    get_results = 1,
    mat_params = mat_params,
    step_params = {"dt_min":1e-4},
    load_params = load_params_unloading_generic,
    res_basename = "generic_unloaded",
    inertia_params = {"applied":True},
    plot_curves = 0,
    verbose =1 )

#### redefining the porosity field in the unloaded configuration so that it is physiological (uniform draw in [0.4, 0.6])
phisref_imposed = list(numpy.random.uniform(low = 0.4, high = 0.6, size = len(phis_unloading)))
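An optional sanity check (purely illustrative) confirming that the imposed porosities lie in the targeted physiological range:

### illustrative check: all imposed porosities should lie in [0.4, 0.6]
assert all(0.4 <= phi <= 0.6 for phi in phisref_imposed)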

#### computing end-exhalation configuration prone position
Uprone, phisprone, dVprone = dmech.run_RivlinCube_PoroHyperelasticity(
    dim = 3,
    inverse = 0,
    porosity_params = {"type":"function_xml_from_array", "val":phisref_imposed},
    cube_params = cube_params,
    mat_params = mat_params,
    step_params = {"dt_ini": 0.125, "dt_min": 1e-4},
    load_params = load_params_loading_prone,
    res_basename = directory_prone+"/prone",
    move_params = {"move": True, "U": U_unloading},  ### applying the displacement field from generic end-exhalation configuration to unloaded configuration
    inertia_params={"applied": True},
    get_results = 1,
    plot_curves=0,
    verbose=1)

helpers.write_porosity(porosity_field = phisprone, n_cells = len(mesh_prone.cells()), filepath = directory_prone + "/prone-poro.xml")

### getting displacement field from generic end-exhalation to prone end-exhalation configuration
Uexhal_prone = U_unloading.copy(deepcopy=True)
Uexhal_prone.vector()[:] +=  Uprone.vector()[:]
dolfin.ALE.move(mesh_prone, Uexhal_prone)

### writing mesh prone
xdmf_file_mesh = dolfin.XDMFFile(directory_prone + "/cubeprone.xdmf")
xdmf_file_mesh.write(mesh_prone)
xdmf_file_mesh.close()
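A quick, optional check that the moved mesh is sensible, e.g. by computing its volume (illustrative; V_exhal_prone is just a local name introduced here):

### illustrative check: volume of the end-exhalation prone mesh after the move
V_exhal_prone = dolfin.assemble(dolfin.Constant(1.)*dolfin.dx(domain=mesh_prone))
print("end-exhalation prone mesh volume:", V_exhal_prone)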

#### computing end-exhalation configuration supine position
Usupine, phissupine, dVsupine = dmech.run_RivlinCube_PoroHyperelasticity(
    dim = 3,
    inverse = 0,
    porosity_params = {"type":"function_xml_from_array", "val":phisref_imposed},
    cube_params = cube_params,
    mat_params = mat_params,
    step_params = {"dt_ini": 0.125, "dt_min": 1e-4},
    load_params = load_params_loading_supine,
    res_basename = directory_supine+"/supine",
    move_params = {"move": True, "U": U_unloading},  ### applying the displacement field from generic end-exhalation configuration to unloaded configuration
    inertia_params={"applied": True},
    get_results = 1,
    plot_curves=0,
    verbose=1)

helpers.write_porosity(porosity_field = phissupine, n_cells = len(mesh_supine.cells()), filepath = directory_supine + "/supine-poro.xml")

### getting displacement field from generic end-exhalation to supine end-exhalation configuration
Uexhal_supine = U_unloading.copy(deepcopy=True)
Uexhal_supine.vector()[:] +=  Usupine.vector()[:]
dolfin.ALE.move(mesh_supine, Uexhal_supine)

### writing mesh supine
xdmf_file_mesh = dolfin.XDMFFile(directory_supine + "/cubesupine.xdmf")
xdmf_file_mesh.write(mesh_supine)
xdmf_file_mesh.close()

Computation of the error distribution of the estimated parameter(s)#

### launching minimizations in parallel for the estimation
results_all_positions = {}
for position in position_lst:
    distribution_converged = ['no'] * len(noise_lst) ### is the distribution converged for each noise level?
    results, storing_values_for_convergence_check, storing_processes, reference_value, param_names, nb_parameters = helpers.initialize_lsts_and_params(parameters_to_identify, noise_lst, "noise")
    ########### computation of reference displacement field
    if position=='Supine+Prone':
        ### computing displacement field in supine position
        Umeas_supine, Vmeas_supine = compute_disp.compute_disp(position = 'Supine', parameters_to_identify = parameters_to_identify, noise='', dirpath = current_directory)
        V0_supine = dolfin.assemble(dolfin.Constant(1)*Vmeas_supine)
        Umeas_norm_supine = (dolfin.assemble(dolfin.inner(Umeas_supine, Umeas_supine)*Vmeas_supine)/2/V0_supine)**(1/2)
        ### writing displacement field supine position
        file_supine = dolfin.File("refSupine/displacement_exhal_to_inhal.xml")
        file_supine << Umeas_supine
        ### computing displacement field in prone position
        Umeas_prone, Vmeas_prone = compute_disp.compute_disp(position = 'Prone', parameters_to_identify = parameters_to_identify, noise='', dirpath = current_directory)
        V0_prone = dolfin.assemble(dolfin.Constant(1)*Vmeas_prone)
        Umeas_norm_prone = (dolfin.assemble(dolfin.inner(Umeas_prone, Umeas_prone)*Vmeas_prone)/2/V0_prone)**(1/2)
        ### writing displacement field prone position
        file_prone = dolfin.File("refProne/displacement_exhal_to_inhal.xml")
        file_prone << Umeas_prone
    else:
        Umeas, Vmeas = compute_disp.compute_disp(position = position, parameters_to_identify = parameters_to_identify, noise = '', dirpath = current_directory)
        V0 = dolfin.assemble(dolfin.Constant(1)*Vmeas)
        Umeas_norm = (dolfin.assemble(dolfin.inner(Umeas, Umeas)*Vmeas)/2/V0)**(1/2)
        ### writing the displacement field for the prone or supine position (depending on the case investigated)
        file_prone_or_supine = dolfin.File("ref"+str(position)+"/displacement_exhal_to_inhal.xml")
        file_prone_or_supine << Umeas
    ### checking how many CPUs are available to start computations
    number_cpus = multiprocessing.cpu_count()
    min_iterations = number_cpus // len(noise_lst) ### allocate the same number of CPUs to each noise level
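    ### e.g., with 20 available CPUs and 5 noise levels, min_iterations = 4, i.e., 4 minimizations are (re)launched per noise level in each batch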
    converged = False ### convergence of the error distributions - checks if the calculation is over for all noise levels
    converged_for_noise_level = False ### variable checking if error distribution converged for a given noise level
    while not converged:
        converged = all(convergence_status == "converged" for convergence_status in distribution_converged) ### checks if the calculation is over for all noise levels
        if converged: ### if converged for all noise levels
            break
        over = False ### variable used to check if all computations launched in parallel are over for a given noise level
        for i in range(len(noise_lst)):
            if distribution_converged[i] == 'converged': ### if error distribution converged for a given noise level
                storing_processes.pop(str(noise_lst[i]), None) ### no more calculation is launched for this noise level
            elif distribution_converged[i] == 'no': ### if finished but did not converge
                storing_processes[str(noise_lst[i])] = [] ### should launch new computations
        for noise, lst in storing_values_for_convergence_check.items(): ### looping over noise levels
            if distribution_converged[noise_lst.index(float(noise))] == 'no': ### all computations finished for this noise level but the distribution did not converge: launch new computations
                for iteration in range(0, min_iterations): ### relaunching all processes for a given noise level
                    ini_calculation = []
                    for param, param_value in parameters_to_identify.items(): ### drawing each initial guess from a Gaussian centered on the ground-truth value, with a standard deviation of 30% of that value
                        ini_calculation.append(float(numpy.random.normal(loc = param_value, scale = abs(0.3*param_value), size = 1)[0]))
                    process = subprocess.Popen(
                        ["python", "-W", "ignore", "./minimization.py",
                         "--position", str(position),
                         "--noise_level", str(noise),
                         "--parameters_to_identify", str(parameters_to_identify),
                         "--iteration", str(iteration),
                         "--dirpath", str(current_directory),
                         "--initialization", str(ini_calculation)],
                        stdout=subprocess.PIPE) ### launching one minimization in parallel
                    storing_processes[str(noise)].append(process) ### storing each process launched
                    distribution_converged[noise_lst.index(float(noise))] = 'waiting' ### changing status from 'no', i.e., finished but not converged, to 'waiting', i.e., no action is done until all processes reach their end (for a given noise level) 
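        ### status machine per noise level: 'no' (batch finished but not converged, relaunch) -> 'waiting' (processes running) -> 'converged' (no further minimizations)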
        while not over: ### loop until all processes launched for a given noise level have finished
            time.sleep(1) ### wait before checking again
            for noise, lst in storing_processes.items():
                noise = float(noise)
                over = helpers.checking_if_processes_converged(storing_processes[str(noise)]) ### check if all processes of a given noise finished
                if over: ### if all processes ended for a given noise level
                    for process in storing_processes[str(noise)]: ### for each process launched
                        try: ### read the output of the minimization
                            out = process.communicate()[0]
                            out = out.decode("utf-8").split()
                        except: ### if the minimization was unsuccessful (e.g., it hit the maximum number of iterations, set to avoid overly expensive computations)
                            out = [] ### avoids reusing the output of a previous process
                        process.terminate() ### ensuring the process ends
                        solution = {} ### dictionary for storing results
                        if out != []: ### out is [] if the minimization did not converge
                            results['noise'].append(noise) ### storing the results
                            for i in range(0, nb_parameters): ### getting the initial values, and the error (in %) of the initialization
                                lst_name = "ini_"+str(param_names[i])
                                results[lst_name].append(((float(out[i])-reference_value[i]))/reference_value[i]*100)
                            for k in range(nb_parameters, 2*nb_parameters): ### getting the estimated values, and computing the error (in %)
                                i = k - nb_parameters
                                results[param_names[i]].append((float(out[k]) - reference_value[i])/reference_value[i]*100)
                                storing_values_for_convergence_check[str(noise)][i].append(float(out[k])) ### storing values to check if the error distribution converged
                    converged_for_noise_level = False ### reset before testing the current noise level
                    if len(storing_values_for_convergence_check[str(noise)][0]) > min_iterations+1: ### minimum number of samples before checking for convergence
                        converged_for_noise_level, crit = helpers.checking_if_converged(storing_values_for_convergence_check[str(noise)], len(storing_processes[str(noise)]), tol=5e-2)
                    if converged_for_noise_level: ### if the error distribution converged
                        distribution_converged[noise_lst.index(float(noise))] = 'converged' ### no more minimizations launched for this noise level
                    else:
                        distribution_converged[noise_lst.index(float(noise))] = 'no' ### new processes will be launched
                    break
    results_all_positions[position] = results ### dictionary storing results for all positions
    df = pd.DataFrame(results)
    df_sorted = df.sort_values(by="noise") ### sorting by noise level
    myfile = open("./results_estimation-model_error-position="+str(position), 'w') ### opening the results file for the current position
    myfile.write(df_sorted.to_string(index=False)) ### writing the estimation errors for each noise level
    myfile.close()
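The plain-text tables written above can be reloaded later for further post-processing, e.g. (illustrative, here for the supine case; df_reloaded is just a local name):

### illustrative: reloading one of the result tables written above
df_reloaded = pd.read_csv("./results_estimation-model_error-position=Supine", sep=r"\s+")
print(df_reloaded.head())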

##### plotting results
for parameter_name, value in parameters_to_identify.items():
    fig, ax = plt.subplots()
    ax.cla()
    ### plotting parameters
    plt.rc("xtick", labelsize=18)
    plt.rc("ytick", labelsize=18)
    plt.rc("legend", fontsize=16)
    plt.ylim([-100, 100])
    plt.xlabel("Signal to Noise Ratio (SNR)", fontsize=14)
    plt.ylabel("Estimation error (%)", fontsize=14)
    color_lst=['royalblue', 'firebrick', 'forestgreen'] ### different color for each position
    color_lst_lines=['royalblue', 'firebrick', 'darkgreen']
    alpha_lst=[0.6, 0.45, 0.6] ### transparencies, to be able to see all superimposed distributions
    label_lst = position_lst.copy()
    for position in position_lst:
        ### reorganize data 
        results_position = results_all_positions[position]
        frame = pd.DataFrame(results_position)
        sorted_frame = frame.sort_values(by="noise")
        parametrization_name = ''
        stats_results = sorted_frame.groupby("noise")[str(parameter_name)].agg(['mean', 'std'])
        stats_results['mean_'+str(parameter_name)] = stats_results['mean']
        stats_results['mean_plus_std'+str(parameter_name)] = stats_results['mean'] + stats_results['std']
        stats_results['mean_minus_std'+str(parameter_name)] = stats_results['mean'] - stats_results['std']
        parametrization_name += parameter_name
        #### plot data
        plt.plot(SNR_lst, stats_results['mean_'+str(parameter_name)], color=color_lst_lines[0], label = label_lst[0])
        ax.fill_between(SNR_lst, stats_results['mean_minus_std'+str(parameter_name)], stats_results['mean_plus_std'+str(parameter_name)], alpha = alpha_lst[0], color = color_lst[0])
        color_lst = color_lst[1:]
        label_lst = label_lst[1:]
        alpha_lst = alpha_lst[1:]
        color_lst_lines = color_lst_lines[1:]
    plt.xlim([2.5, 44])
    plt.gca().set_xscale('log')
    ax.errorbar(SNR_lst, len(SNR_lst)*[0], yerr = 30, linewidth = 1, markersize = 10, color = 'black', fmt = 'x', capsize = 5, label = "Initial distribution")
    plt.legend()
    plt.grid()
    plt.savefig("parametrization="+parametrization_name+"_identification_parameter="+str(parameter_name)+"_position="+str(position)+"_impact_measurement_errors.pdf", bbox_inches='tight')
    plt.show()