From 17008764457e610fe0163df107deeffaf3133db9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=93=E3=81=AE=E4=B8=AD=E4=BA=8C=E7=97=85=E3=81=AB?= =?UTF-8?q?=E7=88=86=E7=84=94=E3=82=92=EF=BC=81?= Date: Wed, 13 Jan 2021 08:48:35 +0800 Subject: [PATCH] improvements of the task model --- Parallel/BufferBuilder.vb | 3 ++ Parallel/Parallel.vbproj | 1 + Parallel/SlaveTask.vb | 61 ++++++++++++++++++++++++++++++++------- 3 files changed, 54 insertions(+), 11 deletions(-) create mode 100644 Parallel/BufferBuilder.vb diff --git a/Parallel/BufferBuilder.vb b/Parallel/BufferBuilder.vb new file mode 100644 index 0000000..1f21d46 --- /dev/null +++ b/Parallel/BufferBuilder.vb @@ -0,0 +1,3 @@ +Public Class BufferBuilder + +End Class diff --git a/Parallel/Parallel.vbproj b/Parallel/Parallel.vbproj index bd5f821..bfd6319 100644 --- a/Parallel/Parallel.vbproj +++ b/Parallel/Parallel.vbproj @@ -64,6 +64,7 @@ + diff --git a/Parallel/SlaveTask.vb b/Parallel/SlaveTask.vb index bd0602a..ce37890 100644 --- a/Parallel/SlaveTask.vb +++ b/Parallel/SlaveTask.vb @@ -1,27 +1,66 @@ Imports System.IO +Imports System.Threading +Imports Microsoft.VisualBasic.CommandLine.InteropService Imports Microsoft.VisualBasic.MIME.application.json Imports Microsoft.VisualBasic.MIME.application.json.BSON -Imports Microsoft.VisualBasic.Scripting.MetaData + +Public Delegate Function ISlaveTask(processor As InteropService, port As Integer) As String Public Class SlaveTask + ReadOnly toBuffers As New Dictionary(Of Type, Func(Of Object, Stream)) + ReadOnly fromBuffer As New Dictionary(Of Type, Func(Of Stream, Object)) + ReadOnly processor As InteropService + ReadOnly builder As ISlaveTask + + Sub New(processor As InteropService, cli As ISlaveTask) + Me.builder = cli + Me.processor = processor + End Sub + + Public Function Emit(Of T)(streamAs As Func(Of T, Stream)) As SlaveTask + toBuffers(GetType(T)) = Function(obj) streamAs(obj) + Return Me + End Function + + Public Function Emit(Of T)(fromStream As Func(Of Stream, T)) As SlaveTask + fromBuffer(GetType(T)) = Function(buf) fromStream(buf) + Return Me + End Function + + Private Function handlePOST(buf As Stream, type As Type) As Object + If fromBuffer.ContainsKey(type) Then + Return fromBuffer(type)(buf) + Else + Return BSONFormat.Load(buf).CreateObject(type) + End If + End Function + + Private Function handleGET(param As Object) As Stream + Dim type As Type = param.GetType + + If toBuffers.ContainsKey(type) Then + Return toBuffers(type)(param) + Else + Dim element = type.GetJsonElement(param, New JSONSerializerOptions) + Dim buf As Stream = BSONFormat.GetBuffer(element) + + Return buf + End If + End Function + Public Function RunTask(entry As [Delegate], ParamArray parameters As Object()) As Object Dim target As New IDelegate(entry) Dim result As Object = Nothing Dim host As New IPCSocket With { - .handlePOSTResult = Sub(buf) - result = BSONFormat.Load(buf).CreateObject(entry.Method.ReturnType) - End Sub, + .handlePOSTResult = Sub(buf) result = handlePOST(buf, entry.Method.ReturnType), .nargs = parameters.Length, - .handleGetArgument = Function(i) - Dim type As Type = parameters(i).GetType - Dim element = type.GetJsonElement(parameters(i), New JSONSerializerOptions) - - Return BSONFormat.GetBuffer(element) - End Function + .handleGetArgument = Function(i) handleGET(parameters(i)) } - Call host.Run() + Call New Thread(AddressOf host.Run).Start() + Call Thread.Sleep(100) + Call CommandLine.Call(processor, builder(processor, host.HostPort)) Return result End Function