Skip to content

ranjanrak/clickhouse-tickstore

Repository files navigation

clickhouse-tickstore

Run Tests Go Reference

Go package to store real-time streaming WebSocket data in ClickHouse, using queuing and bulk inserts built on goroutines and channels.

Installation

go get -u github.com/ranjanrak/clickhouse-tickstore

Usage

package main

import (
	"fmt"
	"log"
	"time"

	tickstore "github.com/ranjanrak/clickhouse-tickstore"
)
func main() {
    // Create new ticker instance
    tickClient := tickstore.New(tickstore.ClientParam{
        // Send DSN as per your clickhouse DB setup.
        // visit https://github.com/ClickHouse/clickhouse-go#dsn to know more
        DBSource:    "",
        ApiKey:      "your_api_key",
        AccessToken: "your_access_token",
        TokenList:   []uint32{633601, 895745, 1723649, 3050241, 975873, 969473, 3721473, 738561, 969473},
        DumpSize:    5000,
	})
    // Start the ticker instance.
    // This call blocks: code placed after it will not run.
    tickClient.StartTicker()

    // Fetch minute candle OHLC data
    timeStart := time.Date(2022, 5, 11, 9, 51, 0, 0, time.Local)
    timeEnd := time.Date(2022, 5, 11, 10, 02, 0, 0, time.Local)
    candles, err := tickClient.FetchCandle(633601, timeStart, timeEnd)
    if err != nil {
        log.Fatalf("Error fetching candle data: %v", err)
    }
    fmt.Printf("%+v\n", candles)
}

Response

FetchCandle(633601, timeStart, timeEnd)

[{InstrumentToken:633601 TimeStamp:2022-05-11 09:51:00 +0530 IST Open:156.65 High:156.75 Low:156.45 Close:156.65}
{InstrumentToken:633601 TimeStamp:2022-05-11 09:52:00 +0530 IST Open:156.75 High:156.95 Low:156.7 Close:156.75}
{InstrumentToken:633601 TimeStamp:2022-05-11 09:53:00 +0530 IST Open:156.75 High:156.75 Low:156.2 Close:156.3}
{InstrumentToken:633601 TimeStamp:2022-05-11 09:54:00 +0530 IST Open:156.3 High:156.3 Low:156 Close:156.1}
......]

Example

SELECT *
FROM tickdata
FINAL
WHERE (instrument_token = 633601) AND
(timestamp >= toDateTime('2022-04-22 13:23:00', 'Asia/Calcutta')) AND
(timestamp <= toDateTime('2022-04-22 13:25:00', 'Asia/Calcutta'))
ORDER BY timestamp ASC
Query id: 8e356516-107c-4012-948b-df90e49e9906

┌─instrument_token─┬───────────timestamp─┬──price─┐
│           633601 │ 2022-04-22 13:23:00 │ 174.65 │
│           633601 │ 2022-04-22 13:23:01 │  174.7 │
│           633601 │ 2022-04-22 13:23:02 │ 174.65 │
│           633601 │ 2022-04-22 13:23:04 │  174.7 │
│           633601 │ 2022-04-22 13:23:05 │ 174.65 │
│           633601 │ 2022-04-22 13:23:06 │  174.7 │
│           633601 │ 2022-04-22 13:23:08 │  174.7 │
│           633601 │ 2022-04-22 13:23:09 │  174.7 │
│           633601 │ 2022-04-22 13:23:10 │  174.7 │
│           633601 │ 2022-04-22 13:23:13 │  174.7 │
│           633601 │ 2022-04-22 13:23:14 │ 174.65 │
│           633601 │ 2022-04-22 13:23:15 │ 174.65 │
│           633601 │ 2022-04-22 13:23:16 │  174.7 │
│           633601 │ 2022-04-22 13:23:17 │ 174.65 │
│           633601 │ 2022-04-22 13:23:19 │  174.7 │
│           633601 │ 2022-04-22 13:23:21 │ 174.65 │
│           633601 │ 2022-04-22 13:23:24 │ 174.65 │
│           633601 │ 2022-04-22 13:23:25 │  174.7 │
│           633601 │ 2022-04-22 13:23:26 │  174.7 │
│           633601 │ 2022-04-22 13:23:27 │ 174.65 │
│           633601 │ 2022-04-22 13:23:28 │ 174.65 │
│           633601 │ 2022-04-22 13:23:29 │  174.7 │
│           633601 │ 2022-04-22 13:23:31 │  174.7 │
│           633601 │ 2022-04-22 13:23:32 │  174.7 │
│           633601 │ 2022-04-22 13:23:33 │  174.7 │
│           633601 │ 2022-04-22 13:23:35 │  174.7 │
│           633601 │ 2022-04-22 13:23:36 │  174.7 │
|           ...... | ..................  | ...... |

