improvements of the stream data handler

master
この中二病に爆焔を! 5 years ago
parent e2f21333e8
commit 8c7e13df24

@ -2,63 +2,40 @@
Imports System.Threading
Imports Microsoft.VisualBasic.CommandLine.InteropService
Imports Microsoft.VisualBasic.Language
Imports Microsoft.VisualBasic.MIME.application.json
Imports Microsoft.VisualBasic.MIME.application.json.BSON
Imports Microsoft.VisualBasic.Scripting.MetaData
Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String
Public Class SlaveTask
ReadOnly toBuffers As New Dictionary(Of Type, toBuffer)
ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer)
ReadOnly processor As InteropService
ReadOnly builder As ISlaveTask
ReadOnly debugPort As Integer?
ReadOnly streamBuf As New StreamEmit
Sub New(processor As InteropService, cli As ISlaveTask, Optional debugPort As Integer? = Nothing)
Me.builder = cli
Me.processor = processor
Me.debugPort = debugPort
For Each [handle] In EmitHandler.PopulatePrimitiveHandles
toBuffers(handle.target) = handle.emit
Next
End Sub
Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As SlaveTask
toBuffers(GetType(T)) = Function(obj) streamAs(obj)
Call streamBuf.Emit(streamAs)
Return Me
End Function
Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As SlaveTask
loadBuffers(GetType(T)) = Function(buf) fromStream(buf)
Call streamBuf.Emit(fromStream)
Return Me
End Function
Private Function handlePOST(buf As Stream, type As Type, debugCode As Integer) As Object
Call Console.WriteLine($"[{debugCode.ToHexString}] task finished!")
If loadBuffers.ContainsKey(type) Then
Return loadBuffers(type)(buf)
Else
Return BSONFormat.Load(buf).CreateObject(type)
End If
Return streamBuf.handleCreate(buf, type, StreamMethods.Auto)
End Function
Private Function handleGET(param As Object, i As Integer, debugCode As Integer) As ObjectStream
Dim type As Type = param.GetType
Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...")
If toBuffers.ContainsKey(type) Then
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param))
Else
Dim element = type.GetJsonElement(param, New JSONSerializerOptions)
Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf)
End If
Return streamBuf.handleSerialize(param)
End Function
Public Function RunTask(Of T)(entry As [Delegate], ParamArray parameters As Object()) As T
@ -102,27 +79,4 @@ Public Class SlaveTask
Return result
End Function
Private Function decomposingStdoutput(buffer As MemoryStream, type As Type, host As Integer) As Object
Using reader As New StreamReader(buffer)
Dim str As Value(Of String) = ""
Do While (str = reader.ReadLine) <> TaskBuilder.streamDelimiter
' Call Console.WriteLine($"[{host.ToHexString}] {str.Value}")
Loop
Dim dataSize As Integer = Integer.Parse(reader.ReadLine)
Dim chunkBuffer As Byte() = New Byte(dataSize - 1) {}
Call Console.WriteLine($"[{host.ToHexString}] chunksize: {dataSize}/{buffer.Length}")
buffer.Seek(buffer.Length - dataSize, SeekOrigin.Begin)
buffer.Read(chunkBuffer, Scan0, chunkBuffer.Length)
Using resultBuffer As New MemoryStream(chunkBuffer)
Return handlePOST(resultBuffer, type, host)
End Using
End Using
End Function
End Class

@ -1,5 +1,6 @@
Imports System.IO
Imports System.Text
Imports Microsoft.VisualBasic.ComponentModel.DataSourceModel
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Scripting.MetaData
Imports Microsoft.VisualBasic.Serialization
@ -14,6 +15,12 @@ Public Class ObjectStream : Inherits RawStream
Public Property stream As Byte()
Public Property type As TypeInfo
Public ReadOnly Property isPrimitive As Boolean
Get
Return DataFramework.IsPrimitive(type.GetType(knownFirst:=True))
End Get
End Property
Sub New(type As TypeInfo, method As StreamMethods, stream As Stream)
Me.method = method
Me.stream = New StreamPipe(stream).Read
@ -37,6 +44,10 @@ Public Class ObjectStream : Inherits RawStream
End Using
End Sub
Public Function openMemoryBuffer() As MemoryStream
Return New MemoryStream(stream)
End Function
Public Overrides Function Serialize() As Byte()
Using ms As New MemoryStream
Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson)

