Concurrency

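User code runs inside the async scheduler automatically. Fibers are single-threaded cooperative tasks: concurrent but not parallel. Because only one fiber runs at a time, code that does not yield cannot be interleaved with another fiber, so shared data needs no locking between yield points. State that must stay consistent across a yield (for example around ev/sleep or I/O) can be guarded with the primitives described under Synchronization (lib/sync). For CPU parallelism across cores, see threads.md.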

Spawn and join

# Spawn a fiber, wait for its result
(def f (ev/spawn (fn [] (+ 1 2))))
(ev/join f)                # => 3

# Join a sequence — results in input order
(def [a b] (ev/join [(ev/spawn (fn [] :first))
                      (ev/spawn (fn [] :second))]))

Parallel map

The most common pattern:

(ev/map (fn [x] (* x x)) [1 2 3 4])   # => [1 4 9 16]

# Bounded parallelism (at most n in flight)
(ev/map-limited (fn [x] (* x x)) [1 2 3 4] 2)

Error handling

ev/join-protected returns [ok? value] instead of raising errors:

(let [[ok? val] (ev/join-protected (ev/spawn (fn [] (+ 1 2))))]
  (if ok? val :failed))   # => 3
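
The destructuring pattern above can be wrapped in a small helper so call sites do not repeat it. This is a minimal sketch that uses only ev/spawn and ev/join-protected as documented on this page; the name run-or-default is hypothetical and not part of the library.

```lisp
# Hypothetical helper (not part of the ev API): run thunk in its own fiber
# and return its value, or default if the fiber raised an error.
(def run-or-default
  (fn [thunk default]
    (let [[ok? val] (ev/join-protected (ev/spawn thunk))]
      (if ok? val default))))

(run-or-default (fn [] (+ 1 2)) :failed)   # => 3
```

A thunk that raises would make ok? false, so the call would return the supplied default instead of propagating the error.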

Select, race, timeout

# First to complete wins; abort the rest
(ev/race [(ev/spawn (fn [] :fast))
          (ev/spawn (fn [] (ev/sleep 1) :slow))])  # => :fast

# Deadline on a computation
(ev/timeout 5 (fn [] (+ 1 2)))   # => 3
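
The two remaining cases from the reference, ev/select and an expired ev/timeout, might look like the sketch below. It relies only on the signatures listed in the primitives reference; it assumes that done is a joinable fiber handle and that remaining is a sequence of the fibers still running, which the reference entry suggests but does not spell out.

```lisp
# ev/select waits for the first fiber to finish but, unlike ev/race,
# aborts nothing: the caller decides what to do with the rest.
(def fast (ev/spawn (fn [] :fast)))
(def slow (ev/spawn (fn [] (ev/sleep 1) :slow)))

(let [[done remaining] (ev/select [fast slow])]
  # Assumption: done is a fiber handle, remaining a sequence of fibers.
  {:first (ev/join done)
   :rest  (ev/join remaining)})

# ev/timeout returns nil when the deadline passes, so callers should
# handle nil explicitly rather than treat it as a computed value.
(let [result (ev/timeout 1 (fn [] (ev/sleep 5) :finished))]
  (if (= result nil) :timed-out result))
```

Because nil doubles as the timeout marker, a thunk that can legitimately return nil cannot be distinguished from a timeout this way; returning a wrapper value from the thunk is one way around that.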

Scoped concurrency

All children must finish before the scope exits. If one child fails, the others are aborted.

(ev/scope (fn [spawn]
  (let [a (spawn (fn [] :users))
        b (spawn (fn [] :settings))]
    {:users (ev/join a) :settings (ev/join b)})))

Primitives reference

# (ev/spawn thunk)            — create fiber, returns handle
# (ev/join fiber-or-seq)      — wait for result(s), propagate errors
# (ev/join-protected target)  — wait without raising: [ok? value]
# (ev/abort fiber)            — graceful cancel (defer blocks run)
# (ev/select fibers)          — wait for first: [done remaining]
# (ev/race fibers)            — first wins, abort rest, return value
# (ev/timeout secs thunk)     — deadline: value or nil on timeout
# (ev/scope (fn [spawn] ...)) — nursery: children can't outlive scope
# (ev/map f items)            — parallel map, results in order
# (ev/map-limited f items n)  — bounded parallel map
# (ev/as-completed fibers)    — lazy iterator: [next-fn pool]
# (ev/sleep seconds)          — yield for N seconds

TCP

# (tcp/listen addr port)      — bind and listen, returns listener
# (tcp/accept listener)       — yield until connection, returns port
# (tcp/connect host port)     — yield until connected, returns port

Synchronization (lib/sync)

lib/sync provides fiber-friendly synchronization primitives built on ev/futex-wait and ev/futex-wake. These cooperate with the async scheduler — waiting fibers yield rather than blocking the thread.

(def sync ((import "std/sync")))

(def lock (sync:make-lock))
(lock:acquire)
# ... critical section ...
(lock:release)

Primitive             Description
make-lock             Mutual exclusion lock
make-semaphore n      Counting semaphore with n permits
make-condvar          Condition variable (:wait, :notify, :broadcast)
make-rwlock           Read-write lock (multiple readers or one writer)
make-barrier n        All n fibers must :wait before any proceed
make-latch            One-shot gate: once opened, stays open
make-once thunk       Lazy one-time initialization; all callers get the cached result
make-queue capacity   Bounded blocking FIFO queue
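
As an illustration of one of these primitives, the sketch below has two fibers rendezvous at a barrier. The call form (barrier:wait) is an assumption made by analogy with (lock:acquire) above and the :wait method named in the table; make-worker is a hypothetical helper.

```lisp
(def sync ((import "std/sync")))

# Neither worker returns until both have reached the barrier.
(def barrier (sync:make-barrier 2))

# Hypothetical helper: builds a thunk that waits at the barrier,
# then returns its tag.
(def make-worker
  (fn [tag]
    (fn []
      (barrier:wait)   # assumed call form, by analogy with (lock:acquire)
      tag)))

(ev/join [(ev/spawn (make-worker :a))
          (ev/spawn (make-worker :b))])
```

Since a waiting fiber yields rather than blocking the thread, the first worker to arrive simply hands control to the scheduler until the second one reaches the barrier.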

Processes (lib/process)

lib/process provides an Erlang/OTP-inspired process model: lightweight processes with mailboxes, links, monitors, named registration, and fuel-based preemption. Built entirely on fibers and signals.

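The first example uses only the core process API (spawn, send, recv). On top of that API, the module provides higher-level behaviours such as the GenServer and Supervisor shown in the sections that follow.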

(def process ((import "std/process")))

(process:start (fn []
  # Ping-pong between two processes
  (let* ([me (process:self)]
         [peer (process:spawn (fn []
                 (match (process:recv)
                   [from :ping] (process:send from :pong)
                   _ nil)))]
    (process:send peer [me :ping])
    (assert (= (process:recv) :pong) "pong received"))))

GenServer example

(def process ((import "std/process")))

(process:start (fn []
  (process:gen-server-start-link
    {:init        (fn [_] 0)
     :handle-call (fn [req _from state]
       (case req
         :inc [:reply (+ state 1) (+ state 1)]
         :get [:reply state state]))}
    nil :name :counter)
  (process:gen-server-call :counter :inc)
  (process:gen-server-call :counter :inc)
  (assert (= 2 (process:gen-server-call :counter :get)) "counter is 2")))

Supervisor example

(def process ((import "std/process")))

(process:start (fn []
  (let ([me (process:self)])
    (process:supervisor-start-link
      [{:id :worker :restart :permanent
        :start (fn []
          (process:send me [:started (process:self)])
          (forever (process:recv)))}]
      :name :sup
      :max-restarts 3)
    (match (process:recv)
      [:started pid] (assert (integer? pid) "worker started")
      _ nil))))

See processes.md for the complete API reference, including supervised subprocesses, deferred replies, restart strategies, logging, and structured concurrency inside processes.

