119 lines
3.5 KiB
Python
119 lines
3.5 KiB
Python
#All Rights Reserved John Salguero
|
|
#Starts the backend to my Youtube stream
|
|
|
|
import json
|
|
import asyncio
|
|
import logging
|
|
from problem_generator import generate_problem
|
|
from steps_generator import generate_steps
|
|
from sympy import init_printing, sympify
|
|
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi.responses import RedirectResponse
|
|
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format="%(asctime)s [%(levelname)s] %(message)s",
|
|
handlers=[
|
|
logging.FileHandler("app.log"),
|
|
logging.StreamHandler() # prints to console
|
|
]
|
|
)
|
|
|
|
log = logging.getLogger(__name__)
|
|
app = FastAPI()
|
|
clients = []
|
|
|
|
app.mount("/static", StaticFiles(directory="www"), name="static")
|
|
@app.get("/static")
|
|
def static_root():
|
|
return RedirectResponse(url="/static/index.html")
|
|
|
|
@app.websocket("/ws")
|
|
async def websocket_endpoint(ws: WebSocket):
|
|
await ws.accept()
|
|
clients.append(ws)
|
|
|
|
try:
|
|
while True:
|
|
data = json.loads(await ws.receive_text())
|
|
|
|
log.info(f"Received from browser: {data}")
|
|
|
|
if data.get("type") == "query_problem":
|
|
problem = generate_problem_message()
|
|
await ws.send_text(json.dumps(problem))
|
|
|
|
except WebSocketDisconnect:
|
|
log.info("Client disconnected")
|
|
except Exception as e:
|
|
log.error(f"Unexpected error: {e}")
|
|
finally:
|
|
if ws in clients:
|
|
clients.remove(ws)
|
|
|
|
# example loop
|
|
def generate_problem_message():
|
|
problem_solved = False
|
|
problem = {}
|
|
while not problem_solved:
|
|
problem["type"] = "generated_problem"
|
|
problem["data"] = generate_problem()
|
|
problem["time"] = get_time_for_problem(problem["data"])
|
|
problem["data"]["steps"] = generate_steps(problem["data"]);
|
|
problem_solved = check_solution(problem["data"]["steps"][-1]["after"], problem["data"]["solution"])
|
|
if not problem_solved:
|
|
log.warning(f"issue with problem: {problem}")
|
|
|
|
return problem
|
|
|
|
def get_time_for_problem(problem):
|
|
return 10
|
|
|
|
def main():
|
|
problem_solved = False
|
|
while not problem_solved:
|
|
problem = generate_problem()
|
|
problem["steps"] = generate_steps(problem);
|
|
problem_solved = check_solution(problem["steps"][-1]["after"], problem["solution"])
|
|
if not problem_solved:
|
|
print(f"issue with problem: {problem}")
|
|
|
|
return pretty_print_steps(problem["steps"])
|
|
|
|
def check_solution(got, solution):
|
|
values = set([sympify(r.split("=")[1].strip()) for r in got.split(",")])
|
|
if is_iterable(solution) and not isinstance(solution, str):
|
|
solutions = set([sympify(r) for r in solution])
|
|
else:
|
|
solutions_list = []
|
|
solutions_list.append(sympify(solution))
|
|
solutions = set(solutions_list)
|
|
return solutions == values
|
|
|
|
|
|
def pretty_print_steps(steps):
|
|
print("\n" + "=" * 50)
|
|
|
|
for i, step in enumerate(steps, start=1):
|
|
print(f"\nStep {i}")
|
|
print("-" * 50)
|
|
print(f"Before: {step.get('before', '')}")
|
|
print(f"After: {step.get('after', '')}")
|
|
|
|
for key in step:
|
|
if key not in ("before", "after"):
|
|
print(f"{key.capitalize()}: {step[key]}")
|
|
|
|
print("\n" + "=" * 50)
|
|
|
|
def is_iterable(obj):
|
|
try:
|
|
iter(obj)
|
|
return True
|
|
except TypeError:
|
|
return False
|
|
|
|
#Starts the program
|
|
if __name__ == "__main__":
|
|
main() |