From 16c5a7f38155d0261623d5eb374ede800502bbf2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=93=E3=81=AE=E4=B8=AD=E4=BA=8C=E7=97=85=E3=81=AB?= =?UTF-8?q?=E7=88=86=E7=84=94=E3=82=92=EF=BC=81?= Date: Thu, 14 Jan 2021 11:25:22 +0800 Subject: [PATCH] apply stdoutput instead of the socket data connection for avoid socket bugs on unix .NET 5 runtime --- Parallel/SlaveTask.vb | 31 ++++++++++++++++++++++++++++--- Parallel/TaskBuilder.vb | 20 ++++++++++++++++---- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/Parallel/SlaveTask.vb b/Parallel/SlaveTask.vb index ae96af9..22996e7 100644 --- a/Parallel/SlaveTask.vb +++ b/Parallel/SlaveTask.vb @@ -1,6 +1,7 @@ Imports System.IO Imports System.Threading Imports Microsoft.VisualBasic.CommandLine.InteropService +Imports Microsoft.VisualBasic.Language Imports Microsoft.VisualBasic.MIME.application.json Imports Microsoft.VisualBasic.MIME.application.json.BSON Imports Microsoft.VisualBasic.Scripting.MetaData @@ -60,9 +61,10 @@ Public Class SlaveTask Dim target As New IDelegate(entry) Dim result As Object = Nothing Dim host As IPCSocket = Nothing + Dim resultType As Type = entry.Method.ReturnType host = New IPCSocket(target, debugPort) With { - .handlePOSTResult = Sub(buf) result = handlePOST(buf, entry.Method.ReturnType, host.GetHashCode), + .handlePOSTResult = Sub(buf) result = handlePOST(buf, resultType, host.GetHashCode), .nargs = parameters.Length, .handleGetArgument = Function(i) handleGET(parameters(i), i, host.GetHashCode) } @@ -72,20 +74,43 @@ Public Class SlaveTask Call New Thread(AddressOf host.Run).Start() Call Thread.Sleep(100) + Dim resultStream As MemoryStream + Dim commandlineArgvs As String = builder(processor, host.HostPort) + 'If Not debugPort Is Nothing Then ' Pause() 'End If #If netcore5 = 0 Then - Call CommandLine.Call(processor, builder(processor, host.HostPort),, dotnet:=False) + resultStream = CommandLine.CallDotNetCorePipeline(processor, commandlineArgvs) #Else - Call CommandLine.Call(processor, builder(processor, host.HostPort),, dotnet:=True) + resultStream = CommandLine.CallDotNetCorePipeline(processor, commandlineArgvs) #End If Call host.Stop() Call Console.WriteLine($"[{host.GetHashCode.ToHexString}] thread exit...") + result = decomposingStdoutput(resultStream, resultType, host.GetHashCode) + resultStream.Dispose() + Return result End Function + Private Function decomposingStdoutput(buffer As MemoryStream, type As Type, host As Integer) As Object + Using reader As New StreamReader(buffer) + Do While reader.ReadLine <> TaskBuilder.streamDelimiter + Loop + + Dim dataSize As Integer = Integer.Parse(reader.ReadLine) + Dim chunkBuffer As Byte() = New Byte(dataSize - 1) {} + + buffer.Seek(buffer.Length - dataSize, SeekOrigin.Begin) + buffer.Read(chunkBuffer, Scan0, chunkBuffer.Length) + + Using resultBuffer As New MemoryStream(chunkBuffer) + Return handlePOST(resultBuffer, type, host) + End Using + End Using + End Function + End Class diff --git a/Parallel/TaskBuilder.vb b/Parallel/TaskBuilder.vb index 0b37262..9ec1b2b 100644 --- a/Parallel/TaskBuilder.vb +++ b/Parallel/TaskBuilder.vb @@ -1,5 +1,6 @@ Imports System.IO Imports System.Reflection +Imports System.Text #If netcore5 = 1 Then Imports Microsoft.VisualBasic.ApplicationServices.Development.NetCore5 #End If @@ -8,6 +9,7 @@ Imports Microsoft.VisualBasic.MIME.application.json Imports Microsoft.VisualBasic.MIME.application.json.BSON Imports Microsoft.VisualBasic.Net.Tcp Imports Microsoft.VisualBasic.Parallel +Imports Microsoft.VisualBasic.Serialization Imports Microsoft.VisualBasic.Serialization.JSON Public Class TaskBuilder : Implements ITaskDriver @@ -18,6 +20,10 @@ Public Class TaskBuilder : Implements ITaskDriver masterPort = port End Sub + Shared Sub New() + Console.Out.NewLine = vbLf + End Sub + Public Function Run() As Integer Implements ITaskDriver.Run Dim task As IDelegate = GetMethod() Dim api As MethodInfo = task.GetMethod @@ -40,7 +46,8 @@ Public Class TaskBuilder : Implements ITaskDriver ' send debug message Call New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.PostStart)) - Call PostFinished(api.Invoke(Nothing, args.ToArray)) + ' Call PostFinished(api.Invoke(Nothing, args.ToArray)) + Call PostStdOut(api.Invoke(Nothing, args.ToArray)) Return 0 End Function @@ -83,6 +90,8 @@ Public Class TaskBuilder : Implements ITaskDriver Return 500 End Function + Friend Const streamDelimiter As String = "------endoftext------" + ''' ''' 将结果数据写到标准输出上 ''' @@ -91,11 +100,14 @@ Public Class TaskBuilder : Implements ITaskDriver Dim type As Type = result.GetType Dim element = type.GetJsonElement(result, New JSONSerializerOptions) Dim buf As MemoryStream = BSONFormat.SafeGetBuffer(element) + Dim bufferSize As String = buf.Length + Dim outstream As Stream = Console.OpenStandardOutput - Call Console.Out.Flush() + Call Console.WriteLine(streamDelimiter) + Call Console.WriteLine(bufferSize) + Call Console.Write(vbCrLf) - Using stdout As New BinaryWriter(Console.OpenStandardOutput) - Call stdout.Write(New Byte(32 - 1) {}) + Using stdout As New BinaryWriter(outstream) Call stdout.Write(buf.ToArray) Call stdout.Flush() End Using