Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
73ab521
Flush structural info of resetDataset() to backend immediately
franzpoeschel Mar 11, 2026
bba35bf
Erase flushMeshes/ParticlesPath
franzpoeschel Mar 11, 2026
39db932
Move flushing from storeChunk to resetDataset
franzpoeschel Mar 11, 2026
167ce9a
Hmm, move CREATE_DATASET task back to storeChunk
franzpoeschel Mar 12, 2026
fdb1273
Fix attribute flushing logic
franzpoeschel Mar 12, 2026
8a6d68d
flush mode helpers
franzpoeschel Jan 14, 2026
2a9ec85
Fix dirty handling
franzpoeschel Jan 14, 2026
1621e21
Add TODO comment
franzpoeschel Mar 12, 2026
0f318cf
WIP Runtime verification of flush level
franzpoeschel Mar 12, 2026
7bd276d
dont flush to IO handler yet in resetDataset
franzpoeschel Mar 13, 2026
47bc305
Revert "dont flush to IO handler yet in resetDataset"
franzpoeschel Mar 13, 2026
b4e17a1
Continue fixing and breaking things..
franzpoeschel Mar 13, 2026
eba8cc2
Fix API call after rebase
franzpoeschel Mar 16, 2026
c8e485d
Fix dirty handling filebased
franzpoeschel Mar 16, 2026
95c0c6e
TMP REVERT ME: deactivate span table tests
franzpoeschel Mar 16, 2026
864c83e
TMP REVERT ME take out hanging parallel test
franzpoeschel Mar 16, 2026
72d0d3f
Fix ranktable logic
franzpoeschel Mar 27, 2026
cb509ab
Take out the next hanging parallel test
franzpoeschel Mar 27, 2026
3d78c9b
Separate MPI tests by MPI barriers
franzpoeschel Mar 27, 2026
eb49aa6
Fix wrong MPI_COMM_WORLD
franzpoeschel Mar 27, 2026
35a8abd
wip: debugging state
franzpoeschel Mar 27, 2026
7ff867a
deactivate malicious tests
franzpoeschel Mar 30, 2026
d2f3fe4
Revert "deactivate malicious tests"
franzpoeschel Mar 30, 2026
c1997e1
Revert "wip: debugging state"
franzpoeschel Mar 30, 2026
f47d71d
Revert "TMP REVERT ME take out hanging parallel test"
franzpoeschel Mar 30, 2026
545eac1
Revert some WIPs
franzpoeschel Mar 30, 2026
8523236
Use an Attributable per Iteration for rankTable in filebased encoding
franzpoeschel May 19, 2026
054a178
fix nompi builds
franzpoeschel May 20, 2026
c16f5ac
CI fixes
franzpoeschel May 20, 2026
a11efd8
Activate test again
franzpoeschel May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions include/openPMD/IO/AbstractIOHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "openPMD/IterationEncoding.hpp"
#include "openPMD/config.hpp"
#include "openPMD/version.hpp"
#include <ostream>

#if openPMD_HAVE_MPI
#include <mpi.h>
Expand Down Expand Up @@ -81,6 +82,66 @@ enum class FlushLevel
CreateOrOpenFiles
};

std::ostream &operator<<(std::ostream &, FlushLevel);

namespace flush_level
{
inline constexpr auto global_flushpoint(FlushLevel fl)
{
switch (fl)
{
case FlushLevel::UserFlush:
return true;
case FlushLevel::InternalFlush:
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
return false;
}
return false; // unreachable
}
// same as global_flushpoint for now, but we will soon introduce
// immediate_flush
inline constexpr auto write_datasets(FlushLevel fl)
{
switch (fl)
{
case FlushLevel::UserFlush:
return true;
case FlushLevel::InternalFlush:
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
return false;
}
return false; // unreachable
}
inline constexpr auto write_attributes(FlushLevel fl)
{
switch (fl)
{
case FlushLevel::UserFlush:
case FlushLevel::InternalFlush:
return true;
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
return false;
}
return false; // unreachable
}
inline constexpr auto flush_hierarchy(FlushLevel fl)
{
switch (fl)
{
case FlushLevel::UserFlush:
case FlushLevel::InternalFlush:
case FlushLevel::SkeletonOnly:
return true;
case FlushLevel::CreateOrOpenFiles:
return false;
}
return false; // unreachable
}
} // namespace flush_level

