object stream data

master
この中二病に爆焔を! 5 years ago
parent a07c29c9e2
commit cd56185fbe

@ -9,6 +9,8 @@ Imports Microsoft.VisualBasic.Serialization.JSON
<Protocol(GetType(Protocols))>
Public Class IPCSocket : Implements ITaskDriver
Public Shared ReadOnly Property Protocol As Long = New ProtocolAttribute(GetType(Protocols)).EntryPoint
ReadOnly socket As New TcpServicesSocket(GetFirstAvailablePort)
ReadOnly target As IDelegate
@ -20,7 +22,7 @@ Public Class IPCSocket : Implements ITaskDriver
Public Property handlePOSTResult As Action(Of Stream)
Public Property nargs As Integer
Public Property handleGetArgument As Func(Of Integer, Stream)
Public Property handleGetArgument As Func(Of Integer, ObjectStream)
Sub New(target As IDelegate)
Me.socket.ResponseHandler = AddressOf New ProtocolHandler(Me).HandleRequest
@ -39,8 +41,8 @@ Public Class IPCSocket : Implements ITaskDriver
<Protocol(Protocols.GetArgumentByIndex)>
Public Function GetArgumentByIndex(request As RequestStream, remoteAddress As System.Net.IPEndPoint) As BufferPipe
Dim i As Integer = BitConverter.ToInt32(request.ChunkBuffer, Scan0)
Dim buf As Stream = _handleGetArgument(i)
Dim pipe As New StreamPipe(buf)
Dim buf As ObjectStream = _handleGetArgument(i)
Dim pipe As New DataPipe(buf)
Return pipe
End Function

@ -0,0 +1,47 @@
Imports System.IO
Imports System.Text
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Scripting.MetaData
Imports Microsoft.VisualBasic.Serialization
Imports Microsoft.VisualBasic.Serialization.JSON
Public Class ObjectStream : Inherits RawStream
Public Property method As StreamMethods
Public Property stream As Byte()
Public Property type As TypeInfo
Sub New(type As TypeInfo, method As StreamMethods, stream As Stream)
Me.method = method
Me.stream = New StreamPipe(stream).Read
Me.type = type
End Sub
Sub New(raw As Byte())
Using read As New BinaryReader(New MemoryStream(raw))
Dim methodi As Integer = read.ReadInt32
Dim size As Integer = read.ReadInt32
Dim chunk As Byte() = read.ReadBytes(size)
Dim typeJson As String = chunk.UTF8String
size = read.ReadInt32
type = typeJson.LoadJSON(Of TypeInfo)
method = CType(methodi, StreamMethods)
stream = read.ReadBytes(size)
End Using
End Sub
Public Overrides Function Serialize() As Byte()
Using ms As New MemoryStream
Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson)
Call ms.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32)
Call ms.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32)
Call ms.Write(json, Scan0, json.Length)
Call ms.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32)
Call ms.Write(stream, Scan0, stream.Length)
Return ms.ToArray
End Using
End Function
End Class

@ -64,8 +64,9 @@
<Import Include="System.Threading.Tasks" />
</ItemGroup>
<ItemGroup>
<Compile Include="StreamMethods.vb" />
<Compile Include="TaskBuilder.vb" />
<Compile Include="ProtocolStream.vb" />
<Compile Include="ObjectStream.vb" />
<Compile Include="Protocols.vb" />
<Compile Include="IPCSocket.vb" />
<Compile Include="IDelegate.vb" />

@ -1,3 +0,0 @@
Module ProtocolStream
End Module

@ -3,6 +3,7 @@ Imports System.Threading
Imports Microsoft.VisualBasic.CommandLine.InteropService
Imports Microsoft.VisualBasic.MIME.application.json
Imports Microsoft.VisualBasic.MIME.application.json.BSON
Imports Microsoft.VisualBasic.Scripting.MetaData
Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String
@ -36,16 +37,16 @@ Public Class SlaveTask
End If
End Function
Private Function handleGET(param As Object) As Stream
Private Function handleGET(param As Object) As ObjectStream
Dim type As Type = param.GetType
If toBuffers.ContainsKey(type) Then
Return toBuffers(type)(param)
Return New ObjectStream(New TypeInfo(type), StreamMethods.Emit, toBuffers(type)(param))
Else
Dim element = type.GetJsonElement(param, New JSONSerializerOptions)
Dim buf As Stream = BSONFormat.GetBuffer(element)
Return buf
Return New ObjectStream(New TypeInfo(type), StreamMethods.BSON, buf)
End If
End Function

@ -0,0 +1,4 @@
Public Enum StreamMethods
BSON
Emit
End Enum

@ -1,5 +1,8 @@
Imports System.Reflection
Imports Microsoft.VisualBasic.ComponentModel
Imports Microsoft.VisualBasic.Net.Tcp
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Serialization.JSON
Public Class TaskBuilder : Implements ITaskDriver
@ -11,7 +14,7 @@ Public Class TaskBuilder : Implements ITaskDriver
Public Function Run() As Integer Implements ITaskDriver.Run
Dim task As IDelegate = GetMethod()
Dim n As Integer
Dim n As Integer = GetArgumentValueNumber()
Dim args As New List(Of Object)
For i As Integer = 0 To n - 1
@ -34,12 +37,30 @@ Public Class TaskBuilder : Implements ITaskDriver
Return 0
End Function
Private Function GetArgumentValueNumber() As Integer
Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetArgumentNumber))
Dim n As Integer = BitConverter.ToInt32(resp.ChunkBuffer, Scan0)
Return n
End Function
Private Function GetMethod() As IDelegate
Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetTask))
Dim json As String = resp.GetUTF8String
Dim target As IDelegate = json.LoadJSON(Of IDelegate)
Return target
End Function
Private Function GetArgumentValue(i As Integer) As Object
Dim resp = New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.GetArgumentByIndex, BitConverter.GetBytes(i)))
Dim stream As New ObjectStream(resp.ChunkBuffer)
If stream.method = StreamMethods.BSON Then
Else
Throw New NotImplementedException
End If
End Function
Private Function PostError(err As Exception) As Integer

Loading…
Cancel
Save