1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
|
<?php
namespace Wikimedia\WRStats;
/**
* Readers gather a batch of read operations, returning
* promises. The batch is executed when the first promise is resolved.
*
* @since 1.39
*/
class WRStatsReader {
/** @var StatsStore */
private $store;
/** @var array<string,MetricSpec> */
private $metricSpecs;
/** @var string[] */
private $prefixComponents;
/** @var float|int|null The UNIX timestamp used for the current time */
private $now;
/** @var bool[] Storage keys ready to be fetched */
private $queuedKeys = [];
/** @var int[] Unscaled integers returned by the store, indexed by key */
private $cachedValues = [];
/**
* @internal Use WRStatsFactory::createReader instead
* @param StatsStore $store
* @param array<string,array> $specs
* @param string|string[] $prefix
*/
public function __construct( StatsStore $store, $specs, $prefix ) {
$this->store = $store;
$this->metricSpecs = [];
foreach ( $specs as $name => $spec ) {
$this->metricSpecs[$name] = new MetricSpec( $spec );
}
$this->prefixComponents = is_array( $prefix ) ? $prefix : [ $prefix ];
if ( !count( $this->prefixComponents ) ) {
throw new WRStatsError( __METHOD__ .
': there must be at least one prefix component' );
}
}
/**
* Get a TimeRange for some period ending at the current time. Note that
* this will use the same value of the current time for subsequent calls
* until resetCurrentTime() is called.
*
* @param int|float $numSeconds
* @return TimeRange
*/
public function latest( $numSeconds ) {
$now = $this->now();
return new TimeRange( $now - $numSeconds, $now );
}
/**
* Get a specified time range
*
* @param int|float $start The UNIX time of the start of the range
* @param int|float $end The UNIX time of the end of the range
* @return TimeRange
*/
public function timeRange( $start, $end ) {
return new TimeRange( $start, $end );
}
/**
* Queue a fetch operation.
*
* @param string $metricName The metric name, the key into $specs.
* @param EntityKey|null $entity Additional storage key components
* @param TimeRange $range The time range to fetch
* @return RatePromise
*/
public function getRate( $metricName, ?EntityKey $entity, TimeRange $range ) {
$metricSpec = $this->metricSpecs[$metricName] ?? null;
if ( $metricSpec === null ) {
throw new WRStatsError( "Unrecognised metric \"$metricName\"" );
}
$entity ??= new LocalEntityKey;
$now = $this->now();
$seqSpec = null;
foreach ( $metricSpec->sequences as $seqSpec ) {
$seqStart = $now - $seqSpec->softExpiry;
if ( $seqStart <= $range->start ) {
break;
}
}
if ( !$seqSpec ) {
// This check exists to make Phan happy.
// It should never fail since we apply normalization in MetricSpec::__construct()
throw new WRStatsError( 'There should have been at least one sequence' );
}
$timeStep = $seqSpec->timeStep;
$firstBucket = (int)( $range->start / $timeStep );
$lastBucket = (int)ceil( $range->end / $timeStep );
for ( $bucket = $firstBucket; $bucket <= $lastBucket; $bucket++ ) {
$key = $this->store->makeKey(
$this->prefixComponents,
[ $metricName, $seqSpec->name, $bucket ],
$entity
);
if ( !isset( $this->cachedValues[$key] ) ) {
$this->queuedKeys[$key] = true;
}
}
return new RatePromise( $this, $metricName, $entity, $metricSpec, $seqSpec, $range );
}
/**
* Queue a batch of fetch operations for different metrics with the same
* time range.
*
* @param string[] $metricNames
* @param EntityKey|null $entity
* @param TimeRange $range
* @return RatePromise[]
*/
public function getRates( $metricNames, ?EntityKey $entity, TimeRange $range ) {
$rates = [];
foreach ( $metricNames as $name ) {
$rates[$name] = $this->getRate( $name, $entity, $range );
}
return $rates;
}
/**
* Perform any queued fetch operations.
*/
public function fetch() {
if ( !$this->queuedKeys ) {
return;
}
$this->cachedValues += $this->store->query( array_keys( $this->queuedKeys ) );
$this->queuedKeys = [];
}
/**
* Set the current time to be used in latest() etc.
*
* @param int|float $now
*/
public function setCurrentTime( $now ) {
$this->now = $now;
}
/**
* Clear the current time so that it will be filled with the real current
* time on the next call.
*/
public function resetCurrentTime() {
$this->now = null;
}
/**
* @return float|int
*/
private function now() {
$this->now ??= microtime( true );
return $this->now;
}
/**
* @internal Utility for resolution in RatePromise
* @param string $metricName
* @param EntityKey $entity
* @param MetricSpec $metricSpec
* @param SequenceSpec $seqSpec
* @param TimeRange $range
* @return float|int
*/
public function internalGetCount(
$metricName,
EntityKey $entity,
MetricSpec $metricSpec,
SequenceSpec $seqSpec,
TimeRange $range
) {
$this->fetch();
$timeStep = $seqSpec->timeStep;
$firstBucket = (int)( $range->start / $timeStep );
$lastBucket = (int)( $range->end / $timeStep );
$now = $this->now();
$total = 0;
for ( $bucket = $firstBucket; $bucket <= $lastBucket; $bucket++ ) {
$key = $this->store->makeKey(
$this->prefixComponents,
[ $metricName, $seqSpec->name, $bucket ],
$entity
);
$value = $this->cachedValues[$key] ?? 0;
if ( !$value ) {
continue;
} elseif ( $bucket === $firstBucket ) {
if ( $bucket === $lastBucket ) {
// It can be assumed that there are zero events in the future
$bucketStartTime = $bucket * $timeStep;
$rateInterpolationEndTime = min( $bucketStartTime + $timeStep, $now );
$interpolationDuration = $rateInterpolationEndTime - $bucketStartTime;
if ( $interpolationDuration > 0 ) {
$total += $value * $range->getDuration() / $interpolationDuration;
}
} else {
$overlapDuration = max( ( $bucket + 1 ) * $timeStep - $range->start, 0 );
$total += $value * $overlapDuration / $timeStep;
}
} elseif ( $bucket === $lastBucket ) {
// It can be assumed that there are zero events in the future
$bucketStartTime = $bucket * $timeStep;
$rateInterpolationEndTime = min( $bucketStartTime + $timeStep, $now );
$overlapDuration = max( $range->end - $bucketStartTime, 0 );
$interpolationDuration = $rateInterpolationEndTime - $bucketStartTime;
if ( $overlapDuration === $interpolationDuration ) {
// Special case for 0/0 -- current time exactly on boundary.
$total += $value;
} elseif ( $interpolationDuration > 0 ) {
$total += $value * $overlapDuration / $interpolationDuration;
}
} else {
$total += $value;
}
}
// Round to nearest resolution step for nicer display
$rounded = round( $total ) * $metricSpec->resolution;
// Convert to integer if integer is expected
if ( is_int( $metricSpec->resolution ) ) {
$rounded = (int)$rounded;
}
return $rounded;
}
/**
* Resolve a batch of RatePromise objects, returning their counter totals,
* indexed as in the input array.
*
* @param array<mixed,RatePromise> $rates
* @return array<mixed,float|int>
*/
public function total( $rates ) {
$result = [];
foreach ( $rates as $key => $rate ) {
$result[$key] = $rate->total();
}
return $result;
}
/**
* Resolve a batch of RatePromise objects, returning their per-second rates.
*
* @param array<mixed,RatePromise> $rates
* @return array<mixed,float>
*/
public function perSecond( $rates ) {
$result = [];
foreach ( $rates as $key => $rate ) {
$result[$key] = $rate->perSecond();
}
return $result;
}
/**
* Resolve a batch of RatePromise objects, returning their per-minute rates.
*
* @param array<mixed,RatePromise> $rates
* @return array<mixed,float>
*/
public function perMinute( $rates ) {
$result = [];
foreach ( $rates as $key => $rate ) {
$result[$key] = $rate->perMinute();
}
return $result;
}
/**
* Resolve a batch of RatePromise objects, returning their per-hour rates.
*
* @param array<mixed,RatePromise> $rates
* @return array<mixed,float>
*/
public function perHour( $rates ) {
$result = [];
foreach ( $rates as $key => $rate ) {
$result[$key] = $rate->perHour();
}
return $result;
}
/**
* Resolve a batch of RatePromise objects, returning their per-day rates.
*
* @param array<mixed,RatePromise> $rates
* @return array<mixed,float>
*/
public function perDay( $rates ) {
$result = [];
foreach ( $rates as $key => $rate ) {
$result[$key] = $rate->perDay();
}
return $result;
}
}
|