I will describe our approach to unique user action counting using Cassandra. An action is a click, a page view, a phone call or anything similar. Each action connects an object and a user.

A counter for a specific action provides the following methods:

Sets of users and objects are effectively infinite, but they increase gradually -- new ones appear and former ones don't appear any more. We don't want to run out of space because of storing connections forever, so we need some expiration policy. More about it later.

Data model

Cassandra schema looks like this:

CREATE TABLE IF NOT EXISTS $TABLE(
  id varchar,
  t varchar,
  PRIMARY KEY (id, t))
WITH
  compaction = {
    'class': 'LeveledCompactionStrategy'
  } AND
  default_time_to_live = $TTL

id is an object id. t is a token identifying user.

As it follows from the schema:

Requests

We have only two kinds of requests.

A request connecting the object and the token:

INSERT INTO $TABLE(id, t) VALUES (:id, :t)

A request counting unique tokens associated with the object:

SELECT COUNT(1) as counter FROM $TABLE WHERE id = :id LIMIT $LIMIT

We use explicit counter limit to bound the response time in case of extremely (or artificially) popular object.

Both requests are executed with LOCAL_ONE consistency level.

DAO

The DAO uses requests considered before and is based on the official Cassandra Java driver.

Cassandra driver is configured the following way:

Cluster.builder()
  .addContactPoints(clusterNodes: _*)
  .withCompression(ProtocolOptions.Compression.LZ4)
  .withLoadBalancingPolicy(
    new TokenAwarePolicy(
      new DCAwareRoundRobinPolicy(
        localDataCenter,
        remoteNodes)))

The implementation uses Session.executeAsync() and is fully asynchronous.

HTTP API

HTTP API is quite simple and delegates to the DAO. Let's HTTP requests.

Get count of tokens connected with object object_id:

$ curl http://myhost/api/v1/counter/object_id
42

Connect token token to object object_id and return count of tokens:

$ curl -X PUT -d token http://myhost/api/v1/counter/object_id
43

When implementing HTTP API we used Spray.

Benchmarks

We have a two node Cassandra cluster. Each node is a 16 core (with HT) 64 GB SSD machine.

Cassandra keyspace configuration is the following:

replication = {'class': 'NetworkTopologyStrategy', 'DC1': '2'} AND durable_writes = true

HTTP API runs on a 32 core machine.

Test load

The test load depends on two parameters:

Each test request can be expressed like this:

$ curl -X PUT -d token http://myhost/api/v1/counter/object_id

Where token is a random long value and object_id is an int value between 0 and total number of objects chosen using Gaussian distribution to model hot (popular) objects.

100 objects and 10M tokens

Cassandra nodes' CPUs get fully saturated at 700 rps load. The stable load area:

700 rps

Some numbers:

1K objects and 10M tokens

Cassandra nodes' CPUs get fully saturated at 2.5 Krps load. The stable load area:

2.5 Krps

Some numbers:

Conclusion

We considered unique action counters on top of Cassandra storage. This implementation (with some minor changes and caching added) is used in all the user facing counters in Yandex.Auto, Yandex.Realty and Yandex.Rabota.

Next time we might consider:

comments powered by Disqus
Copyright © 2013-2017 Vadim TSesko