From 20348666d322e229071945263aec7a023aea79c7 Mon Sep 17 00:00:00 2001 From: Alexander Sloutsky <17802887+sloutsky@users.noreply.github.com> Date: Thu, 30 Jan 2025 19:30:21 +0200 Subject: [PATCH 01/20] Update dynamic.md - better describe the limitations --- data-explorer/kusto/query/scalar-data-types/dynamic.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-explorer/kusto/query/scalar-data-types/dynamic.md b/data-explorer/kusto/query/scalar-data-types/dynamic.md index 7622f5f39a..a89befb059 100644 --- a/data-explorer/kusto/query/scalar-data-types/dynamic.md +++ b/data-explorer/kusto/query/scalar-data-types/dynamic.md @@ -18,9 +18,9 @@ The `dynamic` scalar data type can be any of the following values: > [!NOTE] > -> * Values of type `dynamic` are limited to 1MB (2^20), uncompressed. If a cell value in a record exceeds 1MB, the value is dropped and ingestion succeeds. You can increase the `MaxValueSize` of the column by changing its [encoding policy](../../management/alter-encoding-policy.md). > * Although the `dynamic` type appears JSON-like, it can hold values that the JSON model doesn't represent because they don't exist in JSON (e.g. `long`, `real`, `datetime`, `timespan`, and `guid`). Therefore, in serializing `dynamic` values into a JSON representation, values that JSON can't represent are serialized into `string` values. Conversely, Kusto will parse strings as strongly-typed values if they can be parsed as such. This applies to `datetime`, `real`, `long`, and `guid` types. For more information on the JSON object model, see [json.org](https://json.org/). > * Kusto doesn't attempt to preserve the order of name-to-value mappings in a property bag, and so you can't assume the order to be preserved. It's entirely possible for two property bags with the same set of mappings to yield different results when they are represented as `string` values, for example. 
+> * At ingestion, values of type `dynamic` are limited by default to 1 MiB (2^20 bytes), uncompressed. You can increase the `MaxValueSize` of the column, up to 32 MiB, by changing its [encoding policy](../../management/alter-encoding-policy.md).

## Dynamic literals

From 760c4b37d4bdea1daee36f2fb6684d3bb1b26b06 Mon Sep 17 00:00:00 2001
From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com>
Date: Tue, 4 Feb 2025 12:37:15 +0200
Subject: [PATCH 02/20] New Article: Ingest data by streaming ingestion

---
 .../app-managed-streaming-ingest.md           | 252 ++++++++++++++++++
 1 file changed, 252 insertions(+)
 create mode 100644 data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md

diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
new file mode 100644
index 0000000000..a31b34eaa5
--- /dev/null
+++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
@@ -0,0 +1,252 @@
+---
+title: Create an app to ingest data by streaming ingestion using Kusto's batching manager
+description: Learn how to create an app that ingests data from a file, stream, or blob by using streaming ingestion.
+ms.reviewer: yogilad
+ms.service: data-explorer
+ms.topic: how-to
+ms.date: 02/03/2025
+monikerRange: "azure-data-explorer"
+#customer intent: To learn how to create an app that ingests data using streaming ingestion.
+---
+# Create an app for streaming ingestion using Kusto's batching manager
+
+> [!INCLUDE [applies](../../includes/applies-to-version/applies.md)] [!INCLUDE [fabric](../../includes/applies-to-version/fabric.md)] [!INCLUDE [azure-data-explorer](../../includes/applies-to-version/azure-data-explorer.md)]
+
+Streaming ingestion lets you write data to Kusto with near-real-time latency. It's also useful for writing small amounts of data to a large number of tables, where batching would be inefficient.
+
+In this article, you learn how to ingest data into Kusto by using streaming ingestion.
+You ingest the data in the form of a file, stream, or blob.
+
+> [!NOTE]
+> Streaming ingestion is a high-velocity ingestion protocol. Streaming ingestion isn't the same as `IngestFromStream`.
+> `IngestFromStream` is an API that takes in a memory stream and sends it for ingestion. It's available in all ingestion client implementations, including queued and streaming ingestion.
+
+## Streaming and Managed Streaming
+
+Kusto SDKs provide two flavors of streaming ingestion clients, `StreamingIngestionClient` and `ManagedStreamingIngestionClient`. Managed Streaming adds built-in retry and failover logic.
+
+The following applies when ingesting data with managed streaming:
+
++ Streaming requests that fail due to server-side size limitations fail over to queued ingestion.
++ Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression.
++ Transient failures, such as throttling, are retried three times and then moved to queued ingestion.
++ Permanent failures aren't retried.
+
+## Limitations
+
+Data streaming has some limitations compared to queuing data for ingestion:
+• Tags can't be set on the data.
+• Mapping can only be provided by mapping reference. Inline mapping isn't supported.
+• The payload sent in the request can't exceed 10 MB, regardless of format or compression.
+
+For more information, see [Streaming Limitations](../../../ingest-data-streaming.md#limitations).
+
+## Prerequisites
+
++ A Kusto cluster where you have database User or higher rights. You can provision a free Kusto cluster in .
+Prerequisites:
++ Download the file [stormevents.csv](where do we store this ???) and place it in a folder next to your script.
+
+## Before you begin
+
+Before creating the app, you need to do the following:
+
+1. Configure streaming ingestion on your Azure Data Explorer cluster.
+1. 
Create a Kusto table to ingest the data into.
+1. Enable the streaming ingestion policy on the table.
+
+## Configure streaming ingestion
+
+To configure streaming ingestion on your Azure Data Explorer cluster, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp).
+
+### Create a Kusto table
+
+Run the following commands on your database by using Kusto Explorer (Desktop) or Kusto Web Explorer.
+
+1. Create a table called MyStormEvents:
+
+```kql
+.create table MyStormEvents (StartTime:datetime, EndTime:datetime, State:string, DamageProperty:int, DamageCrops:int, Source:string, StormSummary:dynamic)
+```
+
+### Enable the streaming ingestion policy
+
+Enable streaming ingestion on the table or on the entire database by using one of the following commands:
+
+```kql
+.alter table MyStormEvents policy streamingingestion enable
+
+.alter database <DatabaseName> policy streamingingestion enable
+```
+
+For more information about the streaming ingestion policy, see [Streaming ingestion policy - Azure Data Explorer & Real-Time Analytics](../../../kusto/management/streaming-ingestion-policy.md).
+
+# Create a basic client application
+
+Create a basic client application that connects to the Kusto Help cluster.
+Enter the cluster query URI, ingest URI, and database name in the relevant variables.
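If you only have the query URI at hand, the ingestion URI can usually be derived from it: Azure Data Explorer ingestion endpoints conventionally use the same host with an `ingest-` prefix. A minimal standard-library sketch (the cluster name below is a made-up example, not a real cluster):

```python
from urllib.parse import urlparse

def to_ingest_uri(cluster_uri: str) -> str:
    """Derive the ingestion endpoint from a cluster query endpoint."""
    parsed = urlparse(cluster_uri)
    # Ingestion endpoints use the same host, prefixed with "ingest-"
    return f"{parsed.scheme}://ingest-{parsed.netloc}"

print(to_ingest_uri("https://mycluster.westeurope.kusto.windows.net"))
# -> https://ingest-mycluster.westeurope.kusto.windows.net
```

This is a convenience for filling in the variables; when in doubt, copy both URIs from the cluster's overview page instead.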
+
+The code sample includes a service function `print_result_as_value_list()` for printing query results.
+
+```python
+import os
+import time
+import io
+
+from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties, DataFormat
+from azure.kusto.ingest import QueuedIngestClient, FileDescriptor, StreamDescriptor, BlobDescriptor, IngestionProperties, ManagedStreamingIngestClient
+
+
+def print_result_as_value_list(result):
+    row_count = 1
+
+    # create a list of columns
+    cols = list((col.column_name for col in result.primary_results[0].columns))
+
+    # print the values for each row
+    for row in result.primary_results[0]:
+        if row_count > 1:
+            print("######################")
+
+        print("row", row_count, ":")
+        for col in cols:
+            print("\t", col, "-", row[col])
+        row_count += 1
+
+
+def main():
+    # Enter your cluster query URI, ingestion URI, and database name below
+    file_path = os.curdir + "/stormevents.csv"
+    cluster_url = ""
+    ingestion_url = ""
+    database_name = " "
+    table_name = "MyStormEvents"
+    cluster_kcsb = KustoConnectionStringBuilder.with_interactive_login(cluster_url)
+    ingestion_kcsb = KustoConnectionStringBuilder.with_interactive_login(ingestion_url)
+
+    with KustoClient(cluster_kcsb) as kusto_client:
+        with ManagedStreamingIngestClient(cluster_kcsb, ingestion_kcsb) as ingest_client:
+            # Alternatively, use the plain streaming client:
+            # with KustoStreamingIngestClient(cluster_kcsb) as ingest_client:
+
+            print("Number of rows in " + table_name)
+            result = kusto_client.execute_query(database_name, table_name + " | count")
+            print_result_as_value_list(result)
+
+
+main()
+```
+
+## Stream a file for ingestion
+
+Let's use the `ingest_from_file()` API to ingest stormevents.csv.
+Place the stormevents.csv file in the same location as your script. Since our CSV file contains a header row, we use `ignore_first_record=True` to ignore it.
+
+Add an ingestion section by adding the following lines to the end of main().
+
+```python
+# ingestion section
+print("Ingesting data from a file")
+ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True)
+ingest_client.ingest_from_file(file_path, ingest_props)
+```
+
+Let's also query the new number of rows and the most recent row.
+Add the following lines after the ingestion command:
+
+```python
+print("New number of rows in " + table_name)
+result = kusto_client.execute_query(database_name, table_name + " | count")
+print_result_as_value_list(result)
+
+print("Example line from " + table_name)
+result = kusto_client.execute_query(database_name, table_name + " | top 1 by EndTime")
+print_result_as_value_list(result)
+```
+
+Run the script from the directory where the script and stormevents.csv are located. Alternatively, you can specify the full path to the file by replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = ""`.
+
+The first time you run the application, the results are as follows:
+
+```plaintext
+Number of rows in MyStormEvents
+row 1 :
+	 Count - 0
+Ingesting data from a file
+New number of rows in MyStormEvents
+row 1 :
+	 Count - 1001
+Example line from MyStormEvents
+row 1 :
+	 StartTime - 2007-12-31 11:15:00+00:00
+	 EndTime - 2007-12-31 13:21:00+00:00
+	 State - HAWAII
+	 DamageProperty - 0
+	 DamageCrops - 0
+	 Source - COOP Observer
+	 StormSummary - {'TotalDamages': 0, 'StartTime': '2007-12-31T11:15:00.0000000Z', 'EndTime': '2007-12-31T13:21:00.0000000Z', 'Details': {'Description': 'Heavy showers caused flash flooding in the eastern part of Molokai. Water was running over the bridge at Halawa Valley.', 'Location': 'HAWAII'}}
+```
+
+### Stream in-memory data for ingestion
+
+To ingest data from memory, create a stream containing the data for ingestion and call the `ingest_from_stream()` API.
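The sample that follows streams the record uncompressed. For larger payloads, the data can also be gzip-compressed before streaming to reduce its on-the-wire size. A standard-library-only sketch of the preparation step — the `StreamDescriptor` from `azure-kusto-ingest` would then be created with `is_compressed=True`:

```python
import gzip
import io

single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
raw_bytes = single_line.encode("utf-8")

# Compress the payload before streaming; when possible, still report the
# *uncompressed* size of the raw data in the stream descriptor.
compressed_stream = io.BytesIO(gzip.compress(raw_bytes))
uncompressed_size = len(raw_bytes)
```

This is only the payload preparation; the client call itself is unchanged apart from the descriptor flags.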
+
+Replace the ingestion section code with the following:
+
+```python
+print("Ingesting data from memory")
+single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
+string_stream = io.StringIO(single_line)
+ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV)
+# when possible, provide the size of the raw (uncompressed) data
+stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line))
+ingest_client.ingest_from_stream(stream_descriptor, ingest_props)
+```
+
+The results are as follows:
+
+```plaintext
+Number of rows in MyStormEvents
+row 1 :
+	 Count - 0
+
+Ingesting data from memory
+
+New number of rows in MyStormEvents
+row 1 :
+	 Count - 1
+
+Example line from MyStormEvents
+row 1 :
+	 StartTime - 2018-01-26 00:00:00+00:00
+	 EndTime - 2018-01-27 14:00:00+00:00
+	 State - MEXICO
+	 DamageProperty - 0
+	 DamageCrops - 0
+	 Source - Unknown
+	 StormSummary - {}
+```
+
+### Stream a blob for ingestion
+
+Kusto supports ingestion from Azure Storage blobs, Azure Data Lake files, and Amazon S3 files.
+
+When you send a blob for streaming, the client only sends the blob reference to the database; the data itself is read once by the service. Read access to the blob can be granted with keys, SAS tokens, or managed identities attached to the Kusto cluster.
+
+For this section, you need to upload the sample CSV file to your storage account and generate a URI with built-in read permissions (for example, via SAS) to provide to the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generating a SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true).
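Before handing a SAS URI to the client, you can sanity-check it with the standard library — the signature and permissions live in the query string, which is what lets the service read the blob without extra credentials. The URL below is entirely hypothetical:

```python
from urllib.parse import urlparse, parse_qs

# Hypothetical SAS URI, for illustration only
sas_uri = ("https://myaccount.blob.core.windows.net/uploads/stormevents.csv"
           "?sp=r&se=2025-03-01T00%3A00%3A00Z&sig=abc123")

parsed = urlparse(sas_uri)
params = parse_qs(parsed.query)

print(parsed.path)      # the blob path within the account
print("sig" in params)  # the signature is what grants read access
```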
+
+Replace the ingestion section code with the following section:
+
+```python
+print("Ingesting data from an existing blob")
+blob_descriptor = BlobDescriptor("")
+ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True)
+ingest_client.ingest_from_blob(blob_descriptor, ingest_props)
+```
+
+> [!NOTE]
+> You can also use managed identity-based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity).
+
+Resources:
++ [Kusto Python GitHub repository](https://github.com/Azure/azure-kusto-python)
++ [Python Sample App Wizard](https://dataexplorer.azure.com/oneclick/generatecode?programingLang=Python)

From f7e7e3011da432d35db4e6c1f18408b6992a5e72 Mon Sep 17 00:00:00 2001
From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com>
Date: Tue, 4 Feb 2025 20:40:42 +0200
Subject: [PATCH 03/20] added python

---
 .../app-managed-streaming-ingest.md | 96 ++++++++++++++++++-
 1 file changed, 95 insertions(+), 1 deletion(-)

diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
index a31b34eaa5..3c886e9771 100644
--- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
+++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
@@ -46,6 +46,7 @@ For more information, see [Streaming Limitations](../../../ingest-data-streaming.
 + A Kusto cluster where you have database User or higher rights. You can provision a free Kusto cluster in .
 Prerequisites:
 + Download the file [stormevents.csv](where do we store this ???) and place it in a folder next to your script.
++ [Set up your development environment](/kusto/api/get-started/app-set-up?view=azure-data-explorer) to use the Kusto client library.
 ## Before you begin
 
@@ -54,6 +55,7 @@ Before creating the app, you need to do the following
 1. Configure streaming ingestion on your Azure Data Explorer cluster.
 1. Create a Kusto table to ingest the data into.
 1. Enable the streaming ingestion policy on the table.
+1. Download the [stormevents.csv](https://github.com/MicrosoftDocs/dataexplorer-docs-samples/blob/main/docs/resources/app-basic-ingestion/stormevents.csv) sample data file containing 1,000 storm event records.
 
 ## Configure streaming ingestion
 
@@ -86,6 +88,75 @@ For more information about streaming policy, see [Streaming ingestion policy - A
 Create a basic client application which connects to the Kusto Help cluster.
 Enter the cluster query and ingest URI and database name in the relevant variables.
 
+
+### [C#](#tab/c-sharp)
+
+```C#
+using System;
+using Kusto.Data;
+using Kusto.Data.Net.Client;
+using Kusto.Ingest;
+using Kusto.Data.Common;
+using Microsoft.Identity.Client;
+using System.Data;
+
+class Program
+{
+    static void Main(string[] args)
+    {
+        var tableName = "MyStormEvents";
+        var cluster_url = "";
+        var ingestion_url = "";
+        var databaseName = "";
+
+        var clusterKcsb = new KustoConnectionStringBuilder(cluster_url).WithAadUserPromptAuthentication();
+        var ingestionKcsb = new KustoConnectionStringBuilder(ingestion_url).WithAadUserPromptAuthentication();
+
+        using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb))
+        using (var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb, ingestionKcsb))
+        {
+            string SASURI = "";
+
+            Console.WriteLine("Number of rows in " + tableName);
+            var result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties());
+
+            PrintResultAsValueList(result);
+
+            //Ingestion section
+            Console.WriteLine("Ingesting data from a file");
+            var ingestProps = new KustoIngestionProperties(databaseName, 
tableName) + { + Format = DataSourceFormat.csv, + IgnoreFirstRecord = true + }; + ingestClient.IngestFromStorageAsync(SASURI, ingestProps).Wait(); + + Console.WriteLine("Example line from " + tableName); + result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties()); + PrintResultAsValueList(result); + } + } + + + static void PrintResultAsValueList(IDataReader result) + { + while (result.Read()) + { + for (int i = 0; i < result.FieldCount; i++) + { + Console.Write(result.GetValue(i) + "\t"); + } + Console.WriteLine(); + } + } +} +``` + +### [Python](#tab/python) + The code sample includes a service function `print_result_as_value_list()` for printing query results ```python @@ -94,7 +165,7 @@ import time import io from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties, DataFormat -from azure.kusto.ingest import QueuedIngestClient, FileDescriptor, StreamDescriptor, BlobDescriptor, IngestionProperties +from azure.kusto.ingest import QueuedIngestClient, FileDescriptor, StreamDescriptor, BlobDescriptor, IngestionProperties, ManagedStreamingIngestClient def print_result_as_value_list(result): @@ -135,6 +206,8 @@ def main(): main() ``` +--- + ## Stream a file for ingestion Let’s use the `ingest_from_file()` API to ingest stormevents.csv. @@ -142,6 +215,11 @@ Place stormevents.csv file in the same location as your script. Since our CSV fi Add and ingestion section using the following lines to the end of main(). +### [C#](#tab/c-sharp) + + +### [Python](#tab/python) + ```python # ingestion section print("Ingesting data from a file") @@ -186,12 +264,19 @@ row 1 : StormSummary - {'TotalDamages': 0, 'StartTime': '2007-12-31T11:15:00.0000000Z', 'EndTime': '2007-12-31T13:21:00.0000000Z', 'Details': {'Description': 'Heavy showers caused flash flooding in the eastern part of Molokai. 
Water was running over the bridge at Halawa Valley.', 'Location': 'HAWAII'}} ``` +--- + ### Stream in-memory data for ingestion To ingest data from memory, create a stream containing the data for ingestion and call the `ingest_from_stream()` API. Replace the ingestion section code with the following: +### [C#](#tab/c-sharp) + + +### [Python](#tab/python) + ```python print("Ingesting data from memory") single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' @@ -226,6 +311,8 @@ row 1 : StormSummary - {} ``` +--- + ### Stream a blob for ingestion Kusto supports ingestion from Azure Storage blobs, Azure Data Lake files and Amazon S3 files. @@ -236,6 +323,11 @@ For this section you’ll need to upload the sample csv file to your storage acc Replace the ingestion section code with the following section: +### [C#](#tab/c-sharp) + + +### [Python](#tab/python) + ```python print("Ingesting data from an existing blob") blob_descriptor = BlobDescriptor("") @@ -243,6 +335,8 @@ ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ig ingest_client.ingest_from_blob(blob_descriptor, ingest_props) ``` +--- + > [NOTE!] > You can also use a managed identity based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. 
For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity) From edf90d730fb35cd0f512a1cdfd9bbe05cd5b02ee Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Wed, 5 Feb 2025 12:48:58 +0200 Subject: [PATCH 04/20] wip --- .../app-managed-streaming-ingest.md | 170 +++++++++++++----- 1 file changed, 124 insertions(+), 46 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 3c886e9771..45dbfa6fb9 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -7,7 +7,9 @@ ms.topic: how-to ms.date: 02/03/2025 monikerRange: "azure-data-explorer" #customer intent: To learn about creating an app injest using Kusto’s batching manager and streaming ingestion. + --- + # Create an app for streaming ingestion using Kusto’s batching manager > [!INCLUDE [applies](../../includes/applies-to-version/applies.md)] [!INCLUDE [fabric](../../includes/applies-to-version/fabric.md)] [!INCLUDE [azure-data-explorer](../../includes/applies-to-version/azure-data-explorer.md)] @@ -83,7 +85,7 @@ Enable streaming ingestion on the table or on the entire database using one of t For more information about streaming policy, see [Streaming ingestion policy - Azure Data Explorer & Real-Time Analytics](../../../kusto//management/streaming-ingestion-policy.md) -# Create a basic client application +## Create a basic client application Create a basic client application which connects to the Kusto Help cluster. Enter the cluster query and ingest URI and database name in the relevant variables. 
@@ -91,6 +93,8 @@ Enter the cluster query and ingest URI and database name in the relevant variabl ### [C#](#tab/c-sharp) +The code sample includes a service function `PrintResultAsValueList()` for printing query results + ```C# using System; using Kusto.Data; @@ -116,38 +120,29 @@ class Program using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) using (var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { - string SASURI =""; - - + + Console.WriteLine("Number of rows in " + tableName); var queryProvider = KustoClientFactory.CreateCslQueryProvider(clusterKcsb); var result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties()); PrintResultAsValueList(result); - - //Ingestion section - Console.WriteLine("Ingesting data from a file"); - var ingestProps = new KustoIngestionProperties(databaseName, tableName) - { - Format = DataSourceFormat.csv, - IgnoreFirstRecord = true - }; - ingestClient.IngestFromStorageAsync(SASURI, ingestProps).Wait(); - - Console.WriteLine("Example line from " + tableName); - result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties()); - PrintResultAsValueList(result); + } } static void PrintResultAsValueList(IDataReader result) { + var row=0; while (result.Read()) - { + { + row ++; + Console.WriteLine("row:" + row.ToString() + "\t"); for (int i = 0; i < result.FieldCount; i++) { - Console.Write(result.GetValue(i) + "\t"); + + Console.WriteLine("\t"+ result.GetName(i)+" - " + result.GetValue(i) ); } Console.WriteLine(); } @@ -155,6 +150,39 @@ class Program } ``` +## Stream a file for ingestion + +Use the `IngestFromStorageAsync` method to ingest the *stormevents.csv* file. + +Copy *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `IgnoreFirstRecord=True` to ignore the header. 
+ +Add and ingestion section using the following lines to the end of `Main()`. + +```csharp + var ingestProps = new KustoIngestionProperties(databaseName, tableName) + { + Format = DataSourceFormat.csv, + IgnoreFirstRecord = true + }; + + //Ingestion section + Console.WriteLine("Ingesting data from a file"); + ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProps).Wait(); +``` + +Let’s also query the new number of rows and the most recent row after the ingestion. +Add the following lines after the ingestion command: + +```csharp + Console.WriteLine("Number of rows in " + tableName); + result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties()); + PrintResultAsValueList(result); + + Console.WriteLine("Example line from " + tableName); + result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties()); + PrintResultAsValueList(result); +``` + ### [Python](#tab/python) The code sample includes a service function `print_result_as_value_list()` for printing query results @@ -202,23 +230,16 @@ def main(): result = kusto_client.execute_query(database_name, table_name + " | count") print_result_as_value_list(result) - main() ``` ---- - ## Stream a file for ingestion -Let’s use the `ingest_from_file()` API to ingest stormevents.csv. -Place stormevents.csv file in the same location as your script. Since our CSV file contains a header row we use `ignore_first_record=True` to ignore it. - -Add and ingestion section using the following lines to the end of main(). - -### [C#](#tab/c-sharp) +Use the `ingest_from_file()` API to ingest the *stormevents.csv* file. +Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ignore_first_record=True` to ignore the header. -### [Python](#tab/python) +Add and ingestion section using the following lines to the end of `main()`. 
```python # ingestion section @@ -227,10 +248,9 @@ ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ig ingest_client.ingest_from_file(file_path, ingest_props) ``` -Let’s also query the new number of rows and the most recent row. +Let’s also query the new number of rows and the most recent row after the ingestion. Add the following lines after the ingestion command: - ```python print("New number of rows in " + table_name) result = kusto_client.execute_query(database_name, table_name + " | count") @@ -241,9 +261,9 @@ result = kusto_client.execute_query(database_name, table_name + " | top 1 by End print_result_as_value_list(result) ``` -Run the script from the directory where the script and stormevents.csv are located. Alternatively, you can specify the full path to the file. replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = ""` +Run the script from the directory where the script and stormevents.csv are located. Alternatively, you can specify the full path to the file replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = ""` -The first time your run the application the results will be as follows: +The first time your run the application the results are as follows: ```plaintext Number of rows in MyStormEvents @@ -263,21 +283,20 @@ row 1 : Source - COOP Observer StormSummary - {'TotalDamages': 0, 'StartTime': '2007-12-31T11:15:00.0000000Z', 'EndTime': '2007-12-31T13:21:00.0000000Z', 'Details': {'Description': 'Heavy showers caused flash flooding in the eastern part of Molokai. Water was running over the bridge at Halawa Valley.', 'Location': 'HAWAII'}} ``` - --- ### Stream in-memory data for ingestion -To ingest data from memory, create a stream containing the data for ingestion and call the `ingest_from_stream()` API. - -Replace the ingestion section code with the following: +To ingest data from memory, create a stream containing the data for ingestion. 
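The Python and C# samples prepare the stream differently — a text stream versus a UTF-8 byte array — but the payloads they send are byte-for-byte equivalent. A small standard-library comparison:

```python
import io

single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'

text_stream = io.StringIO(single_line)                  # what the Python sample uses
byte_stream = io.BytesIO(single_line.encode("utf-8"))   # what the C# sample builds

# Both streams carry the same single CSV record
print(text_stream.read() == byte_stream.getvalue().decode("utf-8"))
# -> True
```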
-### [C#](#tab/c-sharp) +### [Python](#tab/python) +Call the `ingest_from_stream()` API to ingest the stream. -### [Python](#tab/python) +Replace the ingestion section code with the following: ```python +# Ingestion section print("Ingesting data from memory") single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' string_stream = io.StringIO(single_line) @@ -287,18 +306,41 @@ stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=le ingest_client.ingest_from_stream(stream_descriptor, ingest_props) ``` -The results will be as follows: +### [C#](#tab/c-sharp) + +Call the `IngestFromStreamAsync()` method to ingest the stream. + +Replace the ingestion section code with the following: + +```csharp + // Ingestion section + Console.WriteLine("Ingesting data from memory"); + var singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,'{}'"; + byte[] byteArray = Encoding.UTF8.GetBytes(singleLine); + using (MemoryStream stream = new MemoryStream(byteArray)) + { + var streamSourceOptions = new StreamSourceOptions + { + LeaveOpen = false + }; + ingestClient.IngestFromStreamAsync(stream, ingestProps, streamSourceOptions).Wait(); + } +``` + +--- + +The results are as follows: ```plaintext Number of rows in MyStormEvents row 1 : - Count - 0 + Count - 1001 Ingesting data from memory New number of rows in MyStormEvents row 1 : - Count - 1 + Count - 1002 Example line from MyStormEvents row 1 : @@ -321,13 +363,11 @@ When you send a blob for streaming, the client only sends the blob reference to For this section you’ll need to upload the sample csv file to your storage account and generate a URI with built-in read permissions (e.g. via SAS) to provide to the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). 
For information on generation an SAS URL, see [Generate a SAS token](kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). -Replace the ingestion section code with the following section: - -### [C#](#tab/c-sharp) - ### [Python](#tab/python) +Replace the ingestion section code with the following section: + ```python print("Ingesting data from an existing blob") blob_descriptor = BlobDescriptor("") @@ -335,8 +375,46 @@ ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ig ingest_client.ingest_from_blob(blob_descriptor, ingest_props) ``` +### [C#](#tab/c-sharp) + +Replace the ingestion section code with the following section: + +```csharp + //Ingestion section + Console.WriteLine("Ingesting data from an existing blob"); + var sasURI =""; + ingestClient.IngestFromStorageAsync(sasURI, ingestProps).Wait(); +``` + --- +The results are as follows: + +```plaintext +Number of rows in MyStormEvents +row 1 : + Count - 1002 + +Ingesting data from an existing blob + +New number of rows in MyStormEvents +row 1 : + Count - 2002 + +Example line from MyStormEvents +row 1 : + StartTime - 2018-01-26 00:00:00+00:00 + EndTime - 2018-01-27 14:00:00+00:00 + State - MEXICO + DamageProperty - 0 + DamageCrops - 0 + Source - Unknown + StormSummary - {} + +``` + + + > [NOTE!] > You can also use a managed identity based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. 
For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity).

From e4aa851670c966475f0ef60a0978ee57e4ef1119 Mon Sep 17 00:00:00 2001
From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com>
Date: Wed, 5 Feb 2025 14:38:55 +0200
Subject: [PATCH 05/20] wip2

---
 .../get-started/app-managed-streaming-ingest.md | 17 +++++++----------
 1 file changed, 7 insertions(+), 10 deletions(-)

diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
index 45dbfa6fb9..ec6726d72e 100644
--- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
+++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md
@@ -1,8 +1,7 @@
 ---
-title: Create an app to ingest data by streaming ingestion using Kusto's batching manager
+title: Create an app to ingest data by streaming ingestion using Kusto's managed streaming ingestion client
 description: Learn how to create an app that ingests data from a file, stream, or blob by using streaming ingestion.
 ms.reviewer: yogilad
-ms.service: data-explorer
 ms.topic: how-to
 ms.date: 02/03/2025
 monikerRange: "azure-data-explorer"
@@ -38,16 +37,16 @@ The following applies when ingesting data with managed streaming:
 
 Data streaming has some limitations compared to queuing data for ingestion:
 • Tags can't be set on the data.
-• Mapping can only be provided by Mapping Reference. Inline Mapping is not supported.
+• Mapping can only be provided by mapping reference. Inline mapping isn't supported.
 • The payload sent in the request can't exceed 10 MB, regardless of format or compression.
 
-For mor information, see [Streaming Limitations](../../../ingest-data-streaming.md#limitations).
+For more information, see [Streaming Limitations](/azure/data-explorer/ingest-data-streaming#limitations).
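The interplay between these size limits and the managed client's fallback behavior can be sketched as follows. The constants mirror the numbers stated above; this is an illustration of the documented routing decision, not an actual SDK API:

```python
# Illustrative thresholds, mirroring the documented limits
STREAMING_FALLBACK_BYTES = 4 * 1024 * 1024        # managed client falls back to queued above this
STREAMING_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024  # hard limit for a single streaming request

def choose_route(payload_size_bytes: int) -> str:
    """Mimic the managed client's size-based routing decision."""
    if payload_size_bytes > STREAMING_FALLBACK_BYTES:
        return "queued"
    return "streaming"

print(choose_route(64 * 1024))        # small payload -> streaming
print(choose_route(8 * 1024 * 1024))  # large payload -> queued
```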
## Prerequisites + A Kusto cluster where you have database User or higher rights. You cam provision a free Kusto cluster in . Prerequisites: -+ Download the file [stormevents.csv](where do we store this ???)and place it in a folder next to your script. + + [Set up your development environment](/kusto/api/get-started/app-set-up?view=azure-data-explorer) to use the Kusto client library. ## Before you begin @@ -361,7 +360,7 @@ Kusto supports ingestion from Azure Storage blobs, Azure Data Lake files and Ama When you send a blob for streaming, the client only sends the blob reference to the database, and the data is actually read once by the service itself. Read Access to the blob can be granted with keys, SAS tokens or managed identities attached to the Kusto Cluster. -For this section you’ll need to upload the sample csv file to your storage account and generate a URI with built-in read permissions (e.g. via SAS) to provide to the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). +For this section you’ll need to upload the sample csv file to your storage account and generate a URI with built-in read permissions (e.g. via SAS) to provide to the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). ### [Python](#tab/python) @@ -414,11 +413,9 @@ row 1 : ``` - -> [NOTE!] 
+> [!NOTE] > You can also use a managed identity based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity) - -Resources: +## Resources + Kusto Python Git Hub repository [https://github.com/Azure/azure-kusto-python] + Python Sample App Wizard [https://dataexplorer.azure.com/oneclick/generatecode?programingLang=Python] From 2dc91f0b32954489a12874514307f5e4b1a21538 Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Thu, 6 Feb 2025 13:29:11 +0200 Subject: [PATCH 06/20] added typescript --- .../app-managed-streaming-ingest.md | 204 ++++++++++++++---- 1 file changed, 160 insertions(+), 44 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index ec6726d72e..fa802f71bf 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -157,7 +157,7 @@ Copy *stormevents.csv* file in the same location as your script. Since our CSV f Add and ingestion section using the following lines to the end of `Main()`. -```csharp +```csharp var ingestProps = new KustoIngestionProperties(databaseName, tableName) { Format = DataSourceFormat.csv, @@ -172,7 +172,7 @@ Add and ingestion section using the following lines to the end of `Main()`. Let’s also query the new number of rows and the most recent row after the ingestion. 
Add the following lines after the ingestion command: -```csharp +```csharp Console.WriteLine("Number of rows in " + tableName); result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties()); PrintResultAsValueList(result); @@ -181,6 +181,95 @@ Add the following lines after the ingestion command: result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties()); PrintResultAsValueList(result); ``` +### [Typescript](#tab/typescript) + +The code sample includes a service function `printResultAsValueList()` for printing query results + +```typescript + +import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data"; +import { InteractiveBrowserCredentialInBrowserOptions } from "@azure/identity"; +import { ManagedStreamingIngestClient, BlobDescriptor, IngestionProperties, DataFormat } from 'azure-kusto-ingest'; + +const clusterUrl = ""; +const ingestionUrl =""; +const databaseName = " "; + +const tableName = "MyStormEvents" + + async function main() { + const clusterKcsb = KustoConnectionStringBuilder.withUserPrompt(clusterUrl); + const ingestionKcsb = KustoConnectionStringBuilder.withUserPrompt(ingestionUrl); + + const kustoClient = new KustoClient(clusterKcsb); + const ingestClient = new ManagedStreamingIngestClient(clusterKcsb, ingestionKcsb); + + console.log(`Number of rows in ${tableName}`); + let result = await kustoClient.executeQuery(databaseName, `${tableName} | count`); + printResultAsValueList(result); +} + +function printResultAsValueList(result: any) { + let rowCount = 1; + + // Create a list of columns + const cols = result.primaryResults[0].columns.map((col: any) => col.name); + // Print the values for each row + for (const row of result.primaryResults) { + if (rowCount > 1) { + console.log('######################'); + } + + console.log(`row ${rowCount}:`); + for (const col of cols) { + const jsonObject = JSON.parse(row); + const value = 
jsonObject.data[0][col]; + console.log(`\t ${col} - ${JSON.stringify(value)}`); + } + rowCount++; + } +} + +main().catch((err) => { + console.error(err); +}); + +``` + +## Stream a file for ingestion + + +Use the `ingestFromFile()` API to ingest the *stormevents.csv* file. +Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ignoreFirstRecord=True` to ignore the header. + +Add and ingestion section using the following lines to the end of `main()`. + + +```typescript + + const ingestProperties = new IngestionProperties({ + database: databaseName, + table: tableName, + format: DataFormat.CSV, + additionalProperties: { ignoreFirstRecord: true } + }); + + //Ingest section + console.log("Ingesting data from a file"); + await ingestClient.ingestFromFile(".\\stormevents.csv", ingestProperties); +``` +Let’s also query the new number of rows and the most recent row after the ingestion. +Add the following lines after the ingestion command: + +```typescript + console.log(`New number of rows in ${tableName}`); + result = await kustoClient.executeQuery(databaseName, `${tableName} | count`); + printResultAsValueList(result); + + console.log(`Example line from ${tableName}`); + result = await kustoClient.executeQuery(databaseName, `${tableName} | top 1 by EndTime`); + printResultAsValueList(result); +``` ### [Python](#tab/python) @@ -241,27 +330,29 @@ Place *stormevents.csv* file in the same location as your script. Since our CSV Add and ingestion section using the following lines to the end of `main()`. 
```python -# ingestion section -print("Ingesting data from a file") -ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) -ingest_client.ingest_from_file(file_path, ingest_props) + # Ingestion section + print("Ingesting data from a file") + ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) + ingest_client.ingest_from_file(file_path, ingest_props) ``` Let’s also query the new number of rows and the most recent row after the ingestion. Add the following lines after the ingestion command: ```python -print("New number of rows in " + table_name) -result = kusto_client.execute_query(database_name, table_name + " | count") -print_result_as_value_list(result) - -print("Example line from " + table_name) -result = kusto_client.execute_query(database_name, table_name + " | top 1 by EndTime") -print_result_as_value_list(result) + print("New number of rows in " + table_name) + result = kusto_client.execute_query(database_name, table_name + " | count") + print_result_as_value_list(result) + + print("Example line from " + table_name) + result = kusto_client.execute_query(database_name, table_name + " | top 1 by EndTime") + print_result_as_value_list(result) ``` Run the script from the directory where the script and stormevents.csv are located. Alternatively, you can specify the full path to the file replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = ""` +--- + The first time your run the application the results are as follows: ```plaintext @@ -282,29 +373,11 @@ row 1 : Source - COOP Observer StormSummary - {'TotalDamages': 0, 'StartTime': '2007-12-31T11:15:00.0000000Z', 'EndTime': '2007-12-31T13:21:00.0000000Z', 'Details': {'Description': 'Heavy showers caused flash flooding in the eastern part of Molokai. 
Water was running over the bridge at Halawa Valley.', 'Location': 'HAWAII'}} ``` ---- ### Stream in-memory data for ingestion To ingest data from memory, create a stream containing the data for ingestion. -### [Python](#tab/python) - -Call the `ingest_from_stream()` API to ingest the stream. - -Replace the ingestion section code with the following: - -```python -# Ingestion section -print("Ingesting data from memory") -single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' -string_stream = io.StringIO(single_line) -ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV) -# when possible provide the size of the raw data -stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line)) -ingest_client.ingest_from_stream(stream_descriptor, ingest_props) -``` - ### [C#](#tab/c-sharp) Call the `IngestFromStreamAsync()` method to ingest the stream. @@ -326,6 +399,36 @@ Replace the ingestion section code with the following: } ``` +### [Python](#tab/python) + +Call the `ingest_from_stream()` API to ingest the stream. + +Replace the ingestion section code with the following: + +```python + # Ingestion section + print("Ingesting data from memory") + single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' + string_stream = io.StringIO(single_line) + ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV) + # when possible provide the size of the raw data + stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line)) + ingest_client.ingest_from_stream(stream_descriptor, ingest_props) +``` + +### [Typescript](#tab/typescript) + +Call the `ingestFromStream()` API to ingest the stream. 
+ +Replace the ingestion section code with the following: + +```typescript + //Ingest section + console.log('Ingesting data from memory'); + const single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' + await ingestClient.ingestFromStream(Buffer.from(single_line), ingestProperties) +``` + --- The results are as follows: @@ -362,27 +465,38 @@ When you send a blob for streaming, the client only sends the blob reference to For this section you’ll need to upload the sample csv file to your storage account and generate a URI with built-in read permissions (e.g. via SAS) to provide to the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). +### [C#](#tab/c-sharp) + +Replace the ingestion section code with the following section: + +```csharp + // Ingestion section + Console.WriteLine("Ingesting data from an existing blob"); + var sasURI =""; + ingestClient.IngestFromStorageAsync(sasURI, ingestProps).Wait(); +``` ### [Python](#tab/python) Replace the ingestion section code with the following section: ```python -print("Ingesting data from an existing blob") -blob_descriptor = BlobDescriptor("") -ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) -ingest_client.ingest_from_blob(blob_descriptor, ingest_props) + # Ingestion section + print("Ingesting data from an existing blob") + blob_descriptor = BlobDescriptor("") + ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) + ingest_client.ingest_from_blob(blob_descriptor, ingest_props) ``` -### [C#](#tab/c-sharp) +### [Typescript](#tab/typescript) Replace the ingestion section code with the 
following section: -```csharp - //Ingestion section - Console.WriteLine("Ingesting data from an existing blob"); - var sasURI =""; - ingestClient.IngestFromStorageAsync(sasURI, ingestProps).Wait(); +```typescript + // Ingestion section + console.log('Ingesting data from an existing blob'); + const sasURI = ""; + await ingestClient.ingestFromBlob(sasURI, ingestProperties); ``` --- @@ -412,10 +526,12 @@ row 1 : ``` - > [!NOTE] > You can also use a managed identity based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity) ## Resources -+ Kusto Python Git Hub repository [https://github.com/Azure/azure-kusto-python] -+ Python Sample App Wizard [https://dataexplorer.azure.com/oneclick/generatecode?programingLang=Python] ++ [Kusto Python Git Hub repository](https://github.com/Azure/azure-kusto-python) ++ [Kusto NodeJS Git Hub repository](https://github.com/Azure/azure-kusto-node) ++ [Kusto Java Git Hub repository](https://github.com/azure/azure-kusto-java) ++ [Kusto .Net API SDK](/kusto/api/netfx/about-the-sdk) ++ [Generate a Sample App wizard](https://dataexplorer.azure.com/oneclick/generatecode) From bcfc53d1a097a081411e77996bee64eb74d200cb Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Thu, 6 Feb 2025 18:27:28 +0200 Subject: [PATCH 07/20] added java --- .../app-managed-streaming-ingest.md | 351 +++++++++++++----- 1 file changed, 256 insertions(+), 95 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index fa802f71bf..509681f333 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -1,11 +1,12 @@ --- title: Create an app to ingest data by 
streaming ingestion using Kusto’s managed streaming ingestion client
-description: Learn how to create an app to ingest data from a file, stream, or blob streaming ingestion by queuing it to Kusto’s batching manager.
+description: Learn how to create an app to ingest data from a file, stream, or blob using the managed streaming ingestion client.
ms.reviewer: yogilad
ms.topic: how-to
ms.date: 02/03/2025
monikerRange: "azure-data-explorer"
-#customer intent: To learn about creating an app injest using Kusto’s batching manager and streaming ingestion.
+
+# customer intent: To learn about creating an app to ingest data using streaming ingestion.
---


@@ -15,8 +16,8 @@ monikerRange: "azure-data-explorer"

Streaming Ingestion allows writing data to Kusto with near-real-time latencies. It’s also useful when writing small amounts of data to a large number of tables, making batching inefficient.

-In this article you’ll learn how to ingest data to Kusto by queuing it to Kusto’s batching manager.
-You will ingest a data stream in the form of a file, stream or blob.
+In this article, you’ll learn how to ingest data to Kusto using the managed streaming ingestion client.
+You'll ingest a data stream in the form of a file, stream, or blob.

> [NOTE!]
> Streaming ingestion is a high velocity ingestion protocol. Streaming Ingestion isn't the same as `IngestFromStream`.
@@ -26,45 +27,44 @@ You will ingest a data stream in the form of a file, stream, or blob.

Kusto SDKs provide two flavors of Streaming Ingestion Clients, `StreamingIngestionClient` and `ManagedStreamingIngestionClient` where Managed Streaming has built-in retry and failover logic.

-The following applies when using data ingesting with managed streaming:
+When ingesting with managed streaming, failures and retries are handled automatically as follows:
-+ Data that's larger then 4MB is automatically sent to queued ingest, regardless of format or compression. -+ Transient failure, for example throttling, are be retried 3 times, then moved to queued ingestion. -+ Permanent failures are not be retired. ++ Streaming requests that fail due to server-side size limitations are failed-over to queued ingestion. ++ Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression. ++ Transient failure, for example throttling, are retried three times, then moved to queued ingestion. ++ Permanent failures aren't retired. ## Limitations Data Streaming has some limitations compared to queuing data for ingestion. -• Tags can not be set on data -• Mapping can only be provided by Mapping Reference. Inline Mapping isn't supported. -• The payload sent in the request can not exceed 10MBs (regardless of format or compression). +• Tags can’t be set on data +• Mapping can only be provided using [`ingestionMappingReference`](kusto/management/mappings?view=microsoft-fabric#mapping-with-ingestionmappingreference). Inline mapping isn't supported. +• The payload sent in the request can’t exceed 10MBs (regardless of format or compression). -For mor information, see [Streaming Limitations](/azure/data-explorer/ingest-data-streaming#limitations). +For more information, see [Streaming Limitations](/azure/data-explorer/ingest-data-streaming#limitations). ## Prerequisites -+ A Kusto cluster where you have database User or higher rights. You cam provision a free Kusto cluster in . -Prerequisites: ++ A Kusto cluster where you have database User or higher rights. Provision a free Kusto cluster at . + [Set up your development environment](/kusto/api/get-started/app-set-up?view=azure-data-explorer) to use the Kusto client library. ## Before you begin -Before creating the app, you need to do the following +Before creating the app: -1. Configure streaming ingestion on your Azure Data Explorer cluster. +1. 
Configure streaming ingestion on your Azure Data Explorer cluster. 1. Create a Kusto table to ingest the data into. 1. Enable the streaming ingestion policy on the table. 1. Download the [stormevent.csv](https://github.com/MicrosoftDocs/dataexplorer-docs-samples/blob/main/docs/resources/app-basic-ingestion/stormevents.csv) sample data file containing 1,000 storm event records. ## Configure streaming ingestion -To configure streaming ingestion on your Azure Data Explorer cluster, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp) +To configure streaming ingestion on your Azure Data Explorer cluster, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp). If you're using a free cluster, streaming ingestion is automatically enabled. ### Create a Kusto table -Run the commands below on your database via Kusto Explorer (Desktop) or Kusto Web Explorer. +Run the following commands on your database via Kusto Explorer (Desktop) or Kusto Web Explorer. 1. Create a Table Called Storm Events @@ -89,7 +89,6 @@ For more information about streaming policy, see [Streaming ingestion policy - A Create a basic client application which connects to the Kusto Help cluster. Enter the cluster query and ingest URI and database name in the relevant variables. 
- ### [C#](#tab/c-sharp) The code sample includes a service function `PrintResultAsValueList()` for printing query results @@ -181,10 +180,92 @@ Add the following lines after the ingestion command: result = kustoClient.ExecuteQuery(databaseName, tableName + " | top 1 by EndTime", new ClientRequestProperties()); PrintResultAsValueList(result); ``` -### [Typescript](#tab/typescript) + +### [TypeScript](#tab/typescript) The code sample includes a service function `printResultAsValueList()` for printing query results + +### [Python](#tab/python) + +The code sample includes a service function `print_result_as_value_list()` for printing query results + +```python +import os +import time +import io + +from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties, DataFormat +from azure.kusto.ingest import QueuedIngestClient, FileDescriptor, StreamDescriptor, BlobDescriptor, IngestionProperties, ManagedStreamingIngestClient + + +def print_result_as_value_list(result): + row_count = 1 + + # create a list of columns + cols = list((col.column_name for col in result.primary_results[0].columns)) + + # print the values for each row + for row in result.primary_results[0]: + if row_count > 1: + print("######################") + + print("row", row_count, ":") + for col in cols: + print("\t", col, "-", row[col]) + + +def main(): + # Connect to the public access Help cluster + file_path = os.curdir + "/stormevents.csv" + cluster_url = "" + ingestion_url = "" + database_name = " " + table_name = "MyStormEvents" + cluster_kcsb = KustoConnectionStringBuilder.with_interactive_login(cluster_url) + ingestion_kcsb = KustoConnectionStringBuilder.with_interactive_login(ingestion_url) + + with KustoClient(cluster_kcsb) as kusto_client: + with ManagedStreamingIngestClient(cluster_kcsb, ingestion_kcsb) as ingest_client: + # with KustoStreamingIngestClient(cluster_kcsb) as ingest_client: + + print("Number of rows in " + table_name) + result = 
kusto_client.execute_query(database_name, table_name + " | count") + print_result_as_value_list(result) + +main() +``` + +## Stream a file for ingestion + + +Use the `ingest_from_file()` API to ingest the *stormevents.csv* file. +Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ignore_first_record=True` to ignore the header. + +Add and ingestion section using the following lines to the end of `main()`. + +```python + # Ingestion section + print("Ingesting data from a file") + ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) + ingest_client.ingest_from_file(file_path, ingest_props) +``` + +Let’s also query the new number of rows and the most recent row after the ingestion. +Add the following lines after the ingestion command: + +```python + print("New number of rows in " + table_name) + result = kusto_client.execute_query(database_name, table_name + " | count") + print_result_as_value_list(result) + + print("Example line from " + table_name) + result = kusto_client.execute_query(database_name, table_name + " | top 1 by EndTime") + print_result_as_value_list(result) +``` + +Run the script from the directory where the script and stormevents.csv are located. 
Alternatively, you can specify the full path to the file replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = ""` + ```typescript import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data"; @@ -271,86 +352,111 @@ Add the following lines after the ingestion command: printResultAsValueList(result); ``` -### [Python](#tab/python) -The code sample includes a service function `print_result_as_value_list()` for printing query results +### [Java](#tab/java) -```python -import os -import time -import io +The code sample includes a service method `printResultsAsValueList()` for printing query results -from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties, DataFormat -from azure.kusto.ingest import QueuedIngestClient, FileDescriptor, StreamDescriptor, BlobDescriptor, IngestionProperties, ManagedStreamingIngestClient +```java +package com.example; -def print_result_as_value_list(result): - row_count = 1 +import java.io.FileWriter; - # create a list of columns - cols = list((col.column_name for col in result.primary_results[0].columns)) +import com.azure.identity.DefaultAzureCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; +import com.microsoft.azure.kusto.data.Client; +import com.microsoft.azure.kusto.data.ClientFactory; +import com.microsoft.azure.kusto.data.KustoOperationResult; +import com.microsoft.azure.kusto.data.KustoResultSetTable; +import com.microsoft.azure.kusto.data.KustoResultColumn; +import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder; +import com.microsoft.azure.kusto.ingest.IngestClientFactory; +import com.microsoft.azure.kusto.ingest.IngestionProperties; +import com.microsoft.azure.kusto.ingest.ManagedStreamingIngestClient; +import com.microsoft.azure.kusto.ingest.IngestionProperties.DataFormat; +import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; - # print the values for each row - for row in result.primary_results[0]: - if 
row_count > 1: - print("######################") +public class BatchIngestion { + public static void main(String[] args) throws Exception { - print("row", row_count, ":") - for col in cols: - print("\t", col, "-", row[col]) + String clusterUri = ""; + String ingestionUri = ""; + String database = ""; + String table = "MyStormEvents"; -def main(): - # Connect to the public access Help cluster - file_path = os.curdir + "/stormevents.csv" - cluster_url = "" - ingestion_url = "" - database_name = " " - table_name = "MyStormEvents" - cluster_kcsb = KustoConnectionStringBuilder.with_interactive_login(cluster_url) - ingestion_kcsb = KustoConnectionStringBuilder.with_interactive_login(ingestion_url) + ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri); + ConnectionStringBuilder ingestionKcsb = ConnectionStringBuilder.createWithUserPrompt(ingestionUri); - with KustoClient(cluster_kcsb) as kusto_client: - with ManagedStreamingIngestClient(cluster_kcsb, ingestion_kcsb) as ingest_client: - # with KustoStreamingIngestClient(cluster_kcsb) as ingest_client: + try ( + Client kustoClient = ClientFactory.createClient(clusterKcsb)) { - print("Number of rows in " + table_name) - result = kusto_client.execute_query(database_name, table_name + " | count") - print_result_as_value_list(result) + String query = table + " | count"; + KustoOperationResult results = kustoClient.execute(database, query); + KustoResultSetTable primaryResults = results.getPrimaryResults(); + System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:"); + printResultsAsValueList(primaryResults); + } + } -main() + public static void printResultsAsValueList(KustoResultSetTable results) { + while (results.next()) { + KustoResultColumn[] columns = results.getColumns(); + for (int i = 0; i < columns.length; i++) { + System.out.println("\t" + columns[i].getColumnName() + " - " + + (results.getObject(i) == null ? 
"None" : results.getString(i))); + } + } + } +} ``` ## Stream a file for ingestion -Use the `ingest_from_file()` API to ingest the *stormevents.csv* file. -Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ignore_first_record=True` to ignore the header. +Use the `ingestFromFile()` method to ingest the *stormevents.csv* file. +Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ingestionProperties.setIgnoreFirstRecord(true);` to ignore the header. Add and ingestion section using the following lines to the end of `main()`. -```python - # Ingestion section - print("Ingesting data from a file") - ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) - ingest_client.ingest_from_file(file_path, ingest_props) +```java + // Ingestion section + try ( + ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory + .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { + System.out.println("Ingesting data from a file"); + String filePath = "stormevents.csv"; + IngestionProperties ingestionProperties = new IngestionProperties(database, table); + ingestionProperties.setDataFormat(DataFormat.CSV); + ingestionProperties.setIgnoreFirstRecord(true); + FileSourceInfo fileSourceInfo = new FileSourceInfo(filePath, 0); + ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties); + + } catch (Exception e) { + // TODO: handle exception + System.out.println("Error: " + e); + } + ``` Let’s also query the new number of rows and the most recent row after the ingestion. 
Add the following lines after the ingestion command: -```python - print("New number of rows in " + table_name) - result = kusto_client.execute_query(database_name, table_name + " | count") - print_result_as_value_list(result) - - print("Example line from " + table_name) - result = kusto_client.execute_query(database_name, table_name + " | top 1 by EndTime") - print_result_as_value_list(result) +```java + query = table + " | count"; + results = kustoClient.execute(database, query); + primaryResults = results.getPrimaryResults(); + System.out.println("\nNumber of rows in " + table + " AFTER ingestion:"); + printResultsAsValueList(primaryResults); + + query = table + " | top 1 by EndTime"; + results = kustoClient.execute(database, query); + primaryResults = results.getPrimaryResults(); + System.out.println("\nExample line from " + table); + printResultsAsValueList(primaryResults); ``` -Run the script from the directory where the script and stormevents.csv are located. Alternatively, you can specify the full path to the file replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = ""` - --- The first time your run the application the results are as follows: @@ -380,9 +486,9 @@ To ingest data from memory, create a stream containing the data for ingestion. ### [C#](#tab/c-sharp) -Call the `IngestFromStreamAsync()` method to ingest the stream. +To ingest the stream from memory, call the `IngestFromStreamAsync()` method. -Replace the ingestion section code with the following: +Replace the ingestion section with the following code: ```csharp // Ingestion section @@ -401,9 +507,9 @@ Replace the ingestion section code with the following: ### [Python](#tab/python) -Call the `ingest_from_stream()` API to ingest the stream. +To ingest the stream from memory, call the `ingest_from_stream()` API. 
-Replace the ingestion section code with the following: +Replace the ingestion section with the following code: ```python # Ingestion section @@ -416,11 +522,11 @@ Replace the ingestion section code with the following: ingest_client.ingest_from_stream(stream_descriptor, ingest_props) ``` -### [Typescript](#tab/typescript) +### [TypeScript](#tab/typescript) -Call the `ingestFromStream()` API to ingest the stream. +To ingest the stream from memory, call the `ingestFromStream()` API. -Replace the ingestion section code with the following: +Replace the ingestion section with the following code: ```typescript //Ingest section @@ -429,6 +535,31 @@ Replace the ingestion section code with the following: await ingestClient.ingestFromStream(Buffer.from(single_line), ingestProperties) ``` +### [Java](#tab/java) + +To ingest the stream from memory, call the `ingestFromStream()` API. + +Replace the ingestion section with the following code: + +```java + String singleLine = "2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,\"{}\""; + ByteArrayInputStream inputStream = new ByteArrayInputStream(singleLine.getBytes(StandardCharsets.UTF_8)); + try ( + ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory + .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { + System.out.println("Ingesting data from a byte array"); + IngestionProperties ingestionProperties = new IngestionProperties(database, table); + ingestionProperties.setDataFormat(DataFormat.CSV); + ingestionProperties.setIgnoreFirstRecord(true); + StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); + ingestClient.ingestFromStream(streamSourceInfo, ingestionProperties); + + } catch (Exception e) { + // TODO: handle exception + System.out.println("Error: " + e); + } +``` + --- The results are as follows: @@ -459,26 +590,30 @@ row 1 : ### Stream a blob for ingestion -Kusto supports ingestion from Azure Storage blobs, Azure Data Lake 
files and Amazon S3 files. +Kusto supports ingestion from Azure Storage blobs, Azure Data Lake files, and Amazon S3 files. -When you send a blob for streaming, the client only sends the blob reference to the database, and the data is actually read once by the service itself. Read Access to the blob can be granted with keys, SAS tokens or managed identities attached to the Kusto Cluster. +When you send a blob for streaming, the client only sends the blob reference to the database. The data is read by the database service itself. Read Access to the blob can be granted with keys, SAS tokens, or managed identities attached to the Kusto Cluster. -For this section you’ll need to upload the sample csv file to your storage account and generate a URI with built-in read permissions (e.g. via SAS) to provide to the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). +To stream from a blob, upload the sample csv file to your storage account and generate a SAS URI with built-in read permissions. Use the URIin the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). ### [C#](#tab/c-sharp) -Replace the ingestion section code with the following section: +To ingest the blob, call the `IngestFromStorageAsync()` method. 
+ +Replace the ingestion section with the following code: ```csharp - // Ingestion section - Console.WriteLine("Ingesting data from an existing blob"); - var sasURI =""; - ingestClient.IngestFromStorageAsync(sasURI, ingestProps).Wait(); + // Ingestion section + Console.WriteLine("Ingesting data from an existing blob"); + var sasURI =""; + ingestClient.IngestFromStorageAsync(sasURI, ingestProps).Wait(); ``` ### [Python](#tab/python) -Replace the ingestion section code with the following section: +To ingest the blob, call the `ingest_from_blob()` API. + +Replace the ingestion section with the following code: ```python # Ingestion section @@ -488,9 +623,11 @@ Replace the ingestion section code with the following section: ingest_client.ingest_from_blob(blob_descriptor, ingest_props) ``` -### [Typescript](#tab/typescript) +### [TypeScript](#tab/typescript) -Replace the ingestion section code with the following section: +To ingest the blob, call the `ingestFromBlob()` API. + +Replace the ingestion section with the following code: ```typescript // Ingestion section @@ -499,6 +636,30 @@ Replace the ingestion section code with the following section: await ingestClient.ingestFromBlob(sasURI, ingestProperties); ``` +### [Java](#tab/java) + +To ingest the blob, call the `ingestFromBlob()` API. 
+ +Replace the ingestion section with the following code: + +```java + // Ingestion section + try (ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory + .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { + System.out.println("Ingesting data from a blob"); + String sasURI = ""; + BlobSourceInfo blobSourceInfo = new BlobSourceInfo(sasURI); + IngestionProperties ingestionProperties = new IngestionProperties(database, table); + ingestionProperties.setDataFormat(DataFormat.CSV); + ingestionProperties.setIgnoreFirstRecord(true); + ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); + + } catch (Exception e) { + // TODO: handle exception + System.out.println("Error: " + e); + } +``` + --- The results are as follows: @@ -530,8 +691,8 @@ row 1 : > You can also use a managed identity based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity) ## Resources -+ [Kusto Python Git Hub repository](https://github.com/Azure/azure-kusto-python) -+ [Kusto NodeJS Git Hub repository](https://github.com/Azure/azure-kusto-node) -+ [Kusto Java Git Hub repository](https://github.com/azure/azure-kusto-java) -+ [Kusto .Net API SDK](/kusto/api/netfx/about-the-sdk) ++ [Kusto Python GitHub repository](https://github.com/Azure/azure-kusto-python) ++ [Kusto NodeJS GitHub repository](https://github.com/Azure/azure-kusto-node) ++ [Kusto Java GitHub repository](https://github.com/azure/azure-kusto-java) ++ [Kusto .NET API SDK](/kusto/api/netfx/about-the-sdk) + [Generate a Sample App wizard](https://dataexplorer.azure.com/oneclick/generatecode) From 0da5e109e637b1168242f4690112b54bb436ba4c Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 10 Feb 2025 12:26:03 +0200 Subject: [PATCH 08/20] fixes --- 
.../app-managed-streaming-ingest.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 509681f333..9e46a69d71 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -19,27 +19,27 @@ Streaming Ingestion allows writing data to Kusto with near-real-time latencies. In this article, you’ll learn how to ingest data to Kusto by queuing it to Kusto’s batching manager. You'll ingest a data stream in the form of a file, stream, or blob. -> [NOTE!] +> [!NOTE] > Streaming ingestion is a high velocity ingestion protocol. Streaming Ingestion isn't the same as `IngestFromStream`. -> `IngestFromStream` is an API that takes in a memory stream and sends it for ingestion. It is available for all ingestion client implementations including queued and Streaming ingestion. +> `IngestFromStream` is an API that takes in a memory stream and sends it for ingestion. `IngestFromStream` is available for all ingestion client implementations including queued and streaming ingestion. ## Streaming and Managed Streaming Kusto SDKs provide two flavors of Streaming Ingestion Clients, `StreamingIngestionClient` and `ManagedStreamingIngestionClient` where Managed Streaming has built-in retry and failover logic. -When ingesting with managed streaming failures and retries are handled automatically as follows: +When ingesting with `ManagedStreamingIngestionClient`, failures and retries are handled automatically as follows: + Streaming requests that fail due to server-side size limitations are failed-over to queued ingestion. + Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression. 
+ Transient failure, for example throttling, are retried three times, then moved to queued ingestion. -+ Permanent failures aren't retired. ++ Permanent failures aren't retried. ## Limitations Data Streaming has some limitations compared to queuing data for ingestion. -• Tags can’t be set on data -• Mapping can only be provided using [`ingestionMappingReference`](kusto/management/mappings?view=microsoft-fabric#mapping-with-ingestionmappingreference). Inline mapping isn't supported. -• The payload sent in the request can’t exceed 10MBs (regardless of format or compression). ++ Tags can’t be set on data. ++ Mapping can only be provided using [`ingestionMappingReference`](kusto/management/mappings?view=microsoft-fabric#mapping-with-ingestionmappingreference). Inline mapping isn't supported. ++ The payload sent in the request can’t exceed 10 MB, regardless of format or compression. For more information, see [Streaming Limitations](/azure/data-explorer/ingest-data-streaming#limitations). @@ -51,9 +51,9 @@ For more information, see [Streaming Limitations](/azure/data-explorer/ingest-da ## Before you begin -Before creating the app: +Before creating the app the following steps are required. Each step is detailed in the following sections. -1. Configure streaming ingestion on your Azure Data Explorer cluster. +1. Configure streaming ingestion on your Azure Data Explorer cluster. 1. Create a Kusto table to ingest the data into. 1. Enable the streaming ingestion policy on the table. 1. Download the [stormevent.csv](https://github.com/MicrosoftDocs/dataexplorer-docs-samples/blob/main/docs/resources/app-basic-ingestion/stormevents.csv) sample data file containing 1,000 storm event records. 
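The failover behavior that patch 08 clarifies above (oversized data bypasses streaming, transient failures are retried three times before falling back to queued ingestion, permanent failures stop) can be sketched as plain decision logic. This is a simplified illustrative model, not the azure-kusto SDK's actual implementation; the function name, the `try_stream` callback, and the constant names are assumptions introduced here.

```python
# Simplified model of the managed streaming failover rules described above.
# Illustrative only -- not the real azure-kusto SDK implementation.
STREAMING_SIZE_LIMIT = 4 * 1024 * 1024  # data above 4 MB goes straight to queued ingestion
MAX_RETRIES = 3

def route_ingestion(data_size, try_stream):
    """Return how a payload would be ingested.

    try_stream(attempt) simulates one streaming attempt and returns
    "ok", "transient", or "permanent".
    """
    if data_size > STREAMING_SIZE_LIMIT:
        return "queued"  # too large for streaming, regardless of format or compression
    for attempt in range(MAX_RETRIES):
        result = try_stream(attempt)
        if result == "ok":
            return "streamed"
        if result == "permanent":
            raise RuntimeError("permanent ingestion failure; not retried")
    # transient failures exhausted the retry budget: fall back to queued ingestion
    return "queued"
```

For example, a 10 MB payload is routed to queued ingestion without any streaming attempt, while a small payload that is throttled on all three attempts also ends up queued. Note that in either fallback case some latency is introduced before the data is visible in the table, as the patch's new note points out.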
From ac7af1860820a52530f4cf85493fced92aeb399b Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 10 Feb 2025 12:32:35 +0200 Subject: [PATCH 09/20] fixes --- .../app-managed-streaming-ingest.md | 21 ++++++++----------- .../api/get-started/app-queued-ingestion.md | 6 +++--- data-explorer/toc.yml | 2 ++ 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 9e46a69d71..787212c490 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -60,7 +60,7 @@ Before creating the app the following steps are required. Each step is detailed ## Configure streaming ingestion -To configure streaming ingestion on your Azure Data Explorer cluster, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp). If you're using a free cluster, streaming ingestion is automatically enabled. +To configure streaming ingestion, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp). If you're using a free cluster, streaming ingestion is automatically enabled. ### Create a Kusto table @@ -91,7 +91,7 @@ Enter the cluster query and ingest URI and database name in the relevant variabl ### [C#](#tab/c-sharp) -The code sample includes a service function `PrintResultAsValueList()` for printing query results +The code sample includes a service function `PrintResultAsValueList()` for printing query results. 
```C# using System; @@ -181,11 +181,6 @@ Add the following lines after the ingestion command: PrintResultAsValueList(result); ``` -### [TypeScript](#tab/typescript) - -The code sample includes a service function `printResultAsValueList()` for printing query results - - ### [Python](#tab/python) The code sample includes a service function `print_result_as_value_list()` for printing query results @@ -266,6 +261,10 @@ Add the following lines after the ingestion command: Run the script from the directory where the script and stormevents.csv are located. Alternatively, you can specify the full path to the file replacing `file_path = os.curdir + "/stormevents.csv"` with `file_path = ""` +### [TypeScript](#tab/typescript) + +The code sample includes a service function `printResultAsValueList()` for printing query results. + ```typescript import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data"; @@ -325,7 +324,6 @@ Place *stormevents.csv* file in the same location as your script. Since our CSV Add and ingestion section using the following lines to the end of `main()`. - ```typescript const ingestProperties = new IngestionProperties({ @@ -339,6 +337,7 @@ Add and ingestion section using the following lines to the end of `main()`. console.log("Ingesting data from a file"); await ingestClient.ingestFromFile(".\\stormevents.csv", ingestProperties); ``` + Let’s also query the new number of rows and the most recent row after the ingestion. Add the following lines after the ingestion command: @@ -352,10 +351,9 @@ Add the following lines after the ingestion command: printResultAsValueList(result); ``` - ### [Java](#tab/java) -The code sample includes a service method `printResultsAsValueList()` for printing query results +The code sample includes a service method `printResultsAsValueList()` for printing query results. 
```java @@ -414,7 +412,6 @@ public class BatchIngestion { ## Stream a file for ingestion - Use the `ingestFromFile()` method to ingest the *stormevents.csv* file. Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ingestionProperties.setIgnoreFirstRecord(true);` to ignore the header. @@ -594,7 +591,7 @@ Kusto supports ingestion from Azure Storage blobs, Azure Data Lake files, and Am When you send a blob for streaming, the client only sends the blob reference to the database. The data is read by the database service itself. Read Access to the blob can be granted with keys, SAS tokens, or managed identities attached to the Kusto Cluster. -To stream from a blob, upload the sample csv file to your storage account and generate a SAS URI with built-in read permissions. Use the URIin the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). +To ingest from a blob, upload the sample csv file to your storage account and generate a SAS URI with built-in read permissions. Use the URI in the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). 
### [C#](#tab/c-sharp) diff --git a/data-explorer/kusto/api/get-started/app-queued-ingestion.md b/data-explorer/kusto/api/get-started/app-queued-ingestion.md index c9a83b0de0..501afae2e0 100644 --- a/data-explorer/kusto/api/get-started/app-queued-ingestion.md +++ b/data-explorer/kusto/api/get-started/app-queued-ingestion.md @@ -179,7 +179,7 @@ Add the following code: const query = table + " | count"; let response = await kustoClient.execute(database, query); console.log("\nNumber of rows in " + table + " BEFORE ingestion:"); - printResultAsValueList(response); + printResultsAsValueList(response); } function printResultsAsValueList(response) { @@ -225,7 +225,7 @@ Add the following code: KustoOperationResult results = kustoClient.execute(database, query); KustoResultSetTable primaryResults = results.getPrimaryResults(); System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:"); - printResultAsValueList(primaryResults); + printResultsAsValueList(primaryResults); } } @@ -679,7 +679,7 @@ public class BatchIngestion { KustoOperationResult results = kustoClient.execute(database, query); KustoResultSetTable primaryResults = results.getPrimaryResults(); System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:"); - printResultAsValueList(primaryResults); + printResultsAsValueList(primaryResults); System.out.println("\nIngesting data from file: \n\t " + fileSourceInfo.toString()); IngestionProperties ingestProps = new IngestionProperties(database, table); diff --git a/data-explorer/toc.yml b/data-explorer/toc.yml index e1b47533f7..3b974e256f 100644 --- a/data-explorer/toc.yml +++ b/data-explorer/toc.yml @@ -574,6 +574,8 @@ items: href: /kusto/api/get-started/app-management-commands?view=azure-data-explorer&preserve-view=true - name: Queue data for ingestion href: /kusto/api/get-started/app-queued-ingestion?view=azure-data-explorer&preserve-view=true + - name: Ingest data the managed streaming ingestion client + href: 
/kusto/api/get-started/app-managed-streaming-ingest?view=azure-data-explorer&preserve-view=true - name: Connection strings items: - name: Connection strings overview From 1dc8799855dff471c6ca8d57b80270d8cb4f4a39 Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 10 Feb 2025 12:39:21 +0200 Subject: [PATCH 10/20] fixes2 --- .../kusto/api/get-started/app-managed-streaming-ingest.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 787212c490..89d9ef0c01 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -38,7 +38,7 @@ When ingesting with `ManagedStreamingIngestionClient`, failures and retries are Data Streaming has some limitations compared to queuing data for ingestion. + Tags can’t be set on data. -+ Mapping can only be provided using [`ingestionMappingReference`](kusto/management/mappings?view=microsoft-fabric#mapping-with-ingestionmappingreference). Inline mapping isn't supported. ++ Mapping can only be provided using [`ingestionMappingReference`](/kusto/management/mappings?view=azure-data-explorer#mapping-with-ingestionmappingreference). Inline mapping isn't supported. + The payload sent in the request can’t exceed 10 MB, regardless of format or compression. For more information, see [Streaming Limitations](/azure/data-explorer/ingest-data-streaming#limitations). 
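The streaming limitations that the patches above restate (no tags on data, mapping only via `ingestionMappingReference`, a 10 MB payload cap regardless of format or compression) lend themselves to a client-side pre-flight check before a request is sent. The sketch below is a hypothetical helper written for illustration under those assumptions; `validate_streaming_request` and its error messages are invented here and are not part of any Kusto SDK.

```python
# Client-side pre-flight check for the streaming limitations listed above.
# Hypothetical helper for illustration; not part of any Kusto SDK.
STREAMING_PAYLOAD_LIMIT = 10 * 1024 * 1024  # 10 MB, regardless of format or compression

def validate_streaming_request(payload, ingestion_mapping_reference=None,
                               inline_mapping=None, tags=None):
    """Return a list of limitation violations for a streaming ingestion payload."""
    errors = []
    if len(payload) > STREAMING_PAYLOAD_LIMIT:
        errors.append("payload exceeds 10 MB streaming limit")
    if inline_mapping is not None:
        errors.append("inline mapping isn't supported; use ingestionMappingReference")
    if tags:
        errors.append("tags can't be set on streamed data")
    return errors
```

An empty list means the payload passes all of the checks and can be handed to the streaming client; a non-empty list means the data should be queued for ingestion instead.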
From 7519ace8c462582dee8471ae6e471fcb7b1d56e0 Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 10 Feb 2025 12:48:06 +0200 Subject: [PATCH 11/20] fixes toc --- data-explorer/toc.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-explorer/toc.yml b/data-explorer/toc.yml index 3b974e256f..c54be51c4c 100644 --- a/data-explorer/toc.yml +++ b/data-explorer/toc.yml @@ -574,7 +574,7 @@ items: href: /kusto/api/get-started/app-management-commands?view=azure-data-explorer&preserve-view=true - name: Queue data for ingestion href: /kusto/api/get-started/app-queued-ingestion?view=azure-data-explorer&preserve-view=true - - name: Ingest data the managed streaming ingestion client + - name: Ingest data using the managed streaming ingestion client href: /kusto/api/get-started/app-managed-streaming-ingest?view=azure-data-explorer&preserve-view=true - name: Connection strings items: From c098afcba2c737beadecd70f246e373d1a703642 Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 10 Feb 2025 12:57:01 +0200 Subject: [PATCH 12/20] fixe 3 --- .../kusto/api/get-started/app-managed-streaming-ingest.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 89d9ef0c01..537c04e9ef 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -6,18 +6,17 @@ ms.topic: how-to ms.date: 02/03/2025 monikerRange: "azure-data-explorer" -# customer intent: To learn about creating an app ingest using Kusto’s batching manager and streaming ingestion. +# customer intent: To learn about creating an app ingest using Kusto’s managed streaming ingestion client. 
--- -# Create an app for streaming ingestion using Kusto’s batching manager +# Create an app for streaming ingestion using Kusto’s managed streaming ingestion client > [!INCLUDE [applies](../../includes/applies-to-version/applies.md)] [!INCLUDE [fabric](../../includes/applies-to-version/fabric.md)] [!INCLUDE [azure-data-explorer](../../includes/applies-to-version/azure-data-explorer.md)] Streaming Ingestion allows writing data to Kusto with near-real-time latencies. It’s also useful when writing small amounts of data to a large number of tables, making batching inefficient. -In this article, you’ll learn how to ingest data to Kusto by queuing it to Kusto’s batching manager. -You'll ingest a data stream in the form of a file, stream, or blob. +In this article, you’ll learn how to ingest data to Kusto using the managed streaming ingestion client. You'll ingest a data stream in the form of a file, in-memory stream, or blob. > [!NOTE] > Streaming ingestion is a high velocity ingestion protocol. Streaming Ingestion isn't the same as `IngestFromStream`. 
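Patch 12 keeps the note distinguishing streaming ingestion (the high-velocity protocol) from `IngestFromStream` (an API that accepts a memory stream and is available to all ingestion clients). The samples in this series build such a stream from a single CSV record; the same construction in dependency-free Python looks like the following. The SDK ingestion call itself is omitted, and the record mirrors the shape of the sample data used throughout the article.

```python
import csv
import io

# Build an in-memory stream from one CSV record, as the ingest-from-stream
# samples in this patch series do (the SDK ingestion call is omitted here).
single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"'
stream = io.BytesIO(single_line.encode("utf-8"))

# Sanity-check that the record parses as seven CSV fields, with the quoted
# empty property bag surviving as its own field.
fields = next(csv.reader(io.StringIO(single_line)))
```

A stream built this way is what the `ingestFromStream()` / `ingest_from_stream()` variants in the samples consume.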
From b2a5b074c8aac51dcf6bb029c8c3240a82bd5cde Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 10 Feb 2025 14:23:45 +0200 Subject: [PATCH 13/20] kusto toc --- data-explorer/kusto/toc.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/data-explorer/kusto/toc.yml b/data-explorer/kusto/toc.yml index d02447de5c..a989c392f0 100644 --- a/data-explorer/kusto/toc.yml +++ b/data-explorer/kusto/toc.yml @@ -96,6 +96,8 @@ items: href: api/get-started/app-management-commands.md - name: Queue data for ingestion href: api/get-started/app-queued-ingestion.md + - name: Ingest data using the managed streaming ingestion client + href: api/get-started/app-managed-streaming-ingest.md - name: Connection strings items: - name: Connection strings overview From effe2ece5326918fbe74ba19ed9a1fa1b2649077 Mon Sep 17 00:00:00 2001 From: Alexander Sloutsky <17802887+sloutsky@users.noreply.github.com> Date: Tue, 11 Feb 2025 21:15:23 +0200 Subject: [PATCH 14/20] Update data-explorer/kusto/query/scalar-data-types/dynamic.md Co-authored-by: Shlomo Sagir <51323195+shsagir@users.noreply.github.com> --- data-explorer/kusto/query/scalar-data-types/dynamic.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-explorer/kusto/query/scalar-data-types/dynamic.md b/data-explorer/kusto/query/scalar-data-types/dynamic.md index a89befb059..d40b1a9c55 100644 --- a/data-explorer/kusto/query/scalar-data-types/dynamic.md +++ b/data-explorer/kusto/query/scalar-data-types/dynamic.md @@ -20,7 +20,7 @@ The `dynamic` scalar data type can be any of the following values: > > * Although the `dynamic` type appears JSON-like, it can hold values that the JSON model doesn't represent because they don't exist in JSON (e.g. `long`, `real`, `datetime`, `timespan`, and `guid`). Therefore, in serializing `dynamic` values into a JSON representation, values that JSON can't represent are serialized into `string` values. 
Conversely, Kusto will parse strings as strongly-typed values if they can be parsed as such. This applies to `datetime`, `real`, `long`, and `guid` types. For more information on the JSON object model, see [json.org](https://json.org/). > * Kusto doesn't attempt to preserve the order of name-to-value mappings in a property bag, and so you can't assume the order to be preserved. It's entirely possible for two property bags with the same set of mappings to yield different results when they are represented as `string` values, for example. -> * At ingestion: Values of `dynamic` values are limited by default to 1MiB (2^20), uncompressed. You can increase the `MaxValueSize` of the column by changing its [encoding policy](../../management/alter-encoding-policy.md) and increase it up-to 32MiB. +> * When ingesting data, values of `dynamic` data types are limited by default to 1 MiB (2^20), uncompressed. You can increase the `MaxValueSize` of the column by changing its [encoding policy](../../management/alter-encoding-policy.md) to allow values up to 32 MiB. ## Dynamic literals From c9126509f97f073da447b6d9f89b9ecffde8766c Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Wed, 12 Feb 2025 12:45:34 +0200 Subject: [PATCH 15/20] fixes --- .../app-managed-streaming-ingest.md | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 537c04e9ef..f17fc9e95b 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -33,6 +33,9 @@ When ingesting with `ManagedStreamingIngestionClient`, failures and retries are + Transient failure, for example throttling, are retried three times, then moved to queued ingestion. + Permanent failures aren't retried. 
+> [!NOTE] +> If the streaming ingestion fails and the data is sent to queued ingestion, some latency is introduced until the data is visible in the table. + ## Limitations Data Streaming has some limitations compared to queuing data for ingestion. @@ -50,7 +53,7 @@ For more information, see [Streaming Limitations](/azure/data-explorer/ingest-da ## Before you begin -Before creating the app the following steps are required. Each step is detailed in the following sections. +Before creating the app, the following steps are required. Each step is detailed in the following sections. 1. Configure streaming ingestion on your Azure Data Explorer cluster. 1. Create a Kusto table to ingest the data into. @@ -109,22 +112,18 @@ class Program var cluster_url = ""; var ingestion_url = ""; var database_name = " "; - - + var clusterKcsb = new KustoConnectionStringBuilder(cluster_url).WithAadUserPromptAuthentication(); var ingestionKcsb = new KustoConnectionStringBuilder(ingestion_url).WithAadUserPromptAuthentication();; using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) using (var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) - { - - + { Console.WriteLine("Number of rows in " + tableName); var queryProvider = KustoClientFactory.CreateCslQueryProvider(clusterKcsb); var result = kustoClient.ExecuteQuery(databaseName, tableName + " | count", new ClientRequestProperties()); PrintResultAsValueList(result); - } } @@ -138,7 +137,6 @@ class Program Console.WriteLine("row:" + row.ToString() + "\t"); for (int i = 0; i < result.FieldCount; i++) { - Console.WriteLine("\t"+ result.GetName(i)+" - " + result.GetValue(i) ); } Console.WriteLine(); @@ -208,7 +206,6 @@ def print_result_as_value_list(result): for col in cols: print("\t", col, "-", row[col]) - def main(): # Connect to the public access Help cluster file_path = os.curdir + "/stormevents.csv" @@ -243,6 +240,7 @@ Add and ingestion section using the 
following lines to the end of `main()`. print("Ingesting data from a file") ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) ingest_client.ingest_from_file(file_path, ingest_props) + ingest_client.close() ``` Let’s also query the new number of rows and the most recent row after the ingestion. @@ -335,6 +333,7 @@ Add and ingestion section using the following lines to the end of `main()`. //Ingest section console.log("Ingesting data from a file"); await ingestClient.ingestFromFile(".\\stormevents.csv", ingestProperties); + ingestClient.close(); ``` Let’s also query the new number of rows and the most recent row after the ingestion. @@ -516,6 +515,7 @@ Replace the ingestion section with the following code: # when possible provide the size of the raw data stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line)) ingest_client.ingest_from_stream(stream_descriptor, ingest_props) + ingest_client.close() ``` ### [TypeScript](#tab/typescript) @@ -529,6 +529,7 @@ Replace the ingestion section with the following code: console.log('Ingesting data from memory'); const single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' await ingestClient.ingestFromStream(Buffer.from(single_line), ingestProperties) + ingestClient.close(); ``` ### [Java](#tab/java) @@ -617,6 +618,7 @@ Replace the ingestion section with the following code: blob_descriptor = BlobDescriptor("") ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) ingest_client.ingest_from_blob(blob_descriptor, ingest_props) + ingest_client.close() ``` ### [TypeScript](#tab/typescript) @@ -630,6 +632,7 @@ Replace the ingestion section with the following code: console.log('Ingesting data from an existing blob'); const sasURI = ""; await ingestClient.ingestFromBlob(sasURI, ingestProperties); + ingestClient.close(); ``` ### [Java](#tab/java) From 
1e55222eda7fec51509d5c508de58c5ec67d775a Mon Sep 17 00:00:00 2001 From: Ed Baynash <105771311+EdB-MSFT@users.noreply.github.com> Date: Wed, 12 Feb 2025 15:42:21 +0200 Subject: [PATCH 16/20] removed IgnoreFirstRecord --- .../app-managed-streaming-ingest.md | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index f17fc9e95b..83189eb803 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -42,6 +42,7 @@ Data Streaming has some limitations compared to queuing data for ingestion. + Tags can’t be set on data. + Mapping can only be provided using [`ingestionMappingReference`](/kusto/management/mappings?view=azure-data-explorer#mapping-with-ingestionmappingreference). Inline mapping isn't supported. + The payload sent in the request can’t exceed 10 MB, regardless of format or compression. ++ The `ignoreFirstRecord` property isn't supported for managed streaming ingestion, so ingested data must not contain a header row. For more information, see [Streaming Limitations](/azure/data-explorer/ingest-data-streaming#limitations). @@ -58,7 +59,7 @@ Before creating the app, the following steps are required. Each step is detailed 1. Configure streaming ingestion on your Azure Data Explorer cluster. 1. Create a Kusto table to ingest the data into. 1. Enable the streaming ingestion policy on the table. -1. Download the [stormevent.csv](https://github.com/MicrosoftDocs/dataexplorer-docs-samples/blob/main/docs/resources/app-basic-ingestion/stormevents.csv) sample data file containing 1,000 storm event records. +1. 
Download the [stormevent.csv](https://github.com/MicrosoftDocs/dataexplorer-docs-samples/blob/main/docs/resources/app-managed-streaming-ingestion/stormevents.csv) sample data file containing 1,000 storm event records. ## Configure streaming ingestion @@ -149,15 +150,14 @@ class Program Use the `IngestFromStorageAsync` method to ingest the *stormevents.csv* file. -Copy *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `IgnoreFirstRecord=True` to ignore the header. +Copy *stormevents.csv* file to the same location as your script. Since our input is a CSV file, use `Format = DataSourceFormat.csv` in the ingestion properties. Add and ingestion section using the following lines to the end of `Main()`. ```csharp var ingestProps = new KustoIngestionProperties(databaseName, tableName) { - Format = DataSourceFormat.csv, - IgnoreFirstRecord = true + Format = DataSourceFormat.csv }; //Ingestion section @@ -231,14 +231,14 @@ main() Use the `ingest_from_file()` API to ingest the *stormevents.csv* file. -Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ignore_first_record=True` to ignore the header. +Place the *stormevents.csv* file in the same location as your script. Since our input is a CSV file, use `DataFormat.CSV` in the ingestion properties. Add and ingestion section using the following lines to the end of `main()`. ```python # Ingestion section print("Ingesting data from a file") - ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) + ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV) ingest_client.ingest_from_file(file_path, ingest_props) ingest_client.close() ``` @@ -317,7 +317,7 @@ main().catch((err) => { Use the `ingestFromFile()` API to ingest the *stormevents.csv* file. -Place *stormevents.csv* file in the same location as your script. 
Since our CSV file contains a header row, use `ignoreFirstRecord=True` to ignore the header. +Place the *stormevents.csv* file in the same location as your script. Since our input is a CSV file, use `format: DataFormat.CSV` in the ingestion properties. Add an ingestion section using the following lines to the end of `main()`. @@ -326,8 +326,7 @@ Add an ingestion section using the following lines to the end of `main()`. const ingestProperties = new IngestionProperties({ database: databaseName, table: tableName, - format: DataFormat.CSV, - additionalProperties: { ignoreFirstRecord: true } + format: DataFormat.CSV }); //Ingest section @@ -411,7 +410,7 @@ public class BatchIngestion { ## Stream a file for ingestion Use the `ingestFromFile()` method to ingest the *stormevents.csv* file. -Place *stormevents.csv* file in the same location as your script. Since our CSV file contains a header row, use `ingestionProperties.setIgnoreFirstRecord(true);` to ignore the header. +Place the *stormevents.csv* file in the same location as your script. Since our input is a CSV file, use `ingestionProperties.setDataFormat(DataFormat.CSV)` in the ingestion properties. Add an ingestion section using the following lines to the end of `main()`. @@ -424,7 +423,6 @@ 
String filePath = "stormevents.csv"; IngestionProperties ingestionProperties = new IngestionProperties(database, table); ingestionProperties.setDataFormat(DataFormat.CSV); - ingestionProperties.setIgnoreFirstRecord(true); FileSourceInfo fileSourceInfo = new FileSourceInfo(filePath, 0); ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties); @@ -547,7 +545,6 @@ Replace the ingestion section with the following code: System.out.println("Ingesting data from a byte array"); IngestionProperties ingestionProperties = new IngestionProperties(database, table); ingestionProperties.setDataFormat(DataFormat.CSV); - ingestionProperties.setIgnoreFirstRecord(true); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); ingestClient.ingestFromStream(streamSourceInfo, ingestionProperties); @@ -616,7 +613,7 @@ Replace the ingestion section with the following code: # Ingestion section print("Ingesting data from an existing blob") blob_descriptor = BlobDescriptor("") - ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV, ignore_first_record=True) + ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV) ingest_client.ingest_from_blob(blob_descriptor, ingest_props) ingest_client.close() ``` @@ -650,7 +647,6 @@ Replace the ingestion section with the following code: BlobSourceInfo blobSourceInfo = new BlobSourceInfo(sasURI); IngestionProperties ingestionProperties = new IngestionProperties(database, table); ingestionProperties.setDataFormat(DataFormat.CSV); - ingestionProperties.setIgnoreFirstRecord(true); ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); } catch (Exception e) { From 589cb2ab862bd62a14a6eed244fa4fd0da2df645 Mon Sep 17 00:00:00 2001 From: EdB-MSFT <105771311+EdB-MSFT@users.noreply.github.com> Date: Sun, 16 Feb 2025 14:22:48 +0200 Subject: [PATCH 17/20] removed blob --- .../app-managed-streaming-ingest.md | 141 +++--------------- data-explorer/kusto/toc.yml | 2 +- 2 files changed, 21 
insertions(+), 122 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 83189eb803..cdbbc3621a 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -1,16 +1,16 @@ --- -title: Create an app to ingest data by streaming ingestion using Kusto’s managed streaming ingestion client -description: Learn how to create an app to ingest data from a file, stream, or blob streaming using the ingestion managed streaming ingestion client. +title: Create an app to get data using the managed streaming ingestion client +description: Learn how to create an app to ingest data from a file or in-memory stream using the managed streaming ingestion client. ms.reviewer: yogilad ms.topic: how-to ms.date: 02/03/2025 monikerRange: "azure-data-explorer" -# customer intent: To learn about creating an app ingest using Kusto’s managed streaming ingestion client. +# customer intent: To learn about creating an app to ingest data using Kusto’s managed streaming ingestion client. --- -# Create an app for streaming ingestion using Kusto’s managed streaming ingestion client +# Create an app to get data using the managed streaming ingestion client > [!INCLUDE [applies](../../includes/applies-to-version/applies.md)] [!INCLUDE [fabric](../../includes/applies-to-version/fabric.md)] [!INCLUDE [azure-data-explorer](../../includes/applies-to-version/azure-data-explorer.md)] @@ -26,7 +26,7 @@ In this article, you’ll learn how to ingest data to Kusto using the managed st Kusto SDKs provide two flavors of Streaming Ingestion Clients, `StreamingIngestionClient` and `ManagedStreamingIngestionClient` where Managed Streaming has built-in retry and failover logic. 
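The retry and failover behavior this patch documents in the bullets just below can be sketched in a few lines of pure Python. This is an illustrative model only, not the SDK's implementation; every name in it (`managed_stream_ingest`, `TransientError`, the callables) is hypothetical, while the constants mirror the rules stated in the article (4 MB cutoff, three retries on transient failures, fallback to queued ingestion):

```python
# Illustrative model of the managed streaming fallback rules; not SDK code.
MAX_STREAM_BYTES = 4 * 1024 * 1024  # payloads above 4 MB go straight to queued ingestion
MAX_RETRIES = 3                     # transient failures are retried three times

class TransientError(Exception):    # e.g. throttling
    pass

class PermanentError(Exception):    # e.g. a bad request; never retried
    pass

def managed_stream_ingest(payload: bytes, stream_once, queue_ingest):
    """stream_once and queue_ingest are caller-supplied callables standing in for the SDK."""
    if len(payload) > MAX_STREAM_BYTES:
        return queue_ingest(payload)        # too large for streaming, queue it directly
    for _ in range(MAX_RETRIES):
        try:
            return stream_once(payload)
        except TransientError:
            continue                        # retry; PermanentError propagates un-retried
    return queue_ingest(payload)            # transient retries exhausted, fall back to queue
```

The design point to notice: falling back to queued ingestion never loses data, it only trades the near-real-time latency of streaming for batching latency, which is exactly the delay the note below calls out.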
-When ingesting with `ManagedStreamingIngestionClient`, failures and retries are handled automatically as follows: +When ingesting with the `ManagedStreamingIngestionClient` API, failures and retries are handled automatically as follows: + Streaming requests that fail due to server-side size limitations are failed-over to queued ingestion. + Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression. @@ -34,7 +34,7 @@ When ingesting with `ManagedStreamingIngestionClient`, failures and retries are + Permanent failures aren't retried. > [!NOTE] -> If the streaming ingestion fails and the data is sent to queued ingestion, some latency is introduced until the data is visible in the table. +> If the streaming ingestion fails and the data is sent to queued ingestion, some delay is be experienced before the data is visible in the table. ## Limitations @@ -110,12 +110,12 @@ class Program static void Main(string[] args) { var tableName = "MyStormEvents"; - var cluster_url = ""; - var ingestion_url = ""; - var database_name = " "; + var clusterUrl = ""; + var ingestionUrl = ""; + var databaseName = " "; - var clusterKcsb = new KustoConnectionStringBuilder(cluster_url).WithAadUserPromptAuthentication(); - var ingestionKcsb = new KustoConnectionStringBuilder(ingestion_url).WithAadUserPromptAuthentication();; + var clusterKcsb = new KustoConnectionStringBuilder(clusterUrl).WithAadUserPromptAuthentication(); + var ingestionKcsb = new KustoConnectionStringBuilder(ingestionUrl).WithAadUserPromptAuthentication();; using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) using (var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) @@ -155,14 +155,14 @@ Copy *stormevents.csv* file to the same location as your script. Since our input Add an ingestion section using the following lines to the end of `Main()`. 
```csharp - var ingestProps = new KustoIngestionProperties(databaseName, tableName) + var ingestProperties = new KustoIngestionProperties(databaseName, tableName) { Format = DataSourceFormat.csv }; //Ingestion section Console.WriteLine("Ingesting data from a file"); - ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProps).Wait(); + ingestClient.IngestFromStorageAsync(".\\stormevents.csv", ingestProperties).Wait(); ``` Let’s also query the new number of rows and the most recent row after the ingestion. @@ -218,7 +218,6 @@ def main(): with KustoClient(cluster_kcsb) as kusto_client: with ManagedStreamingIngestClient(cluster_kcsb, ingestion_kcsb) as ingest_client: - # with KustoStreamingIngestClient(cluster_kcsb) as ingest_client: print("Number of rows in " + table_name) result = kusto_client.execute_query(database_name, table_name + " | count") @@ -229,7 +228,6 @@ main() ## Stream a file for ingestion - Use the `ingest_from_file()` API to ingest the *stormevents.csv* file. Place the *stormevents.csv* file in the same location as your script. Since our input is a CSV file, use `DataFormat.CSV` in the ingestion properties. Add an ingestion section using the following lines to the end of `main()`. 
```python # Ingestion section print("Ingesting data from a file") - ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV) - ingest_client.ingest_from_file(file_path, ingest_props) + ingest_properties = IngestionProperties(database_name, table_name, DataFormat.CSV) + ingest_client.ingest_from_file(file_path, ingest_properties) ingest_client.close() ``` @@ -494,7 +492,7 @@ Replace the ingestion section with the following code: { LeaveOpen = false }; - ingestClient.IngestFromStreamAsync(stream, ingestProps, streamSourceOptions).Wait(); + ingestClient.IngestFromStreamAsync(stream, ingestProperties, streamSourceOptions).Wait(); } ``` @@ -509,10 +507,10 @@ Replace the ingestion section with the following code: print("Ingesting data from memory") single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' string_stream = io.StringIO(single_line) - ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV) + ingest_properties = IngestionProperties(database_name, table_name, DataFormat.CSV) # when possible provide the size of the raw data stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line)) - ingest_client.ingest_from_stream(stream_descriptor, ingest_props) + ingest_client.ingest_from_stream(stream_descriptor, ingest_properties) ingest_client.close() ``` @@ -525,8 +523,8 @@ Replace the ingestion section with the following code: ```typescript //Ingest section console.log('Ingesting data from memory'); - const single_line = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' - await ingestClient.ingestFromStream(Buffer.from(single_line), ingestProperties) + const singleLine = '2018-01-26 00:00:00.0000000,2018-01-27 14:00:00.0000000,MEXICO,0,0,Unknown,"{}"' + await ingestClient.ingestFromStream(Buffer.from(singleLine), ingestProperties) ingestClient.close(); ``` @@ -582,105 +580,6 @@ row 1 : --- -### Stream a blob for ingestion 
- -Kusto supports ingestion from Azure Storage blobs, Azure Data Lake files, and Amazon S3 files. - -When you send a blob for streaming, the client only sends the blob reference to the database. The data is read by the database service itself. Read Access to the blob can be granted with keys, SAS tokens, or managed identities attached to the Kusto Cluster. - -To ingest from a blob, upload the sample csv file to your storage account and generate a SAS URI with built-in read permissions. Use the URI in the code sample. For information on uploading a file to blob storage, see [Upload, download, and list blobs with the Azure portal](/azure/storage/blobs/storage-quickstart-blobs-portal). For information on generation an SAS URL, see [Generate a SAS token](/kusto/api/connection-strings/generate-sas-token?view=azure-data-explorer&preserve-view=true). - -### [C#](#tab/c-sharp) - -To ingest the blob, call the `IngestFromStorageAsync()` method. - -Replace the ingestion section with the following code: - -```csharp - // Ingestion section - Console.WriteLine("Ingesting data from an existing blob"); - var sasURI =""; - ingestClient.IngestFromStorageAsync(sasURI, ingestProps).Wait(); -``` - -### [Python](#tab/python) - -To ingest the blob, call the `ingest_from_blob()` API. - -Replace the ingestion section with the following code: - -```python - # Ingestion section - print("Ingesting data from an existing blob") - blob_descriptor = BlobDescriptor("") - ingest_props = IngestionProperties(database_name, table_name, DataFormat.CSV) - ingest_client.ingest_from_blob(blob_descriptor, ingest_props) - ingest_client.close() -``` - -### [TypeScript](#tab/typescript) - -To ingest the blob, call the `ingestFromBlob()` API. 
- -Replace the ingestion section with the following code: - -```typescript - // Ingestion section - console.log('Ingesting data from an existing blob'); - const sasURI = ""; - await ingestClient.ingestFromBlob(sasURI, ingestProperties); - ingestClient.close(); -``` - -### [Java](#tab/java) - -To ingest the blob, call the `ingestFromBlob()` API. - -Replace the ingestion section with the following code: - -```java - // Ingestion section - try (ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory - .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { - System.out.println("Ingesting data from a blob"); - String sasURI = ""; - BlobSourceInfo blobSourceInfo = new BlobSourceInfo(sasURI); - IngestionProperties ingestionProperties = new IngestionProperties(database, table); - ingestionProperties.setDataFormat(DataFormat.CSV); - ingestClient.ingestFromBlob(blobSourceInfo, ingestionProperties); - - } catch (Exception e) { - // TODO: handle exception - System.out.println("Error: " + e); - } -``` - ---- - -The results are as follows: - -```plaintext -Number of rows in MyStormEvents -row 1 : - Count - 1002 - -Ingesting data from an existing blob - -New number of rows in MyStormEvents -row 1 : - Count - 2002 - -Example line from MyStormEvents -row 1 : - StartTime - 2018-01-26 00:00:00+00:00 - EndTime - 2018-01-27 14:00:00+00:00 - State - MEXICO - DamageProperty - 0 - DamageCrops - 0 - Source - Unknown - StormSummary - {} - -``` > [!NOTE] > You can also use a managed identity based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. 
For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity) diff --git a/data-explorer/kusto/toc.yml b/data-explorer/kusto/toc.yml index a989c392f0..d29cd41ec1 100644 --- a/data-explorer/kusto/toc.yml +++ b/data-explorer/kusto/toc.yml @@ -96,7 +96,7 @@ items: href: api/get-started/app-management-commands.md - name: Queue data for ingestion href: api/get-started/app-queued-ingestion.md - - name: Ingest data using the managed streaming ingestion client + - name: Managed streaming ingestion href: api/get-started/app-managed-streaming-ingest.md - name: Connection strings items: From 471918cbb06e5d9326d67bc2acb020f8f19656b2 Mon Sep 17 00:00:00 2001 From: EdB-MSFT <105771311+EdB-MSFT@users.noreply.github.com> Date: Sun, 16 Feb 2025 16:03:47 +0200 Subject: [PATCH 18/20] tested code --- .../app-managed-streaming-ingest.md | 57 ++++++++++--------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index cdbbc3621a..6481dd38f3 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -82,7 +82,7 @@ Enable streaming ingestion on the table or on the entire database using one of t ```kql .alter table policy streamingingestion enable -.alter database policy streamingingestion enable +.alter database policy streamingingestion enable ``` For more information about streaming policy, see [Streaming ingestion policy - Azure Data Explorer & Real-Time Analytics](../../../kusto/management/streaming-ingestion-policy.md) @@ -96,6 +96,13 @@ Enter the cluster query and ingest URI and database name in the relevant variabl The code sample includes a service function `PrintResultAsValueList()` for printing query results. 
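The value-list printers the article mentions (`PrintResultAsValueList()` and its per-language counterparts) all share one simple shape. As a hedged, pure-Python sketch (the list-of-tuples row shape is a stand-in for the SDK's result objects, and the real helpers' exact indentation may differ), they render each row as a `row N :` heading followed by one `Column - value` line per column, matching the sample outputs shown later in the article:

```python
# Hypothetical stand-in for the article's value-list printer helpers:
# renders each row as "row N :" followed by one "Column - value" line per column.
def format_result_as_value_list(column_names, rows):
    lines = []
    for i, row in enumerate(rows, start=1):
        lines.append(f"row {i} :")
        for name, value in zip(column_names, row):
            lines.append(f"   {name} - {value}")
    return "\n".join(lines)

print(format_result_as_value_list(["Count"], [(1000,)]))
```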
+Add the Kusto libraries using the following commands: + +```powershell +dotnet add package Microsoft.Azure.Kusto.Data +dotnet add package Microsoft.Azure.Kusto.ingest +``` + ```C# using System; using Kusto.Data; @@ -104,18 +111,19 @@ using Kusto.Ingest; using Kusto.Data.Common; using Microsoft.Identity.Client; using System.Data; +using System.Text; class Program { static void Main(string[] args) { var tableName = "MyStormEvents"; - var clusterUrl = ""; - var ingestionUrl = ""; - var databaseName = " "; + var clusterUrl = ""; + var ingestionUrl = ""; + var databaseName = ""; var clusterKcsb = new KustoConnectionStringBuilder(clusterUrl).WithAadUserPromptAuthentication(); - var ingestionKcsb = new KustoConnectionStringBuilder(ingestionUrl).WithAadUserPromptAuthentication();; + var ingestionKcsb = new KustoConnectionStringBuilder(ingestionUrl).WithAadUserPromptAuthentication(); using (var kustoClient = KustoClientFactory.CreateCslQueryProvider(clusterKcsb)) using (var ingestClient = KustoIngestFactory.CreateManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) @@ -184,9 +192,9 @@ The code sample includes a service function `print_result_as_value_list()` for p ```python import os -import time import io + from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties, DataFormat from azure.kusto.ingest import QueuedIngestClient, FileDescriptor, StreamDescriptor, BlobDescriptor, IngestionProperties, ManagedStreamingIngestClient @@ -209,9 +217,9 @@ def print_result_as_value_list(result): def main(): # Connect to the public access Help cluster file_path = os.curdir + "/stormevents.csv" - cluster_url = "" - ingestion_url = "" - database_name = " " + cluster_url = "" + ingestion_url = "" + database_name = "" table_name = "MyStormEvents" cluster_kcsb = KustoConnectionStringBuilder.with_interactive_login(cluster_url) ingestion_kcsb = KustoConnectionStringBuilder.with_interactive_login(ingestion_url) @@ -261,14 +269,13 @@ Run the script 
from the directory where the script and stormevents.csv are located. The code sample includes a service function `printResultAsValueList()` for printing query results. ```typescript - import { Client as KustoClient, KustoConnectionStringBuilder } from "azure-kusto-data"; import { InteractiveBrowserCredentialInBrowserOptions } from "@azure/identity"; import { ManagedStreamingIngestClient, BlobDescriptor, IngestionProperties, DataFormat } from 'azure-kusto-ingest'; -const clusterUrl = ""; -const ingestionUrl =""; -const databaseName = " "; +const clusterUrl = ""; +const ingestionUrl =""; +const databaseName = ""; const tableName = "MyStormEvents" @@ -320,7 +327,6 @@ Place the *stormevents.csv* file in the same location as your script. Since our Add an ingestion section using the following lines to the end of `main()`. ```typescript - const ingestProperties = new IngestionProperties({ database: databaseName, table: tableName, format: DataFormat.CSV }); //Ingest section @@ -373,10 +379,9 @@ import com.microsoft.azure.kusto.ingest.source.FileSourceInfo; public class BatchIngestion { public static void main(String[] args) throws Exception { - String clusterUri = ""; - String ingestionUri = ""; - - String database = ""; + String clusterUri = ""; + String ingestionUri = ""; + String databaseName = ""; String table = "MyStormEvents"; ConnectionStringBuilder clusterKcsb = ConnectionStringBuilder.createWithUserPrompt(clusterUri); @@ -386,7 +391,7 @@ public class BatchIngestion { String query = table + " | count"; - KustoOperationResult results = kustoClient.execute(database, query); + KustoOperationResult results = kustoClient.execute(databaseName, query); KustoResultSetTable primaryResults = results.getPrimaryResults(); System.out.println("\nNumber of rows in " + table + " BEFORE ingestion:"); printResultsAsValueList(primaryResults); @@ -419,7 +424,7 @@ Add an ingestion section using the following lines to the end of `main()`. 
.createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { System.out.println("Ingesting data from a file"); String filePath = "stormevents.csv"; - IngestionProperties ingestionProperties = new IngestionProperties(database, table); + IngestionProperties ingestionProperties = new IngestionProperties(databaseName, table); ingestionProperties.setDataFormat(DataFormat.CSV); FileSourceInfo fileSourceInfo = new FileSourceInfo(filePath, 0); ingestClient.ingestFromFile(fileSourceInfo, ingestionProperties); @@ -436,13 +441,13 @@ Add the following lines after the ingestion command: ```java query = table + " | count"; - results = kustoClient.execute(database, query); + results = kustoClient.execute(databaseName, query); primaryResults = results.getPrimaryResults(); System.out.println("\nNumber of rows in " + table + " AFTER ingestion:"); printResultsAsValueList(primaryResults); query = table + " | top 1 by EndTime"; - results = kustoClient.execute(database, query); + results = kustoClient.execute(databaseName, query); primaryResults = results.getPrimaryResults(); System.out.println("\nExample line from " + table); printResultsAsValueList(primaryResults); @@ -459,7 +464,7 @@ row 1 : Ingesting data from a file New number of rows in MyStormEvents row 1 : - Count - 1001 + Count - 1000 Example line from MyStormEvents row 1 : StartTime - 2007-12-31 11:15:00+00:00 @@ -541,7 +546,7 @@ Replace the ingestion section with the following code: ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { System.out.println("Ingesting data from a byte array"); - IngestionProperties ingestionProperties = new IngestionProperties(database, table); + IngestionProperties ingestionProperties = new IngestionProperties(databaseName, table); ingestionProperties.setDataFormat(DataFormat.CSV); StreamSourceInfo streamSourceInfo = new StreamSourceInfo(inputStream); 
ingestClient.ingestFromStream(streamSourceInfo, ingestionProperties); @@ -559,13 +564,13 @@ The results are as follows: ```plaintext Number of rows in MyStormEvents row 1 : - Count - 1001 + Count - 1000 Ingesting data from memory New number of rows in MyStormEvents row 1 : - Count - 1002 + Count - 1001 Example line from MyStormEvents row 1 : From 4906082a2c4d94fb37b3e35f6cf844631c91bd06 Mon Sep 17 00:00:00 2001 From: EdB-MSFT <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 17 Feb 2025 10:47:11 +0200 Subject: [PATCH 19/20] final --- .../app-managed-streaming-ingest.md | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 6481dd38f3..5b13c1a496 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -16,7 +16,7 @@ monikerRange: "azure-data-explorer" Streaming Ingestion allows writing data to Kusto with near-real-time latencies. It’s also useful when writing small amounts of data to a large number of tables, making batching inefficient. -In this article, you’ll learn how to ingest data to Kusto using the managed streaming ingestion client. You'll ingest a data stream in the form of a file, in-memory stream, or blob. +In this article, you’ll learn how to ingest data to Kusto using the managed streaming ingestion client. You'll ingest a data stream in the form of a file or in-memory stream. > [!NOTE] > Streaming ingestion is a high velocity ingestion protocol. Streaming Ingestion isn't the same as `IngestFromStream`. 
@@ -28,13 +28,13 @@ Kusto SDKs provide two flavors of Streaming Ingestion Clients, `StreamingIngesti When ingesting with the `ManagedStreamingIngestionClient` API, failures and retries are handled automatically as follows: -+ Streaming requests that fail due to server-side size limitations are failed-over to queued ingestion. ++ Streaming requests that fail due to server-side size limitations are moved to queued ingestion. + Data that's larger than 4 MB is automatically sent to queued ingestion, regardless of format or compression. + Transient failures, for example throttling, are retried three times, then moved to queued ingestion. + Permanent failures aren't retried. > [!NOTE] -> If the streaming ingestion fails and the data is sent to queued ingestion, some delay is be experienced before the data is visible in the table. +> If the streaming ingestion fails and the data is moved to queued ingestion, some delay is expected before the data is visible in the table. ## Limitations @@ -48,7 +48,7 @@ For more information, see [Streaming Limitations](/azure/data-explorer/ingest-da ## Prerequisites -+ A Kusto cluster where you have database User or higher rights. Provision a free Kusto cluster at . ++ Fabric or an Azure Data Explorer cluster where you have database User or higher rights. Provision a free cluster at . + [Set up your development environment](/kusto/api/get-started/app-set-up?view=azure-data-explorer) to use the Kusto client library. @@ -63,7 +63,7 @@ Before creating the app, the following steps are required. Each step is detailed ## Configure streaming ingestion -To configure streaming ingestion, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp). If you're using a free cluster, streaming ingestion is automatically enabled. 
+To configure streaming ingestion, see [Configure streaming ingestion on your Azure Data Explorer cluster](/azure/data-explorer/ingest-data-streaming?tabs=azure-portal%2Ccsharp). It can take several minutes for the configuration to take effect. If you're using Fabric or a free cluster, streaming ingestion is automatically enabled. ### Create a Kusto table @@ -85,12 +85,15 @@ Enable streaming ingestion on the table or on the entire database using one of t .alter database policy streamingingestion enable ``` +It can take up to two minutes for the policy to take effect. + For more information about streaming policy, see [Streaming ingestion policy - Azure Data Explorer & Real-Time Analytics](../../../kusto//management/streaming-ingestion-policy.md) ## Create a basic client application Create a basic client application which connects to the Kusto Help cluster. Enter the cluster query and ingest URI and database name in the relevant variables. +The app uses two clients: one for querying and one for ingestion. Each client brings up a browser window to authenticate the user. ### [C#](#tab/c-sharp) @@ -246,7 +249,6 @@ Add and ingestion section using the following lines to the end of `main()`. print("Ingesting data from a file") ingest_properties = IngestionProperties(database_name, table_name, DataFormat.CSV) ingest_client.ingest_from_file(file_path, ingest_properties) - ingest_client.close() ``` Let’s also query the new number of rows and the most recent row after the ingestion. 
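A side note on why the `ingest_client.close()` calls are dropped in the Python hunks above: the sample already constructs the client in a `with` block, and a context manager closes the client on exit, so the explicit call was redundant. A toy class (hypothetical, not the SDK's `ManagedStreamingIngestClient`) makes the mechanism concrete:

```python
# Toy client illustrating context-manager cleanup; not the SDK client.
class ToyIngestClient:
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()   # the with block guarantees this runs, even on exceptions
        return False   # don't swallow exceptions

client = ToyIngestClient()
with client:
    pass               # ingest calls would go here
print(client.closed)   # prints True: closed without an explicit close() call
```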
@@ -516,7 +518,6 @@ Replace the ingestion section with the following code: # when possible provide the size of the raw data stream_descriptor = StreamDescriptor(string_stream, is_compressed=False, size=len(single_line)) ingest_client.ingest_from_stream(stream_descriptor, ingest_properties) - ingest_client.close() ``` ### [TypeScript](#tab/typescript) @@ -585,10 +586,6 @@ row 1 : --- - -> [!NOTE] -> You can also use a managed identity based authorization as an alternative to SAS or account keys in Azure Storage and Azure Data Lake. For more information, see [Ingest data using managed identity authentication](/azure/data-explorer/ingest-data-managed-identity) - ## Resources + [Kusto Python GitHub repository](https://github.com/Azure/azure-kusto-python) + [Kusto NodeJS GitHub repository](https://github.com/Azure/azure-kusto-node) From 10e874370f86c90e6bc6e0e5caf7dd7fb1ca54e3 Mon Sep 17 00:00:00 2001 From: EdB-MSFT <105771311+EdB-MSFT@users.noreply.github.com> Date: Mon, 17 Feb 2025 10:48:48 +0200 Subject: [PATCH 20/20] final --- .../kusto/api/get-started/app-managed-streaming-ingest.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md index 5b13c1a496..1cacf48900 100644 --- a/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md +++ b/data-explorer/kusto/api/get-started/app-managed-streaming-ingest.md @@ -422,7 +422,7 @@ Add and ingestion section using the following lines to the end of `main()`. ```java // Ingestion section try ( - ManagedStreamingIngestClient ingestClient = (ManagedStreamingIngestClient) IngestClientFactory + ManagedStreamingIngestClient ingestClient = IngestClientFactory .createManagedStreamingIngestClient(clusterKcsb, ingestionKcsb)) { System.out.println("Ingesting data from a file"); String filePath = "stormevents.csv";