Source code for termlog.message

import ast
import datetime
import inspect
import json
import re
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional


[docs]def fix_json(obj): try: return json._default_decoder(obj) except Exception: return str(obj)
[docs]@dataclass class Message: data: Any timestamp: datetime.datetime = field(default_factory=datetime.datetime.utcnow) json: bool = False color: bool = False type: Any = str lexer: str = "" time_format: Optional[str] = "%Y%m%d%H%M%S" include_timestamp: bool = True fields: Dict = field(default_factory=dict) def __post_init__(self): if self.color is False: self.data = strip_escape(f"{self.data}") self.time_format = "%Y%m%d%H%M%S" if self.time_format is None else self.time_format self.timestamp = self.timestamp.strftime(self.time_format) self._update_fields() @property def nested_calling_frame_pairs(self): previous = None previous_internal = None package_path = str(Path(__file__).parent) for frame in inspect.getouterframes(inspect.currentframe()): if not frame.code_context: continue internal = True if (package_path in frame.filename and not "tests" in frame.filename) else False if internal: previous = frame previous_internal = internal continue if internal is False and previous_internal is True: if previous: yield frame, previous if internal is True: previous = frame else: previous = None previous_internal = internal @property def calling_frame(self): for caller, nested in self.nested_calling_frame_pairs: return caller @property def calling_frame_code(self): frame_info = self.calling_frame code = "".join(inspect.getsourcelines(frame_info.frame)[0]) return code @property def calling_frame_data(self): frame_data = {} frame = self.calling_frame if frame: members = {k: v for k, v in inspect.getmembers(frame)} key_frame = members["frame"] frame_data.update(key_frame.f_globals) frame_data.update(key_frame.f_locals) return frame_data def _update_fields(self): frame_data = self.calling_frame_data code = self.calling_frame_code parsed = ast.parse(code) queue = parsed.body data = [] fields = [] # Grab field names to get data needed for message count = -1 while queue: count += 1 node = queue.pop(0) ignored = (ast.ImportFrom, ast.Import, ast.Assert) if isinstance(node, (ast.Expr, ast.FormattedValue, ast.Assign, ast.Starred, ast.Attribute)): queue.append(node.value) elif isinstance(node, (ast.Call,)): # TODO: Find a way to capture the colors here queue.extend(node.args) elif isinstance(node, (ast.JoinedStr,)): queue.extend(node.values) elif isinstance(node, (ast.Str,)): data.append(node.s) elif isinstance(node, (ast.Name,)): fields.append(node.id) elif isinstance(node, (ast.BinOp,)): queue.append(node.left) queue.append(node.right) elif isinstance(node, (ast.FunctionDef,)): queue.extend(node.body) elif isinstance(node, ignored): pass else: print("Termlog Warning [Unhandled ast.Node]:", node, ", ".join([d for d in dir(node) if not d.startswith("_")])) if count > 1000: # to prevent a runaway break for name in fields: if name in frame_data: self.fields[name] = frame_data[name] def __eq__(self, other): try: equal = self.data == other.data except AttributeError: equal = self.data == other return equal def __radd__(self, other) -> "Message": message = self try: new_fields = {k: v for k, v in self.fields.items()} if isinstance(other, Message): new_fields.update(other.fields) text = other.data + self.data else: text = other + str(self.data) message = Message( text, timestamp=self.timestamp, json=self.json, color=self.color, lexer=self.lexer, time_format=self.time_format, include_timestamp=self.include_timestamp, fields=new_fields, ) except (AttributeError, TypeError): text = other + str(self.data) message = Message( text, timestamp=self.timestamp, json=self.json, color=self.color, lexer=self.lexer, time_format=self.time_format, include_timestamp=self.include_timestamp, fields=self.fields, ) finally: return message def __add__(self, other) -> "Message": try: new_fields = {k: v for k, v in self.fields.items()} new_fields.update(other.fields) return Message( self.data + other.data, timestamp=self.timestamp, json=self.json, color=self.color, lexer=self.lexer, time_format=self.time_format, include_timestamp=self.include_timestamp, fields=new_fields, ) except AttributeError: return Message( self.data + other, timestamp=self.timestamp, json=self.json, color=self.color, lexer=self.lexer, time_format=self.time_format, include_timestamp=self.include_timestamp, fields=self.fields, ) def __str__(self): from .formatting import beautify data = asdict(self) # ts = '' if not self.include_timestamp else f'{self.timestamp.strftime(self.time_format)} ' ts = "" if not self.include_timestamp else f"{self.timestamp} " if self.json: msg_fields = ["json", "color", "lexer", "type", "time_format", "include_timestamp"] for field_name in msg_fields: data.pop(field_name) for field_name, field_value in data.pop("fields", {}).items(): data[field_name] = field_value if not self.include_timestamp: data.pop("timestamp") string = json.dumps(data, default=fix_json) elif self.lexer and self.color: string = beautify(self.data, lexer=self.lexer) else: string = f"{self.data}" if self.include_timestamp and not self.json: string = f"{ts}{string}" if not self.color: string = strip_escape(string) return str(string)
[docs]def strip_escape(text: str) -> str: """Remove terminal ascii escape sequences from *text*. Args: text: text to strip escape sequences from Returns: text stripped of escape sequences """ # These are all valid escape sequences... they're probably not # inclusive sequences = [ r"\033", r"\x1b", r"\u001b", ] value_sequence = r"\[[0-9;]+m" for escape_sequence in sequences: pattern = f"{escape_sequence}{value_sequence}" replacement = "" text = re.sub(pattern, replacement, text) return text