Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions Framework/CCDBSupport/src/CCDBFetcherHelper.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -257,14 +257,24 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
helper->totalFetchedBytes += size;
helper->totalRequestedBytes += size;
api.appendFlatHeader(v, headers);
// Adopt the new SHM message BEFORE pruning the old cached one. The
// DPL CacheId is the SHM payload pointer (see addToCache and
// ObjectCache::Id::fromRef). If we pruned first the SHM allocator
// could recycle the just-freed address, the new CacheId would
// numerically equal the old one, and the consumer's ObjectCache
// would skip the deserialisation as "same id" — silently dropping
// every CCDB update from that point onwards.
auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) {
allocator.pruneFromCache(oldDPLCacheIt->second);
}
helper->mapURL2DPLCache[path] = cacheId;
responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size);
continue;
}
if (v.size()) { // but should be overridden by fresh object
// somewhere here pruneFromCache should be called
if (v.size()) { // but should be overridden by fresh object
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
Expand All @@ -276,12 +286,15 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr<CCDBFetcherHelper> con
helper->totalFetchedBytes += size;
helper->totalRequestedBytes += size;
api.appendFlatHeader(v, headers);
// Adopt-before-prune; see comment in the etag.empty() branch above.
auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) {
allocator.pruneFromCache(oldDPLCacheIt->second);
}
helper->mapURL2DPLCache[path] = cacheId;
responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr});
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
continue;
} else {
// Only once the etag is actually used, we get the information on how long the object is valid
Expand Down
44 changes: 36 additions & 8 deletions Framework/CCDBSupport/src/CCDBHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -347,13 +347,26 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
api.appendFlatHeader(v, headers);
// IMPORTANT: adopt the fresh SHM message BEFORE pruning the previously
// cached one. The DPL CacheId is the SHM payload pointer (see
// MessageContext::addToCache and ObjectCache::Id::fromRef). If we
// pruned first, the SHM allocator could immediately recycle the just-
// freed address for the new allocation, producing a CacheId numerically
// equal to the previous one. The consumer-side ObjectCache then thinks
// the object hasn't changed, returns the stale deserialised heap
// pointer, and CCDB updates silently stop propagating. Allocating
// first guarantees the old chunk is still alive, forcing the allocator
// to choose a different address for the new one.
auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) {
allocator.pruneFromCache(oldDPLCacheIt->second);
}
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
continue;
}
if (v.size()) { // but should be overridden by fresh object
// somewhere here pruneFromCache should be called
if (v.size()) { // but should be overridden by fresh object
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse;
helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]);
Expand All @@ -364,11 +377,16 @@ auto populateCacheWith(std::shared_ptr<CCDBFetcherHelper> const& helper,
helper->totalFetchedBytes += v.size();
helper->totalRequestedBytes += v.size();
api.appendFlatHeader(v, headers);
// Adopt-before-prune: see comment in the etag.empty() branch above
// about why this ordering matters (SHM-address-as-CacheId collision
// would otherwise make the consumer skip the replacement).
auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB);
if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) {
allocator.pruneFromCache(oldDPLCacheIt->second);
}
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
continue;
} else {
// Only once the etag is actually used, we get the information on how long the object is valid
Expand Down Expand Up @@ -448,11 +466,18 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
helper->totalRequestedBytes += v.size();
newOrbitResetTime = getOrbitResetTime(v);
api.appendFlatHeader(v, headers);
// Adopt-before-prune; see comment in populateCacheWith() about
// why pruning first would let the SHM allocator recycle the same
// address and produce a CacheId that collides with the consumer's
// ObjectCache id (which is also the SHM payload pointer).
auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) {
allocator.pruneFromCache(oldDPLCacheIt->second);
}
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
} else if (v.size()) { // but should be overridden by fresh object
// somewhere here pruneFromCache should be called
} else if (v.size()) { // but should be overridden by fresh object
helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid
helper->mapURL2UUID[path].cacheMiss++;
helper->mapURL2UUID[path].size = v.size();
Expand All @@ -462,11 +487,14 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB()
helper->totalRequestedBytes += v.size();
newOrbitResetTime = getOrbitResetTime(v);
api.appendFlatHeader(v, headers);
// Adopt-before-prune; see analogous comment above.
auto oldDPLCacheIt = helper->mapURL2DPLCache.find(path);
auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone);
if (oldDPLCacheIt != helper->mapURL2DPLCache.end()) {
allocator.pruneFromCache(oldDPLCacheIt->second);
}
helper->mapURL2DPLCache[path] = cacheId;
O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value);
// one could modify the adoptContainer to take optional old cacheID to clean:
// mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]);
}
// cached object is fine
}
Expand Down
9 changes: 9 additions & 0 deletions Framework/Core/include/Framework/DataAllocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,15 @@ class DataAllocator
/// Adopt an already cached message, using an already provided CacheId.
void adoptFromCache(Output const& spec, CacheId id, header::SerializationMethod method = header::gSerializationMethodNone);

