From f77fa2abbad7a130881f15a50f74b0fb69756d30 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 28 May 2016 10:39:10 +0100 Subject: [PATCH 01/12] Use same time used by duka downloader --- duka/core/csv_dumper.py | 4 +++- duka/core/fetch.py | 27 ++++++++++++++++++++------- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/duka/core/csv_dumper.py b/duka/core/csv_dumper.py index a6f8087..597fa22 100644 --- a/duka/core/csv_dumper.py +++ b/duka/core/csv_dumper.py @@ -42,6 +42,7 @@ def __init__(self, symbol, timeframe, start, end, folder): self.start = start self.end = end self.folder = folder + self.include_header = False self.buffer = {} def get_header(self): @@ -77,7 +78,8 @@ def dump(self): with open(join(self.folder, file_name), 'w') as csv_file: writer = csv.DictWriter(csv_file, fieldnames=self.get_header()) - writer.writeheader() + if self.include_header: + writer.writeheader() for day in sorted(self.buffer.keys()): for value in self.buffer[day]: if self.timeframe == TimeFrame.TICK: diff --git a/duka/core/fetch.py b/duka/core/fetch.py index 63a4357..4473acf 100644 --- a/duka/core/fetch.py +++ b/duka/core/fetch.py @@ -1,6 +1,8 @@ import asyncio -import threading + import time +import datetime +import threading from functools import reduce from io import BytesIO, DEFAULT_BUFFER_SIZE @@ -34,20 +36,31 @@ async def get(url): raise Exception("Request failed for {0} after ATTEMPTS attempts".format(url)) -def fetch_day(symbol, day): - local_data = threading.local() - loop = getattr(local_data, 'loop', asyncio.new_event_loop()) - asyncio.set_event_loop(loop) - +def create_tasks(symbol, day ): url_info = { 'currency': symbol, 'year': day.year, 'month': day.month - 1, 'day': day.day } + tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(1, 24)] + next_day = day + datetime.timedelta(days=1) + url_info = { + 'currency': symbol, + 'year': next_day.year, + 'month': next_day.month - 1, + 'day': next_day.day + } + tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0)))) + return tasks + +def fetch_day(symbol, day): + local_data = threading.local() + loop = getattr(local_data, 'loop', asyncio.new_event_loop()) + asyncio.set_event_loop(loop) loop = asyncio.get_event_loop() - tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(24)] + tasks = create_tasks(symbol, day) loop.run_until_complete(asyncio.wait(tasks)) def add(acc, task): From be1c7f769ad2d570655251b309841ed6209a4be8 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 28 May 2016 10:43:11 +0100 Subject: [PATCH 02/12] Format float numbers to 5 decimal place --- duka/core/csv_dumper.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/duka/core/csv_dumper.py b/duka/core/csv_dumper.py index 597fa22..f2a1706 100644 --- a/duka/core/csv_dumper.py +++ b/duka/core/csv_dumper.py @@ -1,14 +1,17 @@ import csv import time +from os.path import join from .candle import Candle from .utils import TimeFrame, stringify, Logger -from os.path import join - TEMPLATE_FILE_NAME = "{}-{}_{:02d}_{:02d}-{}_{:02d}_{:02d}.csv" +def format_float(number): + return format(number, '.5f') + + class CSVFormatter(object): COLUMN_TIME = 0 COLUMN_ASK = 1 @@ -20,8 +23,8 @@ class CSVFormatter(object): def write_tick(writer, tick): writer.writerow( {'time': tick[0], - 'ask': tick[1], - 'bid': tick[2], + 'ask': format_float(tick[1]), + 'bid': format_float(tick[2]), 'ask_volume': tick[3], 'bid_volume': tick[4]}) @@ -29,10 +32,10 @@ def write_tick(writer, tick): def write_candle(writer, candle): writer.writerow( {'time': stringify(candle.timestamp), - 'open': candle.open_price, - 'close': candle.close_price, - 'high': candle.high, - 'low': candle.low}) + 'open': format_float(candle.open_price), + 'close': format_float(candle.close_price), + 'high': format_float(candle.high), + 'low': format_float(candle.low)}) class CSVDumper: From 0511919fe4f9e46a360f510189b15f058b026f14 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 28 May 2016 15:48:38 +0100 Subject: [PATCH 03/12] fixes --- duka/core/csv_dumper.py | 2 +- duka/core/fetch.py | 26 +++++++++++++------------- duka/core/processor.py | 2 +- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/duka/core/csv_dumper.py b/duka/core/csv_dumper.py index f2a1706..8a12995 100644 --- a/duka/core/csv_dumper.py +++ b/duka/core/csv_dumper.py @@ -45,7 +45,7 @@ def __init__(self, symbol, timeframe, start, end, folder): self.start = start self.end = end self.folder = folder - self.include_header = False + self.include_header = True self.buffer = {} def get_header(self): diff --git a/duka/core/fetch.py b/duka/core/fetch.py index 4473acf..e6e9b66 100644 --- a/duka/core/fetch.py +++ b/duka/core/fetch.py @@ -1,8 +1,7 @@ import asyncio - -import time import datetime import threading +import time from functools import reduce from io import BytesIO, DEFAULT_BUFFER_SIZE @@ -13,6 +12,7 @@ URL = "https://www.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5" ATTEMPTS = 5 + async def get(url): loop = asyncio.get_event_loop() buffer = BytesIO() @@ -31,27 +31,27 @@ async def get(url): Logger.warn("Request to {0} failed with error code : {1} ".format(url, str(res.status_code))) except Exception as e: Logger.warn("Request {0} failed with exception : {1}".format(id, str(e))) - time.sleep(0.5*i) + time.sleep(0.5 * i) raise Exception("Request failed for {0} after ATTEMPTS attempts".format(url)) -def create_tasks(symbol, day ): +def create_tasks(symbol, day): url_info = { 'currency': symbol, 'year': day.year, 'month': day.month - 1, 'day': day.day } - tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(1, 24)] - next_day = day + datetime.timedelta(days=1) - url_info = { - 'currency': symbol, - 'year': next_day.year, - 'month': next_day.month - 1, - 'day': next_day.day - } - tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0)))) + tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(0, 24)] + # next_day = day + datetime.timedelta(days=1) + # url_info = { + # 'currency': symbol, + # 'year': next_day.year, + # 'month': next_day.month - 1, + # 'day': next_day.day + # } + # tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0)))) return tasks diff --git a/duka/core/processor.py b/duka/core/processor.py index 0c55f84..06404aa 100644 --- a/duka/core/processor.py +++ b/duka/core/processor.py @@ -40,7 +40,7 @@ def add_hour(ticks): hour_delta = 0 if ticks[0][0].weekday() == 6: - hour_delta = 22 + hour_delta = 21 for index, v in enumerate(ticks): if index != 0: From c87b84066ebe88b6fcee14af982d781b11545e26 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 28 May 2016 17:08:00 +0100 Subject: [PATCH 04/12] Add interpolation for missing data points --- duka/core/csv_dumper.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/duka/core/csv_dumper.py b/duka/core/csv_dumper.py index 8a12995..86569c2 100644 --- a/duka/core/csv_dumper.py +++ b/duka/core/csv_dumper.py @@ -64,7 +64,9 @@ def append(self, day, ticks): ts = time.mktime(tick[0].timetuple()) key = int(ts - (ts % self.timeframe)) if previous_key != key and previous_key is not None: - self.buffer[day].append(Candle(self.symbol, previous_key, self.timeframe, current_ticks)) + n = int((key - previous_key) / self.timeframe) + for i in range(0, n): + self.buffer[day].append(Candle(self.symbol, previous_key, self.timeframe, current_ticks)) current_ticks = [] current_ticks.append(tick[1]) previous_key = key From 819ffe0a0802b6858b0193be2720706dd6bed49d Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 28 May 2016 20:03:12 +0100 Subject: [PATCH 05/12] Fixed missing data --- duka/core/csv_dumper.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/duka/core/csv_dumper.py b/duka/core/csv_dumper.py index 86569c2..d7f5391 100644 --- a/duka/core/csv_dumper.py +++ b/duka/core/csv_dumper.py @@ -66,7 +66,8 @@ def append(self, day, ticks): if previous_key != key and previous_key is not None: n = int((key - previous_key) / self.timeframe) for i in range(0, n): - self.buffer[day].append(Candle(self.symbol, previous_key, self.timeframe, current_ticks)) + self.buffer[day].append( + Candle(self.symbol, previous_key + i * self.timeframe, self.timeframe, current_ticks)) current_ticks = [] current_ticks.append(tick[1]) previous_key = key From 8fc50892008ecc8f2378536971db75877d34333e Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Tue, 31 May 2016 22:15:49 +0100 Subject: [PATCH 06/12] Add start dst and end dst functions --- duka/core/fetch.py | 2 ++ duka/core/processor.py | 5 +++-- duka/core/utils.py | 31 +++++++++++++++++++++++++- duka/tests/test_find_sunday.py | 40 ++++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 duka/tests/test_find_sunday.py diff --git a/duka/core/fetch.py b/duka/core/fetch.py index e6e9b66..34b1535 100644 --- a/duka/core/fetch.py +++ b/duka/core/fetch.py @@ -26,6 +26,8 @@ async def get(url): for chunk in res.iter_content(DEFAULT_BUFFER_SIZE): buffer.write(chunk) Logger.info("Fetched {0} completed in {1}s".format(id, time.time() - start)) + if len(buffer.getbuffer()) <= 0: + Logger.info("Buffer for {0} is empty ".format(id)) return buffer.getbuffer() else: Logger.warn("Request to {0} failed with error code : {1} ".format(url, str(res.status_code))) diff --git a/duka/core/processor.py b/duka/core/processor.py index 06404aa..237ebda 100644 --- a/duka/core/processor.py +++ b/duka/core/processor.py @@ -39,8 +39,8 @@ def add_hour(ticks): hour_delta = 0 - if ticks[0][0].weekday() == 6: - hour_delta = 21 + if ticks[0][0].weekday() == 6 or (ticks[0][0].day == 1 and ticks[0][0].month == 1): + hour_delta = 22 for index, v in enumerate(ticks): if index != 0: @@ -56,6 +56,7 @@ def add_hour(ticks): def normalize(day, ticks): def norm(time, ask, bid, volume_ask, volume_bid): date = datetime(day.year, day.month, day.day) + timedelta(milliseconds=time) + # date.replace(tzinfo=datetime.tzinfo("UTC")) return date, ask / 100000, bid / 100000, round(volume_ask * 1000000), round(volume_bid * 1000000) return add_hour(list(map(lambda x: norm(*x), ticks))) diff --git a/duka/core/utils.py b/duka/core/utils.py index 9cadc63..d7cb116 100644 --- a/duka/core/utils.py +++ b/duka/core/utils.py @@ -4,10 +4,12 @@ import signal import sys import time -from datetime import datetime +from datetime import datetime, timedelta TEMPLATE = '%(asctime)s - %(levelname)s - %(threadName)s [%(thread)d] - %(message)s' +SUNDAY = 7 + class TimeFrame(object): TICK = 0 @@ -85,3 +87,30 @@ def from_time_string(time_str): def stringify(timestamp): return str(datetime.fromtimestamp(timestamp)) + + +def find_sunday(year, month, position): + start = datetime(year, month, 1) + day_delta = timedelta(days=1) + counter = 0 + + while True: + if start.isoweekday() == SUNDAY: + counter += 1 + if counter == position: + return start + start += day_delta + + +def find_dst_begin(year): + """ + DST starts the second sunday of March + """ + return find_sunday(year, 3, 2) + + +def find_dst_end(year): + """ + DST ends the first sunday of November + """ + return find_sunday(year, 11, 1) diff --git a/duka/tests/test_find_sunday.py b/duka/tests/test_find_sunday.py new file mode 100644 index 0000000..733189f --- /dev/null +++ b/duka/tests/test_find_sunday.py @@ -0,0 +1,40 @@ +import unittest +from duka.core.utils import find_sunday, find_dst_begin, find_dst_end + + +class TestFindSunday(unittest.TestCase): + + def test_find_8_march_2015(self): + res = find_sunday(2015, 3, 2) + self.assertEqual(res.day, 8) + + def test_find_9_march_2014(self): + res = find_sunday(2014, 3, 2) + self.assertEqual(res.day, 9) + + def test_find_13_march_2016(self): + res = find_sunday(2016, 3, 2) + self.assertEqual(res.day, 13) + + def test_find_1_november_2015(self): + res = find_sunday(2015, 11, 1) + self.assertEqual(res.day, 1) + + def test_find_2_november_2014(self): + res = find_sunday(2014, 11, 1) + self.assertEqual(res.day, 2) + + def test_find_6_november_2016(self): + res = find_sunday(2016, 11, 1) + self.assertEqual(res.day, 6) + + def test_dst_2015(self): + start = find_dst_begin(2015) + end = find_dst_end(2015) + + self.assertEqual(start.day, 8) + self.assertEqual(start.month, 3) + + self.assertEqual(end.day, 1) + self.assertEqual(end.month, 11) + From f868699fd0754995d0bc6e26820c36eddcfc3eb6 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Tue, 31 May 2016 22:25:35 +0100 Subject: [PATCH 07/12] Add tests --- duka/core/utils.py | 4 ++++ duka/tests/test_find_sunday.py | 22 ++++++++++++++++++---- 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/duka/core/utils.py b/duka/core/utils.py index d7cb116..6b8ed25 100644 --- a/duka/core/utils.py +++ b/duka/core/utils.py @@ -114,3 +114,7 @@ def find_dst_end(year): DST ends the first sunday of November """ return find_sunday(year, 11, 1) + + +def is_dst(day): + return day >= find_dst_begin(day.year) and day < find_dst_end(day.year) diff --git a/duka/tests/test_find_sunday.py b/duka/tests/test_find_sunday.py index 733189f..7b2e81c 100644 --- a/duka/tests/test_find_sunday.py +++ b/duka/tests/test_find_sunday.py @@ -1,9 +1,10 @@ +import datetime import unittest -from duka.core.utils import find_sunday, find_dst_begin, find_dst_end +from duka.core.utils import find_sunday, find_dst_begin, find_dst_end, is_dst -class TestFindSunday(unittest.TestCase): +class TestFindSunday(unittest.TestCase): def test_find_8_march_2015(self): res = find_sunday(2015, 3, 2) self.assertEqual(res.day, 8) @@ -31,10 +32,23 @@ def test_find_6_november_2016(self): def test_dst_2015(self): start = find_dst_begin(2015) end = find_dst_end(2015) - self.assertEqual(start.day, 8) self.assertEqual(start.month, 3) - self.assertEqual(end.day, 1) self.assertEqual(end.month, 11) + def test_is_dst(self): + day = datetime.datetime(2015, 4, 5) + self.assertTrue(is_dst(day)) + + def test_is_not_dst(self): + day = datetime.datetime(2015, 1, 1) + self.assertFalse(is_dst(day)) + + def test_day_change_is_dst(self): + day = datetime.datetime(2015, 3, 8) + self.assertTrue(is_dst(day)) + + def test_day_change_back_is_not_dst(self): + day = datetime.datetime(2015, 11, 1) + self.assertFalse(is_dst(day)) From ea85c970574f2d45f67ee71df634be425096f660 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Tue, 31 May 2016 23:37:22 +0100 Subject: [PATCH 08/12] add correct time --- duka/core/fetch.py | 26 +++++++++++++++++--------- duka/core/processor.py | 6 +++++- duka/core/utils.py | 4 ++-- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/duka/core/fetch.py b/duka/core/fetch.py index 34b1535..a10dbe3 100644 --- a/duka/core/fetch.py +++ b/duka/core/fetch.py @@ -7,7 +7,7 @@ import requests -from ..core.utils import Logger +from ..core.utils import Logger, is_dst URL = "https://www.dukascopy.com/datafeed/{currency}/{year}/{month:02d}/{day:02d}/{hour:02d}h_ticks.bi5" ATTEMPTS = 5 @@ -39,6 +39,12 @@ async def get(url): def create_tasks(symbol, day): + + start = 0 + + if is_dst(day): + start = 1 + url_info = { 'currency': symbol, 'year': day.year, @@ -46,14 +52,16 @@ def create_tasks(symbol, day): 'day': day.day } tasks = [asyncio.ensure_future(get(URL.format(**url_info, hour=i))) for i in range(0, 24)] - # next_day = day + datetime.timedelta(days=1) - # url_info = { - # 'currency': symbol, - # 'year': next_day.year, - # 'month': next_day.month - 1, - # 'day': next_day.day - # } - # tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0)))) + + # if is_dst(day): + # next_day = day + datetime.timedelta(days=1) + # url_info = { + # 'currency': symbol, + # 'year': next_day.year, + # 'month': next_day.month - 1, + # 'day': next_day.day + # } + # tasks.append(asyncio.ensure_future(get(URL.format(**url_info, hour=0)))) return tasks diff --git a/duka/core/processor.py b/duka/core/processor.py index 237ebda..cb8c4e9 100644 --- a/duka/core/processor.py +++ b/duka/core/processor.py @@ -1,6 +1,7 @@ import struct from datetime import timedelta, datetime from lzma import LZMADecompressor, LZMAError, FORMAT_AUTO +from .utils import is_dst def decompress_lzma(data): @@ -40,7 +41,10 @@ def add_hour(ticks): hour_delta = 0 if ticks[0][0].weekday() == 6 or (ticks[0][0].day == 1 and ticks[0][0].month == 1): - hour_delta = 22 + if is_dst(ticks[0][0].date()): + hour_delta = 21 + else: + hour_delta = 22 for index, v in enumerate(ticks): if index != 0: diff --git a/duka/core/utils.py b/duka/core/utils.py index 6b8ed25..17cc05e 100644 --- a/duka/core/utils.py +++ b/duka/core/utils.py @@ -4,7 +4,7 @@ import signal import sys import time -from datetime import datetime, timedelta +from datetime import datetime, timedelta, date TEMPLATE = '%(asctime)s - %(levelname)s - %(threadName)s [%(thread)d] - %(message)s' @@ -90,7 +90,7 @@ def stringify(timestamp): def find_sunday(year, month, position): - start = datetime(year, month, 1) + start = date(year, month, 1) day_delta = timedelta(days=1) counter = 0 From 677af738154c94bd7f9353a665f4a3d9319185eb Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 4 Jun 2016 12:33:19 +0100 Subject: [PATCH 09/12] add test --- duka/tests/test_find_sunday.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/duka/tests/test_find_sunday.py b/duka/tests/test_find_sunday.py index 7b2e81c..c2f67b0 100644 --- a/duka/tests/test_find_sunday.py +++ b/duka/tests/test_find_sunday.py @@ -52,3 +52,7 @@ def test_day_change_is_dst(self): def test_day_change_back_is_not_dst(self): day = datetime.datetime(2015, 11, 1) self.assertFalse(is_dst(day)) + + def test_is_dst(self): + day = datetime.datetime(2013, 11, 3) + self.assertFalse(is_dst(day)) From 65aa541e53118555aec83a6d906c3eb007a2a37f Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 4 Jun 2016 12:36:50 +0100 Subject: [PATCH 10/12] bump version --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 93796f6..430c1aa 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup, find_packages NAME = "duka" -VERSION = '0.1.6' +VERSION = '0.2.0' setup( name=NAME, From 0ae039e21a04f986e9843094a19f9e997dee5969 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 4 Jun 2016 12:42:31 +0100 Subject: [PATCH 11/12] Add version --- duka/main.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/duka/main.py b/duka/main.py index 15ba2ca..1802f8a 100644 --- a/duka/main.py +++ b/duka/main.py @@ -7,9 +7,14 @@ from duka.core import valid_date, set_up_signals from duka.core.utils import valid_timeframe, TimeFrame +NAME = "duka" +VERSION = '0.2.0' + def main(): parser = argparse.ArgumentParser(prog='duka', usage='%(prog)s [options]') + parser.add_argument('-v', '--version', action='version', + version='Version: {name}-{version}'.format(name=NAME, version=VERSION)) parser.add_argument('symbols', metavar='SYMBOLS', type=str, nargs='+', help='symbol list using format EURUSD EURGBP') parser.add_argument('-d', '--day', type=valid_date, help='specific day format YYYY-MM-DD (default today)', From 1e2b47a8f00ef0380150db6b9939b0ad0320a6a3 Mon Sep 17 00:00:00 2001 From: Giuseppe Date: Sat, 4 Jun 2016 12:43:36 +0100 Subject: [PATCH 12/12] Add version improvement --- duka/main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/duka/main.py b/duka/main.py index 1802f8a..38c7b33 100644 --- a/duka/main.py +++ b/duka/main.py @@ -7,14 +7,13 @@ from duka.core import valid_date, set_up_signals from duka.core.utils import valid_timeframe, TimeFrame -NAME = "duka" VERSION = '0.2.0' def main(): parser = argparse.ArgumentParser(prog='duka', usage='%(prog)s [options]') parser.add_argument('-v', '--version', action='version', - version='Version: {name}-{version}'.format(name=NAME, version=VERSION)) + version='Version: %(prog)s-{version}'.format(version=VERSION)) parser.add_argument('symbols', metavar='SYMBOLS', type=str, nargs='+', help='symbol list using format EURUSD EURGBP') parser.add_argument('-d', '--day', type=valid_date, help='specific day format YYYY-MM-DD (default today)',