Namespace ThreadTask

master
xieguigang 5 years ago
parent aa9ffd4645
commit 6449dca5f6

@ -49,8 +49,15 @@ Imports Microsoft.VisualBasic.ApplicationServices.Development.NetCore5
#End If
Imports TypeInfo = Microsoft.VisualBasic.Scripting.MetaData.TypeInfo
''' <summary>
''' remote method handler
''' </summary>
Public Class IDelegate
''' <summary>
''' the function name
''' </summary>
''' <returns></returns>
Public Property name As String
Public Property type As TypeInfo
''' <summary>
@ -62,6 +69,12 @@ Public Class IDelegate
Sub New()
End Sub
''' <summary>
'''
''' </summary>
''' <param name="target">
''' The name of this target method should be unique!
''' </param>
Sub New(target As MethodInfo)
type = New TypeInfo(target.DeclaringType)
name = target.Name

@ -45,62 +45,67 @@ Imports System.Runtime.CompilerServices
Imports Microsoft.VisualBasic.CommandLine
Imports Microsoft.VisualBasic.Language
Public Module BatchTasks
''' <summary>
''' Folk this program itself for the large amount data batch processing.
''' </summary>
''' <param name="CLI">Self folk processing commandline collection.</param>
''' <param name="parallel">If this parameter value less than 1, then will be a single
''' thread task. Any positive value that greater than 1 will be parallel task.
''' (线)
''' </param>
''' <returns>
''' Returns the total executation time for running this task collection.
''' ()
''' </returns>
Public Function SelfFolks(CLI As IEnumerable(Of String), Optional parallel% = 0) As Long
Dim sw As Stopwatch = Stopwatch.StartNew
If parallel <= 0 Then
For Each args As String In CLI
Call App.SelfFolk(args).Run()
Next
Else
Dim Tasks As Func(Of Integer)() = LinqAPI.Exec(Of Func(Of Integer)) <=
Namespace ThreadTask
Public Module BatchTasks
''' <summary>
''' Folk this program itself for the large amount data batch processing.
''' </summary>
''' <param name="CLI">Self folk processing commandline collection.</param>
''' <param name="parallel">If this parameter value less than 1, then will be a single
''' thread task. Any positive value that greater than 1 will be parallel task.
''' (线)
''' </param>
''' <returns>
''' Returns the total executation time for running this task collection.
''' ()
''' </returns>
Public Function SelfFolks(CLI As IEnumerable(Of String), Optional parallel% = 0) As Long
Dim sw As Stopwatch = Stopwatch.StartNew
If parallel <= 0 Then
For Each args As String In CLI
Call App.SelfFolk(args).Run()
Next
Else
Dim Tasks As Func(Of Integer)() = LinqAPI.Exec(Of Func(Of Integer)) <=
_
From args As String
In CLI
Let io As IIORedirectAbstract = App.SelfFolk(args)
Let task As Func(Of Integer) = AddressOf io.Run
Select task
Call New ThreadTask(Of Integer)(Tasks).WithDegreeOfParallelism(parallel).RunParallel.ToArray
End If
Return sw.ElapsedMilliseconds
End Function
''' <summary>
'''
''' </summary>
''' <param name="tasks"></param>
''' <param name="numOfThreads"></param>
''' <remarks></remarks>
<Extension>
Public Sub Invoke(tasks As Action(), numOfThreads As Integer)
Dim getTask As Func(Of Action, Func(Of Integer)) =
Function(task)
Return AddressOf New TaskInvokeHelper With {
.task = task
}.RunTask
End Function
Dim invokes = From action As Action In tasks Select getTask(action)
Call New ThreadTask(Of Integer)(invokes) _
.WithDegreeOfParallelism(numOfThreads) _
.RunParallel() _
.ToArray
End Sub
End Module
From args As String
In CLI
Let io As IIORedirectAbstract = App.SelfFolk(args)
Let task As Func(Of Integer) = AddressOf io.Run
Select task
Call New ThreadTask(Of Integer)(Tasks) _
.WithDegreeOfParallelism(parallel) _
.RunParallel _
.ToArray
End If
Return sw.ElapsedMilliseconds
End Function
''' <summary>
'''
''' </summary>
''' <param name="tasks"></param>
''' <param name="numOfThreads"></param>
''' <remarks></remarks>
<Extension>
Public Sub Invoke(tasks As Action(), numOfThreads As Integer)
Dim getTask As Func(Of Action, Func(Of Integer)) =
Function(task)
Return AddressOf New TaskInvokeHelper With {
.task = task
}.RunTask
End Function
Dim invokes = From action As Action In tasks Select getTask(action)
Call New ThreadTask(Of Integer)(invokes) _
.WithDegreeOfParallelism(numOfThreads) _
.RunParallel() _
.ToArray
End Sub
End Module
End Namespace

