diff --git a/src/Elastic.Managed.Ephemeral/Tasks/InstallationTasks/InstallPlugins.cs b/src/Elastic.Managed.Ephemeral/Tasks/InstallationTasks/InstallPlugins.cs index b4ba92f..77dcdcb 100644 --- a/src/Elastic.Managed.Ephemeral/Tasks/InstallationTasks/InstallPlugins.cs +++ b/src/Elastic.Managed.Ephemeral/Tasks/InstallationTasks/InstallPlugins.cs @@ -35,7 +35,7 @@ public override void Run(IEphemeralCluster cluste .Where(p => p.Moniker != "x-pack") //x-pack is already installed OOTB since 6.3.0 NOT an error condition though to specify the plugin .Select(p => p.Moniker).ToList(); if (invalidPlugins.Any()) - throw new CleanExitException( + throw new ObservableProcessException( $"Can not install the following plugins for version {v}: {string.Join(", ", invalidPlugins)} "); } diff --git a/src/Elastic.Managed/Elastic.Managed.csproj b/src/Elastic.Managed/Elastic.Managed.csproj index 5059fc5..4a84459 100644 --- a/src/Elastic.Managed/Elastic.Managed.csproj +++ b/src/Elastic.Managed/Elastic.Managed.csproj @@ -19,7 +19,7 @@ elastic,elasticsearch,cluster,observable,rx - + diff --git a/src/Elastic.Managed/ElasticsearchNode.cs b/src/Elastic.Managed/ElasticsearchNode.cs index a7e53d3..5d3794d 100644 --- a/src/Elastic.Managed/ElasticsearchNode.cs +++ b/src/Elastic.Managed/ElasticsearchNode.cs @@ -1,29 +1,22 @@ using System; using System.Collections.Generic; -using System.Threading; using Elastic.Managed.Configuration; -using Elastic.Managed.ConsoleWriters; using Elastic.Managed.FileSystem; using ProcNet; using ProcNet.Std; +using Elastic.Managed.ConsoleWriters; namespace Elastic.Managed { - public class ElasticsearchNode : ObservableProcess + public class ElasticsearchNode : ElasticsearchObservableProcess { - public string Version { get; private set; } - public int? Port { get; private set; } - public bool NodeStarted { get; private set; } public NodeConfiguration NodeConfiguration { get; } - private int? JavaProcessId { get; set; } - public override int? ProcessId => this.JavaProcessId ?? base.ProcessId; - public int? HostProcessId => base.ProcessId; - - public ElasticsearchNode(ElasticsearchVersion version, string elasticsearchHome = null) - : this(new NodeConfiguration(new ClusterConfiguration(version, fileSystem: (v, s) => new NodeFileSystem(v, elasticsearchHome)))) { } - - public ElasticsearchNode(NodeConfiguration config) : base(StartArgs(config)) => this.NodeConfiguration = config; + public ElasticsearchNode(NodeConfiguration config) : base(StartArgs(config)) + { + this.NodeConfiguration = config; + this.ShowElasticsearchOutputAfterStarted = config.ShowElasticsearchOutputAfterStarted; + } private static StartArguments StartArgs(NodeConfiguration config) { @@ -38,165 +31,34 @@ private static StartArguments StartArgs(NodeConfiguration config) return startArguments; } - private static Dictionary EnvVars(NodeConfiguration config) - { - if (string.IsNullOrWhiteSpace(config.FileSystem.ConfigPath)) return null; - return new Dictionary - { - { config.FileSystem.ConfigEnvironmentVariableName, config.FileSystem.ConfigPath }, - {"ES_HOME", config.FileSystem.ElasticsearchHome} - }; - } - - /// - /// Set this true if you want the node to go into assumed started state as soon as its waiting for more nodes to start doing the election. - /// Useful to speed up starting multi node clusters - /// - public bool AssumeStartedOnNotEnoughMasterPing { get; set; } - - private bool AssumedStartedStateChecker(string section, string message) - { - if (AssumeStartedOnNotEnoughMasterPing - && section.Contains("ZenDiscovery") - && message.Contains("not enough master nodes discovered during pinging")) - return true; - return false; - } - - public IDisposable Start() => this.Start(TimeSpan.FromMinutes(2)); - - public IDisposable Start(TimeSpan waitForStarted) => this.Start(new LineHighlightWriter(), waitForStarted); + protected override string StartTimeoutExceptionMessage(TimeSpan waitForStarted) => + $"Failed to start node: {this.NodeConfiguration.DesiredNodeName} before the configured timeout of: {waitForStarted}"; - public IDisposable Start(IConsoleLineWriter writer, TimeSpan waitForStarted) + protected override void WriteStartedMessage(IConsoleLineWriter writer) { - var node = this.NodeConfiguration.DesiredNodeName; - var subscription = this.SubscribeLines(writer); - if (this.WaitForStarted(waitForStarted)) return subscription; - subscription.Dispose(); - throw new CleanExitException($"Failed to start node: {node} before the configured timeout of: {waitForStarted}"); - } - - internal IConsoleLineWriter Writer { get; private set; } - - public IDisposable SubscribeLines() => this.SubscribeLines(new LineHighlightWriter()); - public IDisposable SubscribeLines(IConsoleLineWriter writer) => - this.SubscribeLines(writer, delegate { }, delegate { }, delegate { }); - - public IDisposable SubscribeLines(IConsoleLineWriter writer, Action onNext) => - this.SubscribeLines(writer, onNext, delegate { }, delegate { }); - - public IDisposable SubscribeLines(IConsoleLineWriter writer, Action onNext, Action onError) => - this.SubscribeLines(writer, onNext, onError, delegate { }); - - public IDisposable SubscribeLines(IConsoleLineWriter writer, Action onNext, Action onError, Action onCompleted) - { - this.Writer = writer; var node = this.NodeConfiguration.DesiredNodeName; writer?.WriteDiagnostic($"Elasticsearch location: [{this.Binary}]", node); writer?.WriteDiagnostic($"Settings: {{{string.Join(" ", this.NodeConfiguration.CommandLineArguments)}}}", node); - return this.SubscribeLines( - l => { - writer?.Write(l); - onNext?.Invoke(l); - }, - e => - { - this.LastSeenException = e; - writer?.Write(e); - onError?.Invoke(e); - this._startedHandle.Set(); - }, - () => - { - onCompleted?.Invoke(); - this._startedHandle.Set(); - }); - } - - public Exception LastSeenException { get; set; } - - private readonly ManualResetEvent _startedHandle = new ManualResetEvent(false); - public WaitHandle StartedHandle => _startedHandle; - public bool WaitForStarted(TimeSpan timeout) => this._startedHandle.WaitOne(timeout); - - protected override void OnBeforeSetCompletedHandle() - { - this._startedHandle.Set(); - base.OnBeforeSetCompletedHandle(); - } - - protected override void OnBeforeWaitForEndOfStreamsError(TimeSpan waited) - { - // The wait for streams finished before streams were fully read. - // this usually indicates the process is still running. - // Proc will successfully kill the host but will leave the JavaProcess the bat file starts running - // The elasticsearch jar is closing down so won't leak but might prevent EphemeralClusterComposer to do its clean up. - // We do a hard kill on both here to make sure both processes are gone. - HardKill(this.HostProcessId); - HardKill(this.JavaProcessId); } - private static void HardKill(int? processId) + protected override void ValidatePort(int port) { - if (!processId.HasValue) return; - try - { - var p = System.Diagnostics.Process.GetProcessById(processId.Value); - p.Kill(); - } - catch (Exception) { } + var dp = this.NodeConfiguration.DesiredPort; + if (dp.HasValue && this.Port != dp.Value) + throw new ObservableProcessException($"Node started on port {port} but {dp.Value} was requested"); } - protected override bool ContinueReadingFromProcessReaders() - { - if (!this.NodeStarted) return true; - return true; + protected override void OnNoPortCaptured() => + throw new ObservableProcessException($"Node started but ElasticsearchNode did not grab its port number"); - // some how if we return false here it leads to Task starvation in Proc and tests in e.g will Elastic.Xunit will start - // to timeout. This makes little sense to me now, so leaving this performance optimization out for now. Hopefully another fresh look will yield - // to (not so) obvious. - //return this.NodeConfiguration.ShowElasticsearchOutputAfterStarted; - } - - protected override bool KeepBufferingLines(LineOut c) + private static Dictionary EnvVars(NodeConfiguration config) { - //if the node is already started only keep buffering lines while we have a writer and the nodeconfiguration wants output after started - if (this.NodeStarted) - { - var keepBuffering = this.Writer != null && this.NodeConfiguration.ShowElasticsearchOutputAfterStarted; - if (!keepBuffering) this.CancelAsyncReads(); - return keepBuffering; - } - - var parsed = LineOutParser.TryParse(c?.Line, out _, out _, out var section, out _, out var message, out var started); - - if (!parsed) return this.Writer != null; - - if (this.JavaProcessId == null && LineOutParser.TryParseNodeInfo(section, message, out var version, out var pid)) - { - this.JavaProcessId = pid; - this.Version = version; - } - else if (LineOutParser.TryGetPortNumber(section, message, out var port)) - { - this.Port = port; - var dp = this.NodeConfiguration.DesiredPort; - if (dp.HasValue && this.Port != dp.Value) - throw new CleanExitException($"Node started on port {port} but {dp.Value} was requested"); - } - - if (!started) started = AssumedStartedStateChecker(section, message); - if (started) + if (string.IsNullOrWhiteSpace(config.FileSystem.ConfigPath)) return null; + return new Dictionary { - if (!this.Port.HasValue) throw new CleanExitException($"Node started but ElasticsearchNode did not grab its port number"); - this.NodeStarted = true; - this._startedHandle.Set(); - } - - // if we have dont a writer always return true - if (this.Writer != null) return true; - //otherwise only keep buffering if we are not started - return !started; + { config.FileSystem.ConfigEnvironmentVariableName, config.FileSystem.ConfigPath }, + {"ES_HOME", config.FileSystem.ElasticsearchHome} + }; } } } diff --git a/src/Elastic.Managed/ElasticsearchObservableProcess.cs b/src/Elastic.Managed/ElasticsearchObservableProcess.cs new file mode 100644 index 0000000..344180f --- /dev/null +++ b/src/Elastic.Managed/ElasticsearchObservableProcess.cs @@ -0,0 +1,195 @@ +using System; +using System.Threading; +using Elastic.Managed.ConsoleWriters; +using ProcNet; +using ProcNet.Std; + +namespace Elastic.Managed +{ + public class ElasticsearchObservableProcess : ObservableProcess + { + internal ElasticsearchObservableProcess(StartArguments args) : base(args) { } + + public ElasticsearchObservableProcess(bool interactive, string binary, params string[] arguments) + : base(StartArgs(binary, arguments)) + { + this.ShowElasticsearchOutputAfterStarted = interactive; + } + + public string Version { get; private set; } + public int? Port { get; private set; } + public bool NodeStarted { get; private set; } + + private IConsoleLineWriter Writer { get; set; } + + private int? JavaProcessId { get; set; } + public override int? ProcessId => this.JavaProcessId ?? base.ProcessId; + public int? HostProcessId => base.ProcessId; + + public bool ShowElasticsearchOutputAfterStarted { get; protected set; } + + /// + /// Set this true if you want the node to go into assumed started state as soon as its waiting for more nodes to start doing the election. + /// Useful to speed up starting multi node clusters + /// + public bool AssumeStartedOnNotEnoughMasterPing { get; set; } + + private bool AssumedStartedStateChecker(string section, string message) + { + if (AssumeStartedOnNotEnoughMasterPing + && section.Contains("ZenDiscovery") + && message.Contains("not enough master nodes discovered during pinging")) + return true; + return false; + } + + private static StartArguments StartArgs(string binary, params string[] arguments) + { + var waitForExit = TimeSpan.FromSeconds(10); + var startArguments = new StartArguments(binary, arguments) + { + SendControlCFirst = true, + WaitForExit = waitForExit, + WaitForStreamReadersTimeout = waitForExit + }; + return startArguments; + } + + public IDisposable Start() => this.Start(TimeSpan.FromMinutes(2)); + + public IDisposable Start(TimeSpan waitForStarted) => this.Start(new LineHighlightWriter(), waitForStarted); + + public IDisposable Start(IConsoleLineWriter writer, TimeSpan waitForStarted) + { + var subscription = this.SubscribeLines(writer); + if (this.WaitForStarted(waitForStarted)) return subscription; + subscription.Dispose(); + throw new ObservableProcessException(this.StartTimeoutExceptionMessage(waitForStarted)); + } + + protected virtual string StartTimeoutExceptionMessage(TimeSpan waitForStarted) => $"Failed to start node before the configured timeout of: {waitForStarted}"; + + public IDisposable SubscribeLines() => this.SubscribeLines(new LineHighlightWriter()); + public IDisposable SubscribeLines(IConsoleLineWriter writer) => + this.SubscribeLines(writer, delegate { }, delegate { }, delegate { }); + + public IDisposable SubscribeLines(IConsoleLineWriter writer, Action onNext) => + this.SubscribeLines(writer, onNext, delegate { }, delegate { }); + + public IDisposable SubscribeLines(IConsoleLineWriter writer, Action onNext, Action onError) => + this.SubscribeLines(writer, onNext, onError, delegate { }); + + protected virtual void WriteStartedMessage(IConsoleLineWriter writer) {} + + public IDisposable SubscribeLines(IConsoleLineWriter writer, Action onNext, Action onError, Action onCompleted) + { + this.Writer = writer; + this.WriteStartedMessage(writer); + return this.SubscribeLines( + l => { + writer?.Write(l); + onNext?.Invoke(l); + }, + e => + { + this.LastSeenException = e; + writer?.Write(e); + onError?.Invoke(e); + this._startedHandle.Set(); + }, + () => + { + onCompleted?.Invoke(); + this._startedHandle.Set(); + }); + } + + public Exception LastSeenException { get; set; } + + private readonly ManualResetEvent _startedHandle = new ManualResetEvent(false); + public WaitHandle StartedHandle => _startedHandle; + public bool WaitForStarted(TimeSpan timeout) => this._startedHandle.WaitOne(timeout); + + protected override void OnBeforeSetCompletedHandle() + { + this._startedHandle.Set(); + base.OnBeforeSetCompletedHandle(); + } + + protected override void OnBeforeWaitForEndOfStreamsError(TimeSpan waited) + { + // The wait for streams finished before streams were fully read. + // this usually indicates the process is still running. + // Proc will successfully kill the host but will leave the JavaProcess the bat file starts running + // The elasticsearch jar is closing down so won't leak but might prevent EphemeralClusterComposer to do its clean up. + // We do a hard kill on both here to make sure both processes are gone. + HardKill(this.HostProcessId); + HardKill(this.JavaProcessId); + } + + private static void HardKill(int? processId) + { + if (!processId.HasValue) return; + try + { + var p = System.Diagnostics.Process.GetProcessById(processId.Value); + p.Kill(); + } + catch (Exception) { } + } + + protected override bool ContinueReadingFromProcessReaders() + { + if (!this.NodeStarted) return true; + return true; + + // some how if we return false here it leads to Task starvation in Proc and tests in e.g will Elastic.Xunit will start + // to timeout. This makes little sense to me now, so leaving this performance optimization out for now. Hopefully another fresh look will yield + // to (not so) obvious. + //return this.NodeConfiguration.ShowElasticsearchOutputAfterStarted; + } + + protected virtual void ValidatePort(int port) { } + + protected virtual void OnNoPortCaptured() { } + + protected override bool KeepBufferingLines(LineOut c) + { + //if the node is already started only keep buffering lines while we have a writer and the nodeconfiguration wants output after started + if (this.NodeStarted) + { + var keepBuffering = this.Writer != null && this.ShowElasticsearchOutputAfterStarted; + if (!keepBuffering) this.CancelAsyncReads(); + return keepBuffering; + } + + var parsed = LineOutParser.TryParse(c?.Line, out _, out _, out var section, out _, out var message, out var started); + + if (!parsed) return this.Writer != null; + + if (this.JavaProcessId == null && LineOutParser.TryParseNodeInfo(section, message, out var version, out var pid)) + { + this.JavaProcessId = pid; + this.Version = version; + } + else if (LineOutParser.TryGetPortNumber(section, message, out var port)) + { + this.Port = port; + this.ValidatePort(port); + } + + if (!started) started = AssumedStartedStateChecker(section, message); + if (started) + { + if (!this.Port.HasValue) this.OnNoPortCaptured(); + this.NodeStarted = true; + this._startedHandle.Set(); + } + + // if we have dont a writer always return true + if (this.Writer != null) return true; + //otherwise only keep buffering if we are not started + return !started; + } + } +} diff --git a/src/Elastic.Xunit/Sdk/TestFrameworkExecutor.cs b/src/Elastic.Xunit/Sdk/TestFrameworkExecutor.cs index c01bddb..35649bf 100644 --- a/src/Elastic.Xunit/Sdk/TestFrameworkExecutor.cs +++ b/src/Elastic.Xunit/Sdk/TestFrameworkExecutor.cs @@ -65,10 +65,10 @@ protected override async void RunTestCases(IEnumerable testCases } catch (Exception e) { - if (e is CleanExitException || e is AggregateException ae && ae.Flatten().InnerException is CleanExitException) + if (e is CleanExitExceptionBase || e is AggregateException ae && ae.Flatten().InnerException is CleanExitExceptionBase) { sink.OnMessage(new TestAssemblyCleanupFailure(Enumerable.Empty(), this.TestAssembly, - new CleanExitException("Node failed to start", e))); + new ObservableProcessException("Node failed to start", e.Message, e))); } else sink.OnMessage(new TestAssemblyCleanupFailure(Enumerable.Empty(), this.TestAssembly, e)); diff --git a/src/Examples/ScratchPad/ScratchPad.csproj b/src/Examples/ScratchPad/ScratchPad.csproj index 50953f3..72eb355 100644 --- a/src/Examples/ScratchPad/ScratchPad.csproj +++ b/src/Examples/ScratchPad/ScratchPad.csproj @@ -5,7 +5,7 @@ False - +