1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
<?php
namespace Wikimedia\WRStats;
/**
 * Writers gather a batch of increment operations and then
 * commit them when flush() is called, or when the writer is destroyed.
 *
 * Increments are queued in memory, grouped by the hard expiry (TTL) of the
 * sequence they belong to, and each TTL group is sent to the store with a
 * single StatsStore::incr() call.
 *
 * @since 1.39
 */
class WRStatsWriter {
	/** @var StatsStore */
	private $store;
	/** @var array<string,MetricSpec> */
	private $metricSpecs;
	/** @var int[][] Queued increments indexed by TTL and storage key */
	private $queuedValues = [];
	/** @var float|int|null The UNIX timestamp used for the current time */
	private $now;
	/** @var string[] */
	private $prefixComponents;

	/**
	 * @internal Use WRStatsFactory::createWriter instead
	 * @param StatsStore $store
	 * @param array<string,array> $specs Metric specification arrays, indexed
	 *   by metric name. Each one is passed to the MetricSpec constructor.
	 * @param string|string[] $prefix One or more prefix components which are
	 *   passed to the store when making storage keys.
	 * @throws WRStatsError If $prefix is an empty array
	 */
	public function __construct( StatsStore $store, $specs, $prefix ) {
		$this->store = $store;
		$this->metricSpecs = [];
		foreach ( $specs as $name => $spec ) {
			$this->metricSpecs[$name] = new MetricSpec( $spec );
		}
		// A lone string is treated as a single prefix component
		$this->prefixComponents = is_array( $prefix ) ? $prefix : [ $prefix ];
		if ( !count( $this->prefixComponents ) ) {
			throw new WRStatsError( __METHOD__ .
				': there must be at least one prefix component' );
		}
	}

	/**
	 * Queue an increment operation.
	 *
	 * The value is divided by the metric's resolution and rounded to the
	 * nearest integer before it is queued. Rounding is applied to each call
	 * separately, so a value smaller than half the resolution queues an
	 * increment of zero.
	 *
	 * @param string $name The metric name
	 * @param EntityKey|null $entity Additional storage key components
	 * @param float|int $value The value to add
	 * @throws WRStatsError If the metric name was not in the specs supplied
	 *   to the constructor
	 */
	public function incr( $name, ?EntityKey $entity = null, $value = 1 ) {
		$metricSpec = $this->metricSpecs[$name] ?? null;
		if ( $metricSpec === null ) {
			throw new WRStatsError( "Unrecognised metric \"$name\"" );
		}
		$entity ??= new LocalEntityKey;

		// The scaled increment is the same for every sequence of the metric
		$increment = (int)round( $value / $metricSpec->resolution );

		foreach ( $metricSpec->sequences as $seqSpec ) {
			// Index of the time bucket which contains the current time
			$timeBucket = (int)( $this->now() / $seqSpec->timeStep );
			$key = $this->store->makeKey(
				$this->prefixComponents,
				[ $name, $seqSpec->name, $timeBucket ],
				$entity
			);

			// Group by TTL so that flush() can send one batch per TTL
			$ttl = $seqSpec->hardExpiry;
			$this->queuedValues[$ttl][$key] =
				( $this->queuedValues[$ttl][$key] ?? 0 ) + $increment;
		}
	}

	/**
	 * Set the time to be used as the current time
	 *
	 * @param float|int $now
	 */
	public function setCurrentTime( $now ) {
		$this->now = $now;
	}

	/**
	 * Reset the stored current time. In a long-running process this should be
	 * called regularly to write new results.
	 *
	 * @return void
	 */
	public function resetCurrentTime() {
		$this->now = null;
	}

	/**
	 * Get the current time. The first call after construction or after
	 * resetCurrentTime() reads the system clock; the result is then reused
	 * until the time is set or reset again.
	 *
	 * @return float|int
	 */
	private function now() {
		$this->now ??= microtime( true );
		return $this->now;
	}

	/**
	 * Commit the batch of increment operations.
	 *
	 * Each TTL group is removed from the queue as soon as the store has
	 * accepted it. If the store throws part way through, the groups which
	 * were already committed are not sent again by a later flush(), including
	 * the one done by the destructor. The group which failed, and any groups
	 * after it, remain queued.
	 */
	public function flush() {
		// foreach iterates over a copy of the array, so it is safe to unset
		// elements of the property inside the loop.
		foreach ( $this->queuedValues as $ttl => $values ) {
			$this->store->incr( $values, $ttl );
			unset( $this->queuedValues[$ttl] );
		}
	}

	/**
	 * Commit the batch of increment operations.
	 */
	public function __destruct() {
		$this->flush();
	}

	/**
	 * Delete all stored metrics corresponding to the specs supplied to the
	 * constructor, resetting the counters to zero.
	 *
	 * Any queued increments which have not been flushed are discarded.
	 *
	 * @param EntityKey[]|null $entities An array of additional storage key
	 *   components. The default is the empty local entity.
	 */
	public function resetAll( ?array $entities = null ) {
		$entities ??= [ new LocalEntityKey ];
		$this->queuedValues = [];
		$keys = [];
		foreach ( $this->metricSpecs as $name => $metricSpec ) {
			foreach ( $metricSpec->sequences as $seqSpec ) {
				$now = $this->now();
				$timeStep = $seqSpec->timeStep;
				// Cover every bucket from one hard expiry period ago until
				// now, widened by one bucket at each end.
				$firstBucket = (int)( ( $now - $seqSpec->hardExpiry ) / $timeStep ) - 1;
				$lastBucket = (int)( $now / $timeStep ) + 1;
				for ( $bucket = $firstBucket; $bucket <= $lastBucket; $bucket++ ) {
					foreach ( $entities as $entity ) {
						$keys[] = $this->store->makeKey(
							$this->prefixComponents,
							[ $name, $seqSpec->name, $bucket ],
							$entity
						);
					}
				}
			}
		}
		$this->store->delete( $keys );
	}
}
|