Please ensure Javascript is enabled for purposes of website accessibility
Powered by Zoomin Software. For more details please contactZoomin

AF SDK Reference

AFDataPipe Class

  • Last UpdatedNov 18, 2025
  • 14 minute read
AFDataPipe Class
An AFDataPipe is a collection of AFAttribute objects that are signed up for data change events on the server.

Inheritance Hierarchy

SystemObject
  OSIsoft.AF.DataAFDataPipe

Namespace:  OSIsoft.AF.Data
Assembly:  OSIsoft.AFSDK (in OSIsoft.AFSDK.dll) Version: 3.1.1.1182

Syntax

public sealed class AFDataPipe : IDisposable
Public NotInheritable Class AFDataPipe
	Implements IDisposable

Dim instance As AFDataPipe
public ref class AFDataPipe sealed : IDisposable
[<SealedAttribute>]
type AFDataPipe =  
    class
        interface IDisposable
    end

The AFDataPipe type exposes the following members.

Constructors

  NameDescription
Public method
AFDataPipe
Creates an instance of a AFDataPipe.

Properties

  NameDescription
Public property
DataPipeStatistics
Contains statistics for the most recent signup or event scan.
Public property
EventHorizonMode
EventHorizonMode specifies what events are returned by the datapipe at the GetUpdateEvent call based on the timestamp of the events.
Public property
EventHorizonOffset
EventHorizonOffset allows the pipe to return events in the future. The pipe would fetch events with timestamps up to current time + EventHorizonOffset when calling the GetUpdateEvents/GetObserverEvents methods
Public property
IsSuspendedSignupEnabled
If IsSuspendedSignupEnabled is set, the data pipe will suspend signups that are currently invalid but might become valid with changes outside of AF.
Public property
PIServers
Get the list of PIServer that could provide data for the attributes in the pipe

Methods

  NameDescription
Public method
AddSignups
Adds a list of AFAttribute objects to be monitored by the data pipe. The method returns server level and attribute level errors in AFErrorsTKey .
Public method
AddSignupsWithInitEvents
Adds a list of AFAttribute objects to be monitored by the data pipe and returns the initial events for this list of new signup objects. PIserver level and attribute level errors can be accessed in the AFListResultsTKey, TResult
Public method
Dispose
Closes the event pipe
Public method
Equals
Determines whether the specified object is equal to the current object.
(Inherited from Object.)
Public method
GetHashCode
Serves as the default hash function.
(Inherited from Object.)
Public method
GetObserverEvents
Trigger retrieval of new events that occurred on the PIPoint objects monitored by the data pipe. The new events will be sent to the IObserver objects registered with the data pipe.
Public method
GetObserverEvents(Boolean)
Trigger retrieval of new events that occurred on the PIPoint objects monitored by the data pipe. The new events will be sent to the IObserver objects registered with the data pipe.
Public method
GetRelatedAttributes
For each AFChangeInfo, finds the subscribed AFAttributes that might be impacted by the change.
Public method
GetSignups
Get the list of the AFAttribute objects being monitored by the pipe.
Public method
GetSignupsByPIServer
Get the list of the AFAttribute objects being monitored by the pipe that could get data from the passed PIServer.
Public method
GetType
Gets the Type of the current instance.
(Inherited from Object.)
Public method
GetUpdateEvents
Get the list of data change events from the server
Public method
GetUpdateEvents(Boolean)
Get the list of data change events from the server
Public method
ObservePendingChanges
Observes a set of changes from the server and prepares to update the AFDataPipe to reflect them.
Public method
ProcessAppliedChanges
Updates the AFDataPipe to reflect previously observed changes.
Public method
RemoveSignups
Remove a list of AFAttribute objects being monitored by the data pipe. The method returns server level and attribute level errors in AFErrorsTKey .
Public method
Subscribe
Register an IObserver for AFDataPipeEvent with the AFDataPipe. All the AFDataPipeEvents received by the data pipe will be sent to the IObserver.
Public method
ToString
Returns a string that represents the current object.
(Inherited from Object.)

