diff --git a/Parallel/IpcParallel/SlaveTask.vb b/Parallel/IpcParallel/SlaveTask.vb index ae747ca..6dc83a5 100644 --- a/Parallel/IpcParallel/SlaveTask.vb +++ b/Parallel/IpcParallel/SlaveTask.vb @@ -88,6 +88,7 @@ Public Class SlaveTask End Function Private Function handleGET(param As Object, i As Integer, debugCode As Integer) As ObjectStream + Dim socket As SocketRef = SocketRef.CreateReference Call Console.WriteLine($"[{debugCode.ToHexString}] get argument[{i + 1}]...") Return streamBuf.handleSerialize(param) End Function diff --git a/Parallel/IpcParallel/SocketBuffer/SocketRef.vb b/Parallel/IpcParallel/SocketBuffer/SocketRef.vb new file mode 100644 index 0000000..0e560a2 --- /dev/null +++ b/Parallel/IpcParallel/SocketBuffer/SocketRef.vb @@ -0,0 +1,32 @@ +Imports System.IO + +Namespace IpcStream + + Public Class SocketRef + + Public Property address As String + + Public Shared Function WriteBuffer(target As Object, emit As StreamEmit) As SocketRef + Dim stream As ObjectStream = emit.handleSerialize(target) + Dim ref As SocketRef = CreateReference() + + Using file As Stream = ref.address.Open(FileMode.OpenOrCreate, doClear:=True, [readOnly]:=False) + + End Using + + Return ref + End Function + + Public Overrides Function ToString() As String + Return address + End Function + + Public Shared Function CreateReference() As SocketRef + Return App.GetAppSysTempFile(".sock", App.PID.ToHexString, prefix:="Parallel") + End Function + + Public Shared Widening Operator CType(ref As String) As SocketRef + Return New SocketRef With {.address = ref} + End Operator + End Class +End Namespace \ No newline at end of file diff --git a/Parallel/IpcParallel/Stream/ObjectStream.vb b/Parallel/IpcParallel/Stream/ObjectStream.vb index 865586a..2174ec6 100644 --- a/Parallel/IpcParallel/Stream/ObjectStream.vb +++ b/Parallel/IpcParallel/Stream/ObjectStream.vb @@ -1,54 +1,56 @@ #Region "Microsoft.VisualBasic::afd368cb2ea7bf946dcdfa3ed58b0c24, Parallel\IpcParallel\Stream\ObjectStream.vb" - ' Author: - ' - ' asuka (amethyst.asuka@gcmodeller.org) - ' xie (genetics@smrucc.org) - ' xieguigang (xie.guigang@live.com) - ' - ' Copyright (c) 2018 GPL3 Licensed - ' - ' - ' GNU GENERAL PUBLIC LICENSE (GPL3) - ' - ' - ' This program is free software: you can redistribute it and/or modify - ' it under the terms of the GNU General Public License as published by - ' the Free Software Foundation, either version 3 of the License, or - ' (at your option) any later version. - ' - ' This program is distributed in the hope that it will be useful, - ' but WITHOUT ANY WARRANTY; without even the implied warranty of - ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - ' GNU General Public License for more details. - ' - ' You should have received a copy of the GNU General Public License - ' along with this program. If not, see . - - - - ' /********************************************************************************/ - - ' Summaries: - - ' Class ObjectStream - ' - ' Properties: isPrimitive, method, stream, type - ' - ' Constructor: (+2 Overloads) Sub New - ' - ' Function: GetUnderlyingType, openMemoryBuffer, Serialize - ' - ' Sub: (+2 Overloads) Dispose - ' - ' - ' /********************************************************************************/ +' Author: +' +' asuka (amethyst.asuka@gcmodeller.org) +' xie (genetics@smrucc.org) +' xieguigang (xie.guigang@live.com) +' +' Copyright (c) 2018 GPL3 Licensed +' +' +' GNU GENERAL PUBLIC LICENSE (GPL3) +' +' +' This program is free software: you can redistribute it and/or modify +' it under the terms of the GNU General Public License as published by +' the Free Software Foundation, either version 3 of the License, or +' (at your option) any later version. +' +' This program is distributed in the hope that it will be useful, +' but WITHOUT ANY WARRANTY; without even the implied warranty of +' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +' GNU General Public License for more details. +' +' You should have received a copy of the GNU General Public License +' along with this program. If not, see . + + + +' /********************************************************************************/ + +' Summaries: + +' Class ObjectStream +' +' Properties: isPrimitive, method, stream, type +' +' Constructor: (+2 Overloads) Sub New +' +' Function: GetUnderlyingType, openMemoryBuffer, Serialize +' +' Sub: (+2 Overloads) Dispose +' +' +' /********************************************************************************/ #End Region Imports System.IO Imports System.Text Imports Microsoft.VisualBasic.ComponentModel.DataSourceModel +Imports Microsoft.VisualBasic.Data.IO.MessagePack +Imports Microsoft.VisualBasic.Data.IO.MessagePack.Serialization Imports Microsoft.VisualBasic.Parallel Imports Microsoft.VisualBasic.Scripting.MetaData Imports Microsoft.VisualBasic.Serialization @@ -61,9 +63,9 @@ Namespace IpcStream Dim disposedValue As Boolean - Public Property method As StreamMethods - Public Property stream As Byte() - Public Property type As TypeInfo + Public Property method As StreamMethods + Public Property stream As Byte() + Public Property type As TypeInfo Public ReadOnly Property isPrimitive As Boolean Get @@ -71,6 +73,9 @@ Namespace IpcStream End Get End Property + Sub New() + End Sub + Sub New(type As TypeInfo, method As StreamMethods, stream As Stream) Me.method = method Me.stream = New StreamPipe(stream).Read @@ -94,6 +99,23 @@ Namespace IpcStream End Using End Sub + Shared Sub New() + Call MsgPackSerializer.DefaultContext.RegisterSerializer(New TypeMsgPack) + End Sub + + Private Class TypeMsgPack : Inherits SchemaProvider(Of TypeInfo) + + Protected Overrides Iterator Function GetObjectSchema() As IEnumerable(Of (obj As Type, schema As Dictionary(Of String, NilImplication))) + Dim map As New Dictionary(Of String, NilImplication) From { + {NameOf(TypeInfo.assembly), NilImplication.MemberDefault}, + {NameOf(TypeInfo.fullName), NilImplication.MemberDefault}, + {NameOf(TypeInfo.reference), NilImplication.MemberDefault} + } + + Yield (GetType(TypeInfo), map) + End Function + End Class + Public Function GetUnderlyingType() As Type Return type.GetType(knownFirst:=True) End Function @@ -102,19 +124,16 @@ Namespace IpcStream Return New MemoryStream(stream) End Function - Public Overrides Function Serialize() As Byte() - Using ms As New MemoryStream - Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson) - - Call ms.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32) - Call ms.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32) - Call ms.Write(json, Scan0, json.Length) - Call ms.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32) - Call ms.Write(stream, Scan0, stream.Length) + Public Overrides Sub Serialize(MS As Stream) + Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson) - Return ms.ToArray - End Using - End Function + Call MS.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32) + Call MS.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32) + Call MS.Write(json, Scan0, json.Length) + Call MS.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32) + Call MS.Write(stream, Scan0, stream.Length) + Call MS.Flush() + End Sub Protected Overridable Sub Dispose(disposing As Boolean) If Not disposedValue Then diff --git a/Parallel/Parallel.vbproj b/Parallel/Parallel.vbproj index 6f40e66..b5fc2f5 100644 --- a/Parallel/Parallel.vbproj +++ b/Parallel/Parallel.vbproj @@ -243,6 +243,7 @@ + @@ -294,6 +295,10 @@ + + {c473d2d3-65b3-0220-32a8-1e0687d9ae6b} + MessagePack + {fecce1fd-e1d4-49e3-a668-60bb5e7aed99} 47-dotnet_Microsoft.VisualBasic