#Region "Microsoft.VisualBasic::d7312dbb05793bd8941454dd896c4334, Rpc\RpcClient.vb" ' Author: ' ' asuka (amethyst.asuka@gcmodeller.org) ' xie (genetics@smrucc.org) ' xieguigang (xie.guigang@live.com) ' ' Copyright (c) 2018 GPL3 Licensed ' ' ' GNU GENERAL PUBLIC LICENSE (GPL3) ' ' ' This program is free software: you can redistribute it and/or modify ' it under the terms of the GNU General Public License as published by ' the Free Software Foundation, either version 3 of the License, or ' (at your option) any later version. ' ' This program is distributed in the hope that it will be useful, ' but WITHOUT ANY WARRANTY; without even the implied warranty of ' MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ' GNU General Public License for more details. ' ' You should have received a copy of the GNU General Public License ' along with this program. If not, see . ' /********************************************************************************/ ' Summaries: ' Class RpcClient ' ' Constructor: (+1 Overloads) Sub New ' ' Function: CreateTask, FromTcp, FromUdp, NewSession ' ' Sub: Close, Dispose, OnSessionExcepted, OnSessionMessageSended, RemoveTicket ' SendNextQueuedItem ' ' ' /********************************************************************************/ #End Region Imports System Imports System.Collections.Generic Imports System.Linq Imports System.Net Imports System.Threading Imports System.Threading.Tasks Imports Microsoft.VisualBasic.ApplicationServices.Debugging.Logging Imports Rpc.Connectors Imports Rpc.MessageProtocol Namespace Rpc ''' ''' The RPC client. ''' Public Class RpcClient Implements IDisposable, IRpcClient, ITicketOwner Private Shared Log As LogFile = Microsoft.VisualBasic.My.FrameworkInternal.getLogger(GetType(RpcClient).FullName) Private ReadOnly _sessionCreater As Func(Of IRpcSession) Private ReadOnly _sync As Object = New Object() Private _session As IRpcSession = Nothing Private _nextXid As Integer = 0 Private _sendingInProgress As Boolean = False Private _pendingRequests As LinkedList(Of ITicket) = New LinkedList(Of ITicket)() Friend Sub New(sessionCreater As Func(Of IRpcSession)) _sessionCreater = sessionCreater NewSession() End Sub ''' ''' Create RPC client from TCP protocol. ''' ''' server address ''' block size Public Shared Function FromTcp(ep As IPEndPoint, Optional blockSize As Integer = 1024 * 4) As RpcClient Log.Debug("Create RPC client for TCP server:{0}", ep) Return New RpcClient(Function() New TcpSession(ep, blockSize)) End Function ''' ''' Create RPC client from UDP protocol. ''' ''' server address Public Shared Function FromUdp(ep As IPEndPoint) As RpcClient Log.Debug("Create RPC client for UDP server:{0}", ep) Return New RpcClient(Function() New UdpSession(ep)) End Function Private Function NewSession() As IRpcSession Dim prevSession = _session _session = _sessionCreater() AddHandler _session.OnExcepted, AddressOf OnSessionExcepted AddHandler _session.OnSended, AddressOf OnSessionMessageSended Return prevSession End Function Private Sub RemoveTicket(ticket As ITicket) Implements ITicketOwner.RemoveTicket Dim sessionCopy As IRpcSession SyncLock _sync If _pendingRequests.Remove(ticket) Then Return sessionCopy = _session End SyncLock sessionCopy.RemoveTicket(ticket) End Sub ''' ''' Close this connection and cancel all queued tasks ''' Public Sub Close() Implements IRpcClient.Close Log.Debug("Close connector.") Dim tickets As ITicket() Dim prevSession As IRpcSession SyncLock _sync tickets = _pendingRequests.ToArray() _pendingRequests.Clear() prevSession = NewSession() End SyncLock Dim ex = New TaskCanceledException("close connector") For Each t In tickets t.Except(ex) Next prevSession.Close(ex) End Sub ''' ''' creates the task for the control request to the RPC server ''' Public Function CreateTask(Of TReq, TResp)(callBody As call_body, reqArgs As TReq, options As TaskCreationOptions, token As CancellationToken) As Task(Of TResp) Implements IRpcClient.CreateTask Dim ticket As Ticket(Of TReq, TResp) = New Ticket(Of TReq, TResp)(Me, callBody, reqArgs, options, token) SyncLock _sync If token.IsCancellationRequested Then Return ticket.Task _pendingRequests.AddLast(ticket) End SyncLock SendNextQueuedItem() Return ticket.Task End Function Private Sub SendNextQueuedItem() Log.Trace("Send next queued item.") Dim ticket As ITicket = Nothing Dim sessionCopy As IRpcSession = Nothing SyncLock _sync If _sendingInProgress Then Log.Debug("Already sending.") Return End If If _pendingRequests.Count = 0 Then Log.Debug("Not pending requests to send.") Return End If sessionCopy = _session _sendingInProgress = True ticket = _pendingRequests.First.Value _pendingRequests.RemoveFirst() ticket.Xid = Math.Min(Interlocked.Increment(_nextXid), _nextXid - 1) End SyncLock sessionCopy.AsyncSend(ticket) End Sub Private Sub OnSessionExcepted(session As IRpcSession, ex As Exception) Dim prevSession As IRpcSession SyncLock _sync If session IsNot _session Then Return _sendingInProgress = False prevSession = NewSession() End SyncLock Log.Debug("Session excepted: {0}", ex) SendNextQueuedItem() prevSession.Close(ex) End Sub Private Sub OnSessionMessageSended(session As IRpcSession) SyncLock _sync If session IsNot _session Then Return _sendingInProgress = False End SyncLock SendNextQueuedItem() End Sub ''' ''' dispose this connection ''' Public Sub Dispose() Implements IDisposable.Dispose Close() End Sub End Class End Namespace