NativeCachingAllocator Class — PyTorch Architecture
Architecture documentation for the NativeCachingAllocator class, defined in CUDACachingAllocator.cpp in the PyTorch codebase.
Entity Profile
Source Code
c10/cuda/CUDACachingAllocator.cpp lines 3892–4578
// Process-wide CUDA caching allocator. Owns one DeviceCachingAllocator per
// device and maintains a sharded pointer -> Block table so that free() /
// recordStream() can map a raw device pointer back to its Block and device.
// Also hosts the CUDA IPC handle cache and the snapshot/annotation machinery.
class NativeCachingAllocator : public CUDAAllocator {
 private:
  // allows this allocator to be turned on and off programmatically
  bool enable_ = true;

  // Shard allocation region to have independent mutexes to reduce contention.
  static constexpr size_t kNumMutexShard = 67;

  // Each shard's mutex is padded to its own cache line to avoid false
  // sharing between shards that are locked from different threads.
  struct alignas(hardware_destructive_interference_size) AlignedMutex {
    std::mutex m;
  };

  // One mutex per shard; mutex[i] guards allocated_blocks[i].
  std::array<AlignedMutex, kNumMutexShard> mutex;

  // allocated blocks by device pointer (one hash map per shard)
  std::array<ska::flat_hash_map<void*, Block*>, kNumMutexShard>
      allocated_blocks;

  // Maps a device pointer to its shard. The pointer bits are hash-mixed so
  // that adjacent allocations spread evenly across shards.
  static size_t get_mutex_shard_id(void* ptr) {
    return twang_mix64(reinterpret_cast<uintptr_t>(ptr)) % kNumMutexShard;
  }

  // Registers a newly allocated block in the sharded pointer table so that
  // later free()/get_allocated_block() calls can find it.
  void add_allocated_block(Block* block) {
    // NOLINTNEXTLINE(clang-analyzer-core.CallAndMessage)
    const auto mutex_shard_id = get_mutex_shard_id(block->ptr);
    std::lock_guard<std::mutex> lock(mutex[mutex_shard_id].m);
    allocated_blocks[mutex_shard_id][block->ptr] = block;
  }

  // Variables by memory snapshot (annotation history attached to snapshots).
  c10::ApproximateClockToUnixTimeConverter clock_converter;
  bool record_history = false;
  RingBuffer<AnnotationEntry> annotation_buffer;

 public:
  // Per-device allocators, indexed by device ordinal; populated by init().
  std::vector<std::unique_ptr<DeviceCachingAllocator>> device_allocator;

  // Looks up the Block that owns `ptr`. If `remove` is true the entry is
  // also erased from the table (used on free / checkpoint-free paths).
  // Returns nullptr when the pointer was not allocated by this allocator.
  Block* get_allocated_block(void* ptr, bool remove = false) {
    const auto mutex_shard_id = get_mutex_shard_id(ptr);
    std::lock_guard<std::mutex> lock(mutex[mutex_shard_id].m);
    auto it = allocated_blocks[mutex_shard_id].find(ptr);
    if (it == allocated_blocks[mutex_shard_id].end()) {
      return nullptr;
    }
    Block* block = it->second;
    if (remove) {
      allocated_blocks[mutex_shard_id].erase(it);
    }
    return block;
  }

  // Grows device_allocator to cover `device_count` devices. Existing
  // per-device allocators are kept; only the new slots are constructed.
  void init(int device_count) override {
    const auto size = static_cast<int64_t>(device_allocator.size());
    if (size < device_count) {
      device_allocator.resize(device_count);
      for (const auto i : c10::irange(size, device_count)) {
        device_allocator[i] = std::make_unique<DeviceCachingAllocator>(
            static_cast<c10::DeviceIndex>(i));
      }
    }
  }

  // True once init() has created at least one per-device allocator.
  bool initialized() override {
    return !device_allocator.empty();
  }

