Namespace IpcStream

master
この中二病に爆焔を! 5 years ago
parent 5a1dc64dee
commit 8adfdc6a43

@ -8,6 +8,8 @@ Imports Microsoft.VisualBasic.Net.Protocols.Reflection
Imports Microsoft.VisualBasic.Net.Tcp
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Serialization.JSON
Imports Parallel.IpcStream
#If netcore5 = 1 Then
Imports randf = Microsoft.VisualBasic.Math.RandomExtensions
#End If

@ -4,4 +4,5 @@
GetArgumentByIndex
PostStart
PostResult
PostError
End Enum

@ -1,7 +1,7 @@
Imports System.IO
Imports System.Threading
Imports Microsoft.VisualBasic.CommandLine.InteropService
Imports Microsoft.VisualBasic.Language
Imports Parallel.IpcStream
Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String

@ -3,55 +3,58 @@ Imports System.Runtime.CompilerServices
Imports System.Text
Imports Microsoft.VisualBasic.ValueTypes
Module EmitHandler
Namespace IpcStream
Public Delegate Function toBuffer(obj As Object) As Stream
Public Delegate Function loadBuffer(buf As Stream) As Object
Module EmitHandler
Public Iterator Function PopulatePrimitiveHandles() As IEnumerable(Of (target As Type, emit As toBuffer))
Yield (GetType(Integer), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, Integer))))
Yield (GetType(UInteger), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, UInteger))))
Public Delegate Function toBuffer(obj As Object) As Stream
Public Delegate Function loadBuffer(buf As Stream) As Object
Yield (GetType(Long), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, Long))))
Yield (GetType(ULong), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, ULong))))
Public Iterator Function PopulatePrimitiveHandles() As IEnumerable(Of (target As Type, emit As toBuffer))
Yield (GetType(Integer), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, Integer))))
Yield (GetType(UInteger), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, UInteger))))
Yield (GetType(Single), Function(f) New MemoryStream(BitConverter.GetBytes(DirectCast(f, Single))))
Yield (GetType(Double), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Double))))
Yield (GetType(Long), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, Long))))
Yield (GetType(ULong), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, ULong))))
Yield (GetType(UShort), Function(s) New MemoryStream(BitConverter.GetBytes(DirectCast(s, UShort))))
Yield (GetType(Boolean), Function(b) New MemoryStream(New Byte() {If(DirectCast(b, Boolean), 1, 0)}))
Yield (GetType(String), Function(s) New MemoryStream(Encoding.UTF8.GetBytes(DirectCast(s, String))))
Yield (GetType(Char), Function(c) New MemoryStream(BitConverter.GetBytes(AscW(DirectCast(c, Char)))))
Yield (GetType(Single), Function(f) New MemoryStream(BitConverter.GetBytes(DirectCast(f, Single))))
Yield (GetType(Double), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Double))))
Yield (GetType(Byte), Function(b) New MemoryStream({DirectCast(b, Byte)}))
Yield (GetType(Date), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Date).UnixTimeStamp)))
End Function
Yield (GetType(UShort), Function(s) New MemoryStream(BitConverter.GetBytes(DirectCast(s, UShort))))
Yield (GetType(Boolean), Function(b) New MemoryStream(New Byte() {If(DirectCast(b, Boolean), 1, 0)}))
Yield (GetType(String), Function(s) New MemoryStream(Encoding.UTF8.GetBytes(DirectCast(s, String))))
Yield (GetType(Char), Function(c) New MemoryStream(BitConverter.GetBytes(AscW(DirectCast(c, Char)))))
Public Iterator Function PopulatePrimitiveParsers() As IEnumerable(Of (target As Type, emit As loadBuffer))
Yield (GetType(Integer), Function(i) BitConverter.ToInt32(i.readAllBytes, Scan0))
Yield (GetType(UInteger), Function(i) BitConverter.ToUInt32(i.readAllBytes, Scan0))
Yield (GetType(Byte), Function(b) New MemoryStream({DirectCast(b, Byte)}))
Yield (GetType(Date), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Date).UnixTimeStamp)))
End Function
Yield (GetType(Long), Function(l) BitConverter.ToInt64(l.readAllBytes, Scan0))
Yield (GetType(ULong), Function(l) BitConverter.ToUInt64(l.readAllBytes, Scan0))
Public Iterator Function PopulatePrimitiveParsers() As IEnumerable(Of (target As Type, emit As loadBuffer))
Yield (GetType(Integer), Function(i) BitConverter.ToInt32(i.readAllBytes, Scan0))
Yield (GetType(UInteger), Function(i) BitConverter.ToUInt32(i.readAllBytes, Scan0))
Yield (GetType(Single), Function(f) BitConverter.ToSingle(f.readAllBytes, Scan0))
Yield (GetType(Double), Function(d) BitConverter.ToDouble(d.readAllBytes, Scan0))
Yield (GetType(Long), Function(l) BitConverter.ToInt64(l.readAllBytes, Scan0))
Yield (GetType(ULong), Function(l) BitConverter.ToUInt64(l.readAllBytes, Scan0))
Yield (GetType(Short), Function(s) BitConverter.ToInt16(s.readAllBytes, Scan0))
Yield (GetType(UShort), Function(s) BitConverter.ToUInt16(s.readAllBytes, Scan0))
Yield (GetType(Single), Function(f) BitConverter.ToSingle(f.readAllBytes, Scan0))
Yield (GetType(Double), Function(d) BitConverter.ToDouble(d.readAllBytes, Scan0))
Yield (GetType(Boolean), Function(b) b.readAllBytes()(Scan0) <> 0)
Yield (GetType(String), Function(s) Encoding.UTF8.GetString(s.readAllBytes))
Yield (GetType(Char), Function(c) ChrW(BitConverter.ToInt32(c.readAllBytes, Scan0)))
Yield (GetType(Short), Function(s) BitConverter.ToInt16(s.readAllBytes, Scan0))
Yield (GetType(UShort), Function(s) BitConverter.ToUInt16(s.readAllBytes, Scan0))
Yield (GetType(Byte), Function(b) b.readAllBytes()(Scan0))
Yield (GetType(Date), Function(d) FromUnixTimeStamp(BitConverter.ToDouble(d.readAllBytes, Scan0)))
End Function
Yield (GetType(Boolean), Function(b) b.readAllBytes()(Scan0) <> 0)
Yield (GetType(String), Function(s) Encoding.UTF8.GetString(s.readAllBytes))
Yield (GetType(Char), Function(c) ChrW(BitConverter.ToInt32(c.readAllBytes, Scan0)))
<Extension>
Private Function readAllBytes(buf As Stream) As Byte()
Dim bytes As Byte() = New Byte(buf.Length - 1) {}
Call buf.Read(bytes, Scan0, bytes.Length - 1)
Return bytes
End Function
End Module
Yield (GetType(Byte), Function(b) b.readAllBytes()(Scan0))
Yield (GetType(Date), Function(d) FromUnixTimeStamp(BitConverter.ToDouble(d.readAllBytes, Scan0)))
End Function
<Extension>
Private Function readAllBytes(buf As Stream) As Byte()
Dim bytes As Byte() = New Byte(buf.Length - 1) {}
Call buf.Read(bytes, Scan0, bytes.Length - 1)
Return bytes
End Function
End Module
End Namespace