@ -0,0 +1,58 @@
Imports System.IO
Imports Microsoft.VisualBasic.MIME.application.json
Imports Microsoft.VisualBasic.MIME.application.json.BSON
Imports Microsoft.VisualBasic.Scripting.MetaData
Public Class StreamEmit
ReadOnly toBuffers As New Dictionary(Of Type, toBuffer)
ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer)
Sub New()
For Each [handle] In EmitHandler.PopulatePrimitiveHandles
toBuffers(handle.target) = handle.emit
Next
For Each [handle] In EmitHandler.PopulatePrimitiveParsers
loadBuffers(handle.target) = handle.emit
Next
End Sub
Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As StreamEmit
toBuffers(GetType(T)) = Function(obj) streamAs(obj)
Return Me
End Function
Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As StreamEmit
loadBuffers(GetType(T)) = Function(buf) fromStream(buf)
Return Me
End Function
Public Function handleCreate(buf As Stream, type As Type, emit As StreamMethods) As Object
If emit = StreamMethods.Auto Then
If loadBuffers.ContainsKey(type) Then
Return loadBuffers(type)(buf)
Else
Return BSONFormat.Load(buf).CreateObject(type)
End If
ElseIf emit = StreamMethods.BSON Then
Return BSONFormat.Load(buf).CreateObject(type)
ElseIf loadBuffers.ContainsKey(type) Then
Return loadBuffers(type)(buf)
Else
Throw New NotImplementedException
End If
End Function
Public Function handleSerialize(param As Object) As ObjectStream
Dim type As Type = param.GetType
If toBuffers.ContainsKey(type) Then
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param))
Else
Dim element = type.GetJsonElement(param, New JSONSerializerOptions)
Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf)
End If
End Function
End Class

@ -1,4 +1,5 @@
Public Enum StreamMethods
Auto
BSON
Emit
End Enum

@ -1,20 +1,18 @@
Imports System.IO
Imports System.Reflection
Imports System.Text
#If netcore5 = 1 Then
Imports Microsoft.VisualBasic.ApplicationServices.Development.NetCore5
#End If
Imports Microsoft.VisualBasic.ComponentModel
Imports Microsoft.VisualBasic.MIME.application.json
Imports Microsoft.VisualBasic.MIME.application.json.BSON
Imports Microsoft.VisualBasic.Net.Tcp
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Serialization
Imports Microsoft.VisualBasic.Serialization.JSON
Public Class TaskBuilder : Implements ITaskDriver
ReadOnly masterPort As Integer
ReadOnly emit As New StreamEmit
Sub New(port As Integer)
masterPort = port
@ -46,9 +44,14 @@ Public Class TaskBuilder : Implements ITaskDriver
' send debug message
Call New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.PostStart))
Call PostFinished(api.Invoke(target, args.ToArray))
' Call PostStdOut(api.Invoke(target, args.ToArray))
Call Console.WriteLine("job done!")
Try
Call PostFinished(api.Invoke(target, args.ToArray))
Catch ex As Exception
Call PostError(ex)
Finally
Call Console.WriteLine("job done!")
End Try
Return 0
End Function
@ -78,11 +81,9 @@ Public Class TaskBuilder : Implements ITaskDriver
Call deps.TryHandleNetCore5AssemblyBugs(package:=type)
#End If
If stream.method = StreamMethods.BSON Then
Return BSONFormat.Load(stream.stream).CreateObject(type)
Else
Throw New NotImplementedException
End If
Using buf As MemoryStream = stream.openMemoryBuffer
Return emit.handleCreate(buf, type, stream.method)
End Using
End Function
Private Function PostError(err As Exception) As Integer
@ -91,36 +92,16 @@ Public Class TaskBuilder : Implements ITaskDriver
Return 500
End Function
Friend Const streamDelimiter As String = "------endoftext------"
''' <summary>
'''
''' </summary>
''' <param name="result"></param>
Private Sub PostStdOut(result As Object)
Dim type As Type = result.GetType
Dim element = type.GetJsonElement(result, New JSONSerializerOptions)
Dim buf As MemoryStream = BSONFormat.SafeGetBuffer(element)
Dim bufferSize As String = buf.Length
Dim outstream As Stream = Console.OpenStandardOutput
Call Console.WriteLine(streamDelimiter)
Call Console.WriteLine(bufferSize)
Call Console.Write(vbCrLf)
Using stdout As New BinaryWriter(outstream)
Call stdout.Write(buf.ToArray)
Call stdout.Flush()
End Using
End Sub
Private Sub PostFinished(result As Object)
Dim type As Type = result.GetType
Dim element = type.GetJsonElement(result, New JSONSerializerOptions)
Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
Dim request As New RequestStream(IPCSocket.Protocol, Protocols.PostResult, New StreamPipe(buf).Read)
Call Console.WriteLine($"post result: {type.GetObjectJson(result, indent:=False)}")
Call New TcpRequest(masterPort).SendMessage(request)
Using buf As Stream = emit.handleSerialize(result).openMemoryBuffer
Dim request As New RequestStream(
protocolCategory:=IPCSocket.Protocol,
protocol:=Protocols.PostResult,
buffer:=New StreamPipe(buf).Read
)
Call Console.WriteLine($"post result...")
Call New TcpRequest(masterPort).SendMessage(request)
End Using
End Sub
End Class

@ -124,6 +124,7 @@
<ItemGroup>
<Compile Include="IpcParallel\IDelegate.vb" />
<Compile Include="IpcParallel\IPCSocket.vb" />
<Compile Include="IpcParallel\Stream\StreamEmit.vb" />
<Compile Include="IpcParallel\Stream\EmitHandler.vb" />
<Compile Include="IpcParallel\Stream\ObjectStream.vb" />
<Compile Include="IpcParallel\Protocols.vb" />

Loading…
Cancel
Save