Asynchronous Data Methods
- Last Updated: Nov 18, 2025
- 8 minute read
- PI System
- AF SDK 2024 R2
- Developer
Asynchronous data methods provide a mechanism for concurrency during a data access call to the PI Data Archive. In this case, concurrency refers to two types of operations that can execute simultaneously:
- The handling of the data request by the PI Data Archive, possibly over the network.
- Continued execution of client-side code that does not depend on the result of the request.
PI AF SDK 2.8 introduces asynchronous data access methods based on the Task-based Asynchronous Pattern (TAP) introduced in .NET Framework 4.5. These methods are implemented in both the OSIsoft.AF.Data and OSIsoft.AF.PI namespaces. They are supported for AF Attributes configured with PI Point data references.
These asynchronous methods can be advantageous in many cases:
- In front-end applications, the UI thread can stay responsive during a data access call.
- In both client and server applications, the number of threads used to service a call can be reduced, as waiting threads can be returned to the thread pool for re-use.
- The effect of latency is mitigated because remote calls can be executed concurrently.
This example demonstrates an asynchronous method that retrieves the summary data for a given attribute of a specific element type. Summaries span the range of one day. The GetSummariesAsyncThrottled() method demonstrates throttling asynchronous method calls so that a single client does not execute many asynchronous requests at once and exhaust server resources. The GetSummariesAsyncWithTimeout() method demonstrates placing a timeout on asynchronous method calls.
/// <summary>
/// Runs the three example variants (plain, throttled, with timeout) in turn,
/// blocking on each returned task and printing the summaries it produced.
/// </summary>
/// <param name="attrList">Attributes whose one-day summaries are requested.</param>
void Async_Program(AFAttributeList attrList)
{
    try
    {
        // Get summaries asynchronously and wait for the result
        Task<IList<IDictionary<AFSummaryTypes, AFValue>>> summariesTask = GetSummariesAsync(attrList);
        IList<IDictionary<AFSummaryTypes, AFValue>> summaries = summariesTask.Result;
        Console.WriteLine("GetSummariesAsync:");
        foreach (var summary in summaries)
        {
            WriteSummaryItem(summary);
        }
        Console.WriteLine();

        // Get summaries asynchronously limited to 5 concurrent requests and wait for the result
        summariesTask = GetSummariesAsyncThrottled(attrList, 5);
        summaries = summariesTask.Result;
        Console.WriteLine("GetSummariesAsyncThrottled:");
        foreach (var summary in summaries)
        {
            WriteSummaryItem(summary);
        }
        Console.WriteLine();

        // Get summaries asynchronously with a timeout and wait for the result
        summariesTask = GetSummariesAsyncWithTimeout(attrList, 1000);
        summaries = summariesTask.Result;
        Console.WriteLine("GetSummariesAsyncWithTimeout:");
        foreach (var summary in summaries)
        {
            WriteSummaryItem(summary);
        }
        Console.WriteLine();
    }
    catch (AggregateException ae)
    {
        // Task.Result wraps failures (including the TimeoutException thrown by
        // GetSummariesAsyncWithTimeout) in an AggregateException.
        Console.WriteLine("{0}", ae.Flatten().InnerException.Message);
    }
}

/// <summary>
/// Prints the minimum, maximum, average and total of one summary result.
/// </summary>
/// <param name="summary">
/// Summary values keyed by summary type; may be null, in which case nothing is printed.
/// </param>
private static void WriteSummaryItem(IDictionary<AFSummaryTypes, AFValue> summary)
{
    // The GetSummaries* methods return null for an attribute whose request failed
    // (the failure has already been reported there), so skip such entries.
    if (summary == null)
    {
        return;
    }

    Console.WriteLine("Summary for {0}", summary[AFSummaryTypes.Minimum].Attribute.Element);
    Console.WriteLine(" Minimum: {0:N0}", summary[AFSummaryTypes.Minimum].ValueAsDouble());
    Console.WriteLine(" Maximum: {0:N0}", summary[AFSummaryTypes.Maximum].ValueAsDouble());
    Console.WriteLine(" Average: {0:N0}", summary[AFSummaryTypes.Average].ValueAsDouble());
    Console.WriteLine(" Total: {0:N0}", summary[AFSummaryTypes.Total].ValueAsDouble());
    Console.WriteLine();
}

