"""Centralized logging configuration for Maya server.""" import os import sys import json import logging import warnings import traceback from datetime import timezone from loguru import logger from src.config import constants def log_filter(record): msg = record["message"] if "Unable to send message" in msg and "before joining" in msg: return False if record.get("exception"): exc_info = record["exception"] if exc_info and exc_info.value: exc_msg = str(exc_info.value) if "Unable to send message" in exc_msg and "before joining" in exc_msg: return False # Daily SDK noise if "Failed to send LogLine" in msg: return False if "ResponseCanceled" in msg or "ResponseCancelled" in msg: return False if "Metrics failed to get snapshot" in msg: return False if "daily_core" in msg: return False if "CallManagerEvent" in msg: return False if "MediasoupManager" in msg: return False if record["level"].name in ("ERROR", "CRITICAL", "EXCEPTION"): return True name = record.get("name", "") if name and name.startswith(("botocore", "boto3", "urllib3", "s3transfer")): return False if name and name.startswith("openai"): return False if name and name.startswith("httpx"): return False if "Empty audio frame received for STT service" in msg: return False if "Ignoring not RTVI message" in msg: return False if "User stopped speaking but no new aggregation received" in msg: return False if "Unclosed client session" in msg or "Unclosed connector" in msg: return False if "Task was destroyed but it is pending" in msg: return False if "Loading JSON file:" in msg: return False if "Changing event name" in msg: return False if "Request options:" in msg: return False return True CONSOLE_FORMAT = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} | {message}" def datadog_sink(message): record = message.record level = record["level"].name timestamp = record["time"].astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z" log_entry = { "timestamp": timestamp, "level": level, "logger": record["name"], "function": record["function"], "line": record["line"], "message": record["message"], "service": "maya-pipecat", "env": os.getenv("DD_ENV", os.getenv("ENVIRONMENT", "development")), "version": os.getenv("DD_VERSION", getattr(constants, 'APP_VERSION', "1.0.0")), } for key, value in record["extra"].items(): if value and key not in ("_maya_configured",): log_entry[key] = value if level in ("ERROR", "CRITICAL", "EXCEPTION"): log_entry["status"] = "error" log_entry["error.kind"] = level if record.get("exception"): exc_info = record["exception"] if exc_info: log_entry["error.type"] = exc_info.type.__name__ if exc_info.type else "Unknown" log_entry["error.message"] = str(exc_info.value) if exc_info.value else "" log_entry["error.stack"] = "".join(traceback.format_exception(exc_info.type, exc_info.value, exc_info.traceback)) else: log_entry["status"] = "info" return json.dumps(log_entry, ensure_ascii=False) + "\n" class DatadogStdoutSink: """Writes JSON-formatted logs to stdout for DD DaemonSet collection.""" def write(self, message): json_line = datadog_sink(message) sys.stdout.write(json_line) sys.stdout.flush() class InterceptHandler(logging.Handler): def emit(self, record): try: level = logger.level(record.levelname).name except ValueError: level = record.levelno frame, depth = sys._getframe(6), 6 while frame and frame.f_code.co_filename == logging.__file__: frame = frame.f_back depth += 1 logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage()) def configure_logging(): if hasattr(logger, '_maya_configured'): return os.makedirs(constants.LOG_DIR, exist_ok=True) logger.remove() is_production = bool(os.getenv("DD_AGENT_HOST")) stdout_level = os.getenv("LOG_LEVEL", "INFO" if is_production else "DEBUG") if is_production: json_stdout_sink = DatadogStdoutSink() logger.add( json_stdout_sink.write, level=stdout_level, filter=log_filter, ) else: logger.add( sys.stdout, level="DEBUG", format=CONSOLE_FORMAT, filter=log_filter, colorize=False, enqueue=False, backtrace=True, diagnose=True, ) logger.add( constants.LOG_FILE_PATH, level="INFO", format=CONSOLE_FORMAT, filter=log_filter, rotation=constants.LOG_ROTATION_SIZE, retention=constants.LOG_RETENTION, compression="zip", enqueue=False, backtrace=True, diagnose=True, ) warnings.filterwarnings("default") logging.captureWarnings(True) logging.basicConfig(handlers=[InterceptHandler()], level=logging.DEBUG, force=True) for logger_name in ["uvicorn", "uvicorn.error", "uvicorn.access", "fastapi", "pipecat", "aiohttp"]: logging.getLogger(logger_name).handlers = [InterceptHandler()] logging.getLogger(logger_name).propagate = False logger._maya_configured = True logger.info("=" * 60) logger.info("MAYA PIPECAT SERVER - STARTUP INITIATED") logger.info("=" * 60) logger.info(f"STARTUP_CONFIG: env={os.getenv('ENVIRONMENT', 'development')} host={constants.SERVER_HOST} port={constants.SERVER_PORT}") def log_startup_complete(startup_time_ms: float = None): logger.info("=" * 60) logger.info("MAYA PIPECAT SERVER - STARTUP COMPLETE") if startup_time_ms: logger.info(f"STARTUP_TIME: {startup_time_ms:.2f}ms") logger.info("=" * 60) def log_shutdown(): logger.info("MAYA PIPECAT SERVER - SHUTDOWN INITIATED") def log_error(message: str, error: Exception = None, **context): full_message = f"ERROR: {message}" if error: full_message += f" | error_type={type(error).__name__} error_message={str(error)}" for key, value in context.items(): full_message += f" {key}={value}" if error: logger.opt(exception=error).error(full_message) else: logger.error(full_message)