この中二病に爆焔を! 5 years ago
commit 014c5763cd

@ -72,6 +72,7 @@ Public Class IPCSocket : Implements ITaskDriver
ReadOnly socket As TcpServicesSocket
ReadOnly target As IDelegate
ReadOnly verbose As Boolean
Public ReadOnly Property HostPort As Integer
Get
@ -79,16 +80,23 @@ Public Class IPCSocket : Implements ITaskDriver
End Get
End Property
Public Property handlePOSTResult As Action(Of Stream)
Public Property handleError As Action(Of IPCError)
Public ReadOnly Property result As Object = Nothing
Public Property handlePOSTResult As Func(Of Stream, IPCSocket, Object)
Public Property nargs As Integer
Public Property handleGetArgument As Func(Of Integer, ObjectStream)
Public Property handleGetArgument As Func(Of Integer, IPCSocket, ObjectStream)
Public Property host As SlaveTask
Sub New(target As IDelegate, Optional debug As Integer? = Nothing)
Public ReadOnly Property handleSetResult As Boolean = False
Public ReadOnly Property socketExitCode As Integer
Sub New(target As IDelegate, Optional debug As Integer? = Nothing, Optional verbose As Boolean = False)
Me.socket = New TcpServicesSocket(If(debug, GetFirstAvailablePort()))
Me.socket.ResponseHandler = AddressOf New ProtocolHandler(Me).HandleRequest
Me.target = target
Me.verbose = verbose
Me.result = Nothing
Me.handleSetResult = False
End Sub
Private Function GetFirstAvailablePort() As Integer
@ -127,12 +135,16 @@ Public Class IPCSocket : Implements ITaskDriver
End Sub
Public Function Run() As Integer Implements ITaskDriver.Run
Return socket.Run
_socketExitCode = socket.Run
Return socketExitCode
End Function
<Protocol(Protocols.GetTask)>
Public Function GetTask(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe
Call Console.WriteLine($"[{GetHashCode.ToHexString}] get parallel task entry.")
If verbose Then
Call Console.WriteLine($"[{GetHashCode.ToHexString}] get parallel task entry.")
End If
Return New DataPipe(Encoding.UTF8.GetBytes(target.GetJson))
End Function
@ -140,14 +152,17 @@ Public Class IPCSocket : Implements ITaskDriver
Public Function GetArgumentByIndex(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe
Dim i As Integer = BitConverter.ToInt32(request.ChunkBuffer, Scan0)
Using buf As ObjectStream = _handleGetArgument(i)
Using buf As ObjectStream = _handleGetArgument(i, Me)
Return New DataPipe(buf)
End Using
End Function
<Protocol(Protocols.PostStart)>
Public Function PostStart(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe
Call Console.WriteLine($"[{GetHashCode.ToHexString}] started!")
If verbose Then
Call Console.WriteLine($"[{GetHashCode.ToHexString}] started!")
End If
Return New DataPipe(Encoding.UTF8.GetBytes("OK!"))
End Function
@ -159,7 +174,8 @@ Public Class IPCSocket : Implements ITaskDriver
<Protocol(Protocols.PostError)>
Public Function PostError(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe
Using ms As New MemoryStream(request.ChunkBuffer)
Call DirectCast(SlaveTask.GetValueFromStream(ms, GetType(IPCError), host.streamBuf), IPCError).DoCall(_handleError)
_result = DirectCast(SlaveTask.GetValueFromStream(ms, GetType(IPCError), host.streamBuf), IPCError)
_handleSetResult = True
End Using
Return New DataPipe(Encoding.ASCII.GetBytes("OK!"))
@ -168,7 +184,8 @@ Public Class IPCSocket : Implements ITaskDriver
<Protocol(Protocols.PostResult)>
Public Function PostResult(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe
Using ms As New MemoryStream(request.ChunkBuffer)
Call _handlePOSTResult(ms)
_result = _handlePOSTResult(ms, Me)
_handleSetResult = True
End Using
Return New DataPipe(Encoding.ASCII.GetBytes("OK!"))

@ -1,47 +1,47 @@
#Region "Microsoft.VisualBasic::c7901ef38489164efc1ff25498efb52f, Parallel\IpcParallel\SlaveTask.vb"
' Author:
'
' asuka (amethyst.asuka@gcmodeller.org)
' xie (genetics@smrucc.org)
' xieguigang (xie.guigang@live.com)
'
' Copyright (c) 2018 GPL3 Licensed
'
'
' GNU GENERAL PUBLIC LICENSE (GPL3)
'
'
' This program is free software: you can redistribute it and/or modify
' it under the terms of the GNU General Public License as published by
' the Free Software Foundation, either version 3 of the License, or
' (at your option) any later version.
'
' This program is distributed in the hope that it will be useful,
' but WITHOUT ANY WARRANTY; without even the implied warranty of
' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
' GNU General Public License for more details.
'
' You should have received a copy of the GNU General Public License
' along with this program. If not, see <http://www.gnu.org/licenses/>.
' /********************************************************************************/
' Summaries:
' Delegate Function
'
'
' Class SlaveTask
'
' Constructor: (+1 Overloads) Sub New
' Function: (+2 Overloads) Emit, GetValueFromStream, handleGET, handlePOST, RunTask
'
'
'
' /********************************************************************************/
' Author:
'
' asuka (amethyst.asuka@gcmodeller.org)
' xie (genetics@smrucc.org)
' xieguigang (xie.guigang@live.com)
'
' Copyright (c) 2018 GPL3 Licensed
'
'
' GNU GENERAL PUBLIC LICENSE (GPL3)
'
'
' This program is free software: you can redistribute it and/or modify
' it under the terms of the GNU General Public License as published by
' the Free Software Foundation, either version 3 of the License, or
' (at your option) any later version.
'
' This program is distributed in the hope that it will be useful,
' but WITHOUT ANY WARRANTY; without even the implied warranty of
' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
' GNU General Public License for more details.
'
' You should have received a copy of the GNU General Public License
' along with this program. If not, see <http://www.gnu.org/licenses/>.
' /********************************************************************************/
' Summaries:
' Delegate Function
'
'
' Class SlaveTask
'
' Constructor: (+1 Overloads) Sub New
' Function: (+2 Overloads) Emit, GetValueFromStream, handleGET, handlePOST, RunTask
'
'
'
' /********************************************************************************/
#End Region
@ -62,18 +62,21 @@ Public Class SlaveTask
ReadOnly builder As ISlaveTask
ReadOnly debugPort As Integer?
ReadOnly ignoreError As Boolean
ReadOnly verbose As Boolean
Friend ReadOnly streamBuf As New StreamEmit
<DebuggerStepThrough>
Sub New(processor As InteropService, cli As ISlaveTask,
Optional debugPort As Integer? = Nothing,
Optional ignoreError As Boolean = False)
Optional ignoreError As Boolean = False,
Optional verbose As Boolean = False)
Me.builder = cli
Me.processor = processor
Me.debugPort = debugPort
Me.ignoreError = ignoreError
Me.verbose = verbose
End Sub
Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As SlaveTask
@ -94,7 +97,10 @@ Public Class SlaveTask
''' <param name="debugCode"></param>
''' <returns></returns>
Private Function handlePOST(buf As Stream, type As Type, debugCode As Integer) As Object
Call Console.WriteLine($"[{debugCode.ToHexString}] task finished!")
If verbose Then
Call Console.WriteLine($"[{debugCode.ToHexString}] task finished!")
End If
Return GetValueFromStream(buf, type, streamBuf)
End Function
@ -111,10 +117,32 @@ Public Class SlaveTask
Private Function handleGET(param As Object, i As Integer, debugCode As Integer) As ObjectStream
Dim socket As SocketRef = SocketRef.WriteBuffer(param, streamBuf)
Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...")
If verbose Then
Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...")
End If
Return streamBuf.handleSerialize(socket)
End Function
Private Function startSocket(entry As [Delegate], parameters As Object()) As IPCSocket
Dim target As New IDelegate(entry)
Dim resultType As Type = entry.Method.ReturnType
Return New IPCSocket(target, debugPort, verbose:=verbose) With {
.host = Me,
.handlePOSTResult =
Function(buf, host)
Return handlePOST(buf, resultType, host.GetHashCode)
End Function,
.nargs = parameters.Length,
.handleGetArgument =
Function(i, host)
Return handleGET(parameters(i), i, host.GetHashCode)
End Function
}
End Function
''' <summary>
'''
''' </summary>
@ -126,36 +154,24 @@ Public Class SlaveTask
''' </param>
''' <returns></returns>
Public Function RunTask(Of T)(entry As [Delegate], ParamArray parameters As Object()) As T
Dim target As New IDelegate(entry)
RE0:
Dim host As IPCSocket = startSocket(entry, parameters)
Dim result As Object = Nothing
Dim host As IPCSocket = Nothing
Dim resultType As Type = entry.Method.ReturnType
host = New IPCSocket(target, debugPort) With {
.host = Me,
.handlePOSTResult =
Sub(buf)
result = handlePOST(buf, resultType, host.GetHashCode)
End Sub,
.nargs = parameters.Length,
.handleGetArgument =
Function(i)
Return handleGET(parameters(i), i, host.GetHashCode)
End Function,
.handleError = Sub(ex) result = ex
}
Dim hostIndex As Integer = host.GetHashCode
Call Microsoft.VisualBasic.Parallel.RunTask(AddressOf host.Run)
Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] port:{host.HostPort}")
Call Thread.Sleep(100)
If verbose Then
Call Console.WriteLine($"[{hostIndex.ToHexString}] port:{host.HostPort}")
End If
Dim commandlineArgvs As String = builder(processor, host.HostPort)
If Not debugPort Is Nothing Then
Console.WriteLine(commandlineArgvs)
Pause()
End If
'If Not debugPort Is Nothing Then
' Console.WriteLine(commandlineArgvs)
' Pause()
'End If
#If netcore5 = 0 Then
Call CommandLine.Call(processor, commandlineArgvs)
@ -164,7 +180,20 @@ Public Class SlaveTask
#End If
Call host.Stop()
Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] thread exit...")
If Not host.handleSetResult Then
If verbose Then
Call Console.WriteLine("socket have non-ZERO exit status, retry...")
End If
GoTo RE0
End If
If verbose Then
Call Console.WriteLine($"[{hostIndex.ToHexString}] thread exit...")
End If
result = host.result
If TypeOf result Is IPCError Then
If ignoreError Then
@ -173,7 +202,7 @@ Public Class SlaveTask
Call Console.WriteLine($"[error] {msg}")
Next
For Each frame As StackFrame In .GetSourceTrace
Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] {frame.ToString}")
Call Console.WriteLine($"[{hostIndex.ToHexString}] {frame.ToString}")
Next
End With

@ -167,8 +167,8 @@ Namespace ThreadTask
If j > -1 Then
Yield threads(j).GetValue
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done ({threads(j).GetTaskExecTimeSpan.FormatTime})!")
threads(j) = Nothing
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!")
End If
Loop
@ -177,8 +177,8 @@ Namespace ThreadTask
If j > -1 Then
Yield threads(j).GetValue
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done ({threads(j).GetTaskExecTimeSpan.FormatTime})!")
threads(j) = Nothing
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!")
End If
Loop
End Function

Loading…
Cancel
Save