diff --git a/Parallel/IpcParallel/SlaveTask.vb b/Parallel/IpcParallel/SlaveTask.vb
index dc00c75..c6088c6 100644
--- a/Parallel/IpcParallel/SlaveTask.vb
+++ b/Parallel/IpcParallel/SlaveTask.vb
@@ -2,63 +2,40 @@
Imports System.Threading
Imports Microsoft.VisualBasic.CommandLine.InteropService
Imports Microsoft.VisualBasic.Language
-Imports Microsoft.VisualBasic.MIME.application.json
-Imports Microsoft.VisualBasic.MIME.application.json.BSON
-Imports Microsoft.VisualBasic.Scripting.MetaData
Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String
Public Class SlaveTask
- ReadOnly toBuffers As New Dictionary(Of Type, toBuffer)
- ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer)
ReadOnly processor As InteropService
ReadOnly builder As ISlaveTask
ReadOnly debugPort As Integer?
+ ReadOnly streamBuf As New StreamEmit
Sub New(processor As InteropService, cli As ISlaveTask, Optional debugPort As Integer? = Nothing)
Me.builder = cli
Me.processor = processor
Me.debugPort = debugPort
-
- For Each [handle] In EmitHandler.PopulatePrimitiveHandles
- toBuffers(handle.target) = handle.emit
- Next
End Sub
Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As SlaveTask
- toBuffers(GetType(T)) = Function(obj) streamAs(obj)
+ Call streamBuf.Emit(streamAs)
Return Me
End Function
Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As SlaveTask
- loadBuffers(GetType(T)) = Function(buf) fromStream(buf)
+ Call streamBuf.Emit(fromStream)
Return Me
End Function
Private Function handlePOST(buf As Stream, type As Type, debugCode As Integer) As Object
Call Console.WriteLine($"[{debugCode.ToHexString}] task finished!")
-
- If loadBuffers.ContainsKey(type) Then
- Return loadBuffers(type)(buf)
- Else
- Return BSONFormat.Load(buf).CreateObject(type)
- End If
+ Return streamBuf.handleCreate(buf, type, StreamMethods.Auto)
End Function
Private Function handleGET(param As Object, i As Integer, debugCode As Integer) As ObjectStream
- Dim type As Type = param.GetType
-
Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...")
-
- If toBuffers.ContainsKey(type) Then
- Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param))
- Else
- Dim element = type.GetJsonElement(param, New JSONSerializerOptions)
- Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
-
- Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf)
- End If
+ Return streamBuf.handleSerialize(param)
End Function
Public Function RunTask(Of T)(entry As [Delegate], ParamArray parameters As Object()) As T
@@ -102,27 +79,4 @@ Public Class SlaveTask
Return result
End Function
-
- Private Function decomposingStdoutput(buffer As MemoryStream, type As Type, host As Integer) As Object
- Using reader As New StreamReader(buffer)
- Dim str As Value(Of String) = ""
-
- Do While (str = reader.ReadLine) <> TaskBuilder.streamDelimiter
- ' Call Console.WriteLine($"[{host.ToHexString}] {str.Value}")
- Loop
-
- Dim dataSize As Integer = Integer.Parse(reader.ReadLine)
- Dim chunkBuffer As Byte() = New Byte(dataSize - 1) {}
-
- Call Console.WriteLine($"[{host.ToHexString}] chunksize: {dataSize}/{buffer.Length}")
-
- buffer.Seek(buffer.Length - dataSize, SeekOrigin.Begin)
- buffer.Read(chunkBuffer, Scan0, chunkBuffer.Length)
-
- Using resultBuffer As New MemoryStream(chunkBuffer)
- Return handlePOST(resultBuffer, type, host)
- End Using
- End Using
- End Function
-
End Class
diff --git a/Parallel/IpcParallel/Stream/ObjectStream.vb b/Parallel/IpcParallel/Stream/ObjectStream.vb
index 0147109..b6ca5db 100644
--- a/Parallel/IpcParallel/Stream/ObjectStream.vb
+++ b/Parallel/IpcParallel/Stream/ObjectStream.vb
@@ -1,5 +1,6 @@
Imports System.IO
Imports System.Text
+Imports Microsoft.VisualBasic.ComponentModel.DataSourceModel
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Scripting.MetaData
Imports Microsoft.VisualBasic.Serialization
@@ -14,6 +15,12 @@ Public Class ObjectStream : Inherits RawStream
Public Property stream As Byte()
Public Property type As TypeInfo
+ Public ReadOnly Property isPrimitive As Boolean
+ Get
+ Return DataFramework.IsPrimitive(type.GetType(knownFirst:=True))
+ End Get
+ End Property
+
Sub New(type As TypeInfo, method As StreamMethods, stream As Stream)
Me.method = method
Me.stream = New StreamPipe(stream).Read
@@ -37,6 +44,10 @@ Public Class ObjectStream : Inherits RawStream
End Using
End Sub
+ Public Function openMemoryBuffer() As MemoryStream
+ Return New MemoryStream(stream)
+ End Function
+
Public Overrides Function Serialize() As Byte()
Using ms As New MemoryStream
Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson)
diff --git a/Parallel/IpcParallel/Stream/StreamEmit.vb b/Parallel/IpcParallel/Stream/StreamEmit.vb
new file mode 100644
index 0000000..e6f8d90
--- /dev/null
+++ b/Parallel/IpcParallel/Stream/StreamEmit.vb
@@ -0,0 +1,58 @@
+Imports System.IO
+Imports Microsoft.VisualBasic.MIME.application.json
+Imports Microsoft.VisualBasic.MIME.application.json.BSON
+Imports Microsoft.VisualBasic.Scripting.MetaData
+
+Public Class StreamEmit
+
+ ReadOnly toBuffers As New Dictionary(Of Type, toBuffer)
+ ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer)
+
+ Sub New()
+ For Each [handle] In EmitHandler.PopulatePrimitiveHandles
+ toBuffers(handle.target) = handle.emit
+ Next
+ For Each [handle] In EmitHandler.PopulatePrimitiveParsers
+ loadBuffers(handle.target) = handle.emit
+ Next
+ End Sub
+
+ Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As StreamEmit
+ toBuffers(GetType(T)) = Function(obj) streamAs(obj)
+ Return Me
+ End Function
+
+ Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As StreamEmit
+ loadBuffers(GetType(T)) = Function(buf) fromStream(buf)
+ Return Me
+ End Function
+
+ Public Function handleCreate(buf As Stream, type As Type, emit As StreamMethods) As Object
+ If emit = StreamMethods.Auto Then
+ If loadBuffers.ContainsKey(type) Then
+ Return loadBuffers(type)(buf)
+ Else
+ Return BSONFormat.Load(buf).CreateObject(type)
+ End If
+ ElseIf emit = StreamMethods.BSON Then
+ Return BSONFormat.Load(buf).CreateObject(type)
+ ElseIf loadBuffers.ContainsKey(type) Then
+ Return loadBuffers(type)(buf)
+ Else
+ Throw New NotImplementedException
+ End If
+ End Function
+
+ Public Function handleSerialize(param As Object) As ObjectStream
+ Dim type As Type = param.GetType
+
+ If toBuffers.ContainsKey(type) Then
+ Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param))
+ Else
+ Dim element = type.GetJsonElement(param, New JSONSerializerOptions)
+ Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
+
+ Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf)
+ End If
+ End Function
+End Class
diff --git a/Parallel/IpcParallel/Stream/StreamMethods.vb b/Parallel/IpcParallel/Stream/StreamMethods.vb
index a794834..d7fb5bb 100644
--- a/Parallel/IpcParallel/Stream/StreamMethods.vb
+++ b/Parallel/IpcParallel/Stream/StreamMethods.vb
@@ -1,4 +1,5 @@
Public Enum StreamMethods
+ Auto
BSON
Emit
End Enum
diff --git a/Parallel/IpcParallel/TaskBuilder.vb b/Parallel/IpcParallel/TaskBuilder.vb
index 1e2e615..db8ad42 100644
--- a/Parallel/IpcParallel/TaskBuilder.vb
+++ b/Parallel/IpcParallel/TaskBuilder.vb
@@ -1,20 +1,18 @@
Imports System.IO
Imports System.Reflection
-Imports System.Text
#If netcore5 = 1 Then
Imports Microsoft.VisualBasic.ApplicationServices.Development.NetCore5
#End If
Imports Microsoft.VisualBasic.ComponentModel
Imports Microsoft.VisualBasic.MIME.application.json
-Imports Microsoft.VisualBasic.MIME.application.json.BSON
Imports Microsoft.VisualBasic.Net.Tcp
Imports Microsoft.VisualBasic.Parallel
-Imports Microsoft.VisualBasic.Serialization
Imports Microsoft.VisualBasic.Serialization.JSON
Public Class TaskBuilder : Implements ITaskDriver
ReadOnly masterPort As Integer
+ ReadOnly emit As New StreamEmit
Sub New(port As Integer)
masterPort = port
@@ -46,9 +44,14 @@ Public Class TaskBuilder : Implements ITaskDriver
' send debug message
Call New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.PostStart))
- Call PostFinished(api.Invoke(target, args.ToArray))
- ' Call PostStdOut(api.Invoke(target, args.ToArray))
- Call Console.WriteLine("job done!")
+
+ Try
+ Call PostFinished(api.Invoke(target, args.ToArray))
+ Catch ex As Exception
+ Call PostError(ex)
+ Finally
+ Call Console.WriteLine("job done!")
+ End Try
Return 0
End Function
@@ -78,11 +81,9 @@ Public Class TaskBuilder : Implements ITaskDriver
Call deps.TryHandleNetCore5AssemblyBugs(package:=type)
#End If
- If stream.method = StreamMethods.BSON Then
- Return BSONFormat.Load(stream.stream).CreateObject(type)
- Else
- Throw New NotImplementedException
- End If
+ Using buf As MemoryStream = stream.openMemoryBuffer
+ Return emit.handleCreate(buf, type, stream.method)
+ End Using
End Function
Private Function PostError(err As Exception) As Integer
@@ -91,36 +92,16 @@ Public Class TaskBuilder : Implements ITaskDriver
Return 500
End Function
- Friend Const streamDelimiter As String = "------endoftext------"
-
- '''
- ''' 将结果数据写到标准输出上
- '''
- '''
- Private Sub PostStdOut(result As Object)
- Dim type As Type = result.GetType
- Dim element = type.GetJsonElement(result, New JSONSerializerOptions)
- Dim buf As MemoryStream = BSONFormat.SafeGetBuffer(element)
- Dim bufferSize As String = buf.Length
- Dim outstream As Stream = Console.OpenStandardOutput
-
- Call Console.WriteLine(streamDelimiter)
- Call Console.WriteLine(bufferSize)
- Call Console.Write(vbCrLf)
-
- Using stdout As New BinaryWriter(outstream)
- Call stdout.Write(buf.ToArray)
- Call stdout.Flush()
- End Using
- End Sub
-
Private Sub PostFinished(result As Object)
- Dim type As Type = result.GetType
- Dim element = type.GetJsonElement(result, New JSONSerializerOptions)
- Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
- Dim request As New RequestStream(IPCSocket.Protocol, Protocols.PostResult, New StreamPipe(buf).Read)
-
- Call Console.WriteLine($"post result: {type.GetObjectJson(result, indent:=False)}")
- Call New TcpRequest(masterPort).SendMessage(request)
+ Using buf As Stream = emit.handleSerialize(result).openMemoryBuffer
+ Dim request As New RequestStream(
+ protocolCategory:=IPCSocket.Protocol,
+ protocol:=Protocols.PostResult,
+ buffer:=New StreamPipe(buf).Read
+ )
+
+ Call Console.WriteLine($"post result...")
+ Call New TcpRequest(masterPort).SendMessage(request)
+ End Using
End Sub
End Class
diff --git a/Parallel/Parallel.vbproj b/Parallel/Parallel.vbproj
index 4d474c3..a87b887 100644
--- a/Parallel/Parallel.vbproj
+++ b/Parallel/Parallel.vbproj
@@ -124,6 +124,7 @@
+