<?php
/**
* Matomo - free/libre analytics platform
*
* @link https://matomo.org
* @license https://www.gnu.org/licenses/gpl-3.0.html GPL v3 or later
*/
namespace Piwik;
use Exception;
use Piwik\Archive\DataTableFactory;
use Piwik\ArchiveProcessor\Parameters;
use Piwik\ArchiveProcessor\Rules;
use Piwik\Container\StaticContainer;
use Piwik\DataAccess\ArchiveWriter;
use Piwik\DataAccess\LogAggregator;
use Piwik\DataTable\Manager;
use Piwik\DataTable\Map;
use Piwik\DataTable\Row;
use Piwik\Segment\SegmentExpression;
use Piwik\Log\LoggerInterface;
/**
* Used by {@link Piwik\Plugin\Archiver} instances to insert and aggregate archive data.
*
* ### See also
*
* - **{@link Piwik\Plugin\Archiver}** - to learn how plugins should implement their own analytics
* aggregation logic.
* - **{@link Piwik\DataAccess\LogAggregator}** - to learn how plugins can perform data aggregation
* across Piwik's log tables.
*
* ### Examples
*
* **Inserting numeric data**
*
*     // function in an Archiver descendant
*     public function aggregateDayReport()
*     {
*         $archiveProcessor = $this->getProcessor();
*
*         $myFancyMetric = // ... calculate the metric value ...
*         $archiveProcessor->insertNumericRecord('MyPlugin_myFancyMetric', $myFancyMetric);
*     }
*
* **Inserting serialized DataTables**
*
*     // function in an Archiver descendant
*     public function aggregateDayReport()
*     {
*         $archiveProcessor = $this->getProcessor();
*
*         $maxRowsInTable = Config::getInstance()->General['datatable_archiving_maximum_rows_standard'];
*
*         $dataTable = // ... build by aggregating visits ...
*         $serializedData = $dataTable->getSerialized($maxRowsInTable, $maxRowsInSubtable = $maxRowsInTable,
*                                                     $columnToSortBy = Metrics::INDEX_NB_VISITS);
*
*         $archiveProcessor->insertBlobRecord('MyPlugin_myFancyReport', $serializedData);
*     }
*
* **Aggregating archive data**
*
*     // function in an Archiver descendant
*     public function aggregateMultipleReports()
*     {
*         $archiveProcessor = $this->getProcessor();
*
*         // aggregate a metric
*         $archiveProcessor->aggregateNumericMetrics('MyPlugin_myFancyMetric');
*         $archiveProcessor->aggregateNumericMetrics('MyPlugin_mySuperFancyMetric', 'max');
*
*         // aggregate a report
*         $archiveProcessor->aggregateDataTableRecords('MyPlugin_myFancyReport');
*     }
*
*/
class ArchiveProcessor
{
/**
* @var bool
*/
public static $isRootArchivingRequest = true;
/**
* @var \Piwik\DataAccess\ArchiveWriter
*/
private $archiveWriter;
/**
* @var \Piwik\DataAccess\LogAggregator
*/
private $logAggregator;
/**
* @var Archive
*/
public $archive = null;
/**
* @var Parameters
*/
private $params;
/**
* @var int|false The number of visits, or false if it has not been set yet.
*/
private $numberOfVisits = false;
private $numberOfVisitsConverted = false;
private $processedDependentSegments = [];
public function __construct(Parameters $params, ArchiveWriter $archiveWriter, LogAggregator $logAggregator)
{
$this->params = $params;
$this->logAggregator = $logAggregator;
$this->archiveWriter = $archiveWriter;
}
protected function getArchive()
{
if (empty($this->archive)) {
$subPeriods = $this->params->getSubPeriods();
$idSites = $this->params->getIdSites();
$this->archive = Archive::factory($this->params->getSegment(), $subPeriods, $idSites);
/**
* @internal
*/
Piwik::postEvent('ArchiveProcessor.getArchive', [$this->archive]);
}
return $this->archive;
}
public function setNumberOfVisits($visits, $visitsConverted)
{
$this->numberOfVisits = $visits;
$this->numberOfVisitsConverted = $visitsConverted;
}
/**
* Returns the {@link Parameters} object containing the site, period and segment we're archiving
* data for.
*
* @return Parameters
* @api
*/
public function getParams()
{
return $this->params;
}
/**
* Returns a `{@link Piwik\DataAccess\LogAggregator}` instance for the site, period and segment this
* ArchiveProcessor will insert archive data for.
*
* @return LogAggregator
* @api
*/
public function getLogAggregator()
{
return $this->logAggregator;
}
/**
* Array of (column name before => column name renamed) of the columns for which sum operation is invalid.
* These columns will be renamed as per this mapping.
* @var array
*/
protected static $columnsToRenameAfterAggregation = array(
Metrics::INDEX_NB_UNIQ_VISITORS => Metrics::INDEX_SUM_DAILY_NB_UNIQ_VISITORS,
Metrics::INDEX_NB_USERS => Metrics::INDEX_SUM_DAILY_NB_USERS,
);
/**
* Sums records for every subperiod of the current period and inserts the result as the record
* for this period.
*
* DataTables are summed recursively so subtables will be summed as well.
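*
* A minimal usage sketch, assuming it is called from an {@link Piwik\Plugin\Archiver} descendant;
* the record name and row limits are hypothetical:
*
*     $rowCounts = $this->getProcessor()->aggregateDataTableRecords(
*         ['MyPlugin_myFancyReport'],
*         $maximumRowsInDataTableLevelZero = 500,
*         $maximumRowsInSubDataTable = 100
*     );
*     // row count of the top level table before truncation
*     $topLevelRows = $rowCounts['MyPlugin_myFancyReport']['level0'];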
*
* @param string|array $recordNames Name(s) of the report we are aggregating, eg, `'Referrers_type'`.
* @param int $maximumRowsInDataTableLevelZero Maximum number of rows allowed in the top level DataTable.
* @param int $maximumRowsInSubDataTable Maximum number of rows allowed in each subtable.
* @param string|null $defaultColumnToSortByBeforeTruncation The name of the column to sort by before truncating a DataTable.
* If not set, and the table contains nb_visits or INDEX_NB_VISITS, we will
* sort by visits.
* @param array $columnsAggregationOperation Operations for aggregating columns, see {@link Row::sumRow()}.
* @param array $columnsToRenameAfterAggregation Columns mapped to new names for columns that must change names
* when summed because they cannot be summed, eg,
* `array('nb_uniq_visitors' => 'sum_daily_nb_uniq_visitors')`.
* @param string[]|bool $countRowsRecursive array of recordNames that defines for which ones you need a recursive row count, or true if it should be done for all
* @param string[] $countLeafRows array of recordNames that defines for which ones you need a leaf row count.
* @return array Returns the row counts of each aggregated report before truncation, eg,
*
*     array(
*         'report1' => array('level0' => $report1->getRowsCount(),
*                            'recursive' => $report1->getRowsCountRecursive()),
*         'report2' => array('level0' => $report2->getRowsCount(),
*                            'recursive' => $report2->getRowsCountRecursive()),
*         ...
*     )
* @api
*/
public function aggregateDataTableRecords(
$recordNames,
$maximumRowsInDataTableLevelZero = null,
$maximumRowsInSubDataTable = null,
$defaultColumnToSortByBeforeTruncation = null,
&$columnsAggregationOperation = null,
$columnsToRenameAfterAggregation = null,
$countRowsRecursive = true,
array $countLeafRows = []
) {
/** @var LoggerInterface $logger */
$logger = StaticContainer::get(LoggerInterface::class);
if (!is_array($recordNames)) {
$recordNames = array($recordNames);
}
$archiveDescription = $this->params . '';
$nameToCount = array();
foreach ($recordNames as $recordName) {
$latestUsedTableId = Manager::getInstance()->getMostRecentTableId();
$logger->debug("aggregating record {record} [archive = {archive}]", [
'record' => $recordName,
'archive' => $archiveDescription,
]);
$table = $this->aggregateDataTableRecord($recordName, $columnsAggregationOperation, $columnsToRenameAfterAggregation);
$nameToCount[$recordName]['level0'] = $table->getRowsCount();
if ($countRowsRecursive === true || (is_array($countRowsRecursive) && in_array($recordName, $countRowsRecursive))) {
$nameToCount[$recordName]['recursive'] = $table->getRowsCountRecursive();
}
if (in_array($recordName, $countLeafRows)) {
$nameToCount[$recordName]['leafs'] = $table->getLeafRowsCount();
}
$columnToSortByBeforeTruncation = $defaultColumnToSortByBeforeTruncation;
if (empty($columnToSortByBeforeTruncation)) {
$columns = $table->getColumns();
if (in_array(Metrics::INDEX_NB_VISITS, $columns)) {
$columnToSortByBeforeTruncation = Metrics::INDEX_NB_VISITS;
} elseif (in_array('nb_visits', $columns)) {
$columnToSortByBeforeTruncation = 'nb_visits';
}
}
$blob = $table->getSerialized($maximumRowsInDataTableLevelZero, $maximumRowsInSubDataTable, $columnToSortByBeforeTruncation);
Common::destroy($table);
$this->insertBlobRecord($recordName, $blob);
unset($blob);
DataTable\Manager::getInstance()->deleteAll($latestUsedTableId);
}
return $nameToCount;
}
/**
* Aggregates one or more metrics for every subperiod of the current period and inserts the results
* as metrics for the current period.
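*
* If no operation is given for a metric, it is guessed from the metric name: names starting with
* `max_` use `'max'`, names starting with `min_` use `'min'`, and all others use `'sum'`. A short
* sketch with hypothetical metric names:
*
*     // summed, since the name has no `max_` or `min_` prefix
*     $archiveProcessor->aggregateNumericMetrics('MyPlugin_myFancyMetric');
*
*     // explicit operation per metric
*     $archiveProcessor->aggregateNumericMetrics(
*         ['MyPlugin_myFancyMetric', 'MyPlugin_mySuperFancyMetric'],
*         ['MyPlugin_myFancyMetric' => 'sum', 'MyPlugin_mySuperFancyMetric' => 'max']
*     );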
*
* @param array|string $columns Array of metric names to aggregate.
* @param string|string[]|false $operationsToApply The operation to apply to the metric. Either `'sum'`, `'max'` or `'min'`.
* Can also be an array mapping record names to operations.
* @return array|int Returns the array of aggregate values. If only one metric was aggregated,
* the aggregate value will be returned as is, not in an array.
* For example, if `array('nb_visits', 'nb_hits')` is supplied for `$columns`,
*
*     array(
*         'nb_visits' => 3040,
*         'nb_hits' => 405
*     )
*
* could be returned. If `array('nb_visits')` or `'nb_visits'` is used for `$columns`,
* then `3040` would be returned.
* @api
*/
public function aggregateNumericMetrics($columns, $operationsToApply = false)
{
$metrics = $this->getAggregatedNumericMetrics($columns, $operationsToApply);
foreach ($metrics as $column => $value) {
$this->insertNumericRecord($column, $value);
}
// if asked for only one field to sum
if (count($metrics) === 1) {
return reset($metrics);
}
// returns the array of records once summed
return $metrics;
}
public function getNumberOfVisits()
{
if ($this->numberOfVisits === false) {
throw new Exception("visits should have been set here");
}
return $this->numberOfVisits;
}
public function getNumberOfVisitsConverted()
{
return $this->numberOfVisitsConverted;
}
/**
* Caches multiple numeric records in the archive for this processor's site, period
* and segment.
*
* @param array $numericRecords A name-value mapping of numeric values that should be
* archived, eg,
*
* array('Referrers_distinctKeywords' => 23, 'Referrers_distinctCampaigns' => 234)
* @api
*/
public function insertNumericRecords($numericRecords)
{
foreach ($numericRecords as $name => $value) {
$this->insertNumericRecord($name, $value);
}
}
/**
* Caches a single numeric record in the archive for this processor's site, period and
* segment.
*
* Numeric values are not inserted if they equal `0`.
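*
* Values are rounded to two decimal places before they are written, and `null` is treated as `0`.
* For illustration (the record name is hypothetical):
*
*     // round(3.14159, 2) is stored, ie, 3.14
*     $archiveProcessor->insertNumericRecord('MyPlugin_averageScore', 3.14159);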
*
* @param string $name The name of the numeric value, eg, `'Referrers_distinctKeywords'`.
* @param float|null $value The numeric value.
* @api
*/
public function insertNumericRecord($name, $value)
{
$value = round($value ?? 0, 2);
$value = Common::forceDotAsSeparatorForDecimalPoint($value);
$this->archiveWriter->insertRecord($name, $value);
}
/**
* Caches one or more blob records in the archive for this processor's site, period
* and segment.
*
* @param string $name The name of the record, eg, 'Referrers_type'.
* @param string|array $values A blob string or an array of blob strings. If an array
* is used, the first element in the array will be inserted
* with the `$name` name. The others will be inserted with
* `$name . '_' . $index` as the record name (where $index is
* the index of the blob record in `$values`).
* @api
*/
public function insertBlobRecord($name, $values)
{
$this->archiveWriter->insertBlobRecord($name, $values);
}
/**
* This method selects all DataTables that have the name $name over the period.
* All these DataTables are then added together, and the resulting DataTable is returned.
*
* @param string $name
* @param array $columnsAggregationOperation Operations for aggregating columns, @see Row::sumRow()
* @param array $columnsToRenameAfterAggregation columns in the array (old name, new name) to be renamed as the sum operation is not valid on them (eg. nb_uniq_visitors->sum_daily_nb_uniq_visitors)
* @return DataTable
*/
protected function aggregateDataTableRecord($name, $columnsAggregationOperation = null, $columnsToRenameAfterAggregation = null)
{
try {
ErrorHandler::pushFatalErrorBreadcrumb(__CLASS__, ['name' => $name]);
$blobs = $this->getArchive()->querySingleBlob($name);
$dataTable = $this->getAggregatedDataTableMapFromBlobs($blobs, $columnsAggregationOperation, $columnsToRenameAfterAggregation, $name);
} finally {
ErrorHandler::popFatalErrorBreadcrumb();
}
return $dataTable;
}
protected function getAggregatedDataTableMapFromBlobs(\Iterator $dataTableBlobs, $columnsAggregationOperation, $columnsToRenameAfterAggregation, $name)
{
// maps period & subtable ID in database to the Row instance in $result that subtable should be added to when encountered
// [$row['date1'].','.$row['date2']][$tableId] = $row in $result
/** @var Row[][] */
$tableIdToResultRowMapping = [];
$result = new DataTable();
if (!empty($columnsAggregationOperation)) {
$result->setMetadata(DataTable::COLUMN_AGGREGATION_OPS_METADATA_NAME, $columnsAggregationOperation);
}
foreach ($dataTableBlobs as $archiveDataRow) {
$period = $archiveDataRow['date1'] . ',' . $archiveDataRow['date2'];
$tableId = $archiveDataRow['name'] == $name ? null : $this->getSubtableIdFromBlobName($archiveDataRow['name']);
$blobTable = DataTable::fromSerializedArray($archiveDataRow['value']);
// see https://github.com/piwik/piwik/issues/4377
$blobTable->filter(function ($table) use ($columnsToRenameAfterAggregation) {
if ($this->areColumnsNotAlreadyRenamed($table)) {
/**
* This makes archiving periods and range dates a lot faster. When we archive a week, we rename the
* columns of each of its 7 day archives, so the columns are already replaced in the resulting
* week archive. When generating month archives, which mostly use week archives, we do not have
* to replace those columns in the week archives again since we can be sure they were already
* replaced. The same applies when aggregating year and range archives. This can save 10% or more
* when aggregating month, year and range archives.
*/
$this->renameColumnsAfterAggregation($table, $columnsToRenameAfterAggregation);
}
});
$tableToAddTo = null;
if ($tableId === null) {
$tableToAddTo = $result;
} elseif (empty($tableIdToResultRowMapping[$period][$tableId])) { // sanity check
StaticContainer::get(LoggerInterface::class)->info(
'Unexpected state when aggregating DataTable, unknown period/table ID combination encountered: {period} - {tableId}.'
. ' This either means the SQL to order blobs is behaving incorrectly or the blob data is corrupt in some way.',
[
'period' => $period,
'tableId' => $tableId,
]
);
continue;
} else {
$rowToAddTo = $tableIdToResultRowMapping[$period][$tableId];
if (!$rowToAddTo->getIdSubDataTable()) {
$newTable = new DataTable();
$newTable->setMetadata(DataTable::COLUMN_AGGREGATION_OPS_METADATA_NAME, $columnsAggregationOperation);
$rowToAddTo->setSubtable($newTable);
}
$tableToAddTo = $rowToAddTo->getSubtable();
}
$tableToAddTo->addDataTable($blobTable);
// add subtable IDs for $blobTableRow to $tableIdToResultRowMapping
foreach ($blobTable->getRows() as $blobTableRow) {
$label = $blobTableRow->getColumn('label');
$subtableId = $blobTableRow->getIdSubDataTable();
if (empty($subtableId)) {
continue;
}
$rowToAddTo = $tableToAddTo->getRowFromLabel($label);
$tableIdToResultRowMapping[$period][$subtableId] = $rowToAddTo;
}
Common::destroy($blobTable);
unset($blobTable);
}
return $result;
}
private function getSubtableIdFromBlobName($recordName)
{
$parts = explode('_', $recordName);
$id = end($parts);
if (is_numeric($id)) {
return $id;
}
return null;
}
/**
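* Returns whether the columns of the given table still need to be renamed, ie, the table has
* no period metadata or belongs to a day period.
*
* Note: public only for use in closure in PHP 5.3.
*
* @param DataTable $table
* @return bool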
*/
public function areColumnsNotAlreadyRenamed($table)
{
$period = $table->getMetadata(DataTableFactory::TABLE_METADATA_PERIOD_INDEX);
return !$period || $period->getLabel() === 'day';
}
protected function getOperationForColumns($columns, $defaultOperation)
{
$operationForColumn = array();
foreach ($columns as $name) {
$operation = is_array($defaultOperation) ? ($defaultOperation[$name] ?? null) : $defaultOperation;
if (empty($operation)) {
$operation = $this->guessOperationForColumn($name);
}
$operationForColumn[$name] = $operation;
}
return $operationForColumn;
}
protected function enrichWithUniqueVisitorsMetric(Row $row)
{
if (
$row->getColumn('nb_uniq_visitors') === false
&& $row->getColumn('nb_users') === false
) {
return;
}
$periodLabel = $this->getParams()->getPeriod()->getLabel();
if (!SettingsPiwik::isUniqueVisitorsEnabled($periodLabel)) {
$row->deleteColumn('nb_uniq_visitors');
$row->deleteColumn('nb_users');
return;
}
$sites = $this->getIdSitesToComputeNbUniques();
if (count($sites) > 1 && Rules::shouldSkipUniqueVisitorsCalculationForMultipleSites()) {
if ($periodLabel != 'day') {
// for day we still keep the aggregated metric, but for other periods we remove it as it becomes too
// inaccurate
$row->deleteColumn('nb_uniq_visitors');
$row->deleteColumn('nb_users');
}
return;
}
if (empty($sites)) {
// a plugin disabled running the query below by removing all sites.
$row->deleteColumn('nb_uniq_visitors');
$row->deleteColumn('nb_users');
return;
}
if (count($sites) === 1) {
$uniqueVisitorsMetric = Metrics::INDEX_NB_UNIQ_VISITORS;
} else {
if (!SettingsPiwik::isSameFingerprintAcrossWebsites()) {
throw new Exception("Processing unique visitors across websites is enabled for this instance,
but to process this metric you must first set enable_fingerprinting_across_websites=1
in the config file, under the [Tracker] section.");
}
$uniqueVisitorsMetric = Metrics::INDEX_NB_UNIQ_FINGERPRINTS;
}
$metrics = array(
Metrics::INDEX_NB_USERS,
$uniqueVisitorsMetric,
);
$uniques = $this->computeNbUniques($metrics, $sites);
// see the edge case described in https://github.com/piwik/piwik/issues/9357 where nb_uniq_visitors might be higher
// than nb_visits because we archive / process it after nb_visits. Between archiving nb_visits and nb_uniq_visitors
// there could have been a new visit, leading to a higher nb_uniq_visitors than nb_visits, which is not possible
// by definition. In this case we simply use the visits metric instead of the unique visitors metric.
$visits = $row->getColumn('nb_visits');
if ($visits !== false && $uniques[$uniqueVisitorsMetric] !== false) {
$uniques[$uniqueVisitorsMetric] = min($uniques[$uniqueVisitorsMetric], $visits);
}
$row->setColumn('nb_uniq_visitors', $uniques[$uniqueVisitorsMetric]);
$row->setColumn('nb_users', $uniques[Metrics::INDEX_NB_USERS]);
}
protected function guessOperationForColumn($column)
{
if (strpos($column, 'max_') === 0) {
return 'max';
}
if (strpos($column, 'min_') === 0) {
return 'min';
}
return 'sum';
}
private function getIdSitesToComputeNbUniques()
{
$params = $this->getParams();
$sites = array($params->getSite()->getId());
/**
* Triggered to change which site ids should be looked at when processing unique visitors and users.
*
* @param array &$idSites An array with one idSite. This site is being archived currently. To cancel the query
* you can change this value to an empty array. To include other sites in the query you
* can add more idSites to this list of idSites.
* @param Period $period The period that is being requested to be archived.
* @param Segment $segment The segment that is requested to be archived.
*/
Piwik::postEvent('ArchiveProcessor.ComputeNbUniques.getIdSites', array(&$sites, $params->getPeriod(), $params->getSegment()));
return $sites;
}
/**
* Processes number of unique visitors for the given period
*
* This is the only Period metric (ie. week/month/year/range) that we process from the logs directly,
* since unique visitors cannot be summed like other metrics.
*
* @param array $metrics Metric IDs for which to aggregate the count of values
* @param int[] $sites A list of idSites that should be included
* @return array|null An array of metrics, where the key is metricid and the value is the metric value or null if
* the query was cancelled and not executed.
*/
protected function computeNbUniques($metrics, $sites)
{
$logAggregator = $this->getLogAggregator();
$sitesBackup = $logAggregator->getSites();
$logAggregator->setSites($sites);
try {
$query = $logAggregator->queryVisitsByDimension(array(), false, array(), $metrics);
} finally {
$logAggregator->setSites($sitesBackup);
}
$data = $query->fetch();
return $data;
}
/**
* If the DataTable is a Map, sums all DataTables in the map and returns the resulting DataTable.
*
* @param DataTable|DataTable\Map $data
* @param array|null $columnsAggregationOperation Operations for aggregating columns, see {@link Row::sumRow()}.
* @return DataTable
*/
protected function getAggregatedDataTableMap($data, $columnsAggregationOperation)
{
$table = new DataTable();
if (!empty($columnsAggregationOperation)) {
$table->setMetadata(DataTable::COLUMN_AGGREGATION_OPS_METADATA_NAME, $columnsAggregationOperation);
}
if ($data instanceof DataTable\Map) {
// as $date => $tableToSum
$this->aggregatedDataTableMapsAsOne($data, $table);
} else {
$table->addDataTable($data);
}
return $table;
}
/**
* Aggregates the DataTable\Map into the destination $aggregated, recursing into nested Maps.
*
* @param Map $map
* @param DataTable $aggregated
*/
protected function aggregatedDataTableMapsAsOne(Map $map, DataTable $aggregated)
{
foreach ($map->getDataTables() as $tableToAggregate) {
if ($tableToAggregate instanceof Map) {
$this->aggregatedDataTableMapsAsOne($tableToAggregate, $aggregated);
} else {
$aggregated->addDataTable($tableToAggregate);
}
}
}
/**
* Note: public only for use in closure in PHP 5.3.
*/
public function renameColumnsAfterAggregation(DataTable $table, $columnsToRenameAfterAggregation = null)
{
// Rename columns after aggregation
if (is_null($columnsToRenameAfterAggregation)) {
$columnsToRenameAfterAggregation = self::$columnsToRenameAfterAggregation;
}
if (empty($columnsToRenameAfterAggregation)) {
return;
}
foreach ($table->getRows() as $row) {
foreach ($columnsToRenameAfterAggregation as $oldName => $newName) {
$row->renameColumn($oldName, $newName);
}
$subTable = $row->getSubtable();
if ($subTable) {
$this->renameColumnsAfterAggregation($subTable, $columnsToRenameAfterAggregation);
}
}
}
protected function getAggregatedNumericMetrics($columns, $operationsToApply)
{
if (!is_array($columns)) {
$columns = array($columns);
}
$operationForColumn = $this->getOperationForColumns($columns, $operationsToApply);
$dataTable = $this->getArchive()->getDataTableFromNumeric($columns);
if ($dataTable->wasBuiltWithoutArchives()) {
return (new Row())->getColumns();
}
$results = $this->getAggregatedDataTableMap($dataTable, $operationForColumn);
if ($results->getRowsCount() > 1) {
throw new Exception("A DataTable is in an unexpected state:" . var_export($results, true));
}
$rowMetrics = $results->getFirstRow();
if ($rowMetrics === false) {
$rowMetrics = new Row();
}
$this->enrichWithUniqueVisitorsMetric($rowMetrics);
$this->renameColumnsAfterAggregation($results, self::$columnsToRenameAfterAggregation);
$metrics = $rowMetrics->getColumns();
foreach ($columns as $name) {
if (!isset($metrics[$name])) {
$metrics[$name] = 0;
}
}
return $metrics;
}
/**
* Initiate archiving for a plugin during an ongoing archiving. The plugin can be another
* plugin or the same plugin.
*
* This method should be called during archiving when one plugin uses the report of another
* plugin with a segment. It will ensure reports for that segment & plugin will be archived
* without initiating archiving for every plugin with that segment (which would be a performance
* killer).
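*
* A minimal usage sketch from within an Archiver descendant; the plugin name and segment
* definition are only illustrative:
*
*     $this->getProcessor()->processDependentArchive('Goals', 'visitorType==new');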
*
* @param string $plugin
* @param string $segment
*/
public function processDependentArchive($plugin, $segment)
{
if (!self::$isRootArchivingRequest) { // prevent all recursion
return;
}
$params = $this->getParams();
// range archives are always processed on demand, so pre-processing dependent archives is not required
// here
if (Rules::shouldProcessOnlyReportsRequestedInArchiveQuery($params->getPeriod()->getLabel())) {
return;
}
$idSites = [$params->getSite()->getId()];
// important to use the original segment string when combining. As the API itself would combine the original string.
// this prevents a bug where the API would use the segment
// userId!@%2540matomo.org;userId!=hello%2540matomo.org;visitorType==new
// vs here we would use
// userId!@%40matomo.org;userId!=hello%40matomo.org;visitorType==new
// thus these would result in different segment hashes and therefore the reports would either show 0 or archive the data twice
$originSegmentString = $params->getSegment()->getOriginalString();
$newSegment = Segment::combine($originSegmentString, SegmentExpression::AND_DELIMITER, $segment);
if (!empty($originSegmentString) && $newSegment === $segment && $params->getRequestedPlugin() === $plugin) { // being processed now
return;
}
$newSegment = new Segment($newSegment, $idSites, $params->getDateTimeStart(), $params->getDateTimeEnd());
if (ArchiveProcessor\Rules::isSegmentPreProcessed($idSites, $newSegment)) {
// will be processed anyway
return;
}
// The below check is meant to avoid archiving the same dependency multiple times.
$processedSegmentKey = $params->getSite()->getId() . $params->getPeriod()->getDateStart() . $params->getPeriod()->getLabel() . $newSegment->getOriginalString();
if (in_array($processedSegmentKey . $plugin, $this->processedDependentSegments)) {
return;
}
self::$isRootArchivingRequest = false;
try {
$invalidator = StaticContainer::get('Piwik\Archive\ArchiveInvalidator');
// Always invalidate VisitsSummary before any other plugin archive.
// Otherwise those archives might get built with outdated VisitsSummary data
if ($plugin !== 'VisitsSummary' && !in_array($processedSegmentKey . 'VisitsSummary', $this->processedDependentSegments)) {
$invalidator->markArchivesAsInvalidated(
$idSites,
[$params->getPeriod()->getDateStart()],
$params->getPeriod()->getLabel(),
$newSegment,
false,
false,
'VisitsSummary',
false,
true
);
$parameters = new ArchiveProcessor\Parameters($params->getSite(), $params->getPeriod(), $newSegment);
$parameters->onlyArchiveRequestedPlugin();
$archiveLoader = new ArchiveProcessor\Loader($parameters);
$archiveLoader->prepareArchive('VisitsSummary');
$this->processedDependentSegments[] = $processedSegmentKey . 'VisitsSummary';
}
$invalidator->markArchivesAsInvalidated(
$idSites,
[$params->getPeriod()->getDateStart()],
$params->getPeriod()->getLabel(),
$newSegment,
false,
false,
$plugin,
false,
true
);
$parameters = new ArchiveProcessor\Parameters($params->getSite(), $params->getPeriod(), $newSegment);
$parameters->onlyArchiveRequestedPlugin();
$archiveLoader = new ArchiveProcessor\Loader($parameters);
$archiveLoader->prepareArchive($plugin);
$this->processedDependentSegments[] = $processedSegmentKey . $plugin;
} finally {
self::$isRootArchivingRequest = true;
}
}
public function getArchiveWriter()
{
return $this->archiveWriter;
}
}