Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,16 @@ started, follow these steps:

#### Message Queue Settings

| Name | Default | Description |
|-------------------------------------------|----------------------|--------------------------------------------------------------------------------------|
| `BLOCKSEMBLER_MESSAGE_QUEUE_URL` | `localhost` | URL of the RabbitMQ message broker used for communication between services. |
| `BLOCKSEMBLER_MESSAGE_QUEUE_USER` | `blocksembler` | Username used to authenticate with the RabbitMQ message broker. |
| `BLOCKSEMBLER_MESSAGE_QUEUE_PASSWORD` | `blocksembler` | Password used to authenticate with the RabbitMQ message broker. |
| `BLOCKSEMBLER_GRADING_RESPONSE_QUEUE_TTL` | `1000*60*15 (15min)` | Time-to-live (TTL) in milliseconds for grading response queues before auto-deletion. |
| Name | Default | Description |
|-------------------------------------------|---------------------------------|-----------------------------------------------------------------|
| `BLOCKSEMBLER_MQ_URL` | `localhost` | Hostname or URL of the RabbitMQ broker. |
| `BLOCKSEMBLER_MQ_PORT` | `5672` | Port on which the RabbitMQ broker is listening. |
| `BLOCKSEMBLER_MQ_USER` | `blocksembler` | Username for authenticating with the RabbitMQ broker. |
| `BLOCKSEMBLER_MQ_PWD` | `blocksembler` | Password for authenticating with the RabbitMQ broker. |
| `BLOCKSEMBLER_MQ_EXCHANGE_NAME` | `blocksembler-grading-exchange` | Name of the RabbitMQ exchange used for publishing grading jobs. |
| `BLOCKSEMBLER_MQ_GRADING_JOB_QUEUE` | `grading-jobs` | Name of the RabbitMQ queue that receives grading jobs. |
| `BLOCKSEMBLER_MQ_GRADING_JOB_ROUTING_KEY` | `grading.job.created` | Routing key used to bind the grading job queue to the exchange. |
|

## Contributing

Expand Down
10 changes: 1 addition & 9 deletions app/api/schema/grading.py → app/api/schema/grading_job.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from datetime import datetime
from typing import Optional

from pydantic import BaseModel

Expand All @@ -10,13 +9,6 @@ class ExerciseSubmission(BaseModel):
solution_code: str


class GradingResult(BaseModel):
success: bool
penalty: datetime
feedback: str
hint: Optional[str]


class GradingJobRead(BaseModel):
id: str
tan_code: str
Expand All @@ -25,4 +17,4 @@ class GradingJobRead(BaseModel):
started: datetime
terminated: datetime | None
passed: bool | None
feedback: GradingResult | None
feedback: list[str] | None
14 changes: 7 additions & 7 deletions app/api/v1/grading.py → app/api/v1/grading_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.api.schema.grading import ExerciseSubmission, GradingJobRead
from app.config import MESSAGE_QUEUE_EXCHANGE_NAME
from app.api.schema.grading_job import ExerciseSubmission, GradingJobRead
from app.config import MQ_EXCHANGE_NAME, GRADING_JOB_ROUTING_KEY
from app.db.database import get_session
from app.db.model.exercise import ExerciseProgress
from app.db.model.grading import GradingJob
from app.db.model.grading_job import GradingJob
from app.mq.message_queue import get_mq_channel

router = APIRouter(
prefix="/submissions",
tags=["submissions"],
prefix="/grading-jobs",
tags=["grading jobs"],
)


