How to Integrate the Salesforce Streaming API with a .NET Core Application
This post is for developers who want to close the knowledge gap around integrating .NET Core with Salesforce through the Streaming and REST APIs. Today I'm going to explain how to integrate a .NET Core console application with the Salesforce Streaming API, which uses the CometD event bus.
.NET Core is an open-source, cross-platform implementation of .NET. It is maintained by Microsoft and has a vast community of developers on GitHub.
What is Salesforce Streaming API?
The Streaming API follows a publisher/subscriber model: it streams events using push technology and provides a subscription mechanism for receiving them in real time. It supports multiple types of events, which are listed below.
PushTopic events
Generic events
Platform events
Change Data Capture events
For how these differ, see the Streaming Event features comparison in the Salesforce documentation.
PushTopic events as a publish/subscribe model
Normally, to get data from a server to a client, the client has to pull it with an HTTP request. In the publish/subscribe model, the server pushes data to the client instead. When we create a PushTopic, we specify which information the client receives and the criteria an event must match. The client then subscribes to the PushTopic channel and is notified of events that match those criteria. To receive the notifications, the client must keep a connection open to Salesforce via the Streaming API.
Streaming API uses the Bayeux protocol and CometD, so the client-to-server connection is maintained through long polling.
You can find more information in the Streaming API Developer Guide.
So now you have some idea about the Streaming API and PushTopics. Here we focus only on the PushTopic streaming event: subscribing to a channel and getting notifications from it.
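To make the long-polling flow a little less abstract, here is a rough sketch of the three Bayeux messages a client sends before events start arriving. The class and method names (BayeuxMessages, Handshake, Connect, Subscribe) are my own and only illustrative; in the real application the CometD library builds and sends these messages for you. I'm using System.Text.Json here (built into .NET Core 3.0 and later) just to print the JSON.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Illustrative only: shows the shape of the Bayeux messages.
// The CometD client library normally creates and sends these itself.
public static class BayeuxMessages
{
    // Step 1: the handshake negotiates the transport; the server replies with a clientId.
    public static string Handshake() =>
        JsonSerializer.Serialize(new Dictionary<string, object>
        {
            ["channel"] = "/meta/handshake",
            ["version"] = "1.0",
            ["supportedConnectionTypes"] = new[] { "long-polling" }
        });

    // Step 2: the connect request is the long poll itself. The server holds it
    // open until it has events to deliver or the timeout expires.
    public static string Connect(string clientId) =>
        JsonSerializer.Serialize(new Dictionary<string, object>
        {
            ["channel"] = "/meta/connect",
            ["clientId"] = clientId,
            ["connectionType"] = "long-polling"
        });

    // Step 3: the subscribe request names the channel we want events from,
    // for example a PushTopic channel such as "/topic/<PushTopicName>".
    public static string Subscribe(string clientId, string topicChannel) =>
        JsonSerializer.Serialize(new Dictionary<string, object>
        {
            ["channel"] = "/meta/subscribe",
            ["clientId"] = clientId,
            ["subscription"] = topicChannel
        });
}
```

Calling BayeuxMessages.Subscribe with the clientId returned by the handshake and a PushTopic channel name gives you the message that tells the server which events to push back on the open connection.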
"Let's code :)"
Step 1: Create a .NET Core console application
Here I'm creating a .NET Core console application to keep things simple. I'm using Visual Studio Community 2017 as the IDE on Windows, because I'm a fan of it and it comes integrated with the NuGet package manager. You can use Visual Studio Code if you are coding on Linux, Mac, or Windows; in that case you need to install the NuGet CLI or a NuGet extension for Visual Studio Code in order to install packages from NuGet.
Step 2: Install the CometD event bus from the NuGet package manager
Now you need to install the CometD event and message routing bus in order to subscribe to PushTopic notifications. Right-click the project and click "Manage NuGet Packages". Then search for "Cometd.Bayeux" in the Browse panel and install the package.
Step 3: Get the session ID from the Salesforce REST API
I'm not going to explain the REST API integration in this post, because an earlier post already explains how to retrieve the session ID from Salesforce via the REST API. You can find it here: https://wordpress.com/post/force2cloud.wordpress.com/93
Here I have created three new classes: LoginController in the LoginController.cs file, Constants in the Constants.cs file, and AuthResponse in the Wrappers.cs file. You can find all the source code at the end of this post.
LoginController class
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

namespace StreamingAPI_Core
{
    class LoginController
    {
        // Requests an access token using the OAuth 2.0 username-password flow.
        public static async Task<AuthResponse> AsyncAuthRequest()
        {
            var content = new FormUrlEncodedContent(new[]
            {
                new KeyValuePair<string, string>("grant_type", "password"),
                new KeyValuePair<string, string>("client_id", Constants.CONSUMER_KEY),
                new KeyValuePair<string, string>("client_secret", Constants.CONSUMER_SECRET),
                new KeyValuePair<string, string>("username", Constants.USERNAME),
                // The security token is appended to the password.
                new KeyValuePair<string, string>("password", Constants.PASSWORD + Constants.TOKEN)
            });

            HttpClient _httpClient = new HttpClient();
            var request = new HttpRequestMessage
            {
                Method = HttpMethod.Post,
                RequestUri = new Uri(Constants.TOKEN_REQUEST_ENDPOINTURL),
                Content = content
            };

            var responseMessage = await _httpClient.SendAsync(request).ConfigureAwait(false);
            var response = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false);

            // Map the JSON body onto the AuthResponse wrapper.
            AuthResponse responseDyn = JsonConvert.DeserializeObject<AuthResponse>(response);
            return responseDyn;
        }
    }
}
Constants class
using System;
using System.Collections.Generic;
using System.Text;

namespace StreamingAPI_Core
{
    class Constants
    {
        public static string USERNAME = "REPLACE WITH YOUR USERNAME";
        public static string PASSWORD = "REPLACE WITH YOUR PASSWORD";
        public static string TOKEN = "REPLACE WITH YOUR SECURITY TOKEN";
        public static string CONSUMER_KEY = "REPLACE WITH YOUR CONNECTED APP CONSUMER KEY";
        public static string CONSUMER_SECRET = "REPLACE WITH YOUR CONNECTED APP CONSUMER SECRET";
        public static string TOKEN_REQUEST_ENDPOINTURL = "https://login.salesforce.com/services/oauth2/token";
        public static string TOKEN_REQUEST_QUERYURL = "/services/data/v43.0/query?q=select+Id+,name+from+account+limit+10";
    }
}
AuthResponse class
using System;
using System.Collections.Generic;
using System.Text;

namespace StreamingAPI_Core
{
    public class AuthResponse
    {
        public string access_token { get; set; }
        public string instance_url { get; set; }
        public string id { get; set; }
        public string token_type { get; set; }
        public string issued_at { get; set; }
        public string signature { get; set; }
    }
}
Program class
using System;
using System.Net;
using System.Threading.Tasks;

namespace StreamingAPI_Core
{
    class Program
    {
        static void Main(string[] args)
        {
            ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;

            // Run the async login and block until it completes.
            Task<AuthResponse> authResponse = Task.Run(() => LoginController.AsyncAuthRequest());
            authResponse.Wait();

            if (authResponse.Result != null)
            {
                // logic will be implemented here
            }
        }
    }
}
The authResponse object holds the response that Salesforce returns once we pass the authentication parameters.
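One caveat with the `authResponse.Result != null` check: when the login fails, Salesforce still returns a JSON body (with `error` and `error_description` fields instead of `access_token`), so the deserialized object is not null even though it carries no token. Below is a minimal sketch of a stricter check. TokenResponseParser is a hypothetical helper of mine, not part of the original project, and I'm using System.Text.Json here (rather than the Newtonsoft.Json package used elsewhere in this post) only so the snippet needs no extra package.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: fails loudly when the token response is an OAuth error.
public static class TokenResponseParser
{
    // Returns the access token and instance URL, or throws if the login was rejected.
    public static (string AccessToken, string InstanceUrl) Parse(string json)
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        JsonElement root = doc.RootElement;

        // A successful response carries both fields.
        if (root.TryGetProperty("access_token", out JsonElement token) &&
            root.TryGetProperty("instance_url", out JsonElement url))
        {
            return (token.GetString(), url.GetString());
        }

        // Otherwise surface whatever error details the body contains.
        string error = root.TryGetProperty("error", out JsonElement e)
            ? e.GetString() : "unknown_error";
        string description = root.TryGetProperty("error_description", out JsonElement d)
            ? d.GetString() : "no description";

        throw new InvalidOperationException(
            $"Salesforce login failed: {error} ({description})");
    }
}
```

With something like this in place, a wrong password or security token shows up as a clear exception at login time instead of as a null access token later, when the CometD handshake is attempted.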
Step 4: Create a listener to receive the response from Salesforce when an event happens
The Listener class listens for events from the subscribed channel. I'm starting with the listener so that the process is easier to follow. It implements the IMessageListener interface from CometD, whose OnMessage callback is invoked whenever a message is received on the given channel. For now, the response is simply written to the console.
Listener class
using CometD.NetCore.Bayeux;
using CometD.NetCore.Bayeux.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace StreamingAPI_Core
{
    class Listener : IMessageListener
    {
        // Called by CometD each time a message arrives on the subscribed channel.
        public void OnMessage(IClientSessionChannel channel, IMessage message)
        {
            var convertedJson = message.Json;
            Console.WriteLine(convertedJson);
        }
    }
}
Step 5: Create a PushTopic named "LeadUpdates" in Salesforce to listen for Lead object modifications
Now we need to create a PushTopic in Salesforce. You can do this from the Execute Anonymous window in the Developer Console, or from anywhere else you can execute an Apex code block. You can create a PushTopic with the REST API as well.
PushTopic pushTopic = new PushTopic();
pushTopic.Name = 'LeadUpdates';
pushTopic.Query = 'SELECT Id, Name FROM Lead';
pushTopic.ApiVersion = 43.0;
insert pushTopic;
This PushTopic triggers an event when a Lead record is created, updated, deleted, or undeleted. Make sure the API version you give here is the same as the API version in the CometD URL.
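One way to keep the two versions from drifting apart is to define the version once and derive everything from it. The sketch below does that, and also shows roughly what the REST alternative for creating the PushTopic could look like: a POST to the standard sObject endpoint with a Bearer token. PushTopicSetup and its members are hypothetical names of mine, the request is only built and not sent, and I'm using System.Text.Json for the body purely to avoid an extra package.

```csharp
using System;
using System.Globalization;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;

// Hypothetical helper: one API version constant shared by the PushTopic and the CometD URL.
public static class PushTopicSetup
{
    // Single source of truth for the API version (43.0 in this post).
    public const string ApiVersion = "43.0";

    // Path of the CometD endpoint, for example "/cometd/43.0".
    public static string CometdPath => "/cometd/" + ApiVersion;

    // Builds (but does not send) the REST request that creates a PushTopic.
    public static HttpRequestMessage BuildCreateRequest(
        string instanceUrl, string accessToken, string name, string query)
    {
        // Same three fields that the Apex snippet sets.
        string body = JsonSerializer.Serialize(new
        {
            Name = name,
            Query = query,
            ApiVersion = double.Parse(ApiVersion, CultureInfo.InvariantCulture)
        });

        var uri = new Uri(
            new Uri(instanceUrl),
            "/services/data/v" + ApiVersion + "/sobjects/PushTopic");

        var request = new HttpRequestMessage(HttpMethod.Post, uri)
        {
            Content = new StringContent(body, Encoding.UTF8, "application/json")
        };
        request.Headers.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);
        return request;
    }
}
```

You would pass the instance_url and access_token from the login response, and then send the request with an HttpClient. The Apex route is simpler for a one-off setup; the REST route may be handy if you want the console app to create the PushTopic itself.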
Step 6: Create PushTopicConnection class to subscribe to the channel
using CometD.NetCore.Client;
using System;
using System.Collections.Generic;
using System.Text;

namespace StreamingAPI_Core
{
    class PushTopicConnection
    {
        BayeuxClient _bayeuxClient = null;
        string channel = "/topic/LeadUpdates";

        public PushTopicConnection(BayeuxClient bayeuxClient)
        {
            _bayeuxClient = bayeuxClient;
        }

        // Handshake first, then subscribe to the PushTopic channel.
        public void Connect()
        {
            _bayeuxClient.Handshake();
            _bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.CONNECTED });
            _bayeuxClient.GetChannel(channel).Subscribe(new Listener());
            Console.WriteLine("Waiting event from salesforce for the push topic " + channel.ToString());
        }

        // Close the connection and wait for the client to report DISCONNECTED.
        public void Disconect()
        {
            _bayeuxClient.Disconnect();
            _bayeuxClient.WaitFor(1000, new[] { BayeuxClient.State.DISCONNECTED });
        }
    }
}
Here I have created two methods, one to create the connection and one to close it. When creating the connection with the Streaming API, we first handshake with the API and then subscribe to the channel that we created as a PushTopic.
Step 7: Implement the connection with CometD
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Net;
using System.Threading.Tasks;
using CometD.NetCore.Client;
using CometD.NetCore.Client.Transport;

namespace StreamingAPI_Core
{
    class Program
    {
        static void Main(string[] args)
        {
            ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;

            // Run the async login and block until it completes.
            Task<AuthResponse> authResponse = Task.Run(() => LoginController.AsyncAuthRequest());
            authResponse.Wait();

            if (authResponse.Result != null)
            {
                try
                {
                    int readTimeOut = 120000;
                    string streamingEndpointURI = "/cometd/43.0";

                    var options = new Dictionary<String, Object>
                    {
                        { ClientTransport.TIMEOUT_OPTION, readTimeOut }
                    };

                    // Send the access token with every long-polling request.
                    NameValueCollection collection = new NameValueCollection();
                    collection.Add(HttpRequestHeader.Authorization.ToString(), "OAuth " + authResponse.Result.access_token);
                    var transport = new LongPollingTransport(options, new NameValueCollection { collection });

                    // Build the CometD endpoint from the instance URL returned at login.
                    var serverUri = new Uri(authResponse.Result.instance_url);
                    String endpoint = String.Format("{0}://{1}{2}", serverUri.Scheme, serverUri.Host, streamingEndpointURI);

                    var bayeuxClient = new BayeuxClient(endpoint, new[] { transport });
                    var pushTopicConnection = new PushTopicConnection(bayeuxClient);
                    pushTopicConnection.Connect();

                    // Close the connection
                    Console.WriteLine("Press any key to shut down.\n");
                    Console.ReadKey();
                    pushTopicConnection.Disconect();
                    Console.ReadKey();
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            }
        }
    }
}
What I'm doing here is setting the access token in the Authorization header, creating the PushTopic connection with Salesforce, and listening to the channel.
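If you want to check the endpoint and header values without connecting to Salesforce, the two string-building steps can be pulled out into small pure functions. This is only a sketch: StreamingEndpoint and its method names are hypothetical, and the logic simply mirrors what the Main method above does inline.

```csharp
using System;

// Hypothetical helper: the endpoint and header logic from Main, extracted so it can be tested.
public static class StreamingEndpoint
{
    // Combines the scheme and host of the login instance URL with the CometD path.
    // Any path on the instance URL is dropped, exactly as in the inline version.
    public static string Build(string instanceUrl, string cometdPath)
    {
        var serverUri = new Uri(instanceUrl);
        return $"{serverUri.Scheme}://{serverUri.Host}{cometdPath}";
    }

    // Value of the Authorization header sent with each long-polling request.
    public static string AuthorizationValue(string accessToken)
    {
        if (string.IsNullOrEmpty(accessToken))
        {
            throw new ArgumentException("Access token is missing.", nameof(accessToken));
        }
        return "OAuth " + accessToken;
    }
}
```

For example, StreamingEndpoint.Build("https://example.my.salesforce.com/", "/cometd/43.0") gives "https://example.my.salesforce.com/cometd/43.0", where the host name is a made-up placeholder.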
Step 8: Generate the response wrapper classes in Visual Studio and deserialize the response
Now build and run the .NET Core console application. If everything went well, you won't see any errors. The next step is to test the channel.
Set a breakpoint in the OnMessage method of the Listener class and create a lead in your Salesforce org. As soon as you create, update, or delete a Lead record, the debugger will stop at the breakpoint. Hover over the convertedJson variable and click the "Text Visualizer" (the magnifying glass icon) to see the raw JSON of the event.
Copy the text and stop the debugger. Then navigate to Edit -> Paste Special -> Paste JSON as Classes. Visual Studio makes life easier. 🙂
Visual Studio inserts the generated classes at the cursor position, as shown below.
public class Rootobject
{
    public Data data { get; set; }
    public string channel { get; set; }
}

public class Data
{
    public Event _event { get; set; }
    public Sobject sobject { get; set; }
}

public class Event
{
    public DateTime createdDate { get; set; }
    public int replayId { get; set; }
    public string type { get; set; }
}

public class Sobject
{
    public string Id { get; set; }
    public string Name { get; set; }
}
Cut and paste those classes into the Wrappers.cs file. You can find the full file below.
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace StreamingAPI_Core
{
    public class AuthResponse
    {
        public string access_token { get; set; }
        public string instance_url { get; set; }
        public string id { get; set; }
        public string token_type { get; set; }
        public string issued_at { get; set; }
        public string signature { get; set; }
    }

    public class Rootobject
    {
        public Data data { get; set; }
        public string channel { get; set; }
    }

    public class Data
    {
        // The JSON key is "event"; without this attribute _event stays null.
        [JsonProperty("event")]
        public Event _event { get; set; }
        public Sobject sobject { get; set; }
    }

    public class Event
    {
        public DateTime createdDate { get; set; }
        public int replayId { get; set; }
        public string type { get; set; }
    }

    public class Sobject
    {
        public string Id { get; set; }
        public string Name { get; set; }
    }
}
Next, we should deserialize the response by modifying the OnMessage method in the Listener class as below.
using CometD.NetCore.Bayeux;
using CometD.NetCore.Bayeux.Client;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace StreamingAPI_Core
{
    class Listener : IMessageListener
    {
        public void OnMessage(IClientSessionChannel channel, IMessage message)
        {
            var convertedJson = message.Json;

            // Deserialize into the generated wrapper so the fields are strongly typed.
            var obj = JsonConvert.DeserializeObject<Rootobject>(convertedJson);

            Console.WriteLine(convertedJson);
            Console.WriteLine(obj.data.sobject.Id + " " + obj.data.sobject.Name);
        }
    }
}
Now all the code needed to receive events from Salesforce via the Streaming API is in place in the console app. You can test the events by creating, updating, or deleting Lead records.
Thanks, and I welcome your comments and suggestions.
This is a simple implementation of the CometD and .NET Core integration for the Salesforce Streaming API. You can find the source code here: https://github.com/prasadruwanpathirana/sfdc-streamingapi-netcore.git
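If you would like to check the deserialization without a live org, you can feed a sample payload through the same kind of wrapper classes. The sketch below is an assumption-laden illustration: the class names (LeadEvent, LeadEventData, EventInfo, LeadRecord, LeadEventParser) are mine, the sample values are made up, and it uses System.Text.Json with explicit property names instead of Newtonsoft.Json so that it needs no extra package. One detail it makes visible is that the JSON key is "event", which is a reserved word in C#, so the property needs an explicit name mapping.

```csharp
using System;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical wrapper classes mirroring the PushTopic event shape.
public class LeadEvent
{
    [JsonPropertyName("channel")] public string Channel { get; set; }
    [JsonPropertyName("data")] public LeadEventData Data { get; set; }
}

public class LeadEventData
{
    // "event" is a C# keyword, so the property gets an explicit JSON name.
    [JsonPropertyName("event")] public EventInfo Event { get; set; }
    [JsonPropertyName("sobject")] public LeadRecord Sobject { get; set; }
}

public class EventInfo
{
    [JsonPropertyName("createdDate")] public DateTime CreatedDate { get; set; }
    [JsonPropertyName("replayId")] public long ReplayId { get; set; }
    [JsonPropertyName("type")] public string Type { get; set; }
}

public class LeadRecord
{
    [JsonPropertyName("Id")] public string Id { get; set; }
    [JsonPropertyName("Name")] public string Name { get; set; }
}

public static class LeadEventParser
{
    // Made-up sample payload in the shape of a PushTopic notification.
    public const string SamplePayload = @"{
        ""channel"": ""/topic/LeadUpdates"",
        ""data"": {
            ""event"": { ""createdDate"": ""2018-08-01T10:15:30.000Z"", ""replayId"": 1, ""type"": ""created"" },
            ""sobject"": { ""Id"": ""00Q000000000001AAA"", ""Name"": ""Jane Doe"" }
        }
    }";

    public static LeadEvent Parse(string json) =>
        JsonSerializer.Deserialize<LeadEvent>(json);
}
```

Calling LeadEventParser.Parse(LeadEventParser.SamplePayload) and reading Data.Event.Type and Data.Sobject.Name is a quick way to confirm the mapping before pointing the listener at real events.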
How do we handle the case when the access token expires, since the access token is short-lived (typically an hour or two)?