  /** allocates a block which is safe to use from the provided stream */
  void malloc(
      void** devPtr,
      c10::DeviceIndex device,
      size_t size,
      cudaStream_t stream) {
    TORCH_INTERNAL_ASSERT(
        0 <= device && static_cast<size_t>(device) < device_allocator.size(),
        "Allocator not initialized for device ",
        device,
        ": did you call init?");
    Block* block = device_allocator[device]->malloc(size, stream);
    // Record the block so free()/recordStream() can resolve the pointer.
    add_allocated_block(block);
    *devPtr = block->ptr;
    // Notify the Python GPU-trace hook (if one is installed) of the
    // allocation.
    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
    if (C10_UNLIKELY(interp)) {
      (*interp)->trace_gpu_memory_allocation(
          c10::kCUDA, reinterpret_cast<uintptr_t>(*devPtr));
    }
  }

  // Returns `ptr` to its device's caching allocator. The block is removed
  // from the pointer table first so a double free is reported as an invalid
  // pointer rather than freeing twice.
  void free(void* ptr) {
    if (!ptr) {
      return;
    }
    Block* block = get_allocated_block(ptr, true /* remove */);
    if (!block) {
      TORCH_CHECK(false, "invalid device pointer: ", ptr);
    }
    // Notify the Python GPU-trace hook (if one is installed) before the
    // block is handed back to the device allocator.
    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
    if (C10_UNLIKELY(interp)) {
      (*interp)->trace_gpu_memory_deallocation(
          c10::kCUDA, reinterpret_cast<uintptr_t>(block->ptr));
    }
    device_allocator[block->device]->free(block);
  }

  // Delegates to the per-device allocator for `device`.
  double getMemoryFraction(c10::DeviceIndex device) override {
    TORCH_INTERNAL_ASSERT(
        0 <= device && static_cast<size_t>(device) < device_allocator.size(),
        "Allocator not initialized for device ",
        device,
        ": did you call init?");
    return device_allocator[device]->getMemoryFraction();
  }

  // Delegates to the per-device allocator. Uses TORCH_CHECK (not an internal
  // assert) because an out-of-range device here is a user-facing error.
  void setMemoryFraction(double fraction, c10::DeviceIndex device) override {
    TORCH_CHECK(
        0 <= device && static_cast<size_t>(device) < device_allocator.size(),
        "Allocator not initialized for device ",
        device,
        ": did you call init?");
    device_allocator[device]->setMemoryFraction(fraction);
  }

  // Delegates to the per-device allocator for `device`.
  std::vector<StreamSegmentSize> getExpandableSegmentSizes(
      c10::DeviceIndex device) override {
    TORCH_INTERNAL_ASSERT(
        0 <= device && static_cast<size_t>(device) < device_allocator.size(),
        "Allocator not initialized for device ",
        device,
        ": did you call init?");
    return device_allocator[device]->getExpandableSegmentSizes();
  }

  // Enables/disables allocation-history recording globally: resizes (and,
  // when disabling or clearing, empties) the shared annotation buffer, then
  // forwards the settings to every per-device allocator.
  void recordHistory(
      bool enabled,
      CreateContextFn context_recorder,
      size_t alloc_buffer_max_entries,
      RecordContext when,
      bool clearHistory,
      const std::vector<std::string>& skip_actions) override {
    record_history = enabled;
    annotation_buffer.setMaxEntries(alloc_buffer_max_entries);
    if (!enabled || clearHistory) {
      annotation_buffer.clear();
    }
    for (auto& allocator : device_allocator) {
      allocator->recordHistory(
          enabled,
          context_recorder,
          alloc_buffer_max_entries,
          when,
          clearHistory,
          skip_actions);
    }
  }

