#All Rights Reserved John Salguero #Starts the backend to my Youtube stream import json import asyncio import logging from problem_generator import generate_problem from steps_generator import generate_steps from sympy import init_printing, sympify from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from fastapi.responses import RedirectResponse logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler("app.log"), logging.StreamHandler() # prints to console ] ) log = logging.getLogger(__name__) app = FastAPI() clients = [] app.mount("/static", StaticFiles(directory="www"), name="static") @app.get("/static") def static_root(): return RedirectResponse(url="/static/index.html") @app.websocket("/ws") async def websocket_endpoint(ws: WebSocket): await ws.accept() clients.append(ws) try: while True: data = json.loads(await ws.receive_text()) log.info(f"Received from browser: {data}") if data.get("type") == "query_problem": problem = generate_problem_message() await ws.send_text(json.dumps(problem)) except WebSocketDisconnect: log.info("Client disconnected") except Exception as e: log.error(f"Unexpected error: {e}") finally: if ws in clients: clients.remove(ws) # example loop def generate_problem_message(): problem_solved = False problem = {} while not problem_solved: problem["type"] = "generated_problem" problem["data"] = generate_problem() problem["time"] = get_time_for_problem(problem["data"]) problem["data"]["steps"] = generate_steps(problem["data"]); problem_solved = check_solution(problem["data"]["steps"][-1]["after"], problem["data"]["solution"]) if not problem_solved: log.warning(f"issue with problem: {problem}") return problem def get_time_for_problem(problem): return 10 def main(): problem_solved = False while not problem_solved: problem = generate_problem() problem["steps"] = generate_steps(problem); problem_solved = check_solution(problem["steps"][-1]["after"], problem["solution"]) if not problem_solved: print(f"issue with problem: {problem}") return pretty_print_steps(problem["steps"]) def check_solution(got, solution): values = set([sympify(r.split("=")[1].strip()) for r in got.split(",")]) if is_iterable(solution) and not isinstance(solution, str): solutions = set([sympify(r) for r in solution]) else: solutions_list = [] solutions_list.append(sympify(solution)) solutions = set(solutions_list) return solutions == values def pretty_print_steps(steps): print("\n" + "=" * 50) for i, step in enumerate(steps, start=1): print(f"\nStep {i}") print("-" * 50) print(f"Before: {step.get('before', '')}") print(f"After: {step.get('after', '')}") for key in step: if key not in ("before", "after"): print(f"{key.capitalize()}: {step[key]}") print("\n" + "=" * 50) def is_iterable(obj): try: iter(obj) return True except TypeError: return False #Starts the program if __name__ == "__main__": main()