From 6449dca5f6b1589e76ca895c0fbf546517818b26 Mon Sep 17 00:00:00 2001 From: xieguigang Date: Fri, 19 Mar 2021 19:29:41 +0800 Subject: [PATCH] Namespace ThreadTask --- Parallel/IpcParallel/IDelegate.vb | 13 ++ Parallel/ThreadTask/BatchTasks.vb | 121 +++++++------- Parallel/ThreadTask/TaskInvokeHelper.vb | 89 ++++++----- Parallel/ThreadTask/ThreadTask.vb | 204 ++++++++++++------------ 4 files changed, 225 insertions(+), 202 deletions(-) diff --git a/Parallel/IpcParallel/IDelegate.vb b/Parallel/IpcParallel/IDelegate.vb index 853e11e..e4cca78 100644 --- a/Parallel/IpcParallel/IDelegate.vb +++ b/Parallel/IpcParallel/IDelegate.vb @@ -49,8 +49,15 @@ Imports Microsoft.VisualBasic.ApplicationServices.Development.NetCore5 #End If Imports TypeInfo = Microsoft.VisualBasic.Scripting.MetaData.TypeInfo +''' +''' remote method handler +''' Public Class IDelegate + ''' + ''' the function name + ''' + ''' Public Property name As String Public Property type As TypeInfo ''' @@ -62,6 +69,12 @@ Public Class IDelegate Sub New() End Sub + ''' + ''' + ''' + ''' + ''' The name of this target method should be unique! + ''' Sub New(target As MethodInfo) type = New TypeInfo(target.DeclaringType) name = target.Name diff --git a/Parallel/ThreadTask/BatchTasks.vb b/Parallel/ThreadTask/BatchTasks.vb index 249ec05..0c07271 100644 --- a/Parallel/ThreadTask/BatchTasks.vb +++ b/Parallel/ThreadTask/BatchTasks.vb @@ -45,62 +45,67 @@ Imports System.Runtime.CompilerServices Imports Microsoft.VisualBasic.CommandLine Imports Microsoft.VisualBasic.Language -Public Module BatchTasks - - ''' - ''' Folk this program itself for the large amount data batch processing. - ''' - ''' Self folk processing commandline collection. - ''' If this parameter value less than 1, then will be a single - ''' thread task. Any positive value that greater than 1 will be parallel task. - ''' (小于等于零表示非并行化,单线程任务) - ''' - ''' - ''' Returns the total executation time for running this task collection. - ''' (返回任务的执行的总时长) - ''' - Public Function SelfFolks(CLI As IEnumerable(Of String), Optional parallel% = 0) As Long - Dim sw As Stopwatch = Stopwatch.StartNew - - If parallel <= 0 Then - For Each args As String In CLI - Call App.SelfFolk(args).Run() - Next - Else - Dim Tasks As Func(Of Integer)() = LinqAPI.Exec(Of Func(Of Integer)) <= +Namespace ThreadTask + + Public Module BatchTasks + + ''' + ''' Folk this program itself for the large amount data batch processing. + ''' + ''' Self folk processing commandline collection. + ''' If this parameter value less than 1, then will be a single + ''' thread task. Any positive value that greater than 1 will be parallel task. + ''' (小于等于零表示非并行化,单线程任务) + ''' + ''' + ''' Returns the total executation time for running this task collection. + ''' (返回任务的执行的总时长) + ''' + Public Function SelfFolks(CLI As IEnumerable(Of String), Optional parallel% = 0) As Long + Dim sw As Stopwatch = Stopwatch.StartNew + + If parallel <= 0 Then + For Each args As String In CLI + Call App.SelfFolk(args).Run() + Next + Else + Dim Tasks As Func(Of Integer)() = LinqAPI.Exec(Of Func(Of Integer)) <= _ - From args As String - In CLI - Let io As IIORedirectAbstract = App.SelfFolk(args) - Let task As Func(Of Integer) = AddressOf io.Run - Select task - - Call New ThreadTask(Of Integer)(Tasks).WithDegreeOfParallelism(parallel).RunParallel.ToArray - End If - - Return sw.ElapsedMilliseconds - End Function - - ''' - ''' - ''' - ''' - ''' 同时执行的句柄的数目 - ''' - - Public Sub Invoke(tasks As Action(), numOfThreads As Integer) - Dim getTask As Func(Of Action, Func(Of Integer)) = - Function(task) - Return AddressOf New TaskInvokeHelper With { - .task = task - }.RunTask - End Function - Dim invokes = From action As Action In tasks Select getTask(action) - - Call New ThreadTask(Of Integer)(invokes) _ - .WithDegreeOfParallelism(numOfThreads) _ - .RunParallel() _ - .ToArray - End Sub -End Module - + From args As String + In CLI + Let io As IIORedirectAbstract = App.SelfFolk(args) + Let task As Func(Of Integer) = AddressOf io.Run + Select task + + Call New ThreadTask(Of Integer)(Tasks) _ + .WithDegreeOfParallelism(parallel) _ + .RunParallel _ + .ToArray + End If + + Return sw.ElapsedMilliseconds + End Function + + ''' + ''' + ''' + ''' + ''' 同时执行的句柄的数目 + ''' + + Public Sub Invoke(tasks As Action(), numOfThreads As Integer) + Dim getTask As Func(Of Action, Func(Of Integer)) = + Function(task) + Return AddressOf New TaskInvokeHelper With { + .task = task + }.RunTask + End Function + Dim invokes = From action As Action In tasks Select getTask(action) + + Call New ThreadTask(Of Integer)(invokes) _ + .WithDegreeOfParallelism(numOfThreads) _ + .RunParallel() _ + .ToArray + End Sub + End Module +End Namespace \ No newline at end of file diff --git a/Parallel/ThreadTask/TaskInvokeHelper.vb b/Parallel/ThreadTask/TaskInvokeHelper.vb index 49a5a12..1c46912 100644 --- a/Parallel/ThreadTask/TaskInvokeHelper.vb +++ b/Parallel/ThreadTask/TaskInvokeHelper.vb @@ -1,50 +1,53 @@ #Region "Microsoft.VisualBasic::ef1677867edb409e7343869d372b4818, Parallel\ThreadTask\TaskInvokeHelper.vb" - ' Author: - ' - ' asuka (amethyst.asuka@gcmodeller.org) - ' xie (genetics@smrucc.org) - ' xieguigang (xie.guigang@live.com) - ' - ' Copyright (c) 2018 GPL3 Licensed - ' - ' - ' GNU GENERAL PUBLIC LICENSE (GPL3) - ' - ' - ' This program is free software: you can redistribute it and/or modify - ' it under the terms of the GNU General Public License as published by - ' the Free Software Foundation, either version 3 of the License, or - ' (at your option) any later version. - ' - ' This program is distributed in the hope that it will be useful, - ' but WITHOUT ANY WARRANTY; without even the implied warranty of - ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - ' GNU General Public License for more details. - ' - ' You should have received a copy of the GNU General Public License - ' along with this program. If not, see . - - - - ' /********************************************************************************/ - - ' Summaries: - - ' Structure TaskInvokeHelper - ' - ' Function: RunTask - ' - ' /********************************************************************************/ +' Author: +' +' asuka (amethyst.asuka@gcmodeller.org) +' xie (genetics@smrucc.org) +' xieguigang (xie.guigang@live.com) +' +' Copyright (c) 2018 GPL3 Licensed +' +' +' GNU GENERAL PUBLIC LICENSE (GPL3) +' +' +' This program is free software: you can redistribute it and/or modify +' it under the terms of the GNU General Public License as published by +' the Free Software Foundation, either version 3 of the License, or +' (at your option) any later version. +' +' This program is distributed in the hope that it will be useful, +' but WITHOUT ANY WARRANTY; without even the implied warranty of +' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +' GNU General Public License for more details. +' +' You should have received a copy of the GNU General Public License +' along with this program. If not, see . + + + +' /********************************************************************************/ + +' Summaries: + +' Structure TaskInvokeHelper +' +' Function: RunTask +' +' /********************************************************************************/ #End Region -Friend Structure TaskInvokeHelper +Namespace ThreadTask - Dim task As Action + Friend Structure TaskInvokeHelper - Public Function RunTask() As Integer - Call task() - Return 0 - End Function -End Structure + Dim task As Action + + Public Function RunTask() As Integer + Call task() + Return 0 + End Function + End Structure +End Namespace \ No newline at end of file diff --git a/Parallel/ThreadTask/ThreadTask.vb b/Parallel/ThreadTask/ThreadTask.vb index 52c6502..fb8d409 100644 --- a/Parallel/ThreadTask/ThreadTask.vb +++ b/Parallel/ThreadTask/ThreadTask.vb @@ -43,107 +43,109 @@ Imports Microsoft.VisualBasic.Parallel.Tasks -''' -''' Using parallel linq that may stuck the program when a linq task partion wait a long time task to complete. -''' By using this parallel function that you can avoid this problem from parallel linq, and also you can -''' controls the task thread number manually by using this parallel task function. -''' (由于LINQ是分片段来执行的,当某个片段有一个线程被卡住之后整个进程都会被卡住,所以执行大型的计算任务的时候效率不太好, -''' 使用这个并行化函数可以避免这个问题,同时也可以自己手动控制线程的并发数) -''' -''' -Public Class ThreadTask(Of TOut) - - Dim taskList As Queue(Of Func(Of TOut)) - Dim threads As AsyncHandle(Of TOut)() - Dim size As Integer - - Sub New(task As IEnumerable(Of Func(Of TOut))) - Me.taskList = New Queue(Of Func(Of TOut))(task) - Me.size = Me.taskList.Count - End Sub - - Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, Func(Of TOut))) As ThreadTask(Of TOut) - Return New ThreadTask(Of TOut)(items.Select(task)) - End Function - - Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, TOut)) As ThreadTask(Of TOut) - Return New ThreadTask(Of TOut)(items.Select(Function(i) New Func(Of TOut)(Function() task(i)))) - End Function +Namespace ThreadTask ''' - ''' You can controls the parallel tasks number from this parameter, smaller or equals to ZERO means auto - ''' config the thread number, If want single thread, not parallel, set this value to 1, and positive - ''' value greater than 1 will makes the tasks parallel. - ''' (可以在这里手动的控制任务的并发数,这个数值小于或者等于零则表示自动配置线程的数量, 1为单线程) + ''' Using parallel linq that may stuck the program when a linq task partion wait a long time task to complete. + ''' By using this parallel function that you can avoid this problem from parallel linq, and also you can + ''' controls the task thread number manually by using this parallel task function. + ''' (由于LINQ是分片段来执行的,当某个片段有一个线程被卡住之后整个进程都会被卡住,所以执行大型的计算任务的时候效率不太好, + ''' 使用这个并行化函数可以避免这个问题,同时也可以自己手动控制线程的并发数) ''' - ''' - ''' - Public Function WithDegreeOfParallelism(n_threads As Integer) As ThreadTask(Of TOut) - threads = New AsyncHandle(Of TOut)(n_threads) {} - Return Me - End Function - - Private Function GetEmptyThread() As Integer - For i As Integer = 0 To threads.Length - 1 - If threads(i) Is Nothing Then - Return i - End If - Next - - Return -1 - End Function - - Private Function GetCompleteThread() As Integer - For i As Integer = 0 To threads.Length - 1 - If (Not threads(i) Is Nothing) AndAlso threads(i).IsCompleted Then - Return i - End If - Next - - Return -1 - End Function - - Public Overrides Function ToString() As String - Dim free$ = threads.Where(Function(t) t Is Nothing).Count - Dim running$ = threads.Where(Function(t) t IsNot Nothing AndAlso Not t.IsCompleted).Count - Dim finished$ = threads.Where(Function(t) t IsNot Nothing AndAlso t.IsCompleted).Count - Dim delta As Integer = size - taskList.Count - - Return $"[free: {free}, running: {running}, finished: {finished}, progress: {delta} - {CInt(delta / size * 100)}%]" - End Function - - ''' - ''' - ''' - ''' - Public Iterator Function RunParallel() As IEnumerable(Of TOut) - Do While taskList.Count > 0 - Dim i As Integer = GetEmptyThread() - - If i > -1 Then - threads(i) = New AsyncHandle(Of TOut)(taskList.Dequeue).Run - Call Console.WriteLine($"{ToString()} submit new task on thread [{i + 1}]!") - End If - - Dim j As Integer = GetCompleteThread() - - If j > -1 Then - Yield threads(j).GetValue - threads(j) = Nothing - Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!") - End If - Loop - - Do While Not threads.All(Function(t) t Is Nothing) - Dim j As Integer = GetCompleteThread() - - If j > -1 Then - Yield threads(j).GetValue - threads(j) = Nothing - Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!") - End If - Loop - End Function - -End Class - + ''' + Public Class ThreadTask(Of TOut) + + Dim taskList As Queue(Of Func(Of TOut)) + Dim threads As AsyncHandle(Of TOut)() + Dim size As Integer + + Sub New(task As IEnumerable(Of Func(Of TOut))) + Me.taskList = New Queue(Of Func(Of TOut))(task) + Me.size = Me.taskList.Count + End Sub + + Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, Func(Of TOut))) As ThreadTask(Of TOut) + Return New ThreadTask(Of TOut)(items.Select(task)) + End Function + + Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, TOut)) As ThreadTask(Of TOut) + Return New ThreadTask(Of TOut)(items.Select(Function(i) New Func(Of TOut)(Function() task(i)))) + End Function + + ''' + ''' You can controls the parallel tasks number from this parameter, smaller or equals to ZERO means auto + ''' config the thread number, If want single thread, not parallel, set this value to 1, and positive + ''' value greater than 1 will makes the tasks parallel. + ''' (可以在这里手动的控制任务的并发数,这个数值小于或者等于零则表示自动配置线程的数量, 1为单线程) + ''' + ''' + ''' + Public Function WithDegreeOfParallelism(n_threads As Integer) As ThreadTask(Of TOut) + threads = New AsyncHandle(Of TOut)(n_threads) {} + Return Me + End Function + + Private Function GetEmptyThread() As Integer + For i As Integer = 0 To threads.Length - 1 + If threads(i) Is Nothing Then + Return i + End If + Next + + Return -1 + End Function + + Private Function GetCompleteThread() As Integer + For i As Integer = 0 To threads.Length - 1 + If (Not threads(i) Is Nothing) AndAlso threads(i).IsCompleted Then + Return i + End If + Next + + Return -1 + End Function + + Public Overrides Function ToString() As String + Dim free$ = threads.Where(Function(t) t Is Nothing).Count + Dim running$ = threads.Where(Function(t) t IsNot Nothing AndAlso Not t.IsCompleted).Count + Dim finished$ = threads.Where(Function(t) t IsNot Nothing AndAlso t.IsCompleted).Count + Dim delta As Integer = size - taskList.Count + + Return $"[free: {free}, running: {running}, finished: {finished}, progress: {delta} - {CInt(delta / size * 100)}%]" + End Function + + ''' + ''' + ''' + ''' + Public Iterator Function RunParallel() As IEnumerable(Of TOut) + Do While taskList.Count > 0 + Dim i As Integer = GetEmptyThread() + + If i > -1 Then + threads(i) = New AsyncHandle(Of TOut)(taskList.Dequeue).Run + Call Console.WriteLine($"{ToString()} submit new task on thread [{i + 1}]!") + End If + + Dim j As Integer = GetCompleteThread() + + If j > -1 Then + Yield threads(j).GetValue + threads(j) = Nothing + Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!") + End If + Loop + + Do While Not threads.All(Function(t) t Is Nothing) + Dim j As Integer = GetCompleteThread() + + If j > -1 Then + Yield threads(j).GetValue + threads(j) = Nothing + Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!") + End If + Loop + End Function + + End Class +End Namespace