diff --git a/Parallel/IPCSocket.vb b/Parallel/IPCSocket.vb index 17b75ea..b1420b4 100644 --- a/Parallel/IPCSocket.vb +++ b/Parallel/IPCSocket.vb @@ -9,6 +9,8 @@ Imports Microsoft.VisualBasic.Serialization.JSON Public Class IPCSocket : Implements ITaskDriver + Public Shared ReadOnly Property Protocol As Long = New ProtocolAttribute(GetType(Protocols)).EntryPoint + ReadOnly socket As New TcpServicesSocket(GetFirstAvailablePort) ReadOnly target As IDelegate @@ -20,7 +22,7 @@ Public Class IPCSocket : Implements ITaskDriver Public Property handlePOSTResult As Action(Of Stream) Public Property nargs As Integer - Public Property handleGetArgument As Func(Of Integer, Stream) + Public Property handleGetArgument As Func(Of Integer, ObjectStream) Sub New(target As IDelegate) Me.socket.ResponseHandler = AddressOf New ProtocolHandler(Me).HandleRequest @@ -39,8 +41,8 @@ Public Class IPCSocket : Implements ITaskDriver Public Function GetArgumentByIndex(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe Dim i As Integer = BitConverter.ToInt32(request.ChunkBuffer, Scan0) - Dim buf As Stream = _handleGetArgument(i) - Dim pipe As New StreamPipe(buf) + Dim buf As ObjectStream = _handleGetArgument(i) + Dim pipe As New DataPipe(buf) Return pipe End Function diff --git a/Parallel/ObjectStream.vb b/Parallel/ObjectStream.vb new file mode 100644 index 0000000..5799dbf --- /dev/null +++ b/Parallel/ObjectStream.vb @@ -0,0 +1,47 @@ +Imports System.IO +Imports System.Text +Imports Microsoft.VisualBasic.Parallel +Imports Microsoft.VisualBasic.Scripting.MetaData +Imports Microsoft.VisualBasic.Serialization +Imports Microsoft.VisualBasic.Serialization.JSON + +Public Class ObjectStream : Inherits RawStream + + Public Property method As StreamMethods + Public Property stream As Byte() + Public Property type As TypeInfo + + Sub New(type As TypeInfo, method As StreamMethods, stream As Stream) + Me.method = method + Me.stream = New StreamPipe(stream).Read + Me.type = type + End Sub + + Sub New(raw As Byte()) + Using read As New BinaryReader(New MemoryStream(raw)) + Dim methodi As Integer = read.ReadInt32 + Dim size As Integer = read.ReadInt32 + Dim chunk As Byte() = read.ReadBytes(size) + Dim typeJson As String = chunk.UTF8String + + size = read.ReadInt32 + type = typeJson.LoadJSON(Of TypeInfo) + method = CType(methodi, StreamMethods) + stream = read.ReadBytes(size) + End Using + End Sub + + Public Overrides Function Serialize() As Byte() + Using ms As New MemoryStream + Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson) + + Call ms.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32) + Call ms.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32) + Call ms.Write(json, Scan0, json.Length) + Call ms.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32) + Call ms.Write(stream, Scan0, stream.Length) + + Return ms.ToArray + End Using + End Function +End Class diff --git a/Parallel/Parallel.vbproj b/Parallel/Parallel.vbproj index 3172c12..d07edec 100644 --- a/Parallel/Parallel.vbproj +++ b/Parallel/Parallel.vbproj @@ -64,8 +64,9 @@ + - + diff --git a/Parallel/ProtocolStream.vb b/Parallel/ProtocolStream.vb deleted file mode 100644 index 3a33bf3..0000000 --- a/Parallel/ProtocolStream.vb +++ /dev/null @@ -1,3 +0,0 @@ -Module ProtocolStream - -End Module diff --git a/Parallel/SlaveTask.vb b/Parallel/SlaveTask.vb index bebc64a..8c438cf 100644 --- a/Parallel/SlaveTask.vb +++ b/Parallel/SlaveTask.vb @@ -3,6 +3,7 @@ Imports System.Threading Imports Microsoft.VisualBasic.CommandLine.InteropService Imports Microsoft.VisualBasic.MIME.application.json Imports Microsoft.VisualBasic.MIME.application.json.BSON +Imports Microsoft.VisualBasic.Scripting.MetaData Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String @@ -36,16 +37,16 @@ Public Class SlaveTask End If End Function - Private Function handleGET(param As Object) As Stream + Private Function handleGET(param As Object) As ObjectStream Dim type As Type = param.GetType If toBuffers.ContainsKey(type) Then - Return toBuffers(type)(param) + Return New ObjectStream(New TypeInfo(type), StreamMethods.Emit, toBuffers(type)(param)) Else Dim element = type.GetJsonElement(param, New JSONSerializerOptions) Dim buf As Stream = BSONFormat.GetBuffer(element) - Return buf + Return New ObjectStream(New TypeInfo(type), StreamMethods.BSON, buf) End If End Function diff --git a/Parallel/StreamMethods.vb b/Parallel/StreamMethods.vb new file mode 100644 index 0000000..a794834 --- /dev/null +++ b/Parallel/StreamMethods.vb @@ -0,0 +1,4 @@ +Public Enum StreamMethods + BSON + Emit +End Enum diff --git a/Parallel/TaskBuilder.vb b/Parallel/TaskBuilder.vb index 9698f7d..d50befd 100644 --- a/Parallel/TaskBuilder.vb +++ b/Parallel/TaskBuilder.vb @@ -1,5 +1,8 @@ Imports System.Reflection Imports Microsoft.VisualBasic.ComponentModel +Imports Microsoft.VisualBasic.Net.Tcp +Imports Microsoft.VisualBasic.Parallel +Imports Microsoft.VisualBasic.Serialization.JSON Public Class TaskBuilder : Implements ITaskDriver @@ -11,7 +14,7 @@ Public Class TaskBuilder : Implements ITaskDriver Public Function Run() As Integer Implements ITaskDriver.Run Dim task As IDelegate = GetMethod() - Dim n As Integer + Dim n As Integer = GetArgumentValueNumber() Dim args As New List(Of Object) For i As Integer = 0 To n - 1 @@ -34,12 +37,30 @@ Public Class TaskBuilder : Implements ITaskDriver Return 0 End Function + Private Function GetArgumentValueNumber() As Integer + Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetArgumentNumber)) + Dim n As Integer = BitConverter.ToInt32(resp.ChunkBuffer, Scan0) + + Return n + End Function + Private Function GetMethod() As IDelegate + Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetTask)) + Dim json As String = resp.GetUTF8String + Dim target As IDelegate = json.LoadJSON(Of IDelegate) + Return target End Function Private Function GetArgumentValue(i As Integer) As Object + Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetArgumentByIndex, BitConverter.GetBytes(i))) + Dim stream As New ObjectStream(resp.ChunkBuffer) + + If stream.method = StreamMethods.BSON Then + Else + Throw New NotImplementedException + End If End Function Private Function PostError(err As Exception) As Integer