diff --git a/apache_beam/runners/worker/operations.py b/apache_beam/runners/worker/operations.py index 3464c5750c..5921c72b90 100644 --- a/apache_beam/runners/worker/operations.py +++ b/apache_beam/runners/worker/operations.py @@ -69,18 +69,6 @@ if TYPE_CHECKING: from apache_beam.runners.worker.statesampler import StateSampler from apache_beam.transforms.userstate import TimerSpec -# Allow some "pure mode" declarations. -try: - import cython -except ImportError: - - class FakeCython(object): - @staticmethod - def cast(type, value): - return value - - globals()['cython'] = FakeCython() - _globally_windowed_value = GlobalWindows.windowed_value(None) _global_window_type = type(_globally_windowed_value.windows[0]) @@ -149,7 +137,7 @@ class ConsumerSet(Receiver): # type: (WindowedValue) -> None self.update_counters_start(windowed_value) for consumer in self.consumers: - cython.cast(Operation, consumer).process(windowed_value) + consumer.process(windowed_value) self.update_counters_finish() def try_split(self, fraction_of_remainder): @@ -345,7 +333,7 @@ class Operation(object): def output(self, windowed_value, output_index=0): # type: (WindowedValue, int) -> None - cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value) + self.receivers[output_index].receive(windowed_value) def add_receiver(self, operation, output_index=0): # type: (Operation, int) -> None