mGrowTech

A Coding Implementation to Build a Unified Apache Beam Pipeline Demonstrating Batch and Stream Processing with Event-Time Windowing Using DirectRunner

by Josh
January 8, 2026
in AI, Analytics and Automation


In this tutorial, we build a unified Apache Beam pipeline that runs in both batch and stream-like modes using the DirectRunner. We generate synthetic, event-time-aware data and apply fixed windows with triggers and allowed lateness to show how Beam handles both on-time and late events. Because only the input source changes between modes, the core aggregation logic stays identical, which makes it easier to see how Beam’s event-time model, windows, and panes behave without relying on external streaming infrastructure.

!pip -q install -U "grpcio>=1.71.2" "grpcio-status>=1.71.2"
!pip -q install -U apache-beam crcmod


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.trigger import AfterWatermark, AfterProcessingTime, AccumulationMode
from apache_beam.testing.test_stream import TestStream
import json
from datetime import datetime, timezone

We install the required dependencies, setting minimum grpcio versions first so that Apache Beam installs and imports cleanly. We import the core Beam APIs along with the windowing, trigger, and TestStream utilities needed later in the pipeline. We also bring in standard Python modules for time handling and JSON formatting.

MODE = "stream"
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def make_event(user_id, event_type, amount, event_time_epoch_s):
   return {"user_id": user_id, "event_type": event_type, "amount": float(amount), "event_time": int(event_time_epoch_s)}


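base = datetime.now(timezone.utc).replace(microsecond=0)
# Align t0 to a window boundary: FixedWindows boundaries fall on multiples of the
# window size since the epoch, so an unaligned t0 would shift events between windows.
t0 = int(base.timestamp()) // WINDOW_SIZE_SECS * WINDOW_SIZE_SECS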


BATCH_EVENTS = [
   make_event("u1", "purchase", 20, t0 + 5),
   make_event("u1", "purchase", 15, t0 + 20),
   make_event("u2", "purchase",  8, t0 + 35),
   make_event("u1", "refund",   -5, t0 + 62),
   make_event("u2", "purchase", 12, t0 + 70),
   make_event("u3", "purchase",  9, t0 + 75),
   make_event("u2", "purchase",  3, t0 + 50),
]

We define the global configuration that controls the execution mode, window size, and allowed lateness. We create synthetic events with explicit event-time timestamps so that windowing is driven by when each event occurred rather than by when it arrives. The small dataset deliberately ends with an out-of-order event (t0 + 50 listed after t0 + 75), which the stream-like run later replays after the watermark has advanced, so we can observe Beam’s event-time semantics.
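To see why the choice of t0 matters, note that Beam’s FixedWindows (with its default zero offset) places window boundaries at multiples of the window size counted from the Unix epoch, not from the first event. The plain-Python sketch below mimics that arithmetic for the event offsets used in BATCH_EVENTS. It does not call Beam; `fixed_window` and `group_by_window` are hypothetical helper names, and the two t0 values are made up for illustration.

```python
from collections import defaultdict

WINDOW_SIZE_SECS = 60
# Event-time offsets (seconds after t0) used by the tutorial's events.
OFFSETS = [5, 20, 35, 62, 70, 75, 50]


def fixed_window(ts, size=WINDOW_SIZE_SECS):
    """Return the epoch-aligned [start, end) fixed window containing ts."""
    start = ts - ts % size
    return start, start + size


def group_by_window(t0):
    """Map each window to the sorted offsets of the events it contains."""
    groups = defaultdict(list)
    for off in OFFSETS:
        groups[fixed_window(t0 + off)].append(off)
    return {w: sorted(offs) for w, offs in sorted(groups.items())}


# Hypothetical t0 on a window boundary: offsets below 60 share the first window.
aligned = group_by_window(t0=600)
# Hypothetical t0 half-way into a window: the same offsets split differently.
unaligned = group_by_window(t0=630)

print(aligned)
print(unaligned)
```

With the unaligned t0, the event at offset 35 lands in the second window instead of the first, so a t0 taken directly from the current time can change the grouping from run to run.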

def format_joined_record(kv):
   user_id, d = kv
   return {
       "user_id": user_id,
       "count": int(d["count"][0]) if d["count"] else 0,
       "sum_amount": float(d["sum_amount"][0]) if d["sum_amount"] else 0.0,
   }


class WindowedUserAgg(beam.PTransform):
   def expand(self, pcoll):
       stamped = pcoll | beam.Map(lambda e: beam.window.TimestampedValue(e, e["event_time"]))
       windowed = stamped | beam.WindowInto(
           FixedWindows(WINDOW_SIZE_SECS),
           allowed_lateness=ALLOWED_LATENESS_SECS,
           trigger=AfterWatermark(
               early=AfterProcessingTime(10),
               late=AfterProcessingTime(10),
           ),
           accumulation_mode=AccumulationMode.ACCUMULATING,
       )
       keyed = windowed | beam.Map(lambda e: (e["user_id"], e["amount"]))
       counts = keyed | beam.combiners.Count.PerKey()
       sums = keyed | beam.CombinePerKey(sum)
       return (
           {"count": counts, "sum_amount": sums}
           | beam.CoGroupByKey()
           | beam.Map(format_joined_record)
       )

We build a reusable Beam PTransform that encapsulates all of the windowed aggregation logic. It assigns event-time timestamps, applies 60-second fixed windows with an AfterWatermark trigger (early and late firings after 10 seconds of processing time), 120 seconds of allowed lateness, and accumulating panes, then computes a per-user count and sum and joins them with CoGroupByKey. Because the transform does not depend on the data source, the same logic applies to both batch and streaming inputs.
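As a cross-check on what the transform should eventually report, the following plain-Python sketch computes the same per-window, per-user count and sum without Beam. It is only a stand-in for the final result once all data has been seen; it ignores triggers and panes entirely. The `aggregate` helper and the window-aligned `T0` value are assumptions made for the example.

```python
from collections import defaultdict

WINDOW_SIZE_SECS = 60
T0 = 600  # hypothetical, window-aligned start time

# (user_id, amount, event_time) tuples mirroring the tutorial's events.
EVENTS = [
    ("u1", 20.0, T0 + 5),
    ("u1", 15.0, T0 + 20),
    ("u2", 8.0, T0 + 35),
    ("u1", -5.0, T0 + 62),
    ("u2", 12.0, T0 + 70),
    ("u3", 9.0, T0 + 75),
    ("u2", 3.0, T0 + 50),  # out of order: older than the events before it
]


def aggregate(events, size=WINDOW_SIZE_SECS):
    """Return {(window_start, user_id): {"count": ..., "sum_amount": ...}}."""
    acc = defaultdict(lambda: {"count": 0, "sum_amount": 0.0})
    for user_id, amount, ts in events:
        window_start = ts - ts % size  # event time decides the window
        slot = acc[(window_start, user_id)]
        slot["count"] += 1
        slot["sum_amount"] += amount
    return dict(acc)


result = aggregate(EVENTS)
for (window_start, user_id), agg in sorted(result.items()):
    print(window_start - T0, user_id, agg)
```

The out-of-order event for u2 is counted in the first window because its event time, not its position in the list, determines the window.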

class AddWindowInfo(beam.DoFn):
   def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
       ws = float(window.start)
       we = float(window.end)
       yield {
           **element,
           "window_start_utc": datetime.fromtimestamp(ws, tz=timezone.utc).strftime("%H:%M:%S"),
           "window_end_utc": datetime.fromtimestamp(we, tz=timezone.utc).strftime("%H:%M:%S"),
           "pane_timing": str(pane_info.timing),
           "pane_is_first": pane_info.is_first,
           "pane_is_last": pane_info.is_last,
       }


def build_test_stream():
   return (
       TestStream()
       .advance_watermark_to(t0)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "purchase", 20, t0 + 5), t0 + 5),
           beam.window.TimestampedValue(make_event("u1", "purchase", 15, t0 + 20), t0 + 20),
           beam.window.TimestampedValue(make_event("u2", "purchase", 8, t0 + 35), t0 + 35),
       ])
       .advance_processing_time(5)
       .advance_watermark_to(t0 + 61)
       .add_elements([
           beam.window.TimestampedValue(make_event("u1", "refund", -5, t0 + 62), t0 + 62),
           beam.window.TimestampedValue(make_event("u2", "purchase", 12, t0 + 70), t0 + 70),
           beam.window.TimestampedValue(make_event("u3", "purchase", 9, t0 + 75), t0 + 75),
       ])
       .advance_processing_time(5)
       .add_elements([
           beam.window.TimestampedValue(make_event("u2", "purchase", 3, t0 + 50), t0 + 50),
       ])
       .advance_watermark_to(t0 + 121)
       .advance_watermark_to_infinity()
   )

We enrich each aggregated record with window and pane metadata so we can see when and why each result is emitted. We convert Beam’s window timestamps into human-readable UTC times for clarity. We also define a TestStream that simulates streaming behavior by interleaving elements with watermark and processing-time advances, including one element (t0 + 50) that arrives only after the watermark has moved to t0 + 61.
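The relationship between the watermark, the window end, and allowed lateness can be sketched in plain Python. The rule below is a simplified approximation of how Beam treats an arriving element, not Beam’s actual implementation; `classify`, the arrival list, and the window-aligned `T0` are assumptions introduced for illustration.

```python
WINDOW_SIZE_SECS = 60
ALLOWED_LATENESS_SECS = 120


def classify(event_time, watermark, size=WINDOW_SIZE_SECS,
             lateness=ALLOWED_LATENESS_SECS):
    """Classify an arriving element against the current watermark.

    Simplified rule (an approximation of Beam's behavior):
      - "on_time": the watermark has not yet reached the window's end
      - "late":    the window's end has passed, but allowed lateness has not
      - "dropped": the watermark is past window end + allowed lateness
    """
    window_end = event_time - event_time % size + size
    if watermark < window_end:
        return "on_time"
    if watermark < window_end + lateness:
        return "late"
    return "dropped"


T0 = 600  # hypothetical, window-aligned start time

# (event-time offset, watermark offset at arrival), following the TestStream script.
ARRIVALS = [(5, 0), (20, 0), (35, 0), (62, 61), (70, 61), (75, 61), (50, 61)]

labels = [classify(T0 + ev, T0 + wm) for ev, wm in ARRIVALS]
print(labels)

# A hypothetical straggler arriving after the lateness horizon would be discarded.
print(classify(T0 + 50, T0 + 200))
```

Under these assumptions only the t0 + 50 element is late, and it is still accepted because the watermark (t0 + 61) is well inside the 120-second lateness horizon of its window.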

def run_batch():
   with beam.Pipeline(options=PipelineOptions([])) as p:
       (
           p
           | beam.Create(BATCH_EVENTS)
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


def run_stream():
   opts = PipelineOptions([])
   opts.view_as(StandardOptions).streaming = True
   with beam.Pipeline(options=opts) as p:
       (
           p
           | build_test_stream()
           | WindowedUserAgg()
           | beam.ParDo(AddWindowInfo())
           | beam.Map(json.dumps)
           | beam.Map(print)
       )


if MODE == "stream":
   run_stream()
else:
   run_batch()

We wire everything together into executable batch and stream-like pipelines. We switch between them by changing the single MODE flag while reusing the same aggregation transform. Each pipeline prints the windowed results as JSON, so the execution flow and outputs are easy to inspect.
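One caveat with the toggle above is that any MODE value other than "stream" falls through to the batch path, so a typo silently runs the wrong pipeline. A stricter variant could use a dispatch table, sketched below. The `RUNNERS` mapping and `run` helper are hypothetical names, and the two runner functions are stubs standing in for the tutorial’s real pipelines.

```python
def run_batch():
    # Stub standing in for the tutorial's batch pipeline.
    return "batch pipeline ran"


def run_stream():
    # Stub standing in for the tutorial's TestStream pipeline.
    return "stream pipeline ran"


# Hypothetical dispatch table: one entry per supported mode.
RUNNERS = {"batch": run_batch, "stream": run_stream}


def run(mode):
    """Run the pipeline registered for mode, rejecting unknown values."""
    try:
        runner = RUNNERS[mode]
    except KeyError:
        raise ValueError(
            f"MODE must be one of {sorted(RUNNERS)}, got {mode!r}"
        ) from None
    return runner()


print(run("stream"))
```

Calling `run("strem")` raises a ValueError instead of quietly running the batch pipeline.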

In conclusion, we showed that the same Beam transform can process both bounded batch data and a simulated unbounded stream while preserving identical windowing and aggregation semantics. We observed how watermarks, triggers, and accumulation modes determine when results are emitted and how late data updates previously computed windows. This focus on the conceptual foundations of Beam’s unified model provides a solid base for later scaling the same design to real streaming runners and production environments.
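To make the effect of the accumulation mode concrete, the sketch below contrasts accumulating and discarding panes for a single key and window in plain Python. It is not Beam code: `panes` is a hypothetical helper, and which panes actually fire in a real run depends on the runner and on trigger timing. The sample values follow user u2 in the first window, assuming amount 8 arrives on time and amount 3 arrives late.

```python
def panes(firings, accumulating):
    """Return the (count, sum) emitted at each trigger firing for one key and window.

    firings is a list of lists: the amounts that arrived since the previous firing.
    """
    emitted, seen = [], []
    for new_amounts in firings:
        # Accumulating keeps everything seen so far; discarding starts fresh.
        seen = seen + new_amounts if accumulating else list(new_amounts)
        emitted.append((len(seen), sum(seen)))
    return emitted


# User u2 in the first window: amount 8 arrives on time, amount 3 arrives late.
U2_FIRINGS = [[8.0], [3.0]]

accumulating = panes(U2_FIRINGS, accumulating=True)
discarding = panes(U2_FIRINGS, accumulating=False)
print(accumulating)
print(discarding)
```

In accumulating mode the late pane restates the full window total, so a consumer can overwrite the earlier result. In discarding mode the late pane carries only the new data, so a consumer would have to add the panes together itself.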


Michal Sutter is a data science professional with a Master of Science in Data Science from the University of Padova. With a solid foundation in statistical analysis, machine learning, and data engineering, Michal excels at transforming complex datasets into actionable insights.