  // Appends a user annotation (key/value metadata) stamped with the current
  // device and an approximate timestamp; no-op unless history is enabled.
  void recordAnnotation(
      const std::vector<std::pair<std::string, std::string>>& md) override {
    if (!record_history) {
      return;
    }
    c10::DeviceIndex device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
    auto ae = AnnotationEntry(
        /*device=*/device,
        /*time=*/getApproximateTime());
    for (const auto& md_pair : md) {
      ae.recordUserMetadata(md_pair.first, md_pair.second);
    }
    annotation_buffer.insertEntries(ae);
  }

  // Pushes a compile-context marker on the current device's allocator;
  // no-op unless history is enabled.
  void pushCompileContext(std::string& md) override {
    if (!record_history) {
      return;
    }
    c10::DeviceIndex device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
    device_allocator[device]->pushCompileContext(md);
  }

  // Pops the most recent compile-context marker on the current device's
  // allocator; no-op unless history is enabled.
  void popCompileContext() override {
    if (!record_history) {
      return;
    }
    c10::DeviceIndex device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
    device_allocator[device]->popCompileContext();
  }

  // Sets user metadata on the current device's allocator.
  void setUserMetadata(const std::string& metadata) override {
    c10::DeviceIndex device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
    device_allocator[device]->setUserMetadata(metadata);
  }

  // Reads user metadata from the current device's allocator.
  std::string getUserMetadata() override {
    c10::DeviceIndex device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
    return device_allocator[device]->getUserMetadata();
  }

  // Queries the current device's allocator for its history-recording state.
  bool isHistoryEnabled() override {
    c10::DeviceIndex device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
    return device_allocator[device]->isHistoryEnabled();
  }

  // Verifies that the live allocations of a private pool match
  // `expected_live_allocations`; delegates to the device allocator.
  bool checkPoolLiveAllocations(
      c10::DeviceIndex device,
      MempoolId_t mempool_id,
      const std::unordered_set<void*>& expected_live_allocations) override {
    return device_allocator[device]->checkPoolLiveAllocations(
        mempool_id, expected_live_allocations);
  }

  // Installs the OOM observer on every per-device allocator.
  void attachOutOfMemoryObserver(OutOfMemoryObserver observer) override {
    for (auto& allocator : device_allocator) {
      allocator->attachOutOfMemoryObserver(observer);
    }
  }

  // Installs the trace tracker on every per-device allocator.
  void attachAllocatorTraceTracker(AllocatorTraceTracker tracker) override {
    for (auto& allocator : device_allocator) {
      allocator->attachAllocatorTraceTracker(tracker);
    }
  }

  // Releases cached, unused memory on every device (for the given pool).
  void emptyCache(MempoolId_t mempool_id) override {
    for (auto& da : device_allocator)
      da->emptyCache(mempool_id);
  }

  // Programmatic on/off switch; when disabled, allocate()/raw_alloc() fall
  // back to the uncached path (see isEnabled() call sites below).
  void enable(bool value) override {
    enable_ = value;
  }

  bool isEnabled() const override {
    return enable_;
  }

  // Resolves `ptr` to its owning block and returns the base allocation
  // (optionally reporting its size through `outSize`).
  void* getBaseAllocation(void* ptr, size_t* outSize) override {
    Block* block = get_allocated_block(ptr);
    if (!block) {
      TORCH_CHECK(false, "invalid device pointer: ", ptr);
    }
    return device_allocator[block->device]->getBaseAllocation(block, outSize);
  }

  // Produces a shareable IPC handle for the allocation that owns `ptr`.
  ShareableHandle shareIpcHandle(void* ptr) override {
    Block* block = get_allocated_block(ptr);
    if (!block) {
      TORCH_CHECK(false, "invalid device pointer: ", ptr);
    }
    return device_allocator[block->device]->shareIpcHandle(block);
  }

