Java - Non-locking threading code using atomic types when implementing a sliding window class for time
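I am trying to understand this code from Yammer Metrics. My confusion starts with the trim method, and with the calls to trim in both update and getSnapshot. Could someone explain the logic here for, say, a 15-minute sliding window? Why would you want to clear the map before passing it into the Snapshot (this is where the stats of the window are calculated)?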
package com.codahale.metrics;

import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class SlidingTimeWindowReservoir implements Reservoir {
    // allow for this many duplicate ticks before overwriting measurements
    private static final int COLLISION_BUFFER = 256;
    // only trim on updating once every N
    private static final int TRIM_THRESHOLD = 256;

    private final Clock clock;
    private final ConcurrentSkipListMap<Long, Long> measurements;
    private final long window;
    private final AtomicLong lastTick;
    private final AtomicLong count;

    public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit) {
        this(window, windowUnit, Clock.defaultClock());
    }

    public SlidingTimeWindowReservoir(long window, TimeUnit windowUnit, Clock clock) {
        this.clock = clock;
        this.measurements = new ConcurrentSkipListMap<Long, Long>();
        this.window = windowUnit.toNanos(window) * COLLISION_BUFFER;
        this.lastTick = new AtomicLong();
        this.count = new AtomicLong();
    }

    @Override
    public int size() {
        trim();
        return measurements.size();
    }

    @Override
    public void update(long value) {
        if (count.incrementAndGet() % TRIM_THRESHOLD == 0) {
            trim();
        }
        measurements.put(getTick(), value);
    }

    @Override
    public Snapshot getSnapshot() {
        trim();
        return new Snapshot(measurements.values());
    }

    private long getTick() {
        for (; ; ) {
            final long oldTick = lastTick.get();
            final long tick = clock.getTick() * COLLISION_BUFFER;
            // ensure the tick is strictly incrementing even if there are duplicate ticks
            final long newTick = tick > oldTick ? tick : oldTick + 1;
            if (lastTick.compareAndSet(oldTick, newTick)) {
                return newTick;
            }
        }
    }

    private void trim() {
        measurements.headMap(getTick() - window).clear();
    }
}
Two bits of information from the documentation:
ConcurrentSkipListMap is sorted according to the natural ordering of its keys.
That's the data structure that holds the measurements. The key here is a long, which is basically the current time. -> measurements are indexed by time and therefore sorted by time.
.headMap(K toKey) returns a view of the portion of this map whose keys are strictly less than toKey.
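To make the "view" behaviour concrete, here is a minimal sketch (the class name HeadMapDemo and the sample keys are made up for illustration). It shows that clearing the map returned by headMap removes exactly the entries with smaller keys from the backing map and leaves the rest alone.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

class HeadMapDemo {
    // Builds a small time-keyed map and removes every entry whose key is < cutoff.
    static ConcurrentSkipListMap<Long, Long> trimmed(long cutoff) {
        ConcurrentSkipListMap<Long, Long> measurements = new ConcurrentSkipListMap<>();
        // Insertion order does not matter; the map keeps keys sorted.
        measurements.put(300L, 3L);
        measurements.put(100L, 1L);
        measurements.put(200L, 2L);

        // headMap returns a live view, not a copy.
        ConcurrentNavigableMap<Long, Long> older = measurements.headMap(cutoff);
        // Clearing the view deletes those entries from the backing map.
        older.clear();
        return measurements;
    }

    public static void main(String[] args) {
        System.out.println(trimmed(200L)); // {200=2, 300=3}
    }
}
```

Note that the cutoff key itself survives, because headMap is strictly "less than".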
The magic code in getTick makes sure that one time value is never used twice (it simply takes oldTick + 1 if that would happen). COLLISION_BUFFER is a bit tricky to understand, but it is basically ensuring that even when Clock#getTick() returns the same value several times, the new values don't collide with the next tick from the clock.
E.g. Clock.getTick() returns 0 -> modified to 0 * 256 = 0
Clock.getTick() returns 1 -> modified to 1 * 256 = 256
-> room for 256 values in between.
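A small sketch of that tick logic in isolation may help. It is not the library class: TickDemo and nextTick are hypothetical names, and the clock reading is passed in as a parameter so the effect of duplicate readings is easy to see.

```java
import java.util.concurrent.atomic.AtomicLong;

class TickDemo {
    static final int COLLISION_BUFFER = 256;
    private final AtomicLong lastTick = new AtomicLong();

    // clockTick stands in for clock.getTick(); passing it in is an assumption
    // made here so that the example is deterministic.
    long nextTick(long clockTick) {
        for (;;) {
            long oldTick = lastTick.get();
            long tick = clockTick * COLLISION_BUFFER;
            // If the scaled clock value did not advance, fall back to oldTick + 1.
            long newTick = tick > oldTick ? tick : oldTick + 1;
            if (lastTick.compareAndSet(oldTick, newTick)) {
                return newTick;
            }
        }
    }

    public static void main(String[] args) {
        TickDemo demo = new TickDemo();
        System.out.println(demo.nextTick(1)); // 256
        System.out.println(demo.nextTick(1)); // 257 (same clock reading, next free slot)
        System.out.println(demo.nextTick(1)); // 258
        System.out.println(demo.nextTick(2)); // 512 (clock advanced, no collision)
    }
}
```

Each clock reading owns a range of 256 keys, so up to 256 measurements taken at the same clock tick still get distinct, ordered keys without spilling into the next tick's range.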
Now trim() does
measurements.headMap(getTick() - window).clear();
This calculates the "current time", subtracts the time window, and uses that time to get the portion of the map that is older than "window ticks ago". Clearing that portion also clears it in the original map. It's not clearing the whole map, just that part.
-> trim removes values that are too old.
On update (on every TRIM_THRESHOLD-th call, to be precise) you need to remove old values or the map gets too large. When creating the Snapshot the same thing happens, so that old values are not included.
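The effect of trimming before a snapshot can be sketched with a stripped-down window. This is a simplification, not the library code: WindowTrimDemo is a hypothetical class, the clock is a plain field set by the caller, and there is no collision buffer, so two updates at the same time would overwrite each other.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

class WindowTrimDemo {
    private final ConcurrentSkipListMap<Long, Long> measurements = new ConcurrentSkipListMap<>();
    private final long window;
    long now; // hypothetical manual clock, advanced by the caller

    WindowTrimDemo(long window) {
        this.window = window;
    }

    void update(long value) {
        measurements.put(now, value);
    }

    List<Long> snapshot() {
        // Same idea as trim(): drop everything older than (now - window) first,
        // so that the snapshot only contains values inside the window.
        measurements.headMap(now - window).clear();
        return new ArrayList<>(measurements.values());
    }

    public static void main(String[] args) {
        WindowTrimDemo w = new WindowTrimDemo(15);
        w.now = 0;
        w.update(10);
        w.now = 10;
        w.update(20);
        w.now = 20;
        w.update(30);
        System.out.println(w.snapshot()); // [20, 30] (the value from time 0 is too old)
        w.now = 40;
        System.out.println(w.snapshot()); // [] (everything has left the window)
    }
}
```

Without the trim in snapshot(), the first call would still report the value recorded at time 0, even though it lies outside the 15-tick window.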
The endless loop in getTick is another trick: it uses the atomic compare-and-set method to ensure that - once you are ready to update the value - nothing has changed the value in between. If that happens, the whole loop starts over and refreshes its starting value. The basic schema is
for (; ; ) {
    long expectedOldValue = atomic.get();
    // other threads can change the value of atomic here..
    long modified = modify(expectedOldValue);
    // we can only set the new value if the old one is still the same
    if (atomic.compareAndSet(expectedOldValue, modified)) {
        return modified;
    }
}
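As a runnable instance of that schema, here is a sketch where modify is simply "add a delta" and several threads hammer the same AtomicLong. The names CasLoopDemo, addViaCas and runThreads are invented for this example. If the retry loop works, no increment is lost even though no lock is taken.

```java
import java.util.concurrent.atomic.AtomicLong;

class CasLoopDemo {
    // The schema with modify(x) = x + delta.
    static long addViaCas(AtomicLong atomic, long delta) {
        for (;;) {
            long expectedOldValue = atomic.get();
            long modified = expectedOldValue + delta;
            // Succeeds only if nobody changed the value since get(); otherwise retry.
            if (atomic.compareAndSet(expectedOldValue, modified)) {
                return modified;
            }
        }
    }

    // Starts the given number of threads, each adding 1 perThread times.
    static long runThreads(int threads, int perThread) {
        AtomicLong counter = new AtomicLong();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    addViaCas(counter, 1);
                }
            });
            workers[i].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runThreads(4, 10_000)); // 40000
    }
}
```

For a plain increment AtomicLong already offers incrementAndGet; the hand-written loop is only needed when the new value depends on the old one in a custom way, as it does in getTick.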