Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 89 additions & 15 deletions source/adios2/engine/bp5/BP5Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ void BP5Reader::GetMetadata(char **md, size_t *size)
so we have to read it from file again
*/
std::vector<char> mdbuf(sizes[0]);
if (!m_MDFile)
{
std::string metadataFile(GetBPMetadataFileName(m_Name));
m_MDFile = m_DataFiles->Acquire(metadataFile, m_dataIsRemote);
}
m_MDFile->Read(mdbuf.data(), sizes[0], 0);

size_t mdsize = sizes[0] + sizes[1] + sizes[2] + 3 * sizeof(uint64_t);
Expand Down Expand Up @@ -1440,10 +1445,11 @@ void BP5Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pol
{
size_t newIdxSize = 0;
m_MetadataIndex.Reset(true, false);
if (m_Comm.Rank() == 0)
m_MetadataIndex.m_Buffer.resize(0);
Comment on lines 1447 to +1448
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do a reset of the metadata index without resizing the buffer to 0? If not, I would put the resize inside the reset function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the time we call Reset and want the buffer to survive. Here, the next bit of code needs to see the buffer as size zero, so we can't really move it inside (without adding something like a parameter to say we wanted the buffer to go away or something).

if (m_Comm.Rank() == 0 && m_MDIndexFile)
{
/* Read metadata index table into memory */
const size_t metadataIndexFileSize = m_MDIndexFile->GetSize();
size_t metadataIndexFileSize = m_MDIndexFile->GetSize();
newIdxSize = metadataIndexFileSize - m_MDIndexFileAlreadyReadSize;
if (metadataIndexFileSize > m_MDIndexFileAlreadyReadSize)
{
Expand All @@ -1455,6 +1461,37 @@ void BP5Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pol
{
m_MetadataIndex.m_Buffer.resize(0);
}

/* When the writer is done (activeFlag == false), fstat() may
briefly report a stale (too small) size for md.idx due to
filesystem metadata lag. Poll until the size stabilizes so
we don't miss trailing index entries. Check both the already-known
m_WriterIsActive flag (for subsequent reads) and the raw header
byte (for the first read before ParseMetadataIndex runs). */
if (newIdxSize > 0 && (!m_WriterIsActive ||
(!m_MDIndexFileAlreadyReadSize && newIdxSize >= m_IndexHeaderSize &&
m_MetadataIndex.m_Buffer[m_ActiveFlagPosition] != '\1')))
{
for (int retry = 0; retry < 20; retry++)
{
size_t updatedSize = m_MDIndexFile->GetSize();
if (updatedSize > metadataIndexFileSize)
{
size_t extraSize = updatedSize - metadataIndexFileSize;
m_MetadataIndex.m_Buffer.resize(newIdxSize + extraSize);
m_MDIndexFile->Read(m_MetadataIndex.m_Buffer.data() + newIdxSize, extraSize,
metadataIndexFileSize);
newIdxSize += extraSize;
metadataIndexFileSize = updatedSize;
/* Size grew — check again in case even more appeared */
}
else
{
break; /* Size stabilized, we have everything */
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}

// broadcast metadata index buffer to all ranks from zero
Expand Down Expand Up @@ -1549,20 +1586,26 @@ void BP5Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pol
}

/* Read new meta-meta-data into memory and append to existing one in
* memory */
const size_t metametadataFileSize = m_MetaMetadataFile->GetSize();
if (metametadataFileSize > m_MetaMetaDataFileAlreadyReadSize)
* memory. Guard against null — mmd.0 may have been closed in a
* previous call when the writer was inactive. */
if (m_MetaMetadataFile)
{
const size_t newMMDSize = metametadataFileSize - m_MetaMetaDataFileAlreadyReadSize;
m_JSONProfiler.Start("MetaMetaDataRead");
m_JSONProfiler.AddBytes("metametadataread", newMMDSize);
m_MetaMetadata.Resize(metametadataFileSize, "(re)allocating meta-meta-data buffer, "
"in call to BP5Reader Open");
m_MetaMetadataFile->Read(m_MetaMetadata.m_Buffer.data() +
m_MetaMetaDataFileAlreadyReadSize,
newMMDSize, m_MetaMetaDataFileAlreadyReadSize);
m_MetaMetaDataFileAlreadyReadSize += newMMDSize;
m_JSONProfiler.Stop("MetaMetaDataRead");
const size_t metametadataFileSize = m_MetaMetadataFile->GetSize();
if (metametadataFileSize > m_MetaMetaDataFileAlreadyReadSize)
{
const size_t newMMDSize =
metametadataFileSize - m_MetaMetaDataFileAlreadyReadSize;
m_JSONProfiler.Start("MetaMetaDataRead");
m_JSONProfiler.AddBytes("metametadataread", newMMDSize);
m_MetaMetadata.Resize(metametadataFileSize,
"(re)allocating meta-meta-data buffer, "
"in call to BP5Reader Open");
m_MetaMetadataFile->Read(m_MetaMetadata.m_Buffer.data() +
m_MetaMetaDataFileAlreadyReadSize,
newMMDSize, m_MetaMetaDataFileAlreadyReadSize);
m_MetaMetaDataFileAlreadyReadSize += newMMDSize;
m_JSONProfiler.Stop("MetaMetaDataRead");
}
}
}

Expand Down Expand Up @@ -1597,6 +1640,37 @@ void BP5Reader::UpdateBuffer(const TimePoint &timeoutInstant, const Seconds &pol
}
}
}

/* Close metadata files individually once all their data is in memory.
Only rank 0 has these files open; on other ranks the pointers are
already null, so reset() is a harmless no-op.
After releasing the PoolableFile, we must also Evict the entry from the
FilePool — otherwise the pool keeps the transport (and FD) alive for
potential reuse. */
if (!m_WriterIsActive)
{
/* mmd.0 is always read in full — close it immediately */
if (m_MetaMetadataFile)
{
m_MetaMetadataFile.reset();
m_DataFiles->Evict(GetBPMetaMetadataFileName(m_Name));
}
/* md.idx: close only when all index entries have been parsed
(parsedIdxSize >= newIdxSize). In streaming mode the 16 MB limit
may leave unparsed entries for a later call. */
if (m_MDIndexFile && parsedIdxSize >= newIdxSize)
{
m_MDIndexFile.reset();
m_DataFiles->Evict(GetBPMetadataIndexFileName(m_Name));
}
/* md.0: close once all index entries are parsed — at that point
every metadata section they reference has already been read. */
if (m_MDFile && !m_MDIndexFile)
{
m_MDFile.reset();
m_DataFiles->Evict(GetBPMetadataFileName(m_Name));
}
}
}

