diff --git a/Distribute_computing/GridDynamics_plugins/CalculateFitness.vb b/Distribute_computing/GridDynamics_plugins/CalculateFitness.vb index e671265..0ed3c10 100644 --- a/Distribute_computing/GridDynamics_plugins/CalculateFitness.vb +++ b/Distribute_computing/GridDynamics_plugins/CalculateFitness.vb @@ -1,7 +1,9 @@ Imports System.IO Imports System.Runtime.CompilerServices +Imports System.Threading Imports Microsoft.VisualBasic.ApplicationServices.Plugin Imports Microsoft.VisualBasic.ComponentModel.DataSourceModel +Imports Microsoft.VisualBasic.Language Imports Microsoft.VisualBasic.MachineLearning.Darwinism.Models Imports Microsoft.VisualBasic.MachineLearning.Darwinism.NonlinearGridTopology Imports Microsoft.VisualBasic.Net.Http @@ -9,10 +11,15 @@ Imports Microsoft.VisualBasic.Parallel.Tasks Imports Microsoft.VisualBasic.Serialization.JSON Imports sciBASIC.ComputingServices.TaskHost +Public Structure SlaveTask + Dim output As String + Dim task As AsyncHandle(Of Integer) +End Structure + Public Module CalculateFitness - Public Function MultipleProcessParallel(comparator As FitnessPool(Of Genome), source As IEnumerable(Of Genome)) As IEnumerable(Of NamedValue(Of Double)) + Public Iterator Function MultipleProcessParallel(comparator As FitnessPool(Of Genome), source As IEnumerable(Of Genome)) As IEnumerable(Of NamedValue(Of Double)) Dim individuals As Genome() = source.ToArray Dim partitionSize = individuals.Length / App.CPUCoreNumbers Dim partitions = individuals.Split(partitionSize) @@ -20,17 +27,53 @@ Public Module CalculateFitness ' 在这里folk出多条进程进行并行计算 ' 这个方法可以极大的提升程序在Linux平台上面的计算效率 Dim compute As [Delegate] = New Func(Of String, String, NamedValue(Of Double)())(AddressOf SlaveProcess) - Dim endPoint As String = Base64Codec.Base64String(InvokeInfo.CreateObject(compute, {}).GetJson) Dim slave = CLI.thinking.FromEnvironment(App.HOME) - Dim folks As New List(Of AsyncHandle(Of Integer)) + Dim folks As New List(Of SlaveTask) Dim trainingSet = DirectCast(comparator.evaluateFitness, Environment) _ .GetTrainingSet() _ .writeMemory For Each block As Genome() In partitions ' 将数据写入内存 - Dim inputs = block.Select(Function(g) g.CreateSnapshot()) + Dim inputs As String = block _ + .Select(Function(g) g.CreateSnapshot()) _ + .ToArray _ + .writeMemory + Dim application = Base64Codec.Base64String(InvokeInfo.CreateObject(compute, {inputs, trainingSet}).GetJson) + Dim output = inputs _ + .Select(Function(null) + Return New NamedValue(Of Double) With { + .Name = New String("-"c, 32), + .Value = 0, + .Description = .Name + } + End Function) _ + .ToArray _ + .writeMemory + + folks += New SlaveTask With { + .output = output, + .task = New AsyncHandle(Of Integer)(Function() slave.Slave(application, output)).Run + } Next + + Do While folks > 0 + Dim success = folks.FirstOrDefault(Function(folk) folk.task.IsCompleted) + + If success.output Is Nothing Then + Thread.Sleep(1) + Continue Do + End If + + Using reader As New StreamReader(CommandLine.OpenForRead(success.output)) + Dim result = reader.ReadToEnd + Dim dataset = result.LoadJSON(Of NamedValue(Of Double)()) + + For Each fitness As NamedValue(Of Double) In dataset + Yield fitness + Next + End Using + Loop End Function diff --git a/Distribute_computing/GridDynamics_plugins/thinking.vb b/Distribute_computing/GridDynamics_plugins/thinking.vb index 242604c..be217cb 100644 --- a/Distribute_computing/GridDynamics_plugins/thinking.vb +++ b/Distribute_computing/GridDynamics_plugins/thinking.vb @@ -37,41 +37,40 @@ Imports Microsoft.VisualBasic.ApplicationServices Namespace CLI -''' -''' thinking.CLI -''' -''' -Public Class thinking : Inherits InteropService + ''' + ''' thinking.CLI + ''' + ''' + Public Class thinking : Inherits InteropService - Public Const App$ = "thinking.exe" + Public Const App$ = "thinking.exe" - Sub New(App$) - MyBase._executableAssembly = App$ - End Sub + Sub New(App$) + MyBase._executableAssembly = App$ + End Sub - - Public Shared Function FromEnvironment(directory As String) As thinking - Return New thinking(App:=directory & "/" & thinking.App) - End Function + + Public Shared Function FromEnvironment(directory As String) As thinking + Return New thinking(App:=directory & "/" & thinking.App) + End Function -''' -''' ``` -''' /slave /application <json_base64> /arguments <memory_mapfile> /out <memory_mapfile> -''' ``` -''' Program running in slave mode, apply for the multiple-process parallel. -''' -''' -Public Function Slave(application As String, arguments As String, out As String) As Integer - Dim CLI As New StringBuilder("/slave") - Call CLI.Append(" ") - Call CLI.Append("/application " & """" & application & """ ") - Call CLI.Append("/arguments " & """" & arguments & """ ") - Call CLI.Append("/out " & """" & out & """ ") - Call CLI.Append("/@set --internal_pipeline=TRUE ") + ''' + ''' ``` + ''' /slave /application <invokeinfo/json_base64> /out <memory_mapfile> + ''' ``` + ''' Program running in slave mode, apply for the multiple-process parallel. + ''' + ''' + Public Function Slave(application As String, out As String) As Integer + Dim CLI As New StringBuilder("/slave") + Call CLI.Append(" ") + Call CLI.Append("/application " & """" & application & """ ") + Call CLI.Append("/out " & """" & out & """ ") + Call CLI.Append("/@set --internal_pipeline=TRUE ") - Dim proc As IIORedirectAbstract = RunDotNetApp(CLI.ToString()) - Return proc.Run() -End Function -End Class + Dim proc As IIORedirectAbstract = RunDotNetApp(CLI.ToString()) + Return proc.Run() + End Function + End Class End Namespace