# File: ai-agents/dashboard/app.py (Python, 203 lines, 6.4 KiB)

from fastapi import FastAPI, Depends, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from pydantic import BaseModel
from datetime import datetime, timezone
from typing import Optional
import json
from database import get_db, init_db
from models import Agent, Run
# Application instance that the route, mount and startup hooks in this file attach to.
app = FastAPI(
    title="Agent Command Center",
    version="1.0.0",
)
# --- Pydantic schemas ---
class AgentCreate(BaseModel):
    """Request body accepted by ``POST /api/agents``."""

    # Caller-supplied identifier; create_agent answers 409 if it is already taken.
    id: str
    name: str
    # The remaining fields are optional and fall back to these defaults.
    description: str = ""
    schedule: str = "manual"
class AgentUpdate(BaseModel):
    """Partial update accepted by ``PUT /api/agents/{agent_id}``.

    Every field is optional. ``update_agent`` dumps the model with
    ``exclude_none=True``, so a field left as ``None`` is not applied.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    schedule: Optional[str] = None
    status: Optional[str] = None
class RunCreate(BaseModel):
    """Request body accepted by ``POST /api/agents/{agent_id}/runs``."""

    # create_run stamps ``finished_at`` itself when this is "success" or "failed".
    status: str = "running"
    output: str = ""
    error: str = ""
    # Stored on the Run row under the attribute ``metadata_``.
    # NOTE(review): pydantic is expected to copy field defaults per instance, so
    # the literal ``{}`` should not be shared between requests -- confirm for
    # the pinned pydantic version.
    metadata: dict = {}
class RunUpdate(BaseModel):
    """Partial update accepted by ``PUT /api/runs/{run_id}``.

    Every field is optional; ``update_run`` skips the ones left as ``None``.
    """

    status: Optional[str] = None
    output: Optional[str] = None
    error: Optional[str] = None
    # Timestamp as text; update_run converts it with ``datetime.fromisoformat``.
    finished_at: Optional[str] = None
    metadata: Optional[dict] = None
# --- API routes ---
@app.get("/api/health")
def health():
    """Return a fixed payload identifying the service and reporting it as OK."""
    payload = {"status": "ok", "service": "agent-command-center"}
    return payload
@app.get("/api/agents")
def list_agents(db: Session = Depends(get_db)):
    """Return every agent together with a summary of its run history.

    Each entry carries the agent's own columns plus:

    * ``last_run`` -- status and timestamps of the most recently started run,
      or ``None`` when the agent has no runs;
    * ``success_streak`` -- how many consecutive ``"success"`` runs there are
      counting back from the newest one, looking at no more than the 10
      latest runs;
    * ``total_runs`` -- number of runs recorded for the agent.

    Args:
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list of dicts, one per agent.
    """
    result = []
    for a in db.query(Agent).all():
        # Base query shared by the "recent window" and the total count.
        agent_runs = db.query(Run).filter(Run.agent_id == a.id)
        recent_runs = agent_runs.order_by(Run.started_at.desc()).limit(10).all()

        # The newest run is the head of the window just fetched, so a separate
        # "latest run" round-trip per agent is unnecessary.
        last_run = recent_runs[0] if recent_runs else None

        # Count successes from the newest run until the first non-success.
        success_streak = 0
        for r in recent_runs:
            if r.status != "success":
                break
            success_streak += 1

        if last_run is None:
            last_run_summary = None
        else:
            last_run_summary = {
                "status": last_run.status,
                "started_at": last_run.started_at.isoformat() if last_run.started_at else None,
                "finished_at": last_run.finished_at.isoformat() if last_run.finished_at else None,
            }

        result.append({
            "id": a.id,
            "name": a.name,
            "description": a.description,
            "schedule": a.schedule,
            "status": a.status,
            "created_at": a.created_at.isoformat() if a.created_at else None,
            "last_run": last_run_summary,
            "success_streak": success_streak,
            "total_runs": agent_runs.count(),
        })
    return result
@app.post("/api/agents")
def create_agent(agent: AgentCreate, db: Session = Depends(get_db)):
    """Store a new agent built from the request body.

    Args:
        agent: Fields for the new agent.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"id": <agent id>, "status": "created"}``.

    Raises:
        HTTPException: 409 when an agent with the same id is already stored.
    """
    duplicate = db.query(Agent).filter(Agent.id == agent.id).first()
    if duplicate:
        raise HTTPException(status_code=409, detail="Agent already exists")

    record = Agent(
        id=agent.id,
        name=agent.name,
        description=agent.description,
        schedule=agent.schedule,
    )
    db.add(record)
    db.commit()

    response = {"id": record.id, "status": "created"}
    return response
@app.get("/api/agents/{agent_id}")
def get_agent(agent_id: str, db: Session = Depends(get_db)):
    """Return one agent along with up to 50 of its runs, newest start first.

    Args:
        agent_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Raises:
        HTTPException: 404 when no agent has the given id.
    """
    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise HTTPException(status_code=404, detail="Agent not found")

    def _iso(moment):
        # Timestamps may be unset; those are reported as None.
        return moment.isoformat() if moment else None

    history = (
        db.query(Run)
        .filter(Run.agent_id == agent_id)
        .order_by(Run.started_at.desc())
        .limit(50)
        .all()
    )

    run_rows = []
    for r in history:
        run_rows.append({
            "id": r.id,
            "started_at": _iso(r.started_at),
            "finished_at": _iso(r.finished_at),
            "status": r.status,
            "output": r.output,
            "error": r.error,
            "metadata": r.metadata_,
        })

    return {
        "id": agent.id,
        "name": agent.name,
        "description": agent.description,
        "schedule": agent.schedule,
        "status": agent.status,
        "created_at": _iso(agent.created_at),
        "runs": run_rows,
    }
@app.put("/api/agents/{agent_id}")
def update_agent(agent_id: str, update: AgentUpdate, db: Session = Depends(get_db)):
    """Apply the non-``None`` fields of ``update`` to an existing agent.

    Args:
        agent_id: Identifier taken from the URL path.
        update: Fields to change; ``None`` values are left untouched.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"id": <agent id>, "status": "updated"}``.

    Raises:
        HTTPException: 404 when no agent has the given id.
    """
    agent = db.query(Agent).filter(Agent.id == agent_id).first()
    if not agent:
        raise HTTPException(status_code=404, detail="Agent not found")

    # Only the fields the client actually supplied survive the dump.
    changes = update.model_dump(exclude_none=True)
    for column in changes:
        setattr(agent, column, changes[column])
    db.commit()

    return {"id": agent.id, "status": "updated"}
@app.post("/api/agents/{agent_id}/runs")
def create_run(agent_id: str, run: RunCreate, db: Session = Depends(get_db)):
    """Record a new run for an existing agent.

    A run created directly in the ``"success"`` or ``"failed"`` state gets its
    ``finished_at`` set to the current UTC time.

    Args:
        agent_id: Identifier of the owning agent, taken from the URL path.
        run: Initial run fields.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"id": <run id>, "status": <run status>}``.

    Raises:
        HTTPException: 404 when no agent has the given id.
    """
    owner = db.query(Agent).filter(Agent.id == agent_id).first()
    if not owner:
        raise HTTPException(status_code=404, detail="Agent not found")

    already_finished = run.status in ("success", "failed")

    record = Run(
        agent_id=agent_id,
        status=run.status,
        output=run.output,
        error=run.error,
        metadata_=run.metadata,
    )
    if already_finished:
        record.finished_at = datetime.now(timezone.utc)

    db.add(record)
    db.commit()
    return {"id": record.id, "status": record.status}
def _parse_finished_at(raw: str) -> datetime:
    """Parse an ISO-8601 timestamp, also accepting a trailing ``Z`` for UTC.

    Raises:
        ValueError: when ``raw`` cannot be parsed.
    """
    try:
        return datetime.fromisoformat(raw)
    except ValueError:
        # Interpreters before 3.11 reject the "Z" suffix; retry with the
        # equivalent explicit offset before giving up.
        if not raw.endswith("Z"):
            raise
        return datetime.fromisoformat(raw[:-1] + "+00:00")


@app.put("/api/runs/{run_id}")
def update_run(run_id: int, update: RunUpdate, db: Session = Depends(get_db)):
    """Apply a partial update to an existing run.

    Fields of ``update`` left as ``None`` are not changed. When
    ``finished_at`` is supplied it is parsed and stored; otherwise, if the
    update sets the status to ``"success"`` or ``"failed"``, ``finished_at``
    is set to the current UTC time.

    Args:
        run_id: Identifier of the run, taken from the URL path.
        update: Fields to change.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"id": <run id>, "status": <run status>}``.

    Raises:
        HTTPException: 404 when the run does not exist; 422 when
            ``finished_at`` is not a parseable ISO-8601 timestamp.
    """
    run = db.query(Run).filter(Run.id == run_id).first()
    if not run:
        raise HTTPException(status_code=404, detail="Run not found")

    # Validate the timestamp before touching the row, so a malformed value is
    # reported as a client error instead of surfacing as an unhandled
    # ValueError after the object has been partly modified.
    finished_at = None
    if update.finished_at is not None:
        try:
            finished_at = _parse_finished_at(update.finished_at)
        except ValueError:
            raise HTTPException(
                status_code=422,
                detail="finished_at must be an ISO-8601 timestamp",
            ) from None

    if update.status is not None:
        run.status = update.status
    if update.output is not None:
        run.output = update.output
    if update.error is not None:
        run.error = update.error
    if update.metadata is not None:
        run.metadata_ = update.metadata

    if finished_at is not None:
        run.finished_at = finished_at
    elif update.status in ("success", "failed"):
        run.finished_at = datetime.now(timezone.utc)

    db.commit()
    return {"id": run.id, "status": run.status}
@app.get("/api/runs")
def list_runs(limit: int = 50, db: Session = Depends(get_db)):
    """Return runs across all agents, most recently started first.

    Args:
        limit: Maximum number of runs to return; must not be negative.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A list of dicts, one per run.

    Raises:
        HTTPException: 422 when ``limit`` is negative.
    """
    # The value comes straight from the query string. What a negative LIMIT
    # does depends on the database (some treat it as "no limit", others
    # reject the statement), so refuse it here with a clear client error.
    if limit < 0:
        raise HTTPException(status_code=422, detail="limit must not be negative")

    runs = db.query(Run).order_by(Run.started_at.desc()).limit(limit).all()
    return [
        {
            "id": r.id,
            "agent_id": r.agent_id,
            "started_at": r.started_at.isoformat() if r.started_at else None,
            "finished_at": r.finished_at.isoformat() if r.finished_at else None,
            "status": r.status,
            "output": r.output,
            "error": r.error,
            "metadata": r.metadata_,
        }
        for r in runs
    ]
# --- Static files (frontend) ---
# Expose the contents of the "static" directory under the /static URL prefix.
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)
@app.get("/")
def root():
    """Respond to the site root with the ``static/index.html`` file."""
    index_page = FileResponse("static/index.html")
    return index_page
# --- Startup ---
@app.on_event("startup")
def startup():
    """Call ``init_db`` when the application emits its startup event."""
    init_db()