Dynamic core.async/flow graphs for agent fan-out

2026-04-10 Fri 16:47 article clojure llm publish

core.async/flow runs a directed graph of processes connected by channels. The graph is described by a plain map, which means it can be generated at runtime. I experimented with giving an LLM a tool to do just that.

The Plan

The LLM produces a TaskPlan via structured output: a vector of self-contained prompts and an instruction for synthesizing them.

(def TaskPlanSchema
  [:map
   [:tasks
    [:vector {:min-count 2 :max-count 6}
     [:map [:prompt :string]]]]
   [:aggregation-prompt :string]])

m/explain validates the plan before any process starts.
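The same constraints can be sanity-checked by hand. This plan-error function is a hypothetical stand-in for the malli call, mirroring only the count and shape rules of TaskPlanSchema:

```clojure
;; Hand-rolled stand-in for TaskPlanSchema — the real code uses m/explain;
;; this mirrors only the count and shape constraints.
(defn plan-error [{:keys [tasks aggregation-prompt]}]
  (cond
    (not (vector? tasks))                        "tasks must be a vector"
    (not (<= 2 (count tasks) 6))                 "need 2-6 tasks"
    (not (every? #(string? (:prompt %)) tasks))  "each task needs a :prompt string"
    (not (string? aggregation-prompt))           "aggregation-prompt must be a string"
    :else nil))

(plan-error {:tasks [{:prompt "a"}] :aggregation-prompt "x"})
;; => "need 2-6 tasks" — one task is too few
```

As in the tool itself, a nil result means the plan is safe to turn into a graph.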

Two Process Templates

llm-worker-fn receives a prompt string, runs a search-equipped agent on it, and emits the answer on :out:

(defn llm-worker-fn
  ;; declares the input and output port names
  ([] {:ins {:in "prompt string"} :outs {:out "answer string"}})
  ;; returns initial state — stateless, so empty map
  ([_] {})
  ;; called on lifecycle transitions (pause/resume/stop) — passthrough
  ([state _] state)
  ;; called on each incoming message — runs the agent, emits the answer
  ([state _port {:keys [prompt]}]
   (let [answer (:text (llm/run-agent ai
                                      {:tools worker-tools
                                       :system-prompt "Use kagi_search to find current information. Answer in 4-6 sentences."
                                       :max-steps 3}
                                      prompt))]
     [state {:out [answer]}])))

make-aggregator-fn takes n and aggregation-prompt. Each incoming message is added to state. When the nth arrives, it calls generate with all results and emits the synthesis:

(defn make-aggregator-fn [n aggregation-prompt]
  (fn
    ([] {:ins {:in "worker result"} :outs {:out "final answer"}})
    ([_] {:results []})
    ([state _] state)
    ([{:keys [results] :as state} _port msg]
     (let [results' (conj results msg)]
       (if (= (count results') n)
         (let [numbered (str/join "\n\n---\n\n"
                          (map-indexed #(str "Result " (inc %1) ":\n" %2) results'))
               answer   (:text (llm/generate ai (str aggregation-prompt "\n\n" numbered)))]
           [(assoc state :results results') {:out [answer]}])
         [(assoc state :results results') {}])))))

Flow's state persists across transform calls. Workers complete in any order. The aggregator fires when the count is reached.
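Because the transform arity is a pure function of state and message, the counting behavior can be exercised outside a flow. A minimal sketch, with the llm/generate call replaced by a stub string (make-agg is a hypothetical cut-down of make-aggregator-fn):

```clojure
;; Simulating the aggregator's transform arity outside a flow.
;; The synthesis call is stubbed out — no LLM involved.
(defn make-agg [n]
  (fn [{:keys [results] :as state} _port msg]
    (let [results' (conj results msg)]
      (if (= (count results') n)
        [(assoc state :results results') {:out [(str "synthesis of " n " results")]}]
        [(assoc state :results results') {}]))))

(let [f       (make-agg 3)
      [s1 o1] (f {:results []} :in "r1")
      [s2 o2] (f s1 :in "r2")
      [_  o3] (f s2 :in "r3")]
  [o1 o2 o3])
;; => [{} {} {:out ["synthesis of 3 results"]}]
```

The first n-1 calls emit nothing; only the nth returns a non-empty output map.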

Building the Graph

tasks->flow-config takes a validated plan and an output channel and returns a create-flow config. Worker ids are generated with map-indexed:

(defn tasks->flow-config [{:keys [tasks aggregation-prompt]} out-chan]
  (let [n          (count tasks)
        indexed    (map-indexed (fn [i t] [(keyword (str "task-" (inc i))) t]) tasks)
        worker-ids (mapv first indexed)]
    {:procs (merge
              (into {} (map (fn [[id {:keys [prompt]}]]
                              [id {:proc (flow/process llm-worker-fn {:workload :io})
                                   :args {:prompt prompt}}])
                            indexed))
              {:aggregate {:proc (flow/process (make-aggregator-fn n aggregation-prompt) {:workload :io})}
               :sink      {:proc (flow/process sink-fn {:workload :io}) :args {:out-chan out-chan}}})
     :conns (into (mapv #(vector [% :out] [:aggregate :in]) worker-ids)
                  [[[:aggregate :out] [:sink :in]]])}))

For 5 tasks:

[task-1] ──┐
[task-2] ──┤
[task-3] ──┼──► [aggregate] ──► [sink]
[task-4] ──┤
[task-5] ──┘

For a question that decomposes into 4 tasks, tasks->flow-config returns:

{:procs
 {:task-1    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect A..."}}
  :task-2    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect B..."}}
  :task-3    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect C..."}}
  :task-4    {:proc #<process llm-worker-fn>
              :args {:prompt "Research aspect D..."}}
  :aggregate {:proc #<process make-aggregator-fn/4>}
  :sink      {:proc #<process sink-fn>
              :args {:out-chan #<ManyToManyChannel>}}}
 :conns
 [[[:task-1    :out] [:aggregate :in]]
  [[:task-2    :out] [:aggregate :in]]
  [[:task-3    :out] [:aggregate :in]]
  [[:task-4    :out] [:aggregate :in]]
  [[:aggregate :out] [:sink     :in]]]}

The Tool

(defn fan-out-research
  {:malli/schema
   [:=> [:cat
         [:map {:name        "fan_out_research"
                :description "Decompose a question into parallel subtasks and synthesize results."}
          [:tasks
           {:description "2-6 independent subtasks. Each prompt must be self-contained."}
           [:vector {:min-count 2 :max-count 6}
            [:map [:prompt :string]]]]
          [:aggregation-prompt
           {:description "instruction for the synthesis step"}
           :string]]]
        :string]}
  [{:keys [tasks] :as plan}]
  (when-let [err (m/explain TaskPlanSchema plan)]
    (throw (ex-info "Invalid task plan" {:explain err})))
  (let [out-chan (a/chan 1)
        g        (flow/create-flow (tasks->flow-config plan out-chan))
        {:keys [error-chan]} (flow/start g)]
    (a/go-loop []
      (when-let [err (a/<! error-chan)]
        (println "FLOW ERROR:" (pr-str err))
        (recur)))
    (flow/resume g)
    (doseq [[id {:keys [prompt]}] (map-indexed (fn [i t] [(keyword (str "task-" (inc i))) t]) tasks)]
      (flow/inject g [id :in] [{:prompt prompt}]))
    (let [result (a/<!! out-chan)]
      (flow/stop g)
      result)))

In Action

(llm/run-agent ai
  {:tools        [#'fan-out-research]
   :system-prompt "Use fan_out_research to decompose complex questions into parallel subtasks."
   :max-steps    3}
  "Compare the food cultures of Osaka, Lyon, and Mexico City.")

The model decomposes the question into 5 tasks: one per city, plus cross-cutting tasks for street food comparison and historical influences.

[fan-out] spawning 5 parallel workers
  task-1: What are the key characteristics of Osaka's food culture?...
  task-2: What are the key characteristics of Lyon's food culture?...
  task-3: What are the key characteristics of Mexico City's food culture?...
  task-4: Compare and contrast street food, fine dining, and regional specialties...
  task-5: What are some historical influences on the food cultures of all three cities?...

  [worker task-1] running...
  [worker task-3] running...
  [worker task-4] running...
  [worker task-5] running...
  [worker task-2] running...
  [worker task-2] done
  [worker task-1] done
  [worker task-5] done
  [worker task-3] done
  [worker task-4] done
  [aggregate] all 5 results in — synthesizing...

=== Final Answer ===

The food cultures of Osaka, Lyon, and Mexico City each possess distinct
characteristics shaped by history and regional influences.

Osaka, known for its "kuidaore" ("eat 'til you drop") philosophy, emphasizes
affordability and variety, with signature dishes like takoyaki and okonomiyaki.
Its merchant city history fostered a focus on fresh ingredients and
presentation. Street food and regional variations thrive, while fine dining is
less prominent.

Lyon is characterized by its rich, traditional, hearty cuisine. Its historical
role as a silk trading hub influenced its access to diverse ingredients and
skilled chefs. Bouchons serve dishes like quenelles and coq au vin, utilizing
offal, butter, and regional produce. Fine dining and regional cuisine are
dominant, though street food is emerging.

Mexico City's food culture blends Indigenous, Spanish, and global influences,
reflected in dishes like tacos, enchiladas, and mole. Its history is marked by
Aztec origins, Spanish colonization, and indigenous agricultural practices. The
city boasts a dynamic street food scene, diverse regional influences, and a
growing fine dining sector that combines tradition and modern techniques.

The outer agent loop issues one tool call and receives one string. The flow graph, the parallel workers, and the aggregation are internal to the tool.

The pattern generalizes to any task that decomposes into independent subtasks: code review across multiple files, analysis across multiple datasets, research across multiple sources. The schema, the two process templates, and the graph construction stay the same. Only the prompts change.

Each worker is itself a clj-llm/run-agent call. Passing a fan-out tool to the worker agents allows subgraphs to spawn subgraphs. A depth counter in the task args, decremented at each level and refusing to fan out at zero, bounds the recursion.
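A minimal sketch of that bound, with the actual fan-out replaced by a stub — fan-out* here is hypothetical and spawns no agents, it just returns the tree of levels that would be created:

```clojure
;; Sketch of the depth bound: each level passes (dec depth) down and
;; refuses to fan out at zero. No LLMs involved — this only shows the shape.
(defn fan-out* [depth]
  (if (zero? depth)
    :leaf                                   ; refuse to fan out — answer directly
    {:workers (repeatedly 2 #(fan-out* (dec depth)))}))

(fan-out* 2)
;; => {:workers ({:workers (:leaf :leaf)} {:workers (:leaf :leaf)})}
```

Whatever depth the outer call starts with, the recursion terminates after exactly that many levels.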

Try it Yourself

The script is in the clj-llm repository. Workers use Kagi FastGPT for grounded results when KAGI_API_KEY is set, and fall back to the model's own knowledge without it.

git clone https://github.com/minikomi/clj-llm
cd clj-llm

Set your API key, then pipe any question in:

echo "Compare the food cultures of Osaka, Lyon, and Mexico City" | clojure -M:flow scripts/fan-out.clj

Progress and worker logs go to stderr and the final answer to stdout, so the answer can be redirected on its own:

echo "..." | clojure -M:flow scripts/fan-out.clj > answer.md
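The stream split is the standard shell one; a generic demonstration of the same separation (run here is a stand-in for the script, not its actual output):

```shell
# Progress to stderr, answer to stdout — each stream redirects independently.
run() { echo "[worker] running..." >&2; echo "final answer"; }
run > answer.md 2> progress.log
cat answer.md
```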