@ -1,37 +1,40 @@
Imports Microsoft.VisualBasic.ApplicationServices.Debugging.Diagnostics
Public Class IPCError
Namespace IpcStream
Public Property message As String
Public Property stackTrace As StackFrame()
Public Property inner As IPCError
Public Property exceptionName As String
Public Class IPCError
Sub New()
End Sub
Public Property message As String
Public Property stackTrace As StackFrame()
Public Property inner As IPCError
Public Property exceptionName As String
Sub New(ex As Exception)
exceptionName = ex.GetType.Name
message = ex.Message
stackTrace = ExceptionData.ParseStackTrace(ex.StackTrace)
Sub New()
End Sub
If Not ex.InnerException Is Nothing Then
inner = New IPCError(ex.InnerException)
End If
End Sub
Sub New(ex As Exception)
exceptionName = ex.GetType.Name
message = ex.Message
stackTrace = ExceptionData.ParseStackTrace(ex.StackTrace)
Public Iterator Function GetAllErrorMessages() As IEnumerable(Of String)
If Not inner Is Nothing Then
For Each msg As String In inner.GetAllErrorMessages
Yield msg
Next
End If
If Not ex.InnerException Is Nothing Then
inner = New IPCError(ex.InnerException)
End If
End Sub
Yield $"{exceptionName}: {message}"
End Function
Public Iterator Function GetAllErrorMessages() As IEnumerable(Of String)
If Not inner Is Nothing Then
For Each msg As String In inner.GetAllErrorMessages
Yield msg
Next
End If
Public Overrides Function ToString() As String
Return $"{exceptionName}: {message}"
End Function
Yield $"{exceptionName}: {message}"
End Function
End Class
Public Overrides Function ToString() As String
Return $"{exceptionName}: {message}"
End Function
End Class
End Namespace

