Back in Akka land! I’m using the ask pattern to get results back from actors, since I have a requirement to block until I get a result (I can’t wait for an actor to push one at a later date). That’s fine, but converting from Scala futures to Java completable futures is a pain. I also want to make sure (as mentioned in another post) that my async responses capture and set the MDC for proper logging.

My final usage should look something like:

  
private \<Response, Request\> Future\<Response\> askActorForResponseAsync(Request source) {  
 final FiniteDuration askTimeout = new FiniteDuration(config.getAskForResultTimeout().toMillis(), TimeUnit.MILLISECONDS);

final Timeout timeout = new Timeout(askTimeout);

final scala.concurrent.Future\<Object\> ask = Patterns.ask(master.getActor(), new PersistableMessageContext(source), timeout);

return FutureConverter.fromScalaFuture(ask)  
 .executeOn(actorSystem.dispatcher())  
 .thenApply(i -\> (Response) i);  
}  

The idea is to register a completion callback on the Scala future and use it to complete a Java CompletableFuture, which is what the caller gets back.
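
Since the requirement is to block for the result, the call site ends up waiting on that returned future. Here is a minimal JDK-only sketch of what that could look like. `fakeAsk` is a hypothetical stand-in for the actor ask (it just completes on another thread), `askAndWait` and the one-second limit are made up for illustration, and `Class::cast` is swapped in for the unchecked generic cast so that a reply of the wrong type fails the future itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingAskExample {

    // Hypothetical stand-in for the actor ask: replies with an untyped Object on another thread.
    static CompletableFuture<Object> fakeAsk(Object reply) {
        return CompletableFuture.supplyAsync(() -> reply);
    }

    // Blocks for the reply, bounded by a timeout, and checks the reply type at runtime.
    static <R> R askAndWait(Object reply, Class<R> responseType, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return fakeAsk(reply)
                .thenApply(responseType::cast) // a ClassCastException here fails the future
                .get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(askAndWait("pong", String.class, 1000));
        try {
            askAndWait(42, String.class, 1000);
        } catch (ExecutionException e) {
            // The cause is the exception raised inside thenApply.
            System.out.println("wrong reply type: " + e.getCause().getClass().getSimpleName());
        }
    }
}
```

Bounding the wait with `get(timeout, unit)` is a design choice for this sketch: the ask already has its own timeout, but a second bound at the blocking call keeps the caller from hanging if the future is never completed.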

Next up, the future converter:

  
public class FutureConverter {  
 public static \<T\> FromScalaFuture\<T\> fromScalaFuture(scala.concurrent.Future\<T\> future) {  
 return new FromScalaFuture\<\>(future);  
 }  
}  

This is just an entry point into a new class that gives you a nice fluent interface for supplying the execution context.

Next, a class whose job is to create an Akka callback and use it to complete a completable future.

  
import scala.concurrent.ExecutionContext;  
import scala.concurrent.Future;

import java.util.concurrent.CompletableFuture;

public class FromScalaFuture\<T\> {

private final Future\<T\> future;

public FromScalaFuture(Future\<T\> future) {  
 this.future = future;  
 }

public CompletableFuture\<T\> executeOn(ExecutionContext context) {  
 final CompletableFuture\<T\> completableFuture = new CompletableFuture\<\>();

final AkkaOnCompleteCallback\<T\> completer = AkkaCompletionConverter.\<T\>createCompleter((failure, success) -\> {  
 if (failure != null) {  
 completableFuture.completeExceptionally(failure);  
 }  
 else {  
 completableFuture.complete(success);  
 }  
 });

future.onComplete(completer.toScalaCallback(), context);

return completableFuture;  
 }  
}  
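
The same bridging can be tried out without an actor system. The following is only a sketch under assumptions: `CallbackSource` is a hypothetical callback-style interface playing the role of the Scala future, and `toCompletable` mirrors what `executeOn` does with it, using nothing but the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;

class CallbackBridgeExample {

    // Hypothetical callback-style source, playing the role of the Scala future.
    interface CallbackSource<T> {
        void onComplete(BiConsumer<Throwable, T> callback);
    }

    // Mirrors executeOn: a non-null failure fails the future, otherwise it completes normally.
    static <T> CompletableFuture<T> toCompletable(CallbackSource<T> source) {
        final CompletableFuture<T> completableFuture = new CompletableFuture<>();
        source.onComplete((failure, success) -> {
            if (failure != null) {
                completableFuture.completeExceptionally(failure);
            } else {
                completableFuture.complete(success);
            }
        });
        return completableFuture;
    }

    public static void main(String[] args) {
        // A source that succeeds, invoking the callback on another thread.
        CallbackSource<String> ok = cb -> new Thread(() -> cb.accept(null, "done")).start();
        System.out.println(toCompletable(ok).join());

        // A source that fails; join() surfaces the failure wrapped in a CompletionException.
        CallbackSource<String> bad = cb -> cb.accept(new IllegalStateException("boom"), null);
        try {
            toCompletable(bad).join();
        } catch (CompletionException e) {
            System.out.println("failed with: " + e.getCause().getMessage());
        }
    }
}
```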

And finally, another guy whose job is to translate Java functions into Akka callbacks:

  
import akka.dispatch.OnComplete;

@FunctionalInterface  
public interface AkkaOnCompleteCallback\<T\> {  
 OnComplete\<T\> toScalaCallback();  
}  

  
import akka.dispatch.OnComplete;  
import org.slf4j.MDC;

import java.util.Map;  
import java.util.function.BiConsumer;

public class AkkaCompletionConverter {  
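 /\*\*  
 \* Closes over the MDC context map of the calling thread and applies it on the thread that  
 \* completes the future, so the callback logs with the caller’s context  
 \*  
 \* @param callback invoked with the failure (or null) and the result once the future completes  
 \* @return a factory for the Akka callback; the MDC is captured when toScalaCallback() is called  
 \*/  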
 public static \<T\> AkkaOnCompleteCallback\<T\> createCompleter(BiConsumer\<Throwable, T\> callback) {  
 return () -\> {

final Map\<String, String\> oldContextMap = MDC.getCopyOfContextMap();

return new OnComplete\<T\>() {  
 @Override public void onComplete(final Throwable failure, final T success) throws Throwable {  
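 // capture the current thread’s context map  
 final Map\<String, String\> currentThreadsContext = MDC.getCopyOfContextMap();

// set the closed-over context map, or clear it if the caller had none  
 if (oldContextMap != null) {  
 MDC.setContextMap(oldContextMap);  
 } else {  
 MDC.clear();  
 }

try {  
 callback.accept(failure, success);  
 } finally {  
 // restore the current thread’s previous context map, even if the callback throws  
 if (currentThreadsContext != null) {  
 MDC.setContextMap(currentThreadsContext);  
 } else {  
 // the thread had no context before, so don’t leave the caller’s behind  
 MDC.clear();  
 }  
 }  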
 }  
 };  
 };  
 }  
}
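
To make the capture-and-restore sequence easier to follow in isolation, here is a rough JDK-only sketch. Everything in it is an assumption for illustration: `FakeMdc` is a hypothetical `ThreadLocal`-backed stand-in for `org.slf4j.MDC` (so the example runs without a logging backend), `wrap` plays the part of `createCompleter`, and a single-thread executor plays the part of the dispatcher.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class MdcPropagationExample {

    // Hypothetical stand-in for org.slf4j.MDC, backed by a ThreadLocal map.
    static final class FakeMdc {
        private static final ThreadLocal<Map<String, String>> CONTEXT = new ThreadLocal<>();

        static void put(String key, String value) {
            Map<String, String> map = CONTEXT.get();
            if (map == null) {
                map = new HashMap<>();
                CONTEXT.set(map);
            }
            map.put(key, value);
        }

        static String get(String key) {
            Map<String, String> map = CONTEXT.get();
            return map == null ? null : map.get(key);
        }

        static Map<String, String> copy() {
            Map<String, String> map = CONTEXT.get();
            return map == null ? null : new HashMap<>(map);
        }

        static void set(Map<String, String> map) {
            if (map == null) {
                CONTEXT.remove();
            } else {
                CONTEXT.set(new HashMap<>(map));
            }
        }
    }

    // Captures the caller's context now; installs it around the task and restores afterwards.
    static Runnable wrap(Runnable task) {
        final Map<String, String> callerContext = FakeMdc.copy();
        return () -> {
            final Map<String, String> previous = FakeMdc.copy();
            FakeMdc.set(callerContext);
            try {
                task.run();
            } finally {
                FakeMdc.set(previous);
            }
        };
    }

    // Returns what the wrapped task saw and what the worker thread was left with afterwards.
    static String[] demo() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Runnable seedWorker = () -> FakeMdc.put("requestId", "worker-own");
            worker.submit(seedWorker).get();

            FakeMdc.put("requestId", "caller-123");
            AtomicReference<String> seen = new AtomicReference<>();
            worker.submit(wrap(() -> seen.set(FakeMdc.get("requestId")))).get();

            Callable<String> readWorker = () -> FakeMdc.get("requestId");
            String after = worker.submit(readWorker).get();
            return new String[] { seen.get(), after };
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        String[] result = demo();
        System.out.println("inside callback: " + result[0] + ", after callback: " + result[1]);
    }
}
```

The point of the `try`/`finally` is that the worker thread gets its own context back even when the task throws, so one request's context never leaks into whatever runs on that thread next.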