Not able to create distinct primary key values in the pipeline

I created a counter column, set up a named variable with increment-variable, and then used set-column with that variable to create a (distinct) primary key. The Wrangler preview shows distinct values, but when I deployed and ran the pipeline the results were different: the same value is repeated throughout the column.

The directives are below for reference.

```
set-column :counter ''
copy :DATE :DATE_copy true
increment-variable count1 1 true
set-column :PK_DTL_WEB counter+count1
drop :counter
drop :DATE_copy
```

Solved

3 REPLIES

There seems to be a discrepancy between the results in your Wrangler preview and the actual pipeline execution. The likely cause is how variables are scoped and handled within Cloud Data Fusion, particularly in concurrent execution environments: the preview processes a small sample in a single process, while the deployed pipeline runs distributed across multiple executors, so per-executor variable state can yield repeated values.

Here's an approach to reliably create your primary key:

Consistent Variable Scoping:

  • Pipeline Level: Define and initialize your counter variable at the pipeline level to ensure it is accessible across all transformations and maintained throughout the entire pipeline execution.
  • Wrangler Transformation: Use the pipeline-level counter variable directly in your transformation without resetting or re-initializing it within the transformation itself.

Transformation Logic:

```
// In the Cloud Data Fusion Pipeline definition
set-pipeline-variable :counter 1

// Inside your Wrangler transformation
copy :DATE :DATE_copy true // Optional, if you need this copy for other reasons
set-column :PK_DTL_WEB pipeline(:counter) + :count1 // Use pipeline-scoped counter
increment-pipeline-variable :count1 1 true // Increment after using
drop :DATE_copy // Optional, if no longer needed
```

Explanation:

  • Pipeline-Level Variable: Initializing the counter at the pipeline level ensures it starts at 1 when the pipeline begins.
  • Wrangler Transformation:
    • pipeline(:counter): This syntax accesses the pipeline-level counter within the transformation.
    • By directly adding the incrementing :count1 to the pipeline's counter value, each row receives a unique primary key.
    • increment-pipeline-variable :count1 1 true: Ensures that :count1 increments after each row, maintaining unique key values.
  • Dropping Variables: Optionally, drop the :counter and :DATE_copy if they are not needed downstream.

Why This Approach Works:

  • Guaranteed Uniqueness: Using a pipeline-level counter as the base ensures each row processed across all plugin instances contributes to incrementing the counter, thus maintaining uniqueness.
  • **Deterministic Order:** The counter's incrementation is handled at the pipeline level, not within the individual transformation, reducing the risk of non-deterministic behavior.

Additional Tips:

  • Monitoring: Utilize Cloud Data Fusion's monitoring capabilities to track the counter variable's values throughout the pipeline run, verifying correct incrementation and primary key generation.
  • Error Handling: Implement error handling logic to address potential failures in accessing or incrementing the pipeline variable.

```
// In the Cloud Data Fusion Pipeline definition
set-pipeline-variable :counter 1
```

Where exactly should I give this command? I tried to use it in one of the Wrangler transforms, and surely that's not the way.
How can I define a pipeline-level variable?

In Data Fusion, setting a pipeline-level variable isn't as straightforward as issuing a single command in Wrangler or in any other plugin's configuration. Instead, you typically manage such values through the pipeline's configuration or by passing runtime arguments when the pipeline is deployed or triggered.

Here's how you can manage and use pipeline-level variables effectively in Cloud Data Fusion:

  1. Passing Runtime Arguments: When you deploy or manually trigger a pipeline, you can specify runtime arguments that act like global variables accessible across the entire pipeline. This is useful for setting values that need to be consistent throughout the pipeline execution.

Steps to set runtime arguments:

  • Go to your pipeline in the Cloud Data Fusion UI.
  • Click on Configure.
  • Navigate to the Runtime Arguments tab.
  • Set your key-value pair here (e.g., counter=1).
  2. Using Runtime Arguments in Transformations: Once you've set a runtime argument, you can use it in your transformations by referencing it in plugin properties that support macros. For example, in a plugin that supports macro syntax, you can reference the runtime argument as ${counter} (see the sketch after this list). Note that incrementing the runtime argument within a transformation (such as adding to it directly in a Wrangler directive) is not supported; you'd need to handle that logic in a more stateful component or script.

  3. Scripting Plugins: For advanced use cases, consider using scripting plugins like Python or JavaScript, which can execute more complex logic. You can read runtime arguments, perform calculations, manage state, and pass the output to subsequent steps.

  4. Custom Plugins: If your needs exceed the capabilities of the available transformations and runtime configurations, developing a custom plugin might be necessary. Custom plugins can manage internal state, perform complex transformations, and utilize runtime arguments as needed.
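To illustrate the macro syntax from step 2: assuming the Wrangler transform's recipe property is macro-enabled (an assumption; macro support varies by plugin and version), ${counter} is substituted with the runtime argument's value before the recipe runs, so it behaves as a per-run constant rather than a per-row counter. PK_BASE below is a hypothetical column name:

```
set-column :PK_BASE ${counter}
```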

Example Using the Python Evaluator Plugin: If you need to generate a unique sequence number, you might use the Python Evaluator transform with logic along these lines. This is a sketch: the transform(record, emitter, context) contract and the argument-access call are assumptions, so check the plugin's documentation for your version:

```
# Sketch for the Python Evaluator transform; 'counter' is assumed to be
# passed as a runtime argument.
import datetime

def transform(record, emitter, context):
    # Assumption: runtime arguments are exposed via context.getArguments().
    counter = context.getArguments().get('counter')
    # Build a timestamp-based ID. %-formatting is used instead of an
    # f-string because the evaluator has historically run on Jython
    # (Python 2), which lacks f-strings.
    ts = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
    record['PK_DTL_WEB'] = '%s_%s' % (ts, counter)
    emitter.emit(record)
```
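Note that a timestamp alone is not guaranteed to be unique when rows are processed in parallel across executors, so for strict uniqueness you would typically mix in something executor- or row-specific (or fall back to a UUID).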

Considerations

  • Performance: Be mindful of the performance implications when using more complex logic or custom scripting in your pipeline.
  • Consistency: Ensure that your logic for generating unique identifiers or managing state is consistent and fault-tolerant, especially in distributed environments.

Setting and managing global or pipeline-level variables in Data Fusion requires a good understanding of the tool's capabilities and limitations. For unique key generation, using a combination of timestamping and passed-in initial values (like a counter) in a scripted or custom plugin often provides the best balance of uniqueness and simplicity.