diff --git a/web/backend/Makefile b/web/backend/Makefile index fed85d1..bd3f0e4 100644 --- a/web/backend/Makefile +++ b/web/backend/Makefile @@ -13,3 +13,11 @@ docker-run: --memory 16g \ --cpus 4 \ ucjl-backend + +test: + clear; julia --threads 1 --procs 1 --project=test -e "using BackendT; runtests()" + +run: + julia --procs 1 --project=. startup.jl + +.PHONY: test diff --git a/web/backend/src/jobs.jl b/web/backend/src/jobs.jl index afbf64a..98f9676 100644 --- a/web/backend/src/jobs.jl +++ b/web/backend/src/jobs.jl @@ -8,50 +8,81 @@ import Base: put! Base.@kwdef mutable struct JobProcessor pending = RemoteChannel(() -> Channel{String}(Inf)) processing = RemoteChannel(() -> Channel{String}(Inf)) + completed = RemoteChannel(() -> Channel{String}(Inf)) shutdown = RemoteChannel(() -> Channel{Bool}(1)) worker_pids = [] worker_tasks = [] work_fn = nothing + master_task = nothing + job_status = Dict() + job_position = Dict() + pending_queue = [] +end + +function update_positions!(processor::JobProcessor) + for (i, job_id) in enumerate(processor.pending_queue) + processor.job_position[job_id] = i + end end function Base.put!(processor::JobProcessor, job_id::String) - return put!(processor.pending, job_id) + put!(processor.pending, job_id) + processor.job_status[job_id] = "pending" + push!(processor.pending_queue, job_id) + update_positions!(processor) end -function isbusy(processor::JobProcessor) - return isready(processor.pending) || isready(processor.processing) +function master_loop(processor) + @info "Starting master loop" + while true + # Check for shutdown signal + if isready(processor.shutdown) + break + end + + # Check for processing jobs + while isready(processor.processing) + job_id = take!(processor.processing) + processor.job_status[job_id] = "processing" + filter!(x -> x != job_id, processor.pending_queue) + delete!(processor.job_position, job_id) + update_positions!(processor) + end + + # Check for completed jobs + while isready(processor.completed) + job_id = take!(processor.completed) + delete!(processor.job_status, job_id) + delete!(processor.job_position, job_id) + end + + sleep(0.1) + end end -function worker_loop(pending, processing, shutdown, work_fn) +function worker_loop(pending, processing, completed, shutdown, work_fn) @info "Starting worker loop" while true # Check for shutdown signal if isready(shutdown) - @info "Shutdown signal received" break end - # Wait for a job with timeout - if !isready(pending) - sleep(0.1) - continue + # Check for pending tasks + if isready(pending) + job_id = take!(pending) + put!(processing, job_id) + @info "Job started: $job_id" + try + work_fn(job_id) + put!(completed, job_id) + catch e + @error "Job failed: job $job_id" + end + @info "Job finished: $job_id" end - # Move job from pending to processing queue - job_id = take!(pending) - put!(processing, job_id) - @info "Job started: $job_id" - - # Run work function - try - work_fn(job_id) - catch e - @error "Job failed: job $job_id" - end - - # Remove job from processing queue - take!(processing) - @info "Job finished: $job_id" + sleep(0.1) end end @@ -66,6 +97,7 @@ function start(processor::JobProcessor) worker_loop( processor.pending, processor.processing, + processor.completed, processor.shutdown, processor.work_fn, ) @@ -73,23 +105,19 @@ function start(processor::JobProcessor) push!(processor.worker_pids, pid) push!(processor.worker_tasks, task) end + + # Start master loop (after spawning workers to avoid serialization issues) + processor.master_task = @async master_loop(processor) + return end function stop(processor::JobProcessor) - # Send shutdown signal (all workers will see it) put!(processor.shutdown, true) - - # Wait for all worker tasks to complete + wait(processor.master_task) for (i, task) in enumerate(processor.worker_tasks) - try - wait(task) - @info "Worker $(processor.worker_pids[i]) stopped" - catch e - @warn "Error waiting for worker $(processor.worker_pids[i])" exception=e - end + wait(task) end - return end diff --git a/web/backend/src/log.jl b/web/backend/src/log.jl index 528c8da..75011ef 100644 --- a/web/backend/src/log.jl +++ b/web/backend/src/log.jl @@ -31,6 +31,8 @@ end function setup_logger() global_logger(TimeLogger()) - @spawn global_logger(TimeLogger()) + for pid in workers() + @spawnat pid global_logger(TimeLogger()) + end return end diff --git a/web/backend/src/server.jl b/web/backend/src/server.jl index 13dabcb..0028df8 100644 --- a/web/backend/src/server.jl +++ b/web/backend/src/server.jl @@ -53,7 +53,7 @@ function submit(req, processor::JobProcessor) return HTTP.Response(200, RESPONSE_HEADERS, response_body) end -function jobs_view(req) +function jobs_view(req, processor) # Extract job_id from URL path /api/jobs/{job_id}/view path_parts = split(req.target, '/') job_id = path_parts[4] @@ -74,9 +74,19 @@ function jobs_view(req) output_path = joinpath(job_dir, "output.json") output_content = isfile(output_path) ? read(output_path, String) : nothing - # Create response JSON - response_data = Dict("log" => log_content, "solution" => output_content) + # Read job status + job_status = "unknown" + if output_content !== nothing + job_status = "completed" + elseif haskey(processor.job_status, job_id) + job_status = processor.job_status[job_id] + end + # Read job position (0 if already processed or not found) + job_position = get(processor.job_position, job_id, 0) + + # Create response JSON + response_data = Dict("log" => log_content, "solution" => output_content, "status" => job_status, "position" => job_position) response_body = JSON.json(response_data) return HTTP.Response(200, RESPONSE_HEADERS, response_body) end @@ -134,7 +144,7 @@ function start_server(host, port; optimizer) HTTP.register!(router, "POST", "/api/submit", req -> submit(req, processor)) # Register job/*/view endpoint - HTTP.register!(router, "GET", "/api/jobs/*/view", jobs_view) + HTTP.register!(router, "GET", "/api/jobs/*/view", req -> jobs_view(req, processor)) server = HTTP.serve!(router, host, port; verbose = false) return ServerHandle(server, processor) diff --git a/web/backend/test/src/server_test.jl b/web/backend/test/src/server_test.jl index b5e80d4..bb95cd3 100644 --- a/web/backend/test/src/server_test.jl +++ b/web/backend/test/src/server_test.jl @@ -26,10 +26,7 @@ function server_test_usage() @test length(job_id) == 16 # Wait for jobs to finish - sleep(5) - while isbusy(server.processor) - sleep(0.1) - end + sleep(10) # Verify the compressed file was saved correctly job_dir = joinpath(Backend.basedir, "jobs", job_id) @@ -52,6 +49,7 @@ function server_test_usage() @test haskey(view_data, "solution") @test view_data["log"] !== nothing @test view_data["solution"] !== nothing + @test view_data["status"] == "completed" # Clean up rm(job_dir, recursive = true) diff --git a/web/frontend/Dockerfile b/web/frontend/Dockerfile index 759a85a..54a7636 100644 --- a/web/frontend/Dockerfile +++ b/web/frontend/Dockerfile @@ -8,11 +8,11 @@ ARG REACT_APP_BACKEND_URL ENV REACT_APP_BACKEND_URL=$REACT_APP_BACKEND_URL RUN npm run build - # Production Stage - FROM node:18-alpine AS production - WORKDIR /app - COPY --from=build /app/build ./build - COPY server.js ./ - RUN npm install --production express - EXPOSE 3000 - CMD ["node", "server.js"] +# Production Stage +FROM node:18-alpine AS production +WORKDIR /app +COPY --from=build /app/build ./build +COPY server.js ./ +RUN npm install --production express +EXPOSE 3000 +CMD ["node", "server.js"] diff --git a/web/frontend/src/components/Jobs/Jobs.tsx b/web/frontend/src/components/Jobs/Jobs.tsx index 4d5b9d2..839e7db 100644 --- a/web/frontend/src/components/Jobs/Jobs.tsx +++ b/web/frontend/src/components/Jobs/Jobs.tsx @@ -15,6 +15,7 @@ import formStyles from "../Common/Forms/Form.module.css"; interface JobData { log: string; solution: any; + position: number; } const Jobs = () => { @@ -81,7 +82,9 @@ const Jobs = () => {
- {jobData ? jobData.log : "Loading..."} + {jobData + ? jobData.log || `Waiting for ${jobData.position} other optimization job(s) to finish...` + : "Loading..."}