/// <summary>
/// Starts one SummaryAsync request per attribute that supports asynchronous data
/// methods and completes when all of them have completed.
/// </summary>
/// <param name="attributeList">Attributes to summarize over the last day ("*-1d" to "*").</param>
/// <returns>
/// One entry per attribute that supports asynchronous calls; an entry is null when
/// that attribute's request was caught as an AggregateException.
/// </returns>
public static async Task<IList<IDictionary<AFSummaryTypes, AFValue>>> GetSummariesAsync(AFAttributeList attributeList)
{
    Console.WriteLine("Calling GetSummariesAsync\n");

    Task<IDictionary<AFSummaryTypes, AFValue>>[] tasks = attributeList
        // Do not make the call if async is not supported
        .Where(attr => (attr.SupportedDataMethods & AFDataMethods.Asynchronous) == AFDataMethods.Asynchronous)
        .Select(async attr =>
        {
            try
            {
                AFSummaryTypes mySummaries = AFSummaryTypes.Minimum | AFSummaryTypes.Maximum | AFSummaryTypes.Average | AFSummaryTypes.Total;
                AFTimeRange timeRange = new AFTimeRange(new AFTime("*-1d"), new AFTime("*"));

                return await attr.Data.SummaryAsync(
                    timeRange: timeRange,
                    summaryType: mySummaries,
                    calculationBasis: AFCalculationBasis.TimeWeighted,
                    timeType: AFTimestampCalculation.Auto);
            }
            // NOTE(review): await normally rethrows the first inner exception of a faulted
            // task rather than an AggregateException, so this handler may only run when
            // SummaryAsync itself throws an AggregateException - confirm intended coverage.
            catch (AggregateException ae)
            {
                Console.WriteLine("{0}: {1}", attr.Name, ae.Flatten().InnerException.Message);
                return null;
            }
        })
        .ToArray();

    return await Task.WhenAll(tasks);
}

/// <summary>
/// Same as <c>GetSummariesAsync</c>, but uses a SemaphoreSlim so that at most
/// <paramref name="numConcurrent"/> requests are past the semaphore at any time.
/// </summary>
/// <param name="attributeList">Attributes to summarize over the last day.</param>
/// <param name="numConcurrent">Initial count of the throttling semaphore.</param>
/// <returns>
/// One entry per attribute that supports asynchronous calls; an entry is null when
/// that attribute's request was caught as an AggregateException.
/// </returns>
public static async Task<IList<IDictionary<AFSummaryTypes, AFValue>>> GetSummariesAsyncThrottled(AFAttributeList attributeList, int numConcurrent)
{
    // Use "asynchronous semaphore" pattern (e.g. SemaphoreSlim.WaitAsync()) to throttle the calls
    Console.WriteLine("Calling GetSummariesAsyncThrottled");

    // Example: Limit to numConcurrent concurrent async I/O operations.
    // The semaphore is disposed only after Task.WhenAll has completed, i.e. once
    // every lambda below has run its finally block and released it.
    using (SemaphoreSlim throttler = new SemaphoreSlim(initialCount: numConcurrent))
    {
        Task<IDictionary<AFSummaryTypes, AFValue>>[] tasks = attributeList
            // Do not make the call if async is not supported
            .Where(attr => (attr.SupportedDataMethods & AFDataMethods.Asynchronous) == AFDataMethods.Asynchronous)
            .Select(async attr =>
            {
                // asynchronously try to acquire the semaphore
                await throttler.WaitAsync();

                try
                {
                    AFSummaryTypes mySummaries = AFSummaryTypes.Minimum | AFSummaryTypes.Maximum | AFSummaryTypes.Average | AFSummaryTypes.Total;
                    AFTimeRange timeRange = new AFTimeRange(new AFTime("*-1d"), new AFTime("*"));

                    return await attr.Data.SummaryAsync(
                        timeRange: timeRange,
                        summaryType: mySummaries,
                        calculationBasis: AFCalculationBasis.TimeWeighted,
                        timeType: AFTimestampCalculation.Auto);
                }
                catch (AggregateException ae)
                {
                    Console.WriteLine("{0}: {1}", attr.Name, ae.Flatten().InnerException.Message);
                    return null;
                }
                finally
                {
                    // release the resource, whether the request succeeded or failed
                    throttler.Release();
                }
            })
            .ToArray();

        return await Task.WhenAll(tasks);
    }
}