Remarks

A client signs up to be notified of data change events for an AFAttribute by adding it to the AFDataPipe. All signed up attributes in the same event pipe do not all have to be on the same AFDatabase.

The client gets all the data change events since the last call by calling the GetUpdateEvents or GetObserverEventsmethod. Normally event retrieval would be called from a background thread to handle the processing of the changes. This also allows the client to control how often to check for events.

There are two ways to get the data change events: a) direct GetUpdateEvents method to get events; b) through IObserver of AFDataPipeEvent. The IObserver pattern provides significant performance improvement over direct GetUpdateEvents method for high throughput scenario. Application will have to implement the IObserver and register the IObserver with the data pipe via the Subscribe(IObserverAFDataPipeEvent) method.

Note Notes to Callers
This method, property, or class is not available in the legacy .NET 3.5 version of the SDK.

Examples

// This example demonstrates how to create an AFDataPipe
// and how to use it to get new events for an attribute

// Get the Database
PISystems myPISystems = new PISystems();
PISystem myPISystem = myPISystems.DefaultPISystem;
AFDatabase myDB = myPISystem.Databases.DefaultDatabase;
bool bMoreEvents = true;

// Create an Element with some attributes
AFElement myElement = myDB.Elements.Add("MyElement*");
AFAttribute myAttribute1 = myElement.Attributes.Add("MyAttribute1");
AFAttribute myAttribute2 = myElement.Attributes.Add("MyAttribute2");

// Create PIPoints to Update
PIServer piServer = PIServers.GetPIServers().DefaultPIServer;
PIPoint point;
if (!PIPoint.TryFindPIPoint(piServer, "testfloat1", out point))
{
    point = piServer.CreatePIPoint("testfloat1", null);
}

if (!PIPoint.TryFindPIPoint(piServer, "testfloat2", out point))
{
    point = piServer.CreatePIPoint("testfloat2", null);
}

myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute1.ConfigString = @"\\%Server%\testfloat1;ReadOnly=false";

myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute2.ConfigString = @"\\%Server%\testfloat2;ReadOnly=false";

// Sign up for updates for attributes
AFDataPipe myDataPipe = new AFDataPipe();
myDataPipe.AddSignups(myElement.Attributes);

// Send values to attributes to test out data pipe
AFValue inputValue1 = new AFValue(5, OSIsoft.AF.Time.AFTime.Now);
AFValue inputValue2 = new AFValue(10, OSIsoft.AF.Time.AFTime.Now);

myAttribute1.SetValue(inputValue1);
myAttribute2.SetValue(inputValue2);

Thread.Sleep(500);

// Get updates from pipe and process them (should get two events)
AFListResults<AFAttribute, AFDataPipeEvent> myResults1 = myDataPipe.GetUpdateEvents();

// Do same as above but with more events 
for (int i = 1; i <= 100; i++)
{
    inputValue1 = new AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now);
    inputValue2 = new AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now);
    myAttribute1.SetValue(inputValue1);
    myAttribute2.SetValue(inputValue2);
    Thread.Sleep(100);
}

AFListResults<AFAttribute, AFDataPipeEvent> myResults2;
int numTotalEvents = 0;
int numAttribute1Events = 0;
int numAttribute2Events = 0;
int numUnknownEvents = 0;

