web: Show position in line

This commit is contained in:
2025-11-12 15:09:18 -06:00
parent 22f1f9dae5
commit f53d704e74
7 changed files with 101 additions and 52 deletions

View File

@@ -13,3 +13,11 @@ docker-run:
--memory 16g \ --memory 16g \
--cpus 4 \ --cpus 4 \
ucjl-backend ucjl-backend
test:
clear; julia --threads 1 --procs 1 --project=test -e "using BackendT; runtests()"
run:
julia --procs 1 --project=. startup.jl
.PHONY: test

View File

@@ -8,50 +8,81 @@ import Base: put!
Base.@kwdef mutable struct JobProcessor Base.@kwdef mutable struct JobProcessor
pending = RemoteChannel(() -> Channel{String}(Inf)) pending = RemoteChannel(() -> Channel{String}(Inf))
processing = RemoteChannel(() -> Channel{String}(Inf)) processing = RemoteChannel(() -> Channel{String}(Inf))
completed = RemoteChannel(() -> Channel{String}(Inf))
shutdown = RemoteChannel(() -> Channel{Bool}(1)) shutdown = RemoteChannel(() -> Channel{Bool}(1))
worker_pids = [] worker_pids = []
worker_tasks = [] worker_tasks = []
work_fn = nothing work_fn = nothing
master_task = nothing
job_status = Dict()
job_position = Dict()
pending_queue = []
end
function update_positions!(processor::JobProcessor)
for (i, job_id) in enumerate(processor.pending_queue)
processor.job_position[job_id] = i
end
end end
function Base.put!(processor::JobProcessor, job_id::String) function Base.put!(processor::JobProcessor, job_id::String)
return put!(processor.pending, job_id) put!(processor.pending, job_id)
processor.job_status[job_id] = "pending"
push!(processor.pending_queue, job_id)
update_positions!(processor)
end end
function isbusy(processor::JobProcessor) function master_loop(processor)
return isready(processor.pending) || isready(processor.processing) @info "Starting master loop"
while true
# Check for shutdown signal
if isready(processor.shutdown)
break
end
# Check for processing jobs
while isready(processor.processing)
job_id = take!(processor.processing)
processor.job_status[job_id] = "processing"
filter!(x -> x != job_id, processor.pending_queue)
delete!(processor.job_position, job_id)
update_positions!(processor)
end
# Check for completed jobs
while isready(processor.completed)
job_id = take!(processor.completed)
delete!(processor.job_status, job_id)
delete!(processor.job_position, job_id)
end
sleep(0.1)
end
end end
function worker_loop(pending, processing, shutdown, work_fn) function worker_loop(pending, processing, completed, shutdown, work_fn)
@info "Starting worker loop" @info "Starting worker loop"
while true while true
# Check for shutdown signal # Check for shutdown signal
if isready(shutdown) if isready(shutdown)
@info "Shutdown signal received"
break break
end end
# Wait for a job with timeout # Check for pending tasks
if !isready(pending) if isready(pending)
sleep(0.1) job_id = take!(pending)
continue put!(processing, job_id)
@info "Job started: $job_id"
try
work_fn(job_id)
put!(completed, job_id)
catch e
@error "Job failed: job $job_id"
end
@info "Job finished: $job_id"
end end
# Move job from pending to processing queue sleep(0.1)
job_id = take!(pending)
put!(processing, job_id)
@info "Job started: $job_id"
# Run work function
try
work_fn(job_id)
catch e
@error "Job failed: job $job_id"
end
# Remove job from processing queue
take!(processing)
@info "Job finished: $job_id"
end end
end end
@@ -66,6 +97,7 @@ function start(processor::JobProcessor)
worker_loop( worker_loop(
processor.pending, processor.pending,
processor.processing, processor.processing,
processor.completed,
processor.shutdown, processor.shutdown,
processor.work_fn, processor.work_fn,
) )
@@ -73,23 +105,19 @@ function start(processor::JobProcessor)
push!(processor.worker_pids, pid) push!(processor.worker_pids, pid)
push!(processor.worker_tasks, task) push!(processor.worker_tasks, task)
end end
# Start master loop (after spawning workers to avoid serialization issues)
processor.master_task = @async master_loop(processor)
return return
end end
function stop(processor::JobProcessor) function stop(processor::JobProcessor)
# Send shutdown signal (all workers will see it)
put!(processor.shutdown, true) put!(processor.shutdown, true)
wait(processor.master_task)
# Wait for all worker tasks to complete
for (i, task) in enumerate(processor.worker_tasks) for (i, task) in enumerate(processor.worker_tasks)
try wait(task)
wait(task)
@info "Worker $(processor.worker_pids[i]) stopped"
catch e
@warn "Error waiting for worker $(processor.worker_pids[i])" exception=e
end
end end
return return
end end

View File