  // Marks `ptr`'s block as in use by `stream` so it is not reused until the
  // stream's pending work completes.
  void recordStream(const DataPtr& ptr, cuda::CUDAStream stream) override {
    // Empty tensor's storage().data() might be a null ptr. As there is no
    // blocks associated with those tensors, it is fine to do nothing here.
    if (!ptr.get()) {
      return;
    }

    // If a tensor is not allocated by this instance, simply skip
    // This usually happens when CUDA tensors are shared across processes,
    // we have implemented reference counting based sharing mechanism to
    // guarantee tensors won't be accidentally freed by one process while
    // they are still being used in another
    if (ptr.get_deleter() != &local_raw_delete)
      return;

    Block* block = get_allocated_block(ptr.get());
    // block must not be null reaching here
    TORCH_INTERNAL_ASSERT(block != nullptr, "No allocated block can be found");
    device_allocator[block->device]->recordStream(block, stream);
  }

  // Builds a memory snapshot: segments from every device, plus (when
  // `include_traces` is set) per-device trace entries and the annotation
  // buffer with timestamps converted from TSC ticks to microseconds.
  SnapshotInfo snapshot(MempoolId_t mempool_id, bool include_traces) override {
    SnapshotInfo result;

    if (include_traces) {
      // Set-up converter to convert timestamps from tsc to microseconds.
      auto tsc_to_ns = clock_converter.makeConverter();
      auto tsc_to_us = [=](approx_time_t t_approx) {
        return tsc_to_ns(t_approx) / 1000;
      };

      // Get AnnotationEntry list and convert the timestamps.
      annotation_buffer.getEntries(result.external_annotations);
      for (auto& ae : result.external_annotations) {
        ae.time_.t_ = tsc_to_us(ae.time_.approx_t_);
      }

      // Get the device_traces' TraceEntry lists.
      for (auto& da : device_allocator) {
        result.device_traces.emplace_back(da->trace(tsc_to_us));
        auto snap = da->snapshot(mempool_id);
        result.segments.insert(result.segments.end(), snap.begin(), snap.end());
      }
    } else {
      // Fast path: skip traces and annotations entirely
      for (auto& da : device_allocator) {
        auto snap = da->snapshot(mempool_id);
        result.segments.insert(result.segments.end(), snap.begin(), snap.end());
      }
    }

    // Record the allocator configuration active at snapshot time.
    auto& md = result.config_metadata;
    md.garbage_collection_threshold =
        AcceleratorAllocatorConfig::garbage_collection_threshold();
    md.max_split_size = AcceleratorAllocatorConfig::max_split_size();
    md.pinned_num_register_threads =
        CUDAAllocatorConfig::pinned_num_register_threads();
    md.expandable_segments = CUDAAllocatorConfig::expandable_segments();
    md.release_lock_on_malloc =
        CUDAAllocatorConfig::release_lock_on_cudamalloc();
    md.pinned_use_host_register =
        CUDAAllocatorConfig::pinned_use_cuda_host_register();
    md.last_allocator_settings =
        AcceleratorAllocatorConfig::last_allocator_settings();
    md.graph_capture_record_stream_reuse =
        CUDAAllocatorConfig::graph_capture_record_stream_reuse();
    md.roundup_power2_divisions =
        AcceleratorAllocatorConfig::roundup_power2_divisions();

    return result;
  }

  // Captures the current state of a private pool on `device`.
  std::shared_ptr<AllocatorState> getCheckpointState(
      c10::DeviceIndex device,
      MempoolId_t id) override {
    return device_allocator[device]->getCheckpointState(id);
  }