// with high throughput pipe, it is possible not all data are returned in one GetUpdateEvents call
// application can loop to get all events or call GetUpdateEvents more frequently.
// However, 200 events are not enough to trigger bMoreEvents to be true. Will take tens of thousands 
// of events between GetUpdateEvents for the datapipe to have unfetched events on the data source server.
while (bMoreEvents)
{
    myResults2 = myDataPipe.GetUpdateEvents(out bMoreEvents);

    foreach (AFDataPipeEvent dpevent in myResults2.Results)
    {
        // Checking for specific attribute and if new snapshot
        if (dpevent.Value.Attribute == myAttribute1 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute1Events++;
        else if (dpevent.Value.Attribute == myAttribute2 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute2Events++;
        else
            numUnknownEvents++;
    }

    numTotalEvents += myResults2.Count;
}
' This example demonstrates how to create the DataReference configuration
' for all attributes with a PIPoint DataReference.

' Get the Database
Dim myPISystems As New PISystems
Dim myPISystem As PISystem = myPISystems.DefaultPISystem
Dim myDB As AFDatabase = myPISystem.Databases.DefaultDatabase
Dim bMoreEvents As Boolean = True

' Create an Element with an Attribute
Dim myElement As AFElement = myDB.Elements.Add("MyElement*")
Dim myAttribute1 As AFAttribute = myElement.Attributes.Add("MyAttribute1")
Dim myAttribute2 As AFAttribute = myElement.Attributes.Add("MyAttribute2")

' Create PIPoints to Update
Dim piServer As PIServer = PIServers.GetPIServers().DefaultPIServer
Dim point As PIPoint = Nothing
If (Not PIPoint.TryFindPIPoint(piServer, "testfloat1", point)) Then
    point = piServer.CreatePIPoint("testfloat1", Nothing)
End If

If (Not PIPoint.TryFindPIPoint(piServer, "testfloat2", point)) Then
    point = piServer.CreatePIPoint("testfloat2", Nothing)
End If

myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute1.ConfigString = "\\%Server%\testfloat1;ReadOnly=false"

myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute2.ConfigString = "\\%Server%\testfloat2;ReadOnly=false"

' Sign up for updates for attributes
Dim myDataPipe As New AFDataPipe()
myDataPipe.AddSignups(myElement.Attributes)

' Send values to attributes to test out data pipe
Dim inputValue1 As New AFValue(5, OSIsoft.AF.Time.AFTime.Now)
Dim inputValue2 As New AFValue(10, OSIsoft.AF.Time.AFTime.Now)

myAttribute1.SetValue(inputValue1)
myAttribute2.SetValue(inputValue2)

Thread.Sleep(500)

' Get updates from pipe and process them
Dim myResults1 As AFListResults(Of AFAttribute, AFDataPipeEvent) = myDataPipe.GetUpdateEvents()

' Do same as above but with more events and institute a limit to how much is retrieved per GetUpdateEvents call
For i As Integer = 1 To 100 Step 1
    inputValue1 = New AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now)
    inputValue2 = New AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now)
    myAttribute1.SetValue(inputValue1)
    myAttribute2.SetValue(inputValue2)
    Thread.Sleep(100)
Next i

Dim myResults2 As AFListResults(Of AFAttribute, AFDataPipeEvent)
Dim numTotalEvents As Integer
Dim numAttribute1Events As Integer
Dim numAttribute2Events As Integer
Dim numUnknownEvents As Integer

