CoronaMQ

It’s been a while since I started this pet project of mine: a proof-of-concept message queue that uses PostgreSQL, the vertx-postgres-driver and its LISTEN/NOTIFY support. The idea was that if a row was added to a table (jobs, tasks, you name it), Postgres would send a notification to the driver. The driver would then send this notification, along with the task’s payload, over the vertx eventbus, and some worker would react to it. The following figure visualizes this concept:

This was already two years ago, and since then I didn’t invest more time into it – like with any other side project you start. But then two things happened: vertx added support for Future APIs in their code generator, and I caught Covid – no joke. I started to play around with the code generator and created APIs for mutiny and rxjava. I read about and added vertx-service-proxies for the TaskRepository, decoupled the participants (Broker, TaskRepository and Workers) using vertx-service-discovery, added metric profiling with Micrometer and started using Testcontainers for the integration tests.
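For the curious, the Postgres half of the original idea can be sketched with a trigger that publishes every inserted row via pg_notify. Note that the table name, channel name and payload shape below are made up for illustration and are not taken from CoronaMQ:

```sql
-- a hypothetical task table; CoronaMQ's actual schema may differ
CREATE TABLE tasks (
    id      UUID PRIMARY KEY,
    payload JSONB NOT NULL
);

-- publish each new row on the 'tasks' channel as JSON
CREATE OR REPLACE FUNCTION notify_new_task() RETURNS trigger AS $$
BEGIN
    PERFORM pg_notify('tasks', row_to_json(NEW)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER task_inserted
    AFTER INSERT ON tasks
    FOR EACH ROW EXECUTE PROCEDURE notify_new_task();
```

On the client side, the vertx reactive Postgres driver can subscribe to such a channel and forward the payload over the eventbus.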

Now, again a couple of months later, I finally found some time to upload the project to Maven Central and share it with the world. I’m happy with the result and think it’s a cool real-world application that uses awesome open source libraries. If you have any feedback, I’m happy to hear it – either in the comments, on Twitter or on the GitHub page.

Happy coding!

vertx-jooq 5.1.0 now on maven central

This release comes with nice benefits for the reactive modules. Users had a hard time in the past when they wanted to map a JSONB column to a custom POJO: if they created a custom jOOQ Converter, the generated entity would map the field to the POJO as expected, but they couldn’t really work with it, because the mapping between the POJO and the reactive driver’s data types was unknown. To solve this, there is now a new PgConverter that you can use in the reactive modules. Check out this example for a how-to.

More information about the release can be found at the project’s github page.

vertx-jooq 4.0-BETA released

The last couple of weeks I’ve been working on the fourth version of vertx-jooq. Although the 3.x branch was released not that long ago, I decided to draft a new major version, as many features have been added and some API changes have been made. The main focus, however, was the integration of a new reactive driver for vertx, which was ranked number one in the TechEmpower benchmark, round 15.

Check out the project’s GitHub page to see what else has been added and how to upgrade!

vertx-jooq 3.0.0 released!

Just minutes ago, I released the third major version of vertx-jooq – your favorite jOOQ wrapper for vertx! This release merges vertx-jooq and vertx-jooq-async*: now you are free to choose not only between three different APIs, but also the underlying database driver (async or JDBC) – without the need to change the APIs. Of course this is not the only feature – read more about the release, how to use it, and the (unavoidable) breaking changes on GitHub.

* Because the async-driver is now incorporated into vertx-jooq there is no need to maintain vertx-jooq-async.

vertx-push-onesignal 1.7 released

When working with OneSignal, you sometimes receive an API response telling you that the notification you’ve sent does not have any recipients (“All players are not subscribed”). This can either mean that the notifications you’ve set up are not working as intended (e.g. you’re using wrong filters), or that a formerly active user of yours became inactive and can no longer be addressed. If you are targeting users via tags, this can happen quite frequently.

To avoid the cost of creating a OneSignalException in that case, there is now an option that prevents vertx-push-onesignal from creating these exceptions whenever that response is received from the OneSignal API.

See my GitHub page for an example of how to set it up properly.

Remotely connect to JMX behind an AWS Loadbalancer

JMX is useful for gathering various information about a running Java application. Mostly it is used to gain detailed data about CPU consumption (where are the hotspots in my code?) or memory allocation (what is eating up my memory?). Especially when a system is under load and behaves ‘unexpectedly’, that information is valuable. Here are the steps you have to take in order to get remote access to JMX when you host your servers in the Amazon cloud behind a loadbalancer.

1. Associate Public IP Address

The servers that are created in your ElasticBeanstalk environment need to have a public IP-Address. To do this, make sure you’ve selected the following option in your Elastic Beanstalk environment’s configuration:

Elastic Beanstalk → Your Application → Your Environment → Configuration →  VPC → Associate Public IP Address

Be aware that exposing the instances behind a loadbalancer undermines its purpose: having only one public endpoint. By making all your instances public, they are open to attacks of any kind!

2. Change server startup parameters

A quick search on stackoverflow reveals the required arguments you have to add to your Java application:

-Dcom.sun.management.jmxremote.port=PORT
-Dcom.sun.management.jmxremote.rmi.port=PORT
-Dcom.sun.management.jmxremote.local.only=false
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=PUBLIC_IP

In a previous post I’ve described how to pass these arguments to the Java application (add them as JAVA_OPTS), and choosing a PORT is trivial. However, you should use the same port for .port and .rmi.port to minimize the number of ports you have to open in the security group later on.
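Putting the parameters together, the JAVA_OPTS value could be assembled like this – the port and IP below are placeholders you would replace with your own values:

```shell
# placeholder values for illustration only
PORT=9010
PUBLIC_IP=203.0.113.10

JAVA_OPTS="-Dcom.sun.management.jmxremote.port=$PORT \
 -Dcom.sun.management.jmxremote.rmi.port=$PORT \
 -Dcom.sun.management.jmxremote.local.only=false \
 -Dcom.sun.management.jmxremote.authenticate=false \
 -Dcom.sun.management.jmxremote.ssl=false \
 -Djava.rmi.server.hostname=$PUBLIC_IP"

echo "$JAVA_OPTS"
```

Using the same value for .port and .rmi.port means only a single port has to be opened later on.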

Now what about the PUBLIC_IP? If you leave out this parameter, JMX will most likely bind to the private IP address and you won’t be able to access it from outside. My first attempt was checking the instance’s environment variables to see if the server’s public IP was added there. Unfortunately, it was not. My second attempt was to expose an environment variable JMX_IP utilizing .ebextensions like this:

commands:
  01-setip:
    command: export JMX_IP=$(curl http://169.254.169.254/latest/meta-data/public-ipv4)

The idea was to export an environment variable JMX_IP and then reference it in the JAVA_OPTS like so: -Djava.rmi.server.hostname=${JMX_IP}. That did not work either – most likely because each .ebextensions command runs in its own shell, so the exported variable does not survive until the application process starts. I ended up changing the script that builds my deployment artifact:


#!/bin/bash
set -e
now=$(date +"%Y-%m-%d_%H-%M-%S")
tag="./target/app-$now.zip"
mvn clean install
# the Procfile tells Elastic Beanstalk to start the app via run.sh
echo "web: ./run.sh" > target/Procfile
# run.sh resolves the public IP at startup and passes it to the JVM
echo "JMX_IP=\$(curl http://169.254.169.254/latest/meta-data/public-ipv4)" > target/run.sh
echo "exec java \$JAVA_OPTS -Djava.rmi.server.hostname=\$JMX_IP -jar app.jar \$JAVA_ARGS" >> target/run.sh
chmod +x target/run.sh
zip -j ${tag} target/Procfile target/run.sh target/app.jar


This results in the following run.sh file, which is executed during deployment:


JMX_IP=$(curl http://169.254.169.254/latest/meta-data/public-ipv4)
exec java $JAVA_OPTS -Djava.rmi.server.hostname=$JMX_IP -jar app.jar $JAVA_ARGS


As you can see, the public IP address is fetched from the instance’s metadata URL and added as an option to the Java process. If you added the startup parameters from above, the server should start JMX and expose it on the public IP address.

3. Open JMX port in security group

In the final step, you have to open a port to be able to connect to your server. Before we proceed, be aware that we disabled any authentication! That means that as soon as the port is open, anyone who knows the IP of your server can access it. Therefore, either close the port as soon as you’ve finished your work and/or add SSL to your JMX configuration!

To open the port, I recommend creating a new security group under

EC2 → Security Groups → Create Security Group

in your VPC. Add an inbound rule to open the TCP port you’ve chosen for JMX (if you decided to use a different port for RMI, it has to be opened as well) and add your IP address as the source. Note the security group’s name (e.g. sg-abcdefg) and add it to your Elastic Beanstalk configuration:

Elastic Beanstalk → Your Application → Your Environment → Configuration → Instances → EC2 security groups

Finally, you should be able to connect to the EC2 instances using jvisualvm.

vertx-jooq-async 0.4 released!

vertx-jooq-async, the (un)beloved child of vertx-jooq, has been released in version 0.4! This version finally implements insertReturningPrimaryAsync in all VertxDAOs*.

In addition, the complex code in the DAOs’ default methods has been abstracted and moved to vertx-jooq-async-shared. That means less code duplication and fewer points of failure – a move that will very likely be made in vertx-jooq soon as well.

Finally, the vertx dependency has been bumped to 3.5 (and with it, support for RxJava2).

* Unfortunately, this only works for MySQL and numeric auto-increment keys. That is because the implementation is based on the implementations of io.vertx.ext.asyncsql.impl.AsyncSQLConnectionImpl, and only the MySQL variant returns the generated ID.

Convert List<CompletableFuture> to CompletableFuture<List>

Sometimes I find myself in the situation where I have to perform several asynchronous tasks and then perform another asynchronous task once all of them have completed. As always in such situations, I searched stackoverflow for a how-to, and the top-rated answer suggests the following solution:


static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture[com.size()]))
            .thenApply(v -> com.stream()
                    .map(CompletableFuture::join)
                    .collect(toList())
            );
}
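For illustration, here is a self-contained sketch of that helper in action – class name and values are mine, not from the stackoverflow answer:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SequenceDemo {

    // the sequence-helper from the stackoverflow answer
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
        return CompletableFuture.allOf(com.toArray(new CompletableFuture[0]))
                .thenApply(v -> com.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Stream.of(1, 2, 3)
                .map(CompletableFuture::completedFuture)
                .collect(Collectors.toList());
        // completes once all futures are done, then joins their results into one list
        System.out.println(sequence(futures).join()); // prints [1, 2, 3]
    }
}
```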

That solution totally works and it is good. However, if you often deal with Streams, a more functional approach would be neat. So I started coding a Collector that does this operation for me in one go. I won’t go into detail about how a Collector works, but this blog post helped me out a lot in understanding it.

Finally, I ended up with this solution, which I’ve uploaded to GitHub:


import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CompletableFutureCollector<X, T extends CompletableFuture<X>> implements Collector<T, List<T>, CompletableFuture<List<X>>> {

    private CompletableFutureCollector() {
    }

    public static <X, T extends CompletableFuture<X>> Collector<T, List<T>, CompletableFuture<List<X>>> allOf() {
        return new CompletableFutureCollector<>();
    }

    @Override
    public Supplier<List<T>> supplier() {
        return ArrayList::new;
    }

    @Override
    public BiConsumer<List<T>, T> accumulator() {
        return List::add;
    }

    @Override
    public BinaryOperator<List<T>> combiner() {
        return (left, right) -> { left.addAll(right); return left; };
    }

    @Override
    public Function<List<T>, CompletableFuture<List<X>>> finisher() {
        return ls -> CompletableFuture.allOf(ls.toArray(new CompletableFuture[ls.size()]))
                .thenApply(v -> ls
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    @Override
    public Set<Characteristics> characteristics() {
        return Collections.emptySet();
    }
}

And here is how you would use it:


public static void main(String[] args) {
    CompletableFuture<List<Integer>> collect = Stream.of(1, 2, 3)
            .map(CompletableFuture::completedFuture)
            .collect(CompletableFutureCollector.allOf());
}

Happy coding!


Update

Obviously it was late yesterday ^^ The solution I posted was hidden in another answer with fewer upvotes. It suggests using Collectors.collectingAndThen together with the sequence-method above. In my opinion, this is cleaner than my approach of writing the Collector on your own (DRY principle). The final solution is posted below; it contains another Collector factory method that can be used if you’re not interested in the results, or if the CompletableFutures to collect are of type Void.


import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class CompletableFutureCollector2 {

    private CompletableFutureCollector2() {
    }

    /**
     * Transforms a <pre>{@code List<CompletableFuture<T>>}</pre> into a <pre>{@code CompletableFuture<List<T>>}</pre>.
     * @param <X> the computed result type
     * @param <T> some CompletableFuture
     * @return a <pre>{@code CompletableFuture<List<X>>}</pre> that is complete when all collected CompletableFutures are complete.
     */
    public static <X, T extends CompletableFuture<X>> Collector<T, ?, CompletableFuture<List<X>>> collectResult() {
        return Collectors.collectingAndThen(Collectors.toList(), joinResult());
    }

    /**
     * Transforms a <pre>{@code List<CompletableFuture<?>>}</pre> into a <pre>{@code CompletableFuture<Void>}</pre>.
     * Use this function if you are not interested in the collected results or the collected CompletableFutures are of
     * type Void.
     * @param <T> some CompletableFuture
     * @return a <pre>{@code CompletableFuture<Void>}</pre> that is complete when all collected CompletableFutures are complete.
     */
    public static <T extends CompletableFuture<?>> Collector<T, ?, CompletableFuture<Void>> allComplete() {
        return Collectors.collectingAndThen(Collectors.toList(), CompletableFutureCollector2::allOf);
    }

    private static <X, T extends CompletableFuture<X>> Function<List<T>, CompletableFuture<List<X>>> joinResult() {
        return ls -> allOf(ls)
                .thenApply(v -> ls
                        .stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    private static <T extends CompletableFuture<?>> CompletableFuture<Void> allOf(List<T> ls) {
        return CompletableFuture.allOf(ls.toArray(new CompletableFuture[ls.size()]));
    }
}
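The collectingAndThen-based approach can also be tried without the helper class, by inlining the same logic – the class name and values below are mine, for demonstration only:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class CollectAndThenDemo {

    // collectResult, inlined: collect the futures to a List first,
    // then flip the List<CompletableFuture<X>> into one CompletableFuture<List<X>>
    static <X> Collector<CompletableFuture<X>, ?, CompletableFuture<List<X>>> collectResult() {
        return Collectors.collectingAndThen(
                Collectors.toList(),
                ls -> CompletableFuture.allOf(ls.toArray(new CompletableFuture[0]))
                        .thenApply(v -> ls.stream()
                                .map(CompletableFuture::join)
                                .collect(Collectors.toList())));
    }

    public static void main(String[] args) {
        CompletableFuture<List<Integer>> all = Stream.of(1, 2, 3)
                .map(CompletableFuture::completedFuture)
                .collect(collectResult());
        System.out.println(all.join()); // prints [1, 2, 3]
    }
}
```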

vertx-jooq 2.4 released

Note: Unfortunately, the dependencies on rxjava1 haven’t been removed completely in 2.4.0. RX users, please check out version 2.4.1.

I’m happy to announce the release of vertx-jooq 2.4. The main driver behind this release was the dependency upgrade to vertx 3.5 and thus the change of the rx-dependency in vertx-jooq-rx from rxjava to rxjava2. In addition, the JSON key names of the generated POJO are now the same as their database counterparts. This change has been announced here. Last but not least, a bug has been fixed when dealing with JsonObjectConverter or JsonArrayConverter.