using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Security.Cryptography;
using System.Threading;
using GeoVLog.Core.Crypto;
using GeoVLog.Core.Manifests;
using GeoVLog.Core.Models;
namespace GeoVLog.Core.Hdf5;
/// <summary>
/// Consumes <see cref="H5LogRecord"/> instances produced by GeoVLogSvc, writes them
/// into an HDF5 file, and performs auto-flush, hourly rotation, AES-GCM encryption
/// and manifest bookkeeping on a dedicated background thread.
/// </summary>
/// <remarks>
/// <para><b>2025-05-28 robustness improvements</b></para>
/// <list type="bullet">
/// <item>Single-pass hash + encryption to shorten file-lock time.</item>
/// <item>Retry-aware helpers (<see cref="ReadAllBytesWithRetry"/>, <see cref="DeleteFileWithRetry"/>)
/// to tolerate transient Windows file locks (AV/indexer).</item>
/// <item>All read handles opened with <c>FileShare.ReadWrite | FileShare.Delete</c>.</item>
/// </list>
/// <para>
/// These additions are 100 % backward-compatible with production behaviour and
/// do not alter public semantics or manifest format.
/// </para>
/// <para><b>Test-support additions (2025-05-28)</b></para>
/// <list type="bullet">
/// <item>Passing <c>startThread == false</c> disables the background thread
/// for deterministic unit testing (constructor work only).</item>
/// <item>Any unhandled exception inside the worker thread is captured and
/// re-thrown from <see cref="Dispose"/> so tests fail loudly instead
/// of crashing the process.</item>
/// </list>
/// <para>
/// Hour files are named <c>yyyyMMdd_HH.h5</c> (invariant culture); <see cref="GetUniquePath"/>
/// appends a <c>(n)</c> suffix whenever that name or its <c>.enc</c> counterpart already
/// exists, so an existing encrypted file is never overwritten.
/// </para>
/// </remarks>
internal sealed class H5LogQueueWorker : IDisposable
{
    // ────────────────────────── ctor & fields ──────────────────────────
    private readonly BlockingCollection<H5LogRecord> _queue = new();
    private readonly Thread? _thread;                 // null when startThread == false
    private readonly bool _startThread;               // remember the choice for Enqueue
    private readonly string _outputDir;
    private readonly FlightManifest _manifest;
    private readonly string _manifestPath;
    private readonly byte[] _aesKey;
    private readonly IFlightManifestSerializer _manifestSer;
    private readonly H5LogFlushOptions _flushOpt;
    private H5LogFileWriter? _writer;                 // current file's writer; null once closed
    private DateTime _fileStartUtc;                   // StartUtc for the manifest: ctor time for the first file, first record's timestamp afterwards
    private string? _filePath;
    // Element types mirror RegisterSensor's parameters.
    private readonly ConcurrentDictionary<SensorId, (string path, bool parse, SensorSchema schema, Action? cb)> _sensorMeta = new();
    private volatile bool _disposed;
    private volatile Exception? _threadException;     // captured background error
    private bool _recoveredPendingSave;               // ctor added recovered files to the manifest; worker must persist it