  /**
   * @brief Checkpoint the private pool state identified in `as` to its prior
   * state
   *
   * @param device - device of the pool to manipulate
   * @param as - allocator state
   * @param stale_live_storages - storages of tensors which are currently
   * allocated but which will be not be allocated after the checkpoint is set.
   * For these storages we will remove their deleter function.
   * @return CheckpointDelta - Freed Pointers and DataPtrs that contain deleter
   * functions for all allocated blocks in the new checkpoint state.
   */
  CheckpointDelta setCheckpointPoolState(
      c10::DeviceIndex device,
      std::shared_ptr<AllocatorState> as) override {
    std::shared_ptr<PrivatePoolState> pps =
        std::dynamic_pointer_cast<PrivatePoolState>(as);

    TORCH_CHECK(pps, "Expected PrivatePoolState");

    auto rr = device_allocator[device]->setCheckpointPoolState(*pps);

    CheckpointDelta cpd;
    // Drop freed pointers from the sharded table and report them to callers.
    for (void* ptr : rr.allocations_freed) {
      get_allocated_block(ptr, /*remove*/ true);
      cpd.ptrs_freed.push_back(ptr);
    }
    // Register blocks recreated by the checkpoint and wrap each in a DataPtr
    // carrying this allocator's deleter.
    for (Block* block : rr.allocations_created) {
      add_allocated_block(block);
      cpd.dataptrs_allocd.emplace_back(
          block->ptr,
          block->ptr,
          &local_raw_delete,
          Device(DeviceType::CUDA, device));
    }

    return cpd;
  }

  // c10::Allocator entry point: allocates on the current device and current
  // stream, routing to the uncached path when caching is disabled or forced
  // off via environment.
  DataPtr allocate(size_t size) override {
    // Reject absurd sizes up front with an OOM error rather than letting
    // them reach the device allocator.
    constexpr size_t one_exa_bytes = 1152921504606846976ULL;
    TORCH_CHECK_WITH(
        OutOfMemoryError,
        size < one_exa_bytes,
        "CUDA out of memory. Tried to allocate more than 1EB memory.");
    c10::DeviceIndex device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
    void* devPtr = nullptr;
    void (*deleteFunc)(void*) = &local_raw_delete;
    CUDAStream stream = cuda::getCurrentCUDAStream(device);

    if (forceUncachedAllocator() || !isEnabled()) {
      deleteFunc = &uncached_delete;
      devPtr = uncached_allocate(size);
    } else {
      // Zero-byte requests return a null pointer without touching the cache.
      if (size != 0) {
        this->malloc(&devPtr, device, size, stream);
      }
    }

    // Fire the static tracepoint (SDT) for external profilers, if enabled.
    if (size && TORCH_SDT_IS_ENABLED(malloc)) {
      TORCH_SDT_WITH_SEMAPHORE(malloc, devPtr, device, size, stream.id());
    }

    return {devPtr, devPtr, deleteFunc, Device(DeviceType::CUDA, device)};
  }

  // Deleter matching allocate(): the uncached deleter when caching is
  // off/forced off, otherwise the local caching deleter.
  DeleterFnPtr raw_deleter() const override {
    if (forceUncachedAllocator() || !isEnabled()) {
      return &uncached_delete;
    } else {
      return &local_raw_delete;
    }
  }

  // Reports the largest cached block available on `device`.
  void cacheInfo(c10::DeviceIndex device, size_t* largestBlock) override {
    device_allocator[device]->cacheInfo(largestBlock);
  }

  // User-facing bounds check on a device index (TORCH_CHECK, not an
  // internal assert).
  void assertValidDevice(c10::DeviceIndex device) {
    const auto device_num = device_allocator.size();
    TORCH_CHECK(
        0 <= device && device < static_cast<int64_t>(device_num),
        "Invalid device argument ",
        device,
        ": did you call init?");
  }

  // Stats accessors: all validate the device then delegate.
  DeviceStats getDeviceStats(c10::DeviceIndex device) override {
    assertValidDevice(device);
    return device_allocator[device]->getStats();
  }

  void resetAccumulatedStats(c10::DeviceIndex device) override {
    assertValidDevice(device);
    device_allocator[device]->resetAccumulatedStats();
  }

  void resetPeakStats(c10::DeviceIndex device) override {
    assertValidDevice(device);
    device_allocator[device]->resetPeakStats();
  }

  // Private-pool management: validate the device then delegate.
  void createOrIncrefPool(
      c10::DeviceIndex device,
      MempoolId_t mempool_id,
      std::shared_ptr<CUDAAllocator> allocator) override {
    assertValidDevice(device);
    device_allocator[device]->createOrIncrefPool(
        std::move(mempool_id), std::move(allocator));
  }