84 rows in set. Elapsed: 0.006 sec. Processed 8.19 thousand rows, 98.30 KB (1.28 million rows/s., 15.37 MB/s.)

Fetch OHLC candle data for any given interval using a CTE and aggregate functions

WITH price_select AS (SELECT price
FROM tickdata
FINAL
WHERE (instrument_token = 975873) AND
(timestamp >= toDateTime('2022-05-02 14:47:00')) AND
(timestamp <= toDateTime('2022-05-02 14:47:59'))
ORDER BY timestamp ASC)
SELECT groupArray(price)[1] AS open,
max(price) AS high,
min(price) AS low,
groupArray(price)[-1] AS close FROM price_select;
Query id: 98d92c26-e054-4f0a-8448-064bc0d939a0
┌───open─┬───high─┬───low─┬─close─┐
│ 252.25 │ 252.35 │ 252.1 │ 252.2 │
└────────┴────────┴───────┴───────┘

Create base minute candle OHLC

SELECT
    instrument_token,
    time_minute,
    groupArray(price)[1] AS open,
    max(price) AS high,
    min(price) AS low,
    groupArray(price)[-1] AS close
FROM
(
    SELECT
        instrument_token,
        toStartOfMinute(timestamp) AS time_minute,
        price
    FROM tickdata
    WHERE (instrument_token = 975873) AND
    (timestamp >= toDateTime('2022-05-02 14:47:00')) AND
    (timestamp <= toDateTime('2022-05-02 14:59:59'))
)
GROUP BY (instrument_token, time_minute)
ORDER BY time_minute ASC
Query id: 2ba74fd2-6047-42c9-9436-be8987a5d3a9

┌─instrument_token─┬─────────time_minute─┬───open─┬───high─┬────low─┬──close─┐
│           975873 │ 2022-05-02 14:47:00 │ 252.25 │ 252.35 │  252.1 │  252.2 │
│           975873 │ 2022-05-02 14:48:00 │  252.2 │  252.3 │  251.9 │ 252.25 │
│           975873 │ 2022-05-02 14:49:00 │  252.3 │  252.3 │ 252.05 │  252.1 │
│           975873 │ 2022-05-02 14:50:00 │  252.1 │ 252.45 │ 252.05 │ 252.35 │
│           975873 │ 2022-05-02 14:51:00 │  252.2 │ 252.45 │  252.2 │ 252.35 │
│           975873 │ 2022-05-02 14:52:00 │ 252.35 │ 252.35 │    252 │    252 │
│           975873 │ 2022-05-02 14:53:00 │    252 │ 253.15 │    252 │  252.8 │
│           975873 │ 2022-05-02 14:54:00 │  252.8 │  253.2 │  252.7 │  252.8 │
│           975873 │ 2022-05-02 14:55:00 │  252.8 │  253.4 │ 252.75 │  253.3 │
│           975873 │ 2022-05-02 14:56:00 │ 253.25 │  253.4 │    253 │  253.1 │
│           975873 │ 2022-05-02 14:57:00 │  253.1 │  253.1 │ 252.85 │ 252.85 │
│           975873 │ 2022-05-02 14:58:00 │  252.8 │ 253.05 │  252.5 │  252.6 │
│           975873 │ 2022-05-02 14:59:00 │  252.6 │  253.4 │  252.6 │  253.4 │
└──────────────────┴─────────────────────┴────────┴────────┴────────┴────────┘

Create the candle_data materialized view to store minute OHLC candles

CREATE MATERIALIZED VIEW candle_data
ENGINE = ReplacingMergeTree
ORDER BY (instrument_token, time_minute)
PRIMARY KEY (instrument_token, time_minute) POPULATE AS
SELECT
    instrument_token,
    time_minute,
    groupArray(price)[1] AS open,
    max(price) AS high,
    min(price) AS low,
    groupArray(price)[-1] AS close
FROM
(
    SELECT
        instrument_token,
        toStartOfMinute(timestamp) AS time_minute,
        price
    FROM tickdata
)
GROUP BY (instrument_token, time_minute)

Create n-minute candle OHLC

WITH w AS (
    SELECT open, high, low, close,
    time, intDiv(toUnixTimestamp(toDateTime('2022-05-10 14:59:00')) - toUnixTimestamp(time), 60*n(in minutes of interval)) as grp
    FROM tickdata
    WHERE (instrument_token = 975873) AND
    (timestamp >= toDateTime('2022-05-02 14:47:00')) AND
    (timestamp <= toDateTime('2022-05-10 14:59:00')) order by time asc
)
SELECT
    first_value(time) as time, first_value(open) as open,
    max(high) as high, min(low) as low,
    last_value(close) as close
FROM
    w
GROUP BY grp ORDER BY time asc;

About

Go package to store real-time streaming WebSocket data in ClickHouse using queuing and bulk inserts.

Topics

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages