Source code for template_log_parser.log_functions

import pandas as pd

from io import BytesIO, StringIO, TextIOBase
from pathlib import Path

from typing import Literal, Iterable, Union, Optional

from template_log_parser.definitions import (
    event_data_column,
    event_type_column,
    other_type_column,
    unparsed_text_column,
    SimpleTemplate,
)


def get_lines_from_file(
    f: Union[str, Path, BytesIO, StringIO, TextIOBase],
) -> list[str]:
    """Return a list of strings from a flat file

    :param f: Path to file or filelike object, most commonly in the format of some_log_process.log
    :type f: str, Path, BytesIO, StringIO, TextIOBase
    :return: list of string, one entry per line with line terminators removed
    :rtype: list[str]
    :raise ValueError: If wrong file type is provided

    :note: Paths are opened as UTF-8 text and BytesIO content is decoded as UTF-8.
        Seekable file-like objects are rewound to the start before reading; a text
        stream that reports itself as non-seekable is read from its current position.
    """
    if isinstance(f, (str, Path)):
        with open(f, "r", encoding="utf-8") as file_obj:
            return file_obj.read().splitlines()

    if isinstance(f, BytesIO):
        f.seek(0)
        return f.read().decode("utf-8").splitlines()

    if isinstance(f, (StringIO, TextIOBase)):
        # Only rewind when the stream supports it: calling seek() on a
        # non-seekable text stream raises instead of returning data.
        if f.seekable():
            f.seek(0)
        return f.read().splitlines()

    raise ValueError(
        "Unsupported file type. Must be str, Path, BytesIO, StringIO, or TextIOBase."
    )
def parse_function(event: str, templates: list[SimpleTemplate]) -> dict[str, str]:
    """Return a dictionary of information parsed from a log file string based on matching template.

    Templates are tried in the order given and the first complete match wins.

    :param event: String data, should ideally match a repeated format throughout a text file
    :type event: str
    :param templates: formatted as a list of namedtuple (SimpleTemplate)
        [(compiled_template, event_type, search_string), ...]
    :type templates: list[SimpleTemplate]
    :return: the named values extracted by the matching template plus its event type
        (stored under ``event_type_column``). When no template matches, a dictionary
        holding the original text under ``unparsed_text_column`` and
        ``other_type_column`` under ``event_type_column``
    :rtype: dict[str, str]
    """
    for candidate in templates:
        # Cheap substring test first, so the template is only run on plausible lines.
        if candidate.search_string in event:
            result = candidate.template.parse(event)
            if not result:
                continue

            # Accept the match only when every named field of the template was filled.
            if len(result.named) != len(candidate.template.named_fields):
                continue

            fields = result.named
            fields[event_type_column] = candidate.event_type
            return fields

    return {unparsed_text_column: event, event_type_column: other_type_column}
[docs] def filter_line( line: str, match: str | list[str] | None = None, eliminate: str | list[str] | None = None, match_type: Literal["any", "all"] = "any", eliminate_type: Literal["any", "all"] = "any", ) -> bool: """Return True if log file line adheres to filter criteria Eliminate applied second, and therefore supersedes any words in match should conflicts exist. :param line: A single log file line :type line: str :param match: (optional) A single word or list of words must be present within the line otherwise dropped. :type match: str, List[str], None :param eliminate: (optional) A single word or a list of words if present within line will result in it being dropped :type eliminate: str, List[str], None :param match_type: (optional) criteria to determine if any words must be present to match, or all words :type match_type: Literal["any", "all"] :param eliminate_type: (optional) criteria to determine if any words must be present to eliminate, or all words :type eliminate_type: Literal["any", "all"] :return: True if string contains the match criteria and does not contain the eliminate criteria, else False :rtype: bool """ def normalize(value: str | Iterable[str] | None) -> list[str]: if value is None: return [] if isinstance(value, str): return [value] return [str(v) for v in value] def validate(items: list[str], log_line: str, mode: Literal["any", "all"]) -> bool: if mode == "all": return all(item in log_line for item in items) return any(item in log_line for item in items) match_items = normalize(match) eliminate_items = normalize(eliminate) # Return false if the match criteria is not met OR if the eliminate criteria is met if match_items and not validate(match_items, line, match_type): return False if eliminate_items and validate(eliminate_items, line, eliminate_type): return False return True
def log_pre_process(
    file: str | Path | BytesIO | StringIO | TextIOBase,
    templates: list[SimpleTemplate],
    match: str | list[str] | None = None,
    eliminate: str | list[str] | None = None,
    match_type: Literal["any", "all"] = "any",
    eliminate_type: Literal["any", "all"] = "any",
) -> pd.DataFrame:
    """Return a Pandas DataFrame with named columns as specified by templates

    Each line is stripped of surrounding whitespace, optionally filtered, and then
    parsed with ``parse_function``. The stripped line is stored alongside the parsed
    values under ``event_data_column``.

    :param file: Path to file or filelike object, most commonly in the format of some_log_process.log
    :type file: str, Path, BytesIO, StringIO, TextIOBase
    :param templates: formatted as a list of namedtuple (SimpleTemplate)
        [(compiled_template, event_type, search_string), ...]
    :type templates: list[SimpleTemplate]
    :param match: (optional) A single word or list of words must be present within the line otherwise dropped.
    :type match: str, list[str], None
    :param eliminate: (optional) A single word or a list of words if present within line will result in it being dropped
    :type eliminate: str, list[str], None
    :param match_type: (optional) criteria to determine if any words must be present to match, or all words
    :type match_type: Literal["any", "all"]
    :param eliminate_type: (optional) criteria to determine if any words must be present to eliminate, or all words
    :type eliminate_type: Literal["any", "all"]
    :return: DataFrame with columns found in matching templates
    :rtype: Pandas.DataFrame
    :raise ValueError: If wrong file type is provided

    :note: eliminate applied second, and therefore supersedes any words in match should duplicate criteria exist.
    """
    parsed_results = []

    # Whether any filtering is requested does not change per line; decide once.
    filtering = bool(match or eliminate)

    for raw_line in get_lines_from_file(file):
        line = raw_line.strip()

        if filtering and not filter_line(
            line=line,
            match=match,
            eliminate=eliminate,
            match_type=match_type,
            eliminate_type=eliminate_type,
        ):
            continue

        data = parse_function(line, templates)
        # Keep the original (stripped) text next to the parsed values.
        data[event_data_column] = line
        parsed_results.append(data)

    return pd.DataFrame(parsed_results)