  void setUseOnOOM(
      c10::DeviceIndex device,
      MempoolId_t mempool_id,
      bool use_on_oom) override {
    assertValidDevice(device);
    device_allocator[device]->setUseOnOOM(std::move(mempool_id), use_on_oom);
  }

  void setNoSplit(c10::DeviceIndex device, MempoolId_t mempool_id) override {
    assertValidDevice(device);
    device_allocator[device]->setNoSplit(std::move(mempool_id));
  }

  // CUDAGraph interactions
  void beginAllocateToPool(
      c10::DeviceIndex device,
      MempoolId_t mempool_id,
      std::function<bool(cudaStream_t)> filter) override {
    assertValidDevice(device);
    device_allocator[device]->beginAllocateToPool(
        std::move(mempool_id), std::move(filter));
  }

  void endAllocateToPool(c10::DeviceIndex device, MempoolId_t mempool_id)
      override {
    assertValidDevice(device);
    device_allocator[device]->endAllocateToPool(mempool_id);
  }

  void releasePool(c10::DeviceIndex device, MempoolId_t mempool_id) override {
    assertValidDevice(device);
    device_allocator[device]->releasePool(std::move(mempool_id));
  }

  int getPoolUseCount(c10::DeviceIndex device, MempoolId_t mempool_id)
      override {
    assertValidDevice(device);
    return device_allocator[device]->getPoolUseCount(std::move(mempool_id));
  }

  // Raw allocation on the current device and current stream. Returns
  // nullptr for zero-byte requests.
  void* raw_alloc(size_t nbytes) override {
    if (nbytes == 0) {
      return nullptr;
    }
    void* r = nullptr;
    if (forceUncachedAllocator() || !isEnabled()) {
      r = uncached_allocate(nbytes);
    } else {
      c10::DeviceIndex device = 0;
      C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
      malloc(&r, device, nbytes, cuda::getCurrentCUDAStream(device));
    }
    return r;
  }

  // Same as raw_alloc() but allocates for a caller-provided stream.
  void* raw_alloc_with_stream(size_t nbytes, cudaStream_t stream) override {
    if (nbytes == 0) {
      return nullptr;
    }
    void* r = nullptr;
    if (forceUncachedAllocator() || !isEnabled()) {
      r = uncached_allocate(nbytes);
    } else {
      c10::DeviceIndex device = 0;
      C10_CUDA_CHECK(c10::cuda::GetDevice(&device));
      malloc(&r, device, nbytes, stream);
    }
    return r;
  }

  // Enables peer access from `dev` to `dev_to_access`, records the peering
  // on the target device's allocator, and propagates the new peer to any
  // already-mapped IPC expandable segments on that device.
  void enablePeerAccess(c10::DeviceIndex dev, c10::DeviceIndex dev_to_access)
      override {
    c10::cuda::CUDAGuard device_guard(dev);
    cudaError_t err = cudaDeviceEnablePeerAccess(dev_to_access, 0);
    if (err == cudaErrorPeerAccessAlreadyEnabled) {
      // ignore and clear the error if access was already enabled
      (void)cudaGetLastError();
    } else {
      C10_CUDA_CHECK(err);
    }
    device_allocator[dev_to_access]->addPeerAccess(dev);
    std::lock_guard<std::mutex> lock(IpcMutex);
    for (auto& entry : ipcMemHandle_to_devptr) {
      if (entry.second.device_ == dev_to_access &&
          entry.second.expandable_segment_) {
        entry.second.expandable_segment_->addPeer(dev);
      }
    }
  }