' with high throughput pipe, it is possible not all data are returned in one GetUpdateEvents call
' application can loop to get all events or call GetUpdateEvents more frequently.
' However, 200 events are not enough to trigger bMoreEvents to be true. Will take tens of thousands 
' of events between GetUpdateEvents for the datapipe to have unfetched events on the data source server.
While bMoreEvents
    myResults2 = myDataPipe.GetUpdateEvents(bMoreEvents)

    For Each dpevent As AFDataPipeEvent In myResults2.Results
        ' Checking for specific attribute and if new snapshot
        If (dpevent.Value.Attribute = myAttribute1 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute1Events += 1
        ElseIf (dpevent.Value.Attribute = myAttribute2 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute2Events += 1
        Else
            numUnknownEvents += 1
        End If
    Next

    numTotalEvents += myResults2.Count
End While

No code example is currently available or this language may not be supported.

No code example is currently available or this language may not be supported.

// This example demonstrates how to create an AFDataPipe
// and how to use it to get new events for an attribute
// from both PI points and a time-series enabled 
// Table Lookup data reference. Only time-series enabled
// table lookups support AFDataPipe.

// Get the Database
PISystems myPISystems = new PISystems();
PISystem myPISystem = myPISystems.DefaultPISystem;
AFDatabase myDB = myPISystem.Databases.DefaultDatabase;
bool bMoreEvents = true;

// Create an Element with some attributes
AFElement myElement = myDB.Elements.Add("MyElement*");
AFAttribute myAttribute1 = myElement.Attributes.Add("MyAttribute1");
AFAttribute myAttribute2 = myElement.Attributes.Add("MyAttribute2");

// Create PIPoints to Update
PIServer piServer = PIServers.GetPIServers().DefaultPIServer;
PIPoint point;
if (!PIPoint.TryFindPIPoint(piServer, "testfloat1", out point))
{
    point = piServer.CreatePIPoint("testfloat1", null);
}

if (!PIPoint.TryFindPIPoint(piServer, "testfloat2", out point))
{
    point = piServer.CreatePIPoint("testfloat2", null);
}

myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute1.ConfigString = @"\\%Server%\testfloat1;ReadOnly=false";

myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs["kelvin"];
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem);
myAttribute2.ConfigString = @"\\%Server%\testfloat2;ReadOnly=false";

// Create an AFTable that has a number of values that are only a fraction
// of a second apart in time, so that results will be returned to the 
// AFDataPipe in a short period.

DateTime startTime = DateTime.Now;
DataTable dt = new DataTable();
dt.Columns.Add("id", typeof(UInt64));
dt.Columns.Add("time", typeof(DateTime));
dt.Columns.Add("value", typeof(double));

TimeSpan changeInterval = TimeSpan.FromSeconds(0.10);
DateTime currentTime = startTime;
for (int i = 0; i < 1000; i++)
{
    dt.Rows.Add(i, currentTime, i * 1.00001);
    currentTime += changeInterval;
}
dt.AcceptChanges();

AFTable myTable = myDB.Tables["TestTable"];
if (myTable != null) myDB.Tables.Remove(myTable);
myTable = myDB.Tables.Add("TestTable");
myTable.TimeZone = AFTimeZone.CurrentTimeZone as AFTimeZone;
myTable.Table = dt;

// Create a time series enabled Table Lookup data reference
// that references the AFTable created earlier in this sample
// TC=Time in the ConfigString configures the data reference
// to use the time column and produce time series data from the table
AFAttribute myAttribute3 = myElement.Attributes["FastTable"];
if (myAttribute3 == null) myAttribute3 = myElement.Attributes.Add("FastTable");
myAttribute3.DataReferencePlugIn = myPISystem.DataReferencePlugIns["Table Lookup"];
myAttribute3.ConfigString = "SELECT value FROM TestTable;TC=time";

// Sign up for updates for attributes
AFDataPipe myDataPipe = new AFDataPipe();
myDataPipe.AddSignups(myElement.Attributes);

// Send values to attributes to test out data pipe
AFValue inputValue1 = new AFValue(5, OSIsoft.AF.Time.AFTime.Now);
AFValue inputValue2 = new AFValue(10, OSIsoft.AF.Time.AFTime.Now);

myAttribute1.SetValue(inputValue1);
myAttribute2.SetValue(inputValue2);

Thread.Sleep(500);

// Get updates from pipe and process them 
// should get 2 events from PI and several events from table lookup
// how many table lookup events are returned depends on how long 
// it has taken to run
AFListResults<AFAttribute, AFDataPipeEvent> myResults1 = myDataPipe.GetUpdateEvents();

// Do same as above but with more events 
for (int i = 1; i <= 100; i++)
{
    inputValue1 = new AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now);
    inputValue2 = new AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now);
    myAttribute1.SetValue(inputValue1);
    myAttribute2.SetValue(inputValue2);
    Thread.Sleep(100);
}

