  1. import os
  2. import pandas as pd
  3. import numpy as np
  4. import seaborn as sns
  5. import matplotlib.ticker as tck
  6. import matplotlib.pyplot as plt
  7. from matplotlib.backends.backend_pdf import PdfPages
  8. from io import BytesIO
  9. from docx import Document
  10. from docx.shared import Inches, Pt
  11. from docx.enum.text import WD_ALIGN_PARAGRAPH
  12. from docx.enum.table import WD_ALIGN_VERTICAL
  13. from docx.oxml import OxmlElement
  14. from docx.oxml.ns import qn
  16. # Definition of data quality and comparison functions
  18. def prepare_data(path, tenor_tag, col_names, update_col_names=False):
  19.     """
  20.    Reads and prepares data from a specified CSV file path. It handles data transformations based on
  21.    tenor-related information, dynamically constructs columns when required, and manages data cleaning
  22.    by dropping duplicates and removing null values.
  24.    Args:
  25.        path (str): Path to the CSV file to be read.
  26.        tenor_tag (bool): Indicates whether operations related to tenors are to be applied, which may include
  27.                          constructing multifactor identifiers and filtering based on column presence.
  28.        col_names (list): Specifies column names or a list of column names that are used in operations.
  29.                          This parameter can be a single list of strings or a nested list when multiple columns
  30.                          are used to construct a single identifier.
  31.        update_col_names (bool): If True, updates the primary column names in the DataFrame, typically after
  32.                                 creating multifactor identifiers.
  34.    Returns:
  35.        pd.DataFrame: A DataFrame processed based on the specified parameters, cleaned for duplicates and null values.
  37.    Raises:
  38.        ValueError: If required columns for operations are missing from the DataFrame.
  39.    """
  40.     df = pd.read_csv(path)
  41.     column_set = set(df.columns.str.lower())  # Lowercase all columns for case-insensitive comparison
  43.     if tenor_tag:
  44.         if isinstance(col_names[0], list):
  45.             required_cols = col_names[0]
  46.         else:
  47.             required_cols = [col_names[0], col_names[1], col_names[2]]
  49.         missing_cols = [col for col in required_cols if col.lower() not in column_set]
  50.         if missing_cols:
  51.             raise ValueError(f"Missing columns in DataFrame: {', '.join(missing_cols)}")
  53.         if isinstance(col_names[0], list):
  54.             df['MF'] = df.apply(lambda row: combine_columns(row, 'MF', col_names[0]), axis=1)
  55.             df['Curve'] = df.apply(lambda row: combine_columns(row, 'curve', col_names[0]), axis=1)
  57.             if 'tenor' not in column_set:
  58.                 df['Tenor'] = df[col_names[0]].str.split(".").str[-1]
  60.             if update_col_names:
  61.                 col_names[0] = 'MF'
  63.             df = df[['Curve', 'MF', 'Tenor', col_names[1], col_names[2]]]
  64.         else:
  65.             if 'tenor' not in column_set:
  66.                 df['Tenor'] = df[col_names[0]].str.split(".").str[-1]
  67.             if 'curve' not in column_set:
  68.                 df['Curve'] = df[col_names[0]].str.split(".").str[0]
  69.             df = df[['Curve', col_names[0], 'Tenor', col_names[1], col_names[2]]]
  71.         df = df.drop_duplicates(subset=df.columns[[1, 3]], keep='first').dropna()
  72.     else:
  73.         df = df[[col_names[0], col_names[1], col_names[2]]]
  74.         df = df.drop_duplicates(subset=[col_names[0], col_names[1]], keep='first').dropna()
  76.     return df
  78. def combine_columns(row, combine_type, col_names):
  79.     """
  80.    Constructs a concatenated identifier from selected DataFrame columns based on the specified type.
  82.    Args:
  83.        row (pd.Series): A row from a DataFrame from which to extract column values.
  84.        combine_type (str): Specifies how the columns should be combined.
  85.                    'MF' will use all columns, 'curve' will exclude the last column.
  86.        col_names (list): A list of column names to be used in the concatenation.
  88.    Returns:
  89.        str: A concatenated string serving as a complex identifier.
  91.    Raises:
  92.        ValueError: If an unsupported type is provided.
  93.    """
  94.     if combine_type == 'MF':
  95.         return '.'.join(str(row[col]) for col in col_names)
  96.     elif combine_type == 'curve':
  97.         return '.'.join(str(row[col]) for col in col_names[:-1])
  98.     else:
  99.         raise ValueError(f"Unsupported type '{combine_type}'. Expected 'MF' or 'curve'.")
  102. def compare_tenors(df_prev, df_curr, threshold=None):
  103.     """
  104.    Compares the number of tenors per curve between two dataframes.
  106.    Args:
  107.      df_prev (pd.DataFrame): DataFrame containing previous period data.
  108.      df_curr (pd.DataFrame): DataFrame containing current period data.
  109.      threshold (int, optional): Threshold for filtering the results based on the change in the number of tenors.
  111.    Returns:
  112.      pd.DataFrame: DataFrame showing the changes in the number of tenors per curve.
  113.    """
  114.     tenors_prev = df_prev.groupby('Curve')['Tenor'].nunique()
  115.     tenors_curr = df_curr.groupby('Curve')['Tenor'].nunique()
  117.     # Concatenate the results into a single DataFrame
  118.     tenor_comparison = pd.concat([tenors_prev, tenors_curr], axis=1, keys=['prev', 'curr'])
  119.     tenor_comparison.fillna(0, inplace=True)
  121.     # Calculate the change in number of tenors
  122.     tenor_comparison['change'] = tenor_comparison['curr'] - tenor_comparison['prev']
  123.     tenor_comparison = tenor_comparison.sort_values('change', ascending=False)
  125.     # Apply the threshold filter if specified
  126.     if threshold is not None:
  127.         tenor_comparison = tenor_comparison[abs(tenor_comparison['change']) >= threshold]
  129.     return tenor_comparison.reset_index()
  133. def compare_with_dq_stats(df, stats_path):
  134.     """
  135.    Compares the data in the given DataFrame with the DQ stats provided in a CSV file.
  137.    Parameters:
  138.    df (pd.DataFrame): The DataFrame representing the current data to compare.
  139.    stats_path (str): The file path to the DQ stats CSV file.
  141.    Returns:
  142.    pd.DataFrame: A DataFrame summarizing the comparison results, including any warnings or discrepancies.
  143.    """
  144.     # Load the stats DataFrame from a path
  145.     stats = pd.read_csv(stats_path)
  147.     # Extracting relevant values from the stats DataFrame
  148.     total_unique_points_df = df.shape[0]
  149.     total_unique_points_stats = stats['TOTAL_POINTS'].iloc[0]
  150.     confirmed_stales = stats['Confirmed_Stales'].iloc[0]
  151.     replaced_stales = stats['Replaced_Stales'].iloc[0]
  152.     other_stales = stats['Other_Stales'].iloc[0]
  153.     total_stales_reported = stats['Total_Stales'].iloc[0]
  154.     total_stales_calculated = confirmed_stales + replaced_stales + other_stales
  156.     # Organize data in a list of tuples for DataFrame construction
  157.     data = [
  158.         ('Total unique points in delivery pack', total_unique_points_df),
  159.         ('Total unique points according to the DQ stats', total_unique_points_stats),
  160.         ('', ''),  # Empty row for spacing
  161.         ('Confirmed Stales', confirmed_stales),
  162.         ('Replaced Stales', replaced_stales),
  163.         ('Other Stales', other_stales),
  164.         ('Total Stales Calculated', total_stales_calculated),
  165.         ('Total Stales Reported in DQ stats', total_stales_reported),
  166.         ('', '')  # Another empty row for spacing
  167.     ]
  169.     # Check stales mismatch or match
  170.     stales_check = 'Match' if total_stales_calculated == total_stales_reported else 'Mismatch'
  171.     data.append(('Stales Check', stales_check))
  173.     # Adding warnings
  174.     if total_unique_points_df > total_unique_points_stats:
  175.         warning = 'Warning! Non-remediated points were added to the delivery set.'
  176.     elif total_unique_points_df < total_unique_points_stats:
  177.         warning = 'Warning! Less datapoints in the delivery pack than in the reported DQ stats.'
  178.     else:
  179.         warning = ''
  180.     data.append(('Conclusion', warning))
  182.     # Convert list of tuples to DataFrame
  183.     worksheet_df = pd.DataFrame(data, columns=['Metric', 'Value'])
  185.     return worksheet_df
  188. def compare_with_dq_stats_per_mkf(summary_df, stats_per_mkf_path, col_names):
  189.     """
  190.    Compares the MKF-level data in the summary DataFrame with the DQ stats provided in a CSV file.
  192.    Parameters:
  193.    summary (pd.DataFrame): The DataFrame representing the summary data to compare.
  194.    stats_per_mkf_path (str): The file path to the DQ stats CSV file.
  195.    col_names (list of str): A list containing the column names to be used in the comparison.
  197.    Returns:
  198.    pd.DataFrame: A DataFrame summarizing the comparison results, including any discrepancies or missing data.
  199.    """
  200.     # Load the stats DataFrame from a path
  201.     stats = pd.read_csv(stats_per_mkf_path)
  202.     summary = summary_df.copy()
  203.     # Fill NaNs in the summary DataFrame with a specific message
  204.     summary[f'{col_names[1]}_min_curr'] = summary[f'{col_names[1]}_min_curr'].fillna(
  205.         'MKF missing both in current datapack and batch download pack (according to DQ stats), but present in previous delivery.')
  207.     # Drop unnecessary columns from the stats DataFrame
  208.     stats.drop(columns=['Asset_class', 'Start_Date', 'End_Date', 'Mid_Date', 'Total_mkf'], inplace=True)
  210.     # Rename and select relevant columns in the summary DataFrame
  211.     summary = summary[[col_names[0], f'{col_names[1]}_min_curr', f'{col_names[1]}_max_curr', f'Datapoints_count_curr']]
  212.     summary.columns = [col_names[0], f'{col_names[1]}_min', f'{col_names[1]}_max', 'Datapoints_count']
  214.     # Rename columns in the stats DataFrame to match the summary DataFrame
  215.     stats.rename(columns={"MKF_CCR": col_names[0],
  216.                           "TOTAL_POINTS": "Datapoints_count",
  217.                           "MIN_DATE": f'{col_names[1]}_min',
  218.                           "MAX_DATE": f'{col_names[1]}_max'}, inplace=True)
  220.     # Merge the summary and stats DataFrames on the specified MKF column
  221.     summary_stats_merged = pd.merge(summary, stats, on=col_names[0], how='outer', suffixes=['_datapack', '_dq_stats'])
  223.     # Calculate the difference in datapoints and check for discrepancies in min and max dates
  224.     summary_stats_merged['Datapoints_count_without_removed_stales_dq_stats'] = summary_stats_merged['Datapoints_count_dq_stats'] - summary_stats_merged['Other_Stales']
  225.     summary_stats_merged['Datapoints_count_diff'] = summary_stats_merged['Datapoints_count_datapack'] - summary_stats_merged['Datapoints_count_without_removed_stales_dq_stats']
  226.     summary_stats_merged[f'{col_names[1]}_min_check'] = summary_stats_merged[f'{col_names[1]}_min_datapack'] == summary_stats_merged[f'{col_names[1]}_min_dq_stats']
  227.     summary_stats_merged[f'{col_names[1]}_max_check'] = summary_stats_merged[f'{col_names[1]}_max_datapack'] == summary_stats_merged[f'{col_names[1]}_max_dq_stats']
  229.     # Fill missing values with specific messages
  230.     summary_stats_merged[f'{col_names[1]}_min_datapack'] = summary_stats_merged[f'{col_names[1]}_min_datapack'].fillna('MKF missing in datapack')
  231.     summary_stats_merged[f'{col_names[1]}_min_dq_stats'] = summary_stats_merged[f'{col_names[1]}_min_dq_stats'].fillna('MKF missing in DQ stats')
  233.     # Organize the columns
  234.     cols_to_move = ['Datapoints_count_without_removed_stales_dq_stats', 'Datapoints_count_diff', f'{col_names[1]}_min_check', f'{col_names[1]}_max_check']
  235.     all_cols = list(summary_stats_merged.columns)
  237.     # Remove the columns to move from the original list
  238.     for col in cols_to_move:
  239.         all_cols.remove(col)
  241.     # Insert the columns back at the target position (index 7 in this example)
  242.     for i, col in enumerate(cols_to_move):
  243.         all_cols.insert(7 + i, col)
  245.     # Reorder the DataFrame columns
  246.     summary_stats_merged = summary_stats_merged[all_cols]
  248.     return summary_stats_merged
  250. def calculate_vols(df, vol_type, normal_vol, min_date, max_date, col_names):
  251.     """
  252.    Calculates volatility for a specified date range and type within a DataFrame.
  254.    Args:
  255.        df (pd.DataFrame): DataFrame containing the data to analyze.
  256.        vol_type (str): Specifies the type of volatility calculation ('inner', 'outer_prev', 'outer_curr', 'full').
  257.        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
  258.        min_date (str): Minimum date for the data filtering.
  259.        max_date (str): Maximum date for the data filtering.
  260.        col_names (list): Column names to be used in sorting and filtering.
  262.    Returns:
  263.        pd.DataFrame: DataFrame with calculated returns.
  264.    """
  265.     # Sort data to ensure consistent grouping
  266.     df = df.sort_values(by=[col_names[0], col_names[1]])
  268.     # Filter data based on vol_type and date range
  269.     if vol_type in {'inner', 'outer_prev', 'outer_curr'}:
  270.         df = df[(df[col_names[1]] >= min_date) & (df[col_names[1]] <= max_date)]
  271.     elif vol_type == 'full':
  272.         df = df.copy()
  273.     else:
  274.         raise ValueError("Invalid vol_type specified. Use 'inner', 'full', 'outer_prev' or 'outer_curr'.")
  276.     # Convert columns to appropriate data types
  277.     df[col_names[1]] = pd.to_datetime(df[col_names[1]], format='%Y%m%d')
  278.     df[col_names[2]] = pd.to_numeric(df[col_names[2]], errors='coerce')
  280.     # Calculate returns
  281.     df['Ret'] = df.groupby(col_names[0])[col_names[2]].transform(
  282.         lambda x: calculate_returns_grouped(
  283.             pd.DataFrame({col_names[2]: x}),
  284.             normal_vol=normal_vol,
  285.             col_names=col_names)['Ret'] # lambda instead of apply to avoid deprecation warning
  286.     )
  287.     df.dropna(subset=['Ret'], inplace=True)
  289.     return df
  291. def calculate_returns_grouped(group, normal_vol, col_names):
  292.     """
  293.    Calculates returns for a group of MKF.
  295.    Args:
  296.        group (pd.DataFrame): DataFrame containing the data to analyze.
  297.        normal_vol (bool): Determines whether to calculate normal returns (True) or log returns (False).
  298.        col_names (list): Column names to be used in calculations.
  300.    Returns:
  301.        pd.DataFrame: DataFrame with calculated returns.
  302.    """
  303.     group = group.copy()
  305.     if normal_vol:
  306.         group['Ret'] = group[col_names[2]].pct_change(fill_method=None)
  307.     else:
  308.         # Filter out non-positive values for log returns
  309.         group = group[group[col_names[2]] > 0]
  310.         group['Ret'] = np.log(group[col_names[2]] / group[col_names[2]].shift(1)).dropna()
  311.     return group
  316. def outer_join(df_prev, df_curr, col_names, threshold=None):
  317.     """
  318.    Performs an outer join between previous and current dataframes to identify non-overlapping entries
  319.    and returns grouped statistics based on the identifier column.
  321.    Args:
  322.      df_prev (pd.DataFrame): DataFrame containing previous period data.
  323.      df_curr (pd.DataFrame): DataFrame containing current period data.
  324.      col_names (list): Column names used to perform the join. Expects [identifier, date_col, value_col].
  325.      threshold (int, optional): Threshold for filtering the results based on the number of non-overlapping entries.
  327.    Returns:
  328.      tuple: Two DataFrames, one with previous period not overlapping with the current and another with current period not overlapping with the previous.
  330.    Raises:
  331.      ValueError: If provided column names do not exist in the DataFrames.
  332.    """
  333.     # Check if the required columns exist in both DataFrames
  334.     required_cols = set(col_names)
  335.     if not required_cols.issubset(df_prev.columns) or not required_cols.issubset(df_curr.columns):
  336.         missing_cols = list((required_cols - set(df_prev.columns)) | (required_cols - set(df_curr.columns)))
  337.         raise ValueError(f"Missing columns in DataFrame: {missing_cols}")
  339.     # Perform the outer join based on the identifier and date columns
  340.     df_merged = pd.merge(df_prev, df_curr, on=[col_names[0], col_names[1]], how='outer', indicator=True, suffixes=['_prev', '_curr'])
  342.     # Determine the overlapping scope based on the date column
  343.     overlapping_scope = (df_curr[col_names[1]].min(), df_prev[col_names[1]].max())
  344.     df_merged = df_merged[(df_merged[col_names[1]] >= overlapping_scope[0]) & (df_merged[col_names[1]] <= overlapping_scope[1])]
  346.     # Filter to find non-overlapping entries
  347.     df_prev_nas = df_merged[df_merged[f'{col_names[2]}_prev'].isna()]
  348.     df_curr_nas = df_merged[df_merged[f'{col_names[2]}_curr'].isna()]
  350.     # Group by the identifier and calculate statistics
  351.     grouped_prev_nas = df_prev_nas.groupby(col_names[0]).agg(
  352.         min_date=(col_names[1], 'min'),
  353.         max_date=(col_names[1], 'max'),
  354.         count=(col_names[1], 'count'),
  355.         mean_price=(f'{col_names[2]}_curr', 'mean'),
  356.         std_price=(f'{col_names[2]}_curr', 'std')
  357.     ).reset_index()
  359.     grouped_curr_nas = df_curr_nas.groupby(col_names[0]).agg(
  360.         min_date=(col_names[1], 'min'),
  361.         max_date=(col_names[1], 'max'),
  362.         count=(col_names[1], 'count'),
  363.         mean_price=(f'{col_names[2]}_prev', 'mean'),
  364.         std_price=(f'{col_names[2]}_prev', 'std')
  365.     ).reset_index()
  367.     if threshold is not None:
  368.         grouped_prev_nas = grouped_prev_nas[grouped_prev_nas['count'] > threshold]
  369.         grouped_curr_nas = grouped_curr_nas[grouped_curr_nas['count'] > threshold]
  371.     return grouped_prev_nas, grouped_curr_nas
  375. def count_outer_points(df_prev, df_curr, col_names, threshold=None):
  376.     """
  377.    Counts the number of points that fall outside the overlapping date range for previous and current dataframes.
  379.    Args:
  380.        df_prev (pd.DataFrame): DataFrame containing previous period data.
  381.        df_curr (pd.DataFrame): DataFrame containing current period data.
  382.        col_names (list): Column names used for the calculations. Expects [identifier, date_col, value_col].
  383.        threshold (int, optional): Threshold for filtering the results based on the absolute difference in the number of points.
  385.    Returns:
  386.        pd.DataFrame: DataFrame with the count of non-overlapping points and their differences.
  387.    """
  388.     # Determine the overlapping date range
  389.     overlapping_scope = (df_curr[col_names[1]].min(), df_prev[col_names[1]].max())
  391.     # Filter data outside the overlapping date range
  392.     df_prev_only = df_prev[df_prev[col_names[1]] < overlapping_scope[0]]
  393.     df_curr_only = df_curr[df_curr[col_names[1]] > overlapping_scope[1]]
  395.     # Group by the identifier and count the values
  396.     prev_only_grouped = df_prev_only.groupby(col_names[0])[col_names[2]].count().reset_index()
  397.     curr_only_grouped = df_curr_only.groupby(col_names[0])[col_names[2]].count().reset_index()
  399.     # Merge the grouped data
  400.     grouped_merged = pd.merge(prev_only_grouped, curr_only_grouped, on=[col_names[0]], how='outer', indicator=True, suffixes=['_prev_count', '_curr_count'])
  402.     # Calculate the difference in counts (curr - prev)
  403.     grouped_merged['diff_in_count'] = grouped_merged[f'{col_names[2]}_curr_count'] - grouped_merged[f'{col_names[2]}_prev_count']
  404.     grouped_merged = grouped_merged.sort_values('diff_in_count')
  406.     # Apply the threshold filter if specified (absolute value of diff_in_count)
  407.     if threshold is not None:
  408.         grouped_merged = grouped_merged[abs(grouped_merged['diff_in_count']) >= threshold]
  410.     return grouped_merged
  413. def limit_plotting_scope(df_prev, df_curr, col_names, normal_vol, mf_count, combined_vols=None):
  414.     """
  415.    Limits the scope of data for plotting by filtering and sorting based on volatility changes.
  417.    Args:
  418.        df_prev (pd.DataFrame): DataFrame containing previous period data.
  419.        df_curr (pd.DataFrame): DataFrame containing current period data.
  420.        col_names (list): Column names used for the calculations.
  421.        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
  422.        mf_count (int): Number of records to limit the scope to.
  423.        combined_vols (pd.DataFrame, optional): DataFrame containing precomputed volatility comparisons.
  425.    Returns:
  426.        pd.DataFrame: DataFrame limited to the specified number of records with the highest absolute relative differences in volatility.
  427.    """
  428.     if combined_vols is None:
  429.         combined_vols = calculate_vol_comparison(df_prev, df_curr, col_names, normal_vol)
  431.     vols_comparison = combined_vols[[col_names[0], 'Vol_full_prev', 'Vol_full_curr', 'rel_diff_full (%)']]
  433.     # Filter out rows where previous or current volatility is zero
  434.     vols_comparison = vols_comparison[(vols_comparison['Vol_full_prev'] != 0) & (vols_comparison['Vol_full_curr'] != 0)]
  436.     # Calculate absolute relative difference
  437.     vols_comparison['abs_rel_diff'] = abs(vols_comparison['rel_diff_full (%)'])
  439.     # Drop rows with NaN values
  440.     vols_comparison = vols_comparison.dropna()
  442.     # Sort by absolute relative difference in descending order
  443.     vols_comparison = vols_comparison.sort_values(by='abs_rel_diff', ascending=False).reset_index(drop=True)
  445.     # Limit to the specified number of records
  446.     return vols_comparison[:mf_count]
  449. def base_plotter(df_prev, df_curr, path_out, col_names, normal_vol, mf_count=None, pdf=None, combine_plots=True, combined_vols=None):
  450.     """
  451.    Creates and saves plots for comparing previous and current period data.
  453.    Args:
  454.        df_prev (pd.DataFrame): DataFrame containing previous period data.
  455.        df_curr (pd.DataFrame): DataFrame containing current period data.
  456.        path_out (str): Output directory for saving plots.
  457.        col_names (list): Column names used for the calculations.
  458.        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
  459.        mf_count (int, optional): Number of records to limit the scope to. Defaults to None.
  460.        pdf (PdfPages, optional): PdfPages object for saving combined plots. Defaults to None.
  461.        combine_plots (bool, optional): Whether to combine plots into a single PDF. Defaults to True.
  462.        combined_vols (pd.DataFrame, optional): DataFrame containing precomputed volatility comparisons.
  464.    Returns:
  465.        None
  466.    """
  467.     # Ensure the output directory exists
  468.     os.makedirs(os.path.join(path_out, 'vol_change_plots'), exist_ok=True)
  469.     path_out_plots = os.path.join(path_out, 'vol_change_plots')
  471.     # Initialize PdfPages object if combining plots
  472.     if combine_plots:
  473.         pdf_out = PdfPages(os.path.join(path_out, 'combined_plots.pdf'))
  475.     # Determine the list of identifiers (mfs_check) to plot
  476.     if mf_count is not None:
  477.         vols_comparison = limit_plotting_scope(df_prev, df_curr, col_names, normal_vol, mf_count, combined_vols)
  478.         mfs_check = vols_comparison[col_names[0]].unique().tolist()
  479.     else:
  480.         mfs_check = pd.concat([df_prev[col_names[0]], df_curr[col_names[0]]]).unique().tolist()
  482.     # Filter the dataframes to include only the identifiers in mfs_check
  483.     df_curr = df_curr[df_curr[col_names[0]].isin(mfs_check)]
  484.     df_prev = df_prev[df_prev[col_names[0]].isin(mfs_check)]
  486.     # Merge the dataframes on the identifier and date columns
  487.     df = pd.merge(df_prev, df_curr, on=[col_names[0], col_names[1]], how='outer', suffixes=('_prev_dataset', '_curr_dataset'))
  488.     df = df.sort_values([col_names[0], col_names[1]])
  489.     df['diff_abs'] = abs(df[f'{col_names[2]}_prev_dataset'] - df[f'{col_names[2]}_curr_dataset'])
  490.     df = df[[col_names[0], col_names[1], f'{col_names[2]}_prev_dataset', f'{col_names[2]}_curr_dataset']]
  491.     df[col_names[1]] = pd.to_datetime(df[col_names[1]], format='%Y%m%d')
  493.     # Reshape the dataframe for plotting
  494.     df = df.melt(id_vars=[col_names[0], col_names[1]]).rename(columns={'variable': 'set', 'value': col_names[2]})
  496.     # Create and save plots for each identifier in mfs_check
  497.     for mf in mfs_check:
  498.         tmp = df[df[col_names[0]] == mf]
  499.         g = sns.lineplot(data=tmp, x=col_names[1], y=col_names[2], hue='set')
  500.         plt.xticks(rotation=15)
  501.         g.set_title(mf)
  502.         fig = plt.gcf()
  503.         fig.savefig(os.path.join(path_out_plots, '{}.png'.format(mf)))
  504.         if combine_plots:
  505.             pdf_out.savefig(fig)
  506.         plt.clf()
  508.     # Close the PdfPages object if combining plots
  509.     if combine_plots:
  510.         pdf_out.close()
  513. def create_summary(df_prev, df_curr, col_names, normal_vol, dq_stats_per_mkf_path_curr=None, dq_stats_per_mkf_path_prev=None, threshold=None):
  514.     """
  515.    Creates a summary DataFrame comparing previous and current period data.
  517.    Args:
  518.        df_prev (pd.DataFrame): DataFrame containing previous period data.
  519.        df_curr (pd.DataFrame): DataFrame containing current period data.
  520.        col_names (list): Column names used for the calculations.
  521.            Expected format: [group_column, date_column, value_column].
  522.        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
  523.        dq_stats_per_mkf_path_curr (str, optional): Path to the current period DQ stats per MKF CSV file.
  524.        dq_stats_per_mkf_path_prev (str, optional): Path to the previous period DQ stats per MKF CSV file.
  525.        threshold (float, optional): Threshold for filtering the summary DataFrame.
  527.    Returns:
  528.        pd.DataFrame: Summary DataFrame containing the comparison results.
  529.    """
  530.     # Pivot tables for counting data points
  531.     pivot_prev = df_prev.pivot_table(index=col_names[0], values=col_names[2], aggfunc='count').rename(
  532.         columns={col_names[2]: 'Datapoints_count_prev'})
  533.     pivot_curr = df_curr.pivot_table(index=col_names[0], values=col_names[2], aggfunc='count').rename(
  534.         columns={col_names[2]: 'Datapoints_count_curr'})
  536.     # Calculate full volatility for previous and current periods
  537.     ts_prev = calculate_vols(df_prev, 'full', normal_vol, df_prev[col_names[1]].min(), df_prev[col_names[1]].max(), col_names)
  538.     ts_curr = calculate_vols(df_curr, 'full', normal_vol, df_curr[col_names[1]].min(), df_curr[col_names[1]].max(), col_names)
  539.     vol_prev = ts_prev.groupby(col_names[0])['Ret'].std(ddof=0).reset_index().rename(columns={'Ret': 'Vol_prev'})
  540.     vol_curr = ts_curr.groupby(col_names[0])['Ret'].std(ddof=0).reset_index().rename(columns={'Ret': 'Vol_curr'})
  542.     # Merging volatilities
  543.     vols = pd.merge(vol_prev, vol_curr, on=col_names[0], how='outer')
  544.     vols['rel vol change in %'] = abs(vols['Vol_prev'] - vols['Vol_curr']) / vols['Vol_prev'] * 100
  546.     # Merging date range information
  547.     date_ranges_prev = df_prev.groupby(col_names[0])[col_names[1]].agg(['min', 'max']).rename(
  548.         columns={'min': f'{col_names[1]}_min_prev', 'max': f'{col_names[1]}_max_prev'})
  549.     date_ranges_curr = df_curr.groupby(col_names[0])[col_names[1]].agg(['min', 'max']).rename(
  550.         columns={'min': f'{col_names[1]}_min_curr', 'max': f'{col_names[1]}_max_curr'})
  552.     # Combine all information into one DataFrame
  553.     summary_df = pd.merge(pivot_prev, pivot_curr, on=col_names[0], how='outer')
  554.     summary_df = pd.merge(summary_df, date_ranges_prev, on=col_names[0], how='outer')
  555.     summary_df = pd.merge(summary_df, date_ranges_curr, on=col_names[0], how='outer')
  556.     summary_df = pd.merge(summary_df, vols, on=col_names[0], how='outer')
  557.     summary_df = summary_df.sort_values(by='rel vol change in %', ascending=False)
  559.     # Calculate count change
  560.     summary_df['count_change'] = summary_df['Datapoints_count_curr'] - summary_df['Datapoints_count_prev']
  562.     # Apply summary threshold if specified
  563.     if threshold is not None:
  564.         summary_df = summary_df[summary_df['rel vol change in %'] > threshold]
  567.     if dq_stats_per_mkf_path_prev:
  568.         # Load the dq_stats_per_mkf DataFrame
  569.         dq_stats_df_prev = pd.read_csv(dq_stats_per_mkf_path_prev)
  570.         dq_stats_df_prev.rename(columns={'MKF_CCR': col_names[0]}, inplace=True)
  572.         # Merge the summary_df DataFrame with the dq_stats_df DataFrame
  573.         summary_df = pd.merge(
  574.             summary_df,
  575.             dq_stats_df_prev[[col_names[0], 'Other_Jumps']],
  576.             left_on=col_names[0],
  577.             right_on=col_names[0],
  578.             how='left'
  579.         )
  580.         summary_df.rename(columns={'Other_Jumps': 'Other_Jumps_prev'}, inplace=True)
  582.     # Connect the information regarding other jumps from DQ stats
  583.     if dq_stats_per_mkf_path_curr:
  584.         # Load the dq_stats_per_mkf DataFrame
  585.         dq_stats_df_curr = pd.read_csv(dq_stats_per_mkf_path_curr)
  586.         dq_stats_df_curr.rename(columns={'MKF_CCR': col_names[0]}, inplace=True)
  587.         # Merge the summary_df DataFrame with the dq_stats_df DataFrame
  588.         summary_df = pd.merge(
  589.             summary_df,
  590.             dq_stats_df_curr[[col_names[0], 'Other_Jumps']],
  591.             left_on=col_names[0],
  592.             right_on=col_names[0],
  593.             how='left'
  594.         )
  595.         summary_df.rename(columns={'Other_Jumps': 'Other_Jumps_curr'}, inplace=True)
  597.     # Create the comment column
  598.     def generate_comment(row):
  600.         if pd.notnull(row.get('Other_Jumps_curr')) and row['Other_Jumps_curr'] > 0:
  601.             comments.append(f"{int(row['Other_Jumps_curr'])} current")
  602.         if pd.notnull(row.get('Other_Jumps_prev')) and row['Other_Jumps_prev'] > 0:
  603.             comments.append(f"{int(row['Other_Jumps_prev'])} previous")
  604.         if comments:
  605.             return f"Vol change may be related to {' and '.join(comments)} other not confirmed (kept) jumps."
  606.         else:
  607.             return np.nan
  609.     summary_df['comment'] = summary_df.apply(generate_comment, axis=1)
  611.     return summary_df
  614. def compare_mkfs(df_prev, df_curr, col_name):
  615.     """
  616.    Compare unique identifiers ('MKF' or 'MF') from two dataframes and categorize them into
  617.    intersection, removed, and added identifiers. Returns a dataframe with these categories
  618.    and their counts in the column names.
  620.    Args:
  621.    df_prev (pd.DataFrame): DataFrame containing previous period data.
  622.    df_curr (pd.DataFrame): DataFrame containing current period data.
  623.    col_name (str): The name of the column containing identifiers to be compared.
  625.    Returns:
  626.    pd.DataFrame: A DataFrame with three columns, each containing lists of identifiers:
  627.                  - Intersecting identifiers from both dataframes.
  628.                  - Identifiers removed in the current dataframe.
  629.                  - Identifiers added in the current dataframe.
  630.                  Each column name includes the count of identifiers in that category.
  631.    """
  632.     # Collecting unique identifiers from both dataframes
  633.     mkf_prev = set(df_prev[col_name].unique())
  634.     mkf_curr = set(df_curr[col_name].unique())
  636.     # Calculating intersection and differences
  637.     intersection = sorted(mkf_prev.intersection(mkf_curr))
  638.     mkfs_removed = sorted(mkf_prev.difference(mkf_curr))
  639.     mkfs_added = sorted(mkf_curr.difference(mkf_prev))
  641.     # Creating dictionary with results formatted with count details
  642.     comparison_dict = {
  643.         f'{col_name} Intersection - {len(intersection)} items': list(intersection),
  644.         f'{col_name} Removed - {len(mkfs_removed)} items': list(mkfs_removed),
  645.         f'{col_name} Added - {len(mkfs_added)} items': list(mkfs_added)
  646.     }
  648.     # Converting dictionary to DataFrame
  649.     comparison_df = pd.DataFrame(dict([(k, pd.Series(v)) for k, v in comparison_dict.items()]))
  651.     return comparison_df
  653. def set_column_widths(dataframe, writer, sheet_name):
  654.     """
  655.    Sets the width of the columns in the specified Excel sheet to fit the contents.
  657.    Args:
  658.        dataframe (pd.DataFrame): DataFrame to adjust column widths for.
  659.        writer (pd.ExcelWriter): Excel writer object.
  660.        sheet_name (str): Name of the Excel sheet to adjust.
  662.    Returns:
  663.        None
  664.    """
  665.     for column in dataframe:
  666.         column_length = max(dataframe[column].astype(str).map(len).max(), len(column)) + 2
  667.         col_idx = dataframe.columns.get_loc(column)
  668.         writer.sheets[sheet_name].set_column(col_idx, col_idx, column_length)
  671. def create_report(dataframes_to_export, export_path, assetclass):
  672.     """
  673.    Creates an Excel report containing multiple DataFrames as separate sheets,
  674.    splitting large DataFrames across multiple sheets if necessary.
  676.    Args:
  677.        dataframes_to_export (dict): Dictionary where keys are sheet names and values are DataFrames to export.
  678.        export_path (str): Path to the directory where the report will be saved.
  679.        assetclass (str): Asset class name used in the filename.
  681.    Returns:
  682.        None
  683.    """
  684.     max_rows_per_sheet = 1048576  # max num of rows in an Excel worksheet
  685.     file_path = os.path.join(export_path, f'{assetclass}_comparison_report.xlsx')
  687.     with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:
  688.         for sheet_name, dataframe in dataframes_to_export.items():
  689.             if dataframe.empty:
  690.                 # Create a DataFrame with a no data message
  691.                 no_data_message = pd.DataFrame({'Message': ['No data/rows/output available.']})
  692.                 no_data_message.to_excel(writer, sheet_name=sheet_name, index=False)
  693.                 set_column_widths(no_data_message, writer, sheet_name)
  694.             elif len(dataframe) <= max_rows_per_sheet:
  695.                 dataframe.to_excel(writer, sheet_name=sheet_name, index=False)
  696.                 set_column_widths(dataframe, writer, sheet_name)
  697.             else:
  698.                 num_parts = (len(dataframe) // max_rows_per_sheet) + 1
  699.                 for part in range(num_parts):
  700.                     start_row = part * max_rows_per_sheet
  701.                     end_row = min((part + 1) * max_rows_per_sheet, len(dataframe))
  702.                     part_dataframe = dataframe.iloc[start_row:end_row]
  703.                     part_sheet_name = f"{sheet_name}_part{part + 1}"
  704.                     part_dataframe.to_excel(writer, sheet_name=part_sheet_name, index=False)
  705.                     set_column_widths(part_dataframe, writer, part_sheet_name)
  708. def calculate_vol_comparison(df_prev, df_curr, col_names, normal_vol, vol_type='all', threshold=None):
  709.     """
  710.    Calculates volatility comparison between two datasets for all specified types or a specific type.
  712.    Args:
  713.        df_prev (pd.DataFrame): Previous period data.
  714.        df_curr (pd.DataFrame): Current period data.
  715.        col_names (list): Column names used for the calculations.
  716.        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
  717.        vol_type (str): Specifies the type of volatility to calculate ('all', 'inner', 'full', 'outer').
  718.        threshold (dict, optional): Dictionary containing threshold values for filtering results. Example:
  719.            {
  720.                'abs_diff_inner': 0.1,
  721.                'rel_diff_inner': 5,
  722.                'abs_diff_full': 0.1,
  723.                'rel_diff_full': 5,
  724.                'abs_diff_outer': 0.1,
  725.                'rel_diff_outer': 5
  726.            }
  728.    Returns:
  729.        pd.DataFrame: Contains the results of the volatility comparison.
  730.    """
  731.     def calculate_vol(df, vol_type, min_date, max_date, suffix):
  732.         ts = calculate_vols(df, vol_type, normal_vol, min_date, max_date, col_names)
  733.         vol = ts.groupby(col_names[0])['Ret'].std(ddof=0).reset_index()
  734.         vol.columns = [col_names[0], f'Vol_{suffix}']
  735.         return vol
  737.     def add_diff_columns(df, col1, col2, suffix):
  738.         df[f'abs_diff_{suffix}'] = abs(df[col1] - df[col2])
  739.         df[f'rel_diff_{suffix} (%)'] = abs((df[col2] - df[col1]) / df[col1] * 100).fillna(0)
  740.         return df
  742.     combined_vols = pd.DataFrame()
  744.     if vol_type in ('all', 'inner'):
  745.         min_date_inner, max_date_inner = df_curr[col_names[1]].min(), df_prev[col_names[1]].max()
  746.         vol_inner_prev = calculate_vol(df_prev, 'inner', min_date_inner, max_date_inner, 'inner_prev')
  747.         vol_inner_curr = calculate_vol(df_curr, 'inner', min_date_inner, max_date_inner, 'inner_curr')
  748.         vol_inner = vol_inner_prev.merge(vol_inner_curr, on=col_names[0], how='outer')
  749.         vol_inner = add_diff_columns(vol_inner, 'Vol_inner_prev', 'Vol_inner_curr', 'inner')
  750.         combined_vols = vol_inner if combined_vols.empty else combined_vols.merge(vol_inner, on=col_names[0], how='outer')
  752.     if vol_type in ('all', 'full'):
  753.         min_date_full_prev, max_date_full_prev = df_prev[col_names[1]].min(), df_prev[col_names[1]].max()
  754.         min_date_full_curr, max_date_full_curr = df_curr[col_names[1]].min(), df_curr[col_names[1]].max()
  755.         vol_full_prev = calculate_vol(df_prev, 'full', min_date_full_prev, max_date_full_prev, 'full_prev')
  756.         vol_full_curr = calculate_vol(df_curr, 'full', min_date_full_curr, max_date_full_curr, 'full_curr')
  757.         vol_full = vol_full_prev.merge(vol_full_curr, on=col_names[0], how='outer')
  758.         vol_full = add_diff_columns(vol_full, 'Vol_full_prev', 'Vol_full_curr', 'full')
  759.         combined_vols = vol_full if combined_vols.empty else combined_vols.merge(vol_full, on=col_names[0], how='outer')
  761.     if vol_type in ('all', 'outer'):
  762.         min_date_outer_prev, max_date_outer_prev = df_prev[col_names[1]].min(), df_curr[col_names[1]].min()
  763.         min_date_outer_curr, max_date_outer_curr = df_prev[col_names[1]].min(), df_curr[col_names[1]].max()
  764.         vol_outer_prev = calculate_vol(df_prev, 'outer_prev', min_date_outer_prev, max_date_outer_prev, 'outer_prev')
  765.         vol_outer_curr = calculate_vol(df_curr, 'outer_curr', min_date_outer_curr, max_date_outer_curr, 'outer_curr')
  766.         vol_outer = vol_outer_prev.merge(vol_outer_curr, on=col_names[0], how='outer')
  767.         vol_outer = add_diff_columns(vol_outer, 'Vol_outer_prev', 'Vol_outer_curr', 'outer')
  768.         combined_vols = vol_outer if combined_vols.empty else combined_vols.merge(vol_outer, on=col_names[0], how='outer')
  770.     # Apply threshold filtering
  771.     if threshold:
  772.         for key, value in threshold.items():
  773.             if key in combined_vols.columns:
  774.                 combined_vols = combined_vols[combined_vols[key] > value]
  776.     if 'rel_diff_full (%)' in combined_vols.columns:
  777.         combined_vols = combined_vols.sort_values(by='rel_diff_full (%)', ascending=False)
  778.     return combined_vols
  781. def create_threshold_details_sheet(threshold_params):
  782.     """
  783.    Creates a DataFrame containing details of applied thresholds.
  785.    Args:
  786.        threshold_params (dict): Dictionary containing the thresholds for various parts of the analysis.
  788.    Returns:
  789.        pd.DataFrame: DataFrame with details of the applied thresholds.
  790.    """
  791.     threshold_details = []
  793.     for key, value in threshold_params.items():
  794.         threshold_details.append({'Threshold Type': key, 'Value': value})
  796.     return pd.DataFrame(threshold_details)
  799. ######### Functions related to specific asset classes:
  801. def find_gaps(df_1, df_2, col_names, missing_threshold=5):
  802.     intervals = []
  803.     subset1 = pd.concat([df_1[[col_names[0]]].copy(), df_2[[col_names[0]]].copy()], ignore_index=True)
  804.     subset1 = subset1.drop_duplicates()
  805.     for name in subset1[col_names[0]].unique():
  806.         df1_name = df_1[df_1[col_names[0]] == name]
  807.         df2_name = df_2[df_2[col_names[0]] == name]
  808.         if df1_name.empty:
  809.             if not df2_name.empty:
  810.                 print(name)
  811.                 intervals.append({
  812.                     col_names[0]: name,
  813.                     'start_date': df2_name[col_names[1]].min(),
  814.                     'end_date': df2_name[col_names[1]].max(),
  815.                     'gap lenght': len(df2_name)
  816.                 })
  817.             continue
  819.         merged = pd.merge(df1_name, df2_name, how='outer', on=col_names[1], indicator=True)
  820.         merged = merged.sort_values(col_names[1])
  821.         merged['right_only'] = merged['_merge'] == 'right_only'
  822.         merged['gap'] = (~merged['right_only']).cumsum()
  823.         for gap_id, group in merged[merged['right_only']].groupby('gap'):
  824.             if len(group) >= missing_threshold:
  825.                 intervals.append({
  826.                     col_names[0]: name,
  827.                     'start_date': group[col_names[1]].min(),
  828.                     'end_date': group[col_names[1]].max(),
  829.                     'gap lenght': len(group)
  830.                 })
  832.     return pd.DataFrame(intervals)
  834. def commodity_curves_plotter(df_prev, df_curr, export_path, filename, date_cutoff):
  835.     # Ensure the directory exists
  836.     if not os.path.exists(export_path):
  837.         os.makedirs(export_path)
  839.     full_path = os.path.join(export_path, filename)
  840.     df1 = df_prev.copy()
  841.     df2 = df_curr.copy()
  843.     # Convert 'Date' to datetime if it's not already
  844.     df1['Date'] = pd.to_datetime(df1['Date'], format='%Y%m%d')
  845.     df2['Date'] = pd.to_datetime(df2['Date'], format='%Y%m%d')
  847.     # Apply date cutoff if provided
  848.     if date_cutoff is not None:
  849.         cutoff = pd.to_datetime(date_cutoff)
  850.         df1 = df1[df1['Date'] >= cutoff]
  851.         df2 = df2[df2['Date'] >= cutoff]
  853.     # Create pse_code column
  854.     df1['pse_code'] = df1['MF'].str.split('.').str[0]
  855.     df2['pse_code'] = df2['MF'].str.split('.').str[0]
  857.     with PdfPages(full_path) as pdf:
  858.         # Get unique pse_codes from both dataframes
  859.         pse_codes = sorted(set(df1['pse_code']).union(df2['pse_code']))
  861.         for pse_code in pse_codes:
  862.             fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(12, 8))
  864.             # Plot df1 data for current pse_code
  865.             subset_df1 = df1[df1['pse_code'] == pse_code]
  866.             subset_df1 = subset_df1.sort_values('Date')
  867.             if not subset_df1.empty:
  868.                 for mf, group in subset_df1.groupby('MF'):
  869.                     axes[0].plot(group['Date'], group['Price'], alpha=0.5)
  870.                 axes[0].set_title(f"MFs for pse_code: {pse_code} from df_prev")
  872.             # Plot df2 data for current pse_code
  873.             subset_df2 = df2[df2['pse_code'] == pse_code]
  874.             subset_df2 = subset_df2.sort_values('Date')
  875.             if not subset_df2.empty:
  876.                 for mf, group in subset_df2.groupby('MF'):
  877.                     axes[1].plot(group['Date'], group['Price'], alpha=0.5)
  878.                 axes[1].set_title(f"MFs for pse_code: {pse_code} from df_curr")
  880.             # Set the same xlim only when the pse_code is in both dfs
  881.             if not subset_df1.empty and not subset_df2.empty:
  882.                 min_date = min(subset_df1['Date'].min(), subset_df2['Date'].min())
  883.                 max_date = max(subset_df1['Date'].max(), subset_df2['Date'].max())
  885.                 # min_date = min_date = DateOffset(months=1)
  886.                 # max_date = max_date = DateOffset(months=1)
  888.                 axes[0].set_xlim([min_date, max_date])
  889.                 axes[1].set_xlim([min_date, max_date])
  890.             elif not subset_df1.empty:
  891.                 min_date = subset_df1['Date'].min()
  892.                 max_date = subset_df1['Date'].max()
  893.                 axes[0].set_xlim([min_date, max_date])
  894.             elif not subset_df2.empty:
  895.                 min_date = subset_df2['Date'].min()
  896.                 max_date = subset_df2['Date'].max()
  897.                 axes[1].set_xlim([min_date, max_date])
  899.             # Save the current figure to the PDF
  900.             pdf.savefig(fig)
  901.             plt.close(fig)
  905. def set_font(run, font_name="Calibri", font_size=11):
  906.     """Helper function to set the font of a run."""
  907. = font_name
  908.     run._element.rPr.rFonts.set(qn('w:eastAsia'), font_name)  # Ensure it's applied for all types
  909.     run.font.size = Pt(font_size)
