If it ain’t broke, make it faster.

I’m writing a sharding StatsD proxy as a way to learn the Rust programming language. This is just a toy project, but I’m treating it as if it were something I’d run in production where I care about performance.

This proxy can be used to spread StatsD message load across N downstream servers (e.g., Telegraf with the StatsD input plugin). However, due to the way metrics are stored in a time-series database like InfluxDB, we need to ensure consistent hashing. You can think of it as an AWS Load Balancer with sticky sessions enabled: the proxy must always forward the same StatsD message – a measurement and its unique tags and values – to the same downstream server.
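
Shard selection itself is cheap once we have a stable hash. As a minimal sketch (the servers slice and pick_server function here are hypothetical, not part of the proxy), the hash modulo the server count picks the downstream server, and because the hash is deterministic, the same message always lands on the same server:

// Hypothetical shard selection: a deterministic hash modulo the number
// of servers always picks the same downstream server. Assumes the
// servers slice is non-empty.
fn pick_server<'a>(servers: &'a [String], hash: u64) -> &'a str {
    &servers[(hash as usize) % servers.len()]
}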

My initial code worked fine, but let’s see how we can improve my hashing-related logic.
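
The functions below live on a Hashing struct whose re field holds the regex used to split incoming messages. The real definition isn’t shown here, but based on the example output in the comments below, a minimal sketch might look like this (the [,:] pattern is an assumption):

use regex::Regex;

pub struct Hashing {
    re: Regex,
}

impl Hashing {
    pub fn new() -> Hashing {
        // Split on the commas between tags and the colon before the value|type
        Hashing { re: Regex::new(r"[,:]").unwrap() }
    }
}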

// Example StatsD message: users.login,host=web1,country=US:1|c
pub fn hash1(&self, message: &str) -> u64 {
    let mut parts: Vec<&str> = self.re.split(message).collect();
    // Parts will be: ["users.login", "host=web1", "country=US", "1|c"]

    // Remove the measurement ...
    let measurement: &str = parts.remove(0);
    // Parts is now: ["host=web1", "country=US", "1|c"]
    
    // Remove the value and type ("1|c")
    let _value_and_type: &str = parts.pop().unwrap();
    // Parts is now: ["host=web1", "country=US"]
    
    // Sort tags so we can ensure consistent sharding
    parts.sort();
    // Parts is now: ["country=US", "host=web1"]

    // Push measurement back onto the front
    parts.insert(0, measurement);
    // Parts is now: ["users.login", "country=US", "host=web1"]

    // Join measurement and tags into a string we can hash to shards
    let shardable_metric = parts.join(",");
    // shardable_metric is now: "users.login,country=US,host=web1"

    // djb2 hash
    let mut hash: u64 = 5381;
    for c in shardable_metric.chars() {
        hash = (hash << 5).wrapping_add(hash).wrapping_add(c as u64);
    }
    hash
}
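
As a quick sanity check on the consistency requirement: reordering the tags in the incoming message must not change the hash, since the tags are sorted before hashing. A hypothetical check, assuming the Hashing struct sketched above:

let h = hashing::Hashing::new();
// Same measurement, same tags, different order: the hashes must match
assert_eq!(
    h.hash1("users.login,host=web1,country=US:1|c"),
    h.hash1("users.login,country=US,host=web1:1|c")
);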

As you can see, the code performs several vector and string operations to transform a portion of the message into a string that we then hash. Amazingly, we can get rid of nearly all of the vector operations by sorting a slice of the vector in place!

Here’s the updated code, moved to a new hash2 function:

// Example StatsD message: users.login,host=web1,country=US:1|c
pub fn hash2(&self, message: &str) -> u64 {
    let mut parts: Vec<&str> = self.re.split(message).collect();
    // Parts will be: ["users.login", "host=web1", "country=US", "1|c"]

    // Index of the value|type element (one past the last tag)
    let l = parts.len() - 1;
    // Sort only if we have more than 1 tag
    if parts.len() > 3 {
        parts[1..l].sort();
        // Parts is now: ["users.login", "country=US", "host=web1", "1|c"]
    }
    let shardable_metric = parts[0..l].join(",");
    // shardable_metric is now: "users.login,country=US,host=web1"

    // djb2 hash
    let mut hash: u64 = 5381;
    for c in shardable_metric.chars() {
        hash = (hash << 5).wrapping_add(hash).wrapping_add(c as u64);
    }
    hash
}

Now, we’ll review the rest of the code.

It seems unnecessary to create a shardable_metric string from elements of the parts vector just so we can hash its characters. We already have those characters in parts, so let’s try to hash directly from there instead.

Here’s the updated code, moved to a new hash3 function:

// Example StatsD message: users.login,host=web1,country=US:1|c
pub fn hash3(&self, message: &str) -> u64 {
    let mut parts: Vec<&str> = self.re.split(message).collect();
    // Parts will be: ["users.login", "host=web1", "country=US", "1|c"]
    
    // Index of the value|type element (one past the last tag)
    let l = parts.len() - 1;
    // Sort only if we have more than 1 tag
    if parts.len() > 3 {
        parts[1..l].sort();
        // Parts is now: ["users.login", "country=US", "host=web1", "1|c"]
    }

    // Use djb2 hash across string slices within the vector.
    // Be sure to hash the commas that were taken out during RegEx split
    let mut hash: u64 = 5381;
    // Hash the measurement
    for c in parts[0].chars() {
        hash = (hash << 5).wrapping_add(hash).wrapping_add(c as u64);
    }
    // Now hash the tags and their preceding commas
    for part in parts[1..l].iter() {
        // Manually hash the comma that was taken out during RegEx split
        hash = (hash << 5).wrapping_add(hash).wrapping_add(',' as u64);
        for c in part.chars() {
            hash = (hash << 5).wrapping_add(hash).wrapping_add(c as u64);
        }
    }
    hash
}

This change makes the code longer, but it ends up being noticeably faster.
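
One further tweak worth trying (I haven’t benchmarked it here): djb2 was originally defined over bytes, and iterating with bytes() skips the UTF-8 decoding that chars() performs. A hypothetical helper that could replace the inner loops in hash3:

// Hypothetical helper: feed a string's raw bytes into a running djb2
// hash, skipping the UTF-8 decoding that chars() performs. For ASCII
// input this matches the chars() version exactly; for non-ASCII the
// values differ but stay deterministic, which is all sharding needs.
fn djb2_bytes(mut hash: u64, s: &str) -> u64 {
    for b in s.bytes() {
        hash = (hash << 5).wrapping_add(hash).wrapping_add(b as u64);
    }
    hash
}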

With Rust nightly, I can use cargo bench to measure the speedup.
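
Benchmarking with cargo bench uses the nightly-only test feature, so the benchmark file opts in with roughly this boilerplate (feature(test) goes at the crate root):

#![feature(test)]
extern crate test;

use test::Bencher;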

$ cargo bench 
## ...
running 4 tests
test tests::test_hashing ... ignored
test tests::hash1 ... bench:     409,767 ns/iter (+/- 7,688)
test tests::hash2 ... bench:     405,508 ns/iter (+/- 1,794)
test tests::hash3 ... bench:     390,440 ns/iter (+/- 601)

The hash3 version is definitely faster, allowing me to process roughly an additional 200,000 messages every 10 seconds. My test machine has a 7th-generation Intel Core i7 CPU, and I’m using 4 processing threads.


Here’s the code for one of the benchmarking test functions. The messages.txt file contains 1000 sample StatsD messages.

use std::fs::File;
use std::io::Read;

#[bench]
fn hash3(b: &mut Bencher) {
    b.iter(|| {
        // Each iteration reads the file and hashes every line, so the
        // measurement includes IO. All three benchmarks share this
        // structure, so the relative comparison between them holds.
        let mut contents = String::new();
        match File::open("messages.txt") {
            Ok(mut file) => {
                if file.read_to_string(&mut contents).is_err() {
                    println!("Failed to read messages.txt");
                }
            },
            Err(e) => println!("Failed to open messages.txt {}", e)
        }
        let h = hashing::Hashing::new();
        for line in contents.lines() {
            let _ = h.hash3(line);
        }
    });
}

Thanks for reading! I hope you enjoyed the walkthrough.

You can find the full code for this project in my rust-sharding-statsd-proxy repository.