@ -6,85 +6,88 @@ Imports Microsoft.VisualBasic.Scripting.MetaData
Imports Microsoft.VisualBasic.Serialization
Imports Microsoft.VisualBasic.Serialization.JSON
Public Class ObjectStream : Inherits RawStream
Implements IDisposable
Dim disposedValue As Boolean
Public Property method As StreamMethods
Public Property stream As Byte()
Public Property type As TypeInfo
Public ReadOnly Property isPrimitive As Boolean
Get
Return DataFramework.IsPrimitive(type.GetType(knownFirst:=True))
End Get
End Property
Sub New(type As TypeInfo, method As StreamMethods, stream As Stream)
Me.method = method
Me.stream = New StreamPipe(stream).Read
Me.type = type
Call stream.Close()
Call stream.Dispose()
End Sub
Sub New(raw As Byte())
Using read As New BinaryReader(New MemoryStream(raw))
Dim methodi As Integer = read.ReadInt32
Dim size As Integer = read.ReadInt32
Dim chunk As Byte() = read.ReadBytes(size)
Dim typeJson As String = chunk.UTF8String
size = read.ReadInt32
type = typeJson.LoadJSON(Of TypeInfo)
method = CType(methodi, StreamMethods)
stream = read.ReadBytes(size)
End Using
End Sub
Public Function openMemoryBuffer() As MemoryStream
Return New MemoryStream(stream)
End Function
Public Overrides Function Serialize() As Byte()
Using ms As New MemoryStream
Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson)
Call ms.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32)
Call ms.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32)
Call ms.Write(json, Scan0, json.Length)
Call ms.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32)
Call ms.Write(stream, Scan0, stream.Length)
Return ms.ToArray
End Using
End Function
Protected Overridable Sub Dispose(disposing As Boolean)
If Not disposedValue Then
If disposing Then
' TODO: ()
Erase stream
Namespace IpcStream
Public Class ObjectStream : Inherits RawStream
Implements IDisposable
Dim disposedValue As Boolean
Public Property method As StreamMethods
Public Property stream As Byte()
Public Property type As TypeInfo
Public ReadOnly Property isPrimitive As Boolean
Get
Return DataFramework.IsPrimitive(type.GetType(knownFirst:=True))
End Get
End Property
Sub New(type As TypeInfo, method As StreamMethods, stream As Stream)
Me.method = method
Me.stream = New StreamPipe(stream).Read
Me.type = type
Call stream.Close()
Call stream.Dispose()
End Sub
Sub New(raw As Byte())
Using read As New BinaryReader(New MemoryStream(raw))
Dim methodi As Integer = read.ReadInt32
Dim size As Integer = read.ReadInt32
Dim chunk As Byte() = read.ReadBytes(size)
Dim typeJson As String = chunk.UTF8String
size = read.ReadInt32
type = typeJson.LoadJSON(Of TypeInfo)
method = CType(methodi, StreamMethods)
stream = read.ReadBytes(size)
End Using
End Sub
Public Function openMemoryBuffer() As MemoryStream
Return New MemoryStream(stream)
End Function
Public Overrides Function Serialize() As Byte()
Using ms As New MemoryStream
Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson)
Call ms.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32)
Call ms.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32)
Call ms.Write(json, Scan0, json.Length)
Call ms.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32)
Call ms.Write(stream, Scan0, stream.Length)
Return ms.ToArray
End Using
End Function
Protected Overridable Sub Dispose(disposing As Boolean)
If Not disposedValue Then
If disposing Then
' TODO: ()
Erase stream
End If
' TODO: ()
' TODO: null
disposedValue = True
End If
' TODO: ()
' TODO: null
disposedValue = True
End If
End Sub
' ' TODO: Dispose(disposing As Boolean)
' Protected Overrides Sub Finalize()
' ' Dispose(disposing As Boolean)
' Dispose(disposing:=False)
' MyBase.Finalize()
' End Sub
Public Sub Dispose() Implements IDisposable.Dispose
' Dispose(disposing As Boolean)
Dispose(disposing:=True)
GC.SuppressFinalize(Me)
End Sub
End Class
End Sub
' ' TODO: Dispose(disposing As Boolean)
' Protected Overrides Sub Finalize()
' ' Dispose(disposing As Boolean)
' Dispose(disposing:=False)
' MyBase.Finalize()
' End Sub
Public Sub Dispose() Implements IDisposable.Dispose
' Dispose(disposing As Boolean)
Dispose(disposing:=True)
GC.SuppressFinalize(Me)
End Sub
End Class
End Namespace