/// Prune a previously cached message identified by @a id from the message cache.
/// The cached shallow-clone is dropped; if no other in-flight reference exists
/// the underlying shared-memory region will be released by FairMQ. Calling this
/// with an unknown id is a no-op.
/// This is intended to be used when an entry in a user-managed map of CacheIds
/// is about to be overwritten by a fresh adoptContainer() call (see e.g. the
/// CCDB cache replacement in CCDBHelpers).
/// @param id CacheId of the cached message to drop, e.g. the value returned by
///           an earlier adoptContainer() call made with CacheStrategy::Always.
void pruneFromCache(CacheId id);

/// snapshot object and route to output specified by OutputRef
/// Framework makes a (serialized) copy of object content.
///
Expand Down
62 changes: 33 additions & 29 deletions Framework/Core/include/Framework/InputRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -415,37 +415,41 @@ class InputRecord
auto id = ObjectCache::Id::fromRef(ref);
ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
// If the matcher does not have an entry in the cache, deserialise it
// and cache the deserialised object at the given id.
// and cache the deserialised object alongside its id, keyed by path.
// The (id, obj) pair is stored per path so that a later cross-path
// SHM-address recycle cannot overwrite another path's cached
// pointer (which would turn the next replace-branch `delete` into a
// destructor call on a wrong-typed object — see ObjectCache.h).
auto path = fmt::format("{}", DataSpecUtils::describe(matcher));
LOGP(debug, "{}", path);
auto& cache = mRegistry.get<ObjectCache>();
auto& callbacks = mRegistry.get<CallbackService>();
auto cacheEntry = cache.matcherToId.find(path);
if (cacheEntry == cache.matcherToId.end()) {
cache.matcherToId.insert(std::make_pair(path, id));
auto cacheEntry = cache.matcherToEntry.find(path);
if (cacheEntry == cache.matcherToEntry.end()) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release(), false);
void* obj = (void*)result.get();
callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
cache.idToObject[id] = obj;
cache.matcherToEntry.emplace(path, ObjectCache::Entry{id, obj});
LOGP(info, "Caching in {} ptr to {} ({})", id.value, path, obj);
return result;
}
auto& oldId = cacheEntry->second;
auto& entry = cacheEntry->second;
// The id in the cache is the same, let's simply return it.
if (oldId.value == id.value) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result((ValueT const*)cache.idToObject[id], false);
if (entry.id.value == id.value) {
std::unique_ptr<ValueT const, Deleter<ValueT const>> result((ValueT const*)entry.obj, false);
LOGP(debug, "Returning cached entry {} for {} ({})", id.value, path, (void*)result.get());
return result;
}
// The id in the cache is different. Let's destroy the old cached entry
// and create a new one.
delete reinterpret_cast<ValueT*>(cache.idToObject[oldId]);
// The id in the cache is different. Destroy this path's previously
// cached object (entry.obj is guaranteed to be the object we created
// for this path — no other path can write into entry) and replace it.
delete reinterpret_cast<ValueT*>(entry.obj);
std::unique_ptr<ValueT const, Deleter<ValueT const>> result(DataRefUtils::as<CCDBSerialized<ValueT>>(ref).release(), false);
void* obj = (void*)result.get();
callbacks.call<CallbackService::Id::CCDBDeserialised>((ConcreteDataMatcher&)matcher, (void*)obj);
cache.idToObject[id] = obj;
LOGP(info, "Replacing cached entry {} with {} for {} ({})", oldId.value, id.value, path, obj);
oldId.value = id.value;
LOGP(info, "Replacing cached entry {} with {} for {} ({})", entry.id.value, id.value, path, obj);
entry.id = id;
entry.obj = obj;
return result;
} else {
throw runtime_error("Attempt to extract object from message with unsupported serialization type");
Expand Down Expand Up @@ -497,29 +501,29 @@ class InputRecord
auto id = ObjectCache::Id::fromRef(ref);
ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification};
// If the matcher does not have an entry in the cache, deserialise it
// and cache the deserialised object at the given id.
// and cache it per path. Same per-path-keyed structure as the object
// cache above; see ObjectCache.h for the rationale.
auto path = fmt::format("{}", DataSpecUtils::describe(matcher));
LOGP(debug, "{}", path);
auto& cache = mRegistry.get<ObjectCache>();
auto cacheEntry = cache.matcherToMetadataId.find(path);
if (cacheEntry == cache.matcherToMetadataId.end()) {
cache.matcherToMetadataId.insert(std::make_pair(path, id));
cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
auto cacheEntry = cache.matcherToMetadata.find(path);
if (cacheEntry == cache.matcherToMetadata.end()) {
auto [it, inserted] = cache.matcherToMetadata.emplace(
path, ObjectCache::MetadataEntry{id, DataRefUtils::extractCCDBHeaders(ref)});
LOGP(info, "Caching CCDB metadata {}: {}", id.value, path);
return cache.idToMetadata[id];
return it->second.metadata;
}
auto& oldId = cacheEntry->second;
auto& entry = cacheEntry->second;
// The id in the cache is the same, let's simply return it.
if (oldId.value == id.value) {
if (entry.id.value == id.value) {
LOGP(debug, "Returning cached CCDB metatada {}: {}", id.value, path);
return cache.idToMetadata[id];
return entry.metadata;
}
// The id in the cache is different. Let's destroy the old cached entry
// and create a new one.
LOGP(info, "Replacing cached entry {} with {} for {}", oldId.value, id.value, path);
cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref);
oldId.value = id.value;
return cache.idToMetadata[id];
// The id in the cache is different. Replace this path's metadata.
LOGP(info, "Replacing cached entry {} with {} for {}", entry.id.value, id.value, path);
entry.id = id;
entry.metadata = DataRefUtils::extractCCDBHeaders(ref);
return entry.metadata;
}

