From 8c7e13df2484d1e545fdae63183ab1f795bb92af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=93=E3=81=AE=E4=B8=AD=E4=BA=8C=E7=97=85=E3=81=AB?= =?UTF-8?q?=E7=88=86=E7=84=94=E3=82=92=EF=BC=81?= Date: Tue, 26 Jan 2021 16:57:07 +0800 Subject: [PATCH] improvements of the stream data handler --- Parallel/IpcParallel/SlaveTask.vb | 56 ++--------------- Parallel/IpcParallel/Stream/ObjectStream.vb | 11 ++++ Parallel/IpcParallel/Stream/StreamEmit.vb | 58 ++++++++++++++++++ Parallel/IpcParallel/Stream/StreamMethods.vb | 1 + Parallel/IpcParallel/TaskBuilder.vb | 63 +++++++------------- Parallel/Parallel.vbproj | 1 + 6 files changed, 98 insertions(+), 92 deletions(-) create mode 100644 Parallel/IpcParallel/Stream/StreamEmit.vb diff --git a/Parallel/IpcParallel/SlaveTask.vb b/Parallel/IpcParallel/SlaveTask.vb index dc00c75..c6088c6 100644 --- a/Parallel/IpcParallel/SlaveTask.vb +++ b/Parallel/IpcParallel/SlaveTask.vb @@ -2,63 +2,40 @@ Imports System.Threading Imports Microsoft.VisualBasic.CommandLine.InteropService Imports Microsoft.VisualBasic.Language -Imports Microsoft.VisualBasic.MIME.application.json -Imports Microsoft.VisualBasic.MIME.application.json.BSON -Imports Microsoft.VisualBasic.Scripting.MetaData Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String Public Class SlaveTask - ReadOnly toBuffers As New Dictionary(Of Type, toBuffer) - ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer) ReadOnly processor As InteropService ReadOnly builder As ISlaveTask ReadOnly debugPort As Integer? + ReadOnly streamBuf As New StreamEmit Sub New(processor As InteropService, cli As ISlaveTask, Optional debugPort As Integer? = Nothing) Me.builder = cli Me.processor = processor Me.debugPort = debugPort - - For Each [handle] In EmitHandler.PopulatePrimitiveHandles - toBuffers(handle.target) = handle.emit - Next End Sub Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As SlaveTask - toBuffers(GetType(T)) = Function(obj) streamAs(obj) + Call streamBuf.Emit(streamAs) Return Me End Function Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As SlaveTask - loadBuffers(GetType(T)) = Function(buf) fromStream(buf) + Call streamBuf.Emit(fromStream) Return Me End Function Private Function handlePOST(buf As Stream, type As Type, debugCode As Integer) As Object Call Console.WriteLine($"[{debugCode.ToHexString}] task finished!") - - If loadBuffers.ContainsKey(type) Then - Return loadBuffers(type)(buf) - Else - Return BSONFormat.Load(buf).CreateObject(type) - End If + Return streamBuf.handleCreate(buf, type, StreamMethods.Auto) End Function Private Function handleGET(param As Object, i As Integer, debugCode As Integer) As ObjectStream - Dim type As Type = param.GetType - Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...") - - If toBuffers.ContainsKey(type) Then - Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param)) - Else - Dim element = type.GetJsonElement(param, New JSONSerializerOptions) - Dim buf As Stream = BSONFormat.SafeGetBuffer(element) - - Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf) - End If + Return streamBuf.handleSerialize(param) End Function Public Function RunTask(Of T)(entry As [Delegate], ParamArray parameters As Object()) As T @@ -102,27 +79,4 @@ Public Class SlaveTask Return result End Function - - Private Function decomposingStdoutput(buffer As MemoryStream, type As Type, host As Integer) As Object - Using reader As New StreamReader(buffer) - Dim str As Value(Of String) = "" - - Do While (str = reader.ReadLine) <> TaskBuilder.streamDelimiter - ' Call Console.WriteLine($"[{host.ToHexString}] {str.Value}") - Loop - - Dim dataSize As Integer = Integer.Parse(reader.ReadLine) - Dim chunkBuffer As Byte() = New Byte(dataSize - 1) {} - - Call Console.WriteLine($"[{host.ToHexString}] chunksize: {dataSize}/{buffer.Length}") - - buffer.Seek(buffer.Length - dataSize, SeekOrigin.Begin) - buffer.Read(chunkBuffer, Scan0, chunkBuffer.Length) - - Using resultBuffer As New MemoryStream(chunkBuffer) - Return handlePOST(resultBuffer, type, host) - End Using - End Using - End Function - End Class diff --git a/Parallel/IpcParallel/Stream/ObjectStream.vb b/Parallel/IpcParallel/Stream/ObjectStream.vb index 0147109..b6ca5db 100644 --- a/Parallel/IpcParallel/Stream/ObjectStream.vb +++ b/Parallel/IpcParallel/Stream/ObjectStream.vb @@ -1,5 +1,6 @@ Imports System.IO Imports System.Text +Imports Microsoft.VisualBasic.ComponentModel.DataSourceModel Imports Microsoft.VisualBasic.Parallel Imports Microsoft.VisualBasic.Scripting.MetaData Imports Microsoft.VisualBasic.Serialization @@ -14,6 +15,12 @@ Public Class ObjectStream : Inherits RawStream Public Property stream As Byte() Public Property type As TypeInfo + Public ReadOnly Property isPrimitive As Boolean + Get + Return DataFramework.IsPrimitive(type.GetType(knownFirst:=True)) + End Get + End Property + Sub New(type As TypeInfo, method As StreamMethods, stream As Stream) Me.method = method Me.stream = New StreamPipe(stream).Read @@ -37,6 +44,10 @@ Public Class ObjectStream : Inherits RawStream End Using End Sub + Public Function openMemoryBuffer() As MemoryStream + Return New MemoryStream(stream) + End Function + Public Overrides Function Serialize() As Byte() Using ms As New MemoryStream Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson) diff --git a/Parallel/IpcParallel/Stream/StreamEmit.vb b/Parallel/IpcParallel/Stream/StreamEmit.vb new file mode 100644 index 0000000..e6f8d90 --- /dev/null +++ b/Parallel/IpcParallel/Stream/StreamEmit.vb @@ -0,0 +1,58 @@ +Imports System.IO +Imports Microsoft.VisualBasic.MIME.application.json +Imports Microsoft.VisualBasic.MIME.application.json.BSON +Imports Microsoft.VisualBasic.Scripting.MetaData + +Public Class StreamEmit + + ReadOnly toBuffers As New Dictionary(Of Type, toBuffer) + ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer) + + Sub New() + For Each [handle] In EmitHandler.PopulatePrimitiveHandles + toBuffers(handle.target) = handle.emit + Next + For Each [handle] In EmitHandler.PopulatePrimitiveParsers + loadBuffers(handle.target) = handle.emit + Next + End Sub + + Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As StreamEmit + toBuffers(GetType(T)) = Function(obj) streamAs(obj) + Return Me + End Function + + Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As StreamEmit + loadBuffers(GetType(T)) = Function(buf) fromStream(buf) + Return Me + End Function + + Public Function handleCreate(buf As Stream, type As Type, emit As StreamMethods) As Object + If emit = StreamMethods.Auto Then + If loadBuffers.ContainsKey(type) Then + Return loadBuffers(type)(buf) + Else + Return BSONFormat.Load(buf).CreateObject(type) + End If + ElseIf emit = StreamMethods.BSON Then + Return BSONFormat.Load(buf).CreateObject(type) + ElseIf loadBuffers.ContainsKey(type) Then + Return loadBuffers(type)(buf) + Else + Throw New NotImplementedException + End If + End Function + + Public Function handleSerialize(param As Object) As ObjectStream + Dim type As Type = param.GetType + + If toBuffers.ContainsKey(type) Then + Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param)) + Else + Dim element = type.GetJsonElement(param, New JSONSerializerOptions) + Dim buf As Stream = BSONFormat.SafeGetBuffer(element) + + Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf) + End If + End Function +End Class diff --git a/Parallel/IpcParallel/Stream/StreamMethods.vb b/Parallel/IpcParallel/Stream/StreamMethods.vb index a794834..d7fb5bb 100644 --- a/Parallel/IpcParallel/Stream/StreamMethods.vb +++ b/Parallel/IpcParallel/Stream/StreamMethods.vb @@ -1,4 +1,5 @@ Public Enum StreamMethods + Auto BSON Emit End Enum diff --git a/Parallel/IpcParallel/TaskBuilder.vb b/Parallel/IpcParallel/TaskBuilder.vb index 1e2e615..db8ad42 100644 --- a/Parallel/IpcParallel/TaskBuilder.vb +++ b/Parallel/IpcParallel/TaskBuilder.vb @@ -1,20 +1,18 @@ Imports System.IO Imports System.Reflection -Imports System.Text #If netcore5 = 1 Then Imports Microsoft.VisualBasic.ApplicationServices.Development.NetCore5 #End If Imports Microsoft.VisualBasic.ComponentModel Imports Microsoft.VisualBasic.MIME.application.json -Imports Microsoft.VisualBasic.MIME.application.json.BSON Imports Microsoft.VisualBasic.Net.Tcp Imports Microsoft.VisualBasic.Parallel -Imports Microsoft.VisualBasic.Serialization Imports Microsoft.VisualBasic.Serialization.JSON Public Class TaskBuilder : Implements ITaskDriver ReadOnly masterPort As Integer + ReadOnly emit As New StreamEmit Sub New(port As Integer) masterPort = port @@ -46,9 +44,14 @@ Public Class TaskBuilder : Implements ITaskDriver ' send debug message Call New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.PostStart)) - Call PostFinished(api.Invoke(target, args.ToArray)) - ' Call PostStdOut(api.Invoke(target, args.ToArray)) - Call Console.WriteLine("job done!") + + Try + Call PostFinished(api.Invoke(target, args.ToArray)) + Catch ex As Exception + Call PostError(ex) + Finally + Call Console.WriteLine("job done!") + End Try Return 0 End Function @@ -78,11 +81,9 @@ Public Class TaskBuilder : Implements ITaskDriver Call deps.TryHandleNetCore5AssemblyBugs(package:=type) #End If - If stream.method = StreamMethods.BSON Then - Return BSONFormat.Load(stream.stream).CreateObject(type) - Else - Throw New NotImplementedException - End If + Using buf As MemoryStream = stream.openMemoryBuffer + Return emit.handleCreate(buf, type, stream.method) + End Using End Function Private Function PostError(err As Exception) As Integer @@ -91,36 +92,16 @@ Public Class TaskBuilder : Implements ITaskDriver Return 500 End Function - Friend Const streamDelimiter As String = "------endoftext------" - - ''' - ''' 将结果数据写到标准输出上 - ''' - ''' - Private Sub PostStdOut(result As Object) - Dim type As Type = result.GetType - Dim element = type.GetJsonElement(result, New JSONSerializerOptions) - Dim buf As MemoryStream = BSONFormat.SafeGetBuffer(element) - Dim bufferSize As String = buf.Length - Dim outstream As Stream = Console.OpenStandardOutput - - Call Console.WriteLine(streamDelimiter) - Call Console.WriteLine(bufferSize) - Call Console.Write(vbCrLf) - - Using stdout As New BinaryWriter(outstream) - Call stdout.Write(buf.ToArray) - Call stdout.Flush() - End Using - End Sub - Private Sub PostFinished(result As Object) - Dim type As Type = result.GetType - Dim element = type.GetJsonElement(result, New JSONSerializerOptions) - Dim buf As Stream = BSONFormat.SafeGetBuffer(element) - Dim request As New RequestStream(IPCSocket.Protocol, Protocols.PostResult, New StreamPipe(buf).Read) - - Call Console.WriteLine($"post result: {type.GetObjectJson(result, indent:=False)}") - Call New TcpRequest(masterPort).SendMessage(request) + Using buf As Stream = emit.handleSerialize(result).openMemoryBuffer + Dim request As New RequestStream( + protocolCategory:=IPCSocket.Protocol, + protocol:=Protocols.PostResult, + buffer:=New StreamPipe(buf).Read + ) + + Call Console.WriteLine($"post result...") + Call New TcpRequest(masterPort).SendMessage(request) + End Using End Sub End Class diff --git a/Parallel/Parallel.vbproj b/Parallel/Parallel.vbproj index 4d474c3..a87b887 100644 --- a/Parallel/Parallel.vbproj +++ b/Parallel/Parallel.vbproj @@ -124,6 +124,7 @@ +