@ -3,56 +3,59 @@ Imports Microsoft.VisualBasic.MIME.application.json
Imports Microsoft.VisualBasic.MIME.application.json.BSON
Imports Microsoft.VisualBasic.Scripting.MetaData
Public Class StreamEmit
ReadOnly toBuffers As New Dictionary(Of Type, toBuffer)
ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer)
Sub New()
For Each [handle] In EmitHandler.PopulatePrimitiveHandles
toBuffers(handle.target) = handle.emit
Next
For Each [handle] In EmitHandler.PopulatePrimitiveParsers
loadBuffers(handle.target) = handle.emit
Next
End Sub
Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As StreamEmit
toBuffers(GetType(T)) = Function(obj) streamAs(obj)
Return Me
End Function
Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As StreamEmit
loadBuffers(GetType(T)) = Function(buf) fromStream(buf)
Return Me
End Function
Public Function handleCreate(buf As Stream, type As Type, emit As StreamMethods) As Object
If emit = StreamMethods.Auto Then
If loadBuffers.ContainsKey(type) Then
Namespace IpcStream
Public Class StreamEmit
ReadOnly toBuffers As New Dictionary(Of Type, toBuffer)
ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer)
Sub New()
For Each [handle] In EmitHandler.PopulatePrimitiveHandles
toBuffers(handle.target) = handle.emit
Next
For Each [handle] In EmitHandler.PopulatePrimitiveParsers
loadBuffers(handle.target) = handle.emit
Next
End Sub
Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As StreamEmit
toBuffers(GetType(T)) = Function(obj) streamAs(obj)
Return Me
End Function
Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As StreamEmit
loadBuffers(GetType(T)) = Function(buf) fromStream(buf)
Return Me
End Function
Public Function handleCreate(buf As Stream, type As Type, emit As StreamMethods) As Object
If emit = StreamMethods.Auto Then
If loadBuffers.ContainsKey(type) Then
Return loadBuffers(type)(buf)
Else
Return BSONFormat.Load(buf).CreateObject(type)
End If
ElseIf emit = StreamMethods.BSON Then
Return BSONFormat.Load(buf).CreateObject(type)
ElseIf loadBuffers.ContainsKey(type) Then
Return loadBuffers(type)(buf)
Else
Return BSONFormat.Load(buf).CreateObject(type)
Throw New NotImplementedException
End If
End Function
Public Function handleSerialize(param As Object) As ObjectStream
Dim type As Type = param.GetType
If toBuffers.ContainsKey(type) Then
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param))
Else
Dim element = type.GetJsonElement(param, New JSONSerializerOptions)
Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf)
End If
ElseIf emit = StreamMethods.BSON Then
Return BSONFormat.Load(buf).CreateObject(type)
ElseIf loadBuffers.ContainsKey(type) Then
Return loadBuffers(type)(buf)
Else
Throw New NotImplementedException
End If
End Function
Public Function handleSerialize(param As Object) As ObjectStream
Dim type As Type = param.GetType
If toBuffers.ContainsKey(type) Then
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param))
Else
Dim element = type.GetJsonElement(param, New JSONSerializerOptions)
Dim buf As Stream = BSONFormat.SafeGetBuffer(element)
Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf)
End If
End Function
End Class
End Function
End Class
End Namespace