/// <summary>
/// Same as <c>GetSummariesAsync</c>, but races the combined request task against a
/// delay task and gives up when the delay finishes first.
/// </summary>
/// <param name="attributeList">Attributes to summarize over the last day.</param>
/// <param name="timeoutInMilliseconds">Time allowed for all requests to complete.</param>
/// <returns>
/// One entry per attribute that supports asynchronous calls; an entry is null when
/// that attribute's request was caught as an AggregateException or was canceled.
/// </returns>
/// <exception cref="TimeoutException">The delay elapsed before all requests completed.</exception>
public static async Task<IList<IDictionary<AFSummaryTypes, AFValue>>> GetSummariesAsyncWithTimeout(AFAttributeList attributeList, int timeoutInMilliseconds)
{
    // Use a "competing tasks" pattern to place timeout on multiple async requests
    Console.WriteLine("Calling GetSummariesAsyncWithTimeout");

    // NOTE(review): cts is left undisposed on purpose - requests still in flight after
    // a timeout may keep observing its token; confirm before adding a Dispose call.
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;

    // The timer source can be disposed on exit: by then the delay task has either
    // run to completion (timeout) or been canceled (requests finished first).
    using (CancellationTokenSource ctsForTimer = new CancellationTokenSource())
    {
        CancellationToken tokenForTimer = ctsForTimer.Token;

        Task<IDictionary<AFSummaryTypes, AFValue>>[] tasks = attributeList
            // Do not make the call if async is not supported
            .Where(attr => (attr.SupportedDataMethods & AFDataMethods.Asynchronous) == AFDataMethods.Asynchronous)
            .Select(async attr =>
            {
                try
                {
                    AFSummaryTypes mySummaries = AFSummaryTypes.Minimum | AFSummaryTypes.Maximum | AFSummaryTypes.Average | AFSummaryTypes.Total;
                    AFTimeRange timeRange = new AFTimeRange(new AFTime("*-1d"), new AFTime("*"));

                    return await attr.Data.SummaryAsync(
                        timeRange: timeRange,
                        summaryType: mySummaries,
                        calculationBasis: AFCalculationBasis.TimeWeighted,
                        timeType: AFTimestampCalculation.Auto,
                        cancellationToken: token);
                }
                catch (AggregateException ae)
                {
                    Console.WriteLine("{0}: {1}", attr.Element.Name, ae.Flatten().InnerException.Message);
                    return null;
                }
                catch (OperationCanceledException oe)
                {
                    Console.WriteLine("{0}: {1}", attr.Element.Name, oe.Message);
                    return null;
                }
            })
            .ToArray();

        // Define a task that completes when all subtasks are complete
        Task<IDictionary<AFSummaryTypes, AFValue>[]> allTasks = Task.WhenAll(tasks);

        // Asynchronously wait for either the summaries or timer task to complete
        Task finishedTask = await Task.WhenAny(allTasks, Task.Delay(timeoutInMilliseconds, tokenForTimer));
        if (finishedTask == allTasks)
        {
            // Cancel the timer task
            ctsForTimer.Cancel();
            // Return summaries result (the task has already completed, so this does not block)
            return allTasks.Result;
        }

        // Cancel the summaries task if timeout
        cts.Cancel();
        throw new TimeoutException("The operation has timed out.");
    }
}
''' <summary>
''' Runs the three example variants (plain, throttled, with timeout) in turn,
''' blocking on each returned task and printing the summaries it produced.
''' </summary>
''' <param name="attrList">Attributes whose one-day summaries are requested.</param>
Private Sub Async_Program(attrList As AFAttributeList)
    Try
        ' Get summaries asynchronously and wait for the result
        Dim summariesTask As Task(Of IList(Of IDictionary(Of AFSummaryTypes, AFValue))) = GetSummariesAsync(attrList)
        Dim summaries As IList(Of IDictionary(Of AFSummaryTypes, AFValue)) = summariesTask.Result
        Console.WriteLine("GetSummariesAsync:")
        For Each summary As IDictionary(Of AFSummaryTypes, AFValue) In summaries
            WriteSummaryItem(summary)
        Next
        Console.WriteLine()

        ' Get summaries asynchronously limited to 5 concurrent requests and wait for the result
        summariesTask = GetSummariesAsyncThrottled(attrList, 5)
        summaries = summariesTask.Result
        Console.WriteLine("GetSummariesAsyncThrottled:")
        For Each summary As IDictionary(Of AFSummaryTypes, AFValue) In summaries
            WriteSummaryItem(summary)
        Next
        Console.WriteLine()

        ' Get summaries asynchronously with a timeout and wait for the result
        summariesTask = GetSummariesAsyncWithTimeout(attrList, 1000)
        summaries = summariesTask.Result
        Console.WriteLine("GetSummariesAsyncWithTimeout:")
        For Each summary As IDictionary(Of AFSummaryTypes, AFValue) In summaries
            WriteSummaryItem(summary)
        Next
        Console.WriteLine()
    Catch ae As AggregateException
        ' Task.Result wraps failures (including the TimeoutException thrown by
        ' GetSummariesAsyncWithTimeout) in an AggregateException.
        Console.WriteLine("{0}", ae.Flatten().InnerException.Message)
    End Try
End Sub

''' <summary>
''' Prints the minimum, maximum, average and total of one summary result.
''' </summary>
''' <param name="summary">
''' Summary values keyed by summary type; may be Nothing, in which case nothing is printed.
''' </param>
Private Shared Sub WriteSummaryItem(summary As IDictionary(Of AFSummaryTypes, AFValue))
    ' The GetSummaries* functions return Nothing for an attribute whose request failed
    ' (the failure has already been reported there), so skip such entries.
    If summary Is Nothing Then
        Return
    End If

    Console.WriteLine("Summary for {0}", summary(AFSummaryTypes.Minimum).Attribute.Element)
    Console.WriteLine(" Minimum: {0:N0}", summary(AFSummaryTypes.Minimum).ValueAsDouble())
    Console.WriteLine(" Maximum: {0:N0}", summary(AFSummaryTypes.Maximum).ValueAsDouble())
    Console.WriteLine(" Average: {0:N0}", summary(AFSummaryTypes.Average).ValueAsDouble())
    Console.WriteLine(" Total: {0:N0}", summary(AFSummaryTypes.Total).ValueAsDouble())
    Console.WriteLine()
End Sub

''' <summary>
''' Starts one SummaryAsync request per attribute that supports asynchronous data
''' methods and completes when all of them have completed.
''' </summary>
''' <param name="attributeList">Attributes to summarize over the last day ("*-1d" to "*").</param>
''' <returns>
''' One entry per attribute that supports asynchronous calls; an entry is Nothing when
''' that attribute's request was caught as an AggregateException.
''' </returns>
Public Shared Async Function GetSummariesAsync(attributeList As AFAttributeList) As Task(Of IList(Of IDictionary(Of AFSummaryTypes, AFValue)))
    Console.WriteLine("Calling GetSummariesAsync" & vbLf)

    ' Do not make the call if async is not supported
    Dim tasks As Task(Of IDictionary(Of AFSummaryTypes, AFValue))() =
        attributeList.Where(Function(attr) (attr.SupportedDataMethods And AFDataMethods.Asynchronous) = AFDataMethods.Asynchronous).[Select](
            Async Function(attr)
                Try
                    Dim mySummaries As AFSummaryTypes = AFSummaryTypes.Minimum Or AFSummaryTypes.Maximum Or AFSummaryTypes.Average Or AFSummaryTypes.Total
                    Dim timeRange As New AFTimeRange(New AFTime("*-1d"), New AFTime("*"))
                    Return Await attr.Data.SummaryAsync(timeRange:=timeRange,
                                                        summaryType:=mySummaries,
                                                        calculationBasis:=AFCalculationBasis.TimeWeighted,
                                                        timeType:=AFTimestampCalculation.Auto)
                Catch ae As AggregateException
                    ' NOTE(review): Await normally rethrows the first inner exception of a
                    ' faulted task rather than an AggregateException, so this handler may only
                    ' run when SummaryAsync itself throws one - confirm intended coverage.
                    Console.WriteLine("{0}: {1}", attr.Name, ae.Flatten().InnerException.Message)
                    Return Nothing
                End Try

            End Function).ToArray()

    Return Await Task.WhenAll(tasks)
