I started implementing StreamInsight (SI) for an enterprise project and realized there are not many recent SI articles around. I am sharing my experience one post at a time, in the hope that it will be useful to someone. If you are new to StreamInsight, please refer to the Microsoft article at http://msdn.microsoft.com/en-us/library/hh750618(v=sql.10).aspx , which describes all available versions of SI. The latest released version at the time of writing is StreamInsight 2.1.
Please note that SI 2.1 has some breaking API changes if you were previously using SI 1.2. Read the overview of the changes and backward compatibility in SI 2.1 here: http://msdn.microsoft.com/en-us/library/ee362329(v=sql.111).aspx
I am writing this article on the assumption that you know what SI is and can write basic SI queries using LINQPad. I also highly recommend reading the StreamInsight Server Concepts documentation before starting development, so that you are familiar with the concepts.
This simple end-to-end example uses an event source (a SQL database in this case) and an event sink that both implement the IEnumerable interface to create a working StreamInsight application.
The StreamInsight engine is a server that can be embedded (in-memory) or remote (e.g. the Azure Service). We first use Server.Create to create an embedded server instance and return a handle to it. The instance name passed to Server.Create (Instance1 here) should match a StreamInsight instance name that was registered when StreamInsight was installed.
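// Namespaces needed at the top of the file; the rest of the code goes inside a method such as Main.
using System;
using System.Linq;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Linq;

using (Server server = Server.Create("Instance1"))
{
    Application application = server.CreateApplication("MyFirstApp");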
    /* First, define the event source data for the query by issuing a LINQ to Entities query
       over the Northwind database (or a database of your choice; I use RecipeDB). */
    using (RecipeDBEntities recipedb = new RecipeDBEntities())
    {
        // Query all active recipes that have a known creation date, modification date, and UserId.
        // The same approach works on real-time data or on previously recorded events.
        // Order by DateCreated: it becomes the event start time below, and
        // AdvanceTimeSettings.IncreasingStartTime expects start times in non-decreasing order.
        var databaseQuery = from o in recipedb.Table_Recipes
                            where o.DateModified.HasValue && o.DateCreated.HasValue
                               && o.UserId != null
                               && o.IsActive.HasValue && o.IsActive.Value == true
                            orderby o.DateCreated.Value
                            select o;
        /* Next, transform the result of the query into a stream of interval events. */
        // The start and end times of each event are the recipe creation and modification
        // timestamps. The payload keeps track of the UserId.
        var streamSource = databaseQuery
            .ToIntervalStream(application,
                o => IntervalEvent.CreateInsert(
                    o.DateCreated.Value,
                    o.DateModified.Value,
                    new { o.UserId }),
                AdvanceTimeSettings.IncreasingStartTime);
        /* Next, write the time-aware StreamInsight query that is appropriate for the incoming stream of events. */
        // Find the time intervals during which more than 3 recipes are in process/updated for a user.
        var streamQuery = from o in streamSource
                          group o by o.UserId into g
                          from window in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                          select new { RecipeCount = window.Count(), UserId = g.Key } into agg
                          where agg.RecipeCount > 3
                          select agg;
        /* Next, transform the output stream from the query into an enumerable result. */
        // Convert the temporal query results into an enumerable of interval events. This example
        // filters out CTI events and projects the relevant portions of each interval event.
        var results = from intervalEvent in streamQuery.ToIntervalEnumerable()
                      where intervalEvent.EventKind != EventKind.Cti
                      select new
                      {
                          intervalEvent.StartTime,
                          intervalEvent.EndTime,
                          intervalEvent.Payload.RecipeCount,
                          intervalEvent.Payload.UserId
                      };
        /* Finally, consume the results of the query. */
        // Enumerating the results triggers the underlying SQL Server and StreamInsight queries.
        foreach (var activeInterval in results)
        {
            Console.WriteLine("Between {0} and {1}, {2} recipes were updated by user '{3}'.",
                activeInterval.StartTime,
                activeInterval.EndTime,
                activeInterval.RecipeCount,
                activeInterval.UserId);
        }
        Console.ReadLine();
    }
}
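To make the snapshot window step more concrete, here is a minimal sketch in plain C# (no StreamInsight dependency) of roughly what the grouped count computes over a finished list of intervals. The Interval and Snapshot records, the CountPerSnapshot method, and the integer UserId are hypothetical and exist only for illustration; they are not part of the StreamInsight API, and the real engine works incrementally as events and CTIs arrive rather than over a complete list. The sketch assumes C# 9 or later (for records) and treats each interval as half-open, [Start, End).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical types for illustration only; not part of the StreamInsight API.
public sealed record Interval(int UserId, DateTime Start, DateTime End);
public sealed record Snapshot(int UserId, DateTime Start, DateTime End, int Count);

public static class SnapshotSketch
{
    // For each user, split the timeline at every event start and end time.
    // Each resulting slice is a "snapshot": no event starts or ends inside it.
    // Report the slices in which more than `threshold` events are active.
    public static List<Snapshot> CountPerSnapshot(IEnumerable<Interval> events, int threshold)
    {
        var output = new List<Snapshot>();

        foreach (var userEvents in events.GroupBy(e => e.UserId))
        {
            // All distinct start/end times for this user, in time order.
            var edges = userEvents
                .SelectMany(e => new[] { e.Start, e.End })
                .Distinct()
                .OrderBy(t => t)
                .ToList();

            for (int i = 0; i + 1 < edges.Count; i++)
            {
                DateTime windowStart = edges[i];
                DateTime windowEnd = edges[i + 1];

                // Because the edges include every start and end time, an event
                // overlaps a slice only if it covers the slice completely.
                int count = userEvents.Count(e => e.Start <= windowStart && e.End >= windowEnd);

                if (count > threshold)
                {
                    // The reported interval is the slice itself, which mirrors
                    // the effect of clipping output to the window.
                    output.Add(new Snapshot(userEvents.Key, windowStart, windowEnd, count));
                }
            }
        }

        return output;
    }
}
```

Calling SnapshotSketch.CountPerSnapshot(intervals, 3) on a list of per-user intervals returns only the slices in which at least four intervals overlap for the same user, which is the same question the StreamInsight query above asks of the recipe stream.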
I hope to continue this SI series, with the end goal of creating a StreamInsight Azure Service (codename Austin) with a front end built on MVC and SignalR, which processes complex events (CEs) and notifies the MVC app's users of the latest results through SignalR. Stay tuned for future posts, or subscribe to receive new posts by email.