enum class OpenpmdStandard
{
v_1_0_0,
Expand Down Expand Up @@ -121,6 +182,7 @@ namespace internal
* To be used for reading
*/
FlushParams const defaultFlushParams{};
FlushParams const publicFlush{FlushLevel::UserFlush};

struct ParsedFlushParams;

Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/AbstractIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class AbstractIOHandlerImpl

virtual ~AbstractIOHandlerImpl() = default;

std::future<void> flush();
std::future<void> flush(FlushLevel);

/**
* Close the file corresponding with the writable and release file handles.
Expand Down
3 changes: 3 additions & 0 deletions include/openPMD/IO/IOTask.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <cstddef>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <utility>
#include <variant>
Expand Down Expand Up @@ -89,6 +90,8 @@ OPENPMDAPI_EXPORT_ENUM_CLASS(Operation){
}; // note: if you change the enum members here, please update
// docs/source/dev/design.rst

std::ostream &operator<<(std::ostream &os, Operation op);

namespace internal
{
/*
Expand Down
2 changes: 1 addition & 1 deletion include/openPMD/IO/JSON/JSONIOHandlerImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ class JSONIOHandlerImpl : public AbstractIOHandlerImpl

void touch(Writable *, Parameter<Operation::TOUCH> const &) override;

std::future<void> flush();
std::future<void> flush(internal::ParsedFlushParams &params);

private:
#if openPMD_HAVE_MPI
Expand Down
17 changes: 10 additions & 7 deletions include/openPMD/Iteration.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "openPMD/backend/Attributable.hpp"
#include "openPMD/backend/Container.hpp"
#include "openPMD/backend/HierarchyVisitor.hpp"
#include "openPMD/backend/PerIterationData.hpp"
#include "openPMD/backend/scientific_defaults/ScientificDefaults.hpp"

#include <cstdint>
Expand Down Expand Up @@ -122,14 +123,16 @@ namespace internal
*/
bool allow_reopening_implicitly = false;

/**
* Whether a step is currently active for this iteration.
* Used for file-based iteration layout, see Series.hpp for
* group-based layout.
* Access via stepStatus() method to automatically select the correct
* one among both flags.
/*
* This stores data items that are:
*
* 1. global in group and variable encodings
* 2. per-iteration in file encoding
*
* The struct is stored as part of the Series and as part of each
* Iteration. Access must be distinguished by iteration encoding.
*/
StepStatus m_stepStatus = StepStatus::NoStep;
PerIterationData m_perIterationData;

/**
* Cached copy of the key under which this Iteration lives in
Expand Down
3 changes: 2 additions & 1 deletion include/openPMD/RecordComponent.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
{
size *= ext;
}

/*
* Flushing the skeleton does not create datasets,
* so we might need to do it now.
Expand Down Expand Up @@ -129,7 +130,7 @@ RecordComponent::storeChunk(Offset o, Extent e, F &&createBuffer)
* actual data yet.
*/
seriesFlush_impl</* flush_entire_series = */ false>(
{FlushLevel::SkeletonOnly});
{FlushLevel::SkeletonOnly}, /*flush_io_handler=*/false);
Parameter<Operation::CREATE_DATASET> dCreate(rc.m_dataset.value());
dCreate.name = Attributable::get().m_writable.ownKeyWithinParent;
IOHandler()->enqueue(IOTask(this, dCreate));
Expand Down
27 changes: 15 additions & 12 deletions include/openPMD/Series.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "openPMD/backend/Container.hpp"
#include "openPMD/backend/HierarchyVisitor.hpp"
#include "openPMD/backend/ParsePreference.hpp"
#include "openPMD/backend/PerIterationData.hpp"
#include "openPMD/config.hpp"
#include "openPMD/snapshots/Snapshots.hpp"
#include "openPMD/version.hpp"
Expand Down Expand Up @@ -205,14 +206,18 @@ namespace internal
* Detected IO format (backend).
*/
Format m_format;
/**
* Whether a step is currently active for this iteration.
* Used for group-based iteration layout, see SeriesData.hpp for
* iteration-based layout.
* Access via stepStatus() method to automatically select the correct
* one among both flags.

/*
* This stores data items that are:
*
* 1. global in group and variable encodings
* 2. per-iteration in file encoding
*
* The struct is stored as part of the Series and as part of each
* Iteration. Access must be distinguished by iteration encoding.
*/
StepStatus m_stepStatus = StepStatus::NoStep;
PerIterationData m_perIterationData;

/**
* True if a user opts into lazy parsing.
*/
Expand Down Expand Up @@ -261,7 +266,6 @@ namespace internal

struct RankTableData
{
Attributable m_attributable;
std::variant<
NoSourceSpecified,
SourceSpecifiedViaJSON,
Expand Down Expand Up @@ -900,9 +904,7 @@ OPENPMD_private
iterations_iterator end,
internal::FlushParams const &flushParams,
bool flushIOHandler = true);
void flushMeshesPath();
void flushParticlesPath();
void flushRankTable();
void flushRankTable(FlushLevel, Attributable &attributable);
/* Parameter `read_only_this_single_iteration` used for reopening an
* Iteration after closing it.
*/
Expand Down Expand Up @@ -985,8 +987,9 @@ OPENPMD_private
* least one step was written.
*
* @param doFlush If true, flush the IO handler.
* @param l This operation must only run at flush level write_datasets
*/
void flushStep(bool doFlush);
void flushStep(bool doFlush, FlushLevel l);

