From 56617aee76cb8ddd78fdc25830bd9e08f450efea Mon Sep 17 00:00:00 2001 From: xieguigang Date: Fri, 9 Apr 2021 19:11:25 +0800 Subject: [PATCH 1/4] Update SlaveTask.vb --- Parallel/IpcParallel/SlaveTask.vb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Parallel/IpcParallel/SlaveTask.vb b/Parallel/IpcParallel/SlaveTask.vb index 56ea3aa..4d70380 100644 --- a/Parallel/IpcParallel/SlaveTask.vb +++ b/Parallel/IpcParallel/SlaveTask.vb @@ -152,10 +152,10 @@ Public Class SlaveTask Dim commandlineArgvs As String = builder(processor, host.HostPort) - If Not debugPort Is Nothing Then - Console.WriteLine(commandlineArgvs) - Pause() - End If + 'If Not debugPort Is Nothing Then + ' Console.WriteLine(commandlineArgvs) + ' Pause() + 'End If #If netcore5 = 0 Then Call CommandLine.Call(processor, commandlineArgvs) From 22b477e116993c92a9e0d2d4d689eecd55e17bff Mon Sep 17 00:00:00 2001 From: xieguigang Date: Fri, 9 Apr 2021 20:20:38 +0800 Subject: [PATCH 2/4] option verbose --- Parallel/IpcParallel/IPCSocket.vb | 14 +++++++++++--- Parallel/IpcParallel/SlaveTask.vb | 29 ++++++++++++++++++++++------- 2 files changed, 33 insertions(+), 10 deletions(-) diff --git a/Parallel/IpcParallel/IPCSocket.vb b/Parallel/IpcParallel/IPCSocket.vb index 26715a2..ad99373 100644 --- a/Parallel/IpcParallel/IPCSocket.vb +++ b/Parallel/IpcParallel/IPCSocket.vb @@ -72,6 +72,7 @@ Public Class IPCSocket : Implements ITaskDriver ReadOnly socket As TcpServicesSocket ReadOnly target As IDelegate + ReadOnly verbose As Boolean Public ReadOnly Property HostPort As Integer Get @@ -85,10 +86,11 @@ Public Class IPCSocket : Implements ITaskDriver Public Property handleGetArgument As Func(Of Integer, ObjectStream) Public Property host As SlaveTask - Sub New(target As IDelegate, Optional debug As Integer? = Nothing) + Sub New(target As IDelegate, Optional debug As Integer? = Nothing, Optional verbose As Boolean = False) Me.socket = New TcpServicesSocket(If(debug, GetFirstAvailablePort())) Me.socket.ResponseHandler = AddressOf New ProtocolHandler(Me).HandleRequest Me.target = target + Me.verbose = verbose End Sub Private Function GetFirstAvailablePort() As Integer @@ -132,7 +134,10 @@ Public Class IPCSocket : Implements ITaskDriver Public Function GetTask(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe - Call Console.WriteLine($"[{GetHashCode.ToHexString}] get parallel task entry.") + If verbose Then + Call Console.WriteLine($"[{GetHashCode.ToHexString}] get parallel task entry.") + End If + Return New DataPipe(Encoding.UTF8.GetBytes(target.GetJson)) End Function @@ -147,7 +152,10 @@ Public Class IPCSocket : Implements ITaskDriver Public Function PostStart(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe - Call Console.WriteLine($"[{GetHashCode.ToHexString}] started!") + If verbose Then + Call Console.WriteLine($"[{GetHashCode.ToHexString}] started!") + End If + Return New DataPipe(Encoding.UTF8.GetBytes("OK!")) End Function diff --git a/Parallel/IpcParallel/SlaveTask.vb b/Parallel/IpcParallel/SlaveTask.vb index 4d70380..5e16eb8 100644 --- a/Parallel/IpcParallel/SlaveTask.vb +++ b/Parallel/IpcParallel/SlaveTask.vb @@ -62,18 +62,21 @@ Public Class SlaveTask ReadOnly builder As ISlaveTask ReadOnly debugPort As Integer? ReadOnly ignoreError As Boolean + ReadOnly verbose As Boolean Friend ReadOnly streamBuf As New StreamEmit Sub New(processor As InteropService, cli As ISlaveTask, Optional debugPort As Integer? = Nothing, - Optional ignoreError As Boolean = False) + Optional ignoreError As Boolean = False, + Optional verbose As Boolean = False) Me.builder = cli Me.processor = processor Me.debugPort = debugPort Me.ignoreError = ignoreError + Me.verbose = verbose End Sub Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As SlaveTask @@ -94,7 +97,10 @@ Public Class SlaveTask ''' ''' Private Function handlePOST(buf As Stream, type As Type, debugCode As Integer) As Object - Call Console.WriteLine($"[{debugCode.ToHexString}] task finished!") + If verbose Then + Call Console.WriteLine($"[{debugCode.ToHexString}] task finished!") + End If + Return GetValueFromStream(buf, type, streamBuf) End Function @@ -111,7 +117,11 @@ Public Class SlaveTask Private Function handleGET(param As Object, i As Integer, debugCode As Integer) As ObjectStream Dim socket As SocketRef = SocketRef.WriteBuffer(param, streamBuf) - Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...") + + If verbose Then + Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...") + End If + Return streamBuf.handleSerialize(socket) End Function @@ -131,7 +141,7 @@ Public Class SlaveTask Dim host As IPCSocket = Nothing Dim resultType As Type = entry.Method.ReturnType - host = New IPCSocket(target, debugPort) With { + host = New IPCSocket(target, debugPort, verbose:=verbose) With { .host = Me, .handlePOSTResult = Sub(buf) @@ -146,10 +156,12 @@ Public Class SlaveTask } Call Microsoft.VisualBasic.Parallel.RunTask(AddressOf host.Run) - - Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] port:{host.HostPort}") Call Thread.Sleep(100) + If verbose Then + Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] port:{host.HostPort}") + End If + Dim commandlineArgvs As String = builder(processor, host.HostPort) 'If Not debugPort Is Nothing Then @@ -164,7 +176,10 @@ Public Class SlaveTask #End If Call host.Stop() - Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] thread exit...") + + If verbose Then + Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] thread exit...") + End If If TypeOf result Is IPCError Then If ignoreError Then From 459faa607911aaacd2d04906090a4934c396d9a6 Mon Sep 17 00:00:00 2001 From: xieguigang Date: Fri, 9 Apr 2021 20:41:23 +0800 Subject: [PATCH 3/4] add benchmark info of task exec --- Parallel/ThreadTask/ThreadTask.vb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Parallel/ThreadTask/ThreadTask.vb b/Parallel/ThreadTask/ThreadTask.vb index f33fcee..3186287 100644 --- a/Parallel/ThreadTask/ThreadTask.vb +++ b/Parallel/ThreadTask/ThreadTask.vb @@ -168,8 +168,8 @@ Namespace ThreadTask If j > -1 Then Yield threads(j).GetValue + Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done ({threads(j).GetTaskExecTimeSpan.FormatTime})!") threads(j) = Nothing - Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!") End If Loop @@ -178,8 +178,8 @@ Namespace ThreadTask If j > -1 Then Yield threads(j).GetValue + Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done ({threads(j).GetTaskExecTimeSpan.FormatTime})!") threads(j) = Nothing - Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!") End If Loop End Function From 66279810d129f97673e3222779b3f8574b274228 Mon Sep 17 00:00:00 2001 From: xieguigang Date: Fri, 9 Apr 2021 22:02:52 +0800 Subject: [PATCH 4/4] improvements of the socket model --- Parallel/IpcParallel/IPCSocket.vb | 23 +++-- Parallel/IpcParallel/SlaveTask.vb | 138 ++++++++++++++++-------------- 2 files changed, 92 insertions(+), 69 deletions(-) diff --git a/Parallel/IpcParallel/IPCSocket.vb b/Parallel/IpcParallel/IPCSocket.vb index ad99373..6ff53e3 100644 --- a/Parallel/IpcParallel/IPCSocket.vb +++ b/Parallel/IpcParallel/IPCSocket.vb @@ -80,17 +80,23 @@ Public Class IPCSocket : Implements ITaskDriver End Get End Property - Public Property handlePOSTResult As Action(Of Stream) - Public Property handleError As Action(Of IPCError) + Public ReadOnly Property result As Object = Nothing + + Public Property handlePOSTResult As Func(Of Stream, IPCSocket, Object) Public Property nargs As Integer - Public Property handleGetArgument As Func(Of Integer, ObjectStream) + Public Property handleGetArgument As Func(Of Integer, IPCSocket, ObjectStream) Public Property host As SlaveTask + Public ReadOnly Property handleSetResult As Boolean = False + Public ReadOnly Property socketExitCode As Integer + Sub New(target As IDelegate, Optional debug As Integer? = Nothing, Optional verbose As Boolean = False) Me.socket = New TcpServicesSocket(If(debug, GetFirstAvailablePort())) Me.socket.ResponseHandler = AddressOf New ProtocolHandler(Me).HandleRequest Me.target = target Me.verbose = verbose + Me.result = Nothing + Me.handleSetResult = False End Sub Private Function GetFirstAvailablePort() As Integer @@ -129,7 +135,8 @@ Public Class IPCSocket : Implements ITaskDriver End Sub Public Function Run() As Integer Implements ITaskDriver.Run - Return socket.Run + _socketExitCode = socket.Run + Return socketExitCode End Function @@ -145,7 +152,7 @@ Public Class IPCSocket : Implements ITaskDriver Public Function GetArgumentByIndex(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe Dim i As Integer = BitConverter.ToInt32(request.ChunkBuffer, Scan0) - Using buf As ObjectStream = _handleGetArgument(i) + Using buf As ObjectStream = _handleGetArgument(i, Me) Return New DataPipe(buf) End Using End Function @@ -167,7 +174,8 @@ Public Class IPCSocket : Implements ITaskDriver Public Function PostError(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe Using ms As New MemoryStream(request.ChunkBuffer) - Call DirectCast(SlaveTask.GetValueFromStream(ms, GetType(IPCError), host.streamBuf), IPCError).DoCall(_handleError) + _result = DirectCast(SlaveTask.GetValueFromStream(ms, GetType(IPCError), host.streamBuf), IPCError) + _handleSetResult = True End Using Return New DataPipe(Encoding.ASCII.GetBytes("OK!")) @@ -176,7 +184,8 @@ Public Class IPCSocket : Implements ITaskDriver Public Function PostResult(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe Using ms As New MemoryStream(request.ChunkBuffer) - Call _handlePOSTResult(ms) + _result = _handlePOSTResult(ms, Me) + _handleSetResult = True End Using Return New DataPipe(Encoding.ASCII.GetBytes("OK!")) diff --git a/Parallel/IpcParallel/SlaveTask.vb b/Parallel/IpcParallel/SlaveTask.vb index 5e16eb8..c9c0e5e 100644 --- a/Parallel/IpcParallel/SlaveTask.vb +++ b/Parallel/IpcParallel/SlaveTask.vb @@ -1,47 +1,47 @@ #Region "Microsoft.VisualBasic::c7901ef38489164efc1ff25498efb52f, Parallel\IpcParallel\SlaveTask.vb" - ' Author: - ' - ' asuka (amethyst.asuka@gcmodeller.org) - ' xie (genetics@smrucc.org) - ' xieguigang (xie.guigang@live.com) - ' - ' Copyright (c) 2018 GPL3 Licensed - ' - ' - ' GNU GENERAL PUBLIC LICENSE (GPL3) - ' - ' - ' This program is free software: you can redistribute it and/or modify - ' it under the terms of the GNU General Public License as published by - ' the Free Software Foundation, either version 3 of the License, or - ' (at your option) any later version. - ' - ' This program is distributed in the hope that it will be useful, - ' but WITHOUT ANY WARRANTY; without even the implied warranty of - ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - ' GNU General Public License for more details. - ' - ' You should have received a copy of the GNU General Public License - ' along with this program. If not, see . - - - - ' /********************************************************************************/ - - ' Summaries: - - ' Delegate Function - ' - ' - ' Class SlaveTask - ' - ' Constructor: (+1 Overloads) Sub New - ' Function: (+2 Overloads) Emit, GetValueFromStream, handleGET, handlePOST, RunTask - ' - ' - ' - ' /********************************************************************************/ +' Author: +' +' asuka (amethyst.asuka@gcmodeller.org) +' xie (genetics@smrucc.org) +' xieguigang (xie.guigang@live.com) +' +' Copyright (c) 2018 GPL3 Licensed +' +' +' GNU GENERAL PUBLIC LICENSE (GPL3) +' +' +' This program is free software: you can redistribute it and/or modify +' it under the terms of the GNU General Public License as published by +' the Free Software Foundation, either version 3 of the License, or +' (at your option) any later version. +' +' This program is distributed in the hope that it will be useful, +' but WITHOUT ANY WARRANTY; without even the implied warranty of +' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +' GNU General Public License for more details. +' +' You should have received a copy of the GNU General Public License +' along with this program. If not, see . + + + +' /********************************************************************************/ + +' Summaries: + +' Delegate Function +' +' +' Class SlaveTask +' +' Constructor: (+1 Overloads) Sub New +' Function: (+2 Overloads) Emit, GetValueFromStream, handleGET, handlePOST, RunTask +' +' +' +' /********************************************************************************/ #End Region @@ -125,6 +125,24 @@ Public Class SlaveTask Return streamBuf.handleSerialize(socket) End Function + Private Function startSocket(entry As [Delegate], parameters As Object()) As IPCSocket + Dim target As New IDelegate(entry) + Dim resultType As Type = entry.Method.ReturnType + + Return New IPCSocket(target, debugPort, verbose:=verbose) With { + .host = Me, + .handlePOSTResult = + Function(buf, host) + Return handlePOST(buf, resultType, host.GetHashCode) + End Function, + .nargs = parameters.Length, + .handleGetArgument = + Function(i, host) + Return handleGET(parameters(i), i, host.GetHashCode) + End Function + } + End Function + ''' ''' ''' @@ -136,30 +154,16 @@ Public Class SlaveTask ''' ''' Public Function RunTask(Of T)(entry As [Delegate], ParamArray parameters As Object()) As T - Dim target As New IDelegate(entry) +RE0: + Dim host As IPCSocket = startSocket(entry, parameters) Dim result As Object = Nothing - Dim host As IPCSocket = Nothing - Dim resultType As Type = entry.Method.ReturnType - - host = New IPCSocket(target, debugPort, verbose:=verbose) With { - .host = Me, - .handlePOSTResult = - Sub(buf) - result = handlePOST(buf, resultType, host.GetHashCode) - End Sub, - .nargs = parameters.Length, - .handleGetArgument = - Function(i) - Return handleGET(parameters(i), i, host.GetHashCode) - End Function, - .handleError = Sub(ex) result = ex - } + Dim hostIndex As Integer = host.GetHashCode Call Microsoft.VisualBasic.Parallel.RunTask(AddressOf host.Run) Call Thread.Sleep(100) If verbose Then - Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] port:{host.HostPort}") + Call Console.WriteLine($"[{hostIndex.ToHexString}] port:{host.HostPort}") End If Dim commandlineArgvs As String = builder(processor, host.HostPort) @@ -177,10 +181,20 @@ Public Class SlaveTask Call host.Stop() + If Not host.handleSetResult Then + If verbose Then + Call Console.WriteLine("socket have non-ZERO exit status, retry...") + End If + + GoTo RE0 + End If + If verbose Then - Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] thread exit...") + Call Console.WriteLine($"[{hostIndex.ToHexString}] thread exit...") End If + result = host.result + If TypeOf result Is IPCError Then If ignoreError Then With DirectCast(result, IPCError) @@ -188,7 +202,7 @@ Public Class SlaveTask Call Console.WriteLine($"[error] {msg}") Next For Each frame As StackFrame In .GetSourceTrace - Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] {frame.ToString}") + Call Console.WriteLine($"[{hostIndex.ToHexString}] {frame.ToString}") Next End With