size_t BP5Reader::ParseMetadataIndex(format::BufferSTL &bufferSTL, const size_t absoluteStartPos,
Expand Down
33 changes: 33 additions & 0 deletions source/adios2/toolkit/filepool/FilePool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,39 @@ void FilePool::Release(PoolEntry *obj)
obj->m_InUseCount--;
}

/* Drop every pool entry registered under `filename` that nobody is using
   right now (m_InUseCount == 0). Entries still in use are left alone.
   The pool mutex is held for the whole operation. */
void FilePool::Evict(const std::string &filename)
{
    std::lock_guard<std::mutex> lockGuard(PoolMutex);

    /* A file that lives inside a tar container is backed by the pool entry
       of the containing file, which other tar members share as well, so
       nothing may be evicted on its behalf. */
    if (m_TarInfoMap && m_TarInfoMap->size())
    {
        const auto nameInsideTar = adios2sys::SystemTools::GetFilenameName(filename);
        if (m_TarInfoMap->find(nameInsideTar) != m_TarInfoMap->end())
        {
            return;
        }
    }

    auto matches = m_Pool.equal_range(filename);
    auto entry = matches.first;
    while (entry != matches.second)
    {
        if (entry->second->m_InUseCount != 0)
        {
            /* still handed out to someone — keep it */
            ++entry;
            continue;
        }
        entry = m_Pool.erase(entry);
        m_OpenFileCount--;
    }
}