@ -1,50 +1,53 @@
#Region "Microsoft.VisualBasic::ef1677867edb409e7343869d372b4818, Parallel\ThreadTask\TaskInvokeHelper.vb"
' Author:
'
' asuka (amethyst.asuka@gcmodeller.org)
' xie (genetics@smrucc.org)
' xieguigang (xie.guigang@live.com)
'
' Copyright (c) 2018 GPL3 Licensed
'
'
' GNU GENERAL PUBLIC LICENSE (GPL3)
'
'
' This program is free software: you can redistribute it and/or modify
' it under the terms of the GNU General Public License as published by
' the Free Software Foundation, either version 3 of the License, or
' (at your option) any later version.
'
' This program is distributed in the hope that it will be useful,
' but WITHOUT ANY WARRANTY; without even the implied warranty of
' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
' GNU General Public License for more details.
'
' You should have received a copy of the GNU General Public License
' along with this program. If not, see <http://www.gnu.org/licenses/>.
' /********************************************************************************/
' Summaries:
' Structure TaskInvokeHelper
'
' Function: RunTask
'
' /********************************************************************************/
' Author:
'
' asuka (amethyst.asuka@gcmodeller.org)
' xie (genetics@smrucc.org)
' xieguigang (xie.guigang@live.com)
'
' Copyright (c) 2018 GPL3 Licensed
'
'
' GNU GENERAL PUBLIC LICENSE (GPL3)
'
'
' This program is free software: you can redistribute it and/or modify
' it under the terms of the GNU General Public License as published by
' the Free Software Foundation, either version 3 of the License, or
' (at your option) any later version.
'
' This program is distributed in the hope that it will be useful,
' but WITHOUT ANY WARRANTY; without even the implied warranty of
' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
' GNU General Public License for more details.
'
' You should have received a copy of the GNU General Public License
' along with this program. If not, see <http://www.gnu.org/licenses/>.
' /********************************************************************************/
' Summaries:
' Structure TaskInvokeHelper
'
' Function: RunTask
'
' /********************************************************************************/
#End Region
Friend Structure TaskInvokeHelper
Namespace ThreadTask
Dim task As Action
Friend Structure TaskInvokeHelper
Public Function RunTask() As Integer
Call task()
Return 0
End Function
End Structure
Dim task As Action
Public Function RunTask() As Integer
Call task()
Return 0
End Function
End Structure
End Namespace

