#Region "Microsoft.VisualBasic::9604f376dbb73bcef11d1602a8461d85, Parallel\IpcParallel\TaskBuilder.vb"

    ' Author:
    ' 
    '       asuka (amethyst.asuka@gcmodeller.org)
    '       xie (genetics@smrucc.org)
    '       xieguigang (xie.guigang@live.com)
    ' 
    ' Copyright (c) 2018 GPL3 Licensed
    ' 
    ' 
    ' GNU GENERAL PUBLIC LICENSE (GPL3)
    ' 
    ' 
    ' This program is free software: you can redistribute it and/or modify
    ' it under the terms of the GNU General Public License as published by
    ' the Free Software Foundation, either version 3 of the License, or
    ' (at your option) any later version.
    ' 
    ' This program is distributed in the hope that it will be useful,
    ' but WITHOUT ANY WARRANTY; without even the implied warranty of
    ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    ' GNU General Public License for more details.
    ' 
    ' You should have received a copy of the GNU General Public License
    ' along with this program. If not, see <http://www.gnu.org/licenses/>.



    ' /********************************************************************************/

    ' Summaries:

    ' Class TaskBuilder
    ' 
    '     Constructor: (+1 Overloads) Sub New
    ' 
    '     Function: FromStream, GetArgumentValue, GetArgumentValueNumber, GetMethod, GetParameters
    '               PostError, Run
    ' 
    '     Sub: PostFinished
    ' 
    ' 
    ' /********************************************************************************/

#End Region

Imports System.IO
Imports System.Reflection
#If netcore5 = 1 Then
Imports Microsoft.VisualBasic.ApplicationServices.Development.NetCore5
#End If
Imports Microsoft.VisualBasic.ComponentModel
Imports Microsoft.VisualBasic.MIME.application.json
Imports Microsoft.VisualBasic.Net.Tcp
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Serialization.JSON
Imports Parallel.IpcStream

''' <summary>
''' Run on slave node
''' </summary>
''' <remarks>
''' The task description, the argument values and the result/error report
''' are all exchanged with the master through tcp requests that are sent
''' to the port given to the constructor.
''' </remarks>
Public Class TaskBuilder : Implements ITaskDriver

    ''' <summary>
    ''' the tcp port that every request of this task is sent to
    ''' </summary>
    ReadOnly masterPort As Integer
    ''' <summary>
    ''' the helper that is used for object (de)serialization
    ''' </summary>
    ReadOnly emit As New StreamEmit

    ''' <summary>
    ''' Create a new task runner
    ''' </summary>
    ''' <param name="port">the tcp port of the master</param>
    Sub New(port As Integer)
        masterPort = port
    End Sub

    ''' <summary>
    ''' Fetch the first <paramref name="n"/> argument values from the master,
    ''' one by one, in the order of the parameter declaration.
    ''' </summary>
    ''' <param name="params">the parameter declarations of the target method</param>
    ''' <param name="n">
    ''' the number of argument values to fetch, the caller must ensure that this
    ''' number is not greater than the length of <paramref name="params"/>
    ''' </param>
    ''' <returns>a lazy sequence, each element is fetched when it is enumerated</returns>
    Private Iterator Function GetParameters(params As ParameterInfo(), n As Integer) As IEnumerable(Of Object)
        For i As Integer = 0 To n - 1
            Dim par As Object = GetArgumentValue(i)
            Dim targetType As Type = params(i).ParameterType

            If par Is Nothing Then
                ' a null value has no runtime type that could be inspected
                ' below, pass it to the target method as it is
                Yield Nothing
            ElseIf par.GetType.IsInheritsFrom(targetType) Then
                Yield Conversion.CTypeDynamic(par, targetType)
            ElseIf par.GetType Is GetType(SocketRef) Then
                ' is a common parameter value between
                ' the parallel batchs
                Dim socket As SocketRef = DirectCast(par, SocketRef)

                Using buffer As ObjectStream = socket.Open
                    Yield FromStream(buffer)
                End Using
            Else
                Yield par
            End If
        Next
    End Function

    ''' <summary>
    ''' Fetch the task and its argument values, invoke the task method and
    ''' then post the result value (or the error) back to the master.
    ''' </summary>
    ''' <returns>
    ''' zero if the task method returns normally, or the error code from
    ''' <see cref="PostError"/> if the arguments are invalid or the task
    ''' method invocation fails.
    ''' </returns>
    Public Function Run() As Integer Implements ITaskDriver.Run
        Dim task As IDelegate = GetMethod()
        Dim api As MethodInfo = task.GetMethod
        Dim params As ParameterInfo() = api.GetParameters
        Dim target As Object = task.GetMethodTarget
        Dim n As Integer = GetArgumentValueNumber()

        ' an argument count that does not fit the method signature would
        ' cause an index out of range error below, report it to the
        ' master instead of crashing without any message.
        If n < 0 OrElse n > params.Length Then
            Return PostError(New Exception($"invalid number of parameter values: get {n} values for {params.Length} parameters!"))
        End If

        Dim args As New List(Of Object)(GetParameters(params, n))

        Call Console.WriteLine("run task:")
        Call Console.WriteLine(task.GetJson(indent:=False, simpleDict:=True))

        ' fix for optional parameter values
        For i As Integer = n To params.Length - 1
            If Not params(i).IsOptional Then
                Return PostError(New Exception($"missing parameter value for [{i}]{params(i).Name}!"))
            Else
                args.Add(params(i).DefaultValue)
            End If
        Next

        ' send debug message
        Call New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.PostStart))

        Try
            Call PostFinished(api.Invoke(target, args.ToArray), Protocols.PostResult)
        Catch ex As Exception
            ' keep the exit code consistent with the missing parameter
            ' branch above: a failed task should not be reported as success
            Return PostError(ex)
        Finally
            Call Console.WriteLine("job done!")
        End Try

        Return 0
    End Function

    ''' <summary>
    ''' Request the number of the argument values from the master
    ''' </summary>
    ''' <returns>
    ''' the 32 bit integer that is decoded from the head of the response buffer
    ''' </returns>
    Private Function GetArgumentValueNumber() As Integer
        Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetArgumentNumber))
        Dim n As Integer = BitConverter.ToInt32(resp.ChunkBuffer, Scan0)

        Return n
    End Function

    ''' <summary>
    ''' Request the task description from the master
    ''' </summary>
    ''' <returns>
    ''' the delegate description that is loaded from the json text
    ''' of the response message
    ''' </returns>
    Private Function GetMethod() As IDelegate
        Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetTask))
        Dim json As String = resp.GetUTF8String
        Dim target As IDelegate = json.LoadJSON(Of IDelegate)

        Return target
    End Function

    ''' <summary>
    ''' Create the object from the given stream data, based on
    ''' the type information that is carried by the stream.
    ''' </summary>
    ''' <param name="stream">the stream data of the object</param>
    ''' <returns>the object that is created by the stream emit helper</returns>
    Private Function FromStream(stream As ObjectStream) As Object
        Dim type As Type = stream.type.GetType(knownFirst:=True)