    /// <summary>
    /// Creates the worker and encrypts any plaintext <c>*.h5</c> files left in
    /// <paramref name="outputDirectory"/>. When <paramref name="startThread"/> is
    /// <see langword="true"/>, it also opens the first file and starts the consumer thread.
    /// </summary>
    /// <param name="outputDirectory">Directory receiving the <c>.h5</c> / <c>.h5.enc</c> files.</param>
    /// <param name="manifest">Manifest that receives one entry per encrypted file.</param>
    /// <param name="manifestPath">Path passed to <c>ManifestIO.WriteAsync</c>.</param>
    /// <param name="aesKey">AES-GCM key; must be <see cref="AesGcmHelper.KeySize"/> bytes long.</param>
    /// <param name="manifestSerializer">Serializer passed to <c>ManifestIO.WriteAsync</c>.</param>
    /// <param name="flushOptions">Auto-flush settings; defaults apply when <see langword="null"/>.</param>
    /// <param name="startThread">
    /// <see langword="false"/> for tests: only the constructor work runs, and <see cref="Enqueue"/> throws.
    /// In that mode, manifest entries for recovered files are added in memory but never persisted here.
    /// </param>
    /// <exception cref="ArgumentNullException">A required reference argument is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="aesKey"/> has the wrong length.</exception>
    public H5LogQueueWorker(
        string outputDirectory,
        FlightManifest manifest,
        string manifestPath,
        byte[] aesKey,
        IFlightManifestSerializer manifestSerializer,
        H5LogFlushOptions? flushOptions = null,
        bool startThread = true)
    {
        // basic guards
        _outputDir = outputDirectory ?? throw new ArgumentNullException(nameof(outputDirectory));
        _manifest = manifest ?? throw new ArgumentNullException(nameof(manifest));
        _manifestPath = manifestPath ?? throw new ArgumentNullException(nameof(manifestPath));
        _aesKey = aesKey ?? throw new ArgumentNullException(nameof(aesKey));
        _manifestSer = manifestSerializer ?? throw new ArgumentNullException(nameof(manifestSerializer));
        if (_aesKey.Length != AesGcmHelper.KeySize)
            throw new ArgumentException($"AES key must be {AesGcmHelper.KeySize} bytes.", nameof(aesKey));
        _flushOpt = flushOptions ?? new H5LogFlushOptions(); // apply defaults
        _startThread = startThread;

        // 1) Encrypt leftover plaintext files before starting a new log. Their IV/tag go
        //    into the manifest so the ciphertext stays decryptable; the worker thread
        //    persists the manifest before it consumes any record.
        int logFilesBefore = _manifest.LogFiles.Count;
        EncryptPlainFilesInDirectory(_outputDir, _aesKey, _manifest);
        _recoveredPendingSave = _manifest.LogFiles.Count != logFilesBefore;

        // 2) Production path: open the first file & spawn the consumer thread.
        if (startThread)
        {
            _fileStartUtc = DateTime.UtcNow;
            _filePath = BuildHourFilePath(_fileStartUtc);
            _writer = new H5LogFileWriter(_filePath);
            _thread = new Thread(Run)
            {
                IsBackground = true,
                Name = "H5LogQueueWorker"
            };
            _thread.Start();
        }
    }

    // ─────────────────────── public API (producer side) ───────────────────────

    /// <summary>
    /// Registers or re-registers a sensor. The registration is remembered and replayed
    /// on every new file, and is also applied to the current writer, if any.
    /// </summary>
    /// <param name="id">Sensor identifier.</param>
    /// <param name="datasetPath">Dataset path forwarded to the writer.</param>
    /// <param name="enableParse">Initial parsing flag forwarded to the writer.</param>
    /// <param name="schema">Sensor schema forwarded to the writer.</param>
    /// <param name="cb">Optional callback forwarded to the writer (semantics defined by H5LogFileWriter).</param>
    /// <remarks>
    /// NOTE(review): this calls into the current writer on the caller's thread. The worker
    /// thread may be writing to or rotating that writer at the same time. Confirm that
    /// H5LogFileWriter tolerates this, or marshal registration onto the worker thread.
    /// </remarks>
    public void RegisterSensor(
        SensorId id, string datasetPath, bool enableParse, SensorSchema schema,
        Action? cb = null)
    {
        _sensorMeta[id] = (datasetPath, enableParse, schema, cb);
        _writer?.RegisterSensor(id, datasetPath, enableParse, schema, cb);
    }

    /// <summary>
    /// Enables or disables parsing for a specific sensor at runtime.
    /// Unknown sensors are ignored.
    /// </summary>
    public void SetParsingEnabled(SensorId id, bool enable)
    {
        if (_sensorMeta.TryGetValue(id, out var meta))
        {
            _sensorMeta[id] = (meta.path, enable, meta.schema, meta.cb);
            _writer?.SetParsingEnabled(id, enable);
        }
    }

    /// <summary>
    /// Enables or disables parsing for all registered sensors.
    /// </summary>
    public void SetParsingEnabledForAll(bool enable)
    {
        foreach (var id in _sensorMeta.Keys)
        {
            SetParsingEnabled(id, enable);
        }
    }