  // Device-to-device copy that picks cudaMemcpyAsync when it is known to be
  // safe, and falls back to cudaMemcpyPeerAsync otherwise.
  cudaError_t memcpyAsync(
      void* dst,
      int dstDevice,
      const void* src,
      int srcDevice,
      size_t count,
      cudaStream_t stream,
      bool p2p_enabled) override {
    if (p2p_enabled || // memcpy ok because memory is mapped in both devices
        srcDevice == dstDevice || // memcpy ok on a single device
        // memcpy ok because both dst and src must have come from cudaMalloc
        (!device_allocator[dstDevice]->hasAllocatedExpandableSegments() &&
         !device_allocator[srcDevice]->hasAllocatedExpandableSegments())) {
      return cudaMemcpyAsync(dst, src, count, cudaMemcpyDeviceToDevice, stream);
    }
    // when p2p is not enabled, only cudaMemcpyPeerAsync correctly handles
    // memory not allocated via cudaMalloc
    return cudaMemcpyPeerAsync(dst, dstDevice, src, srcDevice, count, stream);
  }

  // Raw deletion matching raw_alloc(): routes to the uncached deleter when
  // caching is disabled or forced off.
  void raw_delete(void* ptr) override {
    if (forceUncachedAllocator() || !isEnabled()) {
      uncached_delete(ptr);
    } else {
      this->free(ptr);
    }
  }

  // In CUDA IPC, sender sends a tensor to receiver via shareIPCHandle,
  // getIpcDevPtr is called by the receiving process to map the CUDA memory from
  // the sending process into its own address space.

  // When allocated with cudaMalloc we use the cudaIPCMemHandle_t APIs.
  // These APIs only allow sharing a big memory block associated with a
  // cudaIpcMemHandle_t and it can be opened only **once** per context per
  // process. There can be multiple types of storage in the same IPC mem block,
  // so we must cache the device ptr to construct typed storage as it comes.

  // When using cuMemCreate, via expandable segments, we use
  // cuMemExportToShareableHandle to create a file descriptor that can be sent
  // to the other process to reconstruct the allocation. Then we recreate part
  // of the expandable segment necessary to load the allocation.

  // ipcMemHandle_to_devptr caches the mapping from shareable handle to
  // this process' memory mapping information for that share to ensure we do not
  // create it twice. When the shared_ptr is no longer in use we clean up the
  // cache.

  // Guards ipcMemHandle_to_devptr (and is also taken in enablePeerAccess).
  std::mutex IpcMutex;

  // One cached mapping of a received shareable handle: either an opened
  // cudaIpcMemHandle_t pointer or a reconstructed expandable segment.
  struct MemHandleCacheEntry {
    // Parses `handle` and maps the remote memory. Handles both the legacy
    // raw cudaIpcMemHandle_t format (detected by its exact size) and the
    // newer versioned format that prefixes a version and type byte.
    MemHandleCacheEntry(
        c10::DeviceIndex device,
        std::string& handle,
        const DeviceCachingAllocator& allocator)
        : device_(device) {
      int type = SHAREABLE_CUDA_MALLOC;
      std::istringstream ss(handle);
      if (handle.size() != CUDA_IPC_HANDLE_SIZE) {
        auto version = ss.get();
        TORCH_CHECK(
            version <= SHAREABLE_HANDLE_VERSION,
            "received sharable handle from a future version of torch that this version does not know how to handle")
        type = ss.get();
      } // otherwise this is coming from an old pytorch where it has to be a raw
        // SHARABLE_CUDA_MALLOC
      if (type == SHAREABLE_CUDA_MALLOC) {
        cudaIpcMemHandle_t cuda_handle;
        ss.read(reinterpret_cast<char*>(&cuda_handle), CUDA_IPC_HANDLE_SIZE);
        C10_CUDA_CHECK(cudaIpcOpenMemHandle(
            &cuda_ipc_ptr_, cuda_handle, cudaIpcMemLazyEnablePeerAccess));
      } else if (type == SHAREABLE_CUDA_EXPANDABLE_SEGMENT) {
        // Raw owning pointer; released explicitly in clear().
        expandable_segment_ =
            ExpandableSegment::fromShared(device, allocator.peers(), ss)
                .release();
      } else {
        TORCH_INTERNAL_ASSERT(
            false, "unexpected or illformed shareable handle type");
      }
    }