@ -43,107 +43,109 @@
Imports Microsoft.VisualBasic.Parallel.Tasks
''' <summary>
''' Using parallel linq that may stuck the program when a linq task partion wait a long time task to complete.
''' By using this parallel function that you can avoid this problem from parallel linq, and also you can
''' controls the task thread number manually by using this parallel task function.
''' (LINQ线
''' 使线)
''' </summary>
''' <typeparam name="TOut"></typeparam>
Public Class ThreadTask(Of TOut)
Dim taskList As Queue(Of Func(Of TOut))
Dim threads As AsyncHandle(Of TOut)()
Dim size As Integer
Sub New(task As IEnumerable(Of Func(Of TOut)))
Me.taskList = New Queue(Of Func(Of TOut))(task)
Me.size = Me.taskList.Count
End Sub
Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, Func(Of TOut))) As ThreadTask(Of TOut)
Return New ThreadTask(Of TOut)(items.Select(task))
End Function
Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, TOut)) As ThreadTask(Of TOut)
Return New ThreadTask(Of TOut)(items.Select(Function(i) New Func(Of TOut)(Function() task(i))))
End Function
Namespace ThreadTask
''' <summary>
''' You can controls the parallel tasks number from this parameter, smaller or equals to ZERO means auto
''' config the thread number, If want single thread, not parallel, set this value to 1, and positive
''' value greater than 1 will makes the tasks parallel.
''' (线, 1线)
''' Using parallel linq that may stuck the program when a linq task partion wait a long time task to complete.
''' By using this parallel function that you can avoid this problem from parallel linq, and also you can
''' controls the task thread number manually by using this parallel task function.
''' (LINQ线
''' 使线)
''' </summary>
''' <param name="n_threads"></param>
''' <returns></returns>
Public Function WithDegreeOfParallelism(n_threads As Integer) As ThreadTask(Of TOut)
threads = New AsyncHandle(Of TOut)(n_threads) {}
Return Me
End Function
Private Function GetEmptyThread() As Integer
For i As Integer = 0 To threads.Length - 1
If threads(i) Is Nothing Then
Return i
End If
Next
Return -1
End Function
Private Function GetCompleteThread() As Integer
For i As Integer = 0 To threads.Length - 1
If (Not threads(i) Is Nothing) AndAlso threads(i).IsCompleted Then
Return i
End If
Next
Return -1
End Function
Public Overrides Function ToString() As String
Dim free$ = threads.Where(Function(t) t Is Nothing).Count
Dim running$ = threads.Where(Function(t) t IsNot Nothing AndAlso Not t.IsCompleted).Count
Dim finished$ = threads.Where(Function(t) t IsNot Nothing AndAlso t.IsCompleted).Count
Dim delta As Integer = size - taskList.Count
Return $"[free: {free}, running: {running}, finished: {finished}, progress: {delta} - {CInt(delta / size * 100)}%]"
End Function
''' <summary>
'''
''' </summary>
''' <returns></returns>
Public Iterator Function RunParallel() As IEnumerable(Of TOut)
Do While taskList.Count > 0
Dim i As Integer = GetEmptyThread()
If i > -1 Then
threads(i) = New AsyncHandle(Of TOut)(taskList.Dequeue).Run
Call Console.WriteLine($"{ToString()} submit new task on thread [{i + 1}]!")
End If
Dim j As Integer = GetCompleteThread()
If j > -1 Then
Yield threads(j).GetValue
threads(j) = Nothing
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!")
End If
Loop
Do While Not threads.All(Function(t) t Is Nothing)
Dim j As Integer = GetCompleteThread()
If j > -1 Then
Yield threads(j).GetValue
threads(j) = Nothing
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!")
End If
Loop
End Function
End Class
''' <typeparam name="TOut"></typeparam>
Public Class ThreadTask(Of TOut)
Dim taskList As Queue(Of Func(Of TOut))
Dim threads As AsyncHandle(Of TOut)()
Dim size As Integer
Sub New(task As IEnumerable(Of Func(Of TOut)))
Me.taskList = New Queue(Of Func(Of TOut))(task)
Me.size = Me.taskList.Count
End Sub
Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, Func(Of TOut))) As ThreadTask(Of TOut)
Return New ThreadTask(Of TOut)(items.Select(task))
End Function
Public Shared Function CreateThreads(Of T)(items As IEnumerable(Of T), task As Func(Of T, TOut)) As ThreadTask(Of TOut)
Return New ThreadTask(Of TOut)(items.Select(Function(i) New Func(Of TOut)(Function() task(i))))
End Function
''' <summary>
''' You can controls the parallel tasks number from this parameter, smaller or equals to ZERO means auto
''' config the thread number, If want single thread, not parallel, set this value to 1, and positive
''' value greater than 1 will makes the tasks parallel.
''' (线, 1线)
''' </summary>
''' <param name="n_threads"></param>
''' <returns></returns>
Public Function WithDegreeOfParallelism(n_threads As Integer) As ThreadTask(Of TOut)
threads = New AsyncHandle(Of TOut)(n_threads) {}
Return Me
End Function
Private Function GetEmptyThread() As Integer
For i As Integer = 0 To threads.Length - 1
If threads(i) Is Nothing Then
Return i
End If
Next
Return -1
End Function
Private Function GetCompleteThread() As Integer
For i As Integer = 0 To threads.Length - 1
If (Not threads(i) Is Nothing) AndAlso threads(i).IsCompleted Then
Return i
End If
Next
Return -1
End Function
Public Overrides Function ToString() As String
Dim free$ = threads.Where(Function(t) t Is Nothing).Count
Dim running$ = threads.Where(Function(t) t IsNot Nothing AndAlso Not t.IsCompleted).Count
Dim finished$ = threads.Where(Function(t) t IsNot Nothing AndAlso t.IsCompleted).Count
Dim delta As Integer = size - taskList.Count
Return $"[free: {free}, running: {running}, finished: {finished}, progress: {delta} - {CInt(delta / size * 100)}%]"
End Function
''' <summary>
'''
''' </summary>
''' <returns></returns>
Public Iterator Function RunParallel() As IEnumerable(Of TOut)
Do While taskList.Count > 0
Dim i As Integer = GetEmptyThread()
If i > -1 Then
threads(i) = New AsyncHandle(Of TOut)(taskList.Dequeue).Run
Call Console.WriteLine($"{ToString()} submit new task on thread [{i + 1}]!")
End If
Dim j As Integer = GetCompleteThread()
If j > -1 Then
Yield threads(j).GetValue
threads(j) = Nothing
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!")
End If
Loop
Do While Not threads.All(Function(t) t Is Nothing)
Dim j As Integer = GetCompleteThread()
If j > -1 Then
Yield threads(j).GetValue
threads(j) = Nothing
Call Console.WriteLine($"{ToString()} [thread_{j + 1}] job done!")
End If
Loop
End Function
End Class
End Namespace

Loading…
Cancel
Save