#If netcore5 = 1 Then
        Call deps.TryHandleNetCore5AssemblyBugs(package:=type)
#End If

        Using buf As MemoryStream = stream.openMemoryBuffer
            Return emit.handleCreate(buf, type, stream.method)
        End Using
    End Function

    ''' <summary>
    ''' Request the i-th argument value from the master
    ''' </summary>
    ''' <param name="i">the zero based index of the argument value</param>
    ''' <returns>
    ''' the object that is created from the stream data
    ''' which the response message points to.
    ''' </returns>
    Private Function GetArgumentValue(i As Integer) As Object
        Dim request As New RequestStream(IPCSocket.Protocol, Protocols.GetArgumentByIndex, BitConverter.GetBytes(i))
        Dim resp = New TcpRequest(masterPort).SendMessage(request)

        ' the response stream is disposable as well: release it, but not
        ' before the value has been decoded, in case the socket reference
        ' still depends on it (NOTE(review): not visible from this file).
        Using stream As New ObjectStream(resp.ChunkBuffer)
            Dim socket As SocketRef = SocketRef.GetSocket(stream)

            Using buffer As ObjectStream = socket.Open
                Return FromStream(buffer)
            End Using
        End Using
    End Function

    ''' <summary>
    ''' Post the error to the master
    ''' </summary>
    ''' <param name="err">the error to report</param>
    ''' <returns>this function always returns the error code 500</returns>
    Private Function PostError(err As Exception) As Integer
        Call PostFinished(New IPCError(err), Protocols.PostError)
        Return 500
    End Function

    ''' <summary>
    ''' Serialize the given object and then send it to the master
    ''' </summary>
    ''' <param name="result">
    ''' the result value of the task, or the <see cref="IPCError"/> of a failed task
    ''' </param>
    ''' <param name="protocol">the protocol of the request message</param>
    Private Sub PostFinished(result As Object, protocol As Protocols)
        Dim socket As SocketRef = SocketRef.WriteBuffer(result, emit)

        Using buf As ObjectStream = emit.handleSerialize(socket)
            Dim request As New RequestStream(
                protocolCategory:=IPCSocket.Protocol,
                protocol:=protocol,
                buffer:=buf.Serialize
            )

            If TypeOf result Is IPCError Then
                Call Console.WriteLine("post error...")
            Else
                Call Console.WriteLine("post result...")
            End If

            Call New TcpRequest(masterPort).SendMessage(request)
        End Using
    End Sub
End Class