Friday, February 7, 2014

High performance RSS/Atom parsing

Parsing RSS feeds is very easy in Java. Several libraries exist to get the job done: feed4j, RSSOwl, Apache Abdera and many others, but the most commonly used is ROME. ROME is a set of RSS and Atom utilities for Java that makes it easy to work with most syndication formats: RSS 0.9x, 1.0 and 2.0, and Atom 0.3 and 1.0.

Reading RSS from a source is dead simple. You need these dependencies:
<!-- Rome Atom+RSS -->
<dependency>
<groupId>net.java.dev.rome</groupId>
<artifactId>rome</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>net.java.dev.rome</groupId>
<artifactId>rome-fetcher</artifactId>
<version>1.0.0</version>
</dependency>

and a piece of code that looks like this:
        URL url = new URL("http://feeds.feedburner.com/manishchhabra27");
        HttpURLConnection httpcon = (HttpURLConnection)url.openConnection();
        // Reading the feed
        SyndFeedInput input = new SyndFeedInput();
        SyndFeed feed = input.build(new XmlReader(httpcon));
        List entries = feed.getEntries();
        Iterator itEntries = entries.iterator();

        while (itEntries.hasNext()) {
            SyndEntry entry = (SyndEntry) itEntries.next();
            System.out.println("Title: " + entry.getTitle());
            System.out.println("Link: " + entry.getLink());
            System.out.println("Author: " + entry.getAuthor());
            System.out.println("Publish Date: " + entry.getPublishedDate());
            System.out.println("Description: " + entry.getDescription().getValue());
            System.out.println();
        }
(from: http://blog.manishchhabra.com/2011/10/rome-library-example-for-parsing-rss-and-atom-feeds/)

But in the context of an enterprise-grade application, especially if you want to build a "Google Reader"-like service, you must NOT use ROME out of the box. ROME can be tuned on many points.

Cache

ROME allows you to cache feed details by implementing the interface com.sun.syndication.fetcher.impl.FeedFetcherCache. By default, three classes implement this interface:
- com.sun.syndication.fetcher.impl.HashMapFeedInfoCache
- com.sun.syndication.fetcher.impl.LinkedHashMapFeedInfoCache
- com.sun.syndication.fetcher.impl.DiskFeedInfoCache

If you already have a cache (Infinispan, Memcached, EhCache...), you can reuse it and benefit from its features (distribution, replication, time-to-live...). Instead of writing a cache-specific implementation, let's create a generic class that delegates the caching logic to a Spring cache manager. You will then be able to swap the underlying cache as your needs change. Spring already ships a cache manager delegating to EhCache, and you will find more implementations by googling "spring cache the-name-of-the-awesome-cache-framework-you-use", for instance:
- Infinispan: http://docs.jboss.org/infinispan/5.1/apidocs/org/infinispan/spring/provider/package-summary.html
- Redis: http://docs.spring.io/spring-data/redis/docs/current/reference/html/redis.html#redis:support:cache-abstraction
- Gemfire: http://docs.spring.io/spring-gemfire/docs/current/reference/html/apis.html#apis:spring-cache-abstraction
- Coherence (by Soat): http://blog.soat.fr/2012/08/spring-3-1-utiliser-labstraction-de-cache-2-le-retour/
...

So, choose your poison, and let's code the ROME FeedFetcherCache that delegates to a Spring CacheManager:

import java.net.URL;
import javax.annotation.PostConstruct;
import org.springframework.cache.Cache;
import org.springframework.cache.Cache.ValueWrapper;
import org.springframework.cache.CacheManager;
import com.google.common.base.Preconditions;
import com.sun.syndication.fetcher.impl.FeedFetcherCache;
import com.sun.syndication.fetcher.impl.SyndFeedInfo;

public class SpringFeedFetcherCache implements FeedFetcherCache {

private String cacheName;

private CacheManager cacheManager;

public SpringFeedFetcherCache(String cacheName, CacheManager cacheManager) {
this.cacheName = cacheName;
this.cacheManager = cacheManager;
}

private Cache cache;

@PostConstruct
public void init() {
Preconditions.checkNotNull(cacheManager);
Preconditions.checkNotNull(cacheName);
cache = cacheManager.getCache(cacheName);
}

@Override
public SyndFeedInfo getFeedInfo(URL feedUrl) {
return get(feedUrl);
}

@Override
public void setFeedInfo(URL feedUrl, SyndFeedInfo syndFeedInfo) {
cache.put(feedUrl, syndFeedInfo);
}

@Override
public void clear() {
cache.clear();
}

@Override
public SyndFeedInfo remove(URL feedUrl) {
SyndFeedInfo syndFeedInfo = get(feedUrl);
cache.evict(feedUrl);
return syndFeedInfo;
}

private SyndFeedInfo get(URL feedUrl) {
ValueWrapper valueWrapper = cache.get(feedUrl);
if (valueWrapper != null) {
return (SyndFeedInfo) valueWrapper.get();
}
return null;
}
}

No trap here...

Http connections

Another thing: ROME uses URLConnection or the old commons-httpclient to fetch feeds. Your application almost certainly already has an HttpClient instance (httpcomponents-httpclient) from its social layer, MongoDB Java driver, REST framework or elsewhere. In any case, we can reuse it. So, let's implement a new AbstractFeedFetcher:
https://gist.github.com/Treydone/8860062
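The gist implements ROME's AbstractFeedFetcher on top of HttpComponents. In case the link goes stale, here is a simplified sketch of the idea (only the happy path is shown; the conditional GET, retry and event-firing logic of the real implementation are omitted):

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;

import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.fetcher.FetcherException;
import com.sun.syndication.fetcher.impl.AbstractFeedFetcher;
import com.sun.syndication.io.FeedException;
import com.sun.syndication.io.SyndFeedInput;
import com.sun.syndication.io.XmlReader;

public class HttpComponentsFeedFetcher extends AbstractFeedFetcher {

    private final HttpClient httpClient;

    public HttpComponentsFeedFetcher(HttpClient httpClient) {
        this.httpClient = httpClient;
    }

    @Override
    public SyndFeed retrieveFeed(URL feedUrl) throws IOException,
            FeedException, FetcherException {
        HttpGet get = new HttpGet(feedUrl.toExternalForm());
        get.setHeader("User-Agent", getUserAgent());
        HttpResponse response = httpClient.execute(get);
        int status = response.getStatusLine().getStatusCode();
        if (status >= 400) {
            throw new FetcherException("HTTP error " + status
                    + " retrieving " + feedUrl);
        }
        // Let ROME detect the encoding and parse the stream
        InputStream stream = response.getEntity().getContent();
        try {
            return new SyndFeedInput().build(new XmlReader(stream));
        } finally {
            stream.close();
        }
    }
}
```

The real version also reads the SyndFeedInfo from the cache to send If-Modified-Since/If-None-Match headers, which is where the performance gain comes from.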

Putting it all together

The configuration for the httpclient:

@Configuration
public class HttpConfig {

private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 100;
private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
private static final int DEFAULT_READ_TIMEOUT_MILLISECONDS = (60 * 1000);

@Bean
public CloseableHttpClient httpClient() {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);
connectionManager
.setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"blogspot.com")), 20);
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS).build();

CloseableHttpClient defaultHttpClient = HttpClientBuilder.create()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(config).build();
return defaultHttpClient;
}
}

for the RSS feed fetcher:

import javax.inject.Inject;
import org.apache.http.impl.client.CloseableHttpClient;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.hangar2.syndic.HttpComponentsFeedFetcher;
import com.streaming.syndic.SpringFeedFetcherCache;
import com.sun.syndication.fetcher.FeedFetcher;
import com.sun.syndication.fetcher.impl.FeedFetcherCache;

@Configuration
public class SyndicConfig {

@Inject
private CacheManager cacheManager;

@Inject
private CloseableHttpClient httpClient;

@Bean
public FeedFetcher feedFetcher() {
HttpComponentsFeedFetcher httpFeedFetcher = new HttpComponentsFeedFetcher(
httpClient);
httpFeedFetcher.setFeedInfoCache(feedInfoCache());
httpFeedFetcher.setUserAgent("Bot");
return httpFeedFetcher;
}

@Bean
public FeedFetcherCache feedInfoCache() {
return new SpringFeedFetcherCache("rss", cacheManager);
}
}

and for the cache:

import javax.inject.Inject;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurer;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cache.ehcache.EhCacheCacheManager;
import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ResourceLoader;

@Configuration
@EnableCaching(mode = AdviceMode.ASPECTJ)
public class CacheConfig {

@Configuration
@Profile("!test")
static class Default implements CachingConfigurer {

@Inject
private ResourceLoader resourceLoader;

@Bean
public KeyGenerator keyGenerator() {
return new ReflectionBasedKeyGenerator();
}

@Bean
public CacheManager cacheManager() {
EhCacheCacheManager ehCacheCacheManager = new EhCacheCacheManager();
try {
ehCacheCacheManager.setCacheManager(ehcacheCacheManager()
.getObject());
} catch (Exception e) {
throw new IllegalStateException(
"Failed to create an EhCacheManagerFactoryBean", e);
}
return ehCacheCacheManager;
}

@Bean
public EhCacheManagerFactoryBean ehcacheCacheManager() throws Exception {
EhCacheManagerFactoryBean bean = new EhCacheManagerFactoryBean();
bean.setShared(true);
bean.setConfigLocation(resourceLoader
.getResource("classpath:ehcache.xml"));
return bean;
}
}

@Configuration
@Profile(Profiles.TEST)
static class Test implements CachingConfigurer {

@Bean
public KeyGenerator keyGenerator() {
return new ReflectionBasedKeyGenerator();
}

@Bean
public CacheManager cacheManager() {
return new ConcurrentMapCacheManager();
}
}

}
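With these beans wired, application code no longer touches SyndFeedInput directly: it asks the FeedFetcher, which transparently consults the Spring-backed cache and performs conditional GETs. A hypothetical consumer (the FeedReader class name is ours, not part of any library) would look like:

```java
import java.net.URL;

import javax.inject.Inject;

import org.springframework.stereotype.Service;

import com.sun.syndication.feed.synd.SyndFeed;
import com.sun.syndication.fetcher.FeedFetcher;

@Service
public class FeedReader {

    @Inject
    private FeedFetcher feedFetcher;

    public SyndFeed read(String url) throws Exception {
        // Unchanged feeds are served from the cache instead of being re-fetched
        return feedFetcher.retrieveFeed(new URL(url));
    }
}
```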

Monday, November 25, 2013

How to kill Hadoop jobs matching a pattern?


Today, I had to kill a list of 45 jobs running on my Hadoop cluster. OK, let's have a look at the docs: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/CommandsManual.html#job But wait a minute... Hadoop knows the "kill" command, but not "pkill"...

One solution is:

import java.io.IOException;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PKill {

private final static Logger LOGGER = LoggerFactory.getLogger(PKill.class);

private static void printUsage(Options options) {
HelpFormatter usageFormatter = new HelpFormatter();
usageFormatter.printHelp("pkill", options);
}

public static void main(final String... commandLineArguments)
throws IOException, InterruptedException {

CommandLineParser parser = new PosixParser();

Options options = new Options();
options.addOption(
"p",
"pattern",
true,
"Specify the patterns: 'elastic' will match 'elastic',elasticsearch','searchelasticsearch'");
options.addOption("h", "help", false, "Print the help");

CommandLine line = null;
try {
line = parser.parse(options, commandLineArguments);
} catch (ParseException exp) {
printUsage(options);
return;
}

if (line.hasOption("h")) {
printUsage(options);
return;
}

String[] patterns = null;
if (line.hasOption("p")) {
LOGGER.debug("Setting prefixes: {}",
new Object[] { line.getOptionValues("p") });
patterns = line.getOptionValues("p");
}

Configuration configuration = new Configuration();

JobClient jobClient = new JobClient(configuration);

JobStatus[] jobsToComplete = jobClient.jobsToComplete();

for (JobStatus jobStatus : jobsToComplete) {
RunningJob job = jobClient.getJob(jobStatus.getJobID());

LOGGER.debug("--------------------------------------------");
LOGGER.debug("Job: " + job.getJobName());
LOGGER.debug(" Progress:" + job.cleanupProgress() * 100 + "% (Map:"
+ job.getJobStatus().mapProgress() * 100 + ", Reduce:"
+ job.getJobStatus().reduceProgress() + ")");
LOGGER.debug(" Username:" + jobStatus.getUsername());
if (patterns == null) {
kill(job);
} else {

for (String pattern : patterns) {
if (job.getJobName().toLowerCase().contains(pattern)) {
kill(job);
break;
}
}
}
}

if (ArrayUtils.isNotEmpty(patterns)) {
LOGGER.debug("Jobs matching '" + StringUtils.join(patterns, ",")
+ "' have been killed with success!");
} else {
LOGGER.debug("All jobs have been killed with success!");
}
}

private static void kill(RunningJob job) throws IOException {
LOGGER.debug("KILLING....");
job.killJob();
LOGGER.debug("Success");
}
}

Then in your console, enter:
java -cp yourjar.jar com.hangar2.PKill -p "terasort" -p "pi"

And all your jobs containing "terasort" or "pi" will be killed!

Nice!

Monday, November 4, 2013

Pooling a Thrift client

Thrift is an interface definition language used to define and create services for numerous languages. The Thrift stack relies on protocols (TBinaryProtocol, TCompactProtocol...) and transports (TSocket, TFileTransport...). But since the transport layer is essentially a wrapper around a socket or a file, Thrift clients are NOT thread-safe. As with any non-thread-safe resource, you have a choice: use a costly locking algorithm, create a new connection to the resource each time, or pool your connections.

Locking is the simplest way to share a non-thread-safe resource, and Java has plenty of ways to do it well, but in all cases it creates a bottleneck in your code. Creating a new connection each time you want to access the service is not a great idea either: a lot of ephemeral connections will be created and destroyed, which can be really expensive. But what about pooling?

A pool is just a set of initialized resources that are kept ready to use, rather than allocated and destroyed on demand. A client of the pool requests an object, performs operations on it and, when finished, returns it to the pool rather than destroying it. Pooling can offer a significant performance boost when the cost of initializing an instance is high and instances are requested frequently. (Thanks, Wikipedia.) Apache offers a well-known pooling framework: commons-pool.
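Before reaching for commons-pool, it helps to see what a pool boils down to: a thread-safe queue of pre-built instances. A minimal, JDK-only sketch (the SimplePool name is ours, for illustration only; commons-pool adds eviction, validation and sizing policies on top of this idea):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Minimal object pool: resources are created eagerly, then recycled via a queue.
public class SimplePool<T> {

    private final BlockingQueue<T> idle;

    public SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get()); // pay the creation cost once, up front
        }
    }

    // Blocks until a resource is free instead of creating a new one
    public T borrow() throws InterruptedException {
        return idle.take();
    }

    // Hand the resource back so another thread can reuse it
    public void release(T resource) {
        idle.offer(resource);
    }
}
```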

Let's start with some Maven dependencies:


<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.8.0</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.6</version>
</dependency>


We need a Thrift definition file; we can use the one from Nifty:
https://github.com/facebook/nifty/blob/master/nifty-examples/src/main/resources/scribe.thrift

NB: Nifty is an implementation of Thrift clients and servers on Netty, developed by Facebook and open-sourced at https://github.com/facebook/nifty.

To generate the Java sources from a Thrift file, run:

thrift --gen java thrift/scribe.thrift

NB: Don't forget to install the Thrift compiler (apt-get install thrift-compiler on Ubuntu; version 0.8.0-0ubuntu2).

Create the class ThriftClientPool:


package com.hangar2.thrift;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ThriftClientPool<T extends TServiceClient> implements
AutoCloseable {

private static final Logger LOGGER = LoggerFactory
.getLogger(ThriftClientPool.class);

private final GenericObjectPool<T> internalPool;

public ThriftClientPool(ClientFactory<T> clientFactory,
GenericObjectPool.Config poolConfig, String host, int port) {
this(clientFactory, new BinaryOverSocketProtocolFactory(host, port),
poolConfig);
}

public ThriftClientPool(ClientFactory<T> clientFactory,
ProtocolFactory protocolFactory, GenericObjectPool.Config poolConfig) {
this.internalPool = new GenericObjectPool<T>(new ThriftClientFactory(
clientFactory, protocolFactory), poolConfig);
}

class ThriftClientFactory extends BasePoolableObjectFactory<T> {

private ClientFactory<T> clientFactory;
private ProtocolFactory protocolFactory;

public ThriftClientFactory(ClientFactory<T> clientFactory,
ProtocolFactory protocolFactory) {
this.clientFactory = clientFactory;
this.protocolFactory = protocolFactory;
}

@Override
public T makeObject() throws Exception {
try {
TProtocol protocol = protocolFactory.make();
return clientFactory.make(protocol);
} catch (Exception e) {
LOGGER.warn("whut?", e);
throw new ThriftClientException(
"Can not make a new object for pool", e);
}
}

@Override
public void destroyObject(T obj) throws Exception {
if (obj.getOutputProtocol().getTransport().isOpen()) {
obj.getOutputProtocol().getTransport().close();
}
if (obj.getInputProtocol().getTransport().isOpen()) {
obj.getInputProtocol().getTransport().close();
}
}
}

public static interface ClientFactory<T> {

T make(TProtocol tProtocol);
}

public static interface ProtocolFactory {

TProtocol make();
}

public static class BinaryOverSocketProtocolFactory implements
ProtocolFactory {

private String host;
private int port;

public BinaryOverSocketProtocolFactory(String host, int port) {
this.host = host;
this.port = port;
}

public TProtocol make() {
TTransport transport = new TSocket(host, port);
try {
transport.open();
} catch (TTransportException e) {
LOGGER.warn("whut?", e);
throw new ThriftClientException("Can not make protocol", e);
}
return new TBinaryProtocol(transport);
}
}

public static class ThriftClientException extends RuntimeException {

// Required by Serializable
private static final long serialVersionUID = -2275296727467192665L;

public ThriftClientException(String message, Exception e) {
super(message, e);
}

}

public T getResource() {
try {
return internalPool.borrowObject();
} catch (Exception e) {
throw new ThriftClientException(
"Could not get a resource from the pool", e);
}
}

public void returnResourceObject(T resource) {
try {
internalPool.returnObject(resource);
} catch (Exception e) {
throw new ThriftClientException(
"Could not return the resource to the pool", e);
}
}

public void returnBrokenResource(T resource) {
returnBrokenResourceObject(resource);
}

public void returnResource(T resource) {
returnResourceObject(resource);
}

protected void returnBrokenResourceObject(T resource) {
try {
internalPool.invalidateObject(resource);
} catch (Exception e) {
throw new ThriftClientException(
"Could not return the resource to the pool", e);
}
}

public void destroy() {
close();
}

public void close() {
try {
internalPool.close();
} catch (Exception e) {
throw new ThriftClientException("Could not destroy the pool", e);
}
}
}

ThriftClientPool delegates all the pooling logic to a GenericObjectPool. A custom BasePoolableObjectFactory handles the creation of new clients. Clients use the default ProtocolFactory (BinaryOverSocketProtocolFactory), which speaks the binary protocol over a plain socket.

Now, you can use it, with a default configuration:

final ThriftClientPool<Scribe.Client> pool = new ThriftClientPool<Scribe.Client>(
new ClientFactory<Scribe.Client>() {
@Override
public Client make(TProtocol tProtocol) {
return new Scribe.Client(tProtocol);
}
}, new Config(), "localhost", 7911);

Or with a custom pooling configuration:

Config poolConfig = new Config();
poolConfig.maxActive = 80;
poolConfig.minIdle = 5;
poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
poolConfig.testOnBorrow = true;
poolConfig.testWhileIdle = true;
poolConfig.numTestsPerEvictionRun = 10;
poolConfig.maxWait = 3000;

final ThriftClientPool<Scribe.Client> pool = new ThriftClientPool<Scribe.Client>(
new ClientFactory<Scribe.Client>() {
@Override
public Client make(TProtocol tProtocol) {
return new Scribe.Client(tProtocol);
}
}, poolConfig, "localhost", 7911);

Or with a custom protocol, for instance binary over a framed socket (you'll need a non-blocking server, such as a TThreadedSelectorServer, on the server side):


final ThriftClientPool<Scribe.Client> pool = new ThriftClientPool<Scribe.Client>(
new ClientFactory<Scribe.Client>() {
@Override
public Client make(TProtocol tProtocol) {
return new Scribe.Client(tProtocol);
}
}, new ProtocolFactory() {
public TProtocol make() {
TFramedTransport transport = new TFramedTransport(
new TSocket("localhost", 7911));
try {
transport.open();
} catch (TTransportException e) {
throw new ThriftClientException(
"Can not make protocol", e);
}
return new TBinaryProtocol(transport);
}
}, new Config());

Now, some tests...

Server side:

package com.hangar2.thrift;

import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.facebook.nifty.test.LogEntry;
import com.facebook.nifty.test.ResultCode;
import com.facebook.nifty.test.Scribe;

public class BaseNativeServer {

private static final Logger log = LoggerFactory
.getLogger(BaseNativeServer.class);

public static void main(String[] args) throws Exception {

final MetricRegistry registry = new MetricRegistry();
final Meter requests = registry.meter(MetricRegistry.name(
BaseNativeServer.class, "requests"));
final ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS).build();
reporter.start(1, TimeUnit.SECONDS);

// Create the handler
Scribe.Iface serviceInterface = new Scribe.Iface() {

public ResultCode log(List<LogEntry> messages) throws TException {
requests.mark();
for (LogEntry message : messages) {
log.info("{}: {}", message.getCategory(),
message.getMessage());
}
return ResultCode.OK;
}
};

TServerSocket serverTransport = new TServerSocket(7911);
Scribe.Processor<Scribe.Iface> processor = new Scribe.Processor<Scribe.Iface>(
serviceInterface);

final TServer server = new TThreadPoolServer(
new TThreadPoolServer.Args(serverTransport)
.processor(processor));

// Arrange to stop the server at shutdown. Note: the hook must be
// registered before serve(), which blocks until the server stops.
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
server.stop();
}
});

server.serve();
}

}

Client side:

package com.hangar2.thrift;

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool.Config;
import org.apache.thrift.protocol.TProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.facebook.nifty.test.LogEntry;
import com.facebook.nifty.test.Scribe;
import com.facebook.nifty.test.Scribe.Client;
import com.hangar2.thrift.ThriftClientPool.ClientFactory;

public class BaseNativeClient {

private static final Logger log = LoggerFactory
.getLogger(BaseNativeClient.class);

public static void main(String[] args) throws Exception {

Config poolConfig = new Config();
poolConfig.maxActive = 80;
poolConfig.minIdle = 5;
poolConfig.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
poolConfig.testOnBorrow = true;
poolConfig.testWhileIdle = true;
poolConfig.numTestsPerEvictionRun = 10;
poolConfig.maxWait = 3000;

final ThriftClientPool<Scribe.Client> pool = new ThriftClientPool<Scribe.Client>(
new ClientFactory<Scribe.Client>() {
@Override
public Client make(TProtocol tProtocol) {
return new Scribe.Client(tProtocol);
}
}, poolConfig, "localhost", 7911);

ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
executor.submit(new Runnable() {
public void run() {

Scribe.Client resource = pool.getResource();
try {
for (int i = 0; i < 10000; i++) {
try {
resource.log(Collections
.singletonList(new LogEntry("cat1",
"test" + i)));
} catch (Exception e) {
e.printStackTrace();
}
}
pool.returnResource(resource);
} catch (Exception e) {
pool.returnBrokenResource(resource);
log.warn("whut?", e);
}
}
});
}

Thread.sleep(3000);
pool.close();
}
}

Sources:
Wikipedia: http://en.wikipedia.org/wiki/Apache_Thrift
Thrift homepage: http://thrift.apache.org/
Nifty, the Thrift-over-Netty framework: https://github.com/facebook/nifty

Monday, October 21, 2013

A better Spring AsyncRestTemplate!

I wrote an article on the famous Spring RestTemplate some months ago (http://vincentdevillers.blogspot.fr/2013/02/configure-best-spring-resttemplate.html). Since Spring 4, currently 4.0.0.M3 (or BUILD-SNAPSHOT for the most courageous), you can use a custom RestTemplate in an async way: the AsyncRestTemplate API.

Why should I use that?
The purpose of this API is to allow Java applications to easily execute HTTP requests and process the responses asynchronously. In non-async mode, the code blocks until the response is fully received. In async mode, the code continues and a listener notifies you when the response (entire or partial) becomes available.

As the RestTemplate uses a ClientHttpRequestFactory for creating HTTP connections, the AsyncRestTemplate uses an... AsyncClientHttpRequestFactory. Currently, only the HttpComponents implementation (HttpComponentsAsyncClientHttpRequestFactory) and a basic JDK-based implementation (SimpleClientHttpRequestFactory) are ready to use, but we can bet that many others will come, supporting frameworks like Netty (http://netty.io/), Grizzly (https://grizzly.java.net/) or KryoNet (http://code.google.com/p/kryonet/).

NB: SimpleClientHttpRequestFactory now implements both ClientHttpRequestFactory and AsyncClientHttpRequestFactory. The trick is that, in async mode, SimpleClientHttpRequestFactory relies on a TaskExecutor.
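If you stick with the JDK-based factory, that means you should size the executor yourself; something like this sketch (the pool size of 10 is an arbitrary example):

```java
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // arbitrary: size to your expected concurrency
executor.initialize();

SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setTaskExecutor(executor); // async requests are dispatched on this executor

AsyncRestTemplate asyncRestTemplate = new AsyncRestTemplate(factory);
```

But for production use, the HttpComponents implementation below is the better bet.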

First, you need this:



<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>4.0.0.M3</version>
</dependency>

<!-- Apache Http Client -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.0-beta4</version>
</dependency>

Add some configuration:


import java.util.List;

import javax.inject.Inject;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.AsyncClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsAsyncClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;

@Configuration
public class HttpConfig {

private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 100;

private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;

private static final int DEFAULT_READ_TIMEOUT_MILLISECONDS = (60 * 1000);

@Inject
private ObjectMapper objectMapper;

// ################################################### SYNC
@Bean
public ClientHttpRequestFactory httpRequestFactory() {
return new HttpComponentsClientHttpRequestFactory(httpClient());
}

@Bean
public RestTemplate restTemplate() {
RestTemplate restTemplate = new RestTemplate(httpRequestFactory());
List<HttpMessageConverter<?>> converters = restTemplate
.getMessageConverters();

for (HttpMessageConverter<?> converter : converters) {
if (converter instanceof MappingJackson2HttpMessageConverter) {
MappingJackson2HttpMessageConverter jsonConverter = (MappingJackson2HttpMessageConverter) converter;
jsonConverter.setObjectMapper(objectMapper);
}
}
return restTemplate;
}

@Bean
public CloseableHttpClient httpClient() {
PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);
connectionManager
.setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"facebook.com")), 20);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"twitter.com")), 20);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"linkedin.com")), 20);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"viadeo.com")), 20);
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS).build();

CloseableHttpClient defaultHttpClient = HttpClientBuilder.create()
.setConnectionManager(connectionManager)
.setDefaultRequestConfig(config).build();
return defaultHttpClient;
}

// ################################################### ASYNC
@Bean
public AsyncClientHttpRequestFactory asyncHttpRequestFactory() {
return new HttpComponentsAsyncClientHttpRequestFactory(
asyncHttpClient());
}

@Bean
public AsyncRestTemplate asyncRestTemplate() {
AsyncRestTemplate restTemplate = new AsyncRestTemplate(
asyncHttpRequestFactory(), restTemplate());
return restTemplate;
}

@Bean
public CloseableHttpAsyncClient asyncHttpClient() {
try {
PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(
new DefaultConnectingIOReactor(IOReactorConfig.DEFAULT));
connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);
connectionManager
.setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"facebook.com")), 20);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"twitter.com")), 20);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"linkedin.com")), 20);
connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
"viadeo.com")), 20);
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS)
.build();

CloseableHttpAsyncClient httpclient = HttpAsyncClientBuilder
.create().setConnectionManager(connectionManager)
.setDefaultRequestConfig(config).build();
return httpclient;
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

As you can see, the AsyncRestTemplate uses an underlying RestTemplate, which carries all the configuration (HTTP message converters, error handlers...).

And then test:


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.inject.Inject;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.support.AnnotationConfigContextLoader;
import org.springframework.web.client.AsyncRestTemplate;
import org.springframework.web.client.RestTemplate;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.leguide.Feed.BindConfig;

@ContextConfiguration(loader = AnnotationConfigContextLoader.class, classes = {
        HttpConfig.class, BindConfig.class })
@RunWith(SpringJUnit4ClassRunner.class)
public class Feed {

    @Configuration
    static class BindConfig {

        @Bean
        public ObjectMapper objectMapper() {
            return new ObjectMapper();
        }
    }

    @Inject
    private RestTemplate restTemplate;

    @Inject
    private AsyncRestTemplate asyncRestTemplate;

    @Inject
    private ObjectMapper objectMapper;

    @Test
    public void sync() {

        // Arrange
        String url = "https://api.github.com/users/treydone";

        // Actions
        ResponseEntity<Map> entity = restTemplate.getForEntity(url, Map.class);

        // Asserts
        assertEquals(HttpStatus.OK, entity.getStatusCode());
        assertEquals("Treydone", entity.getBody().get("login"));
    }

    @Test
    public void async_withFuture() throws InterruptedException,
            ExecutionException {

        // Arrange
        String url = "https://api.github.com/users/treydone";

        // Actions
        Future<ResponseEntity<Map>> future = asyncRestTemplate.getForEntity(
                url, Map.class);

        // Naive polling, for illustration: a blocking future.get() with a
        // timeout would work just as well
        while (!future.isDone()) {
            TimeUnit.MILLISECONDS.sleep(100);
        }

        ResponseEntity<Map> entity = future.get();

        // Asserts
        assertEquals(HttpStatus.OK, entity.getStatusCode());
        assertEquals("Treydone", entity.getBody().get("login"));
    }

    @Test
    public void async_withCallback() throws InterruptedException {

        // Arrange
        String url = "https://api.github.com/users/treydone";
        ExecutorService executorService = Executors.newFixedThreadPool(1);

        // Actions
        Futures.addCallback(
                JdkFutureAdapters.listenInPoolThread(
                        asyncRestTemplate.getForEntity(url, Map.class),
                        executorService),
                new FutureCallback<ResponseEntity<Map>>() {

                    @Override
                    public void onSuccess(ResponseEntity<Map> entity) {
                        // Asserts
                        assertEquals(HttpStatus.OK, entity.getStatusCode());
                        assertEquals("Treydone", entity.getBody().get("login"));
                    }

                    @Override
                    public void onFailure(Throwable t) {
                    }
                }, executorService);

        TimeUnit.SECONDS.sleep(3);
    }
}
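
As a side note, the polling loop in async_withFuture is only there for illustration: any java.util.concurrent Future can simply be awaited with a bounded get. A minimal stdlib-only sketch (the executor and the hardcoded value stand in for the async HTTP call):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class FutureTimeoutDemo {

    // Stands in for asyncRestTemplate.getForEntity(...):
    // the waiting pattern is the same for any Future
    static String fetchLogin() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(new Callable<String>() {
                @Override
                public String call() {
                    return "Treydone";
                }
            });
            // Blocks until the result is ready, or fails after 3 seconds:
            // no need for an isDone()/sleep polling loop
            return future.get(3, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchLogin()); // prints "Treydone"
    }
}
```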

You can find more examples on Github: https://github.com/spring-projects/spring-framework/tree/master/spring-web/src/test/java/org/springframework/http/client


The new async API for RestTemplate is really easy to use, and the transition from sync to async is nearly effortless. Spring continues to offer a good abstraction over HTTP frameworks, and the REST capabilities powered by, among others, the underlying HttpMessageConverters are also available in the async API: priceless!
But the API is still young, and maybe not as production-ready as the async-http-client from Ning (https://github.com/AsyncHttpClient/async-http-client), which supports Grizzly, Netty and Apache as underlying providers and is well documented (AsyncRestTemplate is not yet documented on spring.io: http://docs.spring.io/spring/docs/4.0.x/spring-framework-reference/html/remoting.html#rest-resttemplate). Moreover, one big missing piece is a callback parameter on the methods: all methods return a Future, but if you want to work with callbacks (Runnable or Callable) or a ListenableFuture, you have to cheat with Guava, whereas other frameworks can do it natively:


<dependency>
<groupId>com.ning</groupId>
<artifactId>async-http-client</artifactId>
<version>1.7.20</version>
</dependency>



@Test
public void ning() throws Exception {

    // Arrange
    String url = "https://api.github.com/users/treydone";
    AsyncHttpClient client = new AsyncHttpClient();

    // Actions
    client.prepareGet(url).execute(new AsyncCompletionHandlerBase() {

        @Override
        public Response onCompleted(Response response) throws Exception {
            // Asserts
            assertTrue(response.getStatusCode() == HttpStatus.OK.value());
            assertEquals(
                    "Treydone",
                    objectMapper.readValue(response.getResponseBody(),
                            Map.class).get("login"));
            return super.onCompleted(response);
        }

    }).get(); // wait for completion before closing the client

    client.close();
}



Enjoy!

Sources:
https://github.com/spring-projects/spring-framework/tree/master/spring-web/src/test/java/org/springframework/http/client
http://sonatype.github.io/async-http-client/
https://jira.springsource.org/browse/SPR-8804