如何底层调用最快地复制OPC数据到关系数据库
计算机上的二大应用,一是从WEB服务器上获得数据,另一种是向关系数据库中写入数据。在上集我已提出了一个从WEB上获得OPC数据的独创方法,现在谈谈第二种如何快速地把OPC数据写进到数据库中,这也是Calssic OPC最典型的一个应用场景。
使用基金会提供的基于.NET的ADO.NET无疑不是一个最快最有效率的办法,原因是显而易见的。要想速度快,必然要考虑到原生的基于COM的数据库技术,比如OLE DB,ADO或者ODBC。根据《ADO ActiveX Data Objects》一书描述的三者架构关系图,
?
显然,ADO是对OLE DB技术的上一层封装,是对当时OLE DB技术上的繁琐和难以找到熟悉COM的开发人员的一种妥协。自然,ADO的表现要比OLE DB逊色一些。不同于微软私有的ADO/OLE DB技术,ODBC是一个国际标准,有通用的接口,但性能上还是比OLE DB差了些。原因有三:第一,ODBC诞生在1992年,OLE DB出现在1996年,当年微软是想用它代替ODBC的,所以OLE DB在设计上有后发优势。第二,ODBC和OLEDB都有BIND的功能,比如ODBC有SQLBindCol()函数调用,而OLE DB不一样,要自己亲手写BIND,看上去很繁琐。其实也正是这样的繁琐保证了它性能上的优越。第三,最重要的一点,ODBC是工作在不同的查询语句上的,比如INSERT,UPDATE等,所以服务端需要进行解析。OLE DB可以使用查询语句,也可以不使用查询语句而完成INSERT、UPDATE等操作——没有了服务端的解析,自然就快了许多。有人做过测试,用ODBC的INSERT语句完成十万行的插入,而OLE DB没有使用任何INSERT语句,OLE DB比ODBC快了至少一倍以上。再多聊一些OLE DB的历史,当年没能成功替代ODBC,微软宣布准备让它退出底层的原生数据库编程应用,但是有众多厂家反对再加上OLE DB自身的性能优势,非常符合云时代的要求。所以在2017年微软宣布重新支持OLE DB的编程技术并发布了新一代的OLE DB驱动程序。新的驱动加上了加密功能,更能适应于云生时代。
虽然OLE DB性能优越,但繁琐的code让人望而生却,有没有办法?答案是?Active Template Library(ATL),它封装了很多繁琐的OLE DB底层调用,即起到防止内存泄漏,又帮你写出又快又好的程序。
本样本程序使用最新的OLE DB驱动程序,给出一个有INSERT语句的完整演示,完成快速地把OPC数据复制到数据库中,然后再展现出存贮在数据库中的所有数据。
int main(int argc, CHAR* argv[]) {
CoInitializeEx(NULL, COINIT_MULTITHREADED);
{
CDataSource dataSource;
CSession session;
const WCHAR szUDLFile[] = L"OPCDA.udl";
HRESULT hr = dataSource.OpenFromFileName(szUDLFile);
if (FAILED(hr)) {
printf("OpenFromFileName() failed\n");
goto END;
}
hr = session.Open(dataSource);
if (FAILED(hr))
{
printf("Open() failed\n");
dataSource.Close();
goto END;
}
CLSID cidOpcServer;
if (FAILED(listServers(cidOpcServer)))
{
printf("listServers() failed\n");
dataSource.Close();
goto END;
}
if (FAILED(DA(cidOpcServer, session))) {
printf("DA() failed\n");
dataSource.Close();
goto END;
}
printf("\nretrieving rows from database...\n\n");
displayResult(session);
dataSource.Close();
}
system("pause");
END:
CoUninitialize();
return(EXIT_SUCCESS);
}
这是主程序,运行在多线程状态,这样后面OPC的DataCallBack可以运行在另一个单独的线程中,否则全部都使用一个主线程。
接下来是根据UDL的文件设定来连接数据库。这个UDL的文件如下,
[oledb]
; Everything after this line is an OLE DB initstring
Provider=MSOLEDBSQL19.1;Integrated Security=SSPI;Persist Security Info=False;Initial Catalog=TEST;Data Source=localhost;Use Encryption for Data=Optional;
可以看到,使用了最新的19版本OLE DB的驱动(对应的是msoledbsql19.dll),指定了相应的数据库名和服务器名,不使用用户名和密码作为身份验证手段,同时不要求数据进行加密。有一点要注意的是,当通过UDL界面保存设置时,可能会有很多的属性存在这个UDL文件中,会造成OpenFromFileName()的失败,所以只要留最少的如上属性即可。
?
OpenFromFileName()是ATL提供的API,帮助获得第一个基于IDataInitialize接口的实例,然后根据UDL连接属性建立和数据库的连接,然后初始化基于IDBInitialize的实例如下,
HRESULT OpenFromFileName(_In_z_ LPCOLESTR szFileName) throw()
{
CComPtr<IDataInitialize> spDataInit;
CComHeapPtr<OLECHAR> spszInitString;
HRESULT hr = CoCreateInstance(__uuidof(MSDAINITIALIZE), NULL, CLSCTX_INPROC_SERVER,
__uuidof(IDataInitialize), (void**)&spDataInit);
if (FAILED(hr))
return hr;
hr = spDataInit->LoadStringFromStorage(szFileName, &spszInitString);
if (FAILED(hr))
return hr;
return OpenFromInitializationString(spszInitString);
}
// Open the datasource specified by the passed initialization string
HRESULT OpenFromInitializationString(
_In_z_ LPCOLESTR szInitializationString,
_In_ bool fPromptForInfo = false) throw()
{
CComPtr<IDataInitialize> spDataInit;
HRESULT hr = CoCreateInstance(__uuidof(MSDAINITIALIZE), NULL, CLSCTX_INPROC_SERVER,
__uuidof(IDataInitialize), (void**)&spDataInit);
if (FAILED(hr))
return hr;
hr = spDataInit->GetDataSource(NULL, CLSCTX_INPROC_SERVER, szInitializationString,
__uuidof(IDBInitialize), (IUnknown**)&m_spInit);
if (FAILED(hr))
return hr;
if( fPromptForInfo )
{
CComPtr<IDBProperties> spIDBProperties;
hr = m_spInit->QueryInterface( &spIDBProperties );
DBPROP rgProperties[1];
DBPROPSET rgPropertySets[1];
VariantInit(&rgProperties[0].vValue);
rgProperties[0].dwOptions = DBPROPOPTIONS_REQUIRED;
rgProperties[0].colid = DB_NULLID;
rgProperties[0].dwPropertyID = DBPROP_INIT_PROMPT;
rgProperties[0].vValue.vt = VT_I2;
rgProperties[0].vValue.lVal = DBPROMPT_COMPLETEREQUIRED;
rgPropertySets[0].rgProperties = rgProperties;
rgPropertySets[0].cProperties = 1;
rgPropertySets[0].guidPropertySet = DBPROPSET_DBINIT;
hr = spIDBProperties->SetProperties( 1, rgPropertySets );
if (FAILED(hr))
return hr;
}
return m_spInit->Initialize();
}
注意下这里CLSID用的是MSDAINITIALIZE,搜索注册表显示的是
?
也就是从oledb32.dll的地址空间中先获得IDataInitialize的实例,再调用GetDataSource()来获得IDBInitialize的指针,所以这个m_spInit指针也是在oledb32.dll的地址空间中,只不过它同时也加载了msoledbsql19.dll中的相应接口。
回到主程序,session.Open()也是ATL的API,主要是为了获得IOpenRowset的指针如下,
HRESULT Open(
_In_ const CDataSource& ds,
_Inout_updates_opt_(ulPropSets) DBPROPSET *pPropSet = NULL,
_In_ ULONG ulPropSets = 0) throw()
{
CComPtr<IDBCreateSession> spSession;
// Check we have connected to the database
ATLASSERT(ds.m_spInit != NULL);
HRESULT hr = ds.m_spInit->QueryInterface(__uuidof(IDBCreateSession), (void**)&spSession);
if (FAILED(hr))
return hr;
hr = spSession->CreateSession(NULL, __uuidof(IOpenRowset), (IUnknown**)&m_spOpenRowset);
if( pPropSet != NULL && SUCCEEDED(hr) && m_spOpenRowset != NULL )
{
// If the user didn't specify the default parameter, use one
if (pPropSet != NULL && ulPropSets == 0)
ulPropSets = 1;
CComPtr<ISessionProperties> spSessionProperties;
hr = m_spOpenRowset->QueryInterface(__uuidof(ISessionProperties), (void**)&spSessionProperties);
if(FAILED(hr))
return hr;
hr = spSessionProperties->SetProperties( ulPropSets, pPropSet );
}
return hr;
}
接下来的主程序是关于OPC的操作,listServers()是为了获得本机上OPC DA的CLSID,如下,
HRESULT listServers(CLSID& cidOpcServer)
{
ULONG fetched = 0;
HRESULT hr = S_OK;
CComHeapPtr<OLECHAR> bsProgID, lpszUserType, lpszVerIndProgID;
CATID arrcatid[3] = { NULL };
arrcatid[0] = __uuidof(CATID_OPCDAServer10);
arrcatid[1] = __uuidof(CATID_OPCDAServer20);
arrcatid[2] = __uuidof(CATID_OPCDAServer30);
CComPtr<IOPCServerList2> spIOPCServerList2;
if (FAILED(hr = spIOPCServerList2.CoCreateInstance(__uuidof(OpcServerList), spIOPCServerList2, CLSCTX_ALL)))
{
printf("CoCreateInstance() for IOPCServerList2 failed\n");
return hr;
}
CComPtr<IOPCEnumGUID> spEnum;
hr = spIOPCServerList2->EnumClassesOfCategories(sizeof arrcatid / sizeof CATID, arrcatid, 0, NULL, &spEnum);
if (spEnum.p)
{
while ((hr = spEnum->Next(1, &cidOpcServer, &fetched)) == S_OK)
{
hr = spIOPCServerList2->GetClassDetails(cidOpcServer, &bsProgID, &lpszUserType, &lpszVerIndProgID);
if (FAILED(hr)) {
_tprintf(_T("GetClassDetails() failed\n"));
return hr;
}
break;
}
}
return hr;
}
此段程序也不复杂,获得一个IOPCServerList2的实例,然后对相应的OPC类别进行枚举,再在枚举中循环得到本机的OPC DA的CLSID。
有了DA的CLSID后,开始对DA进行操作,比如创建一个实例,建立一个新组,创建一个回调函数,通知服务端,加入感兴趣的TAG,暂停等待回调函数的结束。具体见下,
HRESULT DA(CLSID& cidOpcServer, CSession& session) {
CComPtr<IOPCServer> pIOPCServer;
HRESULT hr = pIOPCServer.CoCreateInstance(cidOpcServer, pIOPCServer, CLSCTX_ALL);
if (FAILED(hr)) {
printf("CoCreateInstance() for IOPCServer failed\n");
return E_FAIL;
}
DWORD dwRevisedUpdateRate = 0;
OPCHANDLE hGroup = 0;
CComPtr<IOPCItemMgt> pOPCItemMgt;
hr = pIOPCServer->AddGroup(L"", TRUE, 1000, NULL, NULL, NULL, LOCALE_SYSTEM_DEFAULT, &hGroup, &dwRevisedUpdateRate, __uuidof(IOPCItemMgt), (LPUNKNOWN*)&pOPCItemMgt);
if (FAILED(hr)) {
printf("AddGroup() failed\n");
return E_FAIL;
}
DataCallback* pDataCallback = new DataCallback(session);
pDataCallback->AddRef();
DWORD m_dwCookie;
AtlAdvise(pOPCItemMgt, pDataCallback, __uuidof(IOPCDataCallback), &m_dwCookie);
hr = addItems(pOPCItemMgt);
if (FAILED(hr)) {
printf("addItems() failed\n");
return E_FAIL;
}
printf("\npress any key to complete inserting rows to database\n");
getchar();
AtlUnadvise(pOPCItemMgt, __uuidof(IOPCDataCallback), m_dwCookie);
pDataCallback->Release();
return S_OK;
}
下面具体看下回调函数,它的作用是当TAG的值有变化时,此函数被唤醒在另一线程执行,返回的参数包括TAG的值,时间戳和状态,如下,
STDMETHODIMP OnDataChange(
DWORD dwTransid,
OPCHANDLE hGroup,
HRESULT hrMasterquality,
HRESULT hrMastererror,
DWORD dwCount,
OPCHANDLE* phClientItems,
VARIANT* pvValues,
WORD* pwQualities,
FILETIME* pftTimeStamps,
HRESULT* pErrors
)
{
CCommand<CManualAccessor> command;
CComVariant vVariant[4];
vVariant[0].vt = VT_BSTR;
vVariant[1].vt = VT_R4;
vVariant[2].vt = VT_DATE;
vVariant[3].vt = VT_UINT;
hr = command.CreateParameterAccessor(4, vVariant, sizeof vVariant);
if (FAILED(hr)) {
printf("command.CreateParameterAccessor() failed");
return hr;
}
for (DWORD ii = 0; ii < dwCount; ii++)
{
CComVariant vValue;
WORD quality = pwQualities[ii] & OPC_QUALITY_MASK;
COleDateTime oleTime = COleDateTime(pftTimeStamps[ii]);
SYSTEMTIME st;
oleTime.GetAsSystemTime(st);
if (phClientItems[ii] == 0)
CComBSTR("Random.Int1").CopyTo(&vVariant[0].bstrVal);
if (phClientItems[ii] == 1)
CComBSTR("Random.Int2").CopyTo(&vVariant[0].bstrVal);
else if (phClientItems[ii] == 2)
CComBSTR("Random.Real8").CopyTo(&vVariant[0].bstrVal);
vVariant[1].fltVal = (FLOAT)pvValues[ii].dblVal;
vVariant[2].date = oleTime;
vVariant[3].iVal = quality;
command.m_nCurrentParameter = 0;
command.AddParameterEntry(1, DBTYPE_BSTR, NULL, &vVariant[0].bstrVal);
command.AddParameterEntry(2, DBTYPE_R4, NULL, &vVariant[1].fltVal);
command.AddParameterEntry(3, DBTYPE_DATE, NULL, &vVariant[2].date);
command.AddParameterEntry(4, DBTYPE_UI2, NULL, &vVariant[3].iVal);
/*
This is not the most efficient and fastest way to insert a row to database due to query building/parsing and commit each time.
To bulk insert, interface of IRowsetFastLoad has to be used and it is quite different from this code example.
Contact developer to have a code example using IRowsetFastLoad, so you can completely understand the big difference between
IDBInitialize and IDataInitialize interfaces when trying to get a pointer to IRowsetFastLoad.
*/
hr = command.Open(session, "insert into OPCDA (Tag, Value, Time, Quality) Values (?,?,?,?)", NULL, NULL);
if (FAILED(hr)) {
printf("command.Open() failed");
break;
}
else
printf("\nOnDataChange: %S (%f, %s.%d, %s)", vVariant[0].bstrVal, vVariant[1].fltVal, oleTime.Format("%F %T").GetString(), st.wMilliseconds, quality == OPC_QUALITY_GOOD ? "good" : "bad");
SysFreeString(vVariant[0].bstrVal);
}
return hr;
}
这段程序中使用了ATL提供的CCommand,然后用CreateParameterAccessor()构建一个关于查询语句参数的存取器。这也是个ATL的函数,不再展开讨论,主要是执行有关参数的BIND,具体可以参见它的源代码。然后根据OPC提供的返回值的数目进行循环,取出每一个TAG的值、时间戳和状态,结合TAG名称来满足INSERT语句四个参数的要求,最后使用ATL的Open()完成INSERT语句的执行。
回到主程序,完成了INSERT的操作,下一步是从数据库中把刚才插入的数据取出来展示,
void displayResult(CSession &session) {
CCommand<CManualAccessor> command;
const USHORT uColumns = 4;
CComVariant vValues[uColumns]{};
HRESULT hr = command.CreateAccessor(uColumns, vValues, sizeof vValues);
if (FAILED(hr))
{
printf("CreateAccessor() failed\n");
return;
}
for (ULONG l = 0; l < uColumns; l++)
{
command.AddBindEntry(l + 1, DBTYPE_VARIANT, NULL, &vValues[l], NULL, NULL);
}
hr = command.Open(session, "select * from OPCDA", NULL, NULL);
if (FAILED(hr))
{
printf("command.Open() failed\n");
return;
}
ULONG count = 0;
while (command.MoveNext() == S_OK) {
CComVariant* pBind = (CComVariant*)command.m_pBuffer;
count++;
COleDateTime dateTime(pBind[2].date);
printf("%S (%f, %s, %s)\n", pBind[0].bstrVal, pBind[1].fltVal, dateTime.Format("%F %T").GetString(), pBind[3].iVal == OPC_QUALITY_GOOD ? "good" : "bad");;
}
printf("\nTotal rows: %d\n", count);
}
这段的所有操作都是调用ATL的API,先是CreateAccessor()构建个无参数的存取器,也就是建立一个BIND,供返回的数据存在内存中用。一个Open()语句完成数据的获得,再进行个循环依次展示获得的值。注意一点,返回的是一行的值,有四列。
运行后的结果如下,
?
综观这一程序,由于有了ATL的加持OLE DB的编程不再那么困难。ATL带来了便利,但也掩盖了对底层OLE DB的理解。每次的INSERT操作都伴随着COMMIT,显然不是最快和最有效率的OLE DB编程方法。也是基于此微软当年(2012年)在新版的Native Client 驱动中引入了IRowsetFastLoad接口,专门进行批量插入。此接口也非常简单只有二个函数,InsertRow()和Commit(),即多次调用InsertRow(),然后一次性地Commit()。为了深入理解更底层的OLE DB编程,我又独自开发了基于IRowsetFastLoad的OPC范例。本以为和这个程序差不多,没想到却被打脸。在开发过程中让我体会到使用IDataInitialize和IDBInitialize实例来获取IRowsetFastLoad指针的巨大不同,对老版的OLE DB驱动sqloledb.dll,老版的Native?Client驱动sqlncli11.dll和新版的OLE DB驱动msoledbsql19.dll三者之间的关系有了进一步的了解。在进行完整的BIND过程中也领会到最原始的底层ORM的美(相对于高级语言的ORM,如Hibernate或Entity Framework),这种底层ORM和内存布局直接呼应,没有任何INSERT语句却能快速地完成批量插入,真是“不著一字,尽得风流”。感兴趣的同学可以邮箱联系我获取一份范例,在关键处我都加了注释来加深对OLE DB和COM的编程理解,确保获益满满。
本范例已经在GITHUB开源,下载在此。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!