Counting Rows in Apache Cassandra

Update:As of today Apache Cassandra support Counter type and it should be used instead of this custom solution.

As you may already notice it is not possible to count number of rows in a column family.  In fact there is a request about that in Apache Cassandra bug database. Correctly doing this in a distributed database is a difficult thing. But a not so good but working solution could be implemented relatively easily. There are two alternative, first one removes row counting problem by changing structure of column families.

First of all you may even do not need to count number of rows, you may redesign your CF (column family)  and use columns to store information. Cassandra’s column names are different than column name of a relational database’s table. Unlike relational database’s every row may have different columns and number of columns for a row could be very large. Since Cassandra has a method that gives number of columns for a given row, counting columns instead of rows is a very easy operation provided that you could organize your CFs in that way.

But first solution may not be always applicable. Sometimes you really need to count number of rows.

Sample Problem:

Assume that we have a User column family and Post column family, and we want to display number of registered users and total number of posts for a specific user.

    ... (add new colum as user adds new posts)

As you can see counting number of posts for each user is an easy operation, because Cassandra can easily return number of columns within a super column with get_count method. Just count number of columns within posts super column.

But getting total number of users is not easy unless you change your CFs. Lets implement a simple counter for our web application. Since our system is distributed, more than one application server may be accessing the database. We need to partition data for each server instance, each server will have a separate counter and will only increment it’s own value.  Each servers value will be kept as a new column in Cassandra. But while returning counter’s value. A server will add all values of servers. Since an application server is a multi-threaded environment we have to synchronize access to our increment method.

This solution has several disadvantages. First incrementing counter requires disk IO. Second, since increment operation  is synchronized only one thread may perform increment operation within a server.

1 @Singleton 2  public class Counter { 3 4 @Inject Logger logger; //log4j loggers ar thread safe 5   6 @Inject CassandraCounterDAO counterDAO; //this class is also thread safe 7 8 /** 9 * increment counter value and return new result. 10 * @param dataItem identifier of the counter to be incremented. 11 * @return new counter value. 12 */ 13 public synchronized long increment(String dataItem) { 14 long current = counterDAO.getCounter(dataItem); 15 current +=1; 16 counterDAO.setCounter(dataItem, current); 17 return current; 18 } 19 }

1 @Singleton 2 public class CassandraCounterDAO extends CassandraAbstractDAO { 3 4 public static final String COUNTER_DATA_SET = "Counter"; 5 private String hostname; 6 7 @PostConstruct 8 public void init() { 9 try { 10 hostname = InetAddress.getLocalHost().getHostName(); 11 } catch (UnknownHostException ex) { 12 logger.error("Can get hostname", ex); 13 throw new RuntimeException(ex); 14 } 15 } 16 17 /** 18 * Returns value of counter for given key. 19 * @param key identifier of counter. 20 * @return 21 */ 22 public long getCounter(String key) { 23 try { 24 Selector selector = Pelops.createSelector(MY_POOL, MY_KEYSPACE); 25 SlicePredicate columnSlicePredicate = Selector.newColumnsPredicateAll(false, Integer.MAX_VALUE); 26 List<Column> columns = selector.getColumnsFromRow(key, COUNTER_DATA_SET, columnSlicePredicate, ConsistencyLevel.QUORUM); 27 long total = 0; 28 for (Column column : columns) { 29 total += toLong(column); 30 } 31 return total; 32 } catch (Exception ex) { 33 logger.error("Can not query Counter", ex); 34 throw new RuntimeException(ex); 35 } 36 } 37 38 /** 39 * Increments value of counter and returns new value. 40 * @param key identifier ofcounter. 41 * @return 42 */ 43 public void setCounter(String key, long value) { 44 try { 45 Mutator mutator = Pelops.createMutator(MY_POOL, MY_KEYSPACE); 46 mutator.newColumn(hostname, toBytes(value)); 47 mutator.execute(ConsistencyLevel.ALL); 48 } catch (Exception ex) { 49 throw new RuntimeException(ex); 50 } 51 } 52 } 53

VN:F [1.9.22_1171]
Rating: 9.0/10 (2 votes cast)