End Function

''' <summary>
''' Same as GetSummariesAsync, but uses a SemaphoreSlim so that at most
''' <paramref name="numConcurrent"/> requests are past the semaphore at any time.
''' </summary>
''' <param name="attributeList">Attributes to summarize over the last day.</param>
''' <param name="numConcurrent">Initial count of the throttling semaphore.</param>
''' <returns>
''' One entry per attribute that supports asynchronous calls; an entry is Nothing when
''' that attribute's request was caught as an AggregateException.
''' </returns>
Public Shared Async Function GetSummariesAsyncThrottled(attributeList As AFAttributeList, numConcurrent As Integer) As Task(Of IList(Of IDictionary(Of AFSummaryTypes, AFValue)))
    ' Use "asynchronous semaphore" pattern (e.g. SemaphoreSlim.WaitAsync()) to throttle the calls
    Console.WriteLine("Calling GetSummariesAsyncThrottled")

    ' Example: Limit to numConcurrent concurrent async I/O operations.
    ' The semaphore is disposed only after Task.WhenAll has completed, i.e. once
    ' every lambda below has run its Finally block and released it.
    Using throttler As New SemaphoreSlim(initialCount:=numConcurrent)

        ' Do not make the call if async is not supported
        Dim tasks As Task(Of IDictionary(Of AFSummaryTypes, AFValue))() =
            attributeList.Where(Function(attr) (attr.SupportedDataMethods And AFDataMethods.Asynchronous) = AFDataMethods.Asynchronous).[Select](
                Async Function(attr)
                    ' asynchronously try to acquire the semaphore
                    Await throttler.WaitAsync()
                    Try
                        Dim mySummaries As AFSummaryTypes = AFSummaryTypes.Minimum Or AFSummaryTypes.Maximum Or AFSummaryTypes.Average Or AFSummaryTypes.Total
                        Dim timeRange As New AFTimeRange(New AFTime("*-1d"), New AFTime("*"))
                        Return Await attr.Data.SummaryAsync(timeRange:=timeRange,
                                                            summaryType:=mySummaries,
                                                            calculationBasis:=AFCalculationBasis.TimeWeighted,
                                                            timeType:=AFTimestampCalculation.Auto)
                    Catch ae As AggregateException
                        Console.WriteLine("{0}: {1}", attr.Name, ae.Flatten().InnerException.Message)
                        Return Nothing
                    Finally
                        ' release the resource, whether the request succeeded or failed
                        throttler.Release()
                    End Try

                End Function).ToArray()

        Return Await Task.WhenAll(tasks)
    End Using
