Lynn's Blog

Building the Concurrent Monolith: Atomic Job Claiming in Go


In my previous post, I built the architectural blueprint for Workron: the trade-offs between channels and mutexes, why I chose HTTP over gRPC, and the roadmap from a single-process queue to a distributed system.

The first two iterations tackle what I think is the hardest concurrency problem in a job scheduler: how do N goroutines share one queue without ever processing the same job twice? Everything runs in a single Go process. No network. No database. Just goroutines, a shared map, and a mutex standing between correctness and chaos.

Full code source: Workron


The Job Store: A Map Behind a Mutex

The heart of Workron is a MemoryStore: a Go map of jobs protected by a sync.RWMutex. Every operation that reads or writes job state must go through this lock.

type MemoryStore struct {
    mu     sync.RWMutex
    nextID int
    jobs   map[string]*Job
}

The store enforces one invariant: no goroutine ever touches a job directly. Every interaction goes through methods that acquire the lock first.

Adding a job is straightforward. Lock, insert, unlock:

func (s *MemoryStore) AddJob(command string) string {
    s.mu.Lock()
    defer s.mu.Unlock()

    id := fmt.Sprintf("job-%d", s.nextID)
    s.nextID++
    s.jobs[id] = &Job{
        ID:         id,
        Command:    command,
        Status:     StatusPending,
        CreatedAt:  time.Now(),
        MaxRetries: 3,
    }
    return id
}

Read operations use RLock instead, allowing multiple goroutines to read concurrently as long as nobody is writing. This distinction matters when the REST API is fielding status queries while workers are simultaneously claiming jobs.

But the operation that makes or breaks the entire system is ClaimJob.


ClaimJob: The Critical Section

ClaimJob is where correctness lives or dies. Three workers call it simultaneously. Only one should win. The loser should not get the same job, should not get a half-updated job, and should not corrupt state for the winner.

The implementation is deceptively simple:

func (s *MemoryStore) ClaimJob() (*Job, bool) {
    s.mu.Lock()
    defer s.mu.Unlock()

    for _, job := range s.jobs {
        if job.Status == StatusPending {
            job.Status = StatusRunning
            job.Attempts++
            now := time.Now()
            job.StartedAt = &now
            return s.copyJob(job), true
        }
    }
    return nil, false
}

Three details here are worth unpacking.

The full lock. This is mu.Lock(), not mu.RLock(). Even though we are “finding” a job, we are also mutating it. If two goroutines both found the same pending job under a read lock, they would both think they claimed it. The write lock guarantees that find-and-update is a single atomic operation.

Returning a copy. copyJob creates a deep copy of the job before returning it. Without this, the caller receives a pointer into the map. If the worker modifies the job (or if another goroutine claims a different job and triggers a map resize), you get a data race. The copy severs the link between the worker’s view and the shared state.

Attempt tracking. job.Attempts++ happens inside the lock, during the claim itself, not after execution. This means the attempt count is always accurate regardless of what happens next. If the worker crashes after claiming but before finishing, the attempt was still recorded.

This pattern, lock → find → mutate → copy → unlock, maps directly to how a database handles the equivalent operation: UPDATE jobs SET status = 'running' WHERE status = 'pending' LIMIT 1. The mutex is doing the work that a database row lock would do later.


The Worker: A Polling Loop

Each worker is a goroutine running a simple loop: try to claim a job, execute it if found, wait if not.

func (w *Worker) Start(ctx context.Context) {
    ticker := time.NewTicker(pollInterval)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        job, found := w.source.ClaimJob()
        if found {
            w.process(job)
            continue
        }

        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
        }
    }
}

The structure is deliberately three-step: check for shutdown, try to claim, and if there is nothing to do, wait for either the next tick or a shutdown signal. The two select statements serve different purposes. The first is a non-blocking check so we do not start new work after a shutdown signal. The second is a blocking wait that keeps the goroutine idle instead of spinning the CPU when the queue is empty.

An earlier version used time.Sleep instead of a ticker. The problem: Sleep cannot be interrupted. If you cancel the context during a one-second sleep, the worker does not notice until the sleep finishes. The ticker-based select responds to cancellation immediately.


Command Execution

The executor is intentionally the simplest component. It takes a command string, runs it through the system shell, and returns an error if anything goes wrong:

func (e *Executor) Execute(command string) error {
    if command == "" {
        return fmt.Errorf("empty command")
    }
    cmd := exec.Command("sh", "-c", command)
    output, err := cmd.CombinedOutput()
    if err != nil {
        return fmt.Errorf("command failed: %w: %s", err, output)
    }
    return nil
}

Using sh -c means the executor supports pipes, redirects, and shell built-ins. CombinedOutput captures both stdout and stderr, which gets attached to the error message on failure. This is critical for debugging: when a job fails, you want to know why, not just that it failed.


