High performance RSS/Atom parsing

Parsing RSS feeds is very easy in Java. Several libraries exist to get the job done: feed4j, rssowl, Apache Abdera and many others. But the most commonly used is ROME. ROME is a set of RSS and Atom utilities for Java. It makes it easy to work in Java with most syndication formats: RSS 0.9x, 1.0, 2.0 and Atom 0.3, 1.0.

Reading RSS from a source is dead simple. You need these dependencies:
<!-- Rome Atom+RSS -->
<dependency>
    <groupId>net.java.dev.rome</groupId>
    <artifactId>rome</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>net.java.dev.rome</groupId>
    <artifactId>rome-fetcher</artifactId>
    <version>1.0.0</version>
</dependency>

and a piece of code that looks like this:
        URL url = new URL("http://feeds.feedburner.com/manishchhabra27");
        HttpURLConnection httpcon = (HttpURLConnection) url.openConnection();
        // Reading the feed
        SyndFeedInput input = new SyndFeedInput();
        SyndFeed feed = input.build(new XmlReader(httpcon));
        // ROME 1.0 returns a raw List, so each element has to be cast to SyndEntry
        List<?> entries = feed.getEntries();

        for (Object item : entries) {
            SyndEntry entry = (SyndEntry) item;
            System.out.println("Title: " + entry.getTitle());
            System.out.println("Link: " + entry.getLink());
            System.out.println("Author: " + entry.getAuthor());
            System.out.println("Publish Date: " + entry.getPublishedDate());
            // The description is optional, so guard against null
            System.out.println("Description: "
                    + (entry.getDescription() != null ? entry.getDescription().getValue() : ""));
            System.out.println();
        }
(from: http://blog.manishchhabra.com/2011/10/rome-library-example-for-parsing-rss-and-atom-feeds/)

But in the context of an enterprise-grade application, especially if you want to build a "Google Reader"-like service, you must NOT use ROME out of the box. ROME can be tuned in several ways.

Cache

ROME allows you to cache feed details by implementing the interface com.sun.syndication.fetcher.impl.FeedFetcherCache. By default, three classes implement this interface:
- com.sun.syndication.fetcher.impl.HashMapFeedInfoCache
- com.sun.syndication.fetcher.impl.LinkedHashMapFeedInfoCache
- com.sun.syndication.fetcher.impl.DiskFeedInfoCache
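
Why does this cache matter so much? As far as I can tell, the fetcher uses the cached feed info to send a conditional GET: it replays the ETag and Last-Modified values from the previous response, and when the server answers 304 Not Modified it reuses the cached feed instead of downloading and parsing it again. Here is a minimal sketch of that decision in plain Java. The CachedFeed and ConditionalGet classes are hypothetical stand-ins written for this post, not ROME's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for what a feed cache entry remembers about one feed. */
final class CachedFeed {
    final String etag;         // ETag header of the last response, may be null
    final String lastModified; // Last-Modified header of the last response, may be null
    final String body;         // feed content downloaded last time

    CachedFeed(String etag, String lastModified, String body) {
        this.etag = etag;
        this.lastModified = lastModified;
        this.body = body;
    }
}

/** Hypothetical helper showing the conditional GET decision, not ROME code. */
final class ConditionalGet {

    private ConditionalGet() {
    }

    /** Headers to add to the request so the server can answer 304 Not Modified. */
    static Map<String, String> requestHeaders(CachedFeed cached) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (cached == null) {
            return headers; // first fetch: plain GET, no validators
        }
        if (cached.etag != null) {
            headers.put("If-None-Match", cached.etag);
        }
        if (cached.lastModified != null) {
            headers.put("If-Modified-Since", cached.lastModified);
        }
        return headers;
    }

    /** Picks the content to hand to the parser, given the HTTP status code. */
    static String resolveBody(int status, String responseBody, CachedFeed cached) {
        if (status == 304) {
            if (cached == null) {
                throw new IllegalStateException("304 received without a cached copy");
            }
            return cached.body; // nothing changed: reuse what we already have
        }
        return responseBody; // fresh content: the caller should update the cache
    }
}
```

With thousands of feeds polled every few minutes, most responses are 304s, so a shared cache saves both bandwidth and parsing time.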

If you already have a cache (Infinispan, Memcached, EhCache...), you can reuse it and benefit from its features (distribution, replication, time-to-live...). Instead of writing an implementation tied to one product, let's create a generic class that delegates the caching logic to a Spring cache manager. You will then be able to swap the cache depending on your needs. Spring already ships a cache manager that delegates to EhCache. You will find more implementations by Googling "spring cache 'the-name-of-the-awesome-cache-framework-you-use'", for instance:
- Infinispan: http://docs.jboss.org/infinispan/5.1/apidocs/org/infinispan/spring/provider/package-summary.html
- Redis: http://docs.spring.io/spring-data/redis/docs/current/reference/html/redis.html#redis:support:cache-abstraction
- Gemfire: http://docs.spring.io/spring-gemfire/docs/current/reference/html/apis.html#apis:spring-cache-abstraction
- Coherence (by Soat): http://blog.soat.fr/2012/08/spring-3-1-utiliser-labstraction-de-cache-2-le-retour/
...

So, choose your poison, and let's code the ROME FeedFetcherCache that delegates to a Spring cache manager:

import java.net.URL;
import javax.annotation.PostConstruct;
import org.springframework.cache.Cache;
import org.springframework.cache.Cache.ValueWrapper;
import org.springframework.cache.CacheManager;
import com.google.common.base.Preconditions;
import com.sun.syndication.fetcher.impl.FeedFetcherCache;
import com.sun.syndication.fetcher.impl.SyndFeedInfo;

public class SpringFeedFetcherCache implements FeedFetcherCache {

    private final String cacheName;

    private final CacheManager cacheManager;

    private Cache cache;

    public SpringFeedFetcherCache(String cacheName, CacheManager cacheManager) {
        this.cacheName = cacheName;
        this.cacheManager = cacheManager;
    }

    // Resolve the cache once the bean is built and fail fast if it is not configured
    @PostConstruct
    public void init() {
        Preconditions.checkNotNull(cacheManager);
        Preconditions.checkNotNull(cacheName);
        cache = Preconditions.checkNotNull(cacheManager.getCache(cacheName),
                "No cache named %s", cacheName);
    }

    @Override
    public SyndFeedInfo getFeedInfo(URL feedUrl) {
        return get(feedUrl);
    }

    @Override
    public void setFeedInfo(URL feedUrl, SyndFeedInfo syndFeedInfo) {
        cache.put(key(feedUrl), syndFeedInfo);
    }

    @Override
    public void clear() {
        cache.clear();
    }

    @Override
    public SyndFeedInfo remove(URL feedUrl) {
        SyndFeedInfo syndFeedInfo = get(feedUrl);
        cache.evict(key(feedUrl));
        return syndFeedInfo;
    }

    private SyndFeedInfo get(URL feedUrl) {
        ValueWrapper valueWrapper = cache.get(key(feedUrl));
        if (valueWrapper != null) {
            return (SyndFeedInfo) valueWrapper.get();
        }
        return null;
    }

    // Key on the string form: URL.hashCode() and URL.equals() may trigger a DNS lookup
    private static String key(URL feedUrl) {
        return feedUrl.toExternalForm();
    }
}

No trap here...

HTTP connections

Another point: ROME uses URLConnection or the old commons-httpclient to fetch the RSS. Your application almost certainly already has an HttpClient instance (httpcomponents-httpclient) from your social layer, MongoDB Java connector, REST framework or something else. In any case, we can reuse it. So, let's implement a new AbstractFeedFetcher:
https://gist.github.com/Treydone/8860062

Putting things together

The configuration for the httpclient:

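import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class HttpConfig {

    private static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 100;
    private static final int DEFAULT_MAX_CONNECTIONS_PER_ROUTE = 5;
    // Used both for connecting and for waiting on data
    private static final int DEFAULT_READ_TIMEOUT_MILLISECONDS = (60 * 1000);

    @Bean
    public CloseableHttpClient httpClient() {
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(DEFAULT_MAX_TOTAL_CONNECTIONS);
        connectionManager
                .setDefaultMaxPerRoute(DEFAULT_MAX_CONNECTIONS_PER_ROUTE);
        // A route matches one exact host: this raises the limit for blogspot.com itself,
        // not for its subdomains
        connectionManager.setMaxPerRoute(new HttpRoute(new HttpHost(
                "blogspot.com")), 20);
        // Without a socket timeout, a slow feed server can hold a pooled connection forever
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS)
                .setSocketTimeout(DEFAULT_READ_TIMEOUT_MILLISECONDS).build();

        CloseableHttpClient defaultHttpClient = HttpClientBuilder.create()
                .setConnectionManager(connectionManager)
                .setDefaultRequestConfig(config).build();
        return defaultHttpClient;
    }
}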
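
To get a feel for what the per-route limits in this configuration mean, here is a toy model in plain Java: one counter of permits per host, with a default and optional overrides. It is only a sketch of the idea, assuming that a route boils down to a host name. PerHostLimiter is a hypothetical class written for this post, it is not how HttpClient implements its pool, and the real pool makes the caller wait for a free connection where this sketch simply refuses.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

/** Hypothetical toy model of per-host connection limits (not HttpClient code). */
final class PerHostLimiter {

    private final int defaultMaxPerHost;
    private final ConcurrentMap<String, Integer> overrides = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Semaphore> permits = new ConcurrentHashMap<>();

    PerHostLimiter(int defaultMaxPerHost) {
        this.defaultMaxPerHost = defaultMaxPerHost;
    }

    /** Overrides the limit for one exact host; call it before the first acquire for that host. */
    void setMaxPerHost(String host, int max) {
        overrides.put(host, max);
    }

    /** Returns true if a connection slot was available for this host. */
    boolean tryAcquire(String host) {
        return semaphoreFor(host).tryAcquire();
    }

    /** Gives the slot back once the response has been consumed. */
    void release(String host) {
        semaphoreFor(host).release();
    }

    // Lazily creates the counter for a host, sized from the override or the default
    private Semaphore semaphoreFor(String host) {
        return permits.computeIfAbsent(host,
                h -> new Semaphore(overrides.getOrDefault(h, defaultMaxPerHost)));
    }
}
```

The point for a feed reader: many feeds live on a handful of hosts, so a low default protects small servers while a higher limit on the big blog platforms keeps the crawler from starving. Note that the override is keyed on the exact host name, so a limit set for "blogspot.com" does not cover "foo.blogspot.com" in this model.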

for the RSS feed fetcher:

import javax.inject.Inject;
import org.apache.http.impl.client.CloseableHttpClient;
import org.springframework.cache.CacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.hangar2.syndic.HttpComponentsFeedFetcher;
import com.streaming.syndic.SpringFeedFetcherCache;
import com.sun.syndication.fetcher.FeedFetcher;
import com.sun.syndication.fetcher.impl.FeedFetcherCache;

@Configuration
public class SyndicConfig {

    @Inject
    private CacheManager cacheManager;

    @Inject
    private CloseableHttpClient httpClient;

    @Bean
    public FeedFetcher feedFetcher() {
        HttpComponentsFeedFetcher httpFeedFetcher = new HttpComponentsFeedFetcher(
                httpClient);
        httpFeedFetcher.setFeedInfoCache(feedInfoCache());
        httpFeedFetcher.setUserAgent("Bot");
        return httpFeedFetcher;
    }

    @Bean
    public FeedFetcherCache feedInfoCache() {
        return new SpringFeedFetcherCache("rss", cacheManager);
    }
}

and for the cache:

import javax.inject.Inject;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurer;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.concurrent.ConcurrentMapCacheManager;
import org.springframework.cache.ehcache.EhCacheCacheManager;
import org.springframework.cache.ehcache.EhCacheManagerFactoryBean;
import org.springframework.cache.interceptor.KeyGenerator;
import org.springframework.context.annotation.AdviceMode;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ResourceLoader;

@Configuration
@EnableCaching(mode = AdviceMode.ASPECTJ)
public class CacheConfig {

    @Configuration
    @Profile("!test")
    static class Default implements CachingConfigurer {

        @Inject
        private ResourceLoader resourceLoader;

        @Bean
        public KeyGenerator keyGenerator() {
            return new ReflectionBasedKeyGenerator();
        }

        @Bean
        public CacheManager cacheManager() {
            EhCacheCacheManager ehCacheCacheManager = new EhCacheCacheManager();
            try {
                ehCacheCacheManager.setCacheManager(ehcacheCacheManager()
                        .getObject());
            } catch (Exception e) {
                throw new IllegalStateException(
                        "Failed to create an EhCacheManagerFactoryBean", e);
            }
            return ehCacheCacheManager;
        }

        @Bean
        public EhCacheManagerFactoryBean ehcacheCacheManager() throws Exception {
            EhCacheManagerFactoryBean bean = new EhCacheManagerFactoryBean();
            bean.setShared(true);
            bean.setConfigLocation(resourceLoader
                    .getResource("classpath:ehcache.xml"));
            return bean;
        }
    }

    @Configuration
    @Profile(Profiles.TEST)
    static class Test implements CachingConfigurer {

        @Bean
        public KeyGenerator keyGenerator() {
            return new ReflectionBasedKeyGenerator();
        }

        @Bean
        public CacheManager cacheManager() {
            return new ConcurrentMapCacheManager();
        }
    }

}