@ -1,5 +1,8 @@
Public Enum StreamMethods
Auto
BSON
Emit
End Enum
Namespace IpcStream
Public Enum StreamMethods
Auto
BSON
Emit
End Enum
End Namespace

@ -8,6 +8,7 @@ Imports Microsoft.VisualBasic.MIME.application.json
Imports Microsoft.VisualBasic.Net.Tcp
Imports Microsoft.VisualBasic.Parallel
Imports Microsoft.VisualBasic.Serialization.JSON
Imports Parallel.IpcStream
Public Class TaskBuilder : Implements ITaskDriver
@ -46,7 +47,7 @@ Public Class TaskBuilder : Implements ITaskDriver
Call New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.PostStart))
Try
Call PostFinished(api.Invoke(target, args.ToArray))
Call PostFinished(api.Invoke(target, args.ToArray), Protocols.PostResult)
Catch ex As Exception
Call PostError(ex)
Finally
@ -87,16 +88,16 @@ Public Class TaskBuilder : Implements ITaskDriver
End Function
Private Function PostError(err As Exception) As Integer
Call PostFinished(New IPCError(err))
Call PostFinished(New IPCError(err), Protocols.PostError)
Return 500
End Function
Private Sub PostFinished(result As Object)
Private Sub PostFinished(result As Object, protocol As Protocols)
Using buf As Stream = emit.handleSerialize(result).openMemoryBuffer
Dim request As New RequestStream(
protocolCategory:=IPCSocket.Protocol,
protocol:=Protocols.PostResult,
protocol:=protocol,
buffer:=New StreamPipe(buf).Read
)

@ -150,6 +150,7 @@
<DesignTimeSharedInput>True</DesignTimeSharedInput>
</Compile>
<Compile Include="ThreadTask\BatchTasks.vb" />
<Compile Include="ThreadTask\TaskInvokeHelper.vb" />
<Compile Include="ThreadTask\ThreadTask.vb" />
</ItemGroup>
<ItemGroup>

@ -48,7 +48,7 @@ Public Module BatchTasks
Public Sub Invoke(tasks As Action(), numOfThreads As Integer)
Dim getTask As Func(Of Action, Func(Of Integer)) =
Function(task)
Return AddressOf New invokeHelper With {
Return AddressOf New TaskInvokeHelper With {
.task = task
}.RunTask
End Function
@ -59,14 +59,4 @@ Public Module BatchTasks
.RunParallel() _
.ToArray
End Sub
Private Structure invokeHelper
Dim task As Action
Public Function RunTask() As Integer
Call task()
Return 0
End Function
End Structure
End Module

@ -0,0 +1,9 @@
Friend Structure TaskInvokeHelper
Dim task As Action
Public Function RunTask() As Integer
Call task()
Return 0
End Function
End Structure
Loading…
Cancel
Save