    // this struct expects that clear is explicitly called to
    // free resources, because we only want this code running when
    // the shared pointer to this entry is destructed, not during
    // deinitialization when cuda may already have been shutdown.
    // This replicates the previous behavior of this map when it
    // stored raw cuda_ipc_ptr_ handles.
    void clear() {
      if (cuda_ipc_ptr_) {
        cuda::CUDAGuard device_guard(device_);
        C10_CUDA_CHECK(cudaIpcCloseMemHandle(cuda_ipc_ptr_));
        cuda_ipc_ptr_ = nullptr;
      }
      if (expandable_segment_) {
        delete expandable_segment_;
        expandable_segment_ = nullptr;
      }
    }

    // Device pointer of the mapped memory, whichever representation is used.
    void* ptr() {
      if (cuda_ipc_ptr_) {
        return cuda_ipc_ptr_;
      } else {
        return expandable_segment_->ptr();
      }
    }

    c10::DeviceIndex device_;
    ExpandableSegment* expandable_segment_{nullptr};
    void* cuda_ipc_ptr_{nullptr}; // nullptr if expandable_segment_ is not null
    // Weak reference to the shared_ptr handed out by getIpcDevPtr, so a
    // still-live entry can return the same shared pointer.
    std::weak_ptr<void> wp_;
  };

  ska::flat_hash_map<std::string, MemHandleCacheEntry> ipcMemHandle_to_devptr;

  // Maps a shareable handle received over IPC into this process, caching the
  // mapping so each handle is opened at most once. The returned shared_ptr's
  // deleter removes the cache entry and unmaps the memory.
  std::shared_ptr<void> getIpcDevPtr(std::string handle) override {
    std::lock_guard<std::mutex> lock(IpcMutex);

    auto iter = ipcMemHandle_to_devptr.find(handle);
    if (iter != ipcMemHandle_to_devptr.end()) {
      auto devptr = iter->second.wp_.lock();
      // the weak_ptr should always be valid because we delete the entry from
      // the cache when the shared_ptr is destructed, so we should never get
      // here.
      TORCH_INTERNAL_ASSERT(devptr, "entry in cache has missing shared_ptr");
      return devptr;
    }
    c10::DeviceIndex curr_device = 0;
    C10_CUDA_CHECK(c10::cuda::GetDevice(&curr_device));
    auto inserted = ipcMemHandle_to_devptr.insert(
        iter,
        {handle,
         MemHandleCacheEntry(
             curr_device, handle, *device_allocator[curr_device])});
    auto sp = std::shared_ptr<void>(
        inserted->second.ptr(), [handle, this](void* ptr) {
          std::unique_lock<std::mutex> deleter_lock(IpcMutex);

          auto it = ipcMemHandle_to_devptr.find(handle);
          TORCH_INTERNAL_ASSERT(it != ipcMemHandle_to_devptr.end());
          auto entry = std::move(it->second);
          ipcMemHandle_to_devptr.erase(it);

          // ExpandableSegment synchronizes on destruction in unmapHandles, so
          // we need to release the lock first to minimize the performance hit.
          deleter_lock.unlock();
          entry.clear();
        });
    inserted->second.wp_ = sp;
    return sp;
  }

  std::string name() override {
    return "native";
  }

  // Synchronous device-to-device copy used by the generic Allocator API.
  void copy_data(void* dest, const void* src, std::size_t count) const final {
    C10_CUDA_CHECK(
        cudaMemcpy(dest, src, count, cudaMemcpyKind::cudaMemcpyDeviceToDevice));
  }
};
Source
Analyze Your Own Codebase
Get architecture documentation, dependency graphs, and domain analysis for your codebase in minutes.
Try Supermodel Free