Skip to content

Commit

Permalink
Улучшил стабильность работы в момент подключения-отключения Quik от с…
Browse files Browse the repository at this point in the history
…ервера
  • Loading branch information
boscogh committed Dec 21, 2014
1 parent aa5cc85 commit d454835
Show file tree
Hide file tree
Showing 3 changed files with 154 additions and 55 deletions.
16 changes: 16 additions & 0 deletions PipesAPI.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
package hum.bosco.trade.quik.adapter;

import java.nio.ByteBuffer;

import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.ptr.IntByReference;

/*
Copyright (c) Pavel M Bosco, 2014
*/
public interface PipesAPI extends Kernel32{
boolean PeekNamedPipe(
HANDLE hNamedPipe,
ByteBuffer lpBuffer,
int nBufferSize,
IntByReference lpBytesRead,
DWORDByReference lpTotalBytesAvail,
DWORDByReference lpBytesLeftThisMessage
);

DWORD PIPE_READMODE_MESSAGE = new DWORD(2);
DWORD PIPE_READMODE_BYTE = new DWORD(0);
boolean SetNamedPipeHandleState(
HANDLE hNamedPipe,
DWORDByReference lpMode,
Expand Down
95 changes: 64 additions & 31 deletions QuikCommandPipeAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.text.NumberFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.sun.jna.Native;
import com.sun.jna.platform.win32.Kernel32;
Expand All @@ -14,14 +14,16 @@
import com.sun.jna.ptr.IntByReference;
import com.sun.jna.win32.W32APIOptions;

/*
Copyright (c) Pavel M Bosco, 2014
*/
public class QuikCommandPipeAdapter implements Closeable {
private HANDLE pipeHandle = WinNT.INVALID_HANDLE_VALUE;
PipesAPI kernel32;

public QuikCommandPipeAdapter(){
// проблем с загрузкой ядра вообще-то быть не должно.
static PipesAPI kernel32 = init();
static PipesAPI init(){
// проблем с загрузкой ядра вообще-то быть недолжно.
System.setProperty("jna.encoding", "Cp1251");
kernel32 = (PipesAPI) Native.loadLibrary("kernel32", PipesAPI.class, W32APIOptions.UNICODE_OPTIONS);
return (PipesAPI) Native.loadLibrary("kernel32", PipesAPI.class, W32APIOptions.UNICODE_OPTIONS);
}

private void forceDisconnect(){
Expand All @@ -31,11 +33,12 @@ private void forceDisconnect(){
//System.out.println("Закрыли трубу!");
}
}

private boolean ensureConnected(){
if (pipeHandle != WinNT.INVALID_HANDLE_VALUE)
return true;

int retryCount = 3;
int retryCount = 5;
while (pipeHandle == WinNT.INVALID_HANDLE_VALUE){
pipeHandle = kernel32.CreateFile(
"\\\\.\\pipe\\pmb.quik.pipe",
Expand All @@ -46,20 +49,22 @@ private boolean ensureConnected(){
0,
null);
int errorCode = kernel32.GetLastError();
if ((errorCode == 2 || errorCode == 231) && --retryCount < 0) // трубы нет или занята
return false;
if (errorCode == 2 || errorCode == 231) {// трубы нет или занята
forceDisconnect();
if (--retryCount < 0) {
// System.out.println(errorCode);
return false;
}
}
}
//System.out.println("Готово!");
//DWORD mode = PipesAPI.PIPE_READMODE_MESSAGE;
//kernel32.SetNamedPipeHandleState(pipeHandle, new DWORDByReference(mode), null, null);
//System.out.println(kernel32.GetLastError());
// System.out.println(kernel32.GetLastError());
if (kernel32.GetLastError() != Kernel32.ERROR_SUCCESS){
forceDisconnect();
return false;
}
return true;
}

public String executeRequest(String command, boolean doCloseConnection){
try{
//System.out.println("Соединяемся");
Expand All @@ -80,18 +85,25 @@ public String executeRequest(String command, boolean doCloseConnection){
IntByReference bytesRead = new IntByReference(buffer.capacity());
int lastError = 0;
//System.out.println("Начали читать..");
while (!(kernel32.ReadFile(pipeHandle, buffer, buffer.capacity(), bytesRead, null))
|| (lastError=kernel32.GetLastError()) == Kernel32.ERROR_MORE_DATA){
// читаем и читаем
if (lastError == Kernel32.ERROR_PIPE_NOT_CONNECTED )
break;
}

//проверим, чтобы не зависнуть
if (kernel32.PeekNamedPipe(pipeHandle, buffer, buffer.capacity(), bytesRead, null, null))
while (!(kernel32.ReadFile(pipeHandle, buffer, buffer.capacity(), bytesRead, overlapped))
|| (lastError=kernel32.GetLastError()) == Kernel32.ERROR_MORE_DATA){
// читаем и читаем
if (lastError == Kernel32.ERROR_PIPE_NOT_CONNECTED || overlapped.Internal.intValue() != WinNT.ERROR_IO_PENDING)
break;
}

//System.out.println("Считали: " + bytesRead.getValue() + " байт");
if (doCloseConnection){
forceDisconnect();
}
String result = new String(buffer.array(), 0, bytesRead.getValue());
//System.out.println("Quik pipe -> : " + result);

if ("not connected".equals(result))
return null;
return result;
} else{
//System.out.println("Quik Pipe cоединение не может быть установлено. Вероятно сервер выключен");
Expand Down Expand Up @@ -152,18 +164,39 @@ private Interval(int code) {
}


public static void main(String s[]) throws IOException{
public static void main(String s[]) throws Exception{
SimpleDateFormat fmt = new SimpleDateFormat("dd.MM.yyyy");

for (int i=0;i<10000000;i++)
try(QuikCommandPipeAdapter adapter = new QuikCommandPipeAdapter()) {
long started = System.currentTimeMillis();
System.out.println(adapter.isConnectedToServer(false));
System.out.println(adapter.getTradeDate(false));
System.out.println(adapter.executeRequest("staSPBFUT:Si-12.14", false));
System.out.println(adapter.getServerCurrentTime(false).replaceAll(":", ""));
System.out.println("ГО : " + adapter.getContractPrice("SPBFUT", "Si-12.14", false));
// adapter.getTradeDate(false);
// adapter.getServerCurrentHour(false);
System.out.println(adapter.getLastCandlesOf("SPBFUT", "Si-12.14", Interval.HOUR, 50, false));
System.out.println("На всё ушло: " + (System.currentTimeMillis() - started));


Thread.sleep(10);

if (adapter.isConnectedToServer(false)){
long started = System.currentTimeMillis();
// торговый день должен быть - СЕГОДНЯ
String theDate = fmt.format(new Date());
String tradeDate = adapter.getTradeDate(false);
System.out.println(true);
System.out.println(tradeDate);
if (theDate.equals(tradeDate)){ // да, значит торги ведутся
String serverTime = adapter.getServerCurrentTime(false).replaceAll(":", "");
System.out.println(serverTime);
// System.out.println(adapter.executeRequest("stiEQBREMU:SBER03", false));
System.out.println(adapter.getServerCurrentTime(false).replaceAll(":", ""));
//System.out.println("ГО : " + adapter.getContractPrice("SPBFUT", "Si-12.14", false));
// System.out.println("ГО : " + adapter.getContractPrice("EQBREMU", "SBER03", false));

// adapter.getTradeDate(false);
// adapter.getServerCurrentHour(false);
System.out.println(adapter.getLastCandlesOf("SPBFUT", "Si-12.14", Interval.HOUR, 5, false));
System.out.println("На всё ушло: " + (System.currentTimeMillis() - started));
}
} else
System.out.println(false);
}


}
}
98 changes: 74 additions & 24 deletions qapi.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ Copyright (c) Pavel M Bosco, 2014
*******************************************************************
]]

mode = 0
stopped = false
f = nil
s = ""
cls = ""
sec = ""
cmd = ""
response = ""
candleCount = 0
last50CandlesAsString = ""
SERVER_NOT_CONNECTED = "not connected"

function ds_getCandlesByIndex(ds,count)
local size=ds:Size()
Expand All @@ -33,7 +34,9 @@ function ds_getCandlesByIndex(ds,count)
if first_candle < 0 then
first_candle = 0
end
s = ""
--message("in func: " .. string.format("%d, %d, %d\n", first_candle, end_candle, count), 1)

local s = ""
for i=first_candle,end_candle do
j = i-first_candle
t[j]={
Expand All @@ -53,22 +56,28 @@ end

--колбек вызывается при получении нового стакана
function OnQuote(cl, sc )
s = ""
if cl == cls and sc == sec then
ql2 = getQuoteLevel2(cls, sec)
for i=1,ql2.bid_count do
s = s.. string.format("%1.2f:%1.2f\n", ql2.bid[i].quantity, ql2.bid[i].price)
end
s = s .. "###\n"
for i=1,ql2.offer_count do
s = s.. string.format("%1.2f:%1.2f\n", ql2.offer[i].quantity, ql2.offer[i].price)
end
local s = ""
if cmd == "stakan" and cl == cls and sc == sec then
s = marshalStakan(cl, sc)
end
if cmd == "stakan" and string.len(s)>0 then
response = s
end
end

function marshalStakan(cl, sc)
local s = ""
ql2 = getQuoteLevel2(cls, sec)
for i=1,ql2.bid_count do
s = s.. string.format("%1.2f:%1.2f\n", ql2.bid[i].quantity, ql2.bid[i].price)
end
s = s .. "###\n"
for i=1,ql2.offer_count do
s = s.. string.format("%1.2f:%1.2f\n", ql2.offer[i].quantity, ql2.offer[i].price)
end
return s
end

-- вызывается при нажатии кнопки "остановить" в диалоге
function OnStop(signal)
stopped = true
Expand Down Expand Up @@ -110,7 +119,7 @@ ERROR_IO_PENDING = 997;
ERROR_PIPE_CONNECTED = 535;
PIPE_ACCESS_DUPLEX = 0x00000003;
PIPE_ACCESS_INBOUND = 0x00000001;
PIPE_ACCESS_OUTBOUND = 0x00000001;
PIPE_ACCESS_OUTBOUND = 0x00000002;
FILE_FLAG_FIRST_PIPE_INSTANCE = 0x00080000;
FILE_FLAG_OVERLAPPED = 0x40000000;
PIPE_TYPE_MESSAGE = 0x00000004;
Expand All @@ -125,13 +134,41 @@ readBuffer = ffi.new("char [4*1024]")
bytesRead = ffi.new("unsigned long[1]", 1)
params = ""

function OnConnected()
if mode == 0 then
disconnectAndReconnect(true)
message("Труба открыта", 1)
end
end

function OnClose()
if mode == 1 then
r = ffi.C.ReadFile(handle, readBuffer, 4*1024, bytesRead, nil);
end
mode = 0
disconnectAndReconnect(false)
end

function OnDisconnected()
OnClose()
message("Труба закрыта", 1)
end

function disconnectAndReconnect(doConnect)
ffi.C.FlushFileBuffers(handle)
ffi.C.DisconnectNamedPipe(handle)
if doConnect then
assert(ffi.C.ConnectNamedPipe(handle, poverlapped))
assert(ffi.C.ConnectNamedPipe(handle, poverlapped), "Соединение установить не удалось")
end
end

function notEmpty(ss)
if ss == "" then
return SERVER_NOT_CONNECTED
else
return ss
end
end

function processCommand(request)
--[[
Expand All @@ -142,15 +179,24 @@ function processCommand(request)
5. устанавливаем команду
]]
command = string.sub(request, 1, 3)
if command == "sta" then -- стакан
if command == "sta" or command == "sti" then -- sta - стакан после изменения, sti - стакан немедленно!
-- 2. выделяем параметры class code, sec code
local ind = string.find(request,":",3,true)

cls = string.sub(request, 4, ind-1)
sec = string.sub(request, 1+ind)
--message(string.format("Запрос стакана. Параметры: %s, %s", cls, sec), 1)
response = ""
cmd = "stakan"
if command == "sta" then
cmd = "stakan"
if isConnected() == 1 then
cmd = "stakan"
else
response = SERVER_NOT_CONNECTED
end
else
response = notEmpty(marshalStakan(cls, sec))
end
end
if command == "sve" then -- свеча
--2. выделяем параметры class code, sec code, период, количество свечей
Expand All @@ -171,7 +217,7 @@ function processCommand(request)
ds:Close()
end

response = last50CandlesAsString --""
response = notEmpty(last50CandlesAsString) --""
--Если много свечей, то квик безбожно виснет
--message("Ответ по графику: " .. last50CandlesAsString, 1)
cmd = "svecha"
Expand All @@ -180,19 +226,20 @@ function processCommand(request)
response = tostring(isConnected())
end
if command == "stm" then -- время сервера, отвечаем сразу
response = getInfoParam("SERVERTIME")
response = notEmpty(getInfoParam("SERVERTIME"))
--message("Ответ: " .. response, 1)
end
if command == "trd" then -- время сервера, отвечаем сразу
response = getInfoParam("TRADEDATE")
response = notEmpty(getInfoParam("TRADEDATE"))
--message("Ответ: " .. response, 1)
end
if command == "go " then
ind = string.find(request,":",3,true)
cls = string.sub(request, 4, ind-1)
sec = string.sub(request, 1+ind)
info = getParamEx(cls, sec, "BUYDEPO")
response = info.param_image

response = notEmpty(info.param_image)
end

end
Expand All @@ -205,16 +252,20 @@ function main( )
1,
4*1024, 4*1024,
0, nil))
assert(ffi.C.ConnectNamedPipe(handle, poverlapped))
mode = 0; -- подключаемся
assert(ffi.C.ConnectNamedPipe(handle, poverlapped), "Проблемы с подключением к трубе!")
mode = 0; -- 0 - подключаемся и ждём клиента, 1 - читаем, 2 - пишем
--message("Запустились", 1)
while not stopped do
if mode == 0 then -- подключаемся
if poverlapped[0].Internal ~= STATUS_PENDING then -- пока никого
--message("Клиент подключился", 1)
mode = 1 -- пора читать
else
sleep(5)
sleep(4)
if poverlapped[0].Internal ~= STATUS_PENDING then -- пока никого
--message("Клиент подключился", 1)
mode = 1 -- пора читать
end
end
end
if mode == 1 then
Expand Down Expand Up @@ -247,4 +298,3 @@ function main( )
disconnectAndReconnect(false)
ffi.C.CloseHandle(handle)
end

0 comments on commit d454835

Please sign in to comment.