AFDataPipe Class
- Last Updated: Nov 18, 2025
- PI System
- AF SDK 2024 R2
- Developer

Inheritance Hierarchy
OSIsoft.AF.Data.AFDataPipe
Namespace: OSIsoft.AF.Data
Assembly: OSIsoft.AFSDK (in OSIsoft.AFSDK.dll) Version: 3.1.1.1182
Syntax
C#
public sealed class AFDataPipe : IDisposable
VB
Public NotInheritable Class AFDataPipe
    Implements IDisposable
VB Usage
Dim instance As AFDataPipe
C++
public ref class AFDataPipe sealed : IDisposable
F#
[<SealedAttribute>]
type AFDataPipe =
    class
        interface IDisposable
    end
The AFDataPipe type exposes the following members.
Constructors
| Name | Description |
|---|---|
| AFDataPipe | Creates an instance of an AFDataPipe. |
Properties
| Name | Description |
|---|---|
| DataPipeStatistics | Contains statistics for the most recent signup or event scan. |
| EventHorizonMode | Specifies which events the data pipe returns from a GetUpdateEvents call, based on the timestamps of the events. |
| EventHorizonOffset | Allows the pipe to return events in the future. The pipe fetches events with timestamps up to the current time + EventHorizonOffset when the GetUpdateEvents or GetObserverEvents methods are called. |
| IsSuspendedSignupEnabled | If set, the data pipe suspends signups that are currently invalid but might become valid with changes outside of AF. |
| PIServers | Gets the list of PIServer objects that could provide data for the attributes in the pipe. |
Methods
| Name | Description |
|---|---|
| AddSignups | Adds a list of AFAttribute objects to be monitored by the data pipe. The method returns server-level and attribute-level errors in AFErrors<TKey>. |
| AddSignupsWithInitEvents | Adds a list of AFAttribute objects to be monitored by the data pipe and returns the initial events for this list of new signup objects. PI Server-level and attribute-level errors can be accessed in the AFListResults<TKey, TResult>. |
| Dispose | Closes the event pipe. |
| Equals | Determines whether the specified object is equal to the current object. (Inherited from Object.) |
| GetHashCode | Serves as the default hash function. (Inherited from Object.) |
| GetObserverEvents | Triggers retrieval of new events that occurred on the AFAttribute objects monitored by the data pipe. The new events are sent to the IObserver objects registered with the data pipe. |
| GetObserverEvents(Boolean) | Triggers retrieval of new events that occurred on the AFAttribute objects monitored by the data pipe. The new events are sent to the IObserver objects registered with the data pipe. The Boolean output parameter indicates whether more events remain to be fetched. |
| GetRelatedAttributes | For each AFChangeInfo, finds the subscribed AFAttributes that might be impacted by the change. |
| GetSignups | Gets the list of the AFAttribute objects being monitored by the pipe. |
| GetSignupsByPIServer | Gets the list of the AFAttribute objects being monitored by the pipe that could get data from the passed PIServer. |
| GetType | Gets the Type of the current instance. (Inherited from Object.) |
| GetUpdateEvents | Gets the list of data change events from the server. |
| GetUpdateEvents(Boolean) | Gets the list of data change events from the server. The Boolean output parameter indicates whether more events remain to be fetched. |
| ObservePendingChanges | Observes a set of changes from the server and prepares to update the AFDataPipe to reflect them. |
| ProcessAppliedChanges | Updates the AFDataPipe to reflect previously observed changes. |
| RemoveSignups | Removes a list of AFAttribute objects being monitored by the data pipe. The method returns server-level and attribute-level errors in AFErrors<TKey>. |
| Subscribe | Registers an IObserver for AFDataPipeEvent with the AFDataPipe. All the AFDataPipeEvents received by the data pipe are sent to the IObserver. |
| ToString | Returns a string that represents the current object. (Inherited from Object.) |
Remarks
A client signs up to be notified of data change events for an AFAttribute by adding it to the AFDataPipe. The attributes signed up in a single data pipe do not all have to belong to the same AFDatabase.
The client gets all the data change events since the last call by calling the GetUpdateEvents or GetObserverEvents method. Event retrieval is normally called from a background thread, which handles the processing of the changes. This also lets the client control how often it checks for events.
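The background-thread retrieval described above can be sketched as follows. This is a minimal illustration, not an official sample: `DataPipePoller`, `Run`, `pollInterval`, and `handleEvent` are hypothetical names introduced here, and the sketch assumes the pipe is only accessed from this one background task. It relies only on the `GetUpdateEvents(out bool)` overload and the `Results` collection used in the examples on this page, and it ignores any errors reported in the returned results.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using OSIsoft.AF;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;

public static class DataPipePoller
{
    // Hypothetical helper (not part of the AF SDK): drains the pipe on a
    // background task every pollInterval until the token is cancelled.
    public static Task Run(AFDataPipe pipe, TimeSpan pollInterval,
                           Action<AFDataPipeEvent> handleEvent, CancellationToken token)
    {
        return Task.Run(async () =>
        {
            try
            {
                while (!token.IsCancellationRequested)
                {
                    bool hasMoreEvents;
                    do
                    {
                        // Fetch everything that arrived since the last call.
                        AFListResults<AFAttribute, AFDataPipeEvent> results =
                            pipe.GetUpdateEvents(out hasMoreEvents);

                        foreach (AFDataPipeEvent dataPipeEvent in results.Results)
                            handleEvent(dataPipeEvent);
                    } while (hasMoreEvents && !token.IsCancellationRequested);

                    // The delay is what controls how often events are checked.
                    await Task.Delay(pollInterval, token);
                }
            }
            catch (OperationCanceledException)
            {
                // Cancellation during the delay ends the loop normally.
            }
        });
    }
}
```

A caller would create a CancellationTokenSource, pass its token to `Run`, and cancel it before disposing the pipe. A shorter `pollInterval` lowers latency at the cost of more frequent round trips.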
There are two ways to get the data change events: (a) call the GetUpdateEvents method directly; (b) receive them through an IObserver of AFDataPipeEvent. The IObserver pattern provides a significant performance improvement over calling GetUpdateEvents directly in high-throughput scenarios. The application must implement IObserver and register it with the data pipe via the Subscribe(IObserver<AFDataPipeEvent>) method.
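The observer route can be sketched as below. `CountingObserver` and `ObserverExample.Drain` are hypothetical names introduced for illustration, and the sketch assumes that Subscribe returns an IDisposable subscription handle, as is conventional for the .NET observer pattern. Events reach the observer only when GetObserverEvents is called, so the application still decides when retrieval happens.

```csharp
using System;
using System.Collections.Generic;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;

// Hypothetical observer: counts events by action instead of processing them.
public sealed class CountingObserver : IObserver<AFDataPipeEvent>
{
    public int AddCount { get; private set; }
    public int OtherCount { get; private set; }

    public void OnNext(AFDataPipeEvent value)
    {
        if (value.Action == AFDataPipeAction.Add)
            AddCount++;
        else
            OtherCount++;
    }

    public void OnError(Exception error)
    {
        Console.Error.WriteLine(error.Message);
    }

    public void OnCompleted()
    {
    }
}

public static class ObserverExample
{
    // Signs up the given attributes and pushes all pending events to the observer.
    public static void Drain(IList<AFAttribute> attributes)
    {
        var observer = new CountingObserver();

        using (AFDataPipe pipe = new AFDataPipe())
        using (IDisposable subscription = pipe.Subscribe(observer)) // assumed return type
        {
            pipe.AddSignups(attributes);

            bool hasMoreEvents;
            do
            {
                // Each call delivers the new events to OnNext.
                pipe.GetObserverEvents(out hasMoreEvents);
            } while (hasMoreEvents);
        }

        Console.WriteLine("Add events: {0}, other events: {1}",
                          observer.AddCount, observer.OtherCount);
    }
}
```

Keeping OnNext lightweight matters here, since it runs once per event; heavier processing is usually better handed off to a queue.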
Note: This method, property, or class is not available in the legacy .NET 3.5 version of the SDK.
Examples
C#
// This example demonstrates how to create an AFDataPipe
// and how to use it to get new events for an attribute

// Namespaces used by this snippet
using System.Threading;
using OSIsoft.AF;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using OSIsoft.AF.PI;

// Get the Database
PISystems myPISystems = new PISystems();
PISystem myPISystem = myPISystems.DefaultPISystem;
AFDatabase myDB = myPISystem.Databases.DefaultDatabase;
bool bMoreEvents = true;

// Create an Element with some attributes
AFElement myElement = myDB.Elements.Add("MyElement*");
AFAttribute myAttribute1 = myElement.Attributes.Add("MyAttribute1");
AFAttribute myAttribute2 = myElement.Attributes.Add("MyAttribute2");

// Create PIPoints to Update
PIServer piServer = PIServers.GetPIServers().DefaultPIServer;
PIPoint point;
if (!PIPoint.TryFindPIPoint(piServer, "testfloat1", out point))
{
    point = piServer.CreatePIPoint("testfloat1", null);
}
if (!PIPoint.TryFindPIPoint(piServer, "testfloat2", out point))
{
    point = piServer.CreatePIPoint("testfloat2", null);
}

// Point both attributes at the PI points through the PI Point data reference
myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute1.ConfigString = @"\\%Server%\testfloat1;ReadOnly=false";
myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute2.ConfigString = @"\\%Server%\testfloat2;ReadOnly=false";

// Sign up for updates for attributes
AFDataPipe myDataPipe = new AFDataPipe();
myDataPipe.AddSignups(myElement.Attributes);

// Send values to attributes to test out data pipe
AFValue inputValue1 = new AFValue(5, OSIsoft.AF.Time.AFTime.Now);
AFValue inputValue2 = new AFValue(10, OSIsoft.AF.Time.AFTime.Now);
myAttribute1.SetValue(inputValue1);
myAttribute2.SetValue(inputValue2);
Thread.Sleep(500);

// Get updates from pipe and process them (should get two events)
AFListResults<AFAttribute, AFDataPipeEvent> myResults1 = myDataPipe.GetUpdateEvents();

// Do same as above but with more events
for (int i = 1; i <= 100; i++)
{
    inputValue1 = new AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now);
    inputValue2 = new AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now);
    myAttribute1.SetValue(inputValue1);
    myAttribute2.SetValue(inputValue2);
    Thread.Sleep(100);
}

AFListResults<AFAttribute, AFDataPipeEvent> myResults2;
int numTotalEvents = 0;
int numAttribute1Events = 0;
int numAttribute2Events = 0;
int numUnknownEvents = 0;

// With a high-throughput pipe, a single GetUpdateEvents call may not return all the data.
// The application can loop to get all events or call GetUpdateEvents more frequently.
// However, 200 events are not enough to set bMoreEvents to true. It takes tens of thousands
// of events between GetUpdateEvents calls for the data pipe to have unfetched events on the data source server.
while (bMoreEvents)
{
    myResults2 = myDataPipe.GetUpdateEvents(out bMoreEvents);
    foreach (AFDataPipeEvent dpevent in myResults2.Results)
    {
        // Checking for specific attribute and if new snapshot
        if (dpevent.Value.Attribute == myAttribute1 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute1Events++;
        else if (dpevent.Value.Attribute == myAttribute2 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute2Events++;
        else
            numUnknownEvents++;
    }
    numTotalEvents += myResults2.Count;
}
VB
' This example demonstrates how to create an AFDataPipe
' and how to use it to get new events for an attribute

' Namespaces used by this snippet
Imports System.Threading
Imports OSIsoft.AF
Imports OSIsoft.AF.Asset
Imports OSIsoft.AF.Data
Imports OSIsoft.AF.PI

' Get the Database
Dim myPISystems As New PISystems
Dim myPISystem As PISystem = myPISystems.DefaultPISystem
Dim myDB As AFDatabase = myPISystem.Databases.DefaultDatabase
Dim bMoreEvents As Boolean = True

' Create an Element with some attributes
Dim myElement As AFElement = myDB.Elements.Add("MyElement*")
Dim myAttribute1 As AFAttribute = myElement.Attributes.Add("MyAttribute1")
Dim myAttribute2 As AFAttribute = myElement.Attributes.Add("MyAttribute2")

' Create PIPoints to Update
Dim piServer As PIServer = PIServers.GetPIServers().DefaultPIServer
Dim point As PIPoint = Nothing
If (Not PIPoint.TryFindPIPoint(piServer, "testfloat1", point)) Then
    point = piServer.CreatePIPoint("testfloat1", Nothing)
End If
If (Not PIPoint.TryFindPIPoint(piServer, "testfloat2", point)) Then
    point = piServer.CreatePIPoint("testfloat2", Nothing)
End If

' Point both attributes at the PI points through the PI Point data reference
myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute1.ConfigString = "\\%Server%\testfloat1;ReadOnly=false"
myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute2.ConfigString = "\\%Server%\testfloat2;ReadOnly=false"

' Sign up for updates for attributes
Dim myDataPipe As New AFDataPipe()
myDataPipe.AddSignups(myElement.Attributes)

' Send values to attributes to test out data pipe
Dim inputValue1 As New AFValue(5, OSIsoft.AF.Time.AFTime.Now)
Dim inputValue2 As New AFValue(10, OSIsoft.AF.Time.AFTime.Now)
myAttribute1.SetValue(inputValue1)
myAttribute2.SetValue(inputValue2)
Thread.Sleep(500)

' Get updates from pipe and process them (should get two events)
Dim myResults1 As AFListResults(Of AFAttribute, AFDataPipeEvent) = myDataPipe.GetUpdateEvents()

' Do same as above but with more events
For i As Integer = 1 To 100 Step 1
    inputValue1 = New AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now)
    inputValue2 = New AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now)
    myAttribute1.SetValue(inputValue1)
    myAttribute2.SetValue(inputValue2)
    Thread.Sleep(100)
Next i

Dim myResults2 As AFListResults(Of AFAttribute, AFDataPipeEvent)
Dim numTotalEvents As Integer
Dim numAttribute1Events As Integer
Dim numAttribute2Events As Integer
Dim numUnknownEvents As Integer

' With a high-throughput pipe, a single GetUpdateEvents call may not return all the data.
' The application can loop to get all events or call GetUpdateEvents more frequently.
' However, 200 events are not enough to set bMoreEvents to true. It takes tens of thousands
' of events between GetUpdateEvents calls for the data pipe to have unfetched events on the data source server.
While bMoreEvents
    myResults2 = myDataPipe.GetUpdateEvents(bMoreEvents)
    For Each dpevent As AFDataPipeEvent In myResults2.Results
        ' Checking for specific attribute and if new snapshot
        If (dpevent.Value.Attribute = myAttribute1 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute1Events += 1
        ElseIf (dpevent.Value.Attribute = myAttribute2 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute2Events += 1
        Else
            numUnknownEvents += 1
        End If
    Next
    numTotalEvents += myResults2.Count
End While
C#
// This example demonstrates how to create an AFDataPipe
// and how to use it to get new events for an attribute
// from both PI points and a time-series enabled
// Table Lookup data reference. Only time-series enabled
// table lookups support AFDataPipe.

// Namespaces used by this snippet
using System;
using System.Data;
using System.Threading;
using OSIsoft.AF;
using OSIsoft.AF.Asset;
using OSIsoft.AF.Data;
using OSIsoft.AF.PI;
using OSIsoft.AF.Time;

// Get the Database
PISystems myPISystems = new PISystems();
PISystem myPISystem = myPISystems.DefaultPISystem;
AFDatabase myDB = myPISystem.Databases.DefaultDatabase;
bool bMoreEvents = true;

// Create an Element with some attributes
AFElement myElement = myDB.Elements.Add("MyElement*");
AFAttribute myAttribute1 = myElement.Attributes.Add("MyAttribute1");
AFAttribute myAttribute2 = myElement.Attributes.Add("MyAttribute2");

// Create PIPoints to Update
PIServer piServer = PIServers.GetPIServers().DefaultPIServer;
PIPoint point;
if (!PIPoint.TryFindPIPoint(piServer, "testfloat1", out point))
{
    point = piServer.CreatePIPoint("testfloat1", null);
}
if (!PIPoint.TryFindPIPoint(piServer, "testfloat2", out point))
{
    point = piServer.CreatePIPoint("testfloat2", null);
}

// Point both attributes at the PI points through the PI Point data reference
myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute1.ConfigString = @"\\%Server%\testfloat1;ReadOnly=false";
myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute2.ConfigString = @"\\%Server%\testfloat2;ReadOnly=false";

// Create an AFTable that has a number of values that are only a fraction
// of a second apart in time, so that results will be returned to the
// AFDataPipe in a short period.
DateTime startTime = DateTime.Now;
DataTable dt = new DataTable();
dt.Columns.Add("id", typeof(UInt64));
dt.Columns.Add("time", typeof(DateTime));
dt.Columns.Add("value", typeof(double));
TimeSpan changeInterval = TimeSpan.FromSeconds(0.10);
DateTime currentTime = startTime;
for (int i = 0; i < 1000; i++)
{
    dt.Rows.Add(i, currentTime, i * 1.00001);
    currentTime += changeInterval;
}
dt.AcceptChanges();

// Replace any existing table of the same name
AFTable myTable = myDB.Tables["TestTable"];
if (myTable != null)
    myDB.Tables.Remove(myTable);
myTable = myDB.Tables.Add("TestTable");
myTable.TimeZone = AFTimeZone.CurrentTimeZone as AFTimeZone;
myTable.Table = dt;

// Create a time series enabled Table Lookup data reference
// that references the AFTable created earlier in this sample.
// TC=time in the ConfigString configures the data reference
// to use the time column and produce time series data from the table.
AFAttribute myAttribute3 = myElement.Attributes["FastTable"];
if (myAttribute3 == null)
    myAttribute3 = myElement.Attributes.Add("FastTable");
myAttribute3.DataReferencePlugIn = myPISystem.DataReferencePlugIns["Table Lookup"];
myAttribute3.ConfigString = "SELECT value FROM TestTable;TC=time";

// Sign up for updates for attributes
AFDataPipe myDataPipe = new AFDataPipe();
myDataPipe.AddSignups(myElement.Attributes);

// Send values to attributes to test out data pipe
AFValue inputValue1 = new AFValue(5, OSIsoft.AF.Time.AFTime.Now);
AFValue inputValue2 = new AFValue(10, OSIsoft.AF.Time.AFTime.Now);
myAttribute1.SetValue(inputValue1);
myAttribute2.SetValue(inputValue2);
Thread.Sleep(500);

// Get updates from pipe and process them.
// Should get 2 events from PI and several events from the table lookup.
// How many table lookup events are returned depends on how long
// the sample has taken to run.
AFListResults<AFAttribute, AFDataPipeEvent> myResults1 = myDataPipe.GetUpdateEvents();

// Do same as above but with more events
for (int i = 1; i <= 100; i++)
{
    inputValue1 = new AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now);
    inputValue2 = new AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now);
    myAttribute1.SetValue(inputValue1);
    myAttribute2.SetValue(inputValue2);
    Thread.Sleep(100);
}

AFListResults<AFAttribute, AFDataPipeEvent> myResults2;
int numTotalEvents = 0;
int numAttribute1Events = 0;
int numAttribute2Events = 0;
int numAttribute3Events = 0;
int numUnknownEvents = 0;

// With a high-throughput pipe, a single GetUpdateEvents call may not return all the data.
// The application can loop to get all events or call GetUpdateEvents more frequently.
// However, 200 events are not enough to set bMoreEvents to true. It takes tens of thousands
// of events between GetUpdateEvents calls for the data pipe to have unfetched events on the data source server.
while (bMoreEvents)
{
    myResults2 = myDataPipe.GetUpdateEvents(out bMoreEvents);
    foreach (AFDataPipeEvent dpevent in myResults2.Results)
    {
        // Checking for specific attribute and if new snapshot
        if (dpevent.Value.Attribute == myAttribute1 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute1Events++;
        else if (dpevent.Value.Attribute == myAttribute2 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute2Events++;
        else if (dpevent.Value.Attribute == myAttribute3 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute3Events++;
        else
            numUnknownEvents++;
    }
    numTotalEvents += myResults2.Count;
}
VB
' This example demonstrates how to create an AFDataPipe
' and how to use it to get new events for an attribute
' from both PI points and a time-series enabled
' Table Lookup data reference. Only time-series enabled
' table lookups support AFDataPipe.

' Namespaces used by this snippet
Imports System
Imports System.Data
Imports System.Threading
Imports OSIsoft.AF
Imports OSIsoft.AF.Asset
Imports OSIsoft.AF.Data
Imports OSIsoft.AF.PI
Imports OSIsoft.AF.Time

' Get the Database
Dim myPISystems As New PISystems
Dim myPISystem As PISystem = myPISystems.DefaultPISystem
Dim myDB As AFDatabase = myPISystem.Databases.DefaultDatabase
Dim bMoreEvents As Boolean = True

' Create an Element with some attributes
Dim myElement As AFElement = myDB.Elements.Add("MyElement*")
Dim myAttribute1 As AFAttribute = myElement.Attributes.Add("MyAttribute1")
Dim myAttribute2 As AFAttribute = myElement.Attributes.Add("MyAttribute2")

' Create PIPoints to Update
Dim piServer As PIServer = PIServers.GetPIServers().DefaultPIServer
Dim point As PIPoint = Nothing
If (Not PIPoint.TryFindPIPoint(piServer, "testfloat1", point)) Then
    point = piServer.CreatePIPoint("testfloat1", Nothing)
End If
If (Not PIPoint.TryFindPIPoint(piServer, "testfloat2", point)) Then
    point = piServer.CreatePIPoint("testfloat2", Nothing)
End If

' Point both attributes at the PI points through the PI Point data reference
myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute1.ConfigString = "\\%Server%\testfloat1;ReadOnly=false"
myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute2.ConfigString = "\\%Server%\testfloat2;ReadOnly=false"

' Create an AFTable that has a number of values that are only a fraction
' of a second apart in time, so that results will be returned to the
' AFDataPipe in a short period.
Dim startTime As DateTime = DateTime.Now
Dim dt As DataTable = New DataTable()
dt.Columns.Add("id", GetType(UInt64))
dt.Columns.Add("time", GetType(DateTime))
dt.Columns.Add("value", GetType(Double))
Dim changeInterval As TimeSpan = TimeSpan.FromSeconds(0.1)
Dim row As Integer
Dim currentTime As DateTime = startTime
For row = 0 To 1000
    dt.Rows.Add(row, currentTime, row * 1.00001)
    currentTime += changeInterval
Next row
dt.AcceptChanges()

' Reuse the table if it already exists
Dim myTable As AFTable = myDB.Tables("TestTable")
If myTable Is Nothing Then myTable = myDB.Tables.Add("TestTable")
myTable.TimeZone = AFTimeZone.CurrentAFTimeZone
myTable.Table = dt

' Create a time series enabled Table Lookup data reference
' that references the AFTable created earlier in this sample.
' TC=time in the ConfigString configures the data reference
' to use the time column and produce time series data from the table.
Dim myAttribute3 As AFAttribute = myElement.Attributes("FastTable")
If myAttribute3 Is Nothing Then myAttribute3 = myElement.Attributes.Add("FastTable")
myAttribute3.DataReferencePlugIn = myPISystem.DataReferencePlugIns("Table Lookup")
myAttribute3.ConfigString = "SELECT value FROM TestTable;TC=time"

' Sign up for updates for attributes
Dim myDataPipe As New AFDataPipe()
myDataPipe.AddSignups(myElement.Attributes)

' Send values to attributes to test out data pipe
Dim inputValue1 As New AFValue(5, OSIsoft.AF.Time.AFTime.Now)
Dim inputValue2 As New AFValue(10, OSIsoft.AF.Time.AFTime.Now)
myAttribute1.SetValue(inputValue1)
myAttribute2.SetValue(inputValue2)
Thread.Sleep(500)

' Get updates from pipe and process them.
' Should get 2 events from PI and several events from the table lookup.
' How many table lookup events are returned depends on how long
' the sample has taken to run.
Dim myResults1 As AFListResults(Of AFAttribute, AFDataPipeEvent) = myDataPipe.GetUpdateEvents()

' Do same as above but with more events
For i As Integer = 1 To 100 Step 1
    inputValue1 = New AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now)
    inputValue2 = New AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now)
    myAttribute1.SetValue(inputValue1)
    myAttribute2.SetValue(inputValue2)
    Thread.Sleep(100)
Next i

Dim myResults2 As AFListResults(Of AFAttribute, AFDataPipeEvent)
Dim numTotalEvents As Integer
Dim numAttribute1Events As Integer
Dim numAttribute2Events As Integer
Dim numAttribute3Events As Integer
Dim numUnknownEvents As Integer

' With a high-throughput pipe, a single GetUpdateEvents call may not return all the data.
' The application can loop to get all events or call GetUpdateEvents more frequently.
' However, 200 events are not enough to set bMoreEvents to true. It takes tens of thousands
' of events between GetUpdateEvents calls for the data pipe to have unfetched events on the data source server.
While bMoreEvents
    myResults2 = myDataPipe.GetUpdateEvents(bMoreEvents)
    For Each dpevent As AFDataPipeEvent In myResults2.Results
        ' Checking for specific attribute and if new snapshot
        If (dpevent.Value.Attribute = myAttribute1 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute1Events += 1
        ElseIf (dpevent.Value.Attribute = myAttribute2 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute2Events += 1
        ElseIf (dpevent.Value.Attribute = myAttribute3 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute3Events += 1
        Else
            numUnknownEvents += 1
        End If
    Next
    numTotalEvents += myResults2.Count
End While