Source code for pyfpa

"""
Project for Financial Planning and Analysis

Written by Erik Warren
Original Date: October 2020
Version Date: November 2020
version: 0.0.7 beta
"""


import pandas as pd
import numpy as np
import os
import datetime as dt
from openpyxl import load_workbook


[docs]class fpa: fpa_help = "pyfpa - Financial Planning and Analysis Python Project. Python intro for FP&A people." def __init__(self, df=pd.DataFrame()): self.accounts = pd.DataFrame() # Repository for Chart of Accounts or other # lists to add to dimensions self.base = df.copy() # Original import data for block self.block = df.copy() # Imported data block self.meta_block = pd.DataFrame() # Meta data for block. Filepath, last access/mod self.data = pd.DataFrame() # The main dataframe for all the data self.consolidation = pd.DataFrame() # Container for consolidate() result self.slice = pd.DataFrame() # Container for slicing and dicing .data self.pivot = pd.DataFrame() # Container pivot table functions self.variance = pd.DataFrame() # General container for functions self.function_result= [] # General container for functions # TODO: Add Resample # Index names can only be strings help_import_xl = '''Import Excel, CSV or DataFrame to .block. cols_to_index are columns for index \ which can be int or list for Excel. For DataFrame, string or list of string column names.'''
[docs] def import_xl(self, fpath, ws_name=0, cols_to_index=0, sep_val=','): """ Import a table from a worksheet in a Excel File, a CSV file or an existing pandas DataFrame. :param fpath: path to file OR a pandas DataFrame :param ws_name: worksheet name or index such as 0 or 2 :param cols_to_index: columns to put into index either a number or a list i.e. [0, 1, 2] """ if isinstance(fpath, pd.DataFrame): if cols_to_index != 0: fpath.set_index(cols_to_index, append=True, inplace=True) self.block = fpath.copy() else: if fpath[-3:] == 'csv': self.block = pd.read_csv( fpath, sep=sep_val, thousands=',' ) else: self.block = pd.read_excel( fpath, sheet_name=ws_name ) self.block.index.name = 'TEMP_NAME_ZZ' all_cols = list(self.block.columns) cols_to_index = cols_to_index if isinstance(cols_to_index, list) else [cols_to_index] if isinstance(cols_to_index, list): #Fix for pandas error with list cols. for item_c in cols_to_index: self.block.set_index(all_cols[item_c], append=True, inplace=True) self.drop_dimension('TEMP_NAME_ZZ', 'block') #self.block.dropna(0, "all", inplace=True) #self.block.dropna(1, "all", inplace=True) self.base = self.block.copy() data_block_value = 0 new_index_names = [] if not isinstance(self.block.index, pd.MultiIndex): new_index_names = ["Data_Block", self.block.index.name] if not isinstance(self.data.index, pd.MultiIndex): if not isinstance(self.block.index, pd.MultiIndex): data_block_value = int(np.random.rand() * 1000000000) new_index_values = [[data_block_value], list(self.block.index)] self.block.index = pd.MultiIndex.from_product( new_index_values, names=new_index_names ) else: data_block_value = int(np.random.rand() * 1000000000) dfi = self.block.index.to_frame() dfi.insert(1, "Data_Block", data_block_value) self.block.index = pd.MultiIndex.from_frame(dfi) else: data_block_no_dup = pd.Series( self.data.index.get_level_values("Data_Block") ) while data_block_value in data_block_no_dup: data_block_value = int(np.random.rand() * 1000000000) dfi = self.block.index.to_frame() dfi.insert(1, "Data_Block", data_block_value) self.block.index = pd.MultiIndex.from_frame(dfi) # Meta_df meta_df = pd.DataFrame( index=[data_block_value], columns=[ "file", "import_time", "last_modified", "last_accessed", "file_path", "modified_by", ], ) file_stats = os.stat(fpath) if isinstance(fpath, str) else "calculated block" meta_df.loc[data_block_value, "file"] = ( fpath.split("\\")[-1] if isinstance(fpath, str) else file_stats ) meta_df.loc[data_block_value, "import_time"] = dt.datetime.now() meta_df.loc[data_block_value, "file_path"] = ( fpath if isinstance(fpath, str) else file_stats ) meta_df.loc[data_block_value, "last_modified"] = ( dt.datetime.fromtimestamp(file_stats.st_mtime).strftime("%Y-%m-%d %H:%M") if isinstance(fpath, str) else dt.datetime.now().strftime("%Y-%m-%d %H:%M") ) meta_df.loc[data_block_value, "last_accessed"] = ( dt.datetime.fromtimestamp(file_stats.st_atime).strftime("%Y-%m-%d %H:%M") if isinstance(fpath, str) else dt.datetime.now().strftime("%Y-%m-%d %H:%M") ) self.meta_block = self.meta_block.append(meta_df)
help_import_custom_xl = '''Import Excel using custom mapping for table and dimensions. Enter most items as lists.'''
[docs] def import_custom_xl( self, f_path, ws_name=0, table_coords=None, idx_cols=0, dim_names=None, dim_values=None, dim_names_coords=None, dim_coords=None, fill_index_na=False, ): """ Custom import with mapping for data table and dimensions from Excel worksheet. :param f_path: Path to the Excel File. :param ws_name: Name or index number of the worksheet. i.e. 'Accounts' or 2. :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_values: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. """ self.block = pd.DataFrame() wb = load_workbook(f_path, data_only=True) ws_name = wb.sheetnames[0] if ws_name == 0 else ws_name ws = wb[ws_name] dim_names = [] if dim_names is None else dim_names if dim_names_coords.__ne__(None) and dim_names_coords.__ne__([None]): for item in dim_names_coords: dim_name = ws[item].value dim_names.append(dim_name) dim_values = [] if dim_values is None else dim_values if dim_coords.__ne__(None) and dim_coords.__ne__([None]): for item in dim_coords: dim_value = ws[item].value dim_values.append(dim_value) if table_coords == None or table_coords == [None]: df = pd.read_excel(f_path, sheet_name=ws_name, index_col=idx_cols) else: data_range = ws[table_coords[0] : table_coords[1]] range_cols, range_index, range_values, row_list = [], [], [], [] idx_name = data_range[0][0] for item in data_range[0][1:]: range_cols.append(item.value) col_width = len(range_cols) for item in data_range[1:]: range_index.append(item[0].value) for item in data_range[1:]: for itemx in item[1:]: range_values.append(itemx.value) row_count = int(len(range_values) / col_width) col_width_start, col_width_end = 0, col_width for item in list(range(1, row_count + 1)): append_list = range_values[col_width_start:col_width_end] row_list.append(append_list) col_width_start += col_width col_width_end += col_width df = pd.DataFrame(row_list, index=range_index, columns=range_cols) df.index.name = idx_name.value idx_cols = idx_cols if isinstance(idx_cols, list) else [idx_cols] if idx_cols != [0]: add_idx_range = idx_cols add_idx_range.reverse() for item in add_idx_range: add_col_name = range_cols[item] df.set_index(add_col_name, append=True, inplace=True) df.dropna(0, "all", inplace=True) df.dropna(1, "all", inplace=True) self.block = df self.base = self.block.copy() data_block_value = 0 if not isinstance(self.data.index, pd.MultiIndex): data_block_value = int(np.random.rand() * 1000000000) dfi = self.block.index.to_frame() dfi["Data_Block"] = data_block_value rev_cols = list(dfi.columns) rev_cols.reverse() dfi = dfi.loc[:, rev_cols] if fill_index_na == True: dfi.fillna(method="ffill", inplace=True) self.block.index = pd.MultiIndex.from_frame(dfi) else: data_block_no_dup = pd.Series( self.data.index.get_level_values("Data_Block") ) while data_block_value in data_block_no_dup: data_block_value = int(np.random.rand() * 1000000000) dfi = self.block.index.to_frame() dfi["Data_Block"] = data_block_value rev_cols = list(dfi.columns) rev_cols.reverse() dfi = dfi.loc[:, rev_cols] if fill_index_na == True: dfi.fillna(method="ffill", inplace=True) self.block.index = pd.MultiIndex.from_frame(dfi) if dim_values.__ne__([None]) and dim_names.__ne__([None]): self.add_dimensions(dim_names, dim_values, data_obj='block') #Add meta data meta_df = pd.DataFrame( index=[data_block_value], columns=[ "file", "import_time", "last_modified", "last_accessed", "file_path", "modified_by", ], ) file_stats = os.stat(f_path) meta_df.loc[data_block_value, "file"] = ( f_path.split("\\")[-1] if f_path.count("\\") > 0 else f_path.split("/")[-1] ) meta_df.loc[data_block_value, "file_path"] = f_path meta_df.loc[data_block_value, "import_time"] = dt.datetime.now() meta_df.loc[data_block_value, "last_modified"] = dt.datetime.fromtimestamp( file_stats.st_mtime ).strftime("%Y-%m-%d %H:%M") meta_df.loc[data_block_value, "last_accessed"] = dt.datetime.fromtimestamp( file_stats.st_atime ).strftime("%Y-%m-%d %H:%M") self.meta_block = self.meta_block.append(meta_df)
[docs] def import_accts_xl(self, f_path, ws_name=0, dim_name='nval', sep_val=','): """ Import a dataframe such as a chart of accounts or sales dimensions for adding to data objects. :param f_path: Path to Excel file. :param ws_name: Name or index number of the worksheet. i.e. 'Accounts' or 2. :param dim_name: Identifier for the group used when retrieving it. See merge_dim_from_accts. :return: self.accounts """ if f_path[-3:] == 'csv': dim_set = pd.read_csv( f_path, sep=sep_val, thousands=',' ) else: dim_set = pd.read_excel(f_path, ws_name) dim_set.index.name = "index" dim_set["dim_set"] = dim_name dim_set.set_index("dim_set", append=True, inplace=True) dim_set.index = dim_set.index.reorder_levels(["dim_set", "index"]) self.accounts = pd.concat([self.accounts, dim_set])
# TODO: Add check to see if adding an identical df. Concat with different indices.
[docs] def import_xl_sheets( self, f_path, wb_sheets=None, table_coords=None, idx_cols=0, dim_names=None, dim_vals=None, dim_names_coords=None, dim_coords=None, fill_index_na=False, ): """ Imports all the tables from worksheets within an Excel file. :param f_path: Path to the file :param wb_sheets: Worksheets to read. Input as list i.e. ['Sales', 'Operations'] :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_vals: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. :return: self.data """ dim_names = dim_names if isinstance(dim_names, list) else [dim_names] dim_vals = dim_vals if isinstance(dim_vals, list) else [dim_vals] dim_names_coords = dim_names_coords if isinstance(dim_names_coords, list) else [dim_names_coords] dim_coords = dim_coords if isinstance(dim_coords, list) else [dim_coords] wb = load_workbook(f_path, data_only=True) if wb_sheets == None: wb_sheets = wb.sheetnames if table_coords is None: for wsheet in wb_sheets: self.import_xl(f_path, wsheet) self.add_dimensions(["Work_Sheet"], [wsheet], 1, 'block') self.add_block_to_data() else: for wsheet in wb_sheets: self.import_custom_xl( f_path, wsheet, table_coords.copy(), idx_cols, dim_names.copy(), dim_vals.copy(), dim_names_coords.copy(), dim_coords.copy(), fill_index_na, ) self.add_block_to_data() self.block = pd.DataFrame()
[docs] def import_xl_directory( self, dir_path, xl_id=None, ws_name=0, table_coords=None, idx_cols=0, dim_names=None, dim_vals=None, dim_names_coords=None, dim_coords=None, fill_index_na=False, ): """ Imports dimensions and table from a worksheet from all the Excel files (with or without identifiers) from a directory. :param dir_path: Path to the directory containing the files :param xl_id: String used to filter files to extract data, i.e. if file name is "Budget v3.xlsx" you could say 'v3.xlsx' :param ws_name: Worksheet to read. Input as string i.e. 'Sales' or index. Zero is default. :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_vals: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. :return: self.data """ dim_names = dim_names if isinstance(dim_names, list) else [dim_names] dim_vals = dim_vals if isinstance(dim_vals, list) else [dim_vals] dim_names_coords = dim_names_coords if isinstance(dim_names_coords, list) else [dim_names_coords] dim_coords = dim_coords if isinstance(dim_coords, list) else [dim_coords] file_list = pd.Series(os.listdir(dir_path)) if xl_id != None: file_list = file_list[file_list.str.contains(xl_id, case=False, na=False)] for file in file_list: dir_path.replace("\\", "/") f_path = dir_path + "/" + file wb = load_workbook(f_path, data_only=True) # wb_sheets = wb.sheetnames wb_sheets = [ws_name] if table_coords is None: for wsheet in wb_sheets: self.import_xl(f_path, wsheet) #self.add_dimensions(["Work_Sheet"], [wsheet], 1, 'block') self.add_block_to_data() else: for wsheet in wb_sheets: self.import_custom_xl( f_path, ws_name, table_coords.copy(), idx_cols, dim_names.copy(), dim_vals.copy(), dim_names_coords.copy(), dim_coords.copy(), fill_index_na, ) self.add_block_to_data() self.block = pd.DataFrame()
[docs] def import_xl_directories( self, dir_path, xl_id=None, ws_name=0, table_coords=None, idx_cols=0, dim_names=None, dim_vals=None, dim_names_coords=None, dim_coords=None, fill_index_na=False, ): """ Imports dimensions and table from a worksheet from all the Excel files (with or without identifiers) from a series of directories. :param dir_path: Path to the root directory containing the directories which contain the files. :param xl_id: String used to filter files to extract data, i.e. if file name is "Budget v3.xlsx" you could say 'v3.xlsx' :param ws_name: Worksheet to read. Input as string i.e. 'Sales' or index. Zero is default. :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_vals: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. :return: self.data """ dim_names = dim_names if isinstance(dim_names, list) else [dim_names] dim_vals = dim_vals if isinstance(dim_vals, list) else [dim_vals] dim_names_coords = dim_names_coords if isinstance(dim_names_coords, list) else [dim_names_coords] dim_coords = dim_coords if isinstance(dim_coords, list) else [dim_coords] dir_list = pd.Series(os.listdir(dir_path)) dir_list = dir_list[~dir_list.str.contains("\.", na=False)] dir_path = dir_path.replace("\\", "/") for dir in dir_list: file_list = pd.Series(os.listdir(dir_path + "/" + dir)) if xl_id != None: file_list = file_list[ (file_list.str.contains(xl_id, case=False, na=False)) & (~file_list.str.contains('~', case=False, na=False)) ] for file in file_list: f_path = dir_path + "/" + dir + "/" + file wb = load_workbook(f_path, data_only=True) # wb_sheets = wb.sheetnames wb_sheets = [ws_name] if table_coords is None: for wsheet in wb_sheets: self.import_xl(f_path, wsheet) self.add_dimensions(["Work_Sheet"], [wsheet]) self.add_block_to_data() else: for wsheet in wb_sheets: self.import_custom_xl( f_path, ws_name, table_coords.copy(), idx_cols, dim_names.copy(), dim_vals.copy(), dim_names_coords.copy(), dim_coords.copy(), fill_index_na, ) self.add_block_to_data() self.block = pd.DataFrame()
[docs] def update_custom_xl( self, f_path=None, ws_name=0, table_coords=None, idx_cols=0, dim_names=None, dim_vals=None, dim_names_coords=None, dim_coords=None, ): """ Custom update of existing data with mapping for data and dimensions from Excel Worksheet. It will update a section of 'data', which has an identical dimension structure (excluding 'Data_Block' with new data. The new block's meta information will be referenced in the original blocks data to show history. :param f_path: Path to the Excel File. :param ws_name: Name or index number of the worksheet. i.e. 'Accounts' or 2. :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_vals: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. """ table_coords = table_coords if isinstance(table_coords, list) else [table_coords] dim_names = dim_names if isinstance(dim_names, list) else [dim_names] dim_vals = dim_vals if isinstance(dim_vals, list) else [dim_vals] dim_names_coords = dim_names_coords if isinstance(dim_names_coords, list) else [dim_names_coords] dim_coords = dim_coords if isinstance(dim_coords, list) else [dim_coords] self.import_custom_xl( f_path, ws_name, table_coords.copy(), idx_cols, dim_names.copy(), dim_vals.copy(), dim_names_coords.copy(), dim_coords.copy(), ) self._align_indicies() self.reorder_dimensions(self.data.index.names, 'block') data_nodb = self.data.droplevel("Data_Block") block_nodb = self.block.droplevel("Data_Block") count_iloc = range(0, len(block_nodb.index)) for item, itemx in zip(block_nodb.index, count_iloc): idx_data = data_nodb.index.get_loc(item) for col in self.block.columns: if col not in self.data.columns: self.data[col] = np.nan col_id = self.data.columns.get_loc(col) col_id_b = self.block.columns.get_loc(col) self.data.iloc[idx_data, col_id] = self.block.iloc[itemx, col_id_b] if isinstance(self.data.columns[0], dt.datetime): try: self.data.sort_index(1, inplace=True) except: pass # Add modified block number to modified_by in meta_block idx_db = list(self.data.index.names).index( "Data_Block" ) # get the int position of DB if isinstance( self.data.iloc[idx_data], pd.Series ): data_db_no = self.data.iloc[idx_data].name[idx_db] # Series else: data_db_no = self.data.iloc[idx_data].index.values[0][idx_db] # DF block_db_no = self.block.index.get_level_values("Data_Block")[0] mod_by = str(self.meta_block.loc[data_db_no, "modified_by"]) mod_by = mod_by + "|" + str(block_db_no) self.meta_block.loc[data_db_no, "modified_by"] = mod_by
# TODO: REFINE this.
[docs] def update_xl_sheets( self, f_path, wb_sheets=None, table_coords=None, idx_cols=0, dim_names=None, dim_vals=None, dim_names_coords=None, dim_coords=None, ): """ Updates all the tables from worksheets within an Excel file. It will update a section of 'data', which has an identical dimension structure (excluding 'Data_Block' with new data. The new block's meta information will be referenced in the original blocks data to show history. :param f_path: Path to the file :param wb_sheets: Worksheets to read. Input as list i.e. ['Sales', 'Operations'] :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_vals: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. :return: self.data """ dim_names = dim_names if isinstance(dim_names, list) else [dim_names] dim_vals = dim_vals if isinstance(dim_vals, list) else [dim_vals] dim_names_coords = dim_names_coords if isinstance(dim_names_coords, list) else [dim_names_coords] dim_coords = dim_coords if isinstance(dim_coords, list) else [dim_coords] wb = load_workbook(f_path, data_only=True) if wb_sheets == None: wb_sheets = wb.sheetnames if table_coords is None: for wsheet in wb_sheets: self.import_xl(f_path, wsheet) self.add_dimensions(["Work_Sheet"], [wsheet]) self.add_block_to_data() else: for wsheet in wb_sheets: self.update_custom_xl( f_path, wsheet, table_coords.copy(), idx_cols, dim_names.copy(), dim_vals.copy(), dim_names_coords.copy(), dim_coords.copy(), ) self.block = pd.DataFrame()
[docs] def update_xl_directory( self, dir_path, xl_id=None, ws_name=0, table_coords=None, idx_cols=0, dim_names=None, dim_vals=None, dim_names_coords=None, dim_coords=None, ): """ Updates dimensions and table from a worksheet from all the Excel files (with or without identifiers) from a directory. It will update a section of 'data', which has an identical dimension structure (excluding 'Data_Block' with new data. The new block's meta information will be referenced in the original blocks data to show history. :param dir_path: Path to the directory containing the files :param xl_id: String used to filter files to extract data, i.e. if file name is "Budget v3.xlsx" you could say 'v3.xlsx' :param ws_name: Worksheet to read. Input as string i.e. 'Sales' or index. Zero is default. :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_vals: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. :return: self.data """ dim_names = dim_names if isinstance(dim_names, list) else [dim_names] dim_vals = dim_vals if isinstance(dim_vals, list) else [dim_vals] dim_names_coords = dim_names_coords if isinstance(dim_names_coords, list) else [dim_names_coords] dim_coords = dim_coords if isinstance(dim_coords, list) else [dim_coords] file_list = pd.Series(os.listdir(dir_path)) if xl_id != None: file_list = file_list[file_list.str.contains(xl_id, case=False, na=False)] for file in file_list: dir_path.replace("\\", "/") f_path = dir_path + "/" + file wb_sheets = [ws_name] if table_coords is None: for wsheet in wb_sheets: self.import_xl(f_path, wsheet) self.add_dimensions(["Work_Sheet"], [wsheet]) self.add_block_to_data() else: for wsheet in wb_sheets: self.update_custom_xl( f_path, ws_name, table_coords.copy(), idx_cols, dim_names.copy(), dim_vals.copy(), dim_names_coords.copy(), dim_coords.copy(), ) self.block = pd.DataFrame()
[docs] def update_xl_directories( self, dir_path, xl_id=None, ws_name=0, table_coords=None, idx_cols=0, dim_names=None, dim_vals=None, dim_names_coords=None, dim_coords=None, ): """ Imports dimensions and table from a worksheet from all the Excel files (with or without identifiers) from a series of directories. It will update a section of 'data', which has an identical dimension structure (excluding 'Data_Block' with new data. The new block's meta information will be referenced in the original blocks data to show history. :param dir_path: Path to the root directory containing the directories which contain the files. :param xl_id: String used to filter files to extract data, i.e. if file name is "Budget v3.xlsx" you could say 'v3.xlsx' :param ws_name: Worksheet to read. Input as string i.e. 'Sales' or index. Zero is default. :param table_coords: Excel references for top left and bottom right of table. i.e. ['B7', 'Z20']. :param idx_cols: Number of index columns to use. 0 would be the first column and 3 would mean first 3 columns. :param dim_names: New dimensions names to add. i.e. ['Department', 'Geography'] :param dim_vals: Values for new dimensions. i.e. ['Sales', 'North America'] :param dim_names_coords: Excel references to get dimension names from the Excel. i.e. ['A3', 'A4']. :param dim_coords: Excel references to get dimension values from the Excel. i.e. ['B3', 'B4']. :return: self.data """ dim_names = dim_names if isinstance(dim_names, list) else [dim_names] dim_vals = dim_vals if isinstance(dim_vals, list) else [dim_vals] dim_names_coords = dim_names_coords if isinstance(dim_names_coords, list) else [dim_names_coords] dim_coords = dim_coords if isinstance(dim_coords, list) else [dim_coords] dir_list = pd.Series(os.listdir(dir_path)) dir_list = dir_list[~dir_list.str.contains("\.", na=False)] dir_path = dir_path.replace("\\", "/") for dir in dir_list: file_list = pd.Series(os.listdir(dir_path + "/" + dir)) if xl_id != None: file_list = file_list[ file_list.str.contains(xl_id, case=False, na=False) ] for file in file_list: f_path = dir_path + "/" + dir + "/" + file wb_sheets = [ws_name] if table_coords is None: for wsheet in wb_sheets: self.import_xl(f_path, wsheet) self.add_dimensions(["Work_Sheet"], [wsheet]) self.add_block_to_data() else: for wsheet in wb_sheets: self.update_custom_xl( f_path, ws_name, table_coords.copy(), idx_cols, dim_names.copy(), dim_vals.copy(), dim_names_coords.copy(), dim_coords.copy(), ) self.block = pd.DataFrame()
[docs] def add_block_to_data(self): """Takes the block data object, arranges the index and adds it to the data object. Even if the indexes don't match, this will fill in the missing pieces.""" if self.data.index.names != [None]: self._align_indicies() self.reorder_dimensions(self.data.index.names, 'block') self.data = pd.concat([self.data, self.block])
help_add_dimensions = "Add dimensions and values with lists ['???',...]"
[docs] def add_dimensions( self, new_dimensions, dim_values_to_add, col_num=1, data_obj="data" ): """ Append a dimension to the index with a new name, values and where to place it. :param new_dimensions: Name of the new dimension or dimensions. String value or list i.e. 'Department' or ['Department', 'Region'] :param dim_values_to_add: Values of the new dimension or dimensions. String value or list i.e. 'Sales' or ['Sales', 'EMEA'] :param col_num: Where in the index to place the new dimension. 1 indicates 1 column from the left. :param data_obj: Which data object you want o effect. Available - 'block', 'slice', 'consolidation', 'function_result', 'data' :return: data_obj """ new_dimensions = new_dimensions if isinstance(new_dimensions, list) else [new_dimensions] dim_values_to_add = dim_values_to_add if isinstance(dim_values_to_add, list) else [dim_values_to_add] if data_obj == "block": dfi = self.block.index.to_frame() for dim_name, dim_value in zip(new_dimensions, dim_values_to_add): dfi.insert(col_num, dim_name, dim_value) self.block.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "slice": dfi = self.slice.index.to_frame() for dim_name, dim_value in zip(new_dimensions, dim_values_to_add): dfi.insert(col_num, dim_name, dim_value) self.slice.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "consolidation": dfi = self.consolidation.index.to_frame() for dim_name, dim_value in zip(new_dimensions, dim_values_to_add): dfi.insert(col_num, dim_name, dim_value) self.consolidation.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "function_result": dfi = self.function_result.index.to_frame() for dim_name, dim_value in zip(new_dimensions, dim_values_to_add): dfi.insert(col_num, dim_name, dim_value) self.function_result.index = pd.MultiIndex.from_frame(dfi) else: if ( len(new_dimensions) == len(dim_values_to_add) and isinstance(dim_values_to_add[0], list) is False ): dfi = self.data.index.to_frame() for dim_name, dim_value in zip(new_dimensions, dim_values_to_add): dfi.insert(col_num, dim_name, dim_value) self.data.index = pd.MultiIndex.from_frame(dfi) else: dfi = self.data.index.to_frame() index_counts = self.data._count_level(0).iloc[:, 0] index_counts = index_counts.reindex( dfi.index.get_level_values("Data_Block").unique() ) for new_dim_name in new_dimensions: dim_value_index = new_dimensions.index(new_dim_name) dim_value_list = [] counter = 0 dim_value_loop = dim_values_to_add[dim_value_index] for idx_value in index_counts.index: dim_value = ( dim_value_loop[counter] if len(dim_value_loop) > 1 else dim_value_loop[0] ) number_of_iter = list(range(0, index_counts.loc[idx_value])) counter += 1 for item in number_of_iter: dim_value_list.append(dim_value) dim_value_list = pd.Series(dim_value_list) dim_value_list.name = new_dim_name dfi.insert(1, new_dim_name, dim_value_list.values) self.data.index = pd.MultiIndex.from_frame(dfi)
# TODO: FIX add dimension to data for multiple. work on ordering when adding help_merge_dim_from_accts = ( "One-to-One add of a dimension from an accounts based on existing dimension" )
[docs] def merge_dim_from_accts(self, dim_set, base_dim, new_dims, data_obj="data"): """ Add dimension based on account data object. If you have a chart of accounts you could add the account number to the Line Item if you have it. :param dim_set: Section of 'accounts' data object to take as new dimensions. :param base_dim: Existing dimension in the index and accouts on which to merge the new dimension :param new_dims: Column from the accounts data object table to add to the dataframe :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result' :return: data_object """ dim_df = self.accounts.loc[dim_set].copy() dim_df.dropna(1, how="all", inplace=True) new_dims = [new_dims] if isinstance(new_dims, str) else new_dims if data_obj == "block": dfi = self.block.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim) self.block.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "slice": dfi = self.slice.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim) self.slice.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "consolidation": dfi = self.consolidation.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim) self.consolidation.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "variance": dfi = self.variance.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim) self.variance.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "function_result": dfi = self.function_result.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim) self.function_result.index = pd.MultiIndex.from_frame(dfi) else: dfi = self.data.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim) self.data.index = pd.MultiIndex.from_frame(dfi)
help_merge_dim_from_xl = ( "One-to-One add of a dimension from an excel list based on existing dimension" )
[docs] def merge_dim_from_xl(self, fpath, ws_name, base_dim, new_dims, data_obj="data"): """ Add dimension based on an table from an Excel file. If you have a chart of accounts you could add the account number to the Line Item if you have it. :param base_dim: Existing dimension in the index on which to merge the new dimension :param new_dims: Column from the accounts data object table to add to the dataframe :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result' :return: data_object """ dim_df = pd.read_excel(fpath, sheet_name=ws_name, index_col=None) new_dims = [new_dims] if isinstance(new_dims, str) else new_dims if data_obj == "block": dfi = self.block.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim, how="inner") self.block.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "slice": dfi = self.slice.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim, how="inner") self.slice.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "consolidation": dfi = self.consolidation.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim, how="inner") self.consolidation.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "variance": dfi = self.variance.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim, how="inner") self.variance.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "function_result": dfi = self.function_result.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim, how="inner") self.function_result.index = pd.MultiIndex.from_frame(dfi) else: dfi = self.data.index.to_frame() dfi.reset_index(drop=True, inplace=True) new_dims.append(base_dim) dim_df = dim_df.loc[:, new_dims] dim_df.set_index(base_dim, inplace=True) dfi = dfi.join(dim_df, base_dim, how="inner") self.data.index = pd.MultiIndex.from_frame(dfi)
[docs] def reorder_dimensions(self, new_order, data_obj="data"): """ Change the order of the dimensions of the index. :param new_order: List input i.e. ['Department', 'Region', 'Data_Block', 'Line Item'] :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: data_obj """ if data_obj == "block": self.block = self.block.reorder_levels(new_order) elif data_obj == "data": self.data = self.data.reorder_levels(new_order) elif data_obj == "function_result": self.function_result= self.function_result.reorder_levels(new_order) elif data_obj == "variance": self.variance = self.variance.reorder_levels(new_order) elif data_obj == "consolidation": self.consolidation = self.consolidation.reorder_levels(new_order) else: self.slice = self.slice.reorder_levels(new_order)
[docs] def combine_dimensions(self, combine_order, data_obj="data"): if data_obj == "block": dfi = self.block.index.to_frame() dfi.reset_index(drop=True, inplace=True) s0 = dfi[combine_order[0]] for item in combine_order[1:]: s0 = s0.combine_first(dfi.loc[:, item]) dfi[combine_order[0]] = s0 self.block.index = pd.MultiIndex.from_frame(dfi) elif data_obj == "data": dfi = self.data.index.to_frame() dfi.reset_index(drop=True, inplace=True) s0 = dfi[combine_order[0]] for item in combine_order[1:]: s0 = s0.combine_first(dfi.loc[:, item]) dfi[combine_order[0]] = s0 self.data.index = pd.MultiIndex.from_frame(dfi)
[docs] def rename_dimensions(self, dim_list, data_obj="data"): """ Give new names to one or all of the dimension names. I.e. if dimensions are ['Department', 'Region', 'Data_Block'] it can be changed to ['Department', 'Geography', 'Data_Block'] :param dim_list: List of all the dimension names with the new names included :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: data object """ if data_obj == "data": self.data.index.names = dim_list elif data_obj == "block": self.block.index.names = dim_list elif data_obj == "slice": self.slice.index.names = dim_list elif data_obj == "variance": self.variance.index.names = dim_list elif data_obj == "consolidation": self.consolidation.index.names = dim_list else: self.function_result.index.names = dim_list
[docs] def rename_dim_item(self, dim, old, new, data_obj="data"): """ Give a new name to an existing name in a dimension. I.e. if 'USA" is to be changed to 'North America' :param dim: String value of which dimension contains the item you want to change. I.e. 'Region' :param old: String value of the item to be changed. I.e. 'USA' :param new: String value of the new name of the item. I.e. 'North America' :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: data object """ if data_obj == "data": idx = self.data.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = idx[dim].str.replace(old, new) self.data.index = pd.MultiIndex.from_frame(idx) elif data_obj == "block": idx = self.block.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = idx[dim].str.replace(old, new) self.block.index = pd.MultiIndex.from_frame(idx) elif data_obj == "accounts": idx = self.accounts.index.to_frame() idx.fillna("nval", inplace=True) idx['dim_set'] = idx['dim_set'].str.replace(old, new) self.accounts.index = pd.MultiIndex.from_frame(idx) elif data_obj == "slice": idx = self.slice.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = idx[dim].str.replace(old, new) self.slice.index = pd.MultiIndex.from_frame(idx) elif data_obj == "function_result": idx = self.function_result.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = idx[dim].str.replace(old, new) self.function_result.index = pd.MultiIndex.from_frame(idx) elif data_obj == "consolidation": idx = self.consolidation.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = idx[dim].str.replace(old, new) self.consolidation.index = pd.MultiIndex.from_frame(idx) else: idx = self.variance.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = idx[dim].str.replace(old, new) self.variance.index = pd.MultiIndex.from_frame(idx)
[docs] def dim_to_date(self, dim, data_obj="data"): """ Attempts to change a dimension or column from object to datetime. :param dim: String value of which dimension contains the item you want to change. I.e. 'Region' :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' """ if data_obj == "data": idx = self.data.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = pd.to_datetime(idx[dim]) self.data.index = pd.MultiIndex.from_frame(idx) elif data_obj == "block": idx = self.block.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = pd.to_datetime(idx[dim]) self.block.index = pd.MultiIndex.from_frame(idx) elif data_obj == "accounts": idx = self.accounts.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = pd.to_datetime(idx[dim]) self.accounts.index = pd.MultiIndex.from_frame(idx) elif data_obj == "slice": idx = self.slice.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = pd.to_datetime(idx[dim]) self.slice.index = pd.MultiIndex.from_frame(idx) elif data_obj == "function_result": idx = self.function_result.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = pd.to_datetime(idx[dim]) self.function_result.index = pd.MultiIndex.from_frame(idx) elif data_obj == "consolidation": idx = self.consolidation.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = pd.to_datetime(idx[dim]) self.consolidation.index = pd.MultiIndex.from_frame(idx) else: idx = self.variance.index.to_frame() idx.fillna("nval", inplace=True) idx[dim] = pd.to_datetime(idx[dim]) self.variance.index = pd.MultiIndex.from_frame(idx)
[docs] def reorder_index_dim( self, new_order, dim=None, axis_target="index", data_obj="data" ): """ Change the order of the item within an index. I.e. if existing the Line Item dimension has an existing order of ['Office Supplies', 'Rent', 'Payroll'], change it to ['Payroll', 'Rent', 'Office Supplies']. :param new_order: List values of the new order. Should contain all the values. I.e. ['Payroll', 'Rent', 'Office Supplies'] :param dim: Dimension to be reordered :param axis_target: Index or the columns. I.e. 'index' or 'columns' :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: """ if data_obj == "data": index_items = list(set(self.data.index.get_level_values(dim))) for item_no in new_order: index_items.remove(item_no) new_order.reverse() for item_no in new_order: index_items.insert(0, item_no) self.data = self.data.reindex( index_items, axis=axis_target, level=dim, fill_value="nval" ) elif data_obj == "block": index_items = list(set(self.block.index.get_level_values(dim))) for item_no in new_order: index_items.remove(item_no) new_order.reverse() for item_no in new_order: index_items.insert(0, item_no) self.block = self.block.reindex( index_items, axis=axis_target, level=dim, fill_value="nval" ) elif data_obj == "slice": index_items = list(set(self.slice.index.get_level_values(dim))) for item_no in new_order: index_items.remove(item_no) new_order.reverse() for item_no in new_order: index_items.insert(0, item_no) self.slice = self.slice.reindex( index_items, axis=axis_target, level=dim, fill_value="nval" ) elif data_obj == "function_result": index_items = list(set(self.function_result.index.get_level_values(dim))) for item_no in new_order: index_items.remove(item_no) new_order.reverse() for item_no in new_order: index_items.insert(0, item_no) self.function_result= self.function_result.reindex( index_items, axis=axis_target, level=dim, fill_value="nval" ) elif data_obj == "consolidation": index_items = list(set(self.consolidation.index.get_level_values(dim))) for item_no in new_order: index_items.remove(item_no) new_order.reverse() for item_no in new_order: index_items.insert(0, item_no) self.consolidation = self.consolidation.reindex( index_items, axis=axis_target, level=dim, fill_value="nval" ) else: index_items = list(set(self.variance.index.get_level_values(dim))) for item_no in new_order: index_items.remove(item_no) new_order.reverse() for item_no in new_order: index_items.insert(0, item_no) self.variance = self.variance.reindex( index_items, axis=axis_target, level=dim, fill_value="nval" )
[docs] def move_dims_to_col(self, dims, data_obj="data"): """ Move a dimension from the index to a column in the data. :param dims: Dimension to drop. I.e. 'Department' :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: """ if data_obj == "data": self.data.reset_index(dims, inplace=True) elif data_obj == "block": self.block.reset_index(dims, inplace=True) elif data_obj == "slice": self.slice.reset_index(dims, inplace=True) elif data_obj == "variance": self.variance.reset_index(dims, inplace=True) elif data_obj == "consolidation": self.consolidation.reset_index( dims, inplace=True ) else: self.function_result.reset_index(dims, inplace=True)
[docs] def move_col_to_dims(self, dims, data_obj="data"): """ Move a dimension from the columns in the data to the index. :param dims: Dimension to drop. I.e. 'Department' :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: """ if data_obj == "data": self.data.set_index(dims, append=True, inplace=True) elif data_obj == "block": self.block.set_index(dims, append=True, inplace=True) elif data_obj == "slice": self.slice.set_index(dims, append=True, inplace=True) elif data_obj == "variance": self.variance.set_index(dims, append=True, inplace=True) elif data_obj == "consolidation": self.consolidation.set_index(dims, append=True, inplace=True) else: self.function_result.set_index(dims, append=True, inplace=True)
[docs] def make_records(self, data_obj="data"): """ Change the data object to a records format rather than a table. Works bes on single level columns. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' """ if data_obj == "data": self.function_result = self.data.copy().stack() elif data_obj == "slice": self.function_result = self.slice.copy().stack() elif data_obj == "block": self.function_result = self.block.copy().stack() elif data_obj == "variance": self.function_result = self.variance.copy().stack() elif data_obj == "consolidation": self.function_result = self.consolidation.copy().stack() elif data_obj == "function_result": self.block= self.function_result.copy().stack() else: print("no object")
[docs] def drop_dimension(self, dimension_drop, data_obj="data"): """ Remove a dimension from the index. :param dimension_drop: Dimension to drop. I.e. 'Department' :param data_obj: Which data object you want o effect. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: """ if data_obj == "data": self.data.index = self.data.index.droplevel(dimension_drop) elif data_obj == "block": self.block.index = self.block.index.droplevel(dimension_drop) elif data_obj == "slice": self.slice.index = self.slice.index.droplevel(dimension_drop) elif data_obj == "variance": self.variance.index = self.variance.index.droplevel(dimension_drop) elif data_obj == "consolidation": self.consolidation.index = self.consolidation.index.droplevel( dimension_drop ) else: self.function_result.index = self.function_result.index.droplevel(dimension_drop)
[docs] def comma_format(self, decimals=0): """ Changes the display options to show commas and decimal places. Choices are 0, 1, 2. """ if decimals == 2: pd.options.display.float_format = '{:,.2f}'.format elif decimals == 1: pd.options.display.float_format = '{:,.1f}'.format else: pd.options.display.float_format = '{:,.0f}'.format
[docs] def get_block_info(self, db_id): """ Retrieve the meta information from a data block. Just plug in the data block number. :param db_id: Interger of the data block. I.e. x.get_block_info(49593949) :return: Meta information for Data_Block. """ meta_info = self.meta_block.loc[db_id] return meta_info
[docs] def consol_dimension(self, dims="Data_Block", data_obj="data"): """ Consolidates a data object on a certain dimension of the data object. :param dim: Dimension to consolidate on. I.e. 'Department' or 'SalesPerson'. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: self.consolidation """ last_dim = self.data.index.names[-1] if data_obj == "data": self.consolidation = self.data.groupby(level=dims, sort=False).sum() elif data_obj == "slice": self.consolidation = self.slice.groupby(level=dims, sort=False).sum() elif data_obj == "function_result": self.consolidation = self.function_result.groupby(level=dims, sort=False).sum() elif data_obj == "block": self.consolidation = self.block.groupby(level=dims, sort=False).sum() elif data_obj == "variance": self.consolidation = self.variance.groupby(level=dims, sort=False).sum() else: self.consolidation = self.consolidation.groupby(level=dims, sort=False).sum() return self.consolidation
[docs] def slice_data( self, dims=None, dim_values=None, col_range=None, col_list=None, data_obj="data", append_to=False, ): """ Slice and dice data based on dimensions for the index and ranges or lists for the columns. :param dims: List object (even for one item) of dimensions. I.e. ['Department', 'Line Item'] :param dim_values: List and nested list object (even for one item) of items on which to slice. I.e. ['Operations', ['Network Costs', 'Payroll']] :param col_range: List object of start and end point of range. I.e. ['2022-06-30', '2022-12-31'] for datetime :param col_list: List object columns. I.e. ['2022-06-30', '2022-09-30', '2022-12-31'] for datetime :param data_obj: Which data object you want o effect. Available - 'data', 'slice' :param append_to: Add data to existing slice data object. :return: self.slice """ dims = dims if isinstance(dims, list) else [dims] dim_values = dim_values if isinstance(dim_values, list) else [dim_values] if data_obj == "data": col_search_dims = [] col_search_values = [] col_search_dims = [ dim for dim in dims if dim not in list(self.data.index.names) and dim in list(self.data.columns) ] dims_idx = [dims.index(col) for col in col_search_dims] col_search_values = [dim_values[idx] for idx in dims_idx] dims = [dim for dim in dims if dim not in col_search_dims] dim_values = [dim for dim in dim_values if dim not in col_search_values] if append_to == False: self.slice = pd.DataFrame() if dims == None or dims == [None] or dims == []: self.slice = self.data.copy() else: slice_matrix = pd.DataFrame(columns=dims) if dim_values == None or dim_values == [None]: dim_values = [] if isinstance(self.data.index, pd.MultiIndex): for item in self.data.index.levels: dim_values.append(list(item)) else: dim_values = self.data.index.values count_row = 0 for item, dim_name in zip(dim_values, dims): sub_count_row = 0 if isinstance(item, list): for subitem in item: slice_matrix.loc[sub_count_row, dim_name] = subitem sub_count_row += 1 else: slice_matrix.loc[count_row, dim_name] = item iter_matrix = pd.DataFrame() df_count = pd.DataFrame( index=["dim_count"], columns=slice_matrix.columns ) for col in slice_matrix.columns: dm = len(slice_matrix[col].dropna()) df_count.loc["dim_count", col] = dm for target_col in df_count.columns: for target_row in slice_matrix[target_col].dropna().index: target_column = slice_matrix[target_col].dropna() target_dim_item = target_column.loc[target_row] df_rest_list = list(slice_matrix.columns) df_rest_list.remove(target_col) df_rest = slice_matrix.loc[:, df_rest_list] x = 1 for item in df_rest.columns: x = len(slice_matrix[item].dropna()) * x df_item = pd.DataFrame( index=range(0, x), columns=slice_matrix.columns ) df_item[target_col] = target_dim_item for rest_col in df_rest.columns: rest_column = df_rest[rest_col].dropna() rest_item_count = df_count[rest_col][0] rest_multiplier = ( int(len(df_item.index) / rest_item_count) - 1 ) rest_column_insert = rest_column.copy() for mlper in range(0, rest_multiplier): rest_column_insert = rest_column_insert.append( rest_column ) rest_column_insert.index = df_item.index df_item[rest_col] = rest_column_insert iter_matrix = pd.concat( [iter_matrix, df_item], ignore_index=True ) iter_matrix = ( iter_matrix.drop_duplicates() .dropna(how="any") .reset_index(drop=True) ) for row in iter_matrix.index: slice_terms = tuple(iter_matrix.loc[row].values) if isinstance(self.data.index, pd.MultiIndex): single_slice = self.data.xs( slice_terms, level=dims, drop_level=False ) else: for dim in dims: for item_s in dim_values: single_slice = self.data.loc[item_s] self.slice = pd.concat([self.slice, single_slice]) if not isinstance(self.data.index, pd.MultiIndex): self.slice.drop_duplicates(inplace=True) self.slice.sort_index(inplace=True) # Column Search if col_search_dims != [] and col_search_values != []: col_slice = pd.DataFrame() col_slice_df = self.slice.copy() for dim in col_search_dims: for val in col_search_values: if isinstance(val, list): for sub_val in val: single_slice = col_slice_df[ col_slice_df[dim] == sub_val ] col_slice = pd.concat([col_slice, single_slice]) else: single_slice = col_slice_df[col_slice_df[dim] == val] col_slice = pd.concat([col_slice, single_slice]) col_slice_df = col_slice self.slice = col_slice # Column axis slicing if col_range != None: self.slice = self.slice.loc[:, col_range[0] : col_range[1]] if col_list != None: self.slice = self.slice.loc[:, col_list] return self.slice if data_obj == "consolidation": col_search_dims = [] col_search_values = [] col_search_dims = [ dim for dim in dims if dim not in list(self.consolidation.index.names) and dim in list(self.consolidation.columns) ] dims_idx = [dims.index(col) for col in col_search_dims] col_search_values = [dim_values[idx] for idx in dims_idx] dims = [dim for dim in dims if dim not in col_search_dims] dim_values = [dim for dim in dim_values if dim not in col_search_values] if append_to == False: self.slice = pd.DataFrame() if dims == None or dims == [None] or dims == []: self.slice = self.consolidation.copy() else: slice_matrix = pd.DataFrame(columns=dims) if dim_values == None or dim_values == [None]: dim_values = [] if isinstance(self.consolidation.index, pd.MultiIndex): for item in self.consolidation.index.levels: dim_values.append(list(item)) else: dim_values = self.consolidation.index.values count_row = 0 for item, dim_name in zip(dim_values, dims): sub_count_row = 0 if isinstance(item, list): for subitem in item: slice_matrix.loc[sub_count_row, dim_name] = subitem sub_count_row += 1 else: slice_matrix.loc[count_row, dim_name] = item iter_matrix = pd.DataFrame() df_count = pd.DataFrame( index=["dim_count"], columns=slice_matrix.columns ) for col in slice_matrix.columns: dm = len(slice_matrix[col].dropna()) df_count.loc["dim_count", col] = dm for target_col in df_count.columns: for target_row in slice_matrix[target_col].dropna().index: target_column = slice_matrix[target_col].dropna() target_dim_item = target_column.loc[target_row] df_rest_list = list(slice_matrix.columns) df_rest_list.remove(target_col) df_rest = slice_matrix.loc[:, df_rest_list] x = 1 for item in df_rest.columns: x = len(slice_matrix[item].dropna()) * x df_item = pd.DataFrame( index=range(0, x), columns=slice_matrix.columns ) df_item[target_col] = target_dim_item for rest_col in df_rest.columns: rest_column = df_rest[rest_col].dropna() rest_item_count = df_count[rest_col][0] rest_multiplier = ( int(len(df_item.index) / rest_item_count) - 1 ) rest_column_insert = rest_column.copy() for mlper in range(0, rest_multiplier): rest_column_insert = rest_column_insert.append( rest_column ) rest_column_insert.index = df_item.index df_item[rest_col] = rest_column_insert iter_matrix = pd.concat( [iter_matrix, df_item], ignore_index=True ) iter_matrix = ( iter_matrix.drop_duplicates() .dropna(how="any") .reset_index(drop=True) ) for row in iter_matrix.index: slice_terms = tuple(iter_matrix.loc[row].values) if isinstance(self.consolidation.index, pd.MultiIndex): single_slice = self.consolidation.xs( slice_terms, level=dims, drop_level=False ) else: for dim in dims: for item_s in dim_values: single_slice = self.consolidation.loc[item_s] self.slice = pd.concat([self.slice, single_slice]) if not isinstance(self.consolidation.index, pd.MultiIndex): self.slice.drop_duplicates(inplace=True) self.slice.sort_index(inplace=True) # Column Search if col_search_dims != [] and col_search_values != []: col_slice = pd.DataFrame() col_slice_df = self.slice.copy() for dim in col_search_dims: for val in col_search_values: if isinstance(val, list): for sub_val in val: single_slice = col_slice_df[ col_slice_df[dim] == sub_val ] col_slice = pd.concat([col_slice, single_slice]) else: single_slice = col_slice_df[col_slice_df[dim] == val] col_slice = pd.concat([col_slice, single_slice]) col_slice_df = col_slice self.slice = col_slice # Column axis slicing if col_range != None: self.slice = self.slice.loc[:, col_range[0] : col_range[1]] if col_list != None: self.slice = self.slice.loc[:, col_list] return self.slice if data_obj == "slice": df_slice = self.slice.copy() self.slice = pd.DataFrame() col_search_dims = [] col_search_values = [] col_search_dims = [ dim for dim in dims if dim not in list(df_slice.index.names) and dim in list(df_slice.columns) ] dims_idx = [dims.index(col) for col in col_search_dims] col_search_values = [dim_values[idx] for idx in dims_idx] dims = [dim for dim in dims if dim not in col_search_dims] dim_values = [dim for dim in dim_values if dim not in col_search_values] if append_to == False: self.slice = pd.DataFrame() if dims == None or dims == [None] or dims == []: self.slice = df_slice.copy() else: slice_matrix = pd.DataFrame(columns=dims) if dim_values == None or dim_values == [None]: dim_values = [] if isinstance(df_slice.index, pd.MultiIndex): for item in df_slice.index.levels: dim_values.append(list(item)) else: dim_values = df_slice.index.values count_row = 0 for item, dim_name in zip(dim_values, dims): sub_count_row = 0 if isinstance(item, list): for subitem in item: slice_matrix.loc[sub_count_row, dim_name] = subitem sub_count_row += 1 else: slice_matrix.loc[count_row, dim_name] = item iter_matrix = pd.DataFrame() df_count = pd.DataFrame( index=["dim_count"], columns=slice_matrix.columns ) for col in slice_matrix.columns: dm = len(slice_matrix[col].dropna()) df_count.loc["dim_count", col] = dm for target_col in df_count.columns: for target_row in slice_matrix[target_col].dropna().index: target_column = slice_matrix[target_col].dropna() target_dim_item = target_column.loc[target_row] df_rest_list = list(slice_matrix.columns) df_rest_list.remove(target_col) df_rest = slice_matrix.loc[:, df_rest_list] x = 1 for item in df_rest.columns: x = len(slice_matrix[item].dropna()) * x df_item = pd.DataFrame( index=range(0, x), columns=slice_matrix.columns ) df_item[target_col] = target_dim_item for rest_col in df_rest.columns: rest_column = df_rest[rest_col].dropna() rest_item_count = df_count[rest_col][0] rest_multiplier = ( int(len(df_item.index) / rest_item_count) - 1 ) rest_column_insert = rest_column.copy() for mlper in range(0, rest_multiplier): rest_column_insert = rest_column_insert.append( rest_column ) rest_column_insert.index = df_item.index df_item[rest_col] = rest_column_insert iter_matrix = pd.concat( [iter_matrix, df_item], ignore_index=True ) iter_matrix = ( iter_matrix.drop_duplicates() .dropna(how="any") .reset_index(drop=True) ) for row in iter_matrix.index: slice_terms = tuple(iter_matrix.loc[row].values) if isinstance(df_slice.index, pd.MultiIndex): single_slice = df_slice.xs( slice_terms, level=dims, drop_level=False ) else: for dim in dims: for item_s in dim_values: single_slice = self.data.loc[item_s] self.slice = pd.concat([self.slice, single_slice]) if not isinstance(self.slice.index, pd.MultiIndex): self.slice.drop_duplicates(inplace=True) self.slice.sort_index(inplace=True) # Column Search if col_search_dims != [] and col_search_values != []: col_slice = pd.DataFrame() col_slice_df = self.slice.copy() for dim in col_search_dims: for val in col_search_values: if isinstance(val, list): for sub_val in val: single_slice = col_slice_df[ col_slice_df[dim] == sub_val ] col_slice = pd.concat([col_slice, single_slice]) else: single_slice = col_slice_df[col_slice_df[dim] == val] col_slice = pd.concat([col_slice, single_slice]) col_slice_df = col_slice self.slice = col_slice # Column axis slicing if col_range != None: self.slice = self.slice.loc[:, col_range[0] : col_range[1]] if col_list != None: self.slice = self.slice.loc[:, col_list] return self.slice
[docs] def column_slice( self, dims=None, dim_values=None, col_range=None, col_list=None, data_obj="data", append_to=False, ): dims = dims if isinstance(dims, list) else [dims] dim_values = dim_values if isinstance(dim_values, list) else [dim_values] if data_obj == "data": if append_to == False: self.slice = pd.DataFrame() if dims == None or dims == [None]: self.slice = self.data.copy()
[docs] def time_slice( self, dim=None, start_dt=None, end_dt=None, idx_period=None, col_period=None, data_obj="data", ): if data_obj == "data": original_order = list(self.data.index.names) if dim != None: slice_order = original_order.copy() slice_order.remove(dim) slice_order.insert(0, dim) self.reorder_dimensions(slice_order, "data") self.slice = self.data.xs(slice(start_dt, end_dt), 0, dim, drop_level=False) self.reorder_dimensions(original_order, "data") else: self.slice = self.data.copy() if idx_period != None: self.slice = self.slice.resample(idx_period, 0, level=dim).sum() if col_period != None: self.slice = self.slice.resample(col_period, 1).sum() if isinstance(self.slice.index, pd.MultiIndex): self.reorder_dimensions(original_order, "slice") return self.slice elif data_obj == "slice": original_order = list(self.slice.index.names) if dim != None: slice_order = original_order.copy() slice_order.remove(dim) slice_order.insert(0, dim) self.reorder_dimensions(slice_order, "slice") self.slice = self.slice.xs( slice(start_dt, end_dt), 0, dim, drop_level=False ) if idx_period != None: self.slice = self.slice.resample(idx_period, 0, level=dim).sum() if col_period != None: self.slice = self.slice.resample(col_period, 1).sum() if isinstance(self.slice.index, pd.MultiIndex): self.reorder_dimensions(original_order, "slice") return self.slice
[docs] def keyword_slice(self, keywords, dims=None, data_obj="data", append_to=False): """ Slice data based on string fragment. Function will search the index & data and return any rows with the search string. :param keywords: String object of which to search. I.e. 'Rent', 'LLC', 'Smith' :param dims: String or list object of specific dimensions to search. Blank will search all dimensions & data. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'data', 'slice' :param append_to: True indicates new search will be appended to existing search results. False means it won't. :return: self.slice """ if data_obj == "data": keywords = [keywords] if isinstance(keywords, str) else keywords dims = [dims] if isinstance(dims, str) else dims word_slice = self.data.copy() original_index_order = word_slice.index.names if not dims: dims = original_index_order.copy() for dim_x in dims: if dim_x in self.data.index.names: word_slice.reset_index(dim_x, inplace=True) if not isinstance(word_slice.index, pd.MultiIndex): word_slice.index.name = 'TEMP_INDEX' keyword_matrix = pd.DataFrame() for col in dims: for word in keywords: keyword_result = pd.DataFrame() try: keyword_result = word_slice[ (word_slice[col].str.contains(word, case=False, na=False)) ] word_slice.drop(keyword_result.index, inplace=True) except: pass keyword_matrix = pd.concat([keyword_matrix, keyword_result]) elif data_obj == "slice": keywords = [keywords] if isinstance(keywords, str) else keywords dims = [dims] if isinstance(dims, str) else dims word_slice = self.slice.copy() original_index_order = word_slice.index.names if not dims: dims = original_index_order.copy() for dim_x in dims: if dim_x in self.data.index.names: word_slice.reset_index(dim_x, inplace=True) if not isinstance(word_slice.index, pd.MultiIndex): word_slice.index.name = 'TEMP_INDEX' keyword_matrix = pd.DataFrame() for col in dims: for word in keywords: keyword_result = pd.DataFrame() try: keyword_result = word_slice[ (word_slice[col].str.contains(word, case=False, na=False)) ] word_slice.drop(keyword_result.index, inplace=True) except: pass keyword_matrix = pd.concat([keyword_matrix, keyword_result]) if keyword_matrix.index.name == 'TEMP_INDEX': dims_return = [x for x in original_index_order if x in dims] if dims_return: keyword_matrix.set_index(dims_return, inplace=True) else: dims_return = [x for x in original_index_order if x in dims] if dims_return: keyword_matrix.set_index(dims_return, append=True, inplace=True) if append_to == True: self.slice = pd.concat([self.slice, keyword_matrix]) else: self.slice = keyword_matrix if isinstance(self.slice.index, pd.MultiIndex): self.reorder_dimensions(original_index_order, "slice") return self.slice
[docs] def keyword_replace(self, target_words, replace_words, dims=None, data_obj="data"): """ Replace a word or words in the index. For example, if you wanted to replace ['Payroll', 'East'] with ['Salaries & Wages', 'Northeast']. :param target_words: String or list of word(s) to be replace. I.e. 'Software' or ['Payroll', 'East']. :param replace_words: String or list of word(s) to insert. I.e. '3rd Party Code' or ['Salaries & Wages', 'Northeast']. :param dims: String or list of specific dimensions to effect. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'data', 'slice' :return: data_obj """ if data_obj == "data": target_words = ( [target_words] if isinstance(target_words, str) else target_words ) replace_words = ( [replace_words] if isinstance(replace_words, str) else replace_words ) original_index_order = self.data.index.names if dims == None: dims = original_index_order.copy() dims.remove("Data_Block") self.data.reset_index(dims, inplace=True) for word, rword in zip(target_words, replace_words): for col in self.data.columns: try: self.data[col] = self.data[col].str.replace(word, rword) except: pass self.data.set_index(dims, append=True, inplace=True) self.reorder_dimensions(original_index_order, "data") return self.data elif data_obj == "slice": target_words = ( [target_words] if isinstance(target_words, str) else target_words ) replace_words = ( [replace_words] if isinstance(replace_words, str) else replace_words ) original_index_order = self.slice.index.names if dims == None: dims = original_index_order.copy() dims.remove("Data_Block") self.slice.reset_index(dims, inplace=True) for word, rword in zip(target_words, replace_words): for col in self.slice.columns: try: self.slice[col] = self.slice[col].str.replace(word, rword) except: pass self.slice.set_index(dims, append=True, inplace=True) self.reorder_dimensions(original_index_order, "slice") return self.slice
[docs] def make_records_for_pivot(self, data_obj="data"): """ Change the data object to a records format so it can be pasted into Excel in a pivot table friendly format. Only works if the column index is a single level. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'block', 'data', 'slice', 'consolidation', 'function_result', 'variance' :return: """ if data_obj == "data": df_records = self.data.stack() df_records.to_clipboard() return df_records elif data_obj == "slice": df_records = self.slice.stack() df_records.to_clipboard() return df_records elif data_obj == "block": df_records = self.block.stack() df_records.to_clipboard() return df_records elif data_obj == "variance": df_records = self.variance.stack() df_records.to_clipboard() return df_records elif data_obj == "consolidation": df_records = self.consolidation.stack() df_records.to_clipboard() return df_records elif data_obj == "function_result": df_records = self.consolidation.stack() df_records.to_clipboard() return df_records else: print("no object")
[docs] def save_project(self, prj_name=None, path_name=None): """ Saves a project as either a directory with pickle files or as a json object. :param prj_name: String object for name of the project. Will make a directory in path_name if it doesn't exist. :param path_name: Path to directory or a full file path for json. I.e. for json file 'C:/Budgets/Budgets v1.json' or 'C:/Budgets/' for normal save :return: nothing """ if path_name == None: path_name = os.getcwd() if path_name[-4:] == "json": container = pd.DataFrame( index=["data", "data_idx", "meta", "accounts", "accts_idx"], columns=["Load"], ) container.loc["data", "Load"] = self.data.to_json(orient="index") container.loc["data_idx", "Load"] = self.data.index.to_frame().to_json( orient="index" ) container.loc["meta", "Load"] = self.meta_block.to_json(orient="index") container.loc["accounts", "Load"] = self.accounts.to_json(orient="index") container.loc["accts_idx", "Load"] = self.accounts.index.to_frame().to_json( orient="index" ) container.to_json(path_name, orient="index") else: if path_name != None: path_name = path_name.replace('\\', '/') if path_name[-1] != '/': path_name = path_name + '/' if path_name == None: path_name = os.getcwd() path_name = path_name.replace('\\', '/') if not os.path.isdir(prj_name): os.mkdir(prj_name) else: if not os.path.isdir(path_name + prj_name): os.mkdir(path_name + prj_name) self.data.to_pickle(path_name + prj_name + "/" + prj_name + " - data.pkl") self.meta_block.to_pickle( path_name + prj_name + "/" + prj_name + " - meta_block.pkl" ) self.accounts.to_pickle( path_name + prj_name + "/" + prj_name + " - accounts.pkl" )
# TODO: Add and test compression. SAVE & LOAD
[docs] def load_project(self, path_name): """ Imports the project by filling the 'data', 'meta_block' and 'accounts' data objects. :param path_name: Path to directory for normal save or to the json file. I.e. for json file 'C:/Budgets/Budgets v1.json' or 'C:/Budgets/' for normal load. :return: project """ if path_name[-4:] == "json": load_container = pd.read_json(path_name, orient="index") self.data = pd.read_json(load_container.loc["Load", "data"], orient="index") data_idx = pd.read_json( load_container.loc["Load", "data_idx"], orient="index" ) self.data.index = pd.MultiIndex.from_frame(data_idx) self.meta_block = pd.read_json( load_container.loc["Load", "meta"], orient="index", convert_axes=False ) self.meta_block.index = self.meta_block.index.astype("int") self.accounts = pd.read_json( load_container.loc["Load", "accounts"], orient="index" ) accounts_idx = pd.read_json( load_container.loc["Load", "accts_idx"], orient="index" ) self.data.index = pd.MultiIndex.from_frame(data_idx) else: if path_name != None: path_name = path_name.replace('\\', '/') if (os.getcwd()[-1] != "\\") or (os.getcwd()[-1] != "/"): path_name = path_name + "/" file_list = pd.Series(os.listdir(path_name)) f_name = file_list[file_list.str.contains("- data.pkl")].iloc[0] data_df = pd.read_pickle(path_name + f_name) self.data = data_df.copy() f_name = file_list[file_list.str.contains("- meta_block.pkl")].iloc[0] meta_df = pd.read_pickle(path_name + f_name) self.meta_block = meta_df.copy() f_name = file_list[file_list.str.contains("- accounts.pkl")].iloc[0] accounts_df = pd.read_pickle(path_name + f_name) self.accounts = accounts_df
[docs] def slice_to_project(self, prj_name=None, path_name=None): """ Saves the slice data object as a project as either a directory with pickle files or as a json object. :param prj_name: String object for name of the project. Will make a directory in path_name if it doesn't exist. :param path_name: Path to directory or a full file path for json. I.e. for json file 'C:/Budgets/Budgets v1.json' or 'C:/Budgets/' for normal save :return: nothing """ if path_name == None: path_name = os.getcwd() if path_name[-4:] == "json": container = pd.DataFrame( index=["data", "data_idx", "meta", "accounts", "accts_idx"], columns=["Load"], ) container.loc["data", "Load"] = self.slice.to_json(orient="index") container.loc["data_idx", "Load"] = self.data.index.to_frame().to_json( orient="index" ) container.loc["meta", "Load"] = self.meta_block.to_json(orient="index") container.loc["accounts", "Load"] = self.accounts.to_json(orient="index") container.loc["accts_idx", "Load"] = self.accounts.index.to_frame().to_json( orient="index" ) container.to_json(path_name, orient="index") else: if path_name != None: path_name = path_name.replace('\\', '/') if path_name[-1] != '/': path_name = path_name + '/' if path_name == None: path_name = os.getcwd() path_name = path_name.replace('\\', '/') if not os.path.isdir(prj_name): os.mkdir(prj_name) else: if not os.path.isdir(path_name + prj_name): os.mkdir(path_name + prj_name) self.slice.to_pickle(path_name + prj_name + "/" + prj_name + " - data.pkl") self.meta_block.to_pickle( path_name + prj_name + "/" + prj_name + " - meta_block.pkl" ) self.accounts.to_pickle( path_name + prj_name + "/" + prj_name + " - accounts.pkl" )
[docs] def make_pivot_table( self, value_col, index_names, col_names=None, data_obj="data", function="sum", totals=True, total_names="Total", ): if data_obj == "data": df = self.data.copy() df.reset_index(inplace=True) self.pivot = pd.pivot_table( df, value_col, index_names, col_names, aggfunc=function, margins=totals, margins_name=total_names, ) return self.pivot elif data_obj == "slice": df = self.slice.copy() df.reset_index(inplace=True) self.pivot = pd.pivot_table( df, value_col, index_names, col_names, aggfunc=function, margins=True, margins_name=total_names, ) return self.pivot elif data_obj == "consolidation": df = self.consolidation.copy() df.reset_index(inplace=True) self.pivot = pd.pivot_table( df, value_col, index_names, col_names, aggfunc=function, margins=True, margins_name=total_names, ) return self.pivot elif data_obj == "function_result": df = self.function_result.copy() df.reset_index(inplace=True) self.pivot = pd.pivot_table( df, value_col, index_names, col_names, aggfunc=function, margins=True, margins_name=total_names, ) return self.pivot
[docs] def variance_analysis(self, dim_name, dim1, dim2, data_obj="data"): """ Returns and difference and percent difference analysis based on two items with in a dimension. I.e. x.variance_analysis('Type', 'Actual', 'Budget') will produce the 'variance' data object with amount and percent differences. :param dim_name: String object of the name of dimension. I.e. 'Type' or 'Region'. :param dim1: String object of the name of dimension item for first part of calculation. I.e. 'Actual' or 'North'. :param dim2: String object of the name of dimension item for comparison. I.e. 'Budget' or 'South'. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'data', 'slice' :return: self.variance """ if data_obj == "data": df1 = self.data.xs(dim1, level=dim_name) df1 = df1.droplevel("Data_Block") df2 = self.data.xs(dim2, level=dim_name) df2 = df2.droplevel("Data_Block") if data_obj == "slice": df1 = self.slice.xs(dim1, level=dim_name) df1 = df1.droplevel("Data_Block") df2 = self.slice.xs(dim2, level=dim_name) df2 = df2.droplevel("Data_Block") amt_var = df1 - df2 pct_var = df1.divide(df2) - 1 self.variance = pd.concat([amt_var, pct_var], keys=["Amount", "Percent"]) return self.variance
# TODO: refine - percent?
[docs] def multiply_dim( self, dim_name, dim_vals=None, calc_name="New Item", data_obj="slice" ): """ Multiply two or more dimension items. For example, multiply units x price x discount with x.multiply_dim('Basis', ['Units', 'Price', 'Discount'] :param dim_name: String object of dimension name. I.e. 'Basis'. :param dim_vals: List object of dimension items to multiply. I.e. ['Units', 'Price', 'Discount'] :param calc_name: String object of new dimension item name. I.e. 'Total_Revenue' :param data_obj: Which data object you want o effect. Default: 'slice'. Available - 'data', 'slice' :return: self.function_result """ if data_obj == "data": df = pd.DataFrame() if dim_vals == None: dim_vals = set(list(self.data.index.get_level_values(dim_name))) for item in dim_vals: df1 = self.data.xs(item, level=dim_name) df1 = df1.droplevel("Data_Block") df1_idx = df1.index df1.reset_index(drop=True, inplace=True) if df.empty: df = df1 else: df = df * df1 df.index = df1_idx if data_obj == "slice": df = pd.DataFrame() if dim_vals == None: dim_vals = set(list(self.slice.index.get_level_values(dim_name))) for item in dim_vals: df1 = self.slice.xs(item, level=dim_name) df1 = df1.droplevel("Data_Block") df1_idx = df1.index df1.reset_index(drop=True, inplace=True) if df.empty: df = df1 else: df = df * df1 df.index = df1_idx self.function_result= df if not isinstance(dim_name, list): dim_name = [dim_name] if not isinstance(calc_name, list): calc_name = [calc_name] self.add_dimensions(dim_name, calc_name, data_obj="function_result") self.import_xl(self.function_result.copy()) return self.function_result
[docs] def sum_dim(self, dim_name, dim_vals=None, calc_name="New Item", data_obj="slice"): """ Add two or more dimension items. For example, 'Rent' + 'Office Supplies' with FPA_OBJECT.sum_dim('Line Item', ['Rent', 'Office Supplies'] :param dim_name: String object of dimension name. I.e. 'Line Item'. :param dim_vals: List object of dimension items to multiply. I.e. ['Rent', 'Office Supplies'] :param calc_name: String object of new dimension item name. I.e. 'Office Expense' :param data_obj: Which data object you want o effect. Default: 'slice'. Available - 'data', 'slice' :return: self.function_result """ if data_obj == "data": df = pd.DataFrame() if isinstance(dim_vals, str): dim_vals = [dim_vals] if dim_vals == None: dim_vals = set(list(self.data.index.get_level_values(dim_name))) for item in dim_vals: df1 = self.data.xs(item, level=dim_name) df1 = df1.droplevel("Data_Block") df1_idx = df1.index df1.reset_index(drop=True, inplace=True) if df.empty: df = df1 else: df = df + df1 df.index = df1_idx if data_obj == "slice": df = pd.DataFrame() if isinstance(dim_vals, str): dim_vals = [dim_vals] if dim_vals == None: dim_vals = set(list(self.slice.index.get_level_values(dim_name))) for item in dim_vals: df1 = self.slice.xs(item, level=dim_name) df1 = df1.droplevel("Data_Block") df1_idx = df1.index df1.reset_index(drop=True, inplace=True) if df.empty: df = df1 else: df = df + df1 df.index = df1_idx self.function_result= df if not isinstance(dim_name, list): dim_name = [dim_name] if not isinstance(calc_name, list): calc_name = [calc_name] self.add_dimensions(dim_name, calc_name, data_obj="function_result") self.import_xl(self.function_result.copy()) return self.function_result
[docs] def subtract_dim( self, dim_name, dim_vals=None, calc_name="New Item", data_obj="slice" ): """ Subtract two or more dimension items. For example, 'Total_Revenue' - 'COGS' with FPA_OBJECT.subtract_dim('IS_Category', ['Total_Revenue', 'COGS'] :param dim_name: String object of dimension name. I.e. 'IS_Category'. :param dim_vals: List object of dimension items to multiply. I.e. ['Total_Revenue', 'COGS'] :param calc_name: String object of new dimension item name. I.e. 'Gross Profit' :param data_obj: Which data object you want o effect. Default: 'slice'. Available - 'data', 'slice' :return: self.function_result """ if data_obj == "data": df = pd.DataFrame() if isinstance(dim_vals, str): dim_vals = [dim_vals] if dim_vals == None: dim_vals = set(list(self.data.index.get_level_values(dim_name))) for item in dim_vals: df1 = self.data.xs(item, level=dim_name) df1 = df1.droplevel("Data_Block") df1_idx = df1.index df1.reset_index(drop=True, inplace=True) if df.empty: df = df1 else: df = df - df1 df.index = df1_idx if data_obj == "slice": df = pd.DataFrame() if isinstance(dim_vals, str): dim_vals = [dim_vals] if dim_vals == None: dim_vals = set(list(self.slice.index.get_level_values(dim_name))) for item in dim_vals: df1 = self.slice.xs(item, level=dim_name) df1 = df1.droplevel("Data_Block") df1_idx = df1.index df1.reset_index(drop=True, inplace=True) if df.empty: df = df1 else: df = df - df1 df.index = df1_idx self.function_result= df if not isinstance(dim_name, list): dim_name = [dim_name] if not isinstance(calc_name, list): calc_name = [calc_name] self.add_dimensions(dim_name, calc_name, data_obj="function_result") self.import_xl(self.function_result.copy()) return self.function_result
[docs] def remove_duplicates(self, based_on=None, keep_item="last", data_obj="data"): """ Deletes repetitive records based on the index. Can be filtered down by dimensions. :param based_on: String or list object of dimensions to be used. If blank it will all dimensions. :param keep_item: String object of which records to keep. I.e. either 'last' or 'first'. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'data', 'block', 'slice' :return: data object without duplicate records """ if data_obj == "data": lx = list(self.data.index.names) index_order = list(self.data.index.names) lx.remove("Data_Block") self.data.reset_index(level=lx, inplace=True) if based_on == None: based_on = list(self.data.columns) if isinstance(based_on, str): based_on = [based_on] self.data.drop_duplicates(subset=based_on, keep=keep_item, inplace=True) self.data.set_index(lx, append=True, inplace=True) self.reorder_dimensions(index_order, "data") return self.data elif data_obj == "block": lx = list(self.block.index.names) index_order = list(self.block.index.names) lx.remove("Data_Block") self.block.reset_index(level=lx, inplace=True) if based_on == None: based_on = list(self.block.columns) if isinstance(based_on, str): based_on = [based_on] self.block.drop_duplicates(subset=based_on, keep=keep_item, inplace=True) self.block.set_index(lx, append=True, inplace=True) self.reorder_dimensions(index_order, "block") return self.block elif data_obj == "slice": lx = list(self.slice.index.names) index_order = list(self.slice.index.names) lx.remove("Data_Block") self.slice.reset_index(level=lx, inplace=True) if based_on == None: based_on = list(self.slice.columns) if isinstance(based_on, str): based_on = [based_on] self.slice.drop_duplicates(subset=based_on, keep=keep_item, inplace=True) self.slice.set_index(lx, append=True, inplace=True) self.reorder_dimensions(index_order, "block") return self.slice
[docs] def get_duplicates(self, based_on=None, data_obj="data"): """ Retrieves repetitive records based on the index. Can be filtered down by dimensions. :param based_on: String or list object of dimensions to be used. If blank it will all dimensions. :param data_obj: Which data object you want o effect. Default: 'data'. Available - 'data' :return: self.function_result """ if data_obj == "data": lx = list(self.data.index.names) index_order = list(self.data.index.names) lx.remove("Data_Block") self.function_result= self.data.copy() self.function_result.reset_index(level=lx, inplace=True) if based_on == None: based_on = list(self.function_result.columns) if isinstance(based_on, str): based_on = [based_on] try: self.function_result= pd.concat( z for _, z in self.function_result.groupby(based_on) if len(z) > 1 ) except ValueError as e: if str(e) == "No objects to concatenate": print("No duplicates.") pass self.function_result.set_index(lx, append=True, inplace=True) self.reorder_dimensions(index_order, "function_result") return self.function_result
def _align_indicies(self): x = self.block.index.names y = self.data.index.names z = [item in x for item in y] w = zip(y, z) for item in w: if item[1] == False: self.add_dimensions([item[0]], ["na"], data_obj='block') z = [item in y for item in x] w = zip(x, z) for item in w: if item[1] == False: self.add_dimensions([item[0]], ["na"], 1, "data")