diff options
Diffstat (limited to 'src/google/protobuf/arena.cc')
-rwxr-xr-x | src/google/protobuf/arena.cc | 484 |
1 files changed, 292 insertions, 192 deletions
diff --git a/src/google/protobuf/arena.cc b/src/google/protobuf/arena.cc index 16cf8951..c117c9e5 100755 --- a/src/google/protobuf/arena.cc +++ b/src/google/protobuf/arena.cc @@ -38,277 +38,377 @@ #include <sanitizer/asan_interface.h> #endif // ADDRESS_SANITIZER +#include <google/protobuf/stubs/port.h> + namespace google { +static const size_t kMinCleanupListElements = 8; +static const size_t kMaxCleanupListElements = 64; // 1kB on 64-bit. + namespace protobuf { +namespace internal { -google::protobuf::internal::SequenceNumber Arena::lifecycle_id_generator_; +std::atomic<int64> ArenaImpl::lifecycle_id_generator_; #if defined(GOOGLE_PROTOBUF_NO_THREADLOCAL) -Arena::ThreadCache& Arena::thread_cache() { +ArenaImpl::ThreadCache& ArenaImpl::thread_cache() { static internal::ThreadLocalStorage<ThreadCache>* thread_cache_ = new internal::ThreadLocalStorage<ThreadCache>(); return *thread_cache_->Get(); } #elif defined(PROTOBUF_USE_DLLS) -Arena::ThreadCache& Arena::thread_cache() { +ArenaImpl::ThreadCache& ArenaImpl::thread_cache() { static GOOGLE_THREAD_LOCAL ThreadCache thread_cache_ = { -1, NULL }; return thread_cache_; } #else -GOOGLE_THREAD_LOCAL Arena::ThreadCache Arena::thread_cache_ = { -1, NULL }; +GOOGLE_THREAD_LOCAL ArenaImpl::ThreadCache ArenaImpl::thread_cache_ = {-1, NULL}; #endif -void Arena::Init() { - lifecycle_id_ = lifecycle_id_generator_.GetNext(); - blocks_ = 0; - hint_ = 0; - space_allocated_ = 0; - owns_first_block_ = true; - cleanup_list_ = 0; - - if (options_.initial_block != NULL && options_.initial_block_size > 0) { - GOOGLE_CHECK_GE(options_.initial_block_size, sizeof(Block)) - << ": Initial block size too small for header."; - - // Add first unowned block to list. - Block* first_block = reinterpret_cast<Block*>(options_.initial_block); - first_block->size = options_.initial_block_size; - first_block->pos = kHeaderSize; - first_block->next = NULL; - // Thread which calls Init() owns the first block. This allows the - // single-threaded case to allocate on the first block without taking any - // locks. - first_block->owner = &thread_cache(); - SetThreadCacheBlock(first_block); - AddBlockInternal(first_block); - owns_first_block_ = false; - } +void ArenaImpl::Init() { + lifecycle_id_ = + lifecycle_id_generator_.fetch_add(1, std::memory_order_relaxed); + hint_.store(nullptr, std::memory_order_relaxed); + threads_.store(nullptr, std::memory_order_relaxed); - // Call the initialization hook - if (options_.on_arena_init != NULL) { - hooks_cookie_ = options_.on_arena_init(this); + if (initial_block_) { + // Thread which calls Init() owns the first block. This allows the + // single-threaded case to allocate on the first block without having to + // perform atomic operations. + new (initial_block_) Block(options_.initial_block_size, NULL); + SerialArena* serial = + SerialArena::New(initial_block_, &thread_cache(), this); + serial->set_next(NULL); + threads_.store(serial, std::memory_order_relaxed); + space_allocated_.store(options_.initial_block_size, + std::memory_order_relaxed); + CacheSerialArena(serial); } else { - hooks_cookie_ = NULL; + space_allocated_.store(0, std::memory_order_relaxed); } } -Arena::~Arena() { - uint64 space_allocated = ResetInternal(); - - // Call the destruction hook - if (options_.on_arena_destruction != NULL) { - options_.on_arena_destruction(this, hooks_cookie_, space_allocated); - } -} - -uint64 Arena::Reset() { - // Invalidate any ThreadCaches pointing to any blocks we just destroyed. - lifecycle_id_ = lifecycle_id_generator_.GetNext(); - return ResetInternal(); +ArenaImpl::~ArenaImpl() { + // Have to do this in a first pass, because some of the destructors might + // refer to memory in other blocks. + CleanupList(); + FreeBlocks(); } -uint64 Arena::ResetInternal() { +uint64 ArenaImpl::Reset() { + // Have to do this in a first pass, because some of the destructors might + // refer to memory in other blocks. CleanupList(); uint64 space_allocated = FreeBlocks(); - - // Call the reset hook - if (options_.on_arena_reset != NULL) { - options_.on_arena_reset(this, hooks_cookie_, space_allocated); - } + Init(); return space_allocated; } -Arena::Block* Arena::NewBlock(void* me, Block* my_last_block, size_t n, - size_t start_block_size, size_t max_block_size) { +ArenaImpl::Block* ArenaImpl::NewBlock(Block* last_block, size_t min_bytes) { size_t size; - if (my_last_block != NULL) { + if (last_block) { // Double the current block size, up to a limit. - size = 2 * (my_last_block->size); - if (size > max_block_size) size = max_block_size; + size = std::min(2 * last_block->size(), options_.max_block_size); } else { - size = start_block_size; + size = options_.start_block_size; } - // Verify that n + kHeaderSize won't overflow. - GOOGLE_CHECK_LE(n, std::numeric_limits<size_t>::max() - kHeaderSize); - size = std::max(size, kHeaderSize + n); - - Block* b = reinterpret_cast<Block*>(options_.block_alloc(size)); - b->pos = kHeaderSize + n; - b->size = size; - b->owner = me; -#ifdef ADDRESS_SANITIZER - // Poison the rest of the block for ASAN. It was unpoisoned by the underlying - // malloc but it's not yet usable until we return it as part of an allocation. - ASAN_POISON_MEMORY_REGION( - reinterpret_cast<char*>(b) + b->pos, b->size - b->pos); -#endif // ADDRESS_SANITIZER + // Verify that min_bytes + kBlockHeaderSize won't overflow. + GOOGLE_CHECK_LE(min_bytes, std::numeric_limits<size_t>::max() - kBlockHeaderSize); + size = std::max(size, kBlockHeaderSize + min_bytes); + + void* mem = options_.block_alloc(size); + Block* b = new (mem) Block(size, last_block); + space_allocated_.fetch_add(size, std::memory_order_relaxed); return b; } -void Arena::AddBlock(Block* b) { - MutexLock l(&blocks_lock_); - AddBlockInternal(b); +ArenaImpl::Block::Block(size_t size, Block* next) + : next_(next), pos_(kBlockHeaderSize), size_(size) {} + +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void ArenaImpl::SerialArena::AddCleanupFallback(void* elem, + void (*cleanup)(void*)) { + size_t size = cleanup_ ? cleanup_->size * 2 : kMinCleanupListElements; + size = std::min(size, kMaxCleanupListElements); + size_t bytes = internal::AlignUpTo8(CleanupChunk::SizeOf(size)); + CleanupChunk* list = reinterpret_cast<CleanupChunk*>(AllocateAligned(bytes)); + list->next = cleanup_; + list->size = size; + + cleanup_ = list; + cleanup_ptr_ = &list->nodes[0]; + cleanup_limit_ = &list->nodes[size]; + + AddCleanup(elem, cleanup); +} + +GOOGLE_PROTOBUF_ATTRIBUTE_FUNC_ALIGN(32) +void* ArenaImpl::AllocateAligned(size_t n) { + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + return arena->AllocateAligned(n); + } else { + return AllocateAlignedFallback(n); + } } -void Arena::AddBlockInternal(Block* b) { - b->next = reinterpret_cast<Block*>(google::protobuf::internal::NoBarrier_Load(&blocks_)); - google::protobuf::internal::Release_Store(&blocks_, reinterpret_cast<google::protobuf::internal::AtomicWord>(b)); - if (b->avail() != 0) { - // Direct future allocations to this block. - google::protobuf::internal::Release_Store(&hint_, reinterpret_cast<google::protobuf::internal::AtomicWord>(b)); +void* ArenaImpl::AllocateAlignedAndAddCleanup(size_t n, + void (*cleanup)(void*)) { + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + return arena->AllocateAlignedAndAddCleanup(n, cleanup); + } else { + return AllocateAlignedAndAddCleanupFallback(n, cleanup); } - space_allocated_ += b->size; } -void Arena::AddListNode(void* elem, void (*cleanup)(void*)) { - Node* node = reinterpret_cast<Node*>(AllocateAligned(sizeof(Node))); - node->elem = elem; - node->cleanup = cleanup; - node->next = reinterpret_cast<Node*>( - google::protobuf::internal::NoBarrier_AtomicExchange(&cleanup_list_, - reinterpret_cast<google::protobuf::internal::AtomicWord>(node))); +void ArenaImpl::AddCleanup(void* elem, void (*cleanup)(void*)) { + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + arena->AddCleanup(elem, cleanup); + } else { + return AddCleanupFallback(elem, cleanup); + } } -void* Arena::AllocateAligned(const std::type_info* allocated, size_t n) { - // Align n to next multiple of 8 (from Hacker's Delight, Chapter 3.) - n = (n + 7) & -8; +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void* ArenaImpl::AllocateAlignedFallback(size_t n) { + return GetSerialArena()->AllocateAligned(n); +} - // Monitor allocation if needed. - if (GOOGLE_PREDICT_FALSE(hooks_cookie_ != NULL) && - options_.on_arena_allocation != NULL) { - options_.on_arena_allocation(allocated, n, hooks_cookie_); - } +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void* ArenaImpl::AllocateAlignedAndAddCleanupFallback(size_t n, + void (*cleanup)(void*)) { + return GetSerialArena()->AllocateAlignedAndAddCleanup(n, cleanup); +} +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void ArenaImpl::AddCleanupFallback(void* elem, void (*cleanup)(void*)) { + GetSerialArena()->AddCleanup(elem, cleanup); +} + +inline GOOGLE_PROTOBUF_ATTRIBUTE_ALWAYS_INLINE +bool ArenaImpl::GetSerialArenaFast(ArenaImpl::SerialArena** arena) { // If this thread already owns a block in this arena then try to use that. // This fast path optimizes the case where multiple threads allocate from the // same arena. - if (thread_cache().last_lifecycle_id_seen == lifecycle_id_ && - thread_cache().last_block_used_ != NULL) { - if (thread_cache().last_block_used_->avail() < n) { - return SlowAlloc(n); - } - return AllocFromBlock(thread_cache().last_block_used_, n); + ThreadCache* tc = &thread_cache(); + if (GOOGLE_PREDICT_TRUE(tc->last_lifecycle_id_seen == lifecycle_id_)) { + *arena = tc->last_serial_arena; + return true; } - // Check whether we own the last accessed block on this arena. - // This fast path optimizes the case where a single thread uses multiple - // arenas. - void* me = &thread_cache(); - Block* b = reinterpret_cast<Block*>(google::protobuf::internal::Acquire_Load(&hint_)); - if (!b || b->owner != me || b->avail() < n) { - return SlowAlloc(n); + // Check whether we own the last accessed SerialArena on this arena. This + // fast path optimizes the case where a single thread uses multiple arenas. + SerialArena* serial = hint_.load(std::memory_order_acquire); + if (GOOGLE_PREDICT_TRUE(serial != NULL && serial->owner() == tc)) { + *arena = serial; + return true; + } + + return false; +} + +ArenaImpl::SerialArena* ArenaImpl::GetSerialArena() { + SerialArena* arena; + if (GOOGLE_PREDICT_TRUE(GetSerialArenaFast(&arena))) { + return arena; + } else { + return GetSerialArenaFallback(&thread_cache()); } - return AllocFromBlock(b, n); } -void* Arena::AllocFromBlock(Block* b, size_t n) { - size_t p = b->pos; - b->pos = p + n; +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +void* ArenaImpl::SerialArena::AllocateAlignedFallback(size_t n) { + // Sync back to current's pos. + head_->set_pos(head_->size() - (limit_ - ptr_)); + + head_ = arena_->NewBlock(head_, n); + ptr_ = head_->Pointer(head_->pos()); + limit_ = head_->Pointer(head_->size()); + #ifdef ADDRESS_SANITIZER - ASAN_UNPOISON_MEMORY_REGION(reinterpret_cast<char*>(b) + p, n); + ASAN_POISON_MEMORY_REGION(ptr_, limit_ - ptr_); #endif // ADDRESS_SANITIZER - return reinterpret_cast<char*>(b) + p; -} -void* Arena::SlowAlloc(size_t n) { - void* me = &thread_cache(); - Block* b = FindBlock(me); // Find block owned by me. - // See if allocation fits in my latest block. - if (b != NULL && b->avail() >= n) { - SetThreadCacheBlock(b); - google::protobuf::internal::NoBarrier_Store(&hint_, reinterpret_cast<google::protobuf::internal::AtomicWord>(b)); - return AllocFromBlock(b, n); - } - b = NewBlock(me, b, n, options_.start_block_size, options_.max_block_size); - AddBlock(b); - SetThreadCacheBlock(b); - return reinterpret_cast<char*>(b) + kHeaderSize; + return AllocateAligned(n); } -uint64 Arena::SpaceAllocated() const { - MutexLock l(&blocks_lock_); - return space_allocated_; +uint64 ArenaImpl::SpaceAllocated() const { + return space_allocated_.load(std::memory_order_relaxed); } -uint64 Arena::SpaceUsed() const { +uint64 ArenaImpl::SpaceUsed() const { + SerialArena* serial = threads_.load(std::memory_order_acquire); uint64 space_used = 0; - Block* b = reinterpret_cast<Block*>(google::protobuf::internal::NoBarrier_Load(&blocks_)); - while (b != NULL) { - space_used += (b->pos - kHeaderSize); - b = b->next; + for ( ; serial; serial = serial->next()) { + space_used += serial->SpaceUsed(); } return space_used; } -std::pair<uint64, uint64> Arena::SpaceAllocatedAndUsed() const { - return std::make_pair(SpaceAllocated(), SpaceUsed()); +uint64 ArenaImpl::SerialArena::SpaceUsed() const { + // Get current block's size from ptr_ (since we can't trust head_->pos(). + uint64 space_used = ptr_ - head_->Pointer(kBlockHeaderSize); + // Get subsequent block size from b->pos(). + for (Block* b = head_->next(); b; b = b->next()) { + space_used += (b->pos() - kBlockHeaderSize); + } + // Remove the overhead of the SerialArena itself. + space_used -= kSerialArenaSize; + return space_used; } -uint64 Arena::FreeBlocks() { +uint64 ArenaImpl::FreeBlocks() { uint64 space_allocated = 0; - Block* b = reinterpret_cast<Block*>(google::protobuf::internal::NoBarrier_Load(&blocks_)); - Block* first_block = NULL; - while (b != NULL) { - space_allocated += (b->size); - Block* next = b->next; - if (next != NULL) { -#ifdef ADDRESS_SANITIZER - // This memory was provided by the underlying allocator as unpoisoned, so - // return it in an unpoisoned state. - ASAN_UNPOISON_MEMORY_REGION(reinterpret_cast<char*>(b), b->size); -#endif // ADDRESS_SANITIZER - options_.block_dealloc(b, b->size); - } else { - if (owns_first_block_) { + // By omitting an Acquire barrier we ensure that any user code that doesn't + // properly synchronize Reset() or the destructor will throw a TSAN warning. + SerialArena* serial = threads_.load(std::memory_order_relaxed); + + while (serial) { + // This is inside a block we are freeing, so we need to read it now. + SerialArena* next = serial->next(); + space_allocated += ArenaImpl::SerialArena::Free(serial, initial_block_, + options_.block_dealloc); + // serial is dead now. + serial = next; + } + + return space_allocated; +} + +uint64 ArenaImpl::SerialArena::Free(ArenaImpl::SerialArena* serial, + Block* initial_block, + void (*block_dealloc)(void*, size_t)) { + uint64 space_allocated = 0; + + // We have to be careful in this function, since we will be freeing the Block + // that contains this SerialArena. Be careful about accessing |serial|. + + for (Block* b = serial->head_; b; ) { + // This is inside the block we are freeing, so we need to read it now. + Block* next_block = b->next(); + space_allocated += (b->size()); + #ifdef ADDRESS_SANITIZER - // This memory was provided by the underlying allocator as unpoisoned, - // so return it in an unpoisoned state. - ASAN_UNPOISON_MEMORY_REGION(reinterpret_cast<char*>(b), b->size); + // This memory was provided by the underlying allocator as unpoisoned, so + // return it in an unpoisoned state. + ASAN_UNPOISON_MEMORY_REGION(b->Pointer(0), b->size()); #endif // ADDRESS_SANITIZER - options_.block_dealloc(b, b->size); - } else { - // User passed in the first block, skip free'ing the memory. - first_block = b; - } + + if (b != initial_block) { + block_dealloc(b, b->size()); } - b = next; - } - blocks_ = 0; - hint_ = 0; - space_allocated_ = 0; - if (!owns_first_block_) { - // Make the first block that was passed in through ArenaOptions - // available for reuse. - first_block->pos = kHeaderSize; - // Thread which calls Reset() owns the first block. This allows the - // single-threaded case to allocate on the first block without taking any - // locks. - first_block->owner = &thread_cache(); - SetThreadCacheBlock(first_block); - AddBlockInternal(first_block); + + b = next_block; } + return space_allocated; } -void Arena::CleanupList() { - Node* head = - reinterpret_cast<Node*>(google::protobuf::internal::NoBarrier_Load(&cleanup_list_)); - while (head != NULL) { - head->cleanup(head->elem); - head = head->next; +void ArenaImpl::CleanupList() { + // By omitting an Acquire barrier we ensure that any user code that doesn't + // properly synchronize Reset() or the destructor will throw a TSAN warning. + SerialArena* serial = threads_.load(std::memory_order_relaxed); + + for ( ; serial; serial = serial->next()) { + serial->CleanupList(); } - cleanup_list_ = 0; } -Arena::Block* Arena::FindBlock(void* me) { - // TODO(sanjay): We might want to keep a separate list with one - // entry per thread. - Block* b = reinterpret_cast<Block*>(google::protobuf::internal::Acquire_Load(&blocks_)); - while (b != NULL && b->owner != me) { - b = b->next; +void ArenaImpl::SerialArena::CleanupList() { + if (cleanup_ != NULL) { + CleanupListFallback(); + } +} + +void ArenaImpl::SerialArena::CleanupListFallback() { + // Cleanup newest chunk: ptrs give us length. + size_t n = cleanup_ptr_ - &cleanup_->nodes[0]; + CleanupNode* node = cleanup_ptr_; + for (size_t i = 0; i < n; i++) { + --node; + node->cleanup(node->elem); + } + + // Cleanup older chunks, which are known to be full. + CleanupChunk* list = cleanup_->next; + while (list) { + size_t n = list->size; + CleanupNode* node = &list->nodes[list->size]; + for (size_t i = 0; i < n; i++) { + --node; + node->cleanup(node->elem); + } + list = list->next; + } +} + +ArenaImpl::SerialArena* ArenaImpl::SerialArena::New(Block* b, void* owner, + ArenaImpl* arena) { + GOOGLE_DCHECK_EQ(b->pos(), kBlockHeaderSize); // Should be a fresh block + GOOGLE_DCHECK_LE(kBlockHeaderSize + kSerialArenaSize, b->size()); + SerialArena* serial = + reinterpret_cast<SerialArena*>(b->Pointer(kBlockHeaderSize)); + b->set_pos(kBlockHeaderSize + kSerialArenaSize); + serial->arena_ = arena; + serial->owner_ = owner; + serial->head_ = b; + serial->ptr_ = b->Pointer(b->pos()); + serial->limit_ = b->Pointer(b->size()); + serial->cleanup_ = NULL; + serial->cleanup_ptr_ = NULL; + serial->cleanup_limit_ = NULL; + return serial; +} + +GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE +ArenaImpl::SerialArena* ArenaImpl::GetSerialArenaFallback(void* me) { + // Look for this SerialArena in our linked list. + SerialArena* serial = threads_.load(std::memory_order_acquire); + for ( ; serial; serial = serial->next()) { + if (serial->owner() == me) { + break; + } + } + + if (!serial) { + // This thread doesn't have any SerialArena, which also means it doesn't + // have any blocks yet. So we'll allocate its first block now. + Block* b = NewBlock(NULL, kSerialArenaSize); + serial = SerialArena::New(b, me, this); + + SerialArena* head = threads_.load(std::memory_order_relaxed); + do { + serial->set_next(head); + } while (!threads_.compare_exchange_weak( + head, serial, std::memory_order_release, std::memory_order_relaxed)); + } + + CacheSerialArena(serial); + return serial; +} + +} // namespace internal + +void Arena::CallDestructorHooks() { + uint64 space_allocated = impl_.SpaceAllocated(); + // Call the reset hook + if (on_arena_reset_ != NULL) { + on_arena_reset_(this, hooks_cookie_, space_allocated); + } + + // Call the destruction hook + if (on_arena_destruction_ != NULL) { + on_arena_destruction_(this, hooks_cookie_, space_allocated); + } +} + +void Arena::OnArenaAllocation(const std::type_info* allocated_type, + size_t n) const { + if (on_arena_allocation_ != NULL) { + on_arena_allocation_(allocated_type, n, hooks_cookie_); } - return b; } } // namespace protobuf |