// Copyright 2015 The Gemmlowp Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// multi_thread_gemm.h: Multi-threaded GEMM entry point.
// Readers note: To understand this file, it is useful to first
// read and understand the much simpler single_thread_gemm.h.
#ifndef GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
#define GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_
#include <atomic> // NOLINT
#include <chrono> // NOLINT
#include <thread> // NOLINT
#include <vector>
#include "single_thread_gemm.h"
namespace gemmlowp {
// This value was empirically derived on an end-to-end application benchmark.
// That this number of cycles means we may be busy-waiting for substantially
// longer than a scheduler timeslice's duration is not necessarily surprising.
// The idea is to quickly pick up new work after having finished the previous
// workload. When the new work is within the same GEMM as the previous work,
// the time interval during which we might be busy-waiting is very small, so
// for that purpose alone it would be more than enough to busy-wait for
// 1 million cycles. That is all we would observe on a GEMM benchmark.
// However, in a real application, after having finished a GEMM, we might do
// unrelated work for a little while, then start on a new GEMM. Think of a
// neural network application performing inference, where many but not all
// layers are implemented by a GEMM. In such cases, our worker threads might
// be idle for longer periods of time before having work again. If we let
// them passively wait, on a mobile device, the CPU scheduler might
// aggressively clock down or even turn off the CPU cores that they were
// running on. That would result in a long delay the next time these cores
// need to be turned back on for the next GEMM. So we need to strike a balance
// that reflects typical time intervals between consecutive GEMM invocations,
// not just intra-GEMM considerations. Of course, we need to balance keeping
// CPUs spinning longer to resume work faster against passively waiting to
// conserve power.
const int kMaxBusyWaitNOPs = 4 * 1000 * 1000;
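// Rough order-of-magnitude check (assuming, hypothetically, about one NOP per
// cycle on a CPU clocked around 1-2 GHz): 4 million NOPs correspond to a few
// milliseconds of busy-waiting, which matches the "several milliseconds"
// figure mentioned in BlockingCounter::Wait() below.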
// On X86 and ARM platforms we may use NOP instructions to keep track of
// how long we have been busy-waiting.
#if defined(GEMMLOWP_ALLOW_INLINE_ASM) && !defined(GEMMLOWP_NO_BUSYWAIT) && \
(defined(GEMMLOWP_ARM) || defined(GEMMLOWP_X86))
#define GEMMLOWP_NOP "nop\n"
#define GEMMLOWP_STRING_CONCAT_4(X) X X X X
#define GEMMLOWP_NOP4 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP)
#define GEMMLOWP_NOP16 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP4)
#define GEMMLOWP_NOP64 GEMMLOWP_STRING_CONCAT_4(GEMMLOWP_NOP16)
inline int DoSomeNOPs() {
asm volatile(GEMMLOWP_NOP64);
return 64;
}
#undef GEMMLOWP_STRING_CONCAT_4
#undef GEMMLOWP_NOP64
#undef GEMMLOWP_NOP16
#undef GEMMLOWP_NOP4
#undef GEMMLOWP_NOP
#else // May not use asm NOP.
// If we can't use NOPs, let's use a non-inline function call as a basic
// thing that has some vaguely known, nonzero cost.
GEMMLOWP_NOINLINE
inline int DoSomeNOPs() {
// Pretend that calling an empty function takes as long as 16 NOPs...
return 16;
}
#endif
// Waits until *var != initial_value.
//
// Returns the new value of *var. The guarantee here is that
// the return value is different from initial_value, and that that
// new value has been taken by *var at some point during the
// execution of this function. There is no guarantee that this is
// still the value of *var when this function returns, since *var is
// not assumed to be guarded by any lock.
//
// First does some busy-waiting for a fixed number of no-op cycles,
// then falls back to passive waiting for the given condvar, guarded
// by the given mutex.
//
// The idea of doing some initial busy-waiting is to help get
// better and more consistent multithreading benefits for small GEMM sizes.
// Busy-waiting helps ensure that if we need to wake up soon after having
// started waiting, then we can wake up quickly (as opposed to, say,
// having to wait to be scheduled again by the OS). On the other hand,
// we must still eventually revert to passive waiting for longer waits
// (e.g. worker threads having finished a GEMM and waiting until the next GEMM)
// so as to avoid permanently spinning.
//
template <typename T>
T WaitForVariableChange(std::atomic<T>* var, T initial_value,
pthread_cond_t* cond, pthread_mutex_t* mutex) {
// First, trivial case where the variable already changed value.
T new_value = var->load(std::memory_order_acquire);
if (new_value != initial_value) {
return new_value;
}
// Then try busy-waiting.
int nops = 0;
while (nops < kMaxBusyWaitNOPs) {
nops += DoSomeNOPs();
new_value = var->load(std::memory_order_acquire);
if (new_value != initial_value) {
return new_value;
}
}
// Finally, do real passive waiting.
pthread_mutex_lock(mutex);
new_value = var->load(std::memory_order_acquire);
while (new_value == initial_value) {
pthread_cond_wait(cond, mutex);
new_value = var->load(std::memory_order_acquire);
}
pthread_mutex_unlock(mutex);
return new_value;
}
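// A minimal usage sketch of WaitForVariableChange (hypothetical variables,
// not part of gemmlowp): one thread waits for another to publish a new value.
// Note that the signaling thread stores and broadcasts while holding the
// mutex, exactly as Worker::ChangeState does below, so that a waiter that has
// already fallen back to pthread_cond_wait cannot miss the notification.
//
//   std::atomic<int> var(0);
//   pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
//   pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//
//   // Waiting thread:
//   int new_value = WaitForVariableChange(&var, 0, &cond, &mutex);
//   // Here new_value != 0, and *var held that value at some point.
//
//   // Signaling thread:
//   pthread_mutex_lock(&mutex);
//   var.store(1, std::memory_order_release);
//   pthread_cond_broadcast(&cond);
//   pthread_mutex_unlock(&mutex);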
// A BlockingCounter lets one thread wait for N events to occur.
// This is how the master thread waits for all the worker threads
// to have finished working.
// The waiting is done by busy-waiting on the atomic
// count_ until it hits the value 0. This is acceptable because in our usage
// pattern, BlockingCounter is used only to synchronize threads after
// short-lived tasks (performing parts of the same GEMM). It is not used
// for synchronizing longer waits (resuming work on the next GEMM).
class BlockingCounter {
public:
BlockingCounter() : count_(0) {}
// Sets/resets the counter; initial_count is the number of
// decrementing events that the Wait() call will be waiting for.
void Reset(std::size_t initial_count) {
std::size_t old_count_value = count_.load(std::memory_order_relaxed);
assert(old_count_value == 0);
(void)old_count_value;
count_.store(initial_count, std::memory_order_release);
}
// Decrements the counter; if the counter hits zero, signals
// the threads that were waiting for that, and returns true.
// Otherwise (if the decremented count is still nonzero),
// returns false.
bool DecrementCount() {
std::size_t old_count_value =
count_.fetch_sub(1, std::memory_order_acq_rel);
assert(old_count_value > 0);
std::size_t count_value = old_count_value - 1;
return count_value == 0;
}
// Waits for the N other threads (N having been set by Reset())
// to hit the BlockingCounter.
void Wait() {
ScopedProfilingLabel label("BlockingCounter::Wait");
// Busy-wait until the count value is 0.
int nops = 0;
while (count_.load(std::memory_order_acquire)) {
nops += DoSomeNOPs();
if (nops > kMaxBusyWaitNOPs) {
nops = 0;
// If we are unlucky, the blocking thread (that calls DecrementCount)
// and the blocked thread (here, calling Wait) may be scheduled on
// the same CPU, so the busy-waiting of the present thread may prevent
// the blocking thread from resuming and unblocking.
// If we are even unluckier, the priority of the present thread
// might be higher than that of the blocking thread, so just yielding
// wouldn't allow the blocking thread to resume. So we sleep for
// a substantial amount of time in that case. Notice that we only
// do so after having busy-waited for kMaxBusyWaitNOPs, which is
// typically several milliseconds, so sleeping 1 more millisecond
// isn't terrible at that point.
//
// How this is mitigated in practice:
// applications should be conservative in choosing how many threads to
// tell gemmlowp to use, as it's hard to know how many CPU cores they
// will actually get to run on, on typical mobile devices.
// It seems impossible for gemmlowp to make this choice automatically,
// which is why gemmlowp's default is to use only 1 thread, and
// applications may override that if they know that they can count on
// using more than that.
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
}
private:
std::atomic<std::size_t> count_;
};
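// A minimal usage sketch of BlockingCounter (hypothetical, mirroring how
// WorkersPool uses it below): the master resets the counter to the number of
// outstanding workers, each worker decrements it once when done, and the
// master blocks in Wait() until the count reaches zero.
//
//   BlockingCounter counter;
//   counter.Reset(workers_count);   // master thread, before dispatching work
//   ...
//   counter.DecrementCount();       // each worker, once its task is finished
//   ...
//   counter.Wait();                 // master thread, returns when count == 0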
// A workload for a worker.
struct Task {
Task() : local_allocator(nullptr) {}
virtual ~Task() {}
virtual void Run() = 0;
Allocator* local_allocator;
};
// A worker thread.
class Worker {
public:
enum class State {
ThreadStartup, // The initial state before the thread main loop runs.
Ready, // Is not working, has not yet received new work to do.
HasWork, // Has work to do.
ExitAsSoonAsPossible // Should exit at earliest convenience.
};
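// Legal state transitions, as enforced by the assertions in ChangeState():
//   ThreadStartup -> Ready
//   Ready         -> HasWork or ExitAsSoonAsPossible
//   HasWork       -> Ready   or ExitAsSoonAsPossible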
explicit Worker(BlockingCounter* counter_to_decrement_when_ready)
: task_(nullptr),
state_(State::ThreadStartup),
counter_to_decrement_when_ready_(counter_to_decrement_when_ready) {
pthread_cond_init(&state_cond_, nullptr);
pthread_mutex_init(&state_mutex_, nullptr);
pthread_create(&thread_, nullptr, ThreadFunc, this);
}
~Worker() {
ChangeState(State::ExitAsSoonAsPossible);
pthread_join(thread_, nullptr);
pthread_cond_destroy(&state_cond_);
pthread_mutex_destroy(&state_mutex_);
}
// Changes State; may be called from either the worker thread
// or the master thread; however, not all state transitions are legal,
// and illegal transitions are caught by assertions.
//
// The Task argument is to be used only with new_state==HasWork.
// It specifies the Task being handed to this Worker.
void ChangeState(State new_state, Task* task = nullptr) {
ScopedProfilingLabel label("Worker::ChangeState");
pthread_mutex_lock(&state_mutex_);
State old_state = state_.load(std::memory_order_relaxed);
assert(old_state != new_state);
switch (old_state) {
case State::ThreadStartup:
assert(new_state == State::Ready);
break;
case State::Ready:
assert(new_state == State::HasWork ||
new_state == State::ExitAsSoonAsPossible);
break;
case State::HasWork:
assert(new_state == State::Ready ||
new_state == State::ExitAsSoonAsPossible);
break;
default:
abort();
}
switch (new_state) {
case State::Ready:
if (task_) {
// Doing work is part of reverting to 'ready' state.
task_->Run();
task_ = nullptr;
}
break;
case State::HasWork:
assert(!task_);
task->local_allocator = &local_allocator_;
task_ = task;
break;
default:
break;
}
state_.store(new_state, std::memory_order_relaxed);
pthread_cond_broadcast(&state_cond_);
pthread_mutex_unlock(&state_mutex_);
if (new_state == State::Ready) {
counter_to_decrement_when_ready_->DecrementCount();
}
}
// Thread entry point.
void ThreadFunc() {
ScopedProfilingLabel label("Worker::ThreadFunc");
ChangeState(State::Ready);
// Thread main loop
while (true) {
// Get a state to act on
// In the 'Ready' state, we have nothing to do but to wait until
// we switch to another state.
State state_to_act_upon = WaitForVariableChange(
&state_, State::Ready, &state_cond_, &state_mutex_);
// We now have a state to act on, so act.
switch (state_to_act_upon) {
case State::HasWork:
// Got work to do! So do it, and then revert to 'Ready' state.
ChangeState(State::Ready);
break;
case State::ExitAsSoonAsPossible:
return;
default:
abort();
}
}
}
static void* ThreadFunc(void* arg) {
static_cast<Worker*>(arg)->ThreadFunc();
return nullptr;
}
// Called by the master thread to give this worker work to do.
void StartWork(Task* task) { ChangeState(State::HasWork, task); }
private:
// The underlying thread.
pthread_t thread_;
// The task to be worked on.
Task* task_;
// The condition variable and mutex guarding state changes.
pthread_cond_t state_cond_;
pthread_mutex_t state_mutex_;
// The state enum tells if we're currently working, waiting for work, etc.
// Its concurrent accesses by the worker and main threads are guarded by
// state_mutex_, and can thus use memory_order_relaxed. This still needs
// to be a std::atomic because we use WaitForVariableChange.
std::atomic<State> state_;
// Each worker has its own local allocator so that workers can allocate
// temporary buffers without blocking each other.
Allocator local_allocator_;
// Pointer to the master thread's BlockingCounter object, used to notify
// the master thread when this worker switches to the 'Ready' state.
BlockingCounter* const counter_to_decrement_when_ready_;
};
// A very simple pool of workers that only supports the very
// specific parallelization pattern that we use here:
// a fixed number of workers can be given work, and one then
// waits for all of them to finish.
//
// See MultiThreadGemmContextBase for how other WorkersPool implementations can
// be used.
class WorkersPool {
public:
WorkersPool() {}
~WorkersPool() {
for (auto w : workers_) {
delete w;
}
}
// Just executes the tasks. Does not destroy them. Similar to
// ruy::ThreadPool::Execute.
template <typename TaskType>
void Execute(int tasks_count, TaskType* tasks) {
assert(tasks_count >= 1);
// One of the tasks will be run on the current thread.
std::size_t workers_count = tasks_count - 1;
CreateWorkers(workers_count);
assert(workers_count <= workers_.size());
counter_to_decrement_when_ready_.Reset(workers_count);
for (std::size_t i = 0; i < workers_count; i++) {
workers_[i]->StartWork(&tasks[i]);
}
// Execute the remaining workload immediately on the current thread.
Task* task = &tasks[tasks_count - 1];
task->local_allocator = &main_thread_task_allocator_;
task->Run();
// Wait for the workers submitted above to finish.
counter_to_decrement_when_ready_.Wait();
}
// Legacy: executes the tasks and destroys them
void LegacyExecuteAndDestroyTasks(const std::vector<Task*>& tasks) {
std::size_t tasks_count = tasks.size();
assert(tasks_count >= 1);
// One of the tasks will be run on the current thread.
std::size_t workers_count = tasks_count - 1;
CreateWorkers(workers_count);
assert(workers_count <= workers_.size());
counter_to_decrement_when_ready_.Reset(workers_count);
for (std::size_t i = 0; i < workers_count; i++) {
workers_[i]->StartWork(tasks[i]);
}
// Execute the remaining workload immediately on the current thread.
Task* task = tasks[tasks_count - 1];
task->local_allocator = &main_thread_task_allocator_;
task->Run();
// Wait for the workers submitted above to finish.
counter_to_decrement_when_ready_.Wait();
// Cleanup tasks (best to do this from the same thread that allocated
// the memory).
std::for_each(tasks.begin(), tasks.end(), [](Task* task) { delete task; });
}
// Legacy old name of LegacyExecuteAndDestroyTasks
void Execute(const std::vector<Task*>& tasks) {
LegacyExecuteAndDestroyTasks(tasks);
}
private:
// Ensures that the pool has at least the given count of workers.
// If any new worker has to be created, this function waits for it to
// be ready.
void CreateWorkers(std::size_t workers_count) {
if (workers_.size() >= workers_count) {
return;
}
counter_to_decrement_when_ready_.Reset(workers_count - workers_.size());
while (workers_.size() < workers_count) {
workers_.push_back(new Worker(&counter_to_decrement_when_ready_));
}
counter_to_decrement_when_ready_.Wait();
}
// copy construction disallowed
WorkersPool(const WorkersPool&) = delete;
// The workers in this pool. They are owned by the pool:
// the pool creates workers and destroys them in its destructor.
std::vector<Worker*> workers_;
// The BlockingCounter used to wait for the workers.
BlockingCounter counter_to_decrement_when_ready_;
// For N-threaded operations, we will use only N-1 worker threads
// while the last task will be run directly on the main thread.
// It will then use this main_thread_task_allocator_; having a
// dedicated allocator for that (separate from the base allocator_)
// allows using the same code for all tasks regardless of which
// thread they run on.
Allocator main_thread_task_allocator_;
};
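// A minimal usage sketch of WorkersPool::Execute (FillTask, buffer and chunk
// are hypothetical, not part of gemmlowp): N tasks are submitted, N-1 of them
// run on worker threads while the last one runs on the calling thread.
//
//   struct FillTask : Task {
//     FillTask(std::uint8_t* dst, std::size_t size, std::uint8_t value)
//         : dst(dst), size(size), value(value) {}
//     void Run() override {
//       for (std::size_t i = 0; i < size; i++) dst[i] = value;
//     }
//     std::uint8_t* dst;
//     std::size_t size;
//     std::uint8_t value;
//   };
//
//   WorkersPool pool;
//   std::vector<FillTask> tasks;
//   for (int i = 0; i < 4; i++) {
//     tasks.emplace_back(buffer + i * chunk, chunk, 0);
//   }
//   pool.Execute(static_cast<int>(tasks.size()), tasks.data());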
// The task we use to implement a multi-threaded Gemm: a block of the
// RHS has been packed by the master thread; each worker thread
// then has to pack a block of the LHS and accumulate the Gemm of these
// packed LHS and RHS blocks.
template <typename KernelFormat, typename InputScalar, typename OutputScalar,
typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder,
MapOrder ResultOrder, typename LhsOffset, typename RhsOffset,
typename OutputPipelineType, typename GemmContextType>
struct GemmWithPackedRhsTask : Task {
typedef PackedSideBlock<typename KernelFormat::Lhs> PackedLhs;
typedef PackedSideBlock<typename KernelFormat::Rhs> PackedRhs;
GemmWithPackedRhsTask(GemmContextType* _context, const KernelBase& _kernel,
const MatrixMap<const InputScalar, LhsOrder>& _lhs,
const PackedRhs& _packed_rhs,
MatrixMap<OutputScalar, ResultOrder>* _result,
const MatrixBlockBounds& _result_block,
const LhsOffset& _lhs_offset,
const RhsOffset& _rhs_offset,
const BlockParams& _block_params,
const OutputPipelineType& _output_pipeline)
: context(_context),
kernel(_kernel),
lhs(_lhs),
packed_rhs(_packed_rhs),
result(*_result),
result_block(_result_block),
lhs_offset(_lhs_offset),
rhs_offset(_rhs_offset),
block_params(_block_params),
output_pipeline(_output_pipeline) {}
void Run() override {
ScopedProfilingLabel label("GemmWithPackedRhsTask");
const int rows = result_block.rows;
const int cols = result_block.cols;
const int depth = lhs.cols();
PackedLhs packed_lhs(Side::Lhs, local_allocator, block_params);
PackedResult packed_result(local_allocator, block_params);
local_allocator->Commit();
for (int c = 0; c < cols; c += block_params.l2_cols) {
int cs = std::min(block_params.l2_cols, cols - c);
for (int r = 0; r < rows; r += block_params.l2_rows) {
int rs = std::min(block_params.l2_rows, rows - r);
PackLhs(&packed_lhs, lhs.block(r, 0, rs, depth));
Compute(kernel, block_params, &packed_result, packed_lhs, packed_rhs,
depth);
auto curr_result_block = MatrixBlockBounds(
result_block.start_row + r, result_block.start_col + c, rs, cs);
UnpackResult<KernelFormat>(
&result, curr_result_block, packed_result, depth,
packed_lhs.sums_of_each_slice(), packed_rhs.sums_of_each_slice(),
lhs_offset.block(curr_result_block.start_row, rs),
rhs_offset.block(curr_result_block.start_col, cs), output_pipeline);
}
}
local_allocator->Decommit();
}
const GemmContextType* context;
const KernelBase& kernel;
const MatrixMap<const InputScalar, LhsOrder> lhs;
const PackedRhs packed_rhs;
MatrixMap<OutputScalar, ResultOrder> result;
const MatrixBlockBounds result_block;
const LhsOffset& lhs_offset;
const RhsOffset& rhs_offset;
const BlockParams& block_params;
const OutputPipelineType& output_pipeline;
};
// This base class for multi-threading allows subclasses to implement their own
// workers_pool() method. See MultiThreadGemmContext below for an example;
// any other implementation of workers_pool() must return an object with the
// same public methods as WorkersPool.
class MultiThreadGemmContextBase : public SingleThreadGemmContext {
public:
void set_max_num_threads(int n) { max_num_threads_ = n; }
int max_num_threads() const { return max_num_threads_; }
protected:
// The maximum number of threads to use (including
// the master thread).
// The default value 1 means single-threading. That is the default
// because gemmlowp's primary target is mobile hardware, where thermal
// constraints usually mean that it may not be realistic to use more
// than 1 CPU core even if multiple cores are present.
// The special value 0 means try to detect the number of hardware threads.
// Note: this assumes that all CPU cores are equivalent. That assumption
// is defeated on big.LITTLE ARM devices, where we have no API to query
// the number of big cores (which is typically what we would want to use,
// leaving aside above-mentioned thermal issues). That is the other reason
// why the best compromise here is to let max_num_threads_ default to 1,
// so users who want multi-threading have to make the decision of how many
// threads to use by themselves.
int max_num_threads_ = 1;
};
class MultiThreadGemmContext : public MultiThreadGemmContextBase {
public:
WorkersPool* workers_pool() { return &workers_pool_; }
private:
// The workers pool used by MultiThreadGemm. Making
// this part of the context allows it to be persistent,
// avoiding recreating threads on every Gemm.
WorkersPool workers_pool_;
};
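// Per the comment on MultiThreadGemmContextBase, other pool implementations
// may be plugged in as long as they expose the same public methods as
// WorkersPool. A hypothetical sketch (CustomPool is not part of gemmlowp):
//
//   class CustomPoolGemmContext : public MultiThreadGemmContextBase {
//    public:
//     CustomPool* workers_pool() { return &workers_pool_; }
//    private:
//     CustomPool workers_pool_;  // must provide Execute() like WorkersPool
//   };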
// Determines how many threads should be used for a given Gemm
// operation.
template <int KernelRows>
inline int HowManyThreads(int max_num_threads, int rows, int cols, int depth) {
// Early-exit in the default case where multi-threading is disabled.
if (max_num_threads == 1) {
return 1;
}
// Determine the maximum number of threads.
int max_count = GetHardwareConcurrency(max_num_threads);
// Basic calculation: take into account max pool size, and
// how many rows we have to feed our kernel.
// The motivation for an absolute minimum number of rows per thread,
// potentially higher than KernelRows, is that very thin thread workloads
// currently defeat assumptions of the AddMod generator, resulting
// in substantial bias in TestWithRealData on 24 threads.
// Ideally, the AddMod generator should be aware of global (r,c) coordinates
// so as to be independent of the number of threads.
static const int AbsoluteMinRowsPerThread = 16;
static const int MinRowsPerThread = KernelRows > AbsoluteMinRowsPerThread
? KernelRows
: AbsoluteMinRowsPerThread;
int thread_count = std::min(max_count, CeilQuotient(rows, MinRowsPerThread));
// At this point for small products we already have thread_count==1 so
// we can avoid doing more work; otherwise, we still want to check
// that the cubic size (rows*cols*depth) is big enough to keep
// all the workers busy.
if (thread_count > 1) {
// Empirically determined value.
static const std::uint64_t min_cubic_size_per_thread = 64 * 1024;
// We can only multiply two out of three sizes without risking overflow
const std::uint64_t cubic_size =
std::uint64_t(rows) * std::uint64_t(cols) * std::uint64_t(depth);
thread_count =
std::min(thread_count, int(cubic_size / min_cubic_size_per_thread));
if (thread_count < 1) {
thread_count = 1;
}
}
assert(thread_count > 0 && thread_count <= max_count);
return thread_count;
}
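// Worked example with hypothetical sizes: KernelRows = 4, max_count = 8,
// rows = 100, cols = 100, depth = 100.
//   MinRowsPerThread = max(4, 16) = 16
//   thread_count     = min(8, CeilQuotient(100, 16)) = min(8, 7) = 7
//   cubic_size       = 100 * 100 * 100 = 1,000,000
//   thread_count     = min(7, 1,000,000 / 65,536) = min(7, 15) = 7
// So this hypothetical Gemm would use 7 threads.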
// The main multi-threaded Gemm function.
// To understand it, first read the code of SingleThreadGemm().
// The parallelization scheme used here is to have this master function
// pack a block of RHS and then start worker threads to pack a block of LHS
// each, and accumulate the corresponding products.
template <typename KernelFormat, typename InputScalar, typename OutputScalar,
typename BitDepthParams, MapOrder LhsOrder, MapOrder RhsOrder,
MapOrder ResultOrder, typename LhsOffset, typename RhsOffset,
typename OutputPipelineType, typename GemmContextType>
void MultiThreadGemm(GemmContextType* context, const KernelBase& kernel,
const MatrixMap<const InputScalar, LhsOrder>& lhs,
const MatrixMap<const InputScalar, RhsOrder>& rhs,
MatrixMap<OutputScalar, ResultOrder>* result,
const LhsOffset& lhs_offset, const RhsOffset& rhs_offset,
const OutputPipelineType& output_pipeline) {
ScopedProfilingLabel label("gemmlowp::MultiThreadGemm");
assert(lhs.cols() == rhs.rows());
int rows = result->rows();
int cols = result->cols();
int depth = lhs.cols();
// Zero sizes should have been caught earlier and handled with an early return.
assert(rows > 0);
assert(cols > 0);
assert(depth > 0);
// The case of rows<cols should have been caught earlier and transposed.
assert(rows >= cols);
const int thread_count = HowManyThreads<KernelFormat::kRows>(
context->max_num_threads(), rows, cols, depth);
if (thread_count == 1) {
return SingleThreadGemm<KernelFormat, InputScalar, OutputScalar,
BitDepthParams>(context, kernel, lhs, rhs, result,
lhs_offset, rhs_offset,
output_pipeline);
}
assert(thread_count > 1);
// Simple 1:1 mapping of tasks to physical cores, which is very important
// for getting good multithreaded performance, particularly for not-very-large
// GEMMs, and especially on Android.
const int task_count = thread_count;
Allocator* allocator = context->allocator();
auto* workers_pool = context->workers_pool();
BlockParams block_params;
block_params.Init<KernelFormat>(
rows, cols, depth, task_count, context->l1_bytes_to_use(),
context->l2_bytes_to_use(), context->l2_rhs_factor());
PackedSideBlock<typename KernelFormat::Rhs> packed_rhs(Side::Rhs, allocator,
block_params);
allocator->Commit();
// We loop over large blocks of the RHS.
for (int c = 0; c < cols; c += block_params.l2_cols) {
int cs = std::min(block_params.l2_cols, cols - c);
// Pack a large block of the RHS.
PackRhs(&packed_rhs, rhs.block(0, c, depth, cs));
// Give work to each worker.
std::vector<Task*> tasks;
int next_start_row = 0;
for (int n = 0; n < task_count; ++n) {
int start_row = next_start_row;
next_start_row = std::min(
rows, RoundUp<KernelFormat::kRows>(rows * (n + 1) / task_count));
int block_rows = next_start_row - start_row;
auto lhs_block = lhs.block(start_row, 0, block_rows, depth);
typedef GemmWithPackedRhsTask<KernelFormat, InputScalar, OutputScalar,
BitDepthParams, LhsOrder, RhsOrder,
ResultOrder, LhsOffset, RhsOffset,
OutputPipelineType, GemmContextType>
TaskType;
tasks.push_back(
new TaskType(context, kernel, lhs_block, packed_rhs, result,
MatrixBlockBounds(start_row, c, block_rows, cs),
lhs_offset, rhs_offset, block_params, output_pipeline));
}
// Execute the work on the workers (and partially on this thread).
workers_pool->Execute(tasks);
}
allocator->Decommit();
}
} // namespace gemmlowp
#endif // GEMMLOWP_INTERNAL_MULTI_THREAD_GEMM_H_