Callbacks and Event Listeners

AgentEnsemble provides an event listener API that lets you observe task and tool execution lifecycle events without modifying your agent or workflow configuration.

Register lambda handlers directly on the Ensemble builder:

    EnsembleOutput output = Ensemble.builder()
        .agent(researcher)
        .task(researchTask)
        .onTaskStart(event -> log.info("[{}] Starting: {}", event.taskIndex(), event.taskDescription()))
        .onTaskComplete(event -> log.info("[{}] Done in {}", event.taskIndex(), event.duration()))
        .onTaskFailed(event -> alertService.notify(event.cause()))
        .onToolCall(event -> metrics.increment("tool." + event.toolName()))
        .onDelegationStarted(event -> log.info("Delegating to {} [{}]", event.workerRole(), event.delegationId()))
        .onDelegationCompleted(event -> metrics.record("delegation.latency", event.duration()))
        .onDelegationFailed(event -> log.warn("Delegation failed: {}", event.failureReason()))
        .build()
        .run();

onTaskStart fires immediately before an agent begins executing a task.

| Field | Type | Description |
| --- | --- | --- |
| taskDescription() | String | The description of the task |
| agentRole() | String | The role of the agent executing the task |
| taskIndex() | int | 1-based index of this task in the workflow run |
| totalTasks() | int | Total number of tasks in this run |

onTaskComplete fires immediately after an agent successfully completes a task.

| Field | Type | Description |
| --- | --- | --- |
| taskDescription() | String | The description of the completed task |
| agentRole() | String | The role of the agent |
| taskOutput() | TaskOutput | The full output produced by the agent |
| duration() | Duration | Time from task start to completion |
| taskIndex() | int | 1-based index of this task |
| totalTasks() | int | Total number of tasks |

onTaskFailed fires when an agent fails to complete a task, before the exception propagates. This lets you observe failures (for alerting, metrics, and so on) without wrapping run() in a try-catch block.

| Field | Type | Description |
| --- | --- | --- |
| taskDescription() | String | The description of the failed task |
| agentRole() | String | The role of the agent |
| cause() | Throwable | The exception that caused the failure |
| duration() | Duration | Time from task start to failure |
| taskIndex() | int | 1-based index of this task |
| totalTasks() | int | Total number of tasks |

onToolCall fires after each tool execution within an agent’s ReAct loop.

| Field | Type | Description |
| --- | --- | --- |
| toolName() | String | The name of the tool that was called |
| toolArguments() | String | JSON string of arguments passed to the tool |
| toolResult() | String | The result returned by the tool |
| agentRole() | String | The role of the agent that invoked the tool |
| duration() | Duration | Time taken for the tool execution |

onToken fires for each token received during streaming generation of the final agent response. It fires only when a StreamingChatModel is resolved for the agent (see Streaming Output below).

| Field | Type | Description |
| --- | --- | --- |
| token() | String | The text fragment emitted by the streaming model |
| agentRole() | String | The role of the agent generating the response |

Token events are fired during the direct LLM-to-answer path only. Tool-loop iterations remain synchronous because the full response must be seen to detect tool-call requests.

onDelegationStarted fires immediately before a delegation is handed off to a worker agent. It fires only for delegations that pass all built-in guards and registered policy evaluations.

| Field | Type | Description |
| --- | --- | --- |
| delegationId() | String | Unique correlation ID; matches the completed or failed event |
| delegatingAgentRole() | String | Role of the agent initiating the delegation |
| workerRole() | String | Role of the agent that will execute the subtask |
| taskDescription() | String | Description of the subtask |
| delegationDepth() | int | Depth in the chain (1 = first delegation, 2 = nested, etc.) |
| request() | DelegationRequest | The full typed delegation request |

onDelegationCompleted fires immediately after a delegation completes successfully.

| Field | Type | Description |
| --- | --- | --- |
| delegationId() | String | Matches the corresponding DelegationStartedEvent |
| delegatingAgentRole() | String | Role of the agent that initiated the delegation |
| workerRole() | String | Role of the worker that executed |
| response() | DelegationResponse | Full typed response with output and metadata |
| duration() | Duration | Elapsed time from delegation start to completion |

onDelegationFailed fires when a delegation fails, whether because of a guard violation, a policy rejection, or a worker exception. Guard and policy failures have cause() == null and no corresponding DelegationStartedEvent; worker exceptions carry the thrown exception.

| Field | Type | Description |
| --- | --- | --- |
| delegationId() | String | Matches DelegationRequest.getTaskId() |
| delegatingAgentRole() | String | Role of the initiating agent |
| workerRole() | String | Role of the intended target |
| failureReason() | String | Human-readable description of the failure |
| cause() | Throwable | Exception if the worker threw; null for guard/policy failures |
| response() | DelegationResponse | FAILURE response with error messages |
| duration() | Duration | Elapsed time from delegation start to failure |
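
Because the three delegation events share a delegationId, a listener can pair them up to see which delegations are still in flight and to tell rejections apart from worker failures. The following is a minimal sketch under stated assumptions: DelegationTracker and its method names are hypothetical and not part of AgentEnsemble, and the methods take plain values (which you would read from the event accessors listed above) so the example compiles on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of AgentEnsemble): correlates delegation
// events by ID. Thread-safe, so it can be shared across parallel callbacks.
final class DelegationTracker {
    private final Map<String, String> inFlight = new ConcurrentHashMap<>();

    // Call from onDelegationStarted with delegationId() and workerRole().
    void started(String delegationId, String workerRole) {
        inFlight.put(delegationId, workerRole);
    }

    // Call from onDelegationCompleted. Returns true if a matching start was seen.
    boolean completed(String delegationId) {
        return inFlight.remove(delegationId) != null;
    }

    // Call from onDelegationFailed with delegationId() and cause().
    // A null cause means a guard or policy rejected the delegation, in which
    // case no start event was fired and there is nothing to remove.
    String failed(String delegationId, Throwable cause) {
        inFlight.remove(delegationId);
        return cause == null ? "guard-or-policy-rejection" : "worker-exception";
    }

    int inFlightCount() {
        return inFlight.size();
    }
}
```

In a real listener you would call these from the onDelegationStarted, onDelegationCompleted, and onDelegationFailed callbacks and export inFlightCount() as a gauge.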

For listeners that handle multiple event types, implement EnsembleListener directly:

    public class MetricsListener implements EnsembleListener {
        private final MeterRegistry registry;

        public MetricsListener(MeterRegistry registry) {
            this.registry = registry;
        }

        @Override
        public void onTaskStart(TaskStartEvent event) {
            registry.counter("task.started", "agent", event.agentRole()).increment();
        }

        @Override
        public void onTaskComplete(TaskCompleteEvent event) {
            registry.timer("task.duration", "agent", event.agentRole())
                .record(event.duration());
        }

        @Override
        public void onTaskFailed(TaskFailedEvent event) {
            registry.counter("task.failed", "agent", event.agentRole()).increment();
        }

        @Override
        public void onToolCall(ToolCallEvent event) {
            registry.counter("tool.calls", "tool", event.toolName()).increment();
        }
    }

    // Register with the builder
    Ensemble.builder()
        .agent(researcher)
        .task(researchTask)
        .listener(new MetricsListener(meterRegistry))
        .build()
        .run();

All methods have default no-op implementations, so you only need to override the events you care about. The three delegation event methods (onDelegationStarted, onDelegationCompleted, onDelegationFailed) are also available on EnsembleListener and can be overridden in the same way.
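
The "override only what you need" behaviour comes from Java interface default methods. The sketch below illustrates the pattern with a hypothetical stand-in interface, SketchListener, whose simplified signatures are assumptions made for this example; it is not the real EnsembleListener.

```java
// Hypothetical stand-in for a listener interface whose methods all have
// default no-op bodies. The signatures are simplified for illustration.
interface SketchListener {
    default void onTaskStart(String taskDescription) { }
    default void onTaskComplete(String taskDescription) { }
    default void onTaskFailed(String taskDescription, Throwable cause) { }
}

// Overrides only the failure callback; the other two keep their no-op defaults.
final class FailureCounter implements SketchListener {
    private int failures;

    @Override
    public void onTaskFailed(String taskDescription, Throwable cause) {
        failures++;
    }

    int failures() {
        return failures;
    }
}
```

Calling onTaskStart or onTaskComplete on a FailureCounter does nothing, so the class stays focused on the one event it cares about.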

All registration methods accumulate: calling them multiple times adds each listener to the list. Listeners are called in registration order.

    Ensemble.builder()
        .agent(researcher)
        .task(researchTask)
        .listener(new MetricsListener(meterRegistry)) // Full implementation
        .listener(new AuditLogger())                  // Another full implementation
        .onTaskFailed(event -> alertPager())          // Lambda for just this one event
        .build()
        .run();

If a listener throws an exception, the exception is caught and logged at WARN level. Execution continues normally and all subsequent listeners are still called. A misbehaving listener cannot abort task execution or prevent other listeners from receiving events.

    Ensemble.builder()
        .agent(researcher)
        .task(researchTask)
        .onTaskComplete(event -> {
            // If this throws, it is logged and the next listener still runs
            externalService.send(event);
        })
        .onTaskComplete(event -> {
            // This always runs, even if the previous listener threw
            localLog.record(event);
        })
        .build()
        .run();
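
To make the isolation guarantee concrete, here is a minimal sketch of a dispatcher that behaves the way the text describes: listeners accumulate, run in registration order, and a throwing listener is recorded without stopping the rest. IsolatingDispatcher is a hypothetical class written for this illustration, not AgentEnsemble's actual implementation, and it collects warning strings where a real framework would log at WARN level.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical dispatcher illustrating listener isolation.
// Not thread-safe; it only demonstrates the ordering and error handling.
final class IsolatingDispatcher<E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();
    private final List<String> warnings = new ArrayList<>();

    // Each call appends a listener; nothing is replaced.
    IsolatingDispatcher<E> register(Consumer<E> listener) {
        listeners.add(listener);
        return this;
    }

    // Invokes every listener in registration order. An exception from one
    // listener is recorded and the loop moves on to the next listener.
    void fire(E event) {
        for (Consumer<E> listener : listeners) {
            try {
                listener.accept(event);
            } catch (RuntimeException ex) {
                warnings.add("listener threw: " + ex.getMessage()); // stand-in for a WARN log
            }
        }
    }

    List<String> warnings() {
        return List.copyOf(warnings);
    }
}
```

Registering a listener that throws followed by one that records the event, then calling fire once, leaves one warning recorded and the second listener's side effect intact.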

ExecutionContext (which carries the listener list) is immutable. When using Workflow.PARALLEL, the onTaskStart, onTaskComplete, onTaskFailed, and onToolCall events may be fired concurrently from multiple virtual threads.

Listener implementations must be thread-safe when used with a parallel workflow.

For example, use ConcurrentLinkedQueue instead of ArrayList if you are collecting events in a listener:

    ConcurrentLinkedQueue<TaskCompleteEvent> events = new ConcurrentLinkedQueue<>();
    Ensemble.builder()
        .agents(List.of(a1, a2, a3))
        .tasks(List.of(t1, t2, t3))
        .workflow(Workflow.PARALLEL)
        .onTaskComplete(events::add) // ConcurrentLinkedQueue is thread-safe
        .build()
        .run();
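
The effect of concurrent callbacks can be reproduced without the framework. The sketch below is an illustration only: ConcurrentCollectDemo and fireConcurrently are hypothetical names, and a fixed pool of platform threads stands in for the virtual threads a parallel workflow would use. It fires many callbacks at once into a listener backed by ConcurrentLinkedQueue and checks that none are lost.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ConcurrentCollectDemo {
    // Invokes the listener `events` times from a pool of `threads` threads
    // and returns once every callback has finished.
    static void fireConcurrently(Consumer<Integer> listener, int threads, int events) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < events; i++) {
            final int id = i;
            pool.submit(() -> listener.accept(id));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", ex);
        }
    }

    public static void main(String[] args) {
        Queue<Integer> collected = new ConcurrentLinkedQueue<>();
        fireConcurrently(collected::add, 8, 10_000);
        System.out.println(collected.size()); // prints 10000
    }
}
```

Swapping the queue for a plain ArrayList in this harness can lose elements or throw, because ArrayList.add is not safe for concurrent writers.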

To log progress for each task:

    Ensemble.builder()
        .agent(researcher)
        .task(researchTask)
        .onTaskStart(e -> log.info("Task {}/{} starting: {}", e.taskIndex(), e.totalTasks(), e.agentRole()))
        .onTaskComplete(e -> log.info("Task {}/{} done in {}", e.taskIndex(), e.totalTasks(), e.duration()))
        .onTaskFailed(e -> log.error("Task {}/{} failed: {}", e.taskIndex(), e.totalTasks(), e.cause().getMessage()))
        .build()
        .run();

The example below uses ArrayList, which is safe for Workflow.SEQUENTIAL and Workflow.HIERARCHICAL. For Workflow.PARALLEL, callbacks fire from concurrent virtual threads, so use thread-safe collections instead (see the Thread Safety section above).

    // Safe for sequential/hierarchical workflows. For parallel, replace ArrayList
    // with ConcurrentLinkedQueue or CopyOnWriteArrayList.
    List<Duration> taskDurations = new ArrayList<>();
    List<String> toolsUsed = new ArrayList<>();
    Ensemble.builder()
        .agent(researcher)
        .task(researchTask)
        .onTaskComplete(e -> taskDurations.add(e.duration()))
        .onToolCall(e -> toolsUsed.add(e.toolName()))
        .build()
        .run();
    // average(...) is a caller-supplied helper
    log.info("Average task duration: {}", average(taskDurations));
    log.info("Tools used: {}", toolsUsed);
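
The snippet above calls average(taskDurations) without defining it. One possible definition is sketched here; DurationStats is a hypothetical holder class, and returning Duration.ZERO for an empty list is an assumption made for this example rather than anything the library prescribes.

```java
import java.time.Duration;
import java.util.List;

final class DurationStats {
    // Returns the arithmetic mean of the durations, or Duration.ZERO when
    // the list is empty (avoids dividing by zero if no task completed).
    static Duration average(List<Duration> durations) {
        if (durations.isEmpty()) {
            return Duration.ZERO;
        }
        Duration total = Duration.ZERO;
        for (Duration d : durations) {
            total = total.plus(d);
        }
        return total.dividedBy(durations.size());
    }
}
```

For example, averaging a 2-second and a 4-second task yields a Duration of 3 seconds.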

To send an alert when a critical task fails:

    Ensemble.builder()
        .agent(researcher)
        .task(criticalTask)
        .onTaskFailed(event -> {
            alertService.sendAlert(
                "Task failed: " + event.taskDescription(),
                "Agent: " + event.agentRole(),
                event.cause()
            );
        })
        .build()
        .run();

To write an audit record for every tool call:

    Ensemble.builder()
        .agent(researcher)
        .task(researchTask)
        .onToolCall(event -> auditLog.record(
            "Agent '" + event.agentRole() + "' called tool '" + event.toolName() + "'" +
            " with args: " + event.toolArguments() +
            " -> result: " + event.toolResult() +
            " (" + event.duration().toMillis() + "ms)"
        ))
        .build()
        .run();

Agents can stream their final response token-by-token using LangChain4j’s StreamingChatModel. When streaming is configured, each token fires an onToken(TokenEvent) callback to all registered listeners.

Streaming is opt-in and off by default. Configure it at one of three levels; the first non-null value in the chain wins:

| Level | How to set | Priority |
| --- | --- | --- |
| Agent-level | Agent.builder().streamingLlm(model) | Highest |
| Task-level | Task.builder().streamingChatLanguageModel(model) | Middle |
| Ensemble-level | Ensemble.builder().streamingChatLanguageModel(model) | Lowest |
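
The "first non-null value wins" rule amounts to a simple fallback chain. The sketch below shows that logic in isolation; StreamingModelResolution and resolve are hypothetical names chosen for this example, and the generic type parameter stands in for the streaming model type. It is not the library's actual resolution code.

```java
final class StreamingModelResolution {
    // Returns the highest-priority candidate that is set, checking the agent
    // level first, then the task level, then the ensemble level. Returns
    // null when no level is configured, which means streaming stays off.
    static <T> T resolve(T agentLevel, T taskLevel, T ensembleLevel) {
        if (agentLevel != null) {
            return agentLevel;
        }
        if (taskLevel != null) {
            return taskLevel;
        }
        return ensembleLevel;
    }
}
```

With only task-level and ensemble-level values set, resolve returns the task-level value; with nothing set it returns null.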

Only the final answer path is streamed. When an agent has tools, the tool-calling iterations use the synchronous ChatModel (full responses are needed to detect tool-call requests). Streaming fires only when executeWithoutTools is active.

    StreamingChatModel streamingModel = OpenAiStreamingChatModel.builder()
        .apiKey(System.getenv("OPENAI_API_KEY"))
        .modelName("gpt-4o")
        .build();

    EnsembleOutput output = Ensemble.builder()
        .chatLanguageModel(syncModel)               // used for tool-loop iterations
        .streamingChatLanguageModel(streamingModel) // used for final answers
        .task(Task.builder()
            .description("Write a haiku about Java")
            .expectedOutput("A three-line haiku")
            .build())
        .onToken(event -> System.out.print(event.token())) // typewriter effect
        .build()
        .run();
    System.out.println(); // newline after streaming

Example: streaming with the live dashboard

When webDashboard() is registered alongside streamingChatLanguageModel(), tokens are broadcast over WebSocket as token messages. The viz dashboard displays the text live in the Live Output section of the task detail panel.

    WebDashboard dashboard = WebDashboard.onPort(7329);
    Ensemble.builder()
        .chatLanguageModel(syncModel)
        .streamingChatLanguageModel(streamingModel)
        .webDashboard(dashboard)
        .task(Task.of("Summarize the state of AI in 2025"))
        .build()
        .run();

In a parallel workflow, onToken may be called concurrently from multiple virtual threads (one per running agent). Use thread-safe accumulators when collecting tokens across tasks:

    ConcurrentHashMap<String, StringBuffer> tokenBuffers = new ConcurrentHashMap<>();
    Ensemble.builder()
        .workflow(Workflow.PARALLEL)
        .streamingChatLanguageModel(streamingModel)
        .onToken(event -> tokenBuffers
            .computeIfAbsent(event.agentRole(), k -> new StringBuffer())
            .append(event.token()))
        .tasks(List.of(task1, task2))
        .build()
        .run();
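
The accumulator above can be wrapped in a small class that also exposes the collected text. This is a sketch under assumptions: TokenBuffers is a hypothetical helper, and the demo assumes that tokens for any one agent role arrive in order from a single thread, which the source does not state explicitly. Two plain threads simulate two agents streaming at the same time.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: accumulates streamed tokens per agent role.
final class TokenBuffers {
    private final Map<String, StringBuffer> buffers = new ConcurrentHashMap<>();

    // Call from onToken with event.agentRole() and event.token().
    void append(String agentRole, String token) {
        buffers.computeIfAbsent(agentRole, k -> new StringBuffer()).append(token);
    }

    // Returns the text accumulated so far for a role, or "" if none.
    String textFor(String agentRole) {
        StringBuffer sb = buffers.get(agentRole);
        return sb == null ? "" : sb.toString();
    }

    public static void main(String[] args) throws InterruptedException {
        TokenBuffers tb = new TokenBuffers();
        // Each thread plays the part of one agent emitting its own tokens.
        Thread a = new Thread(() -> List.of("Hel", "lo").forEach(t -> tb.append("Researcher", t)));
        Thread b = new Thread(() -> List.of("Wor", "ld").forEach(t -> tb.append("Writer", t)));
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(tb.textFor("Researcher") + " " + tb.textFor("Writer")); // prints "Hello World"
    }
}
```

Keying by agent role keeps each agent's output separate. If two tasks in the same run share a role, their tokens would land in the same buffer, so a different key would be needed in that case.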