/*
* setIterationEncoding() should only be called by users of our public API,
Expand Down
23 changes: 22 additions & 1 deletion include/openPMD/backend/Attributable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ OPENPMD_protected
/** @} */

template <bool flush_entire_series>
void seriesFlush_impl(internal::FlushParams const &);
void seriesFlush_impl(internal::FlushParams const &, bool flush_io_handler);

void flushAttributes(internal::FlushParams const &);

Expand Down Expand Up @@ -606,6 +606,27 @@ OPENPMD_protected
{
return writable().dirtyRecursive;
}
void determineUnsetDirty(FlushLevel fl)
{
switch (fl)
{
case FlushLevel::UserFlush:
setDirty(false);
break;
case FlushLevel::InternalFlush:
// Used for parsing
if (IOHandler()->m_seriesStatus == internal::SeriesStatus::Parsing)
{
throw error::Internal(
"Parsing procedures should directly unset dirty.");
}
break;
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
// noop
break;
}
}
void setDirty(bool dirty_in)
{
auto &w = writable();
Expand Down
54 changes: 54 additions & 0 deletions include/openPMD/backend/PerIterationData.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include "openPMD/ChunkInfo.hpp"
#include "openPMD/Streaming.hpp"
#include "openPMD/backend/Attributable.hpp"

#include <variant>

namespace openPMD::internal
{
struct NoSourceSpecified
{};
struct SourceSpecifiedViaJSON
{
std::string value;
};
struct SourceSpecifiedManually
{
std::string value;
};

struct RankTableData
{
Attributable m_attributable;
std::variant<
NoSourceSpecified,
SourceSpecifiedViaJSON,
SourceSpecifiedManually>
m_rankTableSource;
std::optional<chunk_assignment::RankMeta> m_bufferedRead;
};

/*
* This stores data items that are:
*
* 1. global in group and variable encodings
* 2. per-iteration in file encoding
*
* The struct is stored as part of the Series and as part of each Iteration.
* Access must be distinguished by iteration encoding.
*/
struct PerIterationData
{
/**
* Whether a step is currently active for this iteration.
* Used for group-based iteration layout, see SeriesData.hpp for
* iteration-based layout.
* Access via stepStatus() method to automatically select the correct
* one among both flags.
*/
StepStatus m_stepStatus = StepStatus::NoStep;
Attributable m_rankTableAttributable;
};
} // namespace openPMD::internal
5 changes: 3 additions & 2 deletions include/openPMD/backend/Writable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,14 +128,15 @@ class Writable final
* it.
*/
template <bool flush_entire_series>
void seriesFlush(std::string backendConfig = "{}");
void
seriesFlush(std::string backendConfig = "{}", bool flush_io_handler = true);

// clang-format off
OPENPMD_private
// clang-format on

template <bool flush_entire_series>
void seriesFlush(internal::FlushParams const &);
void seriesFlush(internal::FlushParams const &, bool flush_io_handler);
/*
* These members need to be shared pointers since distinct instances of
* Writable may share them.
Expand Down
18 changes: 4 additions & 14 deletions src/IO/ADIOS/ADIOS2File.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1049,25 +1049,16 @@ void ADIOS2File::flush_impl(
drainedUniquePtrPuts.swap(m_uniquePtrPuts);
}

if (readOnly(m_mode))
if (readOnly(m_mode) || flush_level::write_datasets(level))
{
level = FlushLevel::UserFlush;
}

switch (level)
{
case FlushLevel::UserFlush:
performPutGets(*this, eng);
m_updateSpans.clear();
m_buffer.clear();
m_alreadyEnqueued.clear();
drainedUniquePtrPuts.clear();

break;

case FlushLevel::InternalFlush:
case FlushLevel::SkeletonOnly:
case FlushLevel::CreateOrOpenFiles:
}
else
{
/*
* Tasks have been given to ADIOS2, but we don't flush them
* yet. So, move everything to m_alreadyEnqueued to avoid
Expand All @@ -1084,7 +1075,6 @@ void ADIOS2File::flush_impl(
"wrong time.");
}
m_buffer.clear();
break;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/IO/ADIOS/ADIOS2IOHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ overrideFlushTarget(FlushTarget &inplace, FlushTarget new_val)
std::future<void>
ADIOS2IOHandlerImpl::flush(internal::ParsedFlushParams &flushParams)
{
auto res = AbstractIOHandlerImpl::flush();
auto res = AbstractIOHandlerImpl::flush(flushParams.flushLevel);

detail::ADIOS2File::ADIOS2FlushParams adios2FlushParams{
flushParams.flushLevel, m_flushTarget};
Expand Down
Loading
Loading