    /// <summary>Queues a record for the background writer.</summary>
    /// <exception cref="ObjectDisposedException">The worker has been disposed.</exception>
    /// <exception cref="InvalidOperationException">The worker was created with <c>startThread == false</c>.</exception>
    /// <remarks>
    /// NOTE(review): if the worker thread has failed, records keep accumulating here
    /// until <see cref="Dispose"/> surfaces the captured error.
    /// </remarks>
    public void Enqueue(H5LogRecord rec)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(H5LogQueueWorker));
        if (!_startThread)
            throw new InvalidOperationException("Enqueue() is unusable when startThread == false.");
        _queue.Add(rec);
    }

    // ───────────────────────── background consumer loop ───────────────────────

    /// <summary>Worker thread entry point - never call directly.</summary>
    private void Run()
    {
        DateTime? lastRecUtc = null;
        DateTime lastFlushed = DateTime.UtcNow;
        try
        {
            // Persist manifest entries for files the constructor recovered and encrypted.
            if (_recoveredPendingSave)
            {
                ManifestIO.WriteAsync(_manifest, _aesKey, _manifestPath, _manifestSer)
                    .GetAwaiter().GetResult();
                _recoveredPendingSave = false;
            }

            while (true)
            {
                // Use a blocking timeout only when interval flushing is enabled. Clamp it so
                // sub-millisecond intervals don't busy-spin and huge ones don't overflow int.
                int waitMs = (_flushOpt.EnableAutoFlush && _flushOpt.FlushInterval > TimeSpan.Zero)
                    ? (int)Math.Clamp(Math.Ceiling(_flushOpt.FlushInterval.TotalMilliseconds), 1d, int.MaxValue)
                    : Timeout.Infinite;

                if (!_queue.TryTake(out var rec, waitMs))
                {
                    // timeout (or queue finished) - flush if interval triggered
                    if (_flushOpt.EnableAutoFlush && _writer != null)
                        FlushCurrentWriter(ref lastFlushed);

                    // IsCompleted (not IsAddingCompleted): a record added just before
                    // CompleteAdding() must still be drained instead of dropped.
                    if (_queue.IsCompleted) break;
                    continue;
                }

                // ---------- rotation check ----------
                if (_writer != null && rec.Timestamp >= _fileStartUtc.AddHours(1))
                {
                    // The old file ends at its last record; the new one starts at this record.
                    CloseRotateAndEncrypt(lastRecUtc ?? rec.Timestamp, nextStartUtc: rec.Timestamp);
                    lastFlushed = DateTime.UtcNow;
                }

                // ---------- normal write ----------
                _writer!.Write(rec.SensorId, rec.Timestamp, rec.Payload.Span);
                lastRecUtc = rec.Timestamp;

                // ---------- flush count / interval ----------
                if (_flushOpt.EnableAutoFlush)
                {
                    if (_flushOpt.FlushCount > 0 && _writer.UnflushedCount >= _flushOpt.FlushCount)
                        FlushCurrentWriter(ref lastFlushed);
                    else if (_flushOpt.FlushInterval > TimeSpan.Zero &&
                             DateTime.UtcNow - lastFlushed >= _flushOpt.FlushInterval)
                        FlushCurrentWriter(ref lastFlushed);
                }
            }
        }
        catch (Exception ex)
        {
            // Capture for Dispose(); don't re-throw here because an unobserved
            // exception on this thread would terminate the process.
            _threadException = ex;
        }
        finally
        {
            // Always finalise the last file, without opening a new one.
            try
            {
                CloseRotateAndEncrypt(lastRecUtc ?? DateTime.UtcNow, nextStartUtc: null);
            }
            catch (Exception ex)
            {
                _threadException ??= ex; // preserve original if present
            }
        }
    }

    // ───────────────────────────── helpers ─────────────────────────────

    /// <summary>Flushes the current writer (if any) and records the flush time.</summary>
    private void FlushCurrentWriter(ref DateTime lastFlushed)
    {
        _writer?.Flush();
        lastFlushed = DateTime.UtcNow;
    }

    /// <summary>
    /// Closes the current file, encrypts it to <c>.enc</c>, deletes the plaintext,
    /// appends a manifest entry and persists the manifest.
    /// </summary>
    /// <param name="endUtc">EndUtc recorded for the file being closed.</param>
    /// <param name="nextStartUtc">
    /// Start time for the next file. When non-null, a fresh file is opened and every
    /// known sensor is re-registered on it. When null (final shutdown), nothing is
    /// reopened, so no empty, never-disposed writer is left behind.
    /// </param>
    /// <remarks>
    /// The writer field is cleared right after disposal. If a later step throws, a
    /// second call is then a no-op, and any plaintext left behind is picked up by
    /// <see cref="EncryptPlainFilesInDirectory"/> on the next start.
    /// </remarks>
    private void CloseRotateAndEncrypt(DateTime endUtc, DateTime? nextStartUtc)
    {
        if (_writer == null || _filePath == null) return;

        _writer.Dispose(); // flush & close
        _writer = null;

        _manifest.LogFiles.Add(EncryptAndReplacePlaintext(_filePath, _aesKey, _fileStartUtc, endUtc));
        ManifestIO.WriteAsync(_manifest, _aesKey, _manifestPath, _manifestSer)
            .GetAwaiter().GetResult();

        if (nextStartUtc is not DateTime nextStart) return; // final close

        // Name the next file after its own first record. Reusing the previous file's last
        // record could reuse an hour stem and overwrite an existing .enc on the next rotation.
        _fileStartUtc = nextStart;
        _filePath = BuildHourFilePath(nextStart);
        _writer = new H5LogFileWriter(_filePath);
        foreach (var (id, meta) in _sensorMeta)
            _writer.RegisterSensor(id, meta.path, meta.parse, meta.schema, meta.cb);
    }

    /// <summary>
    /// Returns an unused <c>yyyyMMdd_HH[(n)].h5</c> path in the output directory.
    /// The stem uses the invariant culture, so a non-Gregorian current culture
    /// cannot change the year in the file name.
    /// </summary>
    private string BuildHourFilePath(DateTime startUtc)
    {
        string stem = startUtc.ToString("yyyyMMdd_HH", CultureInfo.InvariantCulture);
        return GetUniquePath(Path.Combine(_outputDir, stem)) + ".h5";
    }

    /// <summary>
    /// Encrypts a plaintext file and returns its manifest entry. Steps:
    /// reads <paramref name="plainPath"/> once and hashes it (SHA-256, lowercase hex);
    /// AES-GCM-encrypts it to <c>plainPath + ".enc"</c> (ciphertext followed by the tag);
    /// flushes that file to disk; deletes the plaintext.
    /// </summary>
    private static LogFileInfo EncryptAndReplacePlaintext(
        string plainPath, byte[] aesKey, DateTime startUtc, DateTime endUtc)
    {
        // ---- integrity hash (single-pass) ----
        byte[] plain = ReadAllBytesWithRetry(plainPath);
        string sha256Hex = Convert.ToHexString(SHA256.HashData(plain)).ToLowerInvariant();

        // ---- encrypt ----
        string encPath = plainPath + ".enc";
        var (iv, tag, cipher) = AesGcmHelper.Encrypt(plain, aesKey);
        using (var encFs = new FileStream(encPath, FileMode.Create, FileAccess.Write))
        {
            encFs.Write(cipher, 0, cipher.Length);
            encFs.Write(tag, 0, tag.Length);
            // Make the ciphertext durable before the only other copy is deleted.
            encFs.Flush(flushToDisk: true);
        }
        DeleteFileWithRetry(plainPath); // remove plaintext with retries

        return new LogFileInfo
        {
            FileName = Path.GetFileName(encPath),
            StartUtc = startUtc,
            EndUtc = endUtc,
            Sha256Hex = sha256Hex,
            IvBase64 = Convert.ToBase64String(iv),
            TagBase64 = Convert.ToBase64String(tag),
            SizeBytes = new FileInfo(encPath).Length
        };
    }

    // ─────────────────────────── IDisposable ───────────────────────────

    /// <summary>
    /// Shuts down the background thread, which drains the queue and finalises the
    /// last file, then releases the queue. Subsequent calls are no-ops.
    /// </summary>
    /// <exception cref="AggregateException">The worker thread captured an exception.</exception>
    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;

        // _thread is non-null only when the constructor started it.
        if (_thread != null)
        {
            _queue.CompleteAdding();
            _thread.Join();
        }
        _queue.Dispose();

        // Bubble up any exception that happened in the worker
        if (_threadException != null)
            throw new AggregateException("Background logging thread failed.", _threadException);
    }

    // ─────────────────── static helper methods ───────────────────

    /// <summary>
    /// Deletes a file, retrying when it is temporarily locked by another process
    /// (e.g. antivirus). At least one attempt is always made. The exception from
    /// the last attempt propagates.
    /// </summary>
    internal static void DeleteFileWithRetry(string path, int maxAttempts = 5, int delayMs = 500)
    {
        if (path is null)
            throw new ArgumentNullException(nameof(path));

        for (int attempt = 1; ; ++attempt)
        {
            try
            {
                File.Delete(path);
                return; // success
            }
            catch (Exception ex) when (IsRetryableFileError(ex) && attempt < maxAttempts)
            {
                Thread.Sleep(delayMs);
            }
        }
    }

    /// <summary>
    /// Reads all bytes from <paramref name="path"/> with shared read/write/delete
    /// access, retrying when the file is briefly locked. At least one attempt is
    /// always made. The exception from the last attempt propagates.
    /// </summary>
    internal static byte[] ReadAllBytesWithRetry(string path, int maxAttempts = 5, int delayMs = 500)
    {
        if (path is null)
            throw new ArgumentNullException(nameof(path));

        for (int attempt = 1; ; ++attempt)
        {
            try
            {
                return ReadWholeFileShared(path);
            }
            catch (Exception ex) when (IsRetryableFileError(ex) && attempt < maxAttempts)
            {
                Thread.Sleep(delayMs);
            }
        }
    }

    /// <summary>
    /// Reads the file in one pass. Because other writers are allowed, the file may
    /// shrink after its length is sampled; the result then holds exactly the bytes
    /// read (no zero padding, no endless loop).
    /// </summary>
    private static byte[] ReadWholeFileShared(string path)
    {
        using FileStream fs = new(path, FileMode.Open, FileAccess.Read,
                                  FileShare.ReadWrite | FileShare.Delete);
        byte[] buffer = new byte[fs.Length];
        int offset = 0;
        while (offset < buffer.Length)
        {
            int read = fs.Read(buffer, offset, buffer.Length - offset);
            if (read == 0) break; // unexpected EOF: file shrank
            offset += read;
        }
        if (offset < buffer.Length)
            Array.Resize(ref buffer, offset);
        return buffer;
    }

    /// <summary>
    /// Retry policy shared by the file helpers: any <see cref="IOException"/>
    /// (including file-not-found) or <see cref="UnauthorizedAccessException"/>.
    /// </summary>
    private static bool IsRetryableFileError(Exception ex)
        => ex is IOException or UnauthorizedAccessException;

    /// <summary>
    /// Returns <paramref name="basePath"/> (no extension) or <c>basePath(n)</c> with the
    /// smallest n >= 1 such that neither <c>.h5</c> nor <c>.h5.enc</c> exists for it.
    /// </summary>
    internal static string GetUniquePath(string basePath)
    {
        var candidate = basePath;
        var i = 0;
        while (File.Exists(candidate + ".h5") || File.Exists(candidate + ".h5.enc"))
        {
            i++;
            candidate = $"{basePath}({i})";
        }
        return candidate;
    }

    /// <summary>
    /// Encrypts every <c>*.h5</c> file in <paramref name="outputDirectory"/> that has no
    /// <c>.enc</c> counterpart yet, then deletes the plaintext.
    /// </summary>
    /// <param name="outputDirectory">Directory to scan (non-recursive).</param>
    /// <param name="aesKey">AES-GCM key; must be <see cref="AesGcmHelper.KeySize"/> bytes long.</param>
    /// <param name="manifest">
    /// When non-null, receives one entry per encrypted file, with hash, IV and tag.
    /// Without it the IV returned by <c>AesGcmHelper.Encrypt</c> is discarded. The
    /// manifest's IvBase64 field suggests the IV is needed for decryption — confirm.
    /// StartUtc/EndUtc are approximated from the file-system creation/last-write times.
    /// </param>
    /// <remarks>
    /// Files whose <c>.enc</c> counterpart already exists are skipped and left in plaintext.
    /// </remarks>
    internal static void EncryptPlainFilesInDirectory(string outputDirectory, byte[] aesKey, FlightManifest? manifest = null)
    {
        if (outputDirectory is null)
            throw new ArgumentNullException(nameof(outputDirectory));
        if (aesKey is null)
            throw new ArgumentNullException(nameof(aesKey));
        if (aesKey.Length != AesGcmHelper.KeySize)
            throw new ArgumentException($"AES key must be {AesGcmHelper.KeySize} bytes.", nameof(aesKey));

        foreach (var path in Directory.GetFiles(outputDirectory, "*.h5"))
        {
            // Skip if already encrypted counterpart exists
            if (File.Exists(path + ".enc"))
                continue;

            // Capture timestamps before the plaintext is deleted.
            DateTime startUtc = File.GetCreationTimeUtc(path);
            DateTime endUtc = File.GetLastWriteTimeUtc(path);
            LogFileInfo info = EncryptAndReplacePlaintext(path, aesKey, startUtc, endUtc);
            manifest?.LogFiles.Add(info);
        }
    }
}