def process_log(
    file: str | Path | BytesIO | StringIO | TextIOBase,
    templates: list[SimpleTemplate],
    dict_format: bool = True,
    datetime_columns: Optional[list[str]] = None,
    match: str | list[str] | None = None,
    eliminate: str | list[str] | None = None,
    match_type: Literal["any", "all"] = "any",
    eliminate_type: Literal["any", "all"] = "any",
) -> dict[str, pd.DataFrame] | pd.DataFrame:
    """Return a single Pandas Dataframe or dictionary of DataFrames whose keys are the log file event types,
    utilizing templates.

    :param file: Path to file or filelike object, most commonly in the format of some_log_process.log
    :type file: str, Path, BytesIO, StringIO, TextIOBase
    :param templates: formatted as a list of namedtuple (SimpleTemplate)
        [(compiled_template, event_type, search_string), ...]
    :type templates: list[SimpleTemplate]
    :param dict_format: (optional) Return a dictionary of DataFrames when True, one large DataFrame when False,
        True by default
    :type dict_format: bool
    :param datetime_columns: (optional) Columns to be converted using Pandas.to_datetime()
    :type datetime_columns: list[str], None
    :param match: (optional) A single word or list of words must be present within the line otherwise dropped.
    :type match: str, list[str], None
    :param eliminate: (optional) A single word or a list of words if present within line will result in it being dropped
    :type eliminate: str, list[str], None
    :param match_type: (optional) criteria to determine if any words must be present to match, or all words
    :type match_type: Literal["any", "all"]
    :param eliminate_type: (optional) criteria to determine if any words must be present to eliminate, or all words
    :type eliminate_type: Literal["any", "all"]
    :return: dict formatted as {'event_type_1': df_1, 'event_type_2': df_2, ...} (an empty dict when no
        lines were parsed), or a single Pandas Dataframe including all event types and all columns
    :rtype: dict[str, Pandas.DataFrame], Pandas.DataFrame

    :note: Requested datetime columns that are absent from the parsed data are skipped. A column that
        fails conversion is left unchanged and the error is printed.
    """
    # Initial parsing
    df = log_pre_process(
        file=file,
        templates=templates,
        match=match,
        eliminate=eliminate,
        match_type=match_type,
        eliminate_type=eliminate_type,
    )

    for col in datetime_columns or []:
        if col not in df.columns:
            continue
        try:
            df[col] = pd.to_datetime(df[col])
        except Exception as e:
            # Best-effort conversion: report the problem and keep the raw column.
            print(f"Error converting column '{col}' to datetime: {e}")

    if not dict_format:
        return df

    if df.empty:
        return {}

    # One independent DataFrame per event type, without the columns that are
    # entirely empty for that event type.
    return {
        event_type: df[df[event_type_column] == event_type]
        .dropna(axis=1, how="all")
        .copy()
        for event_type in df[event_type_column].unique().tolist()
    }