Files
Youtube_Math/main.py
2026-05-05 15:43:42 -04:00

119 lines
3.5 KiB
Python

#All Rights Reserved John Salguero
#Starts the backend to my Youtube stream
import json
import asyncio
import logging
from problem_generator import generate_problem
from steps_generator import generate_steps
from sympy import init_printing, sympify
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import RedirectResponse
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[
logging.FileHandler("app.log"),
logging.StreamHandler() # prints to console
]
)
log = logging.getLogger(__name__)
app = FastAPI()
clients = []
app.mount("/static", StaticFiles(directory="www"), name="static")
@app.get("/static")
def static_root():
return RedirectResponse(url="/static/index.html")
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await ws.accept()
clients.append(ws)
try:
while True:
data = json.loads(await ws.receive_text())
log.info(f"Received from browser: {data}")
if data.get("type") == "query_problem":
problem = generate_problem_message()
await ws.send_text(json.dumps(problem))
except WebSocketDisconnect:
log.info("Client disconnected")
except Exception as e:
log.error(f"Unexpected error: {e}")
finally:
if ws in clients:
clients.remove(ws)
# example loop
def generate_problem_message():
problem_solved = False
problem = {}
while not problem_solved:
problem["type"] = "generated_problem"
problem["data"] = generate_problem()
problem["time"] = get_time_for_problem(problem["data"])
problem["data"]["steps"] = generate_steps(problem["data"]);
problem_solved = check_solution(problem["data"]["steps"][-1]["after"], problem["data"]["solution"])
if not problem_solved:
log.warning(f"issue with problem: {problem}")
return problem
def get_time_for_problem(problem):
return 10
def main():
problem_solved = False
while not problem_solved:
problem = generate_problem()
problem["steps"] = generate_steps(problem);
problem_solved = check_solution(problem["steps"][-1]["after"], problem["solution"])
if not problem_solved:
print(f"issue with problem: {problem}")
return pretty_print_steps(problem["steps"])
def check_solution(got, solution):
values = set([sympify(r.split("=")[1].strip()) for r in got.split(",")])
if is_iterable(solution) and not isinstance(solution, str):
solutions = set([sympify(r) for r in solution])
else:
solutions_list = []
solutions_list.append(sympify(solution))
solutions = set(solutions_list)
return solutions == values
def pretty_print_steps(steps):
print("\n" + "=" * 50)
for i, step in enumerate(steps, start=1):
print(f"\nStep {i}")
print("-" * 50)
print(f"Before: {step.get('before', '')}")
print(f"After: {step.get('after', '')}")
for key in step:
if key not in ("before", "after"):
print(f"{key.capitalize()}: {step[key]}")
print("\n" + "=" * 50)
def is_iterable(obj):
try:
iter(obj)
return True
except TypeError:
return False
#Starts the program
if __name__ == "__main__":
main()