End Function

''' <summary>
''' Same as GetSummariesAsync, but races the combined request task against a
''' delay task and gives up when the delay finishes first.
''' </summary>
''' <param name="attributeList">Attributes to summarize over the last day.</param>
''' <param name="timeoutInMilliseconds">Time allowed for all requests to complete.</param>
''' <returns>
''' One entry per attribute that supports asynchronous calls; an entry is Nothing when
''' that attribute's request was caught as an AggregateException or was canceled.
''' </returns>
''' <exception cref="TimeoutException">The delay elapsed before all requests completed.</exception>
Public Shared Async Function GetSummariesAsyncWithTimeout(attributeList As AFAttributeList, timeoutInMilliseconds As Integer) As Task(Of IList(Of IDictionary(Of AFSummaryTypes, AFValue)))
    ' Use a "competing tasks" pattern to place timeout on multiple async requests
    Console.WriteLine("Calling GetSummariesAsyncWithTimeout")

    ' NOTE(review): cts is left undisposed on purpose - requests still in flight after
    ' a timeout may keep observing its token; confirm before adding a Dispose call.
    Dim cts As New CancellationTokenSource()
    Dim token As CancellationToken = cts.Token

    ' The timer source can be disposed on exit: by then the delay task has either
    ' run to completion (timeout) or been canceled (requests finished first).
    Using ctsForTimer As New CancellationTokenSource()
        Dim tokenForTimer As CancellationToken = ctsForTimer.Token

        ' Do not make the call if async is not supported
        Dim tasks As Task(Of IDictionary(Of AFSummaryTypes, AFValue))() =
            attributeList.Where(Function(attr) (attr.SupportedDataMethods And AFDataMethods.Asynchronous) = AFDataMethods.Asynchronous).[Select](
                Async Function(attr)
                    Try
                        Dim mySummaries As AFSummaryTypes = AFSummaryTypes.Minimum Or AFSummaryTypes.Maximum Or AFSummaryTypes.Average Or AFSummaryTypes.Total
                        Dim timeRange As New AFTimeRange(New AFTime("*-1d"), New AFTime("*"))
                        Return Await attr.Data.SummaryAsync(timeRange:=timeRange, summaryType:=mySummaries,
                                                            calculationBasis:=AFCalculationBasis.TimeWeighted,
                                                            timeType:=AFTimestampCalculation.Auto,
                                                            cancellationToken:=token)
                    Catch ae As AggregateException
                        Console.WriteLine("{0}: {1}", attr.Element.Name, ae.Flatten().InnerException.Message)
                        Return Nothing
                    Catch oe As OperationCanceledException
                        Console.WriteLine("{0}: {1}", attr.Element.Name, oe.Message)
                        Return Nothing
                    End Try
                End Function).ToArray()

        ' Define a task that completes when all subtasks are complete
        Dim allTasks As Task(Of IDictionary(Of AFSummaryTypes, AFValue)()) = Task.WhenAll(tasks)

        ' Asynchronously wait for either the summaries or timer task to complete
        Dim finishedTask As Task = Await Task.WhenAny(allTasks, Task.Delay(timeoutInMilliseconds, tokenForTimer))
        If finishedTask Is allTasks Then
            ' Cancel the timer task
            ctsForTimer.Cancel()
            ' Return summaries result (the task has already completed, so this does not block)
            Return allTasks.Result
        End If

        ' Cancel the summaries task if timeout
        cts.Cancel()
        Throw New TimeoutException("The operation has timed out.")
    End Using
End Function
No code example is currently available or this language may not be supported.
No code example is currently available or this language may not be supported.