@@ -31,6 +31,8 @@ end
function setup_logger() function setup_logger()
global_logger(TimeLogger()) global_logger(TimeLogger())
@spawn global_logger(TimeLogger()) for pid in workers()
@spawnat pid global_logger(TimeLogger())
end
return return
end end

View File

@@ -53,7 +53,7 @@ function submit(req, processor::JobProcessor)
return HTTP.Response(200, RESPONSE_HEADERS, response_body) return HTTP.Response(200, RESPONSE_HEADERS, response_body)
end end
function jobs_view(req) function jobs_view(req, processor)
# Extract job_id from URL path /api/jobs/{job_id}/view # Extract job_id from URL path /api/jobs/{job_id}/view
path_parts = split(req.target, '/') path_parts = split(req.target, '/')
job_id = path_parts[4] job_id = path_parts[4]
@@ -74,9 +74,19 @@ function jobs_view(req)
output_path = joinpath(job_dir, "output.json") output_path = joinpath(job_dir, "output.json")
output_content = isfile(output_path) ? read(output_path, String) : nothing output_content = isfile(output_path) ? read(output_path, String) : nothing
# Create response JSON # Read job status
response_data = Dict("log" => log_content, "solution" => output_content) job_status = "unknown"
if output_content !== nothing
job_status = "completed"
elseif haskey(processor.job_status, job_id)
job_status = processor.job_status[job_id]
end
# Read job position (0 if already processed or not found)
job_position = get(processor.job_position, job_id, 0)
# Create response JSON
response_data = Dict("log" => log_content, "solution" => output_content, "status" => job_status, "position" => job_position)
response_body = JSON.json(response_data) response_body = JSON.json(response_data)
return HTTP.Response(200, RESPONSE_HEADERS, response_body) return HTTP.Response(200, RESPONSE_HEADERS, response_body)
end end
@@ -134,7 +144,7 @@ function start_server(host, port; optimizer)
HTTP.register!(router, "POST", "/api/submit", req -> submit(req, processor)) HTTP.register!(router, "POST", "/api/submit", req -> submit(req, processor))
# Register job/*/view endpoint # Register job/*/view endpoint
HTTP.register!(router, "GET", "/api/jobs/*/view", jobs_view) HTTP.register!(router, "GET", "/api/jobs/*/view", req -> jobs_view(req, processor))
server = HTTP.serve!(router, host, port; verbose = false) server = HTTP.serve!(router, host, port; verbose = false)
return ServerHandle(server, processor) return ServerHandle(server, processor)

View File

@@ -26,10 +26,7 @@ function server_test_usage()
@test length(job_id) == 16 @test length(job_id) == 16
# Wait for jobs to finish # Wait for jobs to finish
sleep(5) sleep(10)
while isbusy(server.processor)
sleep(0.1)
end
# Verify the compressed file was saved correctly # Verify the compressed file was saved correctly
job_dir = joinpath(Backend.basedir, "jobs", job_id) job_dir = joinpath(Backend.basedir, "jobs", job_id)
@@ -52,6 +49,7 @@ function server_test_usage()
@test haskey(view_data, "solution") @test haskey(view_data, "solution")
@test view_data["log"] !== nothing @test view_data["log"] !== nothing
@test view_data["solution"] !== nothing @test view_data["solution"] !== nothing
@test view_data["status"] == "completed"
# Clean up # Clean up
rm(job_dir, recursive = true) rm(job_dir, recursive = true)

View File

@@ -8,11 +8,11 @@ ARG REACT_APP_BACKEND_URL
ENV REACT_APP_BACKEND_URL=$REACT_APP_BACKEND_URL ENV REACT_APP_BACKEND_URL=$REACT_APP_BACKEND_URL
RUN npm run build RUN npm run build
# Production Stage # Production Stage
FROM node:18-alpine AS production FROM node:18-alpine AS production
WORKDIR /app WORKDIR /app
COPY --from=build /app/build ./build COPY --from=build /app/build ./build
COPY server.js ./ COPY server.js ./
RUN npm install --production express RUN npm install --production express
EXPOSE 3000 EXPOSE 3000
CMD ["node", "server.js"] CMD ["node", "server.js"]

View File

@@ -15,6 +15,7 @@ import formStyles from "../Common/Forms/Form.module.css";
interface JobData { interface JobData {
log: string; log: string;
solution: any; solution: any;
position: number;
} }
const Jobs = () => { const Jobs = () => {
@@ -81,7 +82,9 @@ const Jobs = () => {
<SectionHeader title="Optimization log"></SectionHeader> <SectionHeader title="Optimization log"></SectionHeader>
<div className={formStyles.FormWrapper}> <div className={formStyles.FormWrapper}>
<div className={styles.SolverLog} ref={logRef}> <div className={styles.SolverLog} ref={logRef}>
{jobData ? jobData.log : "Loading..."} {jobData
? jobData.log || `Waiting for ${jobData.position} other optimization job(s) to finish...`
: "Loading..."}
</div> </div>
</div> </div>
</div> </div>