AFListResults<AFAttribute, AFDataPipeEvent> myResults2;
int numTotalEvents = 0;
int numAttribute1Events = 0;
int numAttribute2Events = 0;
int numAttribute3Events = 0;
int numUnknownEvents = 0;

// with high throughput pipe, it is possible not all data are returned in one GetUpdateEvents call
// application can loop to get all events or call GetUpdateEvents more frequently.
// However, 200 events are not enough to trigger bMoreEvents to be true. Will take tens of thousands 
// of events between GetUpdateEvents for the datapipe to have unfetched events on the data source server.
while (bMoreEvents)
{
    myResults2 = myDataPipe.GetUpdateEvents(out bMoreEvents);

    foreach (AFDataPipeEvent dpevent in myResults2.Results)
    {
        // Checking for specific attribute and if new snapshot
        if (dpevent.Value.Attribute == myAttribute1 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute1Events++;
        else if (dpevent.Value.Attribute == myAttribute2 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute2Events++;
        else if (dpevent.Value.Attribute == myAttribute3 && dpevent.Action == AFDataPipeAction.Add)
            numAttribute3Events++;
        else
            numUnknownEvents++;
    }

    numTotalEvents += myResults2.Count;
}
' This example demonstrates how to create the DataReference configuration
' for all attributes with a PIPoint DataReferenc
' This example demonstrates how to create an AFDataPipe
' and how to use it to get new events for an attribute
' from both PI points and a time-series enabled 
' Table Lookup data reference. Only time-series enabled
' table lookups support AFDataPipe.

' Get the Database
Dim myPISystems As New PISystems
Dim myPISystem As PISystem = myPISystems.DefaultPISystem
Dim myDB As AFDatabase = myPISystem.Databases.DefaultDatabase
Dim bMoreEvents As Boolean = True

' Create an Element with an Attribute
Dim myElement As AFElement = myDB.Elements.Add("MyElement*")
Dim myAttribute1 As AFAttribute = myElement.Attributes.Add("MyAttribute1")
Dim myAttribute2 As AFAttribute = myElement.Attributes.Add("MyAttribute2")

' Create PIPoints to Update
Dim piServer As PIServer = PIServers.GetPIServers().DefaultPIServer
Dim point As PIPoint = Nothing
If (Not PIPoint.TryFindPIPoint(piServer, "testfloat1", point)) Then
    point = piServer.CreatePIPoint("testfloat1", Nothing)
End If

If (Not PIPoint.TryFindPIPoint(piServer, "testfloat2", point)) Then
    point = piServer.CreatePIPoint("testfloat2", Nothing)
End If

myAttribute1.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute1.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute1.ConfigString = "\\%Server%\testfloat1;ReadOnly=false"

myAttribute2.DefaultUOM = myPISystem.UOMDatabase.UOMs("kelvin")
myAttribute2.DataReferencePlugIn = AFDataReference.GetPIPointDataReference(myPISystem)
myAttribute2.ConfigString = "\\%Server%\testfloat2;ReadOnly=false"

' Create an AFTable that has a number of values that are only a fraction
' of a second apart in time, so that results will be returned to the 
' AFDataPipe in a short period.

Dim startTime As DateTime = DateTime.Now
Dim dt As DataTable = New DataTable()
dt.Columns.Add("id", GetType(UInt64))
dt.Columns.Add("time", GetType(DateTime))
dt.Columns.Add("value", GetType(Double))

Dim changeInterval As TimeSpan = TimeSpan.FromSeconds(0.1)
Dim row As Integer
Dim currentTime As DateTime = startTime
For row = 0 To 1000
    dt.Rows.Add(row, currentTime, row * 1.00001)
    currentTime += changeInterval
Next row
dt.AcceptChanges()

Dim myTable As AFTable = myDB.Tables("TestTable")
If myTable Is Nothing Then myTable = myDB.Tables.Add("TestTable")
myTable.TimeZone = AFTimeZone.CurrentAFTimeZone
myTable.Table = dt

' Create a time series enabled Table Lookup data reference
' that references the AFTable created earlier in this sample
' TC=Time in the ConfigString configures the data reference
' to use the time column and produce time series data from the table
Dim myAttribute3 As AFAttribute = myElement.Attributes("FastTable")
If myAttribute3 Is Nothing Then myAttribute3 = myElement.Attributes.Add("FastTable")
myAttribute3.DataReferencePlugIn = myPISystem.DataReferencePlugIns("Table Lookup")
myAttribute3.ConfigString = "SELECT value FROM TestTable;TC=time"

' Sign up for updates for attributes
Dim myDataPipe As New AFDataPipe()
myDataPipe.AddSignups(myElement.Attributes)

' Send values to attributes to test out data pipe
Dim inputValue1 As New AFValue(5, OSIsoft.AF.Time.AFTime.Now)
Dim inputValue2 As New AFValue(10, OSIsoft.AF.Time.AFTime.Now)

myAttribute1.SetValue(inputValue1)
myAttribute2.SetValue(inputValue2)

Thread.Sleep(500)

' Get updates from pipe and process them 
' should get 2 events from PI and several events from table lookup
' how many table lookup events are returned depends on how long 
' it has taken to run
Dim myResults1 As AFListResults(Of AFAttribute, AFDataPipeEvent) = myDataPipe.GetUpdateEvents()

' Do same as above but with more events and institute a limit to how much is retrieved per GetUpdateEvents call
For i As Integer = 1 To 100 Step 1
    inputValue1 = New AFValue(5 * i, OSIsoft.AF.Time.AFTime.Now)
    inputValue2 = New AFValue(10 * i, OSIsoft.AF.Time.AFTime.Now)
    myAttribute1.SetValue(inputValue1)
    myAttribute2.SetValue(inputValue2)
    Thread.Sleep(100)
Next i

Dim myResults2 As AFListResults(Of AFAttribute, AFDataPipeEvent)
Dim numTotalEvents As Integer
Dim numAttribute1Events As Integer
Dim numAttribute2Events As Integer
Dim numAttribute3Events As Integer
Dim numUnknownEvents As Integer

' with high throughput pipe, it is possible not all data are returned in one GetUpdateEvents call
' application can loop to get all events or call GetUpdateEvents more frequently.
' However, 200 events are not enough to trigger bMoreEvents to be true. Will take tens of thousands 
' of events between GetUpdateEvents for the datapipe to have unfetched events on the data source server.
While bMoreEvents
    myResults2 = myDataPipe.GetUpdateEvents(bMoreEvents)

    For Each dpevent As AFDataPipeEvent In myResults2.Results
        ' Checking for specific attribute and if new snapshot
        If (dpevent.Value.Attribute = myAttribute1 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute1Events += 1
        ElseIf (dpevent.Value.Attribute = myAttribute2 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute2Events += 1
        ElseIf (dpevent.Value.Attribute = myAttribute3 AndAlso dpevent.Action = AFDataPipeAction.Add) Then
            numAttribute3Events += 1
        Else
            numUnknownEvents += 1
        End If
    Next

    numTotalEvents += myResults2.Count
End While

No code example is currently available or this language may not be supported.

No code example is currently available or this language may not be supported.

Version Information

AFSDK

Supported in: 3.1.1, 3.1.0, 3.0.2, 3.0.1, 3.0.0, 2.10.11, 2.10.5, 2.10.0, 2.10, 2.9.5, 2.9, 2.8.5, 2.8, 2.7.5, 2.7, 2.6

See Also

TitleResults for “How to create a CRG?”Also Available in