From 8adfdc6a43e1a7314c0df2a8a682f911ff3c21d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=93=E3=81=AE=E4=B8=AD=E4=BA=8C=E7=97=85=E3=81=AB?= =?UTF-8?q?=E7=88=86=E7=84=94=E3=82=92=EF=BC=81?= Date: Wed, 27 Jan 2021 09:18:19 +0800 Subject: [PATCH] Namespace IpcStream --- Parallel/IpcParallel/IPCSocket.vb | 2 + Parallel/IpcParallel/Protocols.vb | 1 + Parallel/IpcParallel/SlaveTask.vb | 2 +- Parallel/IpcParallel/Stream/EmitHandler.vb | 81 ++++----- Parallel/IpcParallel/Stream/IPCError.vb | 57 ++++--- Parallel/IpcParallel/Stream/ObjectStream.vb | 165 ++++++++++--------- Parallel/IpcParallel/Stream/StreamEmit.vb | 103 ++++++------ Parallel/IpcParallel/Stream/StreamMethods.vb | 13 +- Parallel/IpcParallel/TaskBuilder.vb | 9 +- Parallel/Parallel.vbproj | 1 + Parallel/ThreadTask/BatchTasks.vb | 12 +- Parallel/ThreadTask/TaskInvokeHelper.vb | 9 + 12 files changed, 237 insertions(+), 218 deletions(-) create mode 100644 Parallel/ThreadTask/TaskInvokeHelper.vb diff --git a/Parallel/IpcParallel/IPCSocket.vb b/Parallel/IpcParallel/IPCSocket.vb index e3ea96c..4afb536 100644 --- a/Parallel/IpcParallel/IPCSocket.vb +++ b/Parallel/IpcParallel/IPCSocket.vb @@ -8,6 +8,8 @@ Imports Microsoft.VisualBasic.Net.Protocols.Reflection Imports Microsoft.VisualBasic.Net.Tcp Imports Microsoft.VisualBasic.Parallel Imports Microsoft.VisualBasic.Serialization.JSON +Imports Parallel.IpcStream + #If netcore5 = 1 Then Imports randf = Microsoft.VisualBasic.Math.RandomExtensions #End If diff --git a/Parallel/IpcParallel/Protocols.vb b/Parallel/IpcParallel/Protocols.vb index 2ae25f7..e7158bd 100644 --- a/Parallel/IpcParallel/Protocols.vb +++ b/Parallel/IpcParallel/Protocols.vb @@ -4,4 +4,5 @@ GetArgumentByIndex PostStart PostResult + PostError End Enum diff --git a/Parallel/IpcParallel/SlaveTask.vb b/Parallel/IpcParallel/SlaveTask.vb index c6088c6..f432e26 100644 --- a/Parallel/IpcParallel/SlaveTask.vb +++ b/Parallel/IpcParallel/SlaveTask.vb @@ -1,7 +1,7 @@ Imports System.IO Imports System.Threading Imports Microsoft.VisualBasic.CommandLine.InteropService -Imports Microsoft.VisualBasic.Language +Imports Parallel.IpcStream Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String diff --git a/Parallel/IpcParallel/Stream/EmitHandler.vb b/Parallel/IpcParallel/Stream/EmitHandler.vb index 09aa53f..bf8caf9 100644 --- a/Parallel/IpcParallel/Stream/EmitHandler.vb +++ b/Parallel/IpcParallel/Stream/EmitHandler.vb @@ -3,55 +3,58 @@ Imports System.Runtime.CompilerServices Imports System.Text Imports Microsoft.VisualBasic.ValueTypes -Module EmitHandler +Namespace IpcStream - Public Delegate Function toBuffer(obj As Object) As Stream - Public Delegate Function loadBuffer(buf As Stream) As Object + Module EmitHandler - Public Iterator Function PopulatePrimitiveHandles() As IEnumerable(Of (target As Type, emit As toBuffer)) - Yield (GetType(Integer), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, Integer)))) - Yield (GetType(UInteger), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, UInteger)))) + Public Delegate Function toBuffer(obj As Object) As Stream + Public Delegate Function loadBuffer(buf As Stream) As Object - Yield (GetType(Long), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, Long)))) - Yield (GetType(ULong), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, ULong)))) + Public Iterator Function PopulatePrimitiveHandles() As IEnumerable(Of (target As Type, emit As toBuffer)) + Yield (GetType(Integer), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, Integer)))) + Yield (GetType(UInteger), Function(i) New MemoryStream(BitConverter.GetBytes(DirectCast(i, UInteger)))) - Yield (GetType(Single), Function(f) New MemoryStream(BitConverter.GetBytes(DirectCast(f, Single)))) - Yield (GetType(Double), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Double)))) + Yield (GetType(Long), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, Long)))) + Yield (GetType(ULong), Function(l) New MemoryStream(BitConverter.GetBytes(DirectCast(l, ULong)))) - Yield (GetType(UShort), Function(s) New MemoryStream(BitConverter.GetBytes(DirectCast(s, UShort)))) - Yield (GetType(Boolean), Function(b) New MemoryStream(New Byte() {If(DirectCast(b, Boolean), 1, 0)})) - Yield (GetType(String), Function(s) New MemoryStream(Encoding.UTF8.GetBytes(DirectCast(s, String)))) - Yield (GetType(Char), Function(c) New MemoryStream(BitConverter.GetBytes(AscW(DirectCast(c, Char))))) + Yield (GetType(Single), Function(f) New MemoryStream(BitConverter.GetBytes(DirectCast(f, Single)))) + Yield (GetType(Double), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Double)))) - Yield (GetType(Byte), Function(b) New MemoryStream({DirectCast(b, Byte)})) - Yield (GetType(Date), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Date).UnixTimeStamp))) - End Function + Yield (GetType(UShort), Function(s) New MemoryStream(BitConverter.GetBytes(DirectCast(s, UShort)))) + Yield (GetType(Boolean), Function(b) New MemoryStream(New Byte() {If(DirectCast(b, Boolean), 1, 0)})) + Yield (GetType(String), Function(s) New MemoryStream(Encoding.UTF8.GetBytes(DirectCast(s, String)))) + Yield (GetType(Char), Function(c) New MemoryStream(BitConverter.GetBytes(AscW(DirectCast(c, Char))))) - Public Iterator Function PopulatePrimitiveParsers() As IEnumerable(Of (target As Type, emit As loadBuffer)) - Yield (GetType(Integer), Function(i) BitConverter.ToInt32(i.readAllBytes, Scan0)) - Yield (GetType(UInteger), Function(i) BitConverter.ToUInt32(i.readAllBytes, Scan0)) + Yield (GetType(Byte), Function(b) New MemoryStream({DirectCast(b, Byte)})) + Yield (GetType(Date), Function(d) New MemoryStream(BitConverter.GetBytes(DirectCast(d, Date).UnixTimeStamp))) + End Function - Yield (GetType(Long), Function(l) BitConverter.ToInt64(l.readAllBytes, Scan0)) - Yield (GetType(ULong), Function(l) BitConverter.ToUInt64(l.readAllBytes, Scan0)) + Public Iterator Function PopulatePrimitiveParsers() As IEnumerable(Of (target As Type, emit As loadBuffer)) + Yield (GetType(Integer), Function(i) BitConverter.ToInt32(i.readAllBytes, Scan0)) + Yield (GetType(UInteger), Function(i) BitConverter.ToUInt32(i.readAllBytes, Scan0)) - Yield (GetType(Single), Function(f) BitConverter.ToSingle(f.readAllBytes, Scan0)) - Yield (GetType(Double), Function(d) BitConverter.ToDouble(d.readAllBytes, Scan0)) + Yield (GetType(Long), Function(l) BitConverter.ToInt64(l.readAllBytes, Scan0)) + Yield (GetType(ULong), Function(l) BitConverter.ToUInt64(l.readAllBytes, Scan0)) - Yield (GetType(Short), Function(s) BitConverter.ToInt16(s.readAllBytes, Scan0)) - Yield (GetType(UShort), Function(s) BitConverter.ToUInt16(s.readAllBytes, Scan0)) + Yield (GetType(Single), Function(f) BitConverter.ToSingle(f.readAllBytes, Scan0)) + Yield (GetType(Double), Function(d) BitConverter.ToDouble(d.readAllBytes, Scan0)) - Yield (GetType(Boolean), Function(b) b.readAllBytes()(Scan0) <> 0) - Yield (GetType(String), Function(s) Encoding.UTF8.GetString(s.readAllBytes)) - Yield (GetType(Char), Function(c) ChrW(BitConverter.ToInt32(c.readAllBytes, Scan0))) + Yield (GetType(Short), Function(s) BitConverter.ToInt16(s.readAllBytes, Scan0)) + Yield (GetType(UShort), Function(s) BitConverter.ToUInt16(s.readAllBytes, Scan0)) - Yield (GetType(Byte), Function(b) b.readAllBytes()(Scan0)) - Yield (GetType(Date), Function(d) FromUnixTimeStamp(BitConverter.ToDouble(d.readAllBytes, Scan0))) - End Function + Yield (GetType(Boolean), Function(b) b.readAllBytes()(Scan0) <> 0) + Yield (GetType(String), Function(s) Encoding.UTF8.GetString(s.readAllBytes)) + Yield (GetType(Char), Function(c) ChrW(BitConverter.ToInt32(c.readAllBytes, Scan0))) - - Private Function readAllBytes(buf As Stream) As Byte() - Dim bytes As Byte() = New Byte(buf.Length - 1) {} - Call buf.Read(bytes, Scan0, bytes.Length - 1) - Return bytes - End Function -End Module + Yield (GetType(Byte), Function(b) b.readAllBytes()(Scan0)) + Yield (GetType(Date), Function(d) FromUnixTimeStamp(BitConverter.ToDouble(d.readAllBytes, Scan0))) + End Function + + + Private Function readAllBytes(buf As Stream) As Byte() + Dim bytes As Byte() = New Byte(buf.Length - 1) {} + Call buf.Read(bytes, Scan0, bytes.Length - 1) + Return bytes + End Function + End Module +End Namespace \ No newline at end of file diff --git a/Parallel/IpcParallel/Stream/IPCError.vb b/Parallel/IpcParallel/Stream/IPCError.vb index a0a8f0b..857fdf2 100644 --- a/Parallel/IpcParallel/Stream/IPCError.vb +++ b/Parallel/IpcParallel/Stream/IPCError.vb @@ -1,37 +1,40 @@ Imports Microsoft.VisualBasic.ApplicationServices.Debugging.Diagnostics -Public Class IPCError +Namespace IpcStream - Public Property message As String - Public Property stackTrace As StackFrame() - Public Property inner As IPCError - Public Property exceptionName As String + Public Class IPCError - Sub New() - End Sub + Public Property message As String + Public Property stackTrace As StackFrame() + Public Property inner As IPCError + Public Property exceptionName As String - Sub New(ex As Exception) - exceptionName = ex.GetType.Name - message = ex.Message - stackTrace = ExceptionData.ParseStackTrace(ex.StackTrace) + Sub New() + End Sub - If Not ex.InnerException Is Nothing Then - inner = New IPCError(ex.InnerException) - End If - End Sub + Sub New(ex As Exception) + exceptionName = ex.GetType.Name + message = ex.Message + stackTrace = ExceptionData.ParseStackTrace(ex.StackTrace) - Public Iterator Function GetAllErrorMessages() As IEnumerable(Of String) - If Not inner Is Nothing Then - For Each msg As String In inner.GetAllErrorMessages - Yield msg - Next - End If + If Not ex.InnerException Is Nothing Then + inner = New IPCError(ex.InnerException) + End If + End Sub - Yield $"{exceptionName}: {message}" - End Function + Public Iterator Function GetAllErrorMessages() As IEnumerable(Of String) + If Not inner Is Nothing Then + For Each msg As String In inner.GetAllErrorMessages + Yield msg + Next + End If - Public Overrides Function ToString() As String - Return $"{exceptionName}: {message}" - End Function + Yield $"{exceptionName}: {message}" + End Function -End Class + Public Overrides Function ToString() As String + Return $"{exceptionName}: {message}" + End Function + + End Class +End Namespace \ No newline at end of file diff --git a/Parallel/IpcParallel/Stream/ObjectStream.vb b/Parallel/IpcParallel/Stream/ObjectStream.vb index b6ca5db..f766e42 100644 --- a/Parallel/IpcParallel/Stream/ObjectStream.vb +++ b/Parallel/IpcParallel/Stream/ObjectStream.vb @@ -6,85 +6,88 @@ Imports Microsoft.VisualBasic.Scripting.MetaData Imports Microsoft.VisualBasic.Serialization Imports Microsoft.VisualBasic.Serialization.JSON -Public Class ObjectStream : Inherits RawStream - Implements IDisposable - - Dim disposedValue As Boolean - - Public Property method As StreamMethods - Public Property stream As Byte() - Public Property type As TypeInfo - - Public ReadOnly Property isPrimitive As Boolean - Get - Return DataFramework.IsPrimitive(type.GetType(knownFirst:=True)) - End Get - End Property - - Sub New(type As TypeInfo, method As StreamMethods, stream As Stream) - Me.method = method - Me.stream = New StreamPipe(stream).Read - Me.type = type - - Call stream.Close() - Call stream.Dispose() - End Sub - - Sub New(raw As Byte()) - Using read As New BinaryReader(New MemoryStream(raw)) - Dim methodi As Integer = read.ReadInt32 - Dim size As Integer = read.ReadInt32 - Dim chunk As Byte() = read.ReadBytes(size) - Dim typeJson As String = chunk.UTF8String - - size = read.ReadInt32 - type = typeJson.LoadJSON(Of TypeInfo) - method = CType(methodi, StreamMethods) - stream = read.ReadBytes(size) - End Using - End Sub - - Public Function openMemoryBuffer() As MemoryStream - Return New MemoryStream(stream) - End Function - - Public Overrides Function Serialize() As Byte() - Using ms As New MemoryStream - Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson) - - Call ms.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32) - Call ms.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32) - Call ms.Write(json, Scan0, json.Length) - Call ms.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32) - Call ms.Write(stream, Scan0, stream.Length) - - Return ms.ToArray - End Using - End Function - - Protected Overridable Sub Dispose(disposing As Boolean) - If Not disposedValue Then - If disposing Then - ' TODO: 释放托管状态(托管对象) - Erase stream +Namespace IpcStream + + Public Class ObjectStream : Inherits RawStream + Implements IDisposable + + Dim disposedValue As Boolean + + Public Property method As StreamMethods + Public Property stream As Byte() + Public Property type As TypeInfo + + Public ReadOnly Property isPrimitive As Boolean + Get + Return DataFramework.IsPrimitive(type.GetType(knownFirst:=True)) + End Get + End Property + + Sub New(type As TypeInfo, method As StreamMethods, stream As Stream) + Me.method = method + Me.stream = New StreamPipe(stream).Read + Me.type = type + + Call stream.Close() + Call stream.Dispose() + End Sub + + Sub New(raw As Byte()) + Using read As New BinaryReader(New MemoryStream(raw)) + Dim methodi As Integer = read.ReadInt32 + Dim size As Integer = read.ReadInt32 + Dim chunk As Byte() = read.ReadBytes(size) + Dim typeJson As String = chunk.UTF8String + + size = read.ReadInt32 + type = typeJson.LoadJSON(Of TypeInfo) + method = CType(methodi, StreamMethods) + stream = read.ReadBytes(size) + End Using + End Sub + + Public Function openMemoryBuffer() As MemoryStream + Return New MemoryStream(stream) + End Function + + Public Overrides Function Serialize() As Byte() + Using ms As New MemoryStream + Dim json As Byte() = Encoding.UTF8.GetBytes(type.GetJson) + + Call ms.Write(BitConverter.GetBytes(method), Scan0, RawStream.INT32) + Call ms.Write(BitConverter.GetBytes(json.Length), Scan0, RawStream.INT32) + Call ms.Write(json, Scan0, json.Length) + Call ms.Write(BitConverter.GetBytes(stream.Length), Scan0, RawStream.INT32) + Call ms.Write(stream, Scan0, stream.Length) + + Return ms.ToArray + End Using + End Function + + Protected Overridable Sub Dispose(disposing As Boolean) + If Not disposedValue Then + If disposing Then + ' TODO: 释放托管状态(托管对象) + Erase stream + End If + + ' TODO: 释放未托管的资源(未托管的对象)并替代终结器 + ' TODO: 将大型字段设置为 null + disposedValue = True End If - - ' TODO: 释放未托管的资源(未托管的对象)并替代终结器 - ' TODO: 将大型字段设置为 null - disposedValue = True - End If - End Sub - - ' ' TODO: 仅当“Dispose(disposing As Boolean)”拥有用于释放未托管资源的代码时才替代终结器 - ' Protected Overrides Sub Finalize() - ' ' 不要更改此代码。请将清理代码放入“Dispose(disposing As Boolean)”方法中 - ' Dispose(disposing:=False) - ' MyBase.Finalize() - ' End Sub - - Public Sub Dispose() Implements IDisposable.Dispose - ' 不要更改此代码。请将清理代码放入“Dispose(disposing As Boolean)”方法中 - Dispose(disposing:=True) - GC.SuppressFinalize(Me) - End Sub -End Class + End Sub + + ' ' TODO: 仅当“Dispose(disposing As Boolean)”拥有用于释放未托管资源的代码时才替代终结器 + ' Protected Overrides Sub Finalize() + ' ' 不要更改此代码。请将清理代码放入“Dispose(disposing As Boolean)”方法中 + ' Dispose(disposing:=False) + ' MyBase.Finalize() + ' End Sub + + Public Sub Dispose() Implements IDisposable.Dispose + ' 不要更改此代码。请将清理代码放入“Dispose(disposing As Boolean)”方法中 + Dispose(disposing:=True) + GC.SuppressFinalize(Me) + End Sub + End Class +End Namespace \ No newline at end of file diff --git a/Parallel/IpcParallel/Stream/StreamEmit.vb b/Parallel/IpcParallel/Stream/StreamEmit.vb index e6f8d90..1299403 100644 --- a/Parallel/IpcParallel/Stream/StreamEmit.vb +++ b/Parallel/IpcParallel/Stream/StreamEmit.vb @@ -3,56 +3,59 @@ Imports Microsoft.VisualBasic.MIME.application.json Imports Microsoft.VisualBasic.MIME.application.json.BSON Imports Microsoft.VisualBasic.Scripting.MetaData -Public Class StreamEmit - - ReadOnly toBuffers As New Dictionary(Of Type, toBuffer) - ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer) - - Sub New() - For Each [handle] In EmitHandler.PopulatePrimitiveHandles - toBuffers(handle.target) = handle.emit - Next - For Each [handle] In EmitHandler.PopulatePrimitiveParsers - loadBuffers(handle.target) = handle.emit - Next - End Sub - - Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As StreamEmit - toBuffers(GetType(T)) = Function(obj) streamAs(obj) - Return Me - End Function - - Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As StreamEmit - loadBuffers(GetType(T)) = Function(buf) fromStream(buf) - Return Me - End Function - - Public Function handleCreate(buf As Stream, type As Type, emit As StreamMethods) As Object - If emit = StreamMethods.Auto Then - If loadBuffers.ContainsKey(type) Then +Namespace IpcStream + + Public Class StreamEmit + + ReadOnly toBuffers As New Dictionary(Of Type, toBuffer) + ReadOnly loadBuffers As New Dictionary(Of Type, loadBuffer) + + Sub New() + For Each [handle] In EmitHandler.PopulatePrimitiveHandles + toBuffers(handle.target) = handle.emit + Next + For Each [handle] In EmitHandler.PopulatePrimitiveParsers + loadBuffers(handle.target) = handle.emit + Next + End Sub + + Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As StreamEmit + toBuffers(GetType(T)) = Function(obj) streamAs(obj) + Return Me + End Function + + Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As StreamEmit + loadBuffers(GetType(T)) = Function(buf) fromStream(buf) + Return Me + End Function + + Public Function handleCreate(buf As Stream, type As Type, emit As StreamMethods) As Object + If emit = StreamMethods.Auto Then + If loadBuffers.ContainsKey(type) Then + Return loadBuffers(type)(buf) + Else + Return BSONFormat.Load(buf).CreateObject(type) + End If + ElseIf emit = StreamMethods.BSON Then + Return BSONFormat.Load(buf).CreateObject(type) + ElseIf loadBuffers.ContainsKey(type) Then Return loadBuffers(type)(buf) Else - Return BSONFormat.Load(buf).CreateObject(type) + Throw New NotImplementedException + End If + End Function + + Public Function handleSerialize(param As Object) As ObjectStream + Dim type As Type = param.GetType + + If toBuffers.ContainsKey(type) Then + Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param)) + Else + Dim element = type.GetJsonElement(param, New JSONSerializerOptions) + Dim buf As Stream = BSONFormat.SafeGetBuffer(element) + + Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf) End If - ElseIf emit = StreamMethods.BSON Then - Return BSONFormat.Load(buf).CreateObject(type) - ElseIf loadBuffers.ContainsKey(type) Then - Return loadBuffers(type)(buf) - Else - Throw New NotImplementedException - End If - End Function - - Public Function handleSerialize(param As Object) As ObjectStream - Dim type As Type = param.GetType - - If toBuffers.ContainsKey(type) Then - Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.Emit, toBuffers(type)(param)) - Else - Dim element = type.GetJsonElement(param, New JSONSerializerOptions) - Dim buf As Stream = BSONFormat.SafeGetBuffer(element) - - Return New ObjectStream(New TypeInfo(type, fullpath:=True), StreamMethods.BSON, buf) - End If - End Function -End Class + End Function + End Class +End Namespace \ No newline at end of file diff --git a/Parallel/IpcParallel/Stream/StreamMethods.vb b/Parallel/IpcParallel/Stream/StreamMethods.vb index d7fb5bb..d199edf 100644 --- a/Parallel/IpcParallel/Stream/StreamMethods.vb +++ b/Parallel/IpcParallel/Stream/StreamMethods.vb @@ -1,5 +1,8 @@ -Public Enum StreamMethods - Auto - BSON - Emit -End Enum +Namespace IpcStream + + Public Enum StreamMethods + Auto + BSON + Emit + End Enum +End Namespace \ No newline at end of file diff --git a/Parallel/IpcParallel/TaskBuilder.vb b/Parallel/IpcParallel/TaskBuilder.vb index d956b26..fa04887 100644 --- a/Parallel/IpcParallel/TaskBuilder.vb +++ b/Parallel/IpcParallel/TaskBuilder.vb @@ -8,6 +8,7 @@ Imports Microsoft.VisualBasic.MIME.application.json Imports Microsoft.VisualBasic.Net.Tcp Imports Microsoft.VisualBasic.Parallel Imports Microsoft.VisualBasic.Serialization.JSON +Imports Parallel.IpcStream Public Class TaskBuilder : Implements ITaskDriver @@ -46,7 +47,7 @@ Public Class TaskBuilder : Implements ITaskDriver Call New TcpRequest(masterPort).SendMessage(New RequestStream(IPCSocket.Protocol, Protocols.PostStart)) Try - Call PostFinished(api.Invoke(target, args.ToArray)) + Call PostFinished(api.Invoke(target, args.ToArray), Protocols.PostResult) Catch ex As Exception Call PostError(ex) Finally @@ -87,16 +88,16 @@ Public Class TaskBuilder : Implements ITaskDriver End Function Private Function PostError(err As Exception) As Integer - Call PostFinished(New IPCError(err)) + Call PostFinished(New IPCError(err), Protocols.PostError) Return 500 End Function - Private Sub PostFinished(result As Object) + Private Sub PostFinished(result As Object, protocol As Protocols) Using buf As Stream = emit.handleSerialize(result).openMemoryBuffer Dim request As New RequestStream( protocolCategory:=IPCSocket.Protocol, - protocol:=Protocols.PostResult, + protocol:=protocol, buffer:=New StreamPipe(buf).Read ) diff --git a/Parallel/Parallel.vbproj b/Parallel/Parallel.vbproj index b96570b..590950c 100644 --- a/Parallel/Parallel.vbproj +++ b/Parallel/Parallel.vbproj @@ -150,6 +150,7 @@ True + diff --git a/Parallel/ThreadTask/BatchTasks.vb b/Parallel/ThreadTask/BatchTasks.vb index 14776d2..9636301 100644 --- a/Parallel/ThreadTask/BatchTasks.vb +++ b/Parallel/ThreadTask/BatchTasks.vb @@ -48,7 +48,7 @@ Public Module BatchTasks Public Sub Invoke(tasks As Action(), numOfThreads As Integer) Dim getTask As Func(Of Action, Func(Of Integer)) = Function(task) - Return AddressOf New invokeHelper With { + Return AddressOf New TaskInvokeHelper With { .task = task }.RunTask End Function @@ -59,14 +59,4 @@ Public Module BatchTasks .RunParallel() _ .ToArray End Sub - - Private Structure invokeHelper - - Dim task As Action - - Public Function RunTask() As Integer - Call task() - Return 0 - End Function - End Structure End Module diff --git a/Parallel/ThreadTask/TaskInvokeHelper.vb b/Parallel/ThreadTask/TaskInvokeHelper.vb new file mode 100644 index 0000000..303a98b --- /dev/null +++ b/Parallel/ThreadTask/TaskInvokeHelper.vb @@ -0,0 +1,9 @@ +Friend Structure TaskInvokeHelper + + Dim task As Action + + Public Function RunTask() As Integer + Call task() + Return 0 + End Function +End Structure \ No newline at end of file