Retry Logic: Giving Jobs a Second Chance

Not every failure is permanent. A network blip, a temporary disk-full condition, a transient dependency being unavailable: these all resolve themselves. Workron gives every job three attempts by default.

The retry decision happens in the worker’s process method:

func (w *Worker) process(job *store.Job) {
    err := w.executor.Execute(job.Command)
    if err != nil {
        if job.Attempts < job.MaxRetries {
            w.source.UpdateJobStatus(job.ID, store.StatusPending)
            return
        }
        w.source.UpdateJobStatus(job.ID, store.StatusFailed)
        return
    }
    w.source.UpdateJobStatus(job.ID, store.StatusDone)
}

When a job fails and has attempts remaining, it goes back to pending. The next time any worker calls ClaimJob, this job is eligible again. The Attempts field, which was incremented during claiming, ensures the job eventually exhausts its retries and lands in failed permanently.

One subtlety: the worker decides whether to retry, not the store. The store just records state transitions. This separation matters later: once workers communicate over HTTP, the retry decision moves to the scheduler. The worker simply reports “this failed” and the scheduler decides what to do next. Keeping the store dumb makes that migration easier.


The REST API

The HTTP layer is thin by design. Each handler validates input, calls a store method, and returns JSON:

func (s *Server) handleSubmitJob(w http.ResponseWriter, r *http.Request) {
    var req submitJobRequest
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "invalid request body", http.StatusBadRequest)
        return
    }
    if req.Command == "" {
        http.Error(w, "command is required", http.StatusBadRequest)
        return
    }
    id := s.store.AddJob(req.Command)
    job, _ := s.store.GetJob(id)
    writeJSON(w, http.StatusCreated, job)
}

Go 1.22’s enhanced ServeMux handles method-based routing natively, so there is no need for a third-party router:

s.mux.HandleFunc("POST /jobs", s.handleSubmitJob)
s.mux.HandleFunc("GET /jobs/{id}", s.handleGetJob)
s.mux.HandleFunc("GET /jobs", s.handleListJobs)

The API is deliberately minimal at this stage. Submit a job, check its status, list all jobs. That is enough to verify the entire pipeline works end to end.


Testing Concurrent Behavior

The most important test in the project verifies that multiple workers never double-claim a job:

func TestWorker_MultipleWorkerNoDuplicates(t *testing.T) {
    s := store.NewMemoryStore()

    ids := make([]string, 5)
    for i := range ids {
        ids[i] = s.AddJob("echo hello")
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for i := 1; i <= 3; i++ {
        w := NewWorker(i, s)
        go w.Start(ctx)
    }

    for _, id := range ids {
        waitForStatus(t, s, id, store.StatusDone, 5*time.Second)
    }

    for _, id := range ids {
        job, _ := s.GetJob(id)
        if job.Attempts != 1 {
            t.Errorf("job %s: expected 1 attempt, got %d", id, job.Attempts)
        }
    }
}

Five jobs, three workers. Every job should complete exactly once. If any job shows more than one attempt, it means two workers claimed the same job, and our mutex has a bug. Running this with go test -race adds the race detector on top, which catches data races that might not manifest as wrong results but are still undefined behavior.

The waitForStatus helper deserves a mention. Since workers run in separate goroutines, you cannot assert on job status immediately after starting them. The helper polls the store until the expected status appears or a timeout expires. It is a small utility, but without it every concurrent test becomes either flaky or artificially slow.


Graceful Shutdown

The main function wires everything together: start the store, start the HTTP server, spin up workers, and wait for a shutdown signal.

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit

cancel()  // signal all workers to stop
wg.Wait() // wait for in-progress jobs to finish

context.WithCancel propagates the shutdown signal to every worker. Each worker finishes whatever job it is currently processing, then exits its polling loop. sync.WaitGroup ensures the main goroutine does not exit until every worker has confirmed it is done.

This is important: we do not kill workers mid-job. A job that was halfway through execution completes normally. Only idle workers (waiting on the ticker) exit immediately.


What This Gets Right and What It Defers

Workron can accept jobs over HTTP, execute them concurrently across multiple workers, retry transient failures, and shut down without losing in-progress work. The race detector confirms no data races under contention.

What it cannot do yet: survive a process restart (all state is in memory), detect a crashed worker (no heartbeats), or coordinate across multiple machines (everything is in one process).

The next post tackles those limitations. The scheduler and workers split into separate binaries communicating over HTTP instead of shared memory. Once they are separate, a new problem emerges: what happens when a worker dies mid-job with no one watching? The answer involves an interface with two methods, a background goroutine, and a 30-second timeout.

Full source: Workron


