import os
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.ticker as tck
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from io import BytesIO
from docx import Document
from docx.shared import Inches, Pt
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.enum.table import WD_ALIGN_VERTICAL
from docx.oxml import OxmlElement
from docx.oxml.ns import qn


# Definition of data quality and comparison functions
def prepare_data(path, tenor_tag, col_names, update_col_names=False):
    """
    Reads and prepares data from a specified CSV file path. It handles data transformations based on
    tenor-related information, dynamically constructs columns when required, and manages data cleaning
    by dropping duplicates and removing null values.

    Args:
        path (str): Path to the CSV file to be read.
        tenor_tag (bool): Indicates whether operations related to tenors are to be applied, which may include
            constructing multifactor identifiers and filtering based on column presence.
        col_names (list): Specifies column names used in operations. The first element can be a single
            string or a nested list of strings when multiple columns are used to construct a single
            identifier.
        update_col_names (bool): If True, updates the primary column name in col_names, typically after
            creating multifactor identifiers.

    Returns:
        pd.DataFrame: A DataFrame processed based on the specified parameters, cleaned for duplicates and null values.

    Raises:
        ValueError: If columns required for the tenor-related operations are missing from the DataFrame.
    """
    df = pd.read_csv(path)
    column_set = set(df.columns.str.lower())  # Lowercase all columns for case-insensitive comparison
    if tenor_tag:
        if isinstance(col_names[0], list):
            required_cols = col_names[0]
        else:
            required_cols = [col_names[0], col_names[1], col_names[2]]
        missing_cols = [col for col in required_cols if col.lower() not in column_set]
        if missing_cols:
            raise ValueError(f"Missing columns in DataFrame: {', '.join(missing_cols)}")
        if isinstance(col_names[0], list):
            df['MF'] = df.apply(lambda row: combine_columns(row, 'MF', col_names[0]), axis=1)
            df['Curve'] = df.apply(lambda row: combine_columns(row, 'curve', col_names[0]), axis=1)
            if 'tenor' not in column_set:
                # The last dot-separated component of the constructed identifier is the tenor
                df['Tenor'] = df['MF'].str.split(".").str[-1]
            if update_col_names:
                col_names[0] = 'MF'
            df = df[['Curve', 'MF', 'Tenor', col_names[1], col_names[2]]]
        else:
            if 'tenor' not in column_set:
                df['Tenor'] = df[col_names[0]].str.split(".").str[-1]
            if 'curve' not in column_set:
                df['Curve'] = df[col_names[0]].str.split(".").str[0]
            df = df[['Curve', col_names[0], 'Tenor', col_names[1], col_names[2]]]
        df = df.drop_duplicates(subset=df.columns[[1, 3]], keep='first').dropna()
    else:
        df = df[[col_names[0], col_names[1], col_names[2]]]
        df = df.drop_duplicates(subset=[col_names[0], col_names[1]], keep='first').dropna()
    return df
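
# A minimal usage sketch (file names and column names below are illustrative,
# not part of this module): loading two delivery packs whose 'MF' identifier
# encodes curve and tenor as dot-separated parts.
#
#   col_names = ['MF', 'Date', 'Price']
#   df_prev = prepare_data('prev_delivery.csv', tenor_tag=True, col_names=col_names)
#   df_curr = prepare_data('curr_delivery.csv', tenor_tag=True, col_names=col_names)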


def combine_columns(row, combine_type, col_names):
    """
    Constructs a concatenated identifier from selected DataFrame columns based on the specified type.

    Args:
        row (pd.Series): A row from a DataFrame from which to extract column values.
        combine_type (str): Specifies how the columns should be combined.
            'MF' will use all columns, 'curve' will exclude the last column.
        col_names (list): A list of column names to be used in the concatenation.

    Returns:
        str: A concatenated string serving as a composite identifier.

    Raises:
        ValueError: If an unsupported type is provided.
    """
    if combine_type == 'MF':
        return '.'.join(str(row[col]) for col in col_names)
    elif combine_type == 'curve':
        return '.'.join(str(row[col]) for col in col_names[:-1])
    else:
        raise ValueError(f"Unsupported type '{combine_type}'. Expected 'MF' or 'curve'.")
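
# Behaviour sketch (illustrative values): for a row with columns A='CO',
# B='BRENT', C='1M' and col_names=['A', 'B', 'C'],
# combine_columns(row, 'MF', col_names) yields 'CO.BRENT.1M', while
# combine_columns(row, 'curve', col_names) yields 'CO.BRENT'.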


def compare_tenors(df_prev, df_curr, threshold=None):
    """
    Compares the number of tenors per curve between two dataframes.

    Args:
        df_prev (pd.DataFrame): DataFrame containing previous period data.
        df_curr (pd.DataFrame): DataFrame containing current period data.
        threshold (int, optional): Threshold for filtering the results based on the change in the number of tenors.

    Returns:
        pd.DataFrame: DataFrame showing the changes in the number of tenors per curve.
    """
    tenors_prev = df_prev.groupby('Curve')['Tenor'].nunique()
    tenors_curr = df_curr.groupby('Curve')['Tenor'].nunique()
    # Concatenate the results into a single DataFrame
    tenor_comparison = pd.concat([tenors_prev, tenors_curr], axis=1, keys=['prev', 'curr'])
    tenor_comparison.fillna(0, inplace=True)
    # Calculate the change in number of tenors
    tenor_comparison['change'] = tenor_comparison['curr'] - tenor_comparison['prev']
    tenor_comparison = tenor_comparison.sort_values('change', ascending=False)
    # Apply the threshold filter if specified
    if threshold is not None:
        tenor_comparison = tenor_comparison[abs(tenor_comparison['change']) >= threshold]
    return tenor_comparison.reset_index()
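
# Example (assumes df_prev/df_curr were produced by prepare_data with
# tenor_tag=True, so they carry 'Curve' and 'Tenor' columns): report only
# curves that gained or lost at least two tenors.
#
#   tenor_changes = compare_tenors(df_prev, df_curr, threshold=2)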


def compare_with_dq_stats(df, stats_path):
    """
    Compares the data in the given DataFrame with the DQ stats provided in a CSV file.

    Args:
        df (pd.DataFrame): The DataFrame representing the current data to compare.
        stats_path (str): The file path to the DQ stats CSV file.

    Returns:
        pd.DataFrame: A DataFrame summarizing the comparison results, including any warnings or discrepancies.
    """
    # Load the stats DataFrame from a path
    stats = pd.read_csv(stats_path)
    # Extract relevant values from the stats DataFrame
    total_unique_points_df = df.shape[0]
    total_unique_points_stats = stats['TOTAL_POINTS'].iloc[0]
    confirmed_stales = stats['Confirmed_Stales'].iloc[0]
    replaced_stales = stats['Replaced_Stales'].iloc[0]
    other_stales = stats['Other_Stales'].iloc[0]
    total_stales_reported = stats['Total_Stales'].iloc[0]
    total_stales_calculated = confirmed_stales + replaced_stales + other_stales
    # Organize data in a list of tuples for DataFrame construction
    data = [
        ('Total unique points in delivery pack', total_unique_points_df),
        ('Total unique points according to the DQ stats', total_unique_points_stats),
        ('', ''),  # Empty row for spacing
        ('Confirmed Stales', confirmed_stales),
        ('Replaced Stales', replaced_stales),
        ('Other Stales', other_stales),
        ('Total Stales Calculated', total_stales_calculated),
        ('Total Stales Reported in DQ stats', total_stales_reported),
        ('', '')  # Another empty row for spacing
    ]
    # Check whether calculated and reported stales match
    stales_check = 'Match' if total_stales_calculated == total_stales_reported else 'Mismatch'
    data.append(('Stales Check', stales_check))
    # Add warnings
    if total_unique_points_df > total_unique_points_stats:
        warning = 'Warning! Non-remediated points were added to the delivery set.'
    elif total_unique_points_df < total_unique_points_stats:
        warning = 'Warning! Fewer data points in the delivery pack than in the reported DQ stats.'
    else:
        warning = ''
    data.append(('Conclusion', warning))
    # Convert list of tuples to DataFrame
    worksheet_df = pd.DataFrame(data, columns=['Metric', 'Value'])
    return worksheet_df
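
# Hypothetical usage (the stats CSV is assumed to carry the TOTAL_POINTS,
# Confirmed_Stales, Replaced_Stales, Other_Stales and Total_Stales columns
# read above; the path is a placeholder):
#
#   dq_summary = compare_with_dq_stats(df_curr, 'dq_stats_curr.csv')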


def compare_with_dq_stats_per_mkf(summary_df, stats_per_mkf_path, col_names):
    """
    Compares the MKF-level data in the summary DataFrame with the DQ stats provided in a CSV file.

    Args:
        summary_df (pd.DataFrame): The DataFrame representing the summary data to compare.
        stats_per_mkf_path (str): The file path to the DQ stats CSV file.
        col_names (list of str): A list containing the column names to be used in the comparison.

    Returns:
        pd.DataFrame: A DataFrame summarizing the comparison results, including any discrepancies or missing data.
    """
    # Load the stats DataFrame from a path
    stats = pd.read_csv(stats_per_mkf_path)
    summary = summary_df.copy()
    # Fill NaNs in the summary DataFrame with a specific message
    summary[f'{col_names[1]}_min_curr'] = summary[f'{col_names[1]}_min_curr'].fillna(
        'MKF missing both in current datapack and batch download pack (according to DQ stats), but present in previous delivery.')
    # Drop unnecessary columns from the stats DataFrame
    stats.drop(columns=['Asset_class', 'Start_Date', 'End_Date', 'Mid_Date', 'Total_mkf'], inplace=True)
    # Rename and select relevant columns in the summary DataFrame
    summary = summary[[col_names[0], f'{col_names[1]}_min_curr', f'{col_names[1]}_max_curr', 'Datapoints_count_curr']]
    summary.columns = [col_names[0], f'{col_names[1]}_min', f'{col_names[1]}_max', 'Datapoints_count']
    # Rename columns in the stats DataFrame to match the summary DataFrame
    stats.rename(columns={"MKF_CCR": col_names[0],
                          "TOTAL_POINTS": "Datapoints_count",
                          "MIN_DATE": f'{col_names[1]}_min',
                          "MAX_DATE": f'{col_names[1]}_max'}, inplace=True)
    # Merge the summary and stats DataFrames on the specified MKF column
    summary_stats_merged = pd.merge(summary, stats, on=col_names[0], how='outer', suffixes=['_datapack', '_dq_stats'])
    # Calculate the difference in datapoints and check for discrepancies in min and max dates
    summary_stats_merged['Datapoints_count_without_removed_stales_dq_stats'] = summary_stats_merged['Datapoints_count_dq_stats'] - summary_stats_merged['Other_Stales']
    summary_stats_merged['Datapoints_count_diff'] = summary_stats_merged['Datapoints_count_datapack'] - summary_stats_merged['Datapoints_count_without_removed_stales_dq_stats']
    summary_stats_merged[f'{col_names[1]}_min_check'] = summary_stats_merged[f'{col_names[1]}_min_datapack'] == summary_stats_merged[f'{col_names[1]}_min_dq_stats']
    summary_stats_merged[f'{col_names[1]}_max_check'] = summary_stats_merged[f'{col_names[1]}_max_datapack'] == summary_stats_merged[f'{col_names[1]}_max_dq_stats']
    # Fill missing values with specific messages
    summary_stats_merged[f'{col_names[1]}_min_datapack'] = summary_stats_merged[f'{col_names[1]}_min_datapack'].fillna('MKF missing in datapack')
    summary_stats_merged[f'{col_names[1]}_min_dq_stats'] = summary_stats_merged[f'{col_names[1]}_min_dq_stats'].fillna('MKF missing in DQ stats')
    # Organize the columns: move the derived check columns next to the count columns
    cols_to_move = ['Datapoints_count_without_removed_stales_dq_stats', 'Datapoints_count_diff', f'{col_names[1]}_min_check', f'{col_names[1]}_max_check']
    all_cols = list(summary_stats_merged.columns)
    # Remove the columns to move from the original list
    for col in cols_to_move:
        all_cols.remove(col)
    # Insert the columns back at the target position (index 7)
    for i, col in enumerate(cols_to_move):
        all_cols.insert(7 + i, col)
    # Reorder the DataFrame columns
    summary_stats_merged = summary_stats_merged[all_cols]
    return summary_stats_merged


def calculate_vols(df, vol_type, normal_vol, min_date, max_date, col_names):
    """
    Calculates returns for a specified date range and volatility type within a DataFrame.

    Args:
        df (pd.DataFrame): DataFrame containing the data to analyze.
        vol_type (str): Specifies the type of volatility calculation ('inner', 'outer_prev', 'outer_curr', 'full').
        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
        min_date: Minimum date for the data filtering (same raw YYYYMMDD format as the date column).
        max_date: Maximum date for the data filtering (same raw YYYYMMDD format as the date column).
        col_names (list): Column names to be used in sorting and filtering.

    Returns:
        pd.DataFrame: DataFrame with calculated returns in a 'Ret' column.
    """
    # Sort data to ensure consistent grouping
    df = df.sort_values(by=[col_names[0], col_names[1]])
    # Filter data based on vol_type and date range; copy to avoid chained-assignment warnings below
    if vol_type in {'inner', 'outer_prev', 'outer_curr'}:
        df = df[(df[col_names[1]] >= min_date) & (df[col_names[1]] <= max_date)].copy()
    elif vol_type == 'full':
        df = df.copy()
    else:
        raise ValueError("Invalid vol_type specified. Use 'inner', 'full', 'outer_prev' or 'outer_curr'.")
    # Convert columns to appropriate data types
    df[col_names[1]] = pd.to_datetime(df[col_names[1]], format='%Y%m%d')
    df[col_names[2]] = pd.to_numeric(df[col_names[2]], errors='coerce')
    # Calculate returns per identifier; transform preserves the original index alignment
    df['Ret'] = df.groupby(col_names[0])[col_names[2]].transform(
        lambda x: calculate_returns_grouped(
            pd.DataFrame({col_names[2]: x}),
            normal_vol=normal_vol,
            col_names=col_names)['Ret']  # lambda instead of apply to avoid deprecation warning
    )
    df.dropna(subset=['Ret'], inplace=True)
    return df
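
# Illustrative call (dates are YYYYMMDD values, matching the format='%Y%m%d'
# parsing above; column names are placeholders): log returns over the full
# history of the previous pack.
#
#   ts_prev = calculate_vols(df_prev, 'full', normal_vol=False,
#                            min_date=df_prev['Date'].min(),
#                            max_date=df_prev['Date'].max(),
#                            col_names=['MF', 'Date', 'Price'])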


def calculate_returns_grouped(group, normal_vol, col_names):
    """
    Calculates returns for a group belonging to a single MKF.

    Args:
        group (pd.DataFrame): DataFrame containing the data to analyze.
        normal_vol (bool): Determines whether to calculate normal returns (True) or log returns (False).
        col_names (list): Column names to be used in calculations.

    Returns:
        pd.DataFrame: DataFrame with calculated returns in a 'Ret' column, index-aligned with the input.
    """
    group = group.copy()
    if normal_vol:
        group['Ret'] = group[col_names[2]].pct_change(fill_method=None)
    else:
        # Compute log returns on positive values only, then reindex so the result
        # keeps the same length and index as the input (required by groupby.transform)
        positive = group.loc[group[col_names[2]] > 0, col_names[2]]
        group['Ret'] = np.log(positive / positive.shift(1)).reindex(group.index)
    return group
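
# Behaviour sketch (illustrative numbers): for prices [100, 110, 99] the
# normal-return path yields [NaN, 0.10, -0.10], while the log-return path
# yields [NaN, ln(110/100), ln(99/110)]; non-positive prices produce NaN
# returns instead of raising.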


def outer_join(df_prev, df_curr, col_names, threshold=None):
    """
    Performs an outer join between previous and current dataframes to identify non-overlapping entries
    and returns grouped statistics based on the identifier column.

    Args:
        df_prev (pd.DataFrame): DataFrame containing previous period data.
        df_curr (pd.DataFrame): DataFrame containing current period data.
        col_names (list): Column names used to perform the join. Expects [identifier, date_col, value_col].
        threshold (int, optional): Threshold for filtering the results based on the number of non-overlapping entries.

    Returns:
        tuple: Two DataFrames, one with previous period entries not overlapping with the current
            and another with current period entries not overlapping with the previous.

    Raises:
        ValueError: If provided column names do not exist in the DataFrames.
    """
    # Check if the required columns exist in both DataFrames
    required_cols = set(col_names)
    if not required_cols.issubset(df_prev.columns) or not required_cols.issubset(df_curr.columns):
        missing_cols = list((required_cols - set(df_prev.columns)) | (required_cols - set(df_curr.columns)))
        raise ValueError(f"Missing columns in DataFrame: {missing_cols}")
    # Perform the outer join based on the identifier and date columns
    df_merged = pd.merge(df_prev, df_curr, on=[col_names[0], col_names[1]], how='outer', indicator=True, suffixes=['_prev', '_curr'])
    # Determine the overlapping scope based on the date column
    overlapping_scope = (df_curr[col_names[1]].min(), df_prev[col_names[1]].max())
    df_merged = df_merged[(df_merged[col_names[1]] >= overlapping_scope[0]) & (df_merged[col_names[1]] <= overlapping_scope[1])]
    # Filter to find non-overlapping entries
    df_prev_nas = df_merged[df_merged[f'{col_names[2]}_prev'].isna()]
    df_curr_nas = df_merged[df_merged[f'{col_names[2]}_curr'].isna()]
    # Group by the identifier and calculate statistics
    grouped_prev_nas = df_prev_nas.groupby(col_names[0]).agg(
        min_date=(col_names[1], 'min'),
        max_date=(col_names[1], 'max'),
        count=(col_names[1], 'count'),
        mean_price=(f'{col_names[2]}_curr', 'mean'),
        std_price=(f'{col_names[2]}_curr', 'std')
    ).reset_index()
    grouped_curr_nas = df_curr_nas.groupby(col_names[0]).agg(
        min_date=(col_names[1], 'min'),
        max_date=(col_names[1], 'max'),
        count=(col_names[1], 'count'),
        mean_price=(f'{col_names[2]}_prev', 'mean'),
        std_price=(f'{col_names[2]}_prev', 'std')
    ).reset_index()
    # Apply the threshold filter if specified
    if threshold is not None:
        grouped_prev_nas = grouped_prev_nas[grouped_prev_nas['count'] > threshold]
        grouped_curr_nas = grouped_curr_nas[grouped_curr_nas['count'] > threshold]
    return grouped_prev_nas, grouped_curr_nas
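
# Example (threshold chosen for illustration): list identifiers with more than
# ten dates missing on either side within the overlapping date range.
#
#   prev_gaps, curr_gaps = outer_join(df_prev, df_curr,
#                                     ['MF', 'Date', 'Price'], threshold=10)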


def count_outer_points(df_prev, df_curr, col_names, threshold=None):
    """
    Counts the number of points that fall outside the overlapping date range for previous and current dataframes.

    Args:
        df_prev (pd.DataFrame): DataFrame containing previous period data.
        df_curr (pd.DataFrame): DataFrame containing current period data.
        col_names (list): Column names used for the calculations. Expects [identifier, date_col, value_col].
        threshold (int, optional): Threshold for filtering the results based on the absolute difference in the number of points.

    Returns:
        pd.DataFrame: DataFrame with the count of non-overlapping points and their differences.
    """
    # Determine the overlapping date range
    overlapping_scope = (df_curr[col_names[1]].min(), df_prev[col_names[1]].max())
    # Filter data outside the overlapping date range
    df_prev_only = df_prev[df_prev[col_names[1]] < overlapping_scope[0]]
    df_curr_only = df_curr[df_curr[col_names[1]] > overlapping_scope[1]]
    # Group by the identifier and count the values
    prev_only_grouped = df_prev_only.groupby(col_names[0])[col_names[2]].count().reset_index()
    curr_only_grouped = df_curr_only.groupby(col_names[0])[col_names[2]].count().reset_index()
    # Merge the grouped data
    grouped_merged = pd.merge(prev_only_grouped, curr_only_grouped, on=[col_names[0]], how='outer', indicator=True, suffixes=['_prev_count', '_curr_count'])
    # Identifiers present on only one side contribute zero points on the other side
    count_cols = [f'{col_names[2]}_prev_count', f'{col_names[2]}_curr_count']
    grouped_merged[count_cols] = grouped_merged[count_cols].fillna(0)
    # Calculate the difference in counts (curr - prev)
    grouped_merged['diff_in_count'] = grouped_merged[count_cols[1]] - grouped_merged[count_cols[0]]
    grouped_merged = grouped_merged.sort_values('diff_in_count')
    # Apply the threshold filter if specified (absolute value of diff_in_count)
    if threshold is not None:
        grouped_merged = grouped_merged[abs(grouped_merged['diff_in_count']) >= threshold]
    return grouped_merged
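
# Example: compare how many points each identifier contributes before the
# current pack starts versus after the previous pack ends, keeping only
# differences of at least five points (threshold chosen for illustration).
#
#   outer_counts = count_outer_points(df_prev, df_curr,
#                                     ['MF', 'Date', 'Price'], threshold=5)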


def limit_plotting_scope(df_prev, df_curr, col_names, normal_vol, mf_count, combined_vols=None):
    """
    Limits the scope of data for plotting by filtering and sorting based on volatility changes.

    Args:
        df_prev (pd.DataFrame): DataFrame containing previous period data.
        df_curr (pd.DataFrame): DataFrame containing current period data.
        col_names (list): Column names used for the calculations.
        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
        mf_count (int): Number of records to limit the scope to.
        combined_vols (pd.DataFrame, optional): DataFrame containing precomputed volatility comparisons.

    Returns:
        pd.DataFrame: DataFrame limited to the specified number of records with the highest absolute
            relative differences in volatility.
    """
    if combined_vols is None:
        combined_vols = calculate_vol_comparison(df_prev, df_curr, col_names, normal_vol)
    vols_comparison = combined_vols[[col_names[0], 'Vol_full_prev', 'Vol_full_curr', 'rel_diff_full (%)']].copy()
    # Filter out rows where previous or current volatility is zero
    vols_comparison = vols_comparison[(vols_comparison['Vol_full_prev'] != 0) & (vols_comparison['Vol_full_curr'] != 0)]
    # Calculate absolute relative difference
    vols_comparison['abs_rel_diff'] = abs(vols_comparison['rel_diff_full (%)'])
    # Drop rows with NaN values
    vols_comparison = vols_comparison.dropna()
    # Sort by absolute relative difference in descending order
    vols_comparison = vols_comparison.sort_values(by='abs_rel_diff', ascending=False).reset_index(drop=True)
    # Limit to the specified number of records
    return vols_comparison[:mf_count]


def base_plotter(df_prev, df_curr, path_out, col_names, normal_vol, mf_count=None, pdf=None, combine_plots=True, combined_vols=None):
    """
    Creates and saves plots for comparing previous and current period data.

    Args:
        df_prev (pd.DataFrame): DataFrame containing previous period data.
        df_curr (pd.DataFrame): DataFrame containing current period data.
        path_out (str): Output directory for saving plots.
        col_names (list): Column names used for the calculations.
        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
        mf_count (int, optional): Number of records to limit the scope to. Defaults to None.
        pdf (PdfPages, optional): PdfPages object for saving combined plots. Defaults to None.
        combine_plots (bool, optional): Whether to combine plots into a single PDF. Defaults to True.
        combined_vols (pd.DataFrame, optional): DataFrame containing precomputed volatility comparisons.

    Returns:
        None
    """
    # Ensure the output directory exists
    path_out_plots = os.path.join(path_out, 'vol_change_plots')
    os.makedirs(path_out_plots, exist_ok=True)
    # Initialize PdfPages object if combining plots
    if combine_plots:
        pdf_out = PdfPages(os.path.join(path_out, 'combined_plots.pdf'))
    # Determine the list of identifiers (mfs_check) to plot
    if mf_count is not None:
        vols_comparison = limit_plotting_scope(df_prev, df_curr, col_names, normal_vol, mf_count, combined_vols)
        mfs_check = vols_comparison[col_names[0]].unique().tolist()
    else:
        mfs_check = pd.concat([df_prev[col_names[0]], df_curr[col_names[0]]]).unique().tolist()
    # Filter the dataframes to include only the identifiers in mfs_check
    df_curr = df_curr[df_curr[col_names[0]].isin(mfs_check)]
    df_prev = df_prev[df_prev[col_names[0]].isin(mfs_check)]
    # Merge the dataframes on the identifier and date columns
    df = pd.merge(df_prev, df_curr, on=[col_names[0], col_names[1]], how='outer', suffixes=('_prev_dataset', '_curr_dataset'))
    df = df.sort_values([col_names[0], col_names[1]])
    df = df[[col_names[0], col_names[1], f'{col_names[2]}_prev_dataset', f'{col_names[2]}_curr_dataset']]
    df[col_names[1]] = pd.to_datetime(df[col_names[1]], format='%Y%m%d')
    # Reshape the dataframe for plotting
    df = df.melt(id_vars=[col_names[0], col_names[1]]).rename(columns={'variable': 'set', 'value': col_names[2]})
    # Create and save plots for each identifier in mfs_check
    for mf in mfs_check:
        tmp = df[df[col_names[0]] == mf]
        g = sns.lineplot(data=tmp, x=col_names[1], y=col_names[2], hue='set')
        plt.xticks(rotation=15)
        g.set_title(mf)
        fig = plt.gcf()
        fig.savefig(os.path.join(path_out_plots, f'{mf}.png'))
        if combine_plots:
            pdf_out.savefig(fig)
        plt.clf()
    # Close the PdfPages object if combining plots
    if combine_plots:
        pdf_out.close()
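
# Example (output directory is a placeholder): plot the twenty identifiers
# with the largest relative change in full-scope volatility and bundle the
# figures into combined_plots.pdf alongside the per-identifier PNGs.
#
#   base_plotter(df_prev, df_curr, 'output', ['MF', 'Date', 'Price'],
#                normal_vol=True, mf_count=20)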


def create_summary(df_prev, df_curr, col_names, normal_vol, dq_stats_per_mkf_path_curr=None, dq_stats_per_mkf_path_prev=None, threshold=None):
    """
    Creates a summary DataFrame comparing previous and current period data.

    Args:
        df_prev (pd.DataFrame): DataFrame containing previous period data.
        df_curr (pd.DataFrame): DataFrame containing current period data.
        col_names (list): Column names used for the calculations.
            Expected format: [group_column, date_column, value_column].
        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
        dq_stats_per_mkf_path_curr (str, optional): Path to the current period DQ stats per MKF CSV file.
        dq_stats_per_mkf_path_prev (str, optional): Path to the previous period DQ stats per MKF CSV file.
        threshold (float, optional): Threshold for filtering the summary DataFrame.

    Returns:
        pd.DataFrame: Summary DataFrame containing the comparison results.
    """
    # Pivot tables for counting data points
    pivot_prev = df_prev.pivot_table(index=col_names[0], values=col_names[2], aggfunc='count').rename(
        columns={col_names[2]: 'Datapoints_count_prev'})
    pivot_curr = df_curr.pivot_table(index=col_names[0], values=col_names[2], aggfunc='count').rename(
        columns={col_names[2]: 'Datapoints_count_curr'})
    # Calculate full volatility for previous and current periods
    ts_prev = calculate_vols(df_prev, 'full', normal_vol, df_prev[col_names[1]].min(), df_prev[col_names[1]].max(), col_names)
    ts_curr = calculate_vols(df_curr, 'full', normal_vol, df_curr[col_names[1]].min(), df_curr[col_names[1]].max(), col_names)
    vol_prev = ts_prev.groupby(col_names[0])['Ret'].std(ddof=0).reset_index().rename(columns={'Ret': 'Vol_prev'})
    vol_curr = ts_curr.groupby(col_names[0])['Ret'].std(ddof=0).reset_index().rename(columns={'Ret': 'Vol_curr'})
    # Merge volatilities
    vols = pd.merge(vol_prev, vol_curr, on=col_names[0], how='outer')
    vols['rel vol change in %'] = abs(vols['Vol_prev'] - vols['Vol_curr']) / vols['Vol_prev'] * 100
    # Merge date range information
    date_ranges_prev = df_prev.groupby(col_names[0])[col_names[1]].agg(['min', 'max']).rename(
        columns={'min': f'{col_names[1]}_min_prev', 'max': f'{col_names[1]}_max_prev'})
    date_ranges_curr = df_curr.groupby(col_names[0])[col_names[1]].agg(['min', 'max']).rename(
        columns={'min': f'{col_names[1]}_min_curr', 'max': f'{col_names[1]}_max_curr'})
    # Combine all information into one DataFrame
    summary_df = pd.merge(pivot_prev, pivot_curr, on=col_names[0], how='outer')
    summary_df = pd.merge(summary_df, date_ranges_prev, on=col_names[0], how='outer')
    summary_df = pd.merge(summary_df, date_ranges_curr, on=col_names[0], how='outer')
    summary_df = pd.merge(summary_df, vols, on=col_names[0], how='outer')
    summary_df = summary_df.sort_values(by='rel vol change in %', ascending=False)
    # Calculate count change
    summary_df['count_change'] = summary_df['Datapoints_count_curr'] - summary_df['Datapoints_count_prev']
    # Apply summary threshold if specified
    if threshold is not None:
        summary_df = summary_df[summary_df['rel vol change in %'] > threshold]
    # Connect the information regarding other (kept) jumps from the DQ stats
    if dq_stats_per_mkf_path_prev:
        # Load the dq_stats_per_mkf DataFrame
        dq_stats_df_prev = pd.read_csv(dq_stats_per_mkf_path_prev)
        dq_stats_df_prev.rename(columns={'MKF_CCR': col_names[0]}, inplace=True)
        # Merge the summary_df DataFrame with the dq_stats_df DataFrame
        summary_df = pd.merge(
            summary_df,
            dq_stats_df_prev[[col_names[0], 'Other_Jumps']],
            on=col_names[0],
            how='left'
        )
        summary_df.rename(columns={'Other_Jumps': 'Other_Jumps_prev'}, inplace=True)
    if dq_stats_per_mkf_path_curr:
        # Load the dq_stats_per_mkf DataFrame
        dq_stats_df_curr = pd.read_csv(dq_stats_per_mkf_path_curr)
        dq_stats_df_curr.rename(columns={'MKF_CCR': col_names[0]}, inplace=True)
        # Merge the summary_df DataFrame with the dq_stats_df DataFrame
        summary_df = pd.merge(
            summary_df,
            dq_stats_df_curr[[col_names[0], 'Other_Jumps']],
            on=col_names[0],
            how='left'
        )
        summary_df.rename(columns={'Other_Jumps': 'Other_Jumps_curr'}, inplace=True)

    # Create the comment column
    def generate_comment(row):
        comments = []
        if pd.notnull(row.get('Other_Jumps_curr')) and row['Other_Jumps_curr'] > 0:
            comments.append(f"{int(row['Other_Jumps_curr'])} current")
        if pd.notnull(row.get('Other_Jumps_prev')) and row['Other_Jumps_prev'] > 0:
            comments.append(f"{int(row['Other_Jumps_prev'])} previous")
        if comments:
            return f"Vol change may be related to {' and '.join(comments)} other not confirmed (kept) jumps."
        return np.nan

    summary_df['comment'] = summary_df.apply(generate_comment, axis=1)
    return summary_df
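
# Example (paths are placeholders): summary restricted to identifiers whose
# relative volatility change exceeds 10%, annotated with kept-jump counts
# from the per-MKF DQ stats when the files are available.
#
#   summary = create_summary(df_prev, df_curr, ['MF', 'Date', 'Price'],
#                            normal_vol=True,
#                            dq_stats_per_mkf_path_curr='dq_per_mkf_curr.csv',
#                            dq_stats_per_mkf_path_prev='dq_per_mkf_prev.csv',
#                            threshold=10)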


def compare_mkfs(df_prev, df_curr, col_name):
    """
    Compare unique identifiers ('MKF' or 'MF') from two dataframes and categorize them into
    intersection, removed, and added identifiers. Returns a dataframe with these categories
    and their counts in the column names.

    Args:
        df_prev (pd.DataFrame): DataFrame containing previous period data.
        df_curr (pd.DataFrame): DataFrame containing current period data.
        col_name (str): The name of the column containing identifiers to be compared.

    Returns:
        pd.DataFrame: A DataFrame with three columns, each containing lists of identifiers:
            - Intersecting identifiers from both dataframes.
            - Identifiers removed in the current dataframe.
            - Identifiers added in the current dataframe.
            Each column name includes the count of identifiers in that category.
    """
    # Collect unique identifiers from both dataframes
    mkf_prev = set(df_prev[col_name].unique())
    mkf_curr = set(df_curr[col_name].unique())
    # Calculate intersection and differences
    intersection = sorted(mkf_prev.intersection(mkf_curr))
    mkfs_removed = sorted(mkf_prev.difference(mkf_curr))
    mkfs_added = sorted(mkf_curr.difference(mkf_prev))
    # Create dictionary with results formatted with count details
    comparison_dict = {
        f'{col_name} Intersection - {len(intersection)} items': intersection,
        f'{col_name} Removed - {len(mkfs_removed)} items': mkfs_removed,
        f'{col_name} Added - {len(mkfs_added)} items': mkfs_added
    }
    # Convert dictionary to DataFrame (columns are padded to equal length with NaN)
    comparison_df = pd.DataFrame({k: pd.Series(v) for k, v in comparison_dict.items()})
    return comparison_df
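
# Example: side-by-side listing of intersecting, removed and added
# identifiers, with category counts embedded in the column headers.
#
#   mkf_overview = compare_mkfs(df_prev, df_curr, 'MF')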


def set_column_widths(dataframe, writer, sheet_name):
    """
    Sets the width of the columns in the specified Excel sheet to fit the contents.

    Args:
        dataframe (pd.DataFrame): DataFrame to adjust column widths for.
        writer (pd.ExcelWriter): Excel writer object (xlsxwriter engine, whose worksheets expose set_column).
        sheet_name (str): Name of the Excel sheet to adjust.

    Returns:
        None
    """
    for column in dataframe:
        column_length = max(dataframe[column].astype(str).map(len).max(), len(column)) + 2
        col_idx = dataframe.columns.get_loc(column)
        writer.sheets[sheet_name].set_column(col_idx, col_idx, column_length)


def create_report(dataframes_to_export, export_path, assetclass):
    """
    Creates an Excel report containing multiple DataFrames as separate sheets,
    splitting large DataFrames across multiple sheets if necessary.

    Args:
        dataframes_to_export (dict): Dictionary where keys are sheet names and values are DataFrames to export.
        export_path (str): Path to the directory where the report will be saved.
        assetclass (str): Asset class name used in the filename.

    Returns:
        None
    """
    max_rows_per_sheet = 1_048_575  # Excel's 1,048,576-row limit, minus one row for the header
    file_path = os.path.join(export_path, f'{assetclass}_comparison_report.xlsx')
    with pd.ExcelWriter(file_path, engine='xlsxwriter') as writer:
        for sheet_name, dataframe in dataframes_to_export.items():
            if dataframe.empty:
                # Create a DataFrame with a no data message
                no_data_message = pd.DataFrame({'Message': ['No data/rows/output available.']})
                no_data_message.to_excel(writer, sheet_name=sheet_name, index=False)
                set_column_widths(no_data_message, writer, sheet_name)
            elif len(dataframe) <= max_rows_per_sheet:
                dataframe.to_excel(writer, sheet_name=sheet_name, index=False)
                set_column_widths(dataframe, writer, sheet_name)
            else:
                num_parts = -(-len(dataframe) // max_rows_per_sheet)  # ceiling division
                for part in range(num_parts):
                    start_row = part * max_rows_per_sheet
                    end_row = min((part + 1) * max_rows_per_sheet, len(dataframe))
                    part_dataframe = dataframe.iloc[start_row:end_row]
                    part_sheet_name = f"{sheet_name}_part{part + 1}"
                    part_dataframe.to_excel(writer, sheet_name=part_sheet_name, index=False)
                    set_column_widths(part_dataframe, writer, part_sheet_name)
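
# Example (sheet names and asset class are illustrative): each key becomes a
# worksheet in <assetclass>_comparison_report.xlsx, split into numbered parts
# if it exceeds the per-sheet row limit.
#
#   create_report({'Summary': summary, 'MKF comparison': mkf_overview},
#                 'output', 'Commodities')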


def calculate_vol_comparison(df_prev, df_curr, col_names, normal_vol, vol_type='all', threshold=None):
    """
    Calculates volatility comparison between two datasets for all specified types or a specific type.

    Args:
        df_prev (pd.DataFrame): Previous period data.
        df_curr (pd.DataFrame): Current period data.
        col_names (list): Column names used for the calculations.
        normal_vol (bool): Determines whether to calculate normal volatility (True) or log volatility (False).
        vol_type (str): Specifies the type of volatility to calculate ('all', 'inner', 'full', 'outer').
        threshold (dict, optional): Dictionary mapping result column names to threshold values used for
            filtering. Example:
            {
                'abs_diff_inner': 0.1,
                'rel_diff_inner (%)': 5,
                'abs_diff_full': 0.1,
                'rel_diff_full (%)': 5,
                'abs_diff_outer': 0.1,
                'rel_diff_outer (%)': 5
            }

    Returns:
        pd.DataFrame: Contains the results of the volatility comparison.
    """
    def calculate_vol(df, vol_type, min_date, max_date, suffix):
        ts = calculate_vols(df, vol_type, normal_vol, min_date, max_date, col_names)
        vol = ts.groupby(col_names[0])['Ret'].std(ddof=0).reset_index()
        vol.columns = [col_names[0], f'Vol_{suffix}']
        return vol

    def add_diff_columns(df, col1, col2, suffix):
        df[f'abs_diff_{suffix}'] = abs(df[col1] - df[col2])
        df[f'rel_diff_{suffix} (%)'] = abs((df[col2] - df[col1]) / df[col1] * 100).fillna(0)
        return df

    combined_vols = pd.DataFrame()
    if vol_type in ('all', 'inner'):
        # Inner scope: the date range where the two datasets overlap
        min_date_inner, max_date_inner = df_curr[col_names[1]].min(), df_prev[col_names[1]].max()
        vol_inner_prev = calculate_vol(df_prev, 'inner', min_date_inner, max_date_inner, 'inner_prev')
        vol_inner_curr = calculate_vol(df_curr, 'inner', min_date_inner, max_date_inner, 'inner_curr')
        vol_inner = vol_inner_prev.merge(vol_inner_curr, on=col_names[0], how='outer')
        vol_inner = add_diff_columns(vol_inner, 'Vol_inner_prev', 'Vol_inner_curr', 'inner')
        combined_vols = vol_inner if combined_vols.empty else combined_vols.merge(vol_inner, on=col_names[0], how='outer')
    if vol_type in ('all', 'full'):
        # Full scope: each dataset over its own entire date range
        min_date_full_prev, max_date_full_prev = df_prev[col_names[1]].min(), df_prev[col_names[1]].max()
        min_date_full_curr, max_date_full_curr = df_curr[col_names[1]].min(), df_curr[col_names[1]].max()
        vol_full_prev = calculate_vol(df_prev, 'full', min_date_full_prev, max_date_full_prev, 'full_prev')
        vol_full_curr = calculate_vol(df_curr, 'full', min_date_full_curr, max_date_full_curr, 'full_curr')
        vol_full = vol_full_prev.merge(vol_full_curr, on=col_names[0], how='outer')
        vol_full = add_diff_columns(vol_full, 'Vol_full_prev', 'Vol_full_curr', 'full')
        combined_vols = vol_full if combined_vols.empty else combined_vols.merge(vol_full, on=col_names[0], how='outer')
    if vol_type in ('all', 'outer'):
        # Outer scopes: previous data before the current set starts, and current data after the previous set ends
        min_date_outer_prev, max_date_outer_prev = df_prev[col_names[1]].min(), df_curr[col_names[1]].min()
        min_date_outer_curr, max_date_outer_curr = df_prev[col_names[1]].max(), df_curr[col_names[1]].max()
        vol_outer_prev = calculate_vol(df_prev, 'outer_prev', min_date_outer_prev, max_date_outer_prev, 'outer_prev')
        vol_outer_curr = calculate_vol(df_curr, 'outer_curr', min_date_outer_curr, max_date_outer_curr, 'outer_curr')
        vol_outer = vol_outer_prev.merge(vol_outer_curr, on=col_names[0], how='outer')
        vol_outer = add_diff_columns(vol_outer, 'Vol_outer_prev', 'Vol_outer_curr', 'outer')
        combined_vols = vol_outer if combined_vols.empty else combined_vols.merge(vol_outer, on=col_names[0], how='outer')
    # Apply threshold filtering
    if threshold:
        for key, value in threshold.items():
            if key in combined_vols.columns:
                combined_vols = combined_vols[combined_vols[key] > value]
    if 'rel_diff_full (%)' in combined_vols.columns:
        combined_vols = combined_vols.sort_values(by='rel_diff_full (%)', ascending=False)
    return combined_vols
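
# Example (threshold values are illustrative): inner-, full- and outer-scope
# volatilities, keeping rows whose full-scope relative difference exceeds 5%.
#
#   vols = calculate_vol_comparison(df_prev, df_curr, ['MF', 'Date', 'Price'],
#                                   normal_vol=True, vol_type='all',
#                                   threshold={'rel_diff_full (%)': 5})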


def create_threshold_details_sheet(threshold_params):
    """
    Creates a DataFrame containing details of applied thresholds.

    Args:
        threshold_params (dict): Dictionary containing the thresholds for various parts of the analysis.

    Returns:
        pd.DataFrame: DataFrame with details of the applied thresholds.
    """
    threshold_details = []
    for key, value in threshold_params.items():
        threshold_details.append({'Threshold Type': key, 'Value': value})
    return pd.DataFrame(threshold_details)


######### Functions related to specific asset classes:
def find_gaps(df_1, df_2, col_names, missing_threshold=5):
    """
    Finds, per identifier, runs of at least `missing_threshold` consecutive observations
    that are present in df_2 but absent from df_1.

    Args:
        df_1 (pd.DataFrame): Reference DataFrame.
        df_2 (pd.DataFrame): DataFrame scanned for observations missing from df_1.
        col_names (list): Column names used for the calculations. Expects [identifier, date_col, ...].
        missing_threshold (int): Minimum number of consecutive missing observations to report.

    Returns:
        pd.DataFrame: One row per detected gap, with the identifier, start/end dates and gap length.
    """
    intervals = []
    names = pd.concat([df_1[col_names[0]], df_2[col_names[0]]], ignore_index=True).drop_duplicates()
    for name in names:
        df1_name = df_1[df_1[col_names[0]] == name]
        df2_name = df_2[df_2[col_names[0]] == name]
        if df1_name.empty:
            if not df2_name.empty:
                # The identifier is absent from df_1 entirely: report its whole range as one gap
                intervals.append({
                    col_names[0]: name,
                    'start_date': df2_name[col_names[1]].min(),
                    'end_date': df2_name[col_names[1]].max(),
                    'gap length': len(df2_name)
                })
            continue
        merged = pd.merge(df1_name, df2_name, how='outer', on=col_names[1], indicator=True)
        merged = merged.sort_values(col_names[1])
        merged['right_only'] = merged['_merge'] == 'right_only'
        # Consecutive right-only rows share the same cumulative-sum value, forming one gap group
        merged['gap'] = (~merged['right_only']).cumsum()
        for gap_id, group in merged[merged['right_only']].groupby('gap'):
            if len(group) >= missing_threshold:
                intervals.append({
                    col_names[0]: name,
                    'start_date': group[col_names[1]].min(),
                    'end_date': group[col_names[1]].max(),
                    'gap length': len(group)
                })
    return pd.DataFrame(intervals)
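
# Example: report runs of at least ten consecutive dates that appear in the
# current pack but are absent from the previous one.
#
#   gaps = find_gaps(df_prev, df_curr, ['MF', 'Date'], missing_threshold=10)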


def commodity_curves_plotter(df_prev, df_curr, export_path, filename, date_cutoff):
    """
    Plots previous and current commodity curves into a multi-page PDF, one page per
    pse_code (the first dot-separated part of the 'MF' identifier), with previous
    data on the top panel and current data on the bottom panel.

    Args:
        df_prev (pd.DataFrame): Previous period data with 'MF', 'Date' and 'Price' columns.
        df_curr (pd.DataFrame): Current period data with 'MF', 'Date' and 'Price' columns.
        export_path (str): Output directory for the PDF file.
        filename (str): Name of the PDF file to create.
        date_cutoff (str or None): If provided, only data from this date onwards is plotted.

    Returns:
        None
    """
    # Ensure the directory exists
    os.makedirs(export_path, exist_ok=True)
    full_path = os.path.join(export_path, filename)
    df1 = df_prev.copy()
    df2 = df_curr.copy()
    # Convert 'Date' to datetime if it's not already
    df1['Date'] = pd.to_datetime(df1['Date'], format='%Y%m%d')
    df2['Date'] = pd.to_datetime(df2['Date'], format='%Y%m%d')
    # Apply date cutoff if provided
    if date_cutoff is not None:
        cutoff = pd.to_datetime(date_cutoff)
        df1 = df1[df1['Date'] >= cutoff]
        df2 = df2[df2['Date'] >= cutoff]
    # Create pse_code column
    df1['pse_code'] = df1['MF'].str.split('.').str[0]
    df2['pse_code'] = df2['MF'].str.split('.').str[0]
    with PdfPages(full_path) as pdf:
        # Get unique pse_codes from both dataframes
        pse_codes = sorted(set(df1['pse_code']).union(df2['pse_code']))
        for pse_code in pse_codes:
            fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(12, 8))
            # Plot df1 data for current pse_code
            subset_df1 = df1[df1['pse_code'] == pse_code].sort_values('Date')
            if not subset_df1.empty:
                for mf, group in subset_df1.groupby('MF'):
                    axes[0].plot(group['Date'], group['Price'], alpha=0.5)
                axes[0].set_title(f"MFs for pse_code: {pse_code} from df_prev")
            # Plot df2 data for current pse_code
            subset_df2 = df2[df2['pse_code'] == pse_code].sort_values('Date')
            if not subset_df2.empty:
                for mf, group in subset_df2.groupby('MF'):
                    axes[1].plot(group['Date'], group['Price'], alpha=0.5)
                axes[1].set_title(f"MFs for pse_code: {pse_code} from df_curr")
            # Set the same xlim only when the pse_code is in both dfs
            if not subset_df1.empty and not subset_df2.empty:
                min_date = min(subset_df1['Date'].min(), subset_df2['Date'].min())
                max_date = max(subset_df1['Date'].max(), subset_df2['Date'].max())
                axes[0].set_xlim([min_date, max_date])
                axes[1].set_xlim([min_date, max_date])
            elif not subset_df1.empty:
                axes[0].set_xlim([subset_df1['Date'].min(), subset_df1['Date'].max()])
            elif not subset_df2.empty:
                axes[1].set_xlim([subset_df2['Date'].min(), subset_df2['Date'].max()])
            # Save the current figure to the PDF
            pdf.savefig(fig)
            plt.close(fig)
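
# Example (filename and cutoff are placeholders): one PDF page per pse_code,
# with previous data on the top panel and current data on the bottom panel.
#
#   commodity_curves_plotter(df_prev, df_curr, 'output',
#                            'commodity_curves.pdf', date_cutoff='2020-01-01')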


def set_font(run, font_name="Calibri", font_size=11):
    """Helper function to set the font of a run."""
    run.font.name = font_name
    run._element.rPr.rFonts.set(qn('w:eastAsia'), font_name)  # Also apply the font to East Asian text
    run.font.size = Pt(font_size)
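
# Sketch of intended use with python-docx (document and text are illustrative):
#
#   doc = Document()
#   run = doc.add_paragraph().add_run('Comparison report')
#   set_font(run, font_name='Calibri', font_size=11)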