Forex Tick to OHLCV in Apache Beam

So recently I was working on a small project to automatically place Forex trades according to the predictions of some model. I had all the data in Google Cloud Storage; I had everything ready to go. My idea was to build a model that, given previous forex open, high, low and close values, suggests whether to buy, hold or sell. The simplest version of the model that I wanted to build, let’s call this the alpha model, would make use of 5 minute aggregated forex data.

The problem

One problem though: the data I had was tick data, partitioned into files that weren’t divided by financial instrument, so rows for the same instrument could end up spread across several files. If the data were relatively small, we could load all these chunks using Pandas, resample and be done. However, these files added up to gigabytes, which we all know would make life in Pandas difficult without the use of Dask. Instead of going down that route, I took this as an opportunity to build a pipeline in Apache Beam, just because I love the framework.

Apache Beam makes it so simple to build pipelines on top of different runners such as Spark or Flink. Since we’re using the Python version, it is challenging to set it up on your own cluster unless you use something like a managed service in GCP that runs it on Dataflow. Furthermore, it is (probably) impossible to get it running against a local Spark cluster on a Windows machine. I tried, I failed, I cried. I had to boot up my Ubuntu partition. Sigh.

The Approach

Before I dive into my approach, why the Python version? I have used the Java version in the past; why would I do that to myself again?

As a hybrid data scientist, data engineer and developer, I am all for writing the cleanest code possible. The quote below, whose author I cannot for the life of me remember, is something I live by for every line of code I write.

Good code is the modern definition of art

Therefore, with this line imprinted in my mind, I wanted to design a modular pipeline that could have parts added and removed very easily.

The diagram below is what I came up with. It might be beautiful, it might be horrible, it might be Van Gogh level; either way, I am very open to opinions, because I might be doing things horribly wrong. So comment away!

The Pipeline

In the diagram the FxBeam class is what manages the main data flow. It reads the data, calls the resampler class’s process method, extracts the output and writes the results back to file. The classes linked with dotted lines are the classes or functions in use by that segment of the pipeline.

[Diagram: the FxBeam pipeline]

In Apache Beam we make heavy use of ParDo transforms. If you haven’t used ParDo before, these are the basis of everything you do in Apache Beam. Essentially you’re creating a DoFn class that gets inserted as a segment in the pipeline, very much like building a flowchart where each DoFn has a particular task.
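To make that concrete, here is a minimal, self-contained sketch of a DoFn inside a pipeline. This is a toy example of mine, not something from the FxBeam codebase:

import apache_beam as beam


class MultiplyBy(beam.DoFn):
    """Toy DoFn that multiplies each element by a given factor."""
    def process(self, element, factor=1):
        # A DoFn yields zero or more output elements per input element
        yield element * factor


with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Create numbers' >> beam.Create([1, 2, 3])
        | 'Multiply' >> beam.ParDo(MultiplyBy(), factor=10)
        | 'Print' >> beam.Map(print)
    )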

The Input function

When reading the data we use the ReadFromText function from apache_beam.io. This function can take file names or file patterns, which is convenient, as well as a compression type if the file is gzip-compressed, for example.

rows = self.pipeline | 'Read data file' >> ReadFromText(
    self.input_file_pattern,
    compression_type=_compression
)

If the pipe symbol looks weird, you’re not the only one! When I started using Apache Beam it was a bit of a shock for me too. To save you the search: the | operator is overloaded to act as a synonym for the .apply() method, applying a transform to a pipeline or PCollection. Link for more info here.
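Roughly speaking, the two transforms below do the same thing; the second one spells out the apply() call that the pipe hides (the exact internals may differ between Beam versions):

import apache_beam as beam

with beam.Pipeline() as p:
    numbers = p | 'Create numbers' >> beam.Create([1, 2, 3])

    # The pipe applies a transform; >> just attaches a label to the step.
    doubled = numbers | 'Double' >> beam.Map(lambda x: x * 2)

    # Roughly the same thing, written as an explicit apply() call.
    tripled = numbers.pipeline.apply(
        beam.Map(lambda x: x * 3), numbers, label='Triple'
    )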

In this function we can take either a JSON input or a CSV input, selected via a parameter on the class. If we’re reading a CSV we make use of a custom parse rows function which takes in the CSV rows and transforms them into dictionaries that can easily flow through and be edited by the pipeline.

if self.input_file_type == 'csv':
    rows = rows | 'Convert CSV to tick elements' >> beam.ParDo(
        ParseDataRows(),
        headers=self.input_columns
    )
else:
    rows = rows | 'Convert JSON to tick elements' >> beam.Map(json.loads)

Finally, before returning the data to be processed, we need to do some preprocessing: convert the datetime given in the CSV / JSON to a timestamp, turn the rows into timestamped values, and finally convert all the fields to floats.

rows = rows | 'Convert to timestamp field' >> beam.ParDo(
    ToTimestamp(),
    string_format=self.TICK_DATA_TIMESTAMP_FORMAT,
    datetime_key=self.input_columns[0],
    timestamp_key='timestamp'
)

rows = rows | 'Convert to datetime object' >> beam.ParDo(
    AddTimestamp(),
    timestamp_key='timestamp'
)

rows = rows | 'Convert values to floats' >> beam.ParDo(
    ToFloat(),
    fields=self.BASE_INPUT_COLUMNS[1:]  # ALL EXCEPT TIMESTAMP AND INSTRUMENT
)

In the snippet above, ToTimestamp() takes in TICK_DATA_TIMESTAMP_FORMAT and uses it to parse the datetime string. The AddTimestamp function then creates the timestamped rows, and is given by this DoFn:

class AddTimestamp(beam.DoFn):
    """ParDo that takes a timestamp column and assigns it
    to the row to produce a timestamped value.
    """
    def process(self, element, timestamp_key, delete_key=False, **kwargs):
        """
        :param element: Element being processed
        :param timestamp_key: Field containing the timestamp
        :param delete_key: Whether to delete the original timestamp or not
        :return: Elements with a datetime field
        """
        timestamp = element.pop(timestamp_key) if delete_key else element.get(timestamp_key)
        yield beam.window.TimestampedValue(element, timestamp)
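ToTimestamp itself isn’t shown above, but here is a minimal sketch of what such a DoFn could look like. This is my own sketch based on the parameters in the snippet, assuming datetime.strptime does the parsing and that times are treated as UTC; the actual implementation in the repo may differ:

from datetime import datetime, timezone

import apache_beam as beam


class ToTimestamp(beam.DoFn):
    """Sketch: parse a datetime string into a Unix timestamp field."""
    def process(self, element, string_format, datetime_key, timestamp_key, **kwargs):
        parsed = datetime.strptime(element[datetime_key], string_format)
        # Treat the parsed datetime as UTC and store the epoch seconds
        yield {
            **element,
            timestamp_key: parsed.replace(tzinfo=timezone.utc).timestamp()
        }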

The Resampler

If you have a look at the GitHub code, you’ll notice that I have two resampler classes. The first time I tried this out, I went for the built-in windowing functions, which create fixed windows. However, I couldn’t find a way to make them start at specific times. For example, if we’re resampling OHLCV data from 1 minute to 5 minute windows, I’d want a window to start at 12:00 and last until 12:05, not start and finish at arbitrary times such as 12:01 to 12:06. Now one might argue that such shifts technically shouldn’t make a difference when building a model based on candlesticks, since it is all based on tick data behind the scenes anyway. However, for ease of use I wanted these to start on round figures.
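For reference, the built-in approach I moved away from looks roughly like this, continuing from the rows PCollection built earlier (a sketch, not the code that ended up in the repo):

from apache_beam.transforms.window import FixedWindows

# Assign each timestamped element to a fixed 5 minute (300 second) window
windowed = rows | 'Fixed 5 minute windows' >> beam.WindowInto(FixedWindows(300))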

In order to do this, I decided to create time groups myself based on the timestamp and the window size, and then use a simple CombinePerKey function to create the OHLCV values. To the CombinePerKey function we pass in the combiner, which does the actual heavy lifting; I’ll go into that in the next section. Here is a diagram of the resampler I created:

[Diagram: the TimeGroup resampler]

OK, let’s start dissecting this. TimestampToTimeGroup is a ParDo function that essentially does this:

...
yield {
    **element,
    time_group_key: element['timestamp'] // window_size
}

The floor division operation is enough to create the time group, because the output value remains constant for all timestamps that fall within the same multiple of the window size.

0 // 3 = 0
1 // 3 = 0
2 // 3 = 0
3 // 3 = 1
...
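Since Unix timestamps count seconds from the epoch, which itself sits on a round minute, floor dividing by a 300 second window size produces groups that start on round 5 minute marks (in UTC). A quick sanity check with some example timestamps:

WINDOW_SIZE = 300  # 5 minutes in seconds

# 2021-01-01 00:00:00 and 00:04:59 UTC fall in the same group,
# while 00:05:00 starts a new one.
print(1609459200 // WINDOW_SIZE)  # 5364864
print(1609459499 // WINDOW_SIZE)  # 5364864
print(1609459500 // WINDOW_SIZE)  # 5364865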

The resample function then calls a map elements function, the CombinePerKey function and then a de-map function. The map and de-map functions are needed to restructure the data into tuples in the form of key:value pairs. The map function also has another purpose: if the data contains multiple instruments, we can provide an instrument_column to the resampler to make sure it resamples per instrument. Otherwise it will mash everything together into one big unusable clump of data.

This function is given below:

def map_elements(self, data):
    """Map function to create key:value pairs to run CombinePerKey function.
    :param data: PCollection being processed with time_group_key column and
        instrument_column if set to use.
    :return: PCollection with mapped data
    """
    action_name = 'Resampler - Map data'
    if self.instrument_column:
        return data | action_name >> beam.Map(
            lambda x: ((x['time_group_key'], x[self.instrument_column]), x)
        )
    return data | action_name >> beam.Map(lambda x: (x['time_group_key'], x))

The de-map function follows the same lines, doing the same thing in reverse. I won’t really go into that now, but if you’re interested have a look at the code. It’s all neat and tidy, I promise!
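For the curious, the core of a de-map can be as small as the sketch below, which simply unpacks the (key, value) tuples coming out of CombinePerKey. The action name here is assumed, and in practice you’d probably also want to fold the key, i.e. the time group and instrument, back into the element:

def demap_elements(self, data):
    """Sketch: unpack the (key, combined_value) tuples after CombinePerKey."""
    action_name = 'Resampler - De-map data'
    return data | action_name >> beam.Map(lambda kv: kv[1])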

The CombinePerKey function is what does the actual aggregation. This function requires a combiner to handle what happens as rows come in, and how accumulators running on different nodes get merged. Out-of-the-box combiners include sum and mean combiners. We created a custom one to do the resampling, which is explained next.
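As a point of reference, using CombinePerKey with a built-in combiner such as Python’s sum looks like this:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([('EURUSD', 2), ('EURUSD', 3), ('GBPUSD', 5)])
        | 'Sum per instrument' >> beam.CombinePerKey(sum)
        | beam.Map(print)  # ('EURUSD', 5), ('GBPUSD', 5)
    )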

The Combiner

The combiner inherits from apache_beam.CombineFn. When subclassing it we need to make sure a few methods are provided, as shown below:

class CustomCombineFn(beam.CombineFn):
    def create_accumulator(self):
        return  # create the accumulator here

    def add_input(self, current_accumulation, input):
        return  # perform the accumulation here

    def merge_accumulators(self, accumulators):
        return  # merge accumulators running on different nodes

    def extract_output(self, accumulator):
        return  # extract the output from the accumulated values

In our case the combiner has the following functionality:

[Diagram: the combiner’s logic]

When receiving a new input, we update the open, high, low and close values using the appropriate operation for each. The volume is a simple increment. In merge_accumulators we do the same checks and operations, but across pre-accumulated results. The functions are very simple, but the class does get a bit lengthy due to all the docstrings. Hence, I won’t overload this article with the full code, but you can easily have a look at it here.
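To give a flavour without reproducing the full class, here is a stripped-down sketch of what such a combiner could look like. The field names (ask, timestamp), the dictionary accumulator and the tick-count volume are my assumptions here; the real class in the repo is more thorough:

import apache_beam as beam


class OHLCVCombineFn(beam.CombineFn):
    """Stripped-down sketch of an OHLCV combiner (field names assumed)."""

    @staticmethod
    def get_value(item):
        return item['ask']

    def create_accumulator(self):
        # open and close are stored as (timestamp, value) pairs so that
        # merging accumulators can keep the earliest / latest tick.
        return {'open': None, 'high': None, 'low': None, 'close': None, 'volume': 0}

    def add_input(self, accumulator, element):
        value = self.get_value(element)
        tick = {
            'open': (element['timestamp'], value),
            'close': (element['timestamp'], value),
            'high': value,
            'low': value,
            'volume': 1,  # volume here is simply the tick count
        }
        return self.merge_accumulators([accumulator, tick])

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            if acc['volume'] == 0:
                continue  # skip empty accumulators
            if merged['open'] is None or acc['open'][0] < merged['open'][0]:
                merged['open'] = acc['open']    # earliest tick defines the open
            if merged['close'] is None or acc['close'][0] > merged['close'][0]:
                merged['close'] = acc['close']  # latest tick defines the close
            merged['high'] = acc['high'] if merged['high'] is None else max(merged['high'], acc['high'])
            merged['low'] = acc['low'] if merged['low'] is None else min(merged['low'], acc['low'])
            merged['volume'] += acc['volume']
        return merged

    def extract_output(self, accumulator):
        return {
            'open': accumulator['open'][1] if accumulator['open'] else None,
            'high': accumulator['high'],
            'low': accumulator['low'],
            'close': accumulator['close'][1] if accumulator['close'] else None,
            'volume': accumulator['volume'],
        }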

The get_value function is a simple function that defines which value is used to calculate the OHLCV aggregation. The current implementation uses only the ask value from the tick data. However, this can easily be changed to use the bid, or a combination of both.

@staticmethod
def get_value(item):
    """Function to calculate the value of the element.
    This is done so we can easily change how the value is calculated,
    by changing it in this one place.
    :param item: Tick item
    :return: Value of tick item
    """
    return item['ask']
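For example, switching to a mid-price would only require changing this one method (assuming the tick data also carries a bid field):

@staticmethod
def get_value(item):
    """Use the mid-price instead of the ask price."""
    return (item['ask'] + item['bid']) / 2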

Unit testing

One thing I seriously love about Apache Beam is how it forces you to use DoFns. This makes it easy to:

  • Create well-documented code
  • Stick to the one function, one task rule
  • Use test-driven development

Now, in the codebase for the FxBeam project I didn’t use a test-driven development approach. However, tests could easily be defined as follows:

import unittest

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
from apache_beam.testing.util import assert_that, equal_to


class FxBeamTestCase(unittest.TestCase):
    def setUp(self):
        pipeline_options = PipelineOptions(['--runner=DirectRunner'])
        pipeline_options.view_as(SetupOptions).save_main_session = True
        self.pipeline = beam.Pipeline(options=pipeline_options)


class TestFunctionX(FxBeamTestCase):
    def test_run(self):
        with self.pipeline:
            result = (
                self.pipeline
                | beam.Create([
                    # Create test data here
                ])
                | beam.ParDo(FunctionX(), additional_parameter=other)
            )
            assert_that(result, equal_to(what_is_expected))

Using this structure we can test anything in the pipeline, from specific DoFns to segments or even the whole flow. Therefore, we can pinpoint exactly where an issue lies if something comes up.
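As a concrete, hypothetical example, a test for the AddTimestamp DoFn shown earlier could look like this, building on the imports and FxBeamTestCase from the snippet above (the import path for AddTimestamp is assumed):

from fxbeam.dofns import AddTimestamp  # hypothetical module path


class AddTimestampTestCase(FxBeamTestCase):
    def test_run(self):
        element = {'instrument': 'EURUSD', 'ask': 1.1, 'timestamp': 1609459200}
        with self.pipeline:
            result = (
                self.pipeline
                | beam.Create([element])
                | beam.ParDo(AddTimestamp(), timestamp_key='timestamp')
            )
            # AddTimestamp attaches the Beam timestamp but leaves the payload untouched
            assert_that(result, equal_to([element]))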

Running the pipeline

The final quick note is related to running the pipeline.

I have created a simple script that accepts arguments and runs the pipeline. By default it uses the DirectRunner, which runs the pipeline on the current local machine. This is a less optimised way of running Apache Beam and is mostly used for testing. Running it on a Spark cluster, however, is not as simple as providing the endpoint of the Spark master node.

Since we’re using the Python version of Apache Beam, we need the Beam Spark job server. This job service creates the Spark job before submitting it to the cluster. More information on this is given here.

Script: run_fxbeam.py

import logging
import argparse

from fxbeam.fxbeam import FxBeam

if __name__ == '__main__':

    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input', '-i',
        dest='input',
        required=True,
        help='Google cloud storage or local files to process')

    parser.add_argument(
        '--output', '-o',
        dest='output',
        required=True,
        help='Output file name to write results to.')

    parser.add_argument(
        '--compression', '-c',
        dest='compression',
        required=False,
        default=None,
        help='Compression used for input files')

    parser.add_argument(
        '--file-type', '-ft',
        dest='file_type',
        required=False,
        default='json',
        help='The type of input files. One of json or csv. '
             'If json is provided, this needs to be new line delimited')

    parser.add_argument(
        '--instrument-column', '-ic',
        dest='instrument_column',
        required=False,
        default=None,
        help='Column containing the instrument symbol if present')

    args = parser.parse_args()
    fx_beam = FxBeam(
        args.input,
        args.output,
        window_size=300,
        pipeline_params=['--runner=DirectRunner'],
        # When running on an Apache Spark cluster:
        # pipeline_params=['--runner=PortableRunner', '--job_endpoint=localhost:8099'],
        compression=args.compression,
        instrument_column=args.instrument_column,
        input_file_type=args.file_type
    )
    fx_beam.build()
    fx_beam.run()

Conclusion

And that should be it! Once completed, you should have a fully running pipeline that ingests tick data and produces OHLCV values. Unfortunately, I haven’t set this up in streaming mode yet; I need to look into that and see what changes are needed. I might go back to using the actual window functions after all! Once that is done I’ll make sure to write another article that highlights the changes.

Code