template <typename T>
Expand Down
50 changes: 36 additions & 14 deletions Framework/Core/include/Framework/ObjectCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,25 @@
#include "Framework/DataRef.h"
#include <unordered_map>
#include <map>
#include <string>

namespace o2::framework
{

/// A cache for CCDB objects or objects in general
/// which have more than one timeframe of lifetime.
///
/// The cache is keyed *per path* rather than by a global id-derived hash.
/// Earlier versions stored a `matcherToId` (path -> id) map alongside an
/// `idToObject` (id -> deserialised object) map keyed by the SHM payload
/// pointer of the incoming message. Because SHM addresses are recycled by
/// the FairMQ allocator once a chunk is freed, two distinct CCDB paths could
/// transiently share the same id at different points in time. Within a single
/// timeframe an earlier path's deserialisation could then overwrite the
/// `idToObject` slot a later path's `matcherToId` was still pointing at,
/// turning the next `delete reinterpret_cast<T*>(idToObject[oldId])` into a
/// destructor call on the wrong object type. Storing the (id, object) pair
/// per path closes that hole: every path looks up its own slot only.
struct ObjectCache {
struct Id {
int64_t value;
Expand All @@ -39,20 +52,29 @@ struct ObjectCache {
}
};
};
/// A cache for deserialised objects.
/// This keeps a mapping so that we can tell if a given
/// path was already received and its blob stored in
/// .second.
std::unordered_map<std::string, Id> matcherToId;
/// A map from a CacheId (which is the void* ptr of the previous map).
/// to an actual (type erased) pointer to the deserialised object.
std::unordered_map<Id, void*, Id::hash_fn> idToObject;

/// A cache to the deserialised metadata
/// We keep it separate because we want to avoid that looking up
/// the metadata also pollutes the object cache.
std::unordered_map<std::string, Id> matcherToMetadataId;
std::unordered_map<Id, std::map<std::string, std::string>, Id::hash_fn> idToMetadata;

/// Cache slot belonging to exactly one path and holding one deserialised
/// CCDB object. The member order (id, then obj) is part of the contract:
/// users brace-initialise this struct positionally.
struct Entry {
  /// Version marker. It is compared with the id of the incoming message to
  /// decide whether the cached object can be reused or must be replaced.
  Id id{0};
  /// Type-erased pointer to the deserialised, heap-allocated value. Only the
  /// path that stored it is expected to delete it.
  void* obj = nullptr;
};

/// Cache slot belonging to exactly one path and holding the CCDB metadata
/// (a string -> string map) received for it. The member order (id, then
/// metadata) is part of the contract: users brace-initialise this struct
/// positionally.
struct MetadataEntry {
  /// Version marker, compared with the id of the incoming message.
  Id id{0};
  /// Metadata key/value pairs extracted for the path; empty by default.
  std::map<std::string, std::string> metadata{};
};

/// Path -> (id, object). Replaces the former matcherToId / idToObject pair.
std::unordered_map<std::string, Entry> matcherToEntry;

/// Path -> (id, metadata). Replaces the former matcherToMetadataId /
/// idToMetadata pair.
std::unordered_map<std::string, MetadataEntry> matcherToMetadata;
};

} // namespace o2::framework
Expand Down
9 changes: 9 additions & 0 deletions Framework/Core/src/DataAllocator.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,15 @@ void DataAllocator::adoptFromCache(const Output& spec, CacheId id, header::Seria
context.add<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), routeIndex);
}

void DataAllocator::pruneFromCache(CacheId id)
{
  // Thin forwarder: the MessageContext service owns the message cache, so the
  // request is handed over to it together with the raw value of the id.
  // What happens to the backing memory, and how an id that is not in the
  // cache is treated, is decided by MessageContext::pruneFromCache (the
  // declaration documents an unknown id as a no-op).
  mRegistry.get<MessageContext>().pruneFromCache(id.value);
}

void DataAllocator::cookDeadBeef(const Output& spec)
{
auto& proxy = mRegistry.get<FairMQDeviceProxy>();
Expand Down
Loading