std::vector<std::shared_ptr<adios2::Transport>> FilePool::ListOfTransports()
{
std::vector<std::shared_ptr<adios2::Transport>> Ret;
Expand Down
2 changes: 2 additions & 0 deletions source/adios2/toolkit/filepool/FilePool.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ class FilePool
std::unique_ptr<PoolableFile> Acquire(const std::string &filename,
const bool skipTarInfo = false);
void Release(PoolEntry *obj);
// Remove all unused pool entries for a given filename, closing their transports.
// Entries whose in-use count is non-zero are left in the pool. A filename that
// resolves to a member of a tar container is skipped entirely, because its pool
// entry is the containing file shared with the other members of that tar.
void Evict(const std::string &filename);
void SetParameters(const adios2::Params &params);
std::vector<std::shared_ptr<adios2::Transport>> ListOfTransports();

Expand Down
3 changes: 3 additions & 0 deletions testing/adios2/engine/bp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,9 @@ gtest_add_tests_helper(WriteNull MPI_ALLOW BP Engine.BP. .BP3
bp4_bp5_gtest_add_tests_helper(WriteAppendReadADIOS2 MPI_ALLOW)
bp4_bp5_gtest_add_tests_helper(JoinedArray MPI_ALLOW)
bp4_bp5_gtest_add_tests_helper(OpenWithMetadata MPI_NONE)
if(NOT WIN32)
bp5_gtest_add_tests_helper(MetadataFDClose MPI_NONE)
endif()

# BP4 only for now
# gtest_add_tests_helper(WriteAppendReadADIOS2 MPI_ALLOW BP Engine.BP. .BP4
Expand Down
191 changes: 191 additions & 0 deletions testing/adios2/engine/bp/TestBPMetadataFDClose.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* Test that BP5Reader closes metadata file descriptors (md.idx, md.0, mmd.0)
* after opening a completed file in ReadRandomAccess mode.
*
* Uses /proc/self/fd on Linux and fcntl(F_GETPATH) on macOS to inspect
* which files the process has open.
*/

#include <cstdint>
#include <cstring>
#include <iostream>
#include <string>
#include <vector>

#include <adios2.h>

#include <gtest/gtest.h>

#include "../TestHelpers.h"

#if defined(__linux__)
#include <dirent.h>
#include <unistd.h>
#define HAS_FD_INSPECTION 1
#elif defined(__APPLE__)
#include <fcntl.h>
#include <sys/param.h>
#include <sys/resource.h>
#include <unistd.h>
#define HAS_FD_INSPECTION 1
#else
#define HAS_FD_INSPECTION 0
#endif

std::string engineName; // comes from command line
std::string engineParameters; // comes from command line

#if HAS_FD_INSPECTION
/**
 * Count the open file descriptors of this process whose path contains `substr`.
 *
 * Linux: lists /proc/self/fd and resolves every entry with readlink().
 * macOS: probes descriptor numbers one by one with fcntl(F_GETPATH).
 *
 * @param substr substring looked for in each descriptor's resolved path
 * @return number of matching descriptors, or -1 on Linux when /proc/self/fd
 *         cannot be opened
 */
static int CountOpenFDs(const std::string &substr)
{
    int count = 0;

#if defined(__linux__)
    DIR *dir = opendir("/proc/self/fd");
    if (!dir)
    {
        return -1;
    }
    struct dirent *entry;
    while ((entry = readdir(dir)) != nullptr)
    {
        /* skip "." and ".." */
        if (entry->d_name[0] == '.')
        {
            continue;
        }
        char linkTarget[PATH_MAX];
        const std::string fdPath = std::string("/proc/self/fd/") + entry->d_name;
        /* readlink() does not null-terminate, so keep one byte in reserve */
        const ssize_t len = readlink(fdPath.c_str(), linkTarget, sizeof(linkTarget) - 1);
        if (len > 0)
        {
            linkTarget[len] = '\0';
            if (std::string(linkTarget).find(substr) != std::string::npos)
            {
                ++count;
            }
        }
    }
    closedir(dir);

#elif defined(__APPLE__)
    /* On macOS, iterate FD numbers and use fcntl(F_GETPATH) to get the path.
       Scan up to the soft RLIMIT_NOFILE, clamped to maxScan so that a very
       large or unlimited limit still gives a bounded loop. If getrlimit()
       fails, fall back to 256 descriptors. */
    const rlim_t maxScan = 4096;
    struct rlimit rl;
    rlim_t maxfd = 256;
    if (getrlimit(RLIMIT_NOFILE, &rl) == 0)
    {
        maxfd = (rl.rlim_cur < maxScan) ? rl.rlim_cur : maxScan;
    }
    char pathbuf[MAXPATHLEN];
    for (int fd = 0; fd < static_cast<int>(maxfd); ++fd)
    {
        /* fcntl() fails with -1 for descriptor numbers that are not open */
        if (fcntl(fd, F_GETPATH, pathbuf) != -1)
        {
            if (std::string(pathbuf).find(substr) != std::string::npos)
            {
                ++count;
            }
        }
    }
#endif

    return count;
}
#endif /* HAS_FD_INSPECTION */

/* Write a small, completed BP file, reopen it in ReadRandomAccess mode and
   check that (a) data can still be read and (b) where FD inspection is
   available, no descriptor for md.idx, md.0 or mmd.0 is left open. */
TEST(BPMetadataFDClose, ReadRandomAccessClosesFDs)
{
    const std::string fname("BPMetadataFDCloseTest.bp");
    const size_t Nx = 10;
    const size_t NSteps = 3;

    adios2::ADIOS adios;

    /* Apply the engine name / parameters from the command line, if given */
    const auto applyCommandLineSettings = [](adios2::IO &target) {
        if (!engineName.empty())
        {
            target.SetEngine(engineName);
        }
        if (!engineParameters.empty())
        {
            target.SetParameters(engineParameters);
        }
    };

    /* Phase 1: write NSteps steps of an Nx-element array */
    {
        adios2::IO writeIO = adios.DeclareIO("WriteIO");
        applyCommandLineSettings(writeIO);
        auto dataVar = writeIO.DefineVariable<double>("data", {Nx}, {0}, {Nx});
        adios2::Engine writer = writeIO.Open(fname, adios2::Mode::Write);
        std::vector<double> values(Nx);
        for (size_t step = 0; step < NSteps; ++step)
        {
            const size_t firstValue = step * Nx;
            for (size_t i = 0; i < Nx; ++i)
            {
                values[i] = static_cast<double>(firstValue + i);
            }
            writer.BeginStep();
            writer.Put(dataVar, values.data());
            writer.EndStep();
        }
        writer.Close();
    }

    /* Phase 2: reopen in ReadRandomAccess and inspect the open descriptors */
    {
        adios2::IO readIO = adios.DeclareIO("ReadIO");
        applyCommandLineSettings(readIO);

        adios2::Engine reader = readIO.Open(fname, adios2::Mode::ReadRandomAccess);
        ASSERT_EQ(reader.Steps(), NSteps);

        /* Reading step 0 must still work after Open */
        auto dataVar = readIO.InquireVariable<double>("data");
        ASSERT_TRUE(dataVar);
        std::vector<double> values(Nx);
        dataVar.SetStepSelection({0, 1});
        reader.Get(dataVar, values.data(), adios2::Mode::Sync);
        EXPECT_EQ(values[0], 0.0);
        EXPECT_EQ(values[Nx - 1], static_cast<double>(Nx - 1));

#if HAS_FD_INSPECTION
        /* The file is complete, so none of the three metadata files
           (md.idx, md.0, mmd.0) should still be open at this point. */
        const int openIndexFDs = CountOpenFDs("/md.idx");
        const int openMetadataFDs = CountOpenFDs("/md.0");
        const int openMetaMetadataFDs = CountOpenFDs("/mmd.0");

        EXPECT_EQ(openIndexFDs, 0) << "md.idx should be closed after ReadRandomAccess Open";
        EXPECT_EQ(openMetadataFDs, 0) << "md.0 should be closed after ReadRandomAccess Open";
        EXPECT_EQ(openMetaMetadataFDs, 0) << "mmd.0 should be closed after ReadRandomAccess Open";
#endif

        reader.Close();
    }

    CleanupTestFiles(fname);
}

/* Test driver: GoogleTest first consumes its own flags, then the first
   remaining argument (if any) selects the engine and the second supplies
   the engine parameters. */
int main(int argc, char **argv)
{
    ::testing::InitGoogleTest(&argc, argv);

    if (argc > 1)
    {
        engineName = std::string(argv[1]);
        if (argc > 2)
        {
            engineParameters = std::string(argv[2]);
        }
    }

    return RUN_ALL_TESTS();
}