Convert Epoch to Timestamps in Apache Flink with ``TO_TIMESTAMP_LTZ``
Sometimes, when defining an Apache Flink® table using SQL, we need to map an epoch timestamp and use it as the record/message timestamp. This blog contains a few tricks to get it right.
Doing it properly is not complex, but it needs some attention, since small changes can have a huge impact (ask me how I know… I lost a good hour on it today).
The TO_TIMESTAMP_LTZ function
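When dealing with epoch timestamps, Flink offers the TO_TIMESTAMP_LTZ function, which is well documented. It converts an epoch timestamp, expressed in either seconds or milliseconds, into a TIMESTAMP_LTZ (timestamp with local time zone) value. The second argument, the precision, tells Flink which unit the first argument uses.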
- For epoch timestamps in seconds, use:
TO_TIMESTAMP_LTZ(timestamp_column, 0)
- For epoch timestamps in milliseconds, use:
TO_TIMESTAMP_LTZ(timestamp_column, 3)
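To see what the precision argument changes, here's a small Java sketch that mimics the two cases with `java.time.Instant`. It is not Flink's code: the class `EpochPrecision` and its `toInstant` helper are hypothetical, and the sketch only accepts the two precisions listed above. The same moment needs a different number depending on the unit, and reading a millisecond value as seconds gives an instant tens of thousands of years away.

```java
import java.time.Instant;

// Hypothetical helper that mimics how TO_TIMESTAMP_LTZ(value, precision)
// interprets its first argument. It is an illustration, not Flink's code.
final class EpochPrecision {

    static Instant toInstant(long epoch, int precision) {
        switch (precision) {
            case 0:
                return Instant.ofEpochSecond(epoch); // value counts seconds
            case 3:
                return Instant.ofEpochMilli(epoch);  // value counts milliseconds
            default:
                // This sketch only covers the two precisions discussed above.
                throw new IllegalArgumentException("unsupported precision: " + precision);
        }
    }

    public static void main(String[] args) {
        // The same moment, written once in seconds and once in milliseconds.
        System.out.println(toInstant(1661760207L, 0));    // 2022-08-29T08:03:27Z
        System.out.println(toInstant(1661760207278L, 3)); // 2022-08-29T08:03:27.278Z

        // Mismatch: a millisecond value read as seconds lands
        // tens of thousands of years in the future.
        System.out.println(toInstant(1661760207278L, 0));
    }
}
```

In other words, the precision has to match the unit stored in the column; getting it wrong doesn't necessarily fail loudly, it just produces the wrong time.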
What about the timestamp_column definition?
OK, OK, that works! But what about the definition of timestamp_column itself? You need to pay attention to it as well!
If the epoch timestamp is in seconds, you can get away with declaring the column as INT.
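One caveat worth knowing: INT is a 32-bit signed integer, so as epoch seconds it tops out at 2,147,483,647. The quick Java check below (the class and method names are made up for illustration) shows which moment that is.

```java
import java.time.Instant;

// Illustrative check (hypothetical class name): the largest value a
// 32-bit signed INT can hold, interpreted as epoch seconds.
final class IntEpochLimit {

    static Instant lastIntSecond() {
        return Instant.ofEpochSecond(Integer.MAX_VALUE); // 2,147,483,647 seconds
    }

    public static void main(String[] args) {
        System.out.println(lastIntSecond()); // 2038-01-19T03:14:07Z
        // 1661760207 (late August 2022) is still far below that limit.
        System.out.println(1661760207L <= Integer.MAX_VALUE); // true
    }
}
```

So INT works for second-based epochs today, while BIGINT also avoids the January 2038 limit.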
But if you do the same with epoch timestamps in milliseconds, you'll face an error like:
Caused by: java.lang.NumberFormatException: For input string: "1661760207278"
This is because the number falls outside the INT range (-2,147,483,648 to 2,147,483,647). Therefore, it is better to declare the column as BIGINT, like the following:
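CREATE TABLE my_events (                          -- hypothetical table name
    ts BIGINT,                                    -- epoch in milliseconds, too large for INT
    name STRING,
    ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),            -- computed column, precision 3 = milliseconds
    WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '10' SECOND
) WITH (
    -- connector and format options go here (not part of this example)
);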
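The INT failure is easy to reproduce outside Flink. The Java sketch below assumes the field reaches Flink as text and gets parsed much like `Integer.parseInt` (the exact code path depends on the connector and format, so treat this as an illustration); the class `EpochParsing` and its methods are hypothetical. Parsing the value from the error message as a 32-bit `int` throws the same `NumberFormatException`, while parsing it as a 64-bit `long`, the Java counterpart of BIGINT, succeeds.

```java
// Hypothetical helpers reproducing the parse failure with plain Java:
// int is 32-bit like SQL INT, long is 64-bit like SQL BIGINT.
final class EpochParsing {

    // Returns null if the text fits in an int, otherwise the exception message.
    static String intParseError(String value) {
        try {
            Integer.parseInt(value);
            return null;
        } catch (NumberFormatException e) {
            // Out-of-range numbers are reported as NumberFormatException.
            return e.getMessage();
        }
    }

    // 64-bit parse: epoch milliseconds fit easily.
    static long parseAsBigint(String value) {
        return Long.parseLong(value);
    }

    public static void main(String[] args) {
        String epochMillis = "1661760207278"; // value from the error above
        System.out.println(intParseError(epochMillis)); // For input string: "1661760207278"
        System.out.println(parseAsBigint(epochMillis)); // 1661760207278
    }
}
```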