diff --git a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx index 8d50dac63a67b..6fa5b88222ea3 100644 --- a/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx +++ b/Framework/CCDBSupport/src/CCDBFetcherHelper.cxx @@ -9,6 +9,7 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. #include "CCDBFetcherHelper.h" +#include "CCDBHelpers.h" #include "Framework/DataTakingContext.h" #include "Framework/Signpost.h" #include "Framework/DataSpecUtils.h" @@ -257,14 +258,13 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con helper->totalFetchedBytes += size; helper->totalRequestedBytes += size; api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ", size %zu)", path.data(), headers["ETag"].data(), cacheId.value, size); continue; } - if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called + if (v.size()) { // but should be overridden by fresh object helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); @@ -276,12 +276,10 @@ auto CCDBFetcherHelper::populateCacheWith(std::shared_ptr con helper->totalFetchedBytes += size; helper->totalRequestedBytes += size; api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; responses.emplace_back(Response{.id = cacheId, .size = size, .request = nullptr}); O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); continue; } else { // Only once the etag is actually used, we get the information on how long the object is valid diff --git a/Framework/CCDBSupport/src/CCDBHelpers.cxx b/Framework/CCDBSupport/src/CCDBHelpers.cxx index fd78594e365bf..8ba9216f888a3 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.cxx +++ b/Framework/CCDBSupport/src/CCDBHelpers.cxx @@ -249,6 +249,22 @@ bool isOnlineRun(DataTakingContext const& dtc) return dtc.deploymentMode == DeploymentMode::OnlineAUX || dtc.deploymentMode == DeploymentMode::OnlineDDS || dtc.deploymentMode == DeploymentMode::OnlineECS; } +DataAllocator::CacheId CCDBHelpers::adoptAndReplaceCachedMessage( + DataAllocator& allocator, + std::unordered_map const& cache, + std::string const& path, + Output const& output, + o2::pmr::vector&& v, + o2::header::SerializationMethod method) +{ + auto oldIt = cache.find(path); + auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, method); + if (oldIt != cache.end()) { + allocator.pruneFromCache(oldIt->second); + } + return cacheId; +} + auto populateCacheWith(std::shared_ptr const& helper, int64_t timestamp, TimingInfo& timingInfo, @@ -347,13 +363,12 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->totalFetchedBytes += v.size(); helper->totalRequestedBytes += v.size(); api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); continue; } - if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called + if (v.size()) { // but should be overridden by fresh object helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cachePopulatedAt = timestampToUse; helper->mapURL2UUID[path].cacheValidUntil = headers["Cache-Valid-Until"].empty() ? 0 : std::stoul(headers["Cache-Valid-Until"]); @@ -364,11 +379,9 @@ auto populateCacheWith(std::shared_ptr const& helper, helper->totalFetchedBytes += v.size(); helper->totalRequestedBytes += v.size(); api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodCCDB); + auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodCCDB); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "populateCacheWith", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); continue; } else { // Only once the etag is actually used, we get the information on how long the object is valid @@ -448,11 +461,10 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() helper->totalRequestedBytes += v.size(); newOrbitResetTime = getOrbitResetTime(v); api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); + auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodNone); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - } else if (v.size()) { // but should be overridden by fresh object - // somewhere here pruneFromCache should be called + } else if (v.size()) { // but should be overridden by fresh object helper->mapURL2UUID[path].etag = headers["ETag"]; // update uuid helper->mapURL2UUID[path].cacheMiss++; helper->mapURL2UUID[path].size = v.size(); @@ -462,11 +474,9 @@ AlgorithmSpec CCDBHelpers::fetchFromCCDB() helper->totalRequestedBytes += v.size(); newOrbitResetTime = getOrbitResetTime(v); api.appendFlatHeader(v, headers); - auto cacheId = allocator.adoptContainer(output, std::move(v), DataAllocator::CacheStrategy::Always, header::gSerializationMethodNone); + auto cacheId = CCDBHelpers::adoptAndReplaceCachedMessage(allocator, helper->mapURL2DPLCache, path, output, std::move(v), header::gSerializationMethodNone); helper->mapURL2DPLCache[path] = cacheId; O2_SIGNPOST_EVENT_EMIT(ccdb, sid, "fetchFromCCDB", "Caching %{public}s for %{public}s (DPL id %" PRIu64 ")", path.data(), headers["ETag"].data(), cacheId.value); - // one could modify the adoptContainer to take optional old cacheID to clean: - // mapURL2DPLCache[URL] = ctx.outputs().adoptContainer(output, std::move(outputBuffer), DataAllocator::CacheStrategy::Always, mapURL2DPLCache[URL]); } // cached object is fine } diff --git a/Framework/CCDBSupport/src/CCDBHelpers.h b/Framework/CCDBSupport/src/CCDBHelpers.h index 0b216aedeafd6..ad67e2c64558e 100644 --- a/Framework/CCDBSupport/src/CCDBHelpers.h +++ b/Framework/CCDBSupport/src/CCDBHelpers.h @@ -12,6 +12,10 @@ #define O2_FRAMEWORK_CCDBHELPERS_H_ #include "Framework/AlgorithmSpec.h" +#include "Framework/DataAllocator.h" +#include "Framework/Output.h" +#include "Headers/DataHeader.h" +#include "MemoryResources/MemoryResources.h" #include #include @@ -25,6 +29,24 @@ struct CCDBHelpers { }; static AlgorithmSpec fetchFromCCDB(); static ParserResult parseRemappings(char const*); + + /// Adopt a freshly-fetched CCDB payload as a new SHM message and prune + /// the previously cached one for this path. The new SHM message is + /// adopted BEFORE the old cached one is pruned + /// @a allocator producer-device DPL DataAllocator + /// @a cache read-only view of the producer-local path -> CacheId map; + /// @a path CCDB path + /// @a output DPL Output matcher + /// @a v freshly-fetched CCDB payload; consumed by the call, leaving @a v empty + /// @a method serialization-method tag written into the message header + /// @return the new CacheId; the caller must record it in its map + static DataAllocator::CacheId adoptAndReplaceCachedMessage( + DataAllocator& allocator, + std::unordered_map const& cache, + std::string const& path, + Output const& output, + o2::pmr::vector&& v, + o2::header::SerializationMethod method); }; } // namespace o2::framework diff --git a/Framework/Core/include/Framework/DataAllocator.h b/Framework/Core/include/Framework/DataAllocator.h index ed9a31ca2857c..104e418cbcd19 100644 --- a/Framework/Core/include/Framework/DataAllocator.h +++ b/Framework/Core/include/Framework/DataAllocator.h @@ -525,6 +525,10 @@ class DataAllocator /// Adopt an already cached message, using an already provided CacheId. void adoptFromCache(Output const& spec, CacheId id, header::SerializationMethod method = header::gSerializationMethodNone); + /// Prune a previously cached message identified by @a id from the message cache. + /// Calling this with an unknown id is a no-op. + void pruneFromCache(CacheId id); + /// snapshot object and route to output specified by OutputRef /// Framework makes a (serialized) copy of object content. /// diff --git a/Framework/Core/include/Framework/InputRecord.h b/Framework/Core/include/Framework/InputRecord.h index d2e152c1bcacc..a33745a5bd00a 100644 --- a/Framework/Core/include/Framework/InputRecord.h +++ b/Framework/Core/include/Framework/InputRecord.h @@ -415,37 +415,35 @@ class InputRecord auto id = ObjectCache::Id::fromRef(ref); ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification}; // If the matcher does not have an entry in the cache, deserialise it - // and cache the deserialised object at the given id. + // and cache the deserialised object alongside its id, keyed by path. auto path = fmt::format("{}", DataSpecUtils::describe(matcher)); LOGP(debug, "{}", path); auto& cache = mRegistry.get(); auto& callbacks = mRegistry.get(); - auto cacheEntry = cache.matcherToId.find(path); - if (cacheEntry == cache.matcherToId.end()) { - cache.matcherToId.insert(std::make_pair(path, id)); + auto cacheEntry = cache.matcherToEntry.find(path); + if (cacheEntry == cache.matcherToEntry.end()) { std::unique_ptr> result(DataRefUtils::as>(ref).release(), false); void* obj = (void*)result.get(); callbacks.call((ConcreteDataMatcher&)matcher, (void*)obj); - cache.idToObject[id] = obj; + cache.matcherToEntry.emplace(path, ObjectCache::Entry{id, obj}); LOGP(info, "Caching in {} ptr to {} ({})", id.value, path, obj); return result; } - auto& oldId = cacheEntry->second; + auto& entry = cacheEntry->second; // The id in the cache is the same, let's simply return it. - if (oldId.value == id.value) { - std::unique_ptr> result((ValueT const*)cache.idToObject[id], false); + if (entry.id.value == id.value) { + std::unique_ptr> result((ValueT const*)entry.obj, false); LOGP(debug, "Returning cached entry {} for {} ({})", id.value, path, (void*)result.get()); return result; } - // The id in the cache is different. Let's destroy the old cached entry - // and create a new one. - delete reinterpret_cast(cache.idToObject[oldId]); + // The id in the cache is different. Destroy this path's previously cached object and replace it. + delete reinterpret_cast(entry.obj); std::unique_ptr> result(DataRefUtils::as>(ref).release(), false); void* obj = (void*)result.get(); callbacks.call((ConcreteDataMatcher&)matcher, (void*)obj); - cache.idToObject[id] = obj; - LOGP(info, "Replacing cached entry {} with {} for {} ({})", oldId.value, id.value, path, obj); - oldId.value = id.value; + LOGP(info, "Replacing cached entry {} with {} for {} ({})", entry.id.value, id.value, path, obj); + entry.id = id; + entry.obj = obj; return result; } else { throw runtime_error("Attempt to extract object from message with unsupported serialization type"); @@ -496,30 +494,28 @@ class InputRecord // it's updated. auto id = ObjectCache::Id::fromRef(ref); ConcreteDataMatcher matcher{header->dataOrigin, header->dataDescription, header->subSpecification}; - // If the matcher does not have an entry in the cache, deserialise it - // and cache the deserialised object at the given id. + // If the matcher does not have an entry in the cache, deserialise it and cache it per path. auto path = fmt::format("{}", DataSpecUtils::describe(matcher)); LOGP(debug, "{}", path); auto& cache = mRegistry.get(); - auto cacheEntry = cache.matcherToMetadataId.find(path); - if (cacheEntry == cache.matcherToMetadataId.end()) { - cache.matcherToMetadataId.insert(std::make_pair(path, id)); - cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref); + auto cacheEntry = cache.matcherToMetadata.find(path); + if (cacheEntry == cache.matcherToMetadata.end()) { + auto [it, inserted] = cache.matcherToMetadata.emplace( + path, ObjectCache::MetadataEntry{id, DataRefUtils::extractCCDBHeaders(ref)}); LOGP(info, "Caching CCDB metadata {}: {}", id.value, path); - return cache.idToMetadata[id]; + return it->second.metadata; } - auto& oldId = cacheEntry->second; + auto& entry = cacheEntry->second; // The id in the cache is the same, let's simply return it. - if (oldId.value == id.value) { + if (entry.id.value == id.value) { LOGP(debug, "Returning cached CCDB metatada {}: {}", id.value, path); - return cache.idToMetadata[id]; + return entry.metadata; } - // The id in the cache is different. Let's destroy the old cached entry - // and create a new one. - LOGP(info, "Replacing cached entry {} with {} for {}", oldId.value, id.value, path); - cache.idToMetadata[id] = DataRefUtils::extractCCDBHeaders(ref); - oldId.value = id.value; - return cache.idToMetadata[id]; + // The id in the cache is different. Replace this path's metadata. + LOGP(info, "Replacing cached entry {} with {} for {}", entry.id.value, id.value, path); + entry.id = id; + entry.metadata = DataRefUtils::extractCCDBHeaders(ref); + return entry.metadata; } template diff --git a/Framework/Core/include/Framework/ObjectCache.h b/Framework/Core/include/Framework/ObjectCache.h index a6873aec8a1ac..cf0d8f51a81bc 100644 --- a/Framework/Core/include/Framework/ObjectCache.h +++ b/Framework/Core/include/Framework/ObjectCache.h @@ -14,12 +14,14 @@ #include "Framework/DataRef.h" #include #include +#include namespace o2::framework { /// A cache for CCDB objects or objects in general /// which have more than one timeframe of lifetime. +/// The cache is keyed *per path* rather than by a global id-derived hash. struct ObjectCache { struct Id { int64_t value; @@ -39,20 +41,28 @@ struct ObjectCache { } }; }; - /// A cache for deserialised objects. + + /// Per-path cache entry for a deserialised CCDB object. + struct Entry { + Id id{0}; + void* obj{nullptr}; + }; + + /// Per-path cache entry for the CCDB metadata map. + struct MetadataEntry { + Id id{0}; + std::map metadata; + }; + + /// A per-path cache for deserialised objects. /// This keeps a mapping so that we can tell if a given - /// path was already received and it's blob stored in - /// .second. - std::unordered_map matcherToId; - /// A map from a CacheId (which is the void* ptr of the previous map). - /// to an actual (type erased) pointer to the deserialised object. - std::unordered_map idToObject; - - /// A cache to the deserialised metadata + /// path was already received and it's blob stored in .second.obj + std::unordered_map matcherToEntry; + + /// A per-path cache to the deserialised metadata /// We keep it separate because we want to avoid that looking up /// the metadata also pollutes the object cache. - std::unordered_map matcherToMetadataId; - std::unordered_map, Id::hash_fn> idToMetadata; + std::unordered_map matcherToMetadata; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataAllocator.cxx b/Framework/Core/src/DataAllocator.cxx index d7bfff0dbf19d..0802bb8300ae7 100644 --- a/Framework/Core/src/DataAllocator.cxx +++ b/Framework/Core/src/DataAllocator.cxx @@ -392,6 +392,12 @@ void DataAllocator::adoptFromCache(const Output& spec, CacheId id, header::Seria context.add(std::move(headerMessage), std::move(payloadMessage), routeIndex); } +void DataAllocator::pruneFromCache(CacheId id) +{ + auto& context = mRegistry.get(); + context.pruneFromCache(id.value); +} + void DataAllocator::cookDeadBeef(const Output& spec) { auto& proxy = mRegistry.get();