Few days ago, during our regular code review, one of my colleagues raised a question what would happen – and if it’s
even possible – when a CDI Observer (so a method with parameter annotated @Observes
) would be invoked multiple times
at the same time for different event instances. In other words, after producing few events, is it possible that the
following method will be processed by more than one thread at the same time:
public void observe(@Observes MyEvent myEvent) { ... }
After thinking about it I’ve decided to run a few tests and describe results in this post.
First results: it occurred that CDI Events are fired in synchronous mode which was a bit surprise for me. Why?
Up to this time, I’ve seen it this way: CDI observers allows me to very cleanly separate event producer from event consumer, so I don’t have any hard-coded registering of listeners, maintaining a list of listeners and manually informing them. The CDI container does everything for me.
Therefore, if we have cleanly separated producers from consumers I thought that there exists some kind of event bus running in specialized, thread executors pool which is responsible for mediation between registered events and invoked observers methods. I guess I based this assumption on other event/listeners solutions like Google Guava EventBus. They give you a chance to define if you want to use synchronous (default, EventBus) or asynchronous event dispatchers (AsyncEventBus.)
Moreover, if EJBs are both: producer and consumer, I assume it would have the same features as asynchronous EJB calls
when it comes to transactions. The only possible JTA transaction attribute for asynchronous event observer would be:
REQUIRED
, REQUIRES_NEW
or NOT_SUPPORTED
.
Now that’s all how I expected it to work which seems to be quite different from the current status. The real life shows that CDI events are synchronous.
There is an issue for making asynchronous events available in CDI 1.1 but I’m not sure what is the current status of this feature and didn’t find a word about it in CDI 1.1 (part of Java EE 7).
Let’s see how we can deal with it on our own.
Default Synchronous Events
Let’s start with basic example showing the problem. Take a look at the code – first, the CDI Bean producer:
@Path("/produce")
public class EventGenerator {
@Inject
private Logger logger;
@Inject
private Event<MyEvent> events;
@Path("/cdiBean/{num}")
@GET
public String generateEvents(@PathParam("num") int eventsNumToGenerate) {
for (int i = 0; i < eventsNumToGenerate; i++) {
MyEvent event = new MyEvent(i);
logger.info("Generating Event: " + event);
events.fire(event);
}
return "Finished. Generated " + eventsNumToGenerate + " events.";
}
}
MyEvent
is just some event object which is not really important here. It stores event sequence number that we pass
while instantiation.
Consumer is a pretty simple CDI Bean:
public class EventConsumer {
@Inject
private Logger logger;
public void consumeEvent(@Observes MyEvent myEvent)
throws InterruptedException {
logger.info("Receiving event: " + myEvent);
TimeUnit.MILLISECONDS.sleep(500);
}
}
Note that I’ve inserted a thread sleep to simulate some long-running event receiver process.
Now, let’s run this example by invoking a REST command this EventProducer
is exposing. The result (running JBoss
EAP 6.1 Alpha) would be similar to this:
14:15:59,196 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=0] 14:15:59,197 [c.p.EventConsumer] [http-1] Receiving event: MyEvent[seqNo=0]
14:15:59,697 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=1]
14:15:59,698 [c.p.EventConsumer] [http-1] Receiving event: MyEvent[seqNo=1]
14:16:00,199 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=2]
14:16:00,200 [c.p.EventConsumer] [http-1] Receiving event: MyEvent[seqNo=2]
It shows the synchronous nature of CDI events – event producing and consuming takes place in the same thread and one after another.
So, how to achieve asynchronous events with CDI?
Solution 1 – CDI Producer and Singleton EJB as Receiver
Producer stays at it was – a pure CDI bean:
@Path("/produce") public class EventGenerator {
@Path("/cdiBean/{eventsNum}")
@GET
public String generateEvents(@PathParam("eventsNum") int eventsNumToGenerate) {
...
}
}
Now if you turn your receiver into @Singleton EJB and mark observes method as @Asynchronous like this:
@Singleton
public class EventConsumer {
@Asynchronous
public void consumeEvent(@Observes MyEvent myEvent)
throws InterruptedException {
...
}
}
You’ll get the following results:
14:21:19,341 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=0]
14:21:19,343 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=1]
14:21:19,343 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=2]
14:21:19,347 [c.p.EventConsumer] [EJB default – 2] Receiving event: MyEvent[seqNo=1]
14:21:19,848 [c.p.EventConsumer] [EJB default – 1] Receiving event: MyEvent[seqNo=0]
14:21:20,350 [c.p.EventConsumer] [EJB default – 3] Receiving event: MyEvent[seqNo=2]
Events are produced one after another and in separate threads Singleton EJB is serving them one after another (take a look at time of event processing.) That’s because of implicit write-lock for every business methods of Singleton EJB. So this is:
Asynchronous: yes
Thread-safe observer method: yes
Solution 2 – Use Singleton EJB as Receiver With Read Lock
This approach is very similar to Solution 1, however, it gives you a much higher throughput because all events processing takes place in parallel.
Our producer stays the same – it’s a CDI bean:
@Path("/produce")
public class EventGenerator {
@Path("/cdiBean/{num}")
@GET
public String generateEvents(@PathParam("num") int eventsNumToGenerate) {
...
}
}
Our consumer has @Lock(READ)
added to its observes method; this makes the magic of being able to serve multiple
events at the same time:
@Singleton
public class EventConsumer {
@Asynchronous
@Lock(LockType.READ)
public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException {
...
}
}
This is what you can get as a result:
14:24:44,202 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=0]
14:24:44,204 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=1]
14:24:44,205 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=2]
14:24:44,207 [c.p.EventConsumer] (EJB default – 4) Receiving event: MyEvent[seqNo=0]
14:24:44,207 [c.p.EventConsumer] (EJB default – 6) Receiving event: MyEvent[seqNo=2]
14:24:44,207 [c.p.EventConsumer] (EJB default – 5) Receiving event: MyEvent[seqNo=1]
Different threads serving events at the same time are giving you a bigger throughput. So this is:
Asynchronous: yes
Thread-safe observer method: no
Solution 3 – EJB Producer and CDI Consumer
CDI allows you to observe events during specific stages of transaction. You specify it using
@Observes(during=TransactionPhase...)
. In our case, we would like the CDI to stack all those events and invoke our
observer only after the transaction ends. To do so we need only to add the above attribute to our CDI Bean observer:
public class EventConsumer {
public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) { ... }
}
Now we just need to make sure we have a running transaction in EventGenerator
method. We can do it quickly by
transforming our CDI Bean into @Stateless
EJB and using its implicit REQUIRED
TransactionAttribute like this:
@Stateless
@Path("/produce")
public class EventGenerator {
@Path("/cdiBean/{num}")
@GET
public String generateEvents(@PathParam("num") int numberOfEventsToGenerate) { ... }
}
This is the result we might end with:
14:39:06,776 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=0]
14:39:06,776 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=1]
14:39:06,776 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=2]
14:39:06,778 [c.p.EventConsumer] [http-1] Receiving event: MyEvent[seqNo=2]
14:39:07,279 [c.p.EventConsumer] [http-1] Receiving event: MyEvent[seqNo=1]
14:39:07,780 [c.p.EventConsumer] [http-1] Receiving event: MyEvent[seqNo=0]
EJB EventGenerator
starts a transaction and CDI bean observer will be invoked in serialized way only after the
transaction completes.
Asynchronous: yes
Thread-safe observer method: yes
Solution 4 – EJB Producer and EJB Consumer
This is very similar to Solution 3. Our generator stays the same (Stateless EJB):
@Stateless
@Path("/produce")
public class EventGenerator {
@Path("/cdiBean/{num}")
@GET
public String generateEvents(@PathParam("num") int numberOfEventsToGenerate) { ... }
}
And changes are made to EventConsumer
which is right now:
@Singleton
public class EventConsumer {
@Asynchronous
@Lock(LockType.READ)
public void consumeEvent(@Observes(during = TransactionPhase.AFTER_COMPLETION) MyEvent myEvent) throws InterruptedException { ... }
}
The result might be as follows:
14:44:09,363 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=0]
14:44:09,464 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=1]
14:44:09,564 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=2]
14:44:09,670 [c.p.EventConsumer] (EJB default – 8) Receiving event: MyEvent[seqNo=2]
14:44:09,670 [c.p.EventConsumer] [EJB default – 2] Receiving event: MyEvent[seqNo=1]
14:44:09,670 [c.p.EventConsumer] [EJB default – 1] Receiving event: MyEvent[seqNo=0]
We’ve used two features here – one is that the event consumer method is asynchronous and the second one is that the consumer will not be notified before the producer transaction completes. This gives us:
Asynchronous: yes
Thread-safe observer method: no
Solution 4 vs Solution 2
Those two solutions seems to be the same. They only differ with consumer’s annotation: @Observes
vs @Observes(during = TransactionPhase.AFTER_COMPLETION)
. Moreover they act the same for our test case: they are asynchronous and multiple threads can be processing event receivers at the same time. However, there is one big difference between them.
In our test case we fired events one after another. Imagine that there is some other operations between events firing. In such case:
- Solution 2 (
@Observes
) will start processing events just after the first one will be fired, - Solution 4 (
@Observes(during = TransactionPhase.AFTER_COMPLETION)
) will start processing just after transaction completes, so when all events will be fired.
This shows possible result of such situation:
Solution 2 (@Observes
)
15:01:34,318 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=0]
15:01:34,320 [c.p.EventConsumer] [EJB default – 3] Receiving event: MyEvent[seqNo=0]
15:01:34,419 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=1]
15:01:34,420 [c.p.EventConsumer] (EJB default – 6) Receiving event: MyEvent[seqNo=1]
15:01:34,520 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=2]
15:01:34,521 [c.p.EventConsumer] (EJB default – 9) Receiving event: MyEvent[seqNo=2]
Solution 4 (@Observes(during = TransactionPhase.AFTER_COMPLETION)
)
15:00:41,126 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=0]
15:00:41,226 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=1]
15:00:41,326 [c.p.EventGenerator] [http-1] Generating Event: MyEvent[seqNo=2]
15:00:41,432 [c.p.EventConsumer] (EJB default – 10) Receiving event: MyEvent[seqNo=2]
15:00:41,432 [c.p.EventConsumer] (EJB default – 4) Receiving event: MyEvent[seqNo=1]
15:00:41,432 [c.p.EventConsumer] (EJB default – 5) Receiving event: MyEvent[seqNo=0]
Solution 5 – EJB Producer and CDI Consumer II
Up to this point we’ve tried to make our receiver asynchronous. There is also the opposite way – we can make the
event producer asynchronous. We can achieve it by marking our producer as @Stateless
and invoking its own
asynchronous method that will just fire an event:
@Stateless
@Path("/produce")
public class EventGenerator {
// ...
@Resource
private SessionContext sctx;
@Path("/cdiBean/{num}")
@GET
public String generateEvents(@PathParam("num") int eventsNumToGenerate) {
for (int i = 0; i < eventsNumToGenerate; i++) {
sctx.getBusinessObject(EventGenerator.class).fireEvent(new MyEvent(i));
}
return "Finished. Generated " + eventsNumToGenerate + " events.";
}
@Asynchronous
public void fireEvent(final MyEvent event) {
events.fire(event);
}
}
Take a closer look at EJB auto-reference using SessionContext
. It is required in this case as we want the container
to dispatch our method call and add the asynchronous nature of it. We don’t want to make it a local call so we refuse
to use implicit this
object.
Event consumer, on the other hand, is plain CDI bean:
public class EventConsumer {
public void consumeEvent(@Observes MyEvent myEvent) throws InterruptedException { ... }
}
The result might be as follows:
00:40:32,820 [c.p.EventGenerator] [EJB default – 2] Generating Event: MyEvent[seqNo=1]
00:40:32,820 [c.p.EventGenerator] [EJB default – 3] Generating Event: MyEvent[seqNo=2]
00:40:32,820 [c.p.EventGenerator] [EJB default – 1] Generating Event: MyEvent[seqNo=0]
00:40:32,821 [c.p.EventConsumer] [EJB default – 1] Receiving event: MyEvent[seqNo=0]
00:40:32,821 [c.p.EventConsumer] [EJB default – 2] Receiving event: MyEvent[seqNo=1]
00:40:32,821 [c.p.EventConsumer] [EJB default – 3] Receiving event: MyEvent[seqNo=2]
Asynchronous: yes
Thread-safe observer method: no
Solution 6 – CDI With JMS
This is a solution presented by Juliano Viana on his blog. It uses JMS as the event bus. CDI event is produced, then fetched by some class that is responsible for putting this event into JMS topic / queue. MDB that fetches messages from topic / queue is producing an event that invokes the real receiver.
This not only gives you the asynchronous delivery of events but also adds transaction nature to it. E.g. if event receiver is not able to process the message – it can rollback the transaction and the queue will make sure the message will be re-delivered (perhaps next time your event processor will be able to serve this event?)
Conclusion
CDI 1.0 doesn’t support asynchronous events generation. It also doesn’t seem that CDI 1.1 will have such support.
This, however, doesn’t mean you cannot achieve asynchronous processing. There are already existing solutions either basing on EJB’s 3.1 or existing CDI observer attributes. You should also be able to write a portable CDI extension that adds this functionality to your code.