using GeoVLog.Core.Models;
using GeoVLog.Core.Parsers;
using PureHDF;
using PureHDF.Selections;
using System.Text;
using System.Threading.Tasks;

namespace GeoVLog.Core.Hdf5;

/// <summary>
/// Handles writing to a single sensor's dataset (e.g., "/Sensors/GPS") in the HDF5 file.
/// Logs raw messages with timestamps, and optionally parses messages for live processing via callback.
/// </summary>
/// <remarks>
/// If parsing is enabled, parsing and callback execution are queued with <see cref="Task.Run(Action)"/>
/// so that they do not run on the logging thread.
/// </remarks>
public sealed class H5SensorWriter : IDisposable
{
    // Serializes appends, writer swaps and parse-flag changes for this sensor.
    private readonly object _gate = new();
    private readonly H5Dataset _ds;
    private volatile bool _enableParse;
    private readonly SensorSchema _schema;

    // NOTE(review): the payload type is assumed to be object (the result of ParserRouter.Parse) — confirm.
    private readonly Action<SensorId, object>? _callback;

    private H5NativeWriter _writer; // not readonly: replaced through UpdateWriter

    // Index of the next element to write.
    private ulong _count;

    /// <summary>
    /// Initializes a writer for a specific sensor's data stream.
    /// </summary>
    /// <param name="file">The HDF5 file used for creating/opening the dataset.</param>
    /// <param name="writer">The <see cref="H5NativeWriter"/> used to write records.</param>
    /// <param name="datasetPath">The dataset path for this sensor's data (e.g., "/Sensors/GPS").</param>
    /// <param name="enableParse">Whether to parse the raw messages for this sensor.</param>
    /// <param name="schema">The schema handed to <c>ParserRouter.Parse</c> for this sensor's data.</param>
    /// <param name="parsedCallback">
    /// Callback invoked with (sensorId, parsedObject) for each record that parses to a non-null result
    /// while parsing is enabled. May be <see langword="null"/>, in which case nothing is parsed.
    /// </param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="file"/>, <paramref name="writer"/> or <paramref name="datasetPath"/> is <see langword="null"/>.
    /// </exception>
    public H5SensorWriter(
        H5File file,
        H5NativeWriter writer,
        string datasetPath,
        bool enableParse,
        SensorSchema schema,
        Action<SensorId, object>? parsedCallback)
    {
        // Fail at construction instead of on the first Write call.
        ArgumentNullException.ThrowIfNull(file);
        ArgumentNullException.ThrowIfNull(writer);
        ArgumentNullException.ThrowIfNull(datasetPath);

        _enableParse = enableParse;
        _schema = schema;
        _callback = parsedCallback;
        _writer = writer;

        // Create or open the variable-length string dataset for this sensor.
        _ds = file.CreateOrOpenVLStringDataset(datasetPath);

        // Appending always starts at index 0.
        // NOTE(review): if CreateOrOpenVLStringDataset can return an existing, non-empty dataset,
        // earlier records would be overwritten from index 0 — confirm against that helper.
        _count = 0UL;
    }

    /// <summary>
    /// Appends a new raw data record for this sensor.
    /// </summary>
    /// <param name="id">The sensor identifier (passed through to the callback).</param>
    /// <param name="utc">The timestamp of the record (expected to be UTC).</param>
    /// <param name="msg">The raw message bytes (decoded as UTF-8).</param>
    /// <remarks>
    /// Stores a new entry in the dataset as a string in the format "{utc:O}|&lt;message&gt;".
    /// The round-trip ("O") format ends in "Z" only when <paramref name="utc"/> has
    /// <see cref="DateTimeKind.Utc"/>.
    /// If parsing is enabled and a callback was supplied, the message is parsed and the callback
    /// invoked on a thread-pool task. The task is not awaited, so exceptions thrown by the parser
    /// or the callback are not propagated to the caller of this method.
    /// Calls are serialized by a per-instance lock.
    /// </remarks>
    public void Write(SensorId id, DateTime utc, ReadOnlySpan<byte> msg)
    {
        // Decode and format before taking the lock so the lock only covers the dataset write.
        string record = $"{utc:O}|{Encoding.UTF8.GetString(msg)}";

        lock (_gate)
        {
            Append(record);

            Action<SensorId, object>? callback = _callback;
            if (!_enableParse || callback is null)
            {
                return;
            }

            // A span cannot be captured by a lambda and the caller may reuse the buffer,
            // so the background task gets its own copy of the bytes.
            byte[] msgCopy = msg.ToArray();
            SensorSchema schema = _schema;

            Task.Run(() =>
            {
                var parsed = ParserRouter.Parse(schema, msgCopy, utc);
                if (parsed is not null)
                {
                    callback(id, parsed);
                }
            });
        }
    }

    /// <summary>
    /// Appends the given string as a new entry in the dataset.
    /// </summary>
    /// <param name="s">The raw record string to append.</param>
    /// <remarks>Must be called while holding <c>_gate</c>.</remarks>
    private void Append(string s)
    {
        ulong index = _count;

        // Select the single-element slot at the current end of the data.
        var fileSelection = new HyperslabSelection(start: index, block: 1);

        // NOTE(review): relies on the dataset being extendable so that writing at `index` is valid — confirm.
        _writer.Write(_ds, s, fileSelection: fileSelection);

        // Advance only after the write returned, so a failed write does not skip an index.
        _count = index + 1;
    }

    /// <summary>
    /// Updates the HDF5 writer used by this sensor writer (e.g., after a flush operation).
    /// </summary>
    /// <param name="newWriter">The new <see cref="H5NativeWriter"/> to use for subsequent writes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="newWriter"/> is <see langword="null"/>.</exception>
    internal void UpdateWriter(H5NativeWriter newWriter)
    {
        ArgumentNullException.ThrowIfNull(newWriter);

        lock (_gate)
        {
            _writer = newWriter;
        }
    }

    /// <summary>
    /// Enable or disable parsing of incoming messages for this sensor.
    /// </summary>
    /// <param name="enable">True to parse messages; false to store only raw data.</param>
    internal void SetParsingEnabled(bool enable)
    {
        lock (_gate)
        {
            _enableParse = enable;
        }
    }

    /// <summary>
    /// Releases this sensor writer's resources.
    /// </summary>
    public void Dispose()
    {
        // Intentionally a no-op: nothing held by this instance is released here.
        // The writer and file supplied by the caller are left untouched.
    }
}