Expand All @@ -34,8 +34,8 @@ async def submit_grading_job(job_msg: dict, session: AsyncSession, ch: AbstractR

job_msg["job_id"] = str(job_msg["job_id"])

exchange = await ch.get_exchange(MESSAGE_QUEUE_EXCHANGE_NAME)
await exchange.publish(Message(body=json.dumps(job_msg).encode('utf-8')), routing_key='grading_jobs')
exchange = await ch.get_exchange(MQ_EXCHANGE_NAME)
await exchange.publish(Message(body=json.dumps(job_msg).encode('utf-8')), routing_key=GRADING_JOB_ROUTING_KEY)


@router.post("/", status_code=status.HTTP_201_CREATED, response_model=str)
Expand Down
15 changes: 10 additions & 5 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
ORIGINS = os.environ.get("BLOCKSEMBLER_ORIGINS", "*").split(',')
BASE_URL = os.environ.get('BLOCKSEMBLER_API_BASE_URL', '')

DATABASE_URL = os.getenv("BLOCKSEMBLER_DB_URI", "postgresql+asyncpg://postgres:postgres@localhost:5432/blocksembler")
DATABASE_URL = os.environ.get("BLOCKSEMBLER_DB_URI",
"postgresql+asyncpg://postgres:postgres@localhost:5432/blocksembler")

MESSAGE_QUEUE_URL = os.environ.get('BLOCKSEMBLER_MESSAGE_QUEUE_URL', 'localhost')
MESSAGE_QUEUE_USER = os.getenv("BLOCKSEMBLER_MESSAGE_QUEUE_USER", "blocksembler")
MESSAGE_QUEUE_PASSWORD = os.getenv("BLOCKSEMBLER_MESSAGE_QUEUE_PASSWORD", "blocksembler")
MESSAGE_QUEUE_EXCHANGE_NAME = os.getenv("BLOCKSEMBLER_MESSAGE_QUEUE_EXCHANGE_NAME", "blocksembler-grading-exchange")
MQ_URL = os.environ.get('BLOCKSEMBLER_MQ_URL', 'localhost')
MQ_PORT = os.environ.get('BLOCKSEMBLER_MQ_PORT', '5672')
MQ_USER = os.environ.get("BLOCKSEMBLER_MQ_USER", "blocksembler")
MQ_PWD = os.environ.get("BLOCKSEMBLER_MQ_PWD", "blocksembler")

MQ_EXCHANGE_NAME = os.environ.get("BLOCKSEMBLER_MQ_EXCHANGE_NAME", "blocksembler-grading-exchange")
GRADING_JOB_QUEUE = os.environ.get('BLOCKSEMBLER_MQ_GRADING_JOB_QUEUE', "grading-jobs")
GRADING_JOB_ROUTING_KEY = os.environ.get('BLOCKSEMBLER_MQ_GRADING_JOB_ROUTING_KEY', 'grading.job.created')
2 changes: 1 addition & 1 deletion app/db/model/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .exercise import Exercise, TestCase
from .grading import GradingJob
from .grading_job import GradingJob
from .logging_event import LoggingEvent
from .tan import Tan
File renamed without changes.
33 changes: 21 additions & 12 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import logging
from contextlib import asynccontextmanager

import aio_pika
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from app.api.v1 import health, logging_event, tan, exercise, grading
from app.config import BASE_URL, DEBUG, ORIGINS, MESSAGE_QUEUE_URL, MESSAGE_QUEUE_USER, MESSAGE_QUEUE_PASSWORD
import app.config as conf
from app.api.v1 import health, logging_event, tan, exercise, grading_job
from app.db.database import create_tables


@asynccontextmanager
async def lifespan(_app: FastAPI):
if DEBUG:
if conf.DEBUG:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.INFO)
Expand All @@ -21,25 +22,33 @@ async def lifespan(_app: FastAPI):
logging.info("create tables [done]")

logging.info("setup message queue exchange [start]")
with amqp.Connection(MESSAGE_QUEUE_URL, userid=MESSAGE_QUEUE_USER, password=MESSAGE_QUEUE_PASSWORD) as c:
ch = c.channel()
ch.exchange_declare('blocksembler-grading-exchange', 'topic', durable=True)
ch.close()

mq_connection_str = f"amqp://{conf.MQ_USER}:{conf.MQ_PWD}@{conf.MQ_URL}:{conf.MQ_PORT}"
connection = await aio_pika.connect_robust(mq_connection_str)

async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(conf.MQ_EXCHANGE_NAME, 'topic', durable=True)
queue = await channel.declare_queue(conf.GRADING_JOB_QUEUE, durable=True)

await queue.bind(exchange, routing_key=conf.GRADING_JOB_ROUTING_KEY)
await channel.close()

logging.info("setup message queue exchange [done]")

yield

logging.info("shutting down...")


if DEBUG:
app = FastAPI(root_path=BASE_URL, lifespan=lifespan)
if conf.DEBUG:
app = FastAPI(root_path=conf.BASE_URL, lifespan=lifespan)
else:
app = FastAPI(root_path=BASE_URL, docs_url=None, redoc_url=None, openapi_url=None)
app = FastAPI(root_path=conf.BASE_URL, docs_url=None, redoc_url=None, openapi_url=None)

app.add_middleware(
CORSMiddleware,
allow_origins=ORIGINS,
allow_origins=conf.ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
Expand All @@ -49,4 +58,4 @@ async def lifespan(_app: FastAPI):
app.include_router(health.router)
app.include_router(logging_event.router)
app.include_router(exercise.router)
app.include_router(grading.router)
app.include_router(grading_job.router)
8 changes: 7 additions & 1 deletion app/mq/message_queue.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import logging

import aio_pika
from aio_pika.abc import AbstractRobustChannel

from app.config import MQ_USER, MQ_PWD, MQ_PORT, MQ_URL


async def get_mq_channel() -> AbstractRobustChannel:
connection = await aio_pika.connect_robust("amqp://blocksembler:blocksembler@localhost:5672")
uri = f"amqp://{MQ_USER}:{MQ_PWD}@{MQ_URL}:{MQ_PORT}"
logging.debug('connect to: ', uri)
connection = await aio_pika.connect_robust(uri)

async with connection:
channel = await connection.channel()
Expand Down
14 changes: 8 additions & 6 deletions tests/test_submission.py → tests/test_grading_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi.testclient import TestClient
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

from app.config import GRADING_JOB_ROUTING_KEY
from app.db.database import get_session
from app.main import app
from app.mq.message_queue import get_mq_channel
Expand All @@ -19,7 +20,8 @@ def setup_mocks():
channel_mock.get_exchange = AsyncMock(return_value=exchange_mock)
return channel_mock, exchange_mock

class TestSubmission:

class TestGradingJob:
def setup_method(self):
self.engine = create_async_engine(DB_URI, echo=True, future=True)
self.async_session = async_sessionmaker(self.engine, expire_on_commit=False, class_=AsyncSession)
Expand All @@ -38,18 +40,18 @@ async def get_mq_connection_override() -> AbstractRobustChannel:

client = TestClient(app)

submission = {
exercise_submission = {
"tan_code": "test-tan-1",
"exercise_id": 2,
"solution_code": "addi r0 r0 r0"
}

response = client.post("/submissions", json=submission)
response = client.post("/grading-jobs", json=exercise_submission)

print(response.json())

assert response.status_code == status.HTTP_201_CREATED
exchange_mock.publish.assert_awaited_once_with(ANY, routing_key='grading_jobs')
exchange_mock.publish.assert_awaited_once_with(ANY, routing_key=GRADING_JOB_ROUTING_KEY)

def test_post_invalid_submission(self):
channel_mock, exchange_mock = setup_mocks()
Expand All @@ -62,13 +64,13 @@ async def get_mq_connection_override() -> AbstractRobustChannel:

client = TestClient(app)

submission = {
exercise_submission = {
"tan_code": "non-existing-tan",
"exercise_id": 1,
"solution_code": "addi r0 r0 r0"
}

response = client.post("/submissions", json=submission)
response = client.post("/grading-jobs", json=